You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/09/19 08:19:01 UTC

[1/2] TEZ-1495. ATS integration for TezClient (Prakash Ramachandran via bikas)

Repository: tez
Updated Branches:
  refs/heads/master 82ec16baf -> 4cf6472e3


http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
index 8057714..d9cafc7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.history.events;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -52,12 +53,13 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
   private TezCounters tezCounters;
   private boolean fromSummary = false;
   private VertexStats vertexStats;
+  private Map<String, Integer> vertexTaskStats;
 
-  public VertexFinishedEvent(TezVertexID vertexId,
-      String vertexName, long initRequestedTime, long initedTime,
-      long startRequestedTime, long startedTime, long finishTime,
-      VertexState state, String diagnostics, TezCounters counters,
-      VertexStats vertexStats) {
+  public VertexFinishedEvent(TezVertexID vertexId, String vertexName, long initRequestedTime,
+                             long initedTime, long startRequestedTime, long startedTime,
+                             long finishTime, VertexState state, String diagnostics,
+                             TezCounters counters, VertexStats vertexStats,
+                             Map<String, Integer> vertexTaskStats) {
     this.vertexName = vertexName;
     this.vertexID = vertexId;
     this.initRequestedTime = initRequestedTime;
@@ -69,6 +71,7 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
     this.diagnostics = diagnostics;
     this.tezCounters = counters;
     this.vertexStats = vertexStats;
+    this.vertexTaskStats = vertexTaskStats;
   }
 
   public VertexFinishedEvent() {
@@ -138,10 +141,9 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
         + ", status=" + state.name()
         + ", diagnostics=" + diagnostics
         + ", counters=" + ( tezCounters == null ? "null" :
-          tezCounters.toString()
-            .replaceAll("\\n", ", ").replaceAll("\\s+", " "))
-        + ", vertexStats=" + (vertexStats == null ? "null"
-              : vertexStats.toString());
+          tezCounters.toString().replaceAll("\\n", ", ").replaceAll("\\s+", " "))
+        + ", vertexStats=" + (vertexStats == null ? "null" : vertexStats.toString())
+        + ", vertexTaskStats=" + (vertexTaskStats == null ? "null" : vertexTaskStats.toString());
   }
 
   public TezVertexID getVertexID() {
@@ -176,6 +178,10 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
     return startTime;
   }
 
+  public Map<String, Integer> getVertexTaskStats() {
+    return vertexTaskStats;
+  }
+
   @Override
   public void toSummaryProtoStream(OutputStream outputStream) throws IOException {
     VertexFinishStateProto finishStateProto =

http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
index 136c5f1..a8bd21e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
+import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.records.TezVertexID;
@@ -105,4 +106,7 @@ public class VertexStartedEvent implements HistoryEvent {
     return startTime;
   }
 
+  public VertexState getVertexState() {
+    return VertexState.RUNNING;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index 12dcf1c..a9987d6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.dag.history.logging.impl;
 
+import org.apache.tez.common.ATSConstants;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.events.AMLaunchedEvent;
@@ -36,7 +37,6 @@ import org.apache.tez.dag.history.events.VertexFinishedEvent;
 import org.apache.tez.dag.history.events.VertexInitializedEvent;
 import org.apache.tez.dag.history.events.VertexStartedEvent;
 import org.apache.tez.dag.history.logging.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
 import org.apache.tez.dag.history.utils.DAGUtils;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;

http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
deleted file mode 100644
index 0188a8e..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.history.utils;
-
-public class ATSConstants {
-
-  // TODO remove once YARN exposes proper constants
-
-  /* Top level keys */
-  public static final String ENTITY = "entity";
-  public static final String ENTITY_TYPE = "entitytype";
-  public static final String EVENTS = "events";
-  public static final String EVENT_TYPE = "eventtype";
-  public static final String TIMESTAMP = "ts";
-  public static final String EVENT_INFO = "eventinfo";
-  public static final String RELATED_ENTITIES = "relatedEntities";
-  public static final String PRIMARY_FILTERS = "primaryfilters";
-  public static final String SECONDARY_FILTERS = "secondaryfilters";
-  public static final String OTHER_INFO = "otherinfo";
-
-  /* Section for related entities */
-  public static final String APPLICATION_ID = "applicationId";
-  public static final String APPLICATION_ATTEMPT_ID = "applicationAttemptId";
-  public static final String CONTAINER_ID = "containerId";
-  public static final String NODE_ID = "nodeId";
-  public static final String USER = "user";
-
-  /* Keys used in other info */
-  public static final String APP_SUBMIT_TIME = "appSubmitTime";
-
-  /* Tez-specific info */
-  public static final String DAG_PLAN = "dagPlan";
-  public static final String DAG_NAME = "dagName";
-  public static final String VERTEX_NAME = "vertexName";
-  public static final String SCHEDULED_TIME = "scheduledTime";
-  public static final String INIT_REQUESTED_TIME = "initRequestedTime";
-  public static final String INIT_TIME = "initTime";
-  public static final String START_REQUESTED_TIME = "startRequestedTime";
-  public static final String START_TIME = "startTime";
-  public static final String FINISH_TIME = "endTime";
-  public static final String TIME_TAKEN = "timeTaken";
-  public static final String STATUS = "status";
-  public static final String DIAGNOSTICS = "diagnostics";
-  public static final String COUNTERS = "counters";
-  public static final String STATS = "stats";
-  public static final String NUM_TASKS = "numTasks";
-  public static final String PROCESSOR_CLASS_NAME = "processorClassName";
-  public static final String IN_PROGRESS_LOGS_URL = "inProgressLogsURL";
-  public static final String COMPLETED_LOGS_URL = "completedLogsURL";
-  public static final String EXIT_STATUS = "exitStatus";
-
-  /* Counters-related keys */
-  public static final String COUNTER_GROUPS = "counterGroups";
-  public static final String COUNTER_GROUP_NAME = "counterGroupName";
-  public static final String COUNTER_GROUP_DISPLAY_NAME = "counterGroupDisplayName";
-  public static final String COUNTER_NAME = "counterName";
-  public static final String COUNTER_DISPLAY_NAME = "counterDisplayName";
-  public static final String COUNTER_VALUE = "counterValue";
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
index 232a3b2..cab3c83 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import org.apache.tez.common.ATSConstants;
 import org.apache.tez.common.counters.CounterGroup;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;

http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
index 9042a93..b278d8f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
@@ -572,7 +572,7 @@ public class TestVertexRecovery {
         vertex1.restoreFromEvent(new VertexFinishedEvent(vertex1.getVertexId(),
             "vertex1", initRequestedTime, initedTime, startRequestedTime,
             startTime, finishTime, VertexState.SUCCEEDED, "",
-            new TezCounters(), new VertexStats()));
+            new TezCounters(), new VertexStats(), null));
     assertEquals(finishTime, vertex1.finishTime);
     assertEquals(VertexState.SUCCEEDED, recoveredState);
     assertEquals(false, vertex1.recoveryCommitInProgress);
@@ -801,7 +801,7 @@ public class TestVertexRecovery {
     recoveredState = vertex1.restoreFromEvent(new VertexFinishedEvent(vertex1.getVertexId(),
         "vertex1", initRequestedTime, initedTime, initRequestedTime + 300L,
         initRequestedTime + 400L, initRequestedTime + 500L,
-        VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats()));
+        VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(), null));
     assertEquals(VertexState.SUCCEEDED, recoveredState);
 
     vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),

http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index bcbe6f1..f52671c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -346,7 +346,7 @@ public class TestHistoryEventsProtoConversion {
           new VertexFinishedEvent(TezVertexID.getInstance(
               TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
               "vertex1", 1000l, 15000l, 16000l, 20000l, 1344400l, VertexState.ERROR,
-              null, null, null);
+              null, null, null, null);
       VertexFinishedEvent deserializedEvent = (VertexFinishedEvent)
           testProtoConversion(event);
       Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
@@ -361,7 +361,7 @@ public class TestHistoryEventsProtoConversion {
           new VertexFinishedEvent(TezVertexID.getInstance(
               TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
               "vertex1", 1000l, 15000l, 16000l, 20000l, 1344400l, VertexState.ERROR,
-              "diagnose", new TezCounters(), new VertexStats());
+              "diagnose", new TezCounters(), new VertexStats(), null);
       VertexFinishedEvent deserializedEvent = (VertexFinishedEvent)
           testProtoConversion(event);
       Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());

http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index f674fc0..2149053 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -127,7 +127,7 @@ public class TestHistoryEventJsonConversion {
         case VERTEX_FINISHED:
           event = new VertexFinishedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
               random.nextInt(), random.nextInt(), random.nextInt(), VertexState.ERROR,
-              null, null, null);
+              null, null, null, null);
           break;
         case TASK_STARTED:
           event = new TaskStartedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt());

http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index f078f1d..a81bdf4 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -19,9 +19,11 @@
 package org.apache.tez.dag.history.logging.ats;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.tez.common.ATSConstants;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
@@ -41,7 +43,6 @@ import org.apache.tez.dag.history.events.VertexFinishedEvent;
 import org.apache.tez.dag.history.events.VertexInitializedEvent;
 import org.apache.tez.dag.history.events.VertexStartedEvent;
 import org.apache.tez.dag.history.logging.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
 import org.apache.tez.dag.history.utils.DAGUtils;
 
 public class HistoryEventTimelineConversion {
@@ -253,6 +254,7 @@ public class HistoryEventTimelineConversion {
     atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName());
 
     atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
+    atsEntity.addOtherInfo(ATSConstants.STATUS, event.getDagState().toString());
 
     return atsEntity;
   }
@@ -422,6 +424,13 @@ public class HistoryEventTimelineConversion {
     atsEntity.addOtherInfo(ATSConstants.STATS,
         DAGUtils.convertVertexStatsToATSMap(event.getVertexStats()));
 
+    final Map<String, Integer> vertexTaskStats = event.getVertexTaskStats();
+    if (vertexTaskStats != null) {
+      for(String key : vertexTaskStats.keySet()) {
+        atsEntity.addOtherInfo(key, vertexTaskStats.get(key));
+      }
+    }
+
     return atsEntity;
   }
 
@@ -464,6 +473,7 @@ public class HistoryEventTimelineConversion {
 
     atsEntity.addOtherInfo(ATSConstants.START_REQUESTED_TIME, event.getStartRequestedTime());
     atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
+    atsEntity.addOtherInfo(ATSConstants.STATUS, event.getVertexState().toString());
 
     return atsEntity;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index b04b8d4..d2e366d 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -127,7 +127,7 @@ public class TestHistoryEventTimelineConversion {
         case VERTEX_FINISHED:
           event = new VertexFinishedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
               random.nextInt(), random.nextInt(), random.nextInt(), VertexState.ERROR,
-              null, null, null);
+              null, null, null, null);
           break;
         case TASK_STARTED:
           event = new TaskStartedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt());


[2/2] git commit: TEZ-1495. ATS integration for TezClient (Prakash Ramachandran via bikas)

Posted by bi...@apache.org.
TEZ-1495. ATS integration for TezClient (Prakash Ramachandran via bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4cf6472e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4cf6472e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4cf6472e

Branch: refs/heads/master
Commit: 4cf6472e39018d8809a945f4ccb39155d8c03220
Parents: 82ec16b
Author: Bikas Saha <bi...@apache.org>
Authored: Thu Sep 18 23:16:53 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Thu Sep 18 23:18:07 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +-
 .../java/org/apache/tez/client/TezClient.java   |   6 +-
 .../org/apache/tez/common/ATSConstants.java     |  94 ++++
 .../tez/dag/api/DAGNotRunningException.java     |  34 ++
 .../tez/dag/api/client/DAGClientImpl.java       | 477 ++++++++++++++++++
 .../dag/api/client/DAGClientTimelineImpl.java   | 501 +++++++++++++++++++
 .../apache/tez/dag/api/client/VertexStatus.java |   2 +-
 .../dag/api/client/rpc/DAGClientRPCImpl.java    | 228 +--------
 .../tez/dag/api/client/TestATSHttpClient.java   | 167 +++++++
 .../tez/dag/api/client/rpc/TestDAGClient.java   |  24 +-
 .../tez/dag/api/client/DAGClientHandler.java    |  25 +-
 .../java/org/apache/tez/dag/app/AppContext.java |   3 +
 .../org/apache/tez/dag/app/DAGAppMaster.java    |   7 +
 .../org/apache/tez/dag/app/RecoveryParser.java  |   1 +
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  27 +-
 .../tez/dag/history/events/DAGStartedEvent.java |   4 +
 .../dag/history/events/VertexFinishedEvent.java |  24 +-
 .../dag/history/events/VertexStartedEvent.java  |   4 +
 .../impl/HistoryEventJsonConversion.java        |   2 +-
 .../tez/dag/history/utils/ATSConstants.java     |  76 ---
 .../apache/tez/dag/history/utils/DAGUtils.java  |   1 +
 .../dag/app/dag/impl/TestVertexRecovery.java    |   4 +-
 .../TestHistoryEventsProtoConversion.java       |   4 +-
 .../impl/TestHistoryEventJsonConversion.java    |   2 +-
 .../ats/HistoryEventTimelineConversion.java     |  12 +-
 .../ats/TestHistoryEventTimelineConversion.java |   2 +-
 26 files changed, 1405 insertions(+), 328 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3ae8f59..c0c03f5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -19,7 +19,6 @@ ALL CHANGES:
   TEZ-1580. Change TestOrderedWordCount to optionally use MR configs.
   TEZ-1524. Resolve user group information only if ACLs are enabled.
   TEZ-1581. GroupByOrderByMRRTest no longer functional.
-  TEZ-1563. TezClient.submitDAGSession alters DAG local resources regardless of DAG submission.
   TEZ-1157. Optimize broadcast shuffle to download data only once per host. 
 
 Release 0.5.1: Unreleased
@@ -45,6 +44,7 @@ ALL CHANGES
   TEZ-1533. Request Events more often if a complete set of events is received by a task.
   TEZ-1587. Some tez-examples fail in local mode.
   TEZ-1597. ImmediateStartVertexManager should handle corner case of vertex having zero tasks.
+  TEZ-1495. ATS integration for TezClient
 
 Release 0.5.0: 2014-09-03
 

http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index 5cec8b0..0955c20 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -54,7 +54,7 @@ import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRespo
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGResponseProto;
-import org.apache.tez.dag.api.client.rpc.DAGClientRPCImpl;
+import org.apache.tez.dag.api.client.DAGClientImpl;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -409,7 +409,7 @@ public class TezClient {
         + ", sessionName=" + clientName
         + ", applicationId=" + sessionAppId
         + ", dagName=" + dag.getName());
-    return new DAGClientRPCImpl(sessionAppId, dagId,
+    return new DAGClientImpl(sessionAppId, dagId,
         amConfig.getTezConfiguration(), frameworkClient);
   }
 
@@ -711,7 +711,7 @@ public class TezClient {
   static DAGClient getDAGClient(ApplicationId appId, TezConfiguration tezConf,
                                 FrameworkClient frameworkClient)
       throws IOException, TezException {
-    return new DAGClientRPCImpl(appId, getDefaultTezDAGID(appId), tezConf, frameworkClient);
+    return new DAGClientImpl(appId, getDefaultTezDAGID(appId), tezConf, frameworkClient);
   }
 
   // DO NOT CHANGE THIS. This code is replicated from TezDAGID.java

http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
new file mode 100644
index 0000000..065614c
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+@Private
+public class ATSConstants {
+
+  // TODO remove once YARN exposes proper constants
+
+  /* Top level keys */
+  public static final String ENTITIES = "entities";
+  public static final String ENTITY = "entity";
+  public static final String ENTITY_TYPE = "entitytype";
+  public static final String EVENTS = "events";
+  public static final String EVENT_TYPE = "eventtype";
+  public static final String TIMESTAMP = "ts";
+  public static final String EVENT_INFO = "eventinfo";
+  public static final String RELATED_ENTITIES = "relatedEntities";
+  public static final String PRIMARY_FILTERS = "primaryfilters";
+  public static final String SECONDARY_FILTERS = "secondaryfilters";
+  public static final String OTHER_INFO = "otherinfo";
+
+  /* Section for related entities */
+  public static final String APPLICATION_ID = "applicationId";
+  public static final String APPLICATION_ATTEMPT_ID = "applicationAttemptId";
+  public static final String CONTAINER_ID = "containerId";
+  public static final String NODE_ID = "nodeId";
+  public static final String USER = "user";
+
+  /* Keys used in other info */
+  public static final String APP_SUBMIT_TIME = "appSubmitTime";
+
+  /* Tez-specific info */
+  public static final String DAG_PLAN = "dagPlan";
+  public static final String DAG_NAME = "dagName";
+  public static final String VERTEX_NAME = "vertexName";
+  public static final String SCHEDULED_TIME = "scheduledTime";
+  public static final String INIT_REQUESTED_TIME = "initRequestedTime";
+  public static final String INIT_TIME = "initTime";
+  public static final String START_REQUESTED_TIME = "startRequestedTime";
+  public static final String START_TIME = "startTime";
+  public static final String FINISH_TIME = "endTime";
+  public static final String TIME_TAKEN = "timeTaken";
+  public static final String STATUS = "status";
+  public static final String DIAGNOSTICS = "diagnostics";
+  public static final String COUNTERS = "counters";
+  public static final String STATS = "stats";
+  public static final String NUM_TASKS = "numTasks";
+  public static final String NUM_COMPLETED_TASKS = "numCompletedTasks";
+  public static final String NUM_SUCCEEDED_TASKS = "numSucceededTasks";
+  public static final String NUM_FAILED_TASKS = "numFailedTasks";
+  public static final String NUM_KILLED_TASKS = "numKilledTasks";
+  public static final String PROCESSOR_CLASS_NAME = "processorClassName";
+  public static final String IN_PROGRESS_LOGS_URL = "inProgressLogsURL";
+  public static final String COMPLETED_LOGS_URL = "completedLogsURL";
+  public static final String EXIT_STATUS = "exitStatus";
+
+  /* Counters-related keys */
+  public static final String COUNTER_GROUPS = "counterGroups";
+  public static final String COUNTER_GROUP_NAME = "counterGroupName";
+  public static final String COUNTER_GROUP_DISPLAY_NAME = "counterGroupDisplayName";
+  public static final String COUNTER_NAME = "counterName";
+  public static final String COUNTER_DISPLAY_NAME = "counterDisplayName";
+  public static final String COUNTER_VALUE = "counterValue";
+
+  /* URL related */
+  public static final String RESOURCE_URI_BASE = "/ws/v1/timeline";
+  public static final String TEZ_DAG_ID = "TEZ_DAG_ID";
+  public static final String TEZ_VERTEX_ID = "TEZ_VERTEX_ID";
+
+  /* Available in YARN, but not in the 2.2 release, so redefined here */
+  public static final String TIMELINE_SERVICE_WEBAPP_HTTP_ADDRESS_CONF_NAME =
+      "yarn.timeline-service.webapp.address";
+  public static final String TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS_CONF_NAME =
+      "yarn.timeline-service.webapp.https.address";
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-api/src/main/java/org/apache/tez/dag/api/DAGNotRunningException.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAGNotRunningException.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAGNotRunningException.java
new file mode 100644
index 0000000..cbc93a9
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAGNotRunningException.java
@@ -0,0 +1,34 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+/**
+ *  Checked {@link TezException} subtype; per its name, signals that a DAG is not running.
+ */
+@Private
+public class DAGNotRunningException extends TezException {
+  private static final long serialVersionUID = 6337442733802964448L;
+  public DAGNotRunningException(Throwable cause) { super(cause); }
+  public DAGNotRunningException(String message) { super(message); }
+  public DAGNotRunningException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
new file mode 100644
index 0000000..0c8ef1a
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
@@ -0,0 +1,477 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api.client;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.text.DecimalFormat;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.client.FrameworkClient;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DAGNotRunningException;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.client.rpc.DAGClientRPCImpl;
+import org.apache.tez.dag.api.records.DAGProtos;
+
+@Private
+public class DAGClientImpl extends DAGClient {
+  private static final Log LOG = LogFactory.getLog(DAGClientImpl.class);
+
+  private final ApplicationId appId;
+  private final String dagId;
+  private final TezConfiguration conf;
+  private final FrameworkClient frameworkClient;
+
+  @VisibleForTesting
+  protected DAGClient realClient;
+  private boolean dagCompleted = false;
+  private boolean isATSEnabled = false;
+  private DAGStatus cachedDagStatus = null;
+  Map<String, VertexStatus> cachedVertexStatus = new HashMap<String, VertexStatus>();
+
+  private static final long SLEEP_FOR_COMPLETION = 500;
+  private static final long PRINT_STATUS_INTERVAL_MILLIS = 5000;
+  private final DecimalFormat formatter = new DecimalFormat("###.##%");
+  private long lastPrintStatusTimeMillis;
+  private EnumSet<VertexStatus.State> vertexCompletionStates = EnumSet.of(
+      VertexStatus.State.SUCCEEDED, VertexStatus.State.FAILED, VertexStatus.State.KILLED,
+      VertexStatus.State.ERROR);
+
+  /**
+   * Creates a client for the given DAG; the initial delegate is a {@link DAGClientRPCImpl}.
+   *
+   * @param appId id of the application that runs the DAG
+   * @param dagId id of the DAG to monitor
+   * @param conf configuration used to create the framework client and to detect whether the
+   *             ATS history logging service is configured
+   * @param frameworkClient optional client; it is reused only when it is non-null and local
+   *                        mode is enabled, otherwise a new client is created, initialized and
+   *                        started
+   */
+  public DAGClientImpl(ApplicationId appId, String dagId, TezConfiguration conf,
+                       @Nullable FrameworkClient frameworkClient) {
+    this.appId = appId;
+    this.dagId = dagId;
+    this.conf = conf;
+    if (frameworkClient != null &&
+        conf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE, TezConfiguration.TEZ_LOCAL_MODE_DEFAULT)) {
+      this.frameworkClient = frameworkClient;
+    } else {
+      this.frameworkClient = FrameworkClient.createFrameworkClient(conf);
+      this.frameworkClient.init(conf, new YarnConfiguration(conf));
+      this.frameworkClient.start();
+    }
+    // ATS is considered enabled only when the ATS history logging service class is configured.
+    isATSEnabled = conf.get(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, "")
+            .equals("org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService");
+
+    if (UserGroupInformation.isSecurityEnabled()){
+      //TODO: enable ATS integration in kerberos secured cluster - see TEZ-1529
+      isATSEnabled = false;
+    }
+
+    realClient = new DAGClientRPCImpl(appId, dagId, conf, this.frameworkClient);
+  }
+
+  // Delegates to whichever client is currently active.
+  @Override
+  public String getExecutionContext() {
+    return realClient.getExecutionContext();
+  }
+
+  // Delegates to whichever client is currently active.
+  @Override
+  protected ApplicationReport getApplicationReportInternal() {
+    return realClient.getApplicationReportInternal();
+  }
+
+  /**
+   * Returns the status of the DAG, trying the available sources in order: the current client
+   * while the DAG is not known to be completed (falling back to the last cached status), the
+   * timeline client once the DAG is completed and ATS is enabled, the cached status if it
+   * records completion, and finally the application report from the RM.
+   *
+   * @param statusOptions options forwarded to the underlying client; may be null
+   * @return the most accurate status that could be obtained
+   * @throws TezException if the RM fallback fails
+   * @throws IOException on I/O failure in an underlying client
+   */
+  @Override
+  public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions) throws
+      TezException, IOException {
+
+    if (!dagCompleted) {
+      // fetch from AM. on Error and while DAG is still not completed (could not reach AM, AM got
+      // killed). return cached status. This prevents the progress being reset (for ex fetching from
+      // RM does not give status).
+      final DAGStatus dagStatus = getDAGStatusViaAM(statusOptions);
+
+      // getDAGStatusViaAM may have flipped dagCompleted, hence the second check.
+      if (!dagCompleted) {
+        if (dagStatus != null) {
+          cachedDagStatus = dagStatus;
+          return dagStatus;
+        }
+        if (cachedDagStatus != null) {
+          // could not get from AM (not reachable/ was killed). return cached status.
+          return cachedDagStatus;
+        }
+      }
+
+      if (isATSEnabled && dagCompleted) {
+        switchToTimelineClient();
+      }
+    }
+
+    if (isATSEnabled && dagCompleted) {
+      try {
+        // fetch from ATS and return only if status is completed.
+        DAGStatus dagStatus = realClient.getDAGStatus(statusOptions);
+        if (dagStatus.isCompleted()) {
+          return dagStatus;
+        }
+      } catch (TezException e) {
+        if (LOG.isDebugEnabled()) {
+          // Log at debug level to match the guard; previously this was emitted at info level.
+          LOG.debug("DAGStatus fetch failed." + e.getMessage());
+        }
+      }
+    }
+
+    // dag completed and Timeline service is either not enabled or does not have completion status
+    // return cached status if completion info is present.
+    if (dagCompleted && cachedDagStatus != null && cachedDagStatus.isCompleted()) {
+      return cachedDagStatus;
+    }
+
+    // everything else fails rely on RM.
+    return getDAGStatusViaRM();
+  }
+
+  /**
+   * Returns the status of a vertex, using the current client while the DAG is not known to be
+   * completed (falling back to the cached status for that vertex), then the timeline client
+   * when ATS is enabled, and finally the cached status if it is in a completion state.
+   *
+   * @param vertexName name of the vertex
+   * @param statusOptions options forwarded to the underlying client
+   * @return the vertex status, or null when no source could provide a usable status
+   * @throws IOException on I/O failure in an underlying client
+   * @throws TezException if switching to the timeline client fails
+   */
+  @Override
+  public VertexStatus getVertexStatus(String vertexName, Set<StatusGetOpts> statusOptions) throws
+      IOException, TezException {
+
+    if (!dagCompleted) {
+      VertexStatus vertexStatus = getVertexStatusViaAM(vertexName, statusOptions);
+
+      // getVertexStatusViaAM may have flipped dagCompleted, hence the second check.
+      if (!dagCompleted) {
+        if (vertexStatus != null) {
+          cachedVertexStatus.put(vertexName, vertexStatus);
+          return vertexStatus;
+        }
+        if (cachedVertexStatus.containsKey(vertexName)) {
+          return cachedVertexStatus.get(vertexName);
+        }
+      }
+
+      if (isATSEnabled && dagCompleted) {
+        switchToTimelineClient();
+      }
+    }
+
+    if (isATSEnabled && dagCompleted) {
+      try {
+        // Only trust the timeline data once the vertex is reported in a completion state.
+        final VertexStatus vertexStatus = realClient.getVertexStatus(vertexName, statusOptions);
+        if (vertexCompletionStates.contains(vertexStatus.getState())) {
+          return vertexStatus;
+        }
+      } catch (TezException e) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("ERROR fetching vertex data from Yarn Timeline. " + e.getMessage());
+        }
+      }
+    }
+
+    // Last resort: a cached status, but only if it records a completion state.
+    if (cachedVertexStatus.containsKey(vertexName)) {
+      final VertexStatus vertexStatus = cachedVertexStatus.get(vertexName);
+      if (vertexCompletionStates.contains(vertexStatus.getState())) {
+        return vertexStatus;
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * Forwards the kill request to the current client unless the DAG is already known to be
+   * completed, in which case the request is only logged.
+   */
+  @Override
+  public void tryKillDAG() throws IOException, TezException {
+    if (dagCompleted) {
+      LOG.info("TryKill for app: " + appId + " dag:" + dagId + " dag already completed.");
+      return;
+    }
+    realClient.tryKillDAG();
+  }
+
+  // Waits for the DAG to complete without collecting vertex names for per-vertex updates.
+  @Override
+  public DAGStatus waitForCompletion() throws IOException, TezException, InterruptedException {
+    return _waitForCompletionWithStatusUpdates(false, EnumSet.noneOf(StatusGetOpts.class));
+  }
+
+  // Waits for the DAG to complete, with per-vertex updates enabled and the given status options.
+  @Override
+  public DAGStatus waitForCompletionWithStatusUpdates(
+      @Nullable Set<StatusGetOpts> statusGetOpts) throws IOException, TezException,
+      InterruptedException {
+    return _waitForCompletionWithStatusUpdates(true, statusGetOpts);
+  }
+
+  /**
+   * Closes the current client and stops the framework client.
+   *
+   * @throws IOException if closing the current client fails; the framework client is stopped
+   *                     regardless
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      realClient.close();
+    } finally {
+      // Stop the framework client even when closing the delegate throws.
+      if (frameworkClient != null) {
+        frameworkClient.stop();
+      }
+    }
+  }
+
+  /**
+   * Fetches the DAG status from the current client on a best-effort basis.
+   * <p>
+   * A {@link DAGNotRunningException} marks the DAG as completed. Any other
+   * {@link TezException} is logged at debug level and otherwise ignored; when no status was
+   * obtained the application report is consulted to decide whether the DAG has completed.
+   *
+   * @param statusOptions options forwarded to the client; may be null
+   * @return the status, or null if it could not be fetched
+   * @throws IOException on I/O failure in the underlying client
+   */
+  private DAGStatus getDAGStatusViaAM(@Nullable Set<StatusGetOpts> statusOptions) throws
+      IOException {
+    DAGStatus dagStatus = null;
+    try {
+      dagStatus = realClient.getDAGStatus(statusOptions);
+    } catch (DAGNotRunningException e) {
+      dagCompleted = true;
+    } catch (TezException e) {
+      // can be either due to a n/w issue or due to AM completed.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Failed to fetch DAG status for app: " + appId + " dag:" + dagId, e);
+      }
+    }
+
+    if (dagStatus == null && !dagCompleted) {
+      checkAndSetDagCompletionStatus();
+    }
+
+    return dagStatus;
+  }
+
+  /**
+   * Fetches the status of a vertex from the current client on a best-effort basis.
+   * <p>
+   * A {@link DAGNotRunningException} marks the DAG as completed. Any other
+   * {@link TezException} is logged at debug level and otherwise ignored; when no status was
+   * obtained the application report is consulted to decide whether the DAG has completed.
+   *
+   * @param vertexName name of the vertex
+   * @param statusOptions options forwarded to the client
+   * @return the vertex status, or null if it could not be fetched
+   * @throws IOException on I/O failure in the underlying client
+   */
+  private VertexStatus getVertexStatusViaAM(String vertexName, Set<StatusGetOpts> statusOptions) throws
+      IOException {
+    VertexStatus vertexStatus = null;
+    try {
+      vertexStatus = realClient.getVertexStatus(vertexName, statusOptions);
+    } catch (DAGNotRunningException e) {
+      dagCompleted = true;
+    } catch (TezException e) {
+      // can be either due to a n/w issue or due to AM completed.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Failed to fetch status of vertex " + vertexName + " for app: " + appId
+            + " dag:" + dagId, e);
+      }
+    }
+
+    if (vertexStatus == null && !dagCompleted) {
+      checkAndSetDagCompletionStatus();
+    }
+
+    return vertexStatus;
+  }
+
+  /**
+   * Derives a DAG status from the application report obtained through the framework client.
+   * Only the state and, when present, the diagnostics are populated.
+   *
+   * @return a status whose state is mapped from the YARN application state
+   * @throws TezException if the report cannot be fetched or the application is unknown
+   * @throws IOException on I/O failure while fetching the report
+   * @throws TezUncheckedException if YARN reports a state this method does not know
+   */
+  DAGStatus getDAGStatusViaRM() throws TezException, IOException {
+    if(LOG.isDebugEnabled()) {
+      // This is the RM path; the message previously claimed the status came from the AM.
+      LOG.debug("GetDAGStatus via RM for app: " + appId + " dag:" + dagId);
+    }
+    ApplicationReport appReport;
+    try {
+      appReport = frameworkClient.getApplicationReport(appId);
+    } catch (YarnException e) {
+      throw new TezException(e);
+    }
+
+    if(appReport == null) {
+      throw new TezException("Unknown/Invalid appId: " + appId);
+    }
+
+    DAGProtos.DAGStatusProto.Builder builder = DAGProtos.DAGStatusProto.newBuilder();
+    // The status is created before the builder is filled in below.
+    DAGStatus dagStatus = new DAGStatus(builder);
+    DAGProtos.DAGStatusStateProto dagState;
+    switch (appReport.getYarnApplicationState()) {
+      case NEW:
+      case NEW_SAVING:
+      case SUBMITTED:
+      case ACCEPTED:
+        dagState = DAGProtos.DAGStatusStateProto.DAG_SUBMITTED;
+        break;
+      case RUNNING:
+        dagState = DAGProtos.DAGStatusStateProto.DAG_RUNNING;
+        break;
+      case FAILED:
+        dagState = DAGProtos.DAGStatusStateProto.DAG_FAILED;
+        break;
+      case KILLED:
+        dagState = DAGProtos.DAGStatusStateProto.DAG_KILLED;
+        break;
+      case FINISHED:
+        // A finished application is further classified by its final status.
+        switch(appReport.getFinalApplicationStatus()) {
+          case UNDEFINED:
+          case FAILED:
+            dagState = DAGProtos.DAGStatusStateProto.DAG_FAILED;
+            break;
+          case KILLED:
+            dagState = DAGProtos.DAGStatusStateProto.DAG_KILLED;
+            break;
+          case SUCCEEDED:
+            dagState = DAGProtos.DAGStatusStateProto.DAG_SUCCEEDED;
+            break;
+          default:
+            throw new TezUncheckedException("Encountered unknown final application"
+                + " status from YARN"
+                + ", appState=" + appReport.getYarnApplicationState()
+                + ", finalStatus=" + appReport.getFinalApplicationStatus());
+        }
+        break;
+      default:
+        throw new TezUncheckedException("Encountered unknown application state"
+            + " from YARN, appState=" + appReport.getYarnApplicationState());
+    }
+
+    builder.setState(dagState);
+    if(appReport.getDiagnostics() != null) {
+      builder.addAllDiagnostics(Collections.singleton(appReport.getDiagnostics()));
+    }
+
+    return dagStatus;
+  }
+
+  /**
+   * Polls the DAG status until the DAG is completed, logging progress along the way.
+   * <p>
+   * The first loop waits until the DAG is running or has reached a terminal state; the second
+   * loop keeps polling and reporting progress until the status reports completion. The thread
+   * sleeps for {@code SLEEP_FOR_COMPLETION} milliseconds between polls.
+   *
+   * @param vertexUpdates when true, vertex names are read from the DAG status so that
+   *                      per-vertex progress is printed
+   * @param statusGetOpts options used when fetching the DAG status; may be null
+   * @return the final DAG status
+   */
+  private DAGStatus _waitForCompletionWithStatusUpdates(boolean vertexUpdates,
+                                                        @Nullable Set<StatusGetOpts> statusGetOpts) throws IOException, TezException, InterruptedException {
+    DAGStatus dagStatus;
+    boolean initPrinted = false;
+    boolean runningPrinted = false;
+    double dagProgress = -1.0; // Print the first one
+    // monitoring
+    while (true) {
+      dagStatus = getDAGStatus(statusGetOpts);
+      if (!initPrinted
+          && (dagStatus.getState() == DAGStatus.State.INITING || dagStatus.getState() == DAGStatus.State.SUBMITTED)) {
+        initPrinted = true; // Print once
+        log("Waiting for DAG to start running");
+      }
+      if (dagStatus.getState() == DAGStatus.State.RUNNING
+          || dagStatus.getState() == DAGStatus.State.SUCCEEDED
+          || dagStatus.getState() == DAGStatus.State.FAILED
+          || dagStatus.getState() == DAGStatus.State.KILLED
+          || dagStatus.getState() == DAGStatus.State.ERROR) {
+        break;
+      }
+      Thread.sleep(SLEEP_FOR_COMPLETION);
+    }// End of while(true)
+
+    Set<String> vertexNames = Collections.emptySet();
+    while (!dagStatus.isCompleted()) {
+      if (!runningPrinted) {
+        log("DAG initialized: CurrentState=Running");
+        runningPrinted = true;
+      }
+      // Vertex names are looked up again on each pass for as long as none are known.
+      if (vertexUpdates && vertexNames.isEmpty()) {
+        vertexNames = getDAGStatus(statusGetOpts).getVertexProgress().keySet();
+      }
+      // Intermediate reports pass null options, so counters are not printed here.
+      dagProgress = monitorProgress(vertexNames, dagProgress, null, dagStatus);
+      Thread.sleep(SLEEP_FOR_COMPLETION);
+      dagStatus = getDAGStatus(statusGetOpts);
+    }// end of while
+    // Always print the last status irrespective of progress change
+    monitorProgress(vertexNames, -1.0, statusGetOpts, dagStatus);
+    log("DAG completed. " + "FinalState=" + dagStatus.getState());
+    return dagStatus;
+  }
+
+  private double monitorProgress(Set<String> vertexNames, double prevDagProgress,
+                                 Set<StatusGetOpts> opts, DAGStatus dagStatus) throws IOException, TezException {
+    Progress progress = dagStatus.getDAGProgress();
+    double dagProgress = prevDagProgress;
+    if (progress != null) {
+      dagProgress = getProgress(progress);
+      boolean progressChanged = dagProgress > prevDagProgress;
+      long currentTimeMillis = System.currentTimeMillis();
+      long timeSinceLastPrintStatus =  currentTimeMillis - lastPrintStatusTimeMillis;
+      boolean printIntervalExpired = timeSinceLastPrintStatus > PRINT_STATUS_INTERVAL_MILLIS;
+      if (progressChanged || printIntervalExpired) {
+        lastPrintStatusTimeMillis = currentTimeMillis;
+        printDAGStatus(vertexNames, opts, dagStatus, progress);
+      }
+    }
+
+    return dagProgress;
+  }
+
+  private void printDAGStatus(Set<String> vertexNames, Set<StatusGetOpts> opts,
+                              DAGStatus dagStatus, Progress dagProgress) throws IOException, TezException {
+    double vProgressFloat = 0.0f;
+    log("DAG: State: " + dagStatus.getState() + " Progress: "
+        + formatter.format(getProgress(dagProgress)) + " " + dagProgress);
+    boolean displayCounter = opts != null && opts.contains(StatusGetOpts.GET_COUNTERS);
+    if (displayCounter) {
+      TezCounters counters = dagStatus.getDAGCounters();
+      if (counters != null) {
+        log("DAG Counters:\n" + counters);
+      }
+    }
+    for (String vertex : vertexNames) {
+      VertexStatus vStatus = getVertexStatus(vertex, opts);
+      if (vStatus == null) {
+        log("Could not retrieve status for vertex: " + vertex);
+        continue;
+      }
+      Progress vProgress = vStatus.getProgress();
+      if (vProgress != null) {
+        vProgressFloat = 0.0f;
+        if (vProgress.getTotalTaskCount() == 0) {
+          vProgressFloat = 1.0f;
+        } else if (vProgress.getTotalTaskCount() > 0) {
+          vProgressFloat = getProgress(vProgress);
+        }
+        log("\tVertexStatus:" + " VertexName: " + vertex + " Progress: "
+            + formatter.format(vProgressFloat) + " " + vProgress);
+      }
+      if (displayCounter) {
+        TezCounters counters = vStatus.getVertexCounters();
+        if (counters != null) {
+          log("Vertex Counters for " + vertex + ":\n" + counters);
+        }
+      }
+    } // end of for loop
+  }
+
+  private void checkAndSetDagCompletionStatus() {
+    ApplicationReport appReport = realClient.getApplicationReportInternal();
+    if (appReport != null) {
+      final YarnApplicationState appState = appReport.getYarnApplicationState();
+      if (appState == YarnApplicationState.FINISHED || appState == YarnApplicationState.FAILED ||
+          appState == YarnApplicationState.KILLED) {
+        dagCompleted = true;
+      }
+    }
+  }
+
+  /**
+   * Fetches the application report through the framework client on a best-effort basis.
+   * NOTE(review): no caller of this private method is visible in this class.
+   *
+   * @return the report, or null if it could not be fetched
+   */
+  private ApplicationReport getApplicationReport() {
+    ApplicationReport appReport = null;
+    try {
+      appReport = frameworkClient.getApplicationReport(appId);
+    } catch (YarnException e) {
+      // do nothing
+    } catch (IOException e) {
+      // do nothing
+    }
+    return appReport;
+  }
+
+  /**
+   * Closes the current client and replaces it with a {@link DAGClientTimelineImpl}.
+   */
+  private void switchToTimelineClient() throws IOException, TezException {
+    realClient.close();
+    realClient = new DAGClientTimelineImpl(appId, dagId, conf, frameworkClient);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("dag completed switching to DAGClientTimelineImpl");
+    }
+  }
+
+  // Exposes the currently active delegate client.
+  @VisibleForTesting
+  public DAGClient getRealClient() {
+    return realClient;
+  }
+
+  /**
+   * Computes the fraction of succeeded tasks.
+   *
+   * @param progress task counts to evaluate
+   * @return succeeded tasks divided by total tasks, or 0.0 when the total is zero
+   */
+  private double getProgress(Progress progress) {
+    if (progress.getTotalTaskCount() == 0) {
+      return 0.0;
+    }
+    return (double) (progress.getSucceededTaskCount()) / progress.getTotalTaskCount();
+  }
+
+  // Single place through which status messages are written, at info level.
+  private void log(String message) {
+    LOG.info(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
new file mode 100644
index 0000000..57453aa
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
@@ -0,0 +1,501 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.api.client;
+
+import javax.annotation.Nullable;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
+import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+import com.sun.jersey.json.impl.provider.entity.JSONRootElementProvider;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.client.FrameworkClient;
+import org.apache.tez.common.ATSConstants;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.records.DAGProtos.DAGStatusProto;
+import org.apache.tez.dag.api.records.DAGProtos.DAGStatusStateProto;
+import org.apache.tez.dag.api.records.DAGProtos.ProgressProto;
+import org.apache.tez.dag.api.records.DAGProtos.StringProgressPairProto;
+import org.apache.tez.dag.api.records.DAGProtos.TezCounterGroupProto;
+import org.apache.tez.dag.api.records.DAGProtos.TezCounterProto;
+import org.apache.tez.dag.api.records.DAGProtos.TezCountersProto;
+import org.apache.tez.dag.api.records.DAGProtos.VertexStatusProto;
+import org.apache.tez.dag.api.records.DAGProtos.VertexStatusStateProto;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+
+@Private
+public class DAGClientTimelineImpl extends DAGClient {
+  private static final Log LOG = LogFactory.getLog(DAGClientTimelineImpl.class);
+
+  private static final String FILTER_BY_FIELDS = "primaryfilters,otherinfo";
+  private static final String HTTPS_SCHEME = "https://";
+  private static final String HTTP_SCHEME = "http://";
+  private static Client httpClient = null;
+  private final ApplicationId appId;
+  private final String dagId;
+  private final TezConfiguration conf;
+  private final FrameworkClient frameworkClient;
+
+  private Map<String, VertexTaskStats> vertexTaskStatsCache = null;
+
+  @VisibleForTesting
+  protected String baseUri;
+
+  /**
+   * Creates a client that reads DAG information over HTTP(S) from the address configured for
+   * the timeline service web application.
+   *
+   * @param appId id of the application that ran the DAG
+   * @param dagId id of the DAG
+   * @param conf configuration providing the timeline web application address
+   * @param frameworkClient client used to fetch the application report
+   * @throws TezException if no web application address is configured for the chosen scheme
+   */
+  public DAGClientTimelineImpl(ApplicationId appId, String dagId, TezConfiguration conf,
+                               FrameworkClient frameworkClient)
+      throws TezException {
+    this.appId = appId;
+    this.dagId = dagId;
+    this.conf = conf;
+    this.frameworkClient = frameworkClient;
+
+    // The scheme and the address key are chosen together, based on webappHttpsOnly(conf).
+    String scheme;
+    String webAppAddress;
+    if (webappHttpsOnly(conf)) {
+      scheme = HTTPS_SCHEME;
+      webAppAddress = conf.get(ATSConstants.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS_CONF_NAME);
+    } else {
+      scheme = HTTP_SCHEME;
+      webAppAddress = conf.get(ATSConstants.TIMELINE_SERVICE_WEBAPP_HTTP_ADDRESS_CONF_NAME);
+    }
+    if (webAppAddress == null) {
+      throw new TezException("Failed to get ATS webapp address");
+    }
+
+    baseUri = Joiner.on("").join(scheme, webAppAddress, ATSConstants.RESOURCE_URI_BASE);
+  }
+
+
+  // Describes the execution context using the application id.
+  @Override
+  public String getExecutionContext() {
+    return "Executing on YARN cluster with App id " + appId;
+  }
+
+  /**
+   * Fetches the application report through the framework client on a best-effort basis.
+   *
+   * @return the report, or null if fetching it failed
+   */
+  @Override
+  protected ApplicationReport getApplicationReportInternal() {
+    ApplicationReport appReport = null;
+    try {
+      appReport = frameworkClient.getApplicationReport(appId);
+    } catch (YarnException e) {
+      // ignored: null is returned instead
+    } catch (IOException e) {
+      // ignored: null is returned instead
+    }
+    return appReport;
+  }
+
+  /**
+   * Fetches the DAG entity from the timeline service and converts it into a DAG status.
+   *
+   * @param statusOptions options controlling what is parsed (for example counters); may be null
+   * @return the DAG status built from the timeline entity
+   * @throws TezException if the entity has no status or its JSON cannot be parsed
+   */
+  @Override
+  public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions)
+      throws IOException, TezException {
+    final String url = String.format("%s/%s/%s?fields=%s", baseUri, ATSConstants.TEZ_DAG_ID, dagId,
+        FILTER_BY_FIELDS);
+    try {
+      DAGStatusProto.Builder statusBuilder;
+      final JSONObject jsonRoot = getJsonRootEntity(url);
+
+      // parseDagStatus returns null when the entity carries no status yet.
+      statusBuilder = parseDagStatus(jsonRoot, statusOptions);
+      if (statusBuilder == null) {
+        throw new TezException("Failed to get DagStatus from ATS");
+      }
+
+      return new DAGStatus(statusBuilder);
+    } catch (JSONException je) {
+      throw new TezException("Failed to parse DagStatus json from YARN Timeline", je);
+    }
+  }
+
+  /**
+   * Fetches the entity of the named vertex from the timeline service and converts it into a
+   * vertex status.
+   *
+   * @param vertexName name of the vertex; it is URL-encoded before being placed in the query
+   * @param statusOptions options controlling what is parsed (for example counters)
+   * @return the vertex status built from the timeline entity
+   * @throws IOException if the vertex name cannot be encoded
+   * @throws TezException if exactly one matching entity is not found, the entity has no
+   *                      status, or its JSON cannot be parsed
+   */
+  @Override
+  public VertexStatus getVertexStatus(String vertexName, Set<StatusGetOpts> statusOptions)
+      throws IOException, TezException {
+    // Vertex names are free-form and may contain characters that are not legal in a URL.
+    final String encodedVertexName = URLEncoder.encode(vertexName, "UTF-8");
+    final String url = String.format(
+        "%s/%s?primaryFilter=%s:%s&secondaryFilter=vertexName:%s&fields=%s", baseUri,
+        ATSConstants.TEZ_VERTEX_ID, ATSConstants.TEZ_DAG_ID, dagId, encodedVertexName,
+        FILTER_BY_FIELDS);
+
+    try {
+      VertexStatusProto.Builder statusBuilder;
+      final JSONObject jsonRoot = getJsonRootEntity(url);
+      JSONArray entitiesNode = jsonRoot.optJSONArray(ATSConstants.ENTITIES);
+      // The filters are expected to select exactly one vertex entity.
+      if (entitiesNode == null || entitiesNode.length() != 1) {
+        throw new TezException("Failed to get vertex status YARN Timeline");
+      }
+      JSONObject vertexNode = entitiesNode.getJSONObject(0);
+
+      statusBuilder = parseVertexStatus(vertexNode, statusOptions);
+      if (statusBuilder == null) {
+        throw new TezException("Failed to parse vertex status from YARN Timeline");
+      }
+
+      return new VertexStatus(statusBuilder);
+    } catch (JSONException je) {
+      throw new TezException("Failed to parse VertexStatus json from YARN Timeline", je);
+    }
+  }
+
+  // Killing a DAG is not supported by this client; the call always fails.
+  @Override
+  public void tryKillDAG() throws IOException, TezException {
+    throw new TezException("tryKillDAG is unsupported for DAGClientTimelineImpl");
+  }
+
+  // Does not wait: returns the DAG status as currently recorded by the timeline service.
+  @Override
+  public DAGStatus waitForCompletion() throws IOException, TezException, InterruptedException {
+    return getDAGStatus(null);
+  }
+
+  // Does not wait: returns the DAG status as currently recorded by the timeline service.
+  @Override
+  public DAGStatus waitForCompletionWithStatusUpdates(
+      @Nullable Set<StatusGetOpts> statusGetOpts) throws IOException, TezException,
+      InterruptedException {
+    return getDAGStatus(statusGetOpts);
+  }
+
+ // Destroys the HTTP client, if one exists, and clears the reference.
+ // NOTE(review): httpClient is a static field, so closing one instance also affects any other
+ // instance using it - confirm this is intended.
+ @Override
+  public void close() throws IOException {
+    if (httpClient != null) {
+      httpClient.destroy();
+      httpClient = null;
+    }
+  }
+
+  /**
+   * Builds a DAG status proto from the timeline entity of a DAG.
+   * <p>
+   * State and diagnostics are read from the entity's other-info section. Counters are added
+   * when {@code statusOptions} contains {@code GET_COUNTERS}. DAG and per-vertex progress are
+   * derived from the per-vertex task statistics when any are available.
+   *
+   * @param jsonRoot JSON of the DAG entity
+   * @param statusOptions options controlling what is parsed; may be null
+   * @return the populated builder, or null when the entity carries no status
+   * @throws JSONException if the other-info section is missing or malformed
+   * @throws TezException if the status is not a known DAG state, or the task statistics cannot
+   *                      be fetched
+   */
+  private DAGStatusProto.Builder parseDagStatus(JSONObject jsonRoot, Set<StatusGetOpts> statusOptions)
+      throws JSONException, TezException {
+    final JSONObject otherInfoNode = jsonRoot.getJSONObject(ATSConstants.OTHER_INFO);
+
+    DAGStatusProto.Builder dagStatusBuilder = DAGStatusProto.newBuilder();
+
+    final String status = otherInfoNode.optString(ATSConstants.STATUS);
+    final String diagnostics = otherInfoNode.optString(ATSConstants.DIAGNOSTICS);
+    if (status.equals("")) {
+      return null;
+    }
+    // An unrecognised status would otherwise pass null to setState and fail with an NPE.
+    if (dagStateProtoMap.get(status) == null) {
+      throw new TezException("Unknown DAG status from YARN Timeline: " + status);
+    }
+
+    dagStatusBuilder.setState(dagStateProtoMap.get(status))
+        .addAllDiagnostics(Collections.singleton(diagnostics));
+
+    if (statusOptions != null && statusOptions.contains(StatusGetOpts.GET_COUNTERS)) {
+      final TezCountersProto.Builder tezCounterBuilder;
+      final JSONObject countersNode = otherInfoNode.optJSONObject(ATSConstants.COUNTERS);
+      tezCounterBuilder = parseDagCounters(countersNode);
+      if (tezCounterBuilder != null) {
+        dagStatusBuilder.setDagCounters(tezCounterBuilder);
+      }
+    }
+
+    final Map<String, VertexTaskStats> vertexTaskStatsMap = parseTaskStatsForVertexes();
+    if (vertexTaskStatsMap.size() > 0) {
+      // A null vertex name aggregates the statistics of all vertices into the DAG progress.
+      ProgressProto.Builder dagProgressBuilder = getProgressBuilder(vertexTaskStatsMap, null);
+      dagStatusBuilder.setDAGProgress(dagProgressBuilder);
+
+      List<StringProgressPairProto> vertexProgressBuilder =
+          new ArrayList<StringProgressPairProto>(vertexTaskStatsMap.size());
+      for (Map.Entry<String, VertexTaskStats> v : vertexTaskStatsMap.entrySet()) {
+        StringProgressPairProto vertexProgressProto = StringProgressPairProto
+            .newBuilder()
+            .setKey(v.getKey())
+            .setProgress(getProgressBuilder(vertexTaskStatsMap, v.getKey()))
+            .build();
+        vertexProgressBuilder.add(vertexProgressProto);
+      }
+      dagStatusBuilder.addAllVertexProgress(vertexProgressBuilder);
+    }
+
+    return dagStatusBuilder;
+  }
+
+  /**
+   * Sums the task statistics into a progress proto.
+   *
+   * @param vertexTaskStatsMap task statistics keyed by vertex name
+   * @param vertexName vertex to report on, or null to aggregate over all vertices
+   * @return a builder holding total, running, succeeded, killed and failed task counts; the
+   *         running count is computed as total minus completed tasks
+   */
+  private ProgressProto.Builder getProgressBuilder(Map<String, VertexTaskStats> vertexTaskStatsMap,
+                                                   String vertexName) {
+    int total = 0;
+    int succeeded = 0;
+    int killed = 0;
+    int failed = 0;
+    int running = 0;
+
+    for (Map.Entry<String, VertexTaskStats> entry : vertexTaskStatsMap.entrySet()) {
+      if (vertexName != null && !vertexName.equals(entry.getKey())) {
+        continue;
+      }
+      final VertexTaskStats stats = entry.getValue();
+      total += stats.numTaskCount;
+      succeeded += stats.succeededTaskCount;
+      killed += stats.killedTaskCount;
+      failed += stats.failedTaskCount;
+      running += (stats.numTaskCount - stats.completedTaskCount);
+    }
+
+    final ProgressProto.Builder builder = ProgressProto.newBuilder();
+    builder.setTotalTaskCount(total);
+    builder.setRunningTaskCount(running);
+    builder.setSucceededTaskCount(succeeded);
+    builder.setKilledTaskCount(killed);
+    builder.setFailedTaskCount(failed);
+    return builder;
+  }
+
+  /**
+   * Builds a vertex status proto from the timeline entity of a vertex.
+   * <p>
+   * State, diagnostics and task counts are read from the entity's other-info section. Counters
+   * are added when {@code statusOptions} contains {@code GET_COUNTERS}.
+   *
+   * @param jsonRoot JSON of the vertex entity
+   * @param statusOptions options controlling what is parsed; may be null
+   * @return the populated builder, or null when the entity carries no status or the status is
+   *         not a known vertex state
+   * @throws JSONException if the other-info section is missing or malformed
+   */
+  private VertexStatusProto.Builder parseVertexStatus(JSONObject jsonRoot,
+                                                      Set<StatusGetOpts> statusOptions)
+      throws JSONException {
+    final JSONObject otherInfoNode = jsonRoot.getJSONObject(ATSConstants.OTHER_INFO);
+    final VertexStatusProto.Builder vertexStatusBuilder = VertexStatusProto.newBuilder();
+
+    final String status = otherInfoNode.optString(ATSConstants.STATUS);
+    final String diagnostics = otherInfoNode.optString(ATSConstants.DIAGNOSTICS);
+    if (status.equals("")) {
+      return null;
+    }
+    // An unrecognised status would otherwise pass null to setState and fail with an NPE.
+    if (vertexStateProtoMap.get(status) == null) {
+      return null;
+    }
+
+    vertexStatusBuilder.setState(vertexStateProtoMap.get(status))
+        .addAllDiagnostics(Collections.singleton(diagnostics));
+
+    // Running tasks are not stored directly; they are derived as total minus completed.
+    int numRunningTasks = otherInfoNode.optInt(ATSConstants.NUM_TASKS) -
+        otherInfoNode.optInt(ATSConstants.NUM_COMPLETED_TASKS);
+    ProgressProto.Builder progressBuilder = ProgressProto.newBuilder();
+    progressBuilder.setTotalTaskCount(otherInfoNode.optInt(ATSConstants.NUM_TASKS));
+    progressBuilder.setRunningTaskCount(numRunningTasks);
+    progressBuilder.setSucceededTaskCount(otherInfoNode.optInt(ATSConstants.NUM_SUCCEEDED_TASKS));
+    progressBuilder.setKilledTaskCount(otherInfoNode.optInt(ATSConstants.NUM_KILLED_TASKS));
+    progressBuilder.setFailedTaskCount(otherInfoNode.optInt(ATSConstants.NUM_FAILED_TASKS));
+    vertexStatusBuilder.setProgress(progressBuilder);
+
+    if (statusOptions != null && statusOptions.contains(StatusGetOpts.GET_COUNTERS)) {
+      final TezCountersProto.Builder tezCounterBuilder;
+      final JSONObject countersNode = otherInfoNode.optJSONObject(ATSConstants.COUNTERS);
+      tezCounterBuilder = parseDagCounters(countersNode);
+      if (tezCounterBuilder != null) {
+        vertexStatusBuilder.setVertexCounters(tezCounterBuilder);
+      }
+    }
+
+    return vertexStatusBuilder;
+  }
+
+  /**
+   * Converts the counters section of a timeline entity into a counters proto.
+   *
+   * @param countersNode JSON counters section; may be null
+   * @return null when {@code countersNode} is null, otherwise a builder holding every counter
+   *         group that could be parsed (possibly none)
+   * @throws JSONException if a counter group is malformed
+   */
+  private TezCountersProto.Builder parseDagCounters(JSONObject countersNode)
+      throws JSONException {
+    if (countersNode == null) {
+      return null;
+    }
+
+    final TezCountersProto.Builder result = TezCountersProto.newBuilder();
+    final JSONArray groups = countersNode.optJSONArray(ATSConstants.COUNTER_GROUPS);
+    if (groups == null) {
+      return result;
+    }
+
+    final int groupCount = groups.length();
+    for (int idx = 0; idx < groupCount; idx++) {
+      final TezCounterGroupProto.Builder group = parseCounterGroup(groups.optJSONObject(idx));
+      if (group != null) {
+        result.addCounterGroups(group);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Converts one JSON counter group into a counter group proto.
+   *
+   * @param counterGroupNode JSON of the counter group; may be null
+   * @return null when {@code counterGroupNode} is null, otherwise a builder with the group's
+   *         name, display name and counters; a group without a counters array yields a builder
+   *         with no counters
+   * @throws JSONException if a counter lacks its name, display name or value
+   */
+  private TezCounterGroupProto.Builder parseCounterGroup(JSONObject counterGroupNode)
+      throws JSONException {
+
+    if (counterGroupNode == null) {
+      return null;
+    }
+
+    TezCounterGroupProto.Builder counterGroup = TezCounterGroupProto.newBuilder();
+
+    final String groupName = counterGroupNode.optString(ATSConstants.COUNTER_GROUP_NAME);
+    final String groupDisplayName = counterGroupNode.optString(
+        ATSConstants.COUNTER_GROUP_DISPLAY_NAME);
+    final JSONArray counterNodes = counterGroupNode.optJSONArray(ATSConstants.COUNTERS);
+    // optJSONArray yields null when the array is absent; treat that as a group with no counters.
+    final int numCounters = counterNodes == null ? 0 : counterNodes.length();
+
+    List<TezCounterProto> counters = new ArrayList<TezCounterProto>(numCounters);
+
+    for (int i = 0; i < numCounters; i++) {
+      final JSONObject counterNode = counterNodes.getJSONObject(i);
+      final String counterName = counterNode.getString(ATSConstants.COUNTER_NAME);
+      final String counterDisplayName = counterNode.getString(ATSConstants.COUNTER_DISPLAY_NAME);
+      final long counterValue = counterNode.getLong(ATSConstants.COUNTER_VALUE);
+
+      counters.add(
+          TezCounterProto.newBuilder()
+              .setName(counterName)
+              .setDisplayName(counterDisplayName)
+              .setValue(counterValue)
+              .build());
+    }
+
+    return counterGroup.setName(groupName)
+        .setDisplayName(groupDisplayName)
+        .addAllCounters(counters);
+  }
+
+  /**
+   * Returns per-vertex task statistics for this DAG, keyed by vertex name.
+   *
+   * The data is fetched through {@code getJsonRootEntity} on first use and stored in
+   * {@code vertexTaskStatsCache}; once that field is non-null it is returned without another
+   * request. When the response has no entities array nothing is cached.
+   *
+   * @return map from vertex name to its task statistics, or null if no entities array was
+   *         returned
+   * @throws TezException if fetching the JSON root fails
+   * @throws JSONException if a vertex entity lacks its other-info node or vertex name
+   */
+  @VisibleForTesting
+  protected Map<String, VertexTaskStats> parseTaskStatsForVertexes()
+      throws TezException, JSONException {
+
+    if (vertexTaskStatsCache != null) {
+      return vertexTaskStatsCache;
+    }
+
+    final String url = String.format("%s/%s?primaryFilter=%s:%s&fields=%s", baseUri,
+        ATSConstants.TEZ_VERTEX_ID, ATSConstants.TEZ_DAG_ID, dagId, FILTER_BY_FIELDS);
+    final JSONArray entities = getJsonRootEntity(url).optJSONArray(ATSConstants.ENTITIES);
+    if (entities == null) {
+      // nothing to cache; the next call will query again
+      return vertexTaskStatsCache;
+    }
+
+    final int entityCount = entities.length();
+    final Map<String, VertexTaskStats> statsByVertex =
+        new HashMap<String, VertexTaskStats>(entityCount);
+    for (int idx = 0; idx < entityCount; idx++) {
+      final JSONObject otherInfo =
+          entities.getJSONObject(idx).getJSONObject(ATSConstants.OTHER_INFO);
+      final String vertexName = otherInfo.getString(ATSConstants.VERTEX_NAME);
+      // optInt falls back to 0 for counts that are absent from the entity
+      final int total = otherInfo.optInt(ATSConstants.NUM_TASKS);
+      final int completed = otherInfo.optInt(ATSConstants.NUM_COMPLETED_TASKS);
+      final int succeeded = otherInfo.optInt(ATSConstants.NUM_SUCCEEDED_TASKS);
+      final int killed = otherInfo.optInt(ATSConstants.NUM_KILLED_TASKS);
+      final int failed = otherInfo.optInt(ATSConstants.NUM_FAILED_TASKS);
+      statsByVertex.put(vertexName,
+          new VertexTaskStats(total, completed, succeeded, killed, failed));
+    }
+
+    vertexTaskStatsCache = statsByVertex;
+    return vertexTaskStatsCache;
+  }
+
+  /**
+   * Issues a GET for {@code url} with JSON content negotiation and returns the response body
+   * parsed as a JSON object.
+   *
+   * @param url fully formed URL to query
+   * @return the response entity as a {@link JSONObject}
+   * @throws TezException if the response status is not OK, or if the client reports a handler
+   *         error, an unexpected response, or an invalid URL
+   */
+  @VisibleForTesting
+  protected JSONObject getJsonRootEntity(String url) throws TezException {
+    try {
+      WebResource wr = getHttpClient().resource(url);
+      ClientResponse response = wr.accept(MediaType.APPLICATION_JSON_TYPE)
+          .type(MediaType.APPLICATION_JSON_TYPE)
+          .get(ClientResponse.class);
+
+      if (response.getClientResponseStatus() != ClientResponse.Status.OK) {
+        // The entity is never read on this path, so release the underlying connection
+        // explicitly instead of leaving it open until garbage collection.
+        response.close();
+        throw new TezException("Failed to get response from YARN Timeline: url: " + url);
+      }
+
+      return response.getEntity(JSONObject.class);
+    } catch (ClientHandlerException e) {
+      throw new TezException("Error processing response from YARN Timeline", e);
+    } catch (UniformInterfaceException e) {
+      throw new TezException("Error accessing content from YARN Timeline - unexpected response", e);
+    } catch (IllegalArgumentException e) {
+      throw new TezException("Error accessing content from YARN Timeline - invalid url", e);
+    }
+  }
+
+  /**
+   * Immutable holder for the task counts of a single vertex: total, completed, succeeded,
+   * killed and failed. The constructor stores the values as given without validation.
+   */
+ @VisibleForTesting
+  protected class VertexTaskStats {
+    // total number of tasks in the vertex
+    final int numTaskCount;
+    // tasks that have completed
+    final int completedTaskCount;
+    // tasks that succeeded
+    final int succeededTaskCount;
+    // tasks that were killed
+    final int killedTaskCount;
+    // tasks that failed
+    final int failedTaskCount;
+
+    /**
+     * @param numTaskCount total number of tasks
+     * @param completedTaskCount number of completed tasks
+     * @param succeededTaskCount number of succeeded tasks
+     * @param killedTaskCount number of killed tasks
+     * @param failedTaskCount number of failed tasks
+     */
+    public VertexTaskStats(int numTaskCount, int completedTaskCount, int succeededTaskCount,
+                           int killedTaskCount, int failedTaskCount) {
+      this.numTaskCount = numTaskCount;
+      this.completedTaskCount = completedTaskCount;
+      this.succeededTaskCount = succeededTaskCount;
+      this.killedTaskCount = killedTaskCount;
+      this.failedTaskCount = failedTaskCount;
+    }
+  }
+
+  private boolean webappHttpsOnly(Configuration conf) throws TezException {
+    try {
+      Class<?> yarnConfiguration = Class.forName("org.apache.hadoop.yarn.conf.YarnConfiguration");
+      final Method useHttps = yarnConfiguration.getMethod("useHttps", Configuration.class);
+      return (Boolean)useHttps.invoke(null, conf);
+    } catch (ReflectiveOperationException e) {
+      throw new TezException("error accessing yarn configuration", e);
+    }
+  }
+
+  protected Client getHttpClient() {
+    if (httpClient == null) {
+      ClientConfig config = new DefaultClientConfig(JSONRootElementProvider.App.class);
+      HttpURLConnectionFactory urlFactory = new PseudoAuthenticatedURLConnectionFactory();
+      httpClient = new Client(new URLConnectionClientHandler(urlFactory), config);
+    }
+    return httpClient;
+  }
+
+  /**
+   * Read-only lookup from a DAG state name to the corresponding {@link DAGStatusStateProto}.
+   * Both "NEW" and "INITED" map to DAG_SUBMITTED. The map is populated through the instance
+   * initializer of an anonymous HashMap subclass and then wrapped as unmodifiable.
+   */
+  private static final Map<String, DAGStatusStateProto> dagStateProtoMap =
+      Collections.unmodifiableMap(new HashMap<String, DAGStatusStateProto>() {{
+        put("NEW", DAGStatusStateProto.DAG_SUBMITTED);
+        put("INITED", DAGStatusStateProto.DAG_SUBMITTED);
+        put("RUNNING", DAGStatusStateProto.DAG_RUNNING);
+        put("SUCCEEDED", DAGStatusStateProto.DAG_SUCCEEDED);
+        put("FAILED", DAGStatusStateProto.DAG_FAILED);
+        put("KILLED", DAGStatusStateProto.DAG_KILLED);
+        put("ERROR", DAGStatusStateProto.DAG_ERROR);
+        put("TERMINATING", DAGStatusStateProto.DAG_TERMINATING);
+  }});
+
+  /**
+   * Read-only lookup from a vertex state name to the corresponding
+   * {@link VertexStatusStateProto}. Each name maps to the proto constant of the same name
+   * with a VERTEX_ prefix. The map is populated through the instance initializer of an
+   * anonymous HashMap subclass and then wrapped as unmodifiable.
+   */
+  private static final Map<String, VertexStatusStateProto> vertexStateProtoMap =
+      Collections.unmodifiableMap(new HashMap<String, VertexStatusStateProto>() {{
+        put("NEW", VertexStatusStateProto.VERTEX_NEW);
+        put("INITIALIZING", VertexStatusStateProto.VERTEX_INITIALIZING);
+        put("RECOVERING", VertexStatusStateProto.VERTEX_RECOVERING);
+        put("INITED", VertexStatusStateProto.VERTEX_INITED);
+        put("RUNNING", VertexStatusStateProto.VERTEX_RUNNING);
+        put("SUCCEEDED", VertexStatusStateProto.VERTEX_SUCCEEDED);
+        put("FAILED", VertexStatusStateProto.VERTEX_FAILED);
+        put("KILLED", VertexStatusStateProto.VERTEX_KILLED);
+        put("ERROR", VertexStatusStateProto.VERTEX_ERROR);
+        put("TERMINATING", VertexStatusStateProto.VERTEX_TERMINATING);
+      }});
+
+
+  /**
+   * Connection factory that appends a {@code user.name} query parameter, set to the
+   * URL-encoded short name of the current user, to every URL before opening it.
+   */
+  class PseudoAuthenticatedURLConnectionFactory implements HttpURLConnectionFactory {
+    /**
+     * Opens a connection to {@code url} with the {@code user.name} parameter appended.
+     *
+     * @param url target URL, with or without an existing query string
+     * @return connection to the augmented URL
+     * @throws IOException if the current user cannot be determined, the name cannot be
+     *         encoded, or the connection cannot be opened
+     */
+    @Override
+    public HttpURLConnection getHttpURLConnection(URL url) throws IOException {
+      // start a query string when there is none, otherwise extend the existing one
+      final String separator = url.getQuery() == null ? "?" : "&";
+      final String shortUserName = UserGroupInformation.getCurrentUser().getShortUserName();
+      final URL authenticatedUrl = new URL(
+          url.toString() + separator + "user.name=" + URLEncoder.encode(shortUserName, "UTF8"));
+      return (HttpURLConnection) authenticatedUrl.openConnection();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
index 3684eef..378cd35 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
@@ -132,7 +132,7 @@ public class VertexStatus {
     sb.append("status=" + getState()
       + ", progress=" + getProgress()
       + ", counters="
-      + (vertexCounters == null ? "null" : vertexCounters.toString()));
+      + (getVertexCounters() == null ? "null" : getVertexCounters().toString()));
     return sb.toString();
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
index 51f7cfa..09755fd 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -19,9 +19,6 @@
 package org.apache.tez.dag.api.client.rpc;
 
 import java.io.IOException;
-import java.text.DecimalFormat;
-import java.util.Collections;
-import java.util.EnumSet;
 import java.util.Set;
 
 import javax.annotation.Nullable;
@@ -30,28 +27,24 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.tez.client.FrameworkClient;
 import org.apache.tez.client.TezClientUtils;
-import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DAGNotRunningException;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.api.client.Progress;
 import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.dag.api.client.VertexStatus;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetVertexStatusRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TryKillDAGRequestProto;
-import org.apache.tez.dag.api.records.DAGProtos.DAGStatusProto;
-import org.apache.tez.dag.api.records.DAGProtos.DAGStatusStateProto;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ServiceException;
@@ -60,13 +53,11 @@ import com.google.protobuf.ServiceException;
 public class DAGClientRPCImpl extends DAGClient {
   private static final Log LOG = LogFactory.getLog(DAGClientRPCImpl.class);
 
-  private static final long SLEEP_FOR_COMPLETION = 500;
-  private static final long PRINT_STATUS_INTERVAL_MILLIS = 5000;
-  private final DecimalFormat formatter = new DecimalFormat("###.##%");
+  private static final String DAG_NOT_RUNNING_CLASS_NAME =
+      DAGNotRunningException.class.getCanonicalName();
   private final ApplicationId appId;
   private final String dagId;
   private final TezConfiguration conf;
-  private long lastPrintStatusTimeMillis;
   @VisibleForTesting
   ApplicationReport appReport;
   private final FrameworkClient frameworkClient;
@@ -78,14 +69,7 @@ public class DAGClientRPCImpl extends DAGClient {
     this.appId = appId;
     this.dagId = dagId;
     this.conf = conf;
-    if (frameworkClient != null &&
-        conf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE, TezConfiguration.TEZ_LOCAL_MODE_DEFAULT)) {
-      this.frameworkClient = frameworkClient;
-    } else {
-      this.frameworkClient = FrameworkClient.createFrameworkClient(conf);
-      this.frameworkClient.init(conf, new YarnConfiguration(conf));
-      this.frameworkClient.start();
-    }
+    this.frameworkClient = frameworkClient;
     appReport = null;
   }
 
@@ -99,14 +83,16 @@ public class DAGClientRPCImpl extends DAGClient {
       throws IOException, TezException {
     if(createAMProxyIfNeeded()) {
       try {
-        return getDAGStatusViaAM(statusOptions);
+        DAGStatus dagStatus = getDAGStatusViaAM(statusOptions);
+        return dagStatus;
       } catch (TezException e) {
         resetProxy(e); // create proxy again
+        throw e;
       }
     }
 
-    // Later maybe from History
-    return getDAGStatusViaRM();
+    // either the dag is not running or some exception happened
+    return null;
   }
 
   @Override
@@ -118,10 +104,10 @@ public class DAGClientRPCImpl extends DAGClient {
         return getVertexStatusViaAM(vertexName, statusOptions);
       } catch (TezException e) {
         resetProxy(e); // create proxy again
+        throw e;
       }
     }
 
-    // need AM for this. Later maybe from History
     return null;
   }
 
@@ -148,9 +134,6 @@ public class DAGClientRPCImpl extends DAGClient {
     if (this.proxy != null) {
       RPC.stopProxy(this.proxy);
     }
-    if(frameworkClient != null) {
-      frameworkClient.stop();
-    }
   }
 
   @Override
@@ -185,75 +168,17 @@ public class DAGClientRPCImpl extends DAGClient {
         proxy.getDAGStatus(null,
           requestProtoBuilder.build()).getDagStatus());
     } catch (ServiceException e) {
-      // TEZ-151 retrieve wrapped TezException
-      throw new TezException(e);
-    }
-  }
-
-  DAGStatus getDAGStatusViaRM() throws TezException, IOException {
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("GetDAGStatus via AM for app: " + appId + " dag:" + dagId);
-    }
-    ApplicationReport appReport;
-    try {
-      appReport = frameworkClient.getApplicationReport(appId);
-    } catch (YarnException e) {
-      throw new TezException(e);
-    }
-
-    if(appReport == null) {
-      throw new TezException("Unknown/Invalid appId: " + appId);
-    }
-
-    DAGStatusProto.Builder builder = DAGStatusProto.newBuilder();
-    DAGStatus dagStatus = new DAGStatus(builder);
-    DAGStatusStateProto dagState;
-    switch (appReport.getYarnApplicationState()) {
-    case NEW:
-    case NEW_SAVING:
-    case SUBMITTED:
-    case ACCEPTED:
-      dagState = DAGStatusStateProto.DAG_SUBMITTED;
-      break;
-    case RUNNING:
-      dagState = DAGStatusStateProto.DAG_RUNNING;
-      break;
-    case FAILED:
-      dagState = DAGStatusStateProto.DAG_FAILED;
-      break;
-    case KILLED:
-      dagState = DAGStatusStateProto.DAG_KILLED;
-      break;
-    case FINISHED:
-      switch(appReport.getFinalApplicationStatus()) {
-      case UNDEFINED:
-      case FAILED:
-        dagState = DAGStatusStateProto.DAG_FAILED;
-        break;
-      case KILLED:
-        dagState = DAGStatusStateProto.DAG_KILLED;
-        break;
-      case SUCCEEDED:
-        dagState = DAGStatusStateProto.DAG_SUCCEEDED;
-        break;
-      default:
-        throw new TezUncheckedException("Encountered unknown final application"
-          + " status from YARN"
-          + ", appState=" + appReport.getYarnApplicationState()
-          + ", finalStatus=" + appReport.getFinalApplicationStatus());
+      final Throwable cause = e.getCause();
+      if (cause instanceof RemoteException) {
+        RemoteException remoteException = (RemoteException) cause;
+        if (DAG_NOT_RUNNING_CLASS_NAME.equals(remoteException.getClassName())) {
+          throw new DAGNotRunningException(remoteException.getMessage());
+        }
       }
-      break;
-    default:
-      throw new TezUncheckedException("Encountered unknown application state"
-          + " from YARN, appState=" + appReport.getYarnApplicationState());
-    }
 
-    builder.setState(dagState);
-    if(appReport.getDiagnostics() != null) {
-      builder.addAllDiagnostics(Collections.singleton(appReport.getDiagnostics()));
+      // TEZ-151 retrieve wrapped TezException
+      throw new TezException(e);
     }
-
-    return dagStatus;
   }
 
   VertexStatus getVertexStatusViaAM(String vertexName,
@@ -301,6 +226,7 @@ public class DAGClientRPCImpl extends DAGClient {
       // if proxy exist optimistically use it assuming there is no retry
       return true;
     }
+    appReport = null;
     appReport = getAppReport();
 
     if(appReport == null) {
@@ -326,121 +252,15 @@ public class DAGClientRPCImpl extends DAGClient {
 
   @Override
   public DAGStatus waitForCompletion() throws IOException, TezException, InterruptedException {
-    return _waitForCompletionWithStatusUpdates(false, EnumSet.noneOf(StatusGetOpts.class));
+    // should be used from DAGClientImpl
+    throw new TezException("not supported");
   }
 
   @Override
   public DAGStatus waitForCompletionWithStatusUpdates(@Nullable Set<StatusGetOpts> statusGetOpts)
       throws IOException, TezException, InterruptedException {
-    return _waitForCompletionWithStatusUpdates(true, statusGetOpts);
-  }
-
-  private DAGStatus _waitForCompletionWithStatusUpdates(boolean vertexUpdates,
-      @Nullable Set<StatusGetOpts> statusGetOpts) throws IOException, TezException, InterruptedException {
-    DAGStatus dagStatus;
-    boolean initPrinted = false;
-    boolean runningPrinted = false;
-    double dagProgress = -1.0; // Print the first one
-    // monitoring
-    while (true) {
-      dagStatus = getDAGStatus(statusGetOpts);
-      if (!initPrinted
-          && (dagStatus.getState() == DAGStatus.State.INITING || dagStatus.getState() == DAGStatus.State.SUBMITTED)) {
-        initPrinted = true; // Print once
-        log("Waiting for DAG to start running");
-      }
-      if (dagStatus.getState() == DAGStatus.State.RUNNING
-          || dagStatus.getState() == DAGStatus.State.SUCCEEDED
-          || dagStatus.getState() == DAGStatus.State.FAILED
-          || dagStatus.getState() == DAGStatus.State.KILLED
-          || dagStatus.getState() == DAGStatus.State.ERROR) {
-        break;
-      }
-      Thread.sleep(SLEEP_FOR_COMPLETION);
-    }// End of while(true)
-
-    Set<String> vertexNames = Collections.emptySet();
-    while (!dagStatus.isCompleted()) {
-      if (!runningPrinted) {
-        log("DAG initialized: CurrentState=Running");
-        runningPrinted = true;
-      }
-      if (vertexUpdates && vertexNames.isEmpty()) {
-        vertexNames = getDAGStatus(statusGetOpts).getVertexProgress().keySet();
-      }
-      dagProgress = monitorProgress(vertexNames, dagProgress, null, dagStatus);
-      Thread.sleep(SLEEP_FOR_COMPLETION);
-      dagStatus = getDAGStatus(statusGetOpts);
-    }// end of while
-    // Always print the last status irrespective of progress change
-    monitorProgress(vertexNames, -1.0, statusGetOpts, dagStatus);
-    log("DAG completed. " + "FinalState=" + dagStatus.getState());
-    return dagStatus;
-  }
-
-  private double monitorProgress(Set<String> vertexNames, double prevDagProgress,
-      Set<StatusGetOpts> opts, DAGStatus dagStatus) throws IOException, TezException {
-    Progress progress = dagStatus.getDAGProgress();
-    double dagProgress = prevDagProgress;
-    if (progress != null) {
-      dagProgress = getProgress(progress);
-      boolean progressChanged = dagProgress > prevDagProgress;
-      long currentTimeMillis = System.currentTimeMillis();
-      long timeSinceLastPrintStatus =  currentTimeMillis - lastPrintStatusTimeMillis;
-      boolean printIntervalExpired = timeSinceLastPrintStatus > PRINT_STATUS_INTERVAL_MILLIS;
-      if (progressChanged || printIntervalExpired) {
-        lastPrintStatusTimeMillis = currentTimeMillis;
-        printDAGStatus(vertexNames, opts, dagStatus, progress);
-      }
-    }
-
-    return dagProgress;
+    // should be used from DAGClientImpl
+    throw new TezException("not supported");
   }
 
-  private void printDAGStatus(Set<String> vertexNames, Set<StatusGetOpts> opts,
-      DAGStatus dagStatus, Progress dagProgress) throws IOException, TezException {
-    double vProgressFloat = 0.0f;
-    log("DAG: State: " + dagStatus.getState() + " Progress: "
-        + formatter.format(getProgress(dagProgress)) + " " + dagProgress);
-    boolean displayCounter = opts != null ? opts.contains(StatusGetOpts.GET_COUNTERS) : false;
-    if (displayCounter) {
-      TezCounters counters = dagStatus.getDAGCounters();
-      if (counters != null) {
-        log("DAG Counters:\n" + counters);
-      }
-    }
-    for (String vertex : vertexNames) {
-      VertexStatus vStatus = getVertexStatus(vertex, opts);
-      if (vStatus == null) {
-        log("Could not retrieve status for vertex: " + vertex);
-        continue;
-      }
-      Progress vProgress = vStatus.getProgress();
-      if (vProgress != null) {
-        vProgressFloat = 0.0f;
-        if (vProgress.getTotalTaskCount() == 0) {
-          vProgressFloat = 1.0f;
-        } else if (vProgress.getTotalTaskCount() > 0) {
-          vProgressFloat = getProgress(vProgress);
-        }
-        log("\tVertexStatus:" + " VertexName: " + vertex + " Progress: "
-            + formatter.format(vProgressFloat) + " " + vProgress);
-      }
-      if (displayCounter) {
-        TezCounters counters = vStatus.getVertexCounters();
-        if (counters != null) {
-          log("Vertex Counters for " + vertex + ":\n" + counters);
-        }
-      }
-    } // end of for loop
-  }
-
-  private double getProgress(Progress progress) {
-    return (progress.getTotalTaskCount() == 0 ? 0.0 : (double) (progress.getSucceededTaskCount())
-        / progress.getTotalTaskCount());
-  }
-
-  private void log(String message) {
-    LOG.info(message);
-  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java b/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java
new file mode 100644
index 0000000..0e3290f
--- /dev/null
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api.client;
+
+import static junit.framework.TestCase.fail;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link DAGClientTimelineImpl}. Each test spies on the client and stubs
+ * {@code getJsonRootEntity} with canned JSON, so no timeline server is contacted; the
+ * assertions then check how that JSON is turned into DAG and vertex status objects.
+ */
+public class TestATSHttpClient {
+
+  /**
+   * An empty JSON root for the DAG entity must surface as a TezException. Any IOException is
+   * left to propagate so that its stack trace shows up in the test report.
+   */
+  @Test
+  public void testGetDagStatusThrowsExceptionOnEmptyJson() throws TezException, IOException {
+    ApplicationId mockAppId = mock(ApplicationId.class);
+    DAGClientTimelineImpl httpClient = new DAGClientTimelineImpl(mockAppId, "EXAMPLE_DAG_ID",
+        new TezConfiguration(), null);
+    DAGClientTimelineImpl spyClient = spy(httpClient);
+    spyClient.baseUri = "http://yarn.ats.webapp/ws/v1/timeline";
+    final String expectedDagUrl = "http://yarn.ats.webapp/ws/v1/timeline/TEZ_DAG_ID/EXAMPLE_DAG_ID" +
+        "?fields=primaryfilters,otherinfo";
+
+    doReturn(new JSONObject()).when(spyClient).getJsonRootEntity(expectedDagUrl);
+    try {
+      spyClient.getDAGStatus(null);
+      fail("Expected TezException but did not happen");
+    } catch (TezException e) {
+      // expected: an empty JSON root cannot be converted into a DAG status
+    }
+
+    verify(spyClient).getJsonRootEntity(expectedDagUrl);
+  }
+
+  /**
+   * Feeds one DAG entity and two vertex entities and checks state, diagnostics, counters and
+   * the progress totals summed over both vertices.
+   */
+  @Test
+  public void testGetDagStatusSimple() throws TezException, JSONException, IOException {
+    DAGClientTimelineImpl httpClient = new DAGClientTimelineImpl(mock(ApplicationId.class),
+        "EXAMPLE_DAG_ID", new TezConfiguration(), null);
+    DAGClientTimelineImpl spyClient = spy(httpClient);
+    spyClient.baseUri = "http://yarn.ats.webapp/ws/v1/timeline";
+    final String expectedDagUrl = "http://yarn.ats.webapp/ws/v1/timeline/TEZ_DAG_ID/EXAMPLE_DAG_ID" +
+        "?fields=primaryfilters,otherinfo";
+    final String expectedVertexUrl = "http://yarn.ats.webapp/ws/v1/timeline/TEZ_VERTEX_ID" +
+        "?primaryFilter=TEZ_DAG_ID:EXAMPLE_DAG_ID&fields=primaryfilters,otherinfo";
+
+    Set<StatusGetOpts> statusOptions = new HashSet<StatusGetOpts>(1);
+    statusOptions.add(StatusGetOpts.GET_COUNTERS);
+
+    final String jsonDagData =
+            "{ " +
+            "  otherinfo: { " +
+            "    status: 'SUCCEEDED'," +
+            "    diagnostics: 'SAMPLE_DIAGNOSTICS'," +
+            "    counters: { counterGroups: [ " +
+            "      { counterGroupName: 'CG1', counterGroupDisplayName: 'CGD1', counters: [" +
+            "        {counterName:'C1', counterDisplayName: 'CD1', counterValue: 1 }," +
+            "        {counterName:'C2', counterDisplayName: 'CD2', counterValue: 2 }" +
+            "      ]}" +
+            "    ]}" +
+            "  }" +
+            "}";
+
+    final String jsonVertexData = "{entities:[ " +
+        "{otherinfo: {vertexName:'v1', numTasks:5,numFailedTasks:1,numSucceededTasks:2," +
+          "numKilledTasks:3,numCompletedTasks:3}}," +
+        "{otherinfo: {vertexName:'v2',numTasks:10,numFailedTasks:1,numSucceededTasks:5," +
+          "numKilledTasks:3,numCompletedTasks:4}}" +
+        "]}";
+
+    doReturn(new JSONObject(jsonDagData)).when(spyClient).getJsonRootEntity(expectedDagUrl);
+    doReturn(new JSONObject(jsonVertexData)).when(spyClient).getJsonRootEntity(expectedVertexUrl);
+
+    DAGStatus dagStatus = spyClient.getDAGStatus(statusOptions);
+
+    Assert.assertEquals("DAG State", DAGStatus.State.SUCCEEDED, dagStatus.getState());
+    Assert.assertEquals("DAG Diagnostics size", 1, dagStatus.getDiagnostics().size());
+    Assert.assertEquals("DAG diagnostics detail", "SAMPLE_DIAGNOSTICS",
+        dagStatus.getDiagnostics().get(0));
+    Assert.assertEquals("Counters Size", 2, dagStatus.getDAGCounters().countCounters());
+    Assert.assertEquals("Counter Value", 1,
+        dagStatus.getDAGCounters().getGroup("CG1").findCounter("C1").getValue());
+    // totals are the sums over v1 and v2; running = 15 total - 7 completed
+    Assert.assertEquals("total tasks", 15, dagStatus.getDAGProgress().getTotalTaskCount());
+    Assert.assertEquals("failed tasks", 2, dagStatus.getDAGProgress().getFailedTaskCount());
+    Assert.assertEquals("killed tasks", 6, dagStatus.getDAGProgress().getKilledTaskCount());
+    Assert.assertEquals("succeeded tasks", 7, dagStatus.getDAGProgress().getSucceededTaskCount());
+    Assert.assertEquals("running tasks", 8, dagStatus.getDAGProgress().getRunningTaskCount());
+    final Map<String, Progress> vertexProgress = dagStatus.getVertexProgress();
+    Assert.assertEquals("vertex progress count", 2, vertexProgress.size());
+    Assert.assertTrue("vertex name1", vertexProgress.containsKey("v1"));
+    Assert.assertTrue("vertex name2", vertexProgress.containsKey("v2"));
+  }
+
+  /**
+   * Feeds a single vertex entity and checks state, diagnostics, task counts and counters of
+   * the resulting vertex status.
+   */
+  @Test
+  public void testGetVertexStatusSimple() throws JSONException, TezException, IOException {
+    DAGClientTimelineImpl httpClient = new DAGClientTimelineImpl(mock(ApplicationId.class),
+        "EXAMPLE_DAG_ID", new TezConfiguration(), null);
+    DAGClientTimelineImpl spyClient = spy(httpClient);
+    spyClient.baseUri = "http://yarn.ats.webapp/ws/v1/timeline";
+    final String expectedVertexUrl = "http://yarn.ats.webapp/ws/v1/timeline/TEZ_VERTEX_ID" +
+        "?primaryFilter=TEZ_DAG_ID:EXAMPLE_DAG_ID&secondaryFilter=vertexName:vertex1name&" +
+        "fields=primaryfilters,otherinfo";
+
+    Set<StatusGetOpts> statusOptions = new HashSet<StatusGetOpts>(1);
+    statusOptions.add(StatusGetOpts.GET_COUNTERS);
+
+    final String jsonData = "{entities:[ {otherinfo:{numFailedTasks:1,numSucceededTasks:2," +
+        "status:'SUCCEEDED', vertexName:'vertex1name', numTasks:4, numKilledTasks: 3, " +
+        "numCompletedTasks: 4, diagnostics: 'diagnostics1', " +
+        "counters: { counterGroups: [ " +
+        "      { counterGroupName: 'CG1', counterGroupDisplayName: 'CGD1', counters: [" +
+        "        {counterName:'C1', counterDisplayName: 'CD1', counterValue: 1 }," +
+        "        {counterName:'C2', counterDisplayName: 'CD2', counterValue: 2 }" +
+        "      ]}" +
+        "    ]}" +
+        "}}]}";
+
+    doReturn(new JSONObject(jsonData)).when(spyClient).getJsonRootEntity(expectedVertexUrl);
+
+    VertexStatus vertexStatus = spyClient.getVertexStatus("vertex1name", statusOptions);
+    Assert.assertEquals("status check", VertexStatus.State.SUCCEEDED, vertexStatus.getState());
+    Assert.assertEquals("diagnostics", "diagnostics1", vertexStatus.getDiagnostics().get(0));
+    final Progress progress = vertexStatus.getProgress();
+    final TezCounters vertexCounters = vertexStatus.getVertexCounters();
+    Assert.assertEquals("failed task count", 1, progress.getFailedTaskCount());
+    Assert.assertEquals("succeeded task count", 2, progress.getSucceededTaskCount());
+    Assert.assertEquals("killed task count", 3, progress.getKilledTaskCount());
+    Assert.assertEquals("total task count", 4, progress.getTotalTaskCount());
+    Assert.assertEquals("Counters Size", 2, vertexCounters.countCounters());
+    Assert.assertEquals("Counter Value", 1,
+        vertexCounters.getGroup("CG1").findCounter("C1").getValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
index 505e044..a23c1d5 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
@@ -33,6 +33,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGClientImpl;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.dag.api.client.VertexStatus;
@@ -61,7 +63,7 @@ import com.google.protobuf.ServiceException;
 
 public class TestDAGClient {
 
-  private DAGClientRPCImpl dagClient;
+  private DAGClient dagClient;
   private ApplicationId mockAppId;
   private ApplicationReport mockAppReport;
   private String dagIdStr;
@@ -188,17 +190,17 @@ public class TestDAGClient {
     when(mockProxy.getVertexStatus(isNull(RpcController.class), argThat(new VertexCounterRequestMatcher())))
       .thenReturn(GetVertexStatusResponseProto.newBuilder().setVertexStatus(vertexStatusProtoWithCounters).build());
    
-    
-    
-    dagClient = new DAGClientRPCImpl(mockAppId, dagIdStr, new TezConfiguration(), null);
-    dagClient.appReport = mockAppReport;
-    ((DAGClientRPCImpl)dagClient).proxy = mockProxy;
+    dagClient = new DAGClientImpl(mockAppId, dagIdStr, new TezConfiguration(), null);
+    DAGClientRPCImpl realClient = (DAGClientRPCImpl)((DAGClientImpl)dagClient).getRealClient();
+    realClient.appReport = mockAppReport;
+    realClient.proxy = mockProxy;
   }
   
   @Test
   public void testApp() throws IOException, TezException, ServiceException{
     assertTrue(dagClient.getExecutionContext().contains(mockAppId.toString()));
-    assertEquals(mockAppReport, dagClient.getApplicationReportInternal());
+    DAGClientRPCImpl realClient = (DAGClientRPCImpl)((DAGClientImpl)dagClient).getRealClient();
+    assertEquals(mockAppReport, realClient.getApplicationReportInternal()); // via the RPC impl
   }
   
   @Test
@@ -226,7 +228,8 @@ public class TestDAGClient {
     
     resultVertexStatus = dagClient.getVertexStatus("v1", Sets.newSet(StatusGetOpts.GET_COUNTERS));
     verify(mockProxy).getVertexStatus(null, GetVertexStatusRequestProto.newBuilder()
-        .setDagId(dagIdStr).setVertexName("v1").addStatusOptions(StatusGetOptsProto.GET_COUNTERS).build());
+        .setDagId(dagIdStr).setVertexName("v1").addStatusOptions(StatusGetOptsProto.GET_COUNTERS)
+        .build());
     assertEquals(new VertexStatus(vertexStatusProtoWithCounters), resultVertexStatus);
     System.out.println("VertexWithCounter:" + resultVertexStatus);
   }
@@ -249,7 +252,7 @@ public class TestDAGClient {
       
     dagClient.waitForCompletion();
     verify(mockProxy, times(2)).getDAGStatus(null, GetDAGStatusRequestProto.newBuilder()
-      .setDagId(dagIdStr).build());
+        .setDagId(dagIdStr).build());
   }
 
   @Test
@@ -257,7 +260,8 @@ public class TestDAGClient {
 
     // first time and second time return DAG_RUNNING, third time return DAG_SUCCEEDED
     when(mockProxy.getDAGStatus(isNull(RpcController.class), any(GetDAGStatusRequestProto.class)))
-      .thenReturn(GetDAGStatusResponseProto.newBuilder().setDagStatus(dagStatusProtoWithoutCounters).build())
+      .thenReturn(GetDAGStatusResponseProto.newBuilder().setDagStatus(dagStatusProtoWithoutCounters)
+          .build())
       .thenReturn(GetDAGStatusResponseProto.newBuilder().setDagStatus(dagStatusProtoWithoutCounters).build())
       .thenReturn(GetDAGStatusResponseProto.newBuilder().setDagStatus
                 (DAGStatusProto.newBuilder(dagStatusProtoWithoutCounters).setState(DAGStatusStateProto.DAG_SUCCEEDED).build())

http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
index c4d3497..d14ed2a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
@@ -27,6 +27,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.client.TezAppMasterStatus;
+import org.apache.tez.dag.api.DAGNotRunningException;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.app.DAGAppMaster;
@@ -48,6 +49,10 @@ public class DAGClientHandler {
     return dagAppMaster.getContext().getCurrentDAG();
   }
 
+  private Set<String> getAllDagIDs() { // delegates to dagAppMaster's context
+    return dagAppMaster.getContext().getAllDAGIDs();
+  }
+
   public List<String> getAllDAGs() throws TezException {
     return Collections.singletonList(getCurrentDAG().getID().toString());
   }
@@ -78,12 +83,20 @@ public class DAGClientHandler {
     if (currentDAG == null) {
       throw new TezException("No running dag at present");
     }
-    if (!currentDAG.getID().toString().equals(dagId.toString())) {
-      LOG.warn("Current DAGID : "
-          + (currentDAG.getID() == null ? "NULL" : currentDAG.getID())
-          + ", Looking for string (not found): " + dagIdStr + ", dagIdObj: "
-          + dagId);
-      throw new TezException("Unknown dagId: " + dagIdStr);
+
+    final String currentDAGIdStr = currentDAG.getID().toString();
+    if (!currentDAGIdStr.equals(dagIdStr)) {
+      if (getAllDagIDs().contains(dagIdStr)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Looking for finished dagId " + dagIdStr + " current dag is " + currentDAGIdStr);
+        }
+        throw new DAGNotRunningException("DAG " + dagIdStr + " Not running, current dag is " +
+            currentDAGIdStr);
+      } else {
+        LOG.warn("Current DAGID : " + currentDAGIdStr + ", Looking for string (not found): " +
+            dagIdStr + ", dagIdObj: " + dagId);
+        throw new TezException("Unknown dagId: " + dagIdStr);
+      }
     }
 
     return currentDAG;

http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index 9cc28cb..af244c8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.app;
 
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -65,6 +66,8 @@ public interface AppContext {
 
   void setDAG(DAG dag);
 
+  Set<String> getAllDAGIDs(); // DAG IDs in string form, as compared against dagIdStr by callers
+
   @SuppressWarnings("rawtypes")
   EventHandler getEventHandler();
 

http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 0e4f78b..e6a1d9c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -245,6 +245,7 @@ public class DAGAppMaster extends AbstractService {
    * set of already executed dag names.
    */
   Set<String> dagNames = new HashSet<String>();
+  Set<String> dagIDs = new HashSet<String>();
 
   protected boolean isLastAMRetry = false;
 
@@ -1165,6 +1166,11 @@ public class DAGAppMaster extends AbstractService {
     }
 
     @Override
+    public Set<String> getAllDAGIDs() { // NOTE(review): no copy or locking; confirm thread use
+      return dagIDs; // the backing set itself, not a snapshot
+    }
+
+    @Override
     public EventHandler getEventHandler() {
       return eventHandler;
     }
@@ -1861,6 +1867,7 @@ public class DAGAppMaster extends AbstractService {
       throw new TezException(e);
     }
 
+    dagIDs.add(currentDAG.getID().toString());
     // End of creating the job.
     ((RunningAppContext) context).setDAG(currentDAG);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
index 85851c5..220b5b5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -599,6 +599,7 @@ public class RecoveryParser {
     dagAppMaster.setDAGCounter(dagCounter);
     for (DAGSummaryData dagSummaryData: dagSummaryDataMap.values()){
       dagAppMaster.dagNames.add(dagSummaryData.dagName);
+      dagAppMaster.dagIDs.add(dagSummaryData.dagId.toString());
     }
 
     DAGSummaryData lastInProgressDAGData =

http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 594c651..ab22099 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tez.common.ATSConstants;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
@@ -1463,19 +1464,25 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   void logJobHistoryVertexFinishedEvent() throws IOException {
     this.setFinishTime();
-    VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId,
-        vertexName, initTimeRequested, initedTime, startTimeRequested,
-        startedTime, finishTime, VertexState.SUCCEEDED, "",
-        getAllCounters(), getVertexStats());
-    this.appContext.getHistoryHandler().handleCriticalEvent(
-        new DAGHistoryEvent(getDAGId(), finishEvt));
+    logJobHistoryVertexCompletedHelper(VertexState.SUCCEEDED, finishTime, ""); // no diagnostics
   }
 
   void logJobHistoryVertexFailedEvent(VertexState state) throws IOException {
-    VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId,
-        vertexName, initTimeRequested, initedTime, startTimeRequested,
-        startedTime, clock.getTime(), state, StringUtils.join(getDiagnostics(),
-            LINE_SEPARATOR), getAllCounters(), getVertexStats());
+    logJobHistoryVertexCompletedHelper(state, clock.getTime(), // time read from clock here
+        StringUtils.join(getDiagnostics(), LINE_SEPARATOR)); // diagnostics, one per line
+  }
+
+  private void logJobHistoryVertexCompletedHelper(VertexState finalState, long finishTime,
+                                                  String diagnostics) throws IOException {
+    Map<String, Integer> taskStats = new HashMap<String, Integer>(); // ATS key -> task count
+    taskStats.put(ATSConstants.NUM_COMPLETED_TASKS, completedTaskCount);
+    taskStats.put(ATSConstants.NUM_SUCCEEDED_TASKS, succeededTaskCount);
+    taskStats.put(ATSConstants.NUM_FAILED_TASKS, failedTaskCount);
+    taskStats.put(ATSConstants.NUM_KILLED_TASKS, killedTaskCount);
+    // Shared by the finished and failed paths; they differ only in the three arguments passed.
+    VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId, vertexName, initTimeRequested,
+        initedTime, startTimeRequested, startedTime, finishTime, finalState, diagnostics,
+        getAllCounters(), getVertexStats(), taskStats);
     this.appContext.getHistoryHandler().handleCriticalEvent(
         new DAGHistoryEvent(getDAGId(), finishEvt));
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
index 450bcc1..d0e0e69 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
+import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.records.TezDAGID;
@@ -108,4 +109,7 @@ public class DAGStartedEvent implements HistoryEvent {
     return dagName;
   }
 
+  public DAGState getDagState() { // constant: this event always reports RUNNING
+    return DAGState.RUNNING;
+  }
 }