You are viewing a plain-text version of this content. The canonical link to it (the "here" hyperlink on the original page) is not preserved in this copy.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/09/19 08:19:01 UTC
[1/2] TEZ-1495. ATS integration for TezClient (Prakash Ramachandran via bikas)
Repository: tez
Updated Branches:
refs/heads/master 82ec16baf -> 4cf6472e3
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
index 8057714..d9cafc7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.history.events;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -52,12 +53,13 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
private TezCounters tezCounters;
private boolean fromSummary = false;
private VertexStats vertexStats;
+ private Map<String, Integer> vertexTaskStats;
- public VertexFinishedEvent(TezVertexID vertexId,
- String vertexName, long initRequestedTime, long initedTime,
- long startRequestedTime, long startedTime, long finishTime,
- VertexState state, String diagnostics, TezCounters counters,
- VertexStats vertexStats) {
+ public VertexFinishedEvent(TezVertexID vertexId, String vertexName, long initRequestedTime,
+ long initedTime, long startRequestedTime, long startedTime,
+ long finishTime, VertexState state, String diagnostics,
+ TezCounters counters, VertexStats vertexStats,
+ Map<String, Integer> vertexTaskStats) {
this.vertexName = vertexName;
this.vertexID = vertexId;
this.initRequestedTime = initRequestedTime;
@@ -69,6 +71,7 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
this.diagnostics = diagnostics;
this.tezCounters = counters;
this.vertexStats = vertexStats;
+ this.vertexTaskStats = vertexTaskStats;
}
public VertexFinishedEvent() {
@@ -138,10 +141,9 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
+ ", status=" + state.name()
+ ", diagnostics=" + diagnostics
+ ", counters=" + ( tezCounters == null ? "null" :
- tezCounters.toString()
- .replaceAll("\\n", ", ").replaceAll("\\s+", " "))
- + ", vertexStats=" + (vertexStats == null ? "null"
- : vertexStats.toString());
+ tezCounters.toString().replaceAll("\\n", ", ").replaceAll("\\s+", " "))
+ + ", vertexStats=" + (vertexStats == null ? "null" : vertexStats.toString())
+ + ", vertexTaskStats=" + (vertexTaskStats == null ? "null" : vertexTaskStats.toString());
}
public TezVertexID getVertexID() {
@@ -176,6 +178,10 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
return startTime;
}
+ public Map<String, Integer> getVertexTaskStats() { // Per-vertex task stats map given to the constructor; may be null (callers pass null).
+ return vertexTaskStats; // Returned as-is (no defensive copy). NOTE(review): confirm whether proto (de)serialization carries this field.
+ }
+
@Override
public void toSummaryProtoStream(OutputStream outputStream) throws IOException {
VertexFinishStateProto finishStateProto =
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
index 136c5f1..a8bd21e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.records.TezVertexID;
@@ -105,4 +106,7 @@ public class VertexStartedEvent implements HistoryEvent {
return startTime;
}
+ public VertexState getVertexState() { // State reported for this event, e.g. as the ATS "status" otherinfo value.
+ return VertexState.RUNNING; // Constant: a started vertex is always reported as RUNNING; no event field is consulted.
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index 12dcf1c..a9987d6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -18,6 +18,7 @@
package org.apache.tez.dag.history.logging.impl;
+import org.apache.tez.common.ATSConstants;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.AMLaunchedEvent;
@@ -36,7 +37,6 @@ import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexInitializedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.history.logging.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
import org.apache.tez.dag.history.utils.DAGUtils;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
deleted file mode 100644
index 0188a8e..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.history.utils;
-
-public class ATSConstants {
-
- // TODO remove once YARN exposes proper constants
-
- /* Top level keys */
- public static final String ENTITY = "entity";
- public static final String ENTITY_TYPE = "entitytype";
- public static final String EVENTS = "events";
- public static final String EVENT_TYPE = "eventtype";
- public static final String TIMESTAMP = "ts";
- public static final String EVENT_INFO = "eventinfo";
- public static final String RELATED_ENTITIES = "relatedEntities";
- public static final String PRIMARY_FILTERS = "primaryfilters";
- public static final String SECONDARY_FILTERS = "secondaryfilters";
- public static final String OTHER_INFO = "otherinfo";
-
- /* Section for related entities */
- public static final String APPLICATION_ID = "applicationId";
- public static final String APPLICATION_ATTEMPT_ID = "applicationAttemptId";
- public static final String CONTAINER_ID = "containerId";
- public static final String NODE_ID = "nodeId";
- public static final String USER = "user";
-
- /* Keys used in other info */
- public static final String APP_SUBMIT_TIME = "appSubmitTime";
-
- /* Tez-specific info */
- public static final String DAG_PLAN = "dagPlan";
- public static final String DAG_NAME = "dagName";
- public static final String VERTEX_NAME = "vertexName";
- public static final String SCHEDULED_TIME = "scheduledTime";
- public static final String INIT_REQUESTED_TIME = "initRequestedTime";
- public static final String INIT_TIME = "initTime";
- public static final String START_REQUESTED_TIME = "startRequestedTime";
- public static final String START_TIME = "startTime";
- public static final String FINISH_TIME = "endTime";
- public static final String TIME_TAKEN = "timeTaken";
- public static final String STATUS = "status";
- public static final String DIAGNOSTICS = "diagnostics";
- public static final String COUNTERS = "counters";
- public static final String STATS = "stats";
- public static final String NUM_TASKS = "numTasks";
- public static final String PROCESSOR_CLASS_NAME = "processorClassName";
- public static final String IN_PROGRESS_LOGS_URL = "inProgressLogsURL";
- public static final String COMPLETED_LOGS_URL = "completedLogsURL";
- public static final String EXIT_STATUS = "exitStatus";
-
- /* Counters-related keys */
- public static final String COUNTER_GROUPS = "counterGroups";
- public static final String COUNTER_GROUP_NAME = "counterGroupName";
- public static final String COUNTER_GROUP_DISPLAY_NAME = "counterGroupDisplayName";
- public static final String COUNTER_NAME = "counterName";
- public static final String COUNTER_DISPLAY_NAME = "counterDisplayName";
- public static final String COUNTER_VALUE = "counterValue";
-
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
index 232a3b2..cab3c83 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
@@ -24,6 +24,7 @@ import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
+import org.apache.tez.common.ATSConstants;
import org.apache.tez.common.counters.CounterGroup;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
index 9042a93..b278d8f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
@@ -572,7 +572,7 @@ public class TestVertexRecovery {
vertex1.restoreFromEvent(new VertexFinishedEvent(vertex1.getVertexId(),
"vertex1", initRequestedTime, initedTime, startRequestedTime,
startTime, finishTime, VertexState.SUCCEEDED, "",
- new TezCounters(), new VertexStats()));
+ new TezCounters(), new VertexStats(), null));
assertEquals(finishTime, vertex1.finishTime);
assertEquals(VertexState.SUCCEEDED, recoveredState);
assertEquals(false, vertex1.recoveryCommitInProgress);
@@ -801,7 +801,7 @@ public class TestVertexRecovery {
recoveredState = vertex1.restoreFromEvent(new VertexFinishedEvent(vertex1.getVertexId(),
"vertex1", initRequestedTime, initedTime, initRequestedTime + 300L,
initRequestedTime + 400L, initRequestedTime + 500L,
- VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats()));
+ VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(), null));
assertEquals(VertexState.SUCCEEDED, recoveredState);
vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index bcbe6f1..f52671c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -346,7 +346,7 @@ public class TestHistoryEventsProtoConversion {
new VertexFinishedEvent(TezVertexID.getInstance(
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
"vertex1", 1000l, 15000l, 16000l, 20000l, 1344400l, VertexState.ERROR,
- null, null, null);
+ null, null, null, null);
VertexFinishedEvent deserializedEvent = (VertexFinishedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
@@ -361,7 +361,7 @@ public class TestHistoryEventsProtoConversion {
new VertexFinishedEvent(TezVertexID.getInstance(
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
"vertex1", 1000l, 15000l, 16000l, 20000l, 1344400l, VertexState.ERROR,
- "diagnose", new TezCounters(), new VertexStats());
+ "diagnose", new TezCounters(), new VertexStats(), null);
VertexFinishedEvent deserializedEvent = (VertexFinishedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index f674fc0..2149053 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -127,7 +127,7 @@ public class TestHistoryEventJsonConversion {
case VERTEX_FINISHED:
event = new VertexFinishedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
random.nextInt(), random.nextInt(), random.nextInt(), VertexState.ERROR,
- null, null, null);
+ null, null, null, null);
break;
case TASK_STARTED:
event = new TaskStartedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt());
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index f078f1d..a81bdf4 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -19,9 +19,11 @@
package org.apache.tez.dag.history.logging.ats;
import java.io.IOException;
+import java.util.Map;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.tez.common.ATSConstants;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
@@ -41,7 +43,6 @@ import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexInitializedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.history.logging.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
import org.apache.tez.dag.history.utils.DAGUtils;
public class HistoryEventTimelineConversion {
@@ -253,6 +254,7 @@ public class HistoryEventTimelineConversion {
atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName());
atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
+ atsEntity.addOtherInfo(ATSConstants.STATUS, event.getDagState().toString());
return atsEntity;
}
@@ -422,6 +424,13 @@ public class HistoryEventTimelineConversion {
atsEntity.addOtherInfo(ATSConstants.STATS,
DAGUtils.convertVertexStatsToATSMap(event.getVertexStats()));
+ final Map<String, Integer> vertexTaskStats = event.getVertexTaskStats();
+ if (vertexTaskStats != null) {
+ for(String key : vertexTaskStats.keySet()) {
+ atsEntity.addOtherInfo(key, vertexTaskStats.get(key));
+ }
+ }
+
return atsEntity;
}
@@ -464,6 +473,7 @@ public class HistoryEventTimelineConversion {
atsEntity.addOtherInfo(ATSConstants.START_REQUESTED_TIME, event.getStartRequestedTime());
atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
+ atsEntity.addOtherInfo(ATSConstants.STATUS, event.getVertexState().toString());
return atsEntity;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index b04b8d4..d2e366d 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -127,7 +127,7 @@ public class TestHistoryEventTimelineConversion {
case VERTEX_FINISHED:
event = new VertexFinishedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
random.nextInt(), random.nextInt(), random.nextInt(), VertexState.ERROR,
- null, null, null);
+ null, null, null, null);
break;
case TASK_STARTED:
event = new TaskStartedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt());
[2/2] git commit: TEZ-1495. ATS integration for TezClient (Prakash Ramachandran via bikas)
Posted by bi...@apache.org.
TEZ-1495. ATS integration for TezClient (Prakash Ramachandran via bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4cf6472e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4cf6472e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4cf6472e
Branch: refs/heads/master
Commit: 4cf6472e39018d8809a945f4ccb39155d8c03220
Parents: 82ec16b
Author: Bikas Saha <bi...@apache.org>
Authored: Thu Sep 18 23:16:53 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Thu Sep 18 23:18:07 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 2 +-
.../java/org/apache/tez/client/TezClient.java | 6 +-
.../org/apache/tez/common/ATSConstants.java | 94 ++++
.../tez/dag/api/DAGNotRunningException.java | 34 ++
.../tez/dag/api/client/DAGClientImpl.java | 477 ++++++++++++++++++
.../dag/api/client/DAGClientTimelineImpl.java | 501 +++++++++++++++++++
.../apache/tez/dag/api/client/VertexStatus.java | 2 +-
.../dag/api/client/rpc/DAGClientRPCImpl.java | 228 +--------
.../tez/dag/api/client/TestATSHttpClient.java | 167 +++++++
.../tez/dag/api/client/rpc/TestDAGClient.java | 24 +-
.../tez/dag/api/client/DAGClientHandler.java | 25 +-
.../java/org/apache/tez/dag/app/AppContext.java | 3 +
.../org/apache/tez/dag/app/DAGAppMaster.java | 7 +
.../org/apache/tez/dag/app/RecoveryParser.java | 1 +
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 27 +-
.../tez/dag/history/events/DAGStartedEvent.java | 4 +
.../dag/history/events/VertexFinishedEvent.java | 24 +-
.../dag/history/events/VertexStartedEvent.java | 4 +
.../impl/HistoryEventJsonConversion.java | 2 +-
.../tez/dag/history/utils/ATSConstants.java | 76 ---
.../apache/tez/dag/history/utils/DAGUtils.java | 1 +
.../dag/app/dag/impl/TestVertexRecovery.java | 4 +-
.../TestHistoryEventsProtoConversion.java | 4 +-
.../impl/TestHistoryEventJsonConversion.java | 2 +-
.../ats/HistoryEventTimelineConversion.java | 12 +-
.../ats/TestHistoryEventTimelineConversion.java | 2 +-
26 files changed, 1405 insertions(+), 328 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3ae8f59..c0c03f5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -19,7 +19,6 @@ ALL CHANGES:
TEZ-1580. Change TestOrderedWordCount to optionally use MR configs.
TEZ-1524. Resolve user group information only if ACLs are enabled.
TEZ-1581. GroupByOrderByMRRTest no longer functional.
- TEZ-1563. TezClient.submitDAGSession alters DAG local resources regardless of DAG submission.
TEZ-1157. Optimize broadcast shuffle to download data only once per host.
Release 0.5.1: Unreleased
@@ -45,6 +44,7 @@ ALL CHANGES
TEZ-1533. Request Events more often if a complete set of events is received by a task.
TEZ-1587. Some tez-examples fail in local mode.
TEZ-1597. ImmediateStartVertexManager should handle corner case of vertex having zero tasks.
+ TEZ-1495. ATS integration for TezClient
Release 0.5.0: 2014-09-03
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index 5cec8b0..0955c20 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -54,7 +54,7 @@ import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRespo
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGResponseProto;
-import org.apache.tez.dag.api.client.rpc.DAGClientRPCImpl;
+import org.apache.tez.dag.api.client.DAGClientImpl;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import com.google.common.annotations.VisibleForTesting;
@@ -409,7 +409,7 @@ public class TezClient {
+ ", sessionName=" + clientName
+ ", applicationId=" + sessionAppId
+ ", dagName=" + dag.getName());
- return new DAGClientRPCImpl(sessionAppId, dagId,
+ return new DAGClientImpl(sessionAppId, dagId,
amConfig.getTezConfiguration(), frameworkClient);
}
@@ -711,7 +711,7 @@ public class TezClient {
static DAGClient getDAGClient(ApplicationId appId, TezConfiguration tezConf,
FrameworkClient frameworkClient)
throws IOException, TezException {
- return new DAGClientRPCImpl(appId, getDefaultTezDAGID(appId), tezConf, frameworkClient);
+ return new DAGClientImpl(appId, getDefaultTezDAGID(appId), tezConf, frameworkClient);
}
// DO NOT CHANGE THIS. This code is replicated from TezDAGID.java
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
new file mode 100644
index 0000000..065614c
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+@Private
+public class ATSConstants {
+
+ /*
+  * String keys shared by the Tez history / ATS (YARN Timeline Server) code,
+  * e.g. used as primary-filter and otherinfo keys on timeline entities.
+  * This class replaces org.apache.tez.dag.history.utils.ATSConstants and now
+  * lives in tez-api, presumably so client-side code can use the keys too.
+  *
+  * TODO remove once YARN exposes proper constants
+  */
+
+ /* Top level keys (field names of timeline entity / event documents) */
+ public static final String ENTITIES = "entities";
+ public static final String ENTITY = "entity";
+ public static final String ENTITY_TYPE = "entitytype";
+ public static final String EVENTS = "events";
+ public static final String EVENT_TYPE = "eventtype";
+ public static final String TIMESTAMP = "ts";
+ public static final String EVENT_INFO = "eventinfo";
+ public static final String RELATED_ENTITIES = "relatedEntities";
+ public static final String PRIMARY_FILTERS = "primaryfilters";
+ public static final String SECONDARY_FILTERS = "secondaryfilters";
+ public static final String OTHER_INFO = "otherinfo";
+
+ /* Section for related entities */
+ public static final String APPLICATION_ID = "applicationId";
+ public static final String APPLICATION_ATTEMPT_ID = "applicationAttemptId";
+ public static final String CONTAINER_ID = "containerId";
+ public static final String NODE_ID = "nodeId";
+ public static final String USER = "user";
+
+ /* Keys used in other info */
+ public static final String APP_SUBMIT_TIME = "appSubmitTime";
+
+ /*
+  * Tez-specific info.
+  * NOTE(review): the NUM_*_TASKS keys below presumably name the entries of
+  * VertexFinishedEvent#getVertexTaskStats(); the timeline converter writes
+  * that map's keys directly as otherinfo keys. Confirm against VertexImpl.
+  */
+ public static final String DAG_PLAN = "dagPlan";
+ public static final String DAG_NAME = "dagName";
+ public static final String VERTEX_NAME = "vertexName";
+ public static final String SCHEDULED_TIME = "scheduledTime";
+ public static final String INIT_REQUESTED_TIME = "initRequestedTime";
+ public static final String INIT_TIME = "initTime";
+ public static final String START_REQUESTED_TIME = "startRequestedTime";
+ public static final String START_TIME = "startTime";
+ public static final String FINISH_TIME = "endTime"; // note: the key string is "endTime", not "finishTime"
+ public static final String TIME_TAKEN = "timeTaken";
+ public static final String STATUS = "status";
+ public static final String DIAGNOSTICS = "diagnostics";
+ public static final String COUNTERS = "counters";
+ public static final String STATS = "stats";
+ public static final String NUM_TASKS = "numTasks";
+ public static final String NUM_COMPLETED_TASKS = "numCompletedTasks";
+ public static final String NUM_SUCCEEDED_TASKS = "numSucceededTasks";
+ public static final String NUM_FAILED_TASKS = "numFailedTasks";
+ public static final String NUM_KILLED_TASKS = "numKilledTasks";
+ public static final String PROCESSOR_CLASS_NAME = "processorClassName";
+ public static final String IN_PROGRESS_LOGS_URL = "inProgressLogsURL";
+ public static final String COMPLETED_LOGS_URL = "completedLogsURL";
+ public static final String EXIT_STATUS = "exitStatus";
+
+ /* Counters-related keys */
+ public static final String COUNTER_GROUPS = "counterGroups";
+ public static final String COUNTER_GROUP_NAME = "counterGroupName";
+ public static final String COUNTER_GROUP_DISPLAY_NAME = "counterGroupDisplayName";
+ public static final String COUNTER_NAME = "counterName";
+ public static final String COUNTER_DISPLAY_NAME = "counterDisplayName";
+ public static final String COUNTER_VALUE = "counterValue";
+
+ /*
+  * Url related: base path of the timeline web-service resource, plus Tez
+  * identifiers that are presumably used as entity types in timeline
+  * queries (verify against the timeline-based DAG client).
+  */
+ public static final String RESOURCE_URI_BASE = "/ws/v1/timeline";
+ public static final String TEZ_DAG_ID = "TEZ_DAG_ID";
+ public static final String TEZ_VERTEX_ID = "TEZ_VERTEX_ID";
+
+ /*
+  * In Yarn but not present in 2.2: timeline-service web address
+  * configuration key names, presumably duplicated here so Tez can read
+  * them when built against Hadoop/YARN 2.2.
+  */
+ public static final String TIMELINE_SERVICE_WEBAPP_HTTP_ADDRESS_CONF_NAME =
+ "yarn.timeline-service.webapp.address";
+ public static final String TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS_CONF_NAME =
+ "yarn.timeline-service.webapp.https.address";
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-api/src/main/java/org/apache/tez/dag/api/DAGNotRunningException.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAGNotRunningException.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAGNotRunningException.java
new file mode 100644
index 0000000..cbc93a9
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAGNotRunningException.java
@@ -0,0 +1,34 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+/**
+ * Checked exception (a {@link TezException} subclass) signalling that a
+ * requested operation could not be carried out because the DAG is not running.
+ * NOTE(review): this meaning is inferred from the class name. Confirm it
+ * against the call sites, presumably in the DAG client / DAGClientHandler code.
+ */
+@Private
+public class DAGNotRunningException extends TezException {
+ private static final long serialVersionUID = 6337442733802964448L;
+ public DAGNotRunningException(Throwable cause) { super(cause); } // wraps an underlying cause
+ public DAGNotRunningException(String message) { super(message); } // message-only form
+ public DAGNotRunningException(String message, Throwable cause) { // message plus underlying cause
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
new file mode 100644
index 0000000..0c8ef1a
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
@@ -0,0 +1,477 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api.client;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.text.DecimalFormat;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.client.FrameworkClient;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DAGNotRunningException;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.client.rpc.DAGClientRPCImpl;
+import org.apache.tez.dag.api.records.DAGProtos;
+
+@Private
+public class DAGClientImpl extends DAGClient {
+ private static final Log LOG = LogFactory.getLog(DAGClientImpl.class);
+
+ private final ApplicationId appId;
+ private final String dagId;
+ private final TezConfiguration conf;
+ private final FrameworkClient frameworkClient;
+
+ @VisibleForTesting
+ protected DAGClient realClient;
+ private boolean dagCompleted = false;
+ private boolean isATSEnabled = false;
+ private DAGStatus cachedDagStatus = null;
+ Map<String, VertexStatus> cachedVertexStatus = new HashMap<String, VertexStatus>();
+
+ private static final long SLEEP_FOR_COMPLETION = 500;
+ private static final long PRINT_STATUS_INTERVAL_MILLIS = 5000;
+ private final DecimalFormat formatter = new DecimalFormat("###.##%");
+ private long lastPrintStatusTimeMillis;
+ private EnumSet<VertexStatus.State> vertexCompletionStates = EnumSet.of(
+ VertexStatus.State.SUCCEEDED, VertexStatus.State.FAILED, VertexStatus.State.KILLED,
+ VertexStatus.State.ERROR);
+
+ public DAGClientImpl(ApplicationId appId, String dagId, TezConfiguration conf,
+ @Nullable FrameworkClient frameworkClient) {
+ this.appId = appId;
+ this.dagId = dagId;
+ this.conf = conf;
+ if (frameworkClient != null &&
+ conf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE, TezConfiguration.TEZ_LOCAL_MODE_DEFAULT)) {
+ this.frameworkClient = frameworkClient;
+ } else {
+ this.frameworkClient = FrameworkClient.createFrameworkClient(conf);
+ this.frameworkClient.init(conf, new YarnConfiguration(conf));
+ this.frameworkClient.start();
+ }
+ isATSEnabled = conf.get(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, "")
+ .equals("org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService");
+
+ if (UserGroupInformation.isSecurityEnabled()){
+ //TODO: enable ATS integration in kerberos secured cluster - see TEZ-1529
+ isATSEnabled = false;
+ }
+
+ realClient = new DAGClientRPCImpl(appId, dagId, conf, this.frameworkClient);
+ }
+
+ @Override
+ public String getExecutionContext() {
+ return realClient.getExecutionContext();
+ }
+
+ @Override
+ protected ApplicationReport getApplicationReportInternal() {
+ return realClient.getApplicationReportInternal();
+ }
+
+ @Override
+ public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions) throws
+ TezException, IOException {
+
+ if (!dagCompleted) {
+ // fetch from AM. on Error and while DAG is still not completed (could not reach AM, AM got
+ // killed). return cached status. This prevents the progress being reset (for ex fetching from
+ // RM does not give status).
+ final DAGStatus dagStatus = getDAGStatusViaAM(statusOptions);
+
+ if (!dagCompleted) {
+ if (dagStatus != null) {
+ cachedDagStatus = dagStatus;
+ return dagStatus;
+ }
+ if (cachedDagStatus != null) {
+ // could not get from AM (not reachable/ was killed). return cached status.
+ return cachedDagStatus;
+ }
+ }
+
+ if (isATSEnabled && dagCompleted) {
+ switchToTimelineClient();
+ }
+ }
+
+ if (isATSEnabled && dagCompleted) {
+ try {
+ // fetch from ATS and return only if status is completed.
+ DAGStatus dagStatus = realClient.getDAGStatus(statusOptions);
+ if (dagStatus.isCompleted()) {
+ return dagStatus;
+ }
+ } catch (TezException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.info("DAGStatus fetch failed." + e.getMessage());
+ }
+ }
+ }
+
+ // dag completed and Timeline service is either not enabled or does not have completion status
+ // return cached status if completion info is present.
+ if (dagCompleted && cachedDagStatus != null && cachedDagStatus.isCompleted()) {
+ return cachedDagStatus;
+ }
+
+ // everything else fails rely on RM.
+ return getDAGStatusViaRM();
+ }
+
+ @Override
+ public VertexStatus getVertexStatus(String vertexName, Set<StatusGetOpts> statusOptions) throws
+ IOException, TezException {
+
+ if (!dagCompleted) {
+ VertexStatus vertexStatus = getVertexStatusViaAM(vertexName, statusOptions);
+
+ if (!dagCompleted) {
+ if (vertexStatus != null) {
+ cachedVertexStatus.put(vertexName, vertexStatus);
+ return vertexStatus;
+ }
+ if (cachedVertexStatus.containsKey(vertexName)) {
+ return cachedVertexStatus.get(vertexName);
+ }
+ }
+
+ if (isATSEnabled && dagCompleted) {
+ switchToTimelineClient();
+ }
+ }
+
+ if (isATSEnabled && dagCompleted) {
+ try {
+ final VertexStatus vertexStatus = realClient.getVertexStatus(vertexName, statusOptions);
+ if (vertexCompletionStates.contains(vertexStatus.getState())) {
+ return vertexStatus;
+ }
+ } catch (TezException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ERROR fetching vertex data from Yarn Timeline. " + e.getMessage());
+ }
+ }
+ }
+
+ if (cachedVertexStatus.containsKey(vertexName)) {
+ final VertexStatus vertexStatus = cachedVertexStatus.get(vertexName);
+ if (vertexCompletionStates.contains(vertexStatus.getState())) {
+ return vertexStatus;
+ }
+ }
+
+ return null;
+ }
+
+ @Override
+ public void tryKillDAG() throws IOException, TezException {
+ if (!dagCompleted) {
+ realClient.tryKillDAG();
+ } else {
+ LOG.info("TryKill for app: " + appId + " dag:" + dagId + " dag already completed.");
+ }
+ }
+
+ @Override
+ public DAGStatus waitForCompletion() throws IOException, TezException, InterruptedException {
+ return _waitForCompletionWithStatusUpdates(false, EnumSet.noneOf(StatusGetOpts.class));
+ }
+
+ @Override
+ public DAGStatus waitForCompletionWithStatusUpdates(
+ @Nullable Set<StatusGetOpts> statusGetOpts) throws IOException, TezException,
+ InterruptedException {
+ return _waitForCompletionWithStatusUpdates(true, statusGetOpts);
+ }
+
+ @Override
+ public void close() throws IOException {
+ realClient.close();
+ if (frameworkClient != null) {
+ frameworkClient.stop();
+ }
+ }
+
+ private DAGStatus getDAGStatusViaAM(@Nullable Set<StatusGetOpts> statusOptions) throws
+ IOException {
+ DAGStatus dagStatus = null;
+ try {
+ dagStatus = realClient.getDAGStatus(statusOptions);
+ } catch (DAGNotRunningException e) {
+ dagCompleted = true;
+ } catch (TezException e) {
+ // can be either due to a n/w issue of due to AM completed.
+ }
+
+ if (dagStatus == null && !dagCompleted) {
+ checkAndSetDagCompletionStatus();
+ }
+
+ return dagStatus;
+ }
+
+ private VertexStatus getVertexStatusViaAM(String vertexName, Set<StatusGetOpts> statusOptions) throws
+ IOException {
+ VertexStatus vertexStatus = null;
+ try {
+ vertexStatus = realClient.getVertexStatus(vertexName, statusOptions);
+ } catch (DAGNotRunningException e) {
+ dagCompleted = true;
+ } catch (TezException e) {
+ // can be either due to a n/w issue of due to AM completed.
+ }
+
+ if (vertexStatus == null && !dagCompleted) {
+ checkAndSetDagCompletionStatus();
+ }
+
+ return vertexStatus;
+ }
+
+ DAGStatus getDAGStatusViaRM() throws TezException, IOException {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("GetDAGStatus via AM for app: " + appId + " dag:" + dagId);
+ }
+ ApplicationReport appReport;
+ try {
+ appReport = frameworkClient.getApplicationReport(appId);
+ } catch (YarnException e) {
+ throw new TezException(e);
+ }
+
+ if(appReport == null) {
+ throw new TezException("Unknown/Invalid appId: " + appId);
+ }
+
+ DAGProtos.DAGStatusProto.Builder builder = DAGProtos.DAGStatusProto.newBuilder();
+ DAGStatus dagStatus = new DAGStatus(builder);
+ DAGProtos.DAGStatusStateProto dagState;
+ switch (appReport.getYarnApplicationState()) {
+ case NEW:
+ case NEW_SAVING:
+ case SUBMITTED:
+ case ACCEPTED:
+ dagState = DAGProtos.DAGStatusStateProto.DAG_SUBMITTED;
+ break;
+ case RUNNING:
+ dagState = DAGProtos.DAGStatusStateProto.DAG_RUNNING;
+ break;
+ case FAILED:
+ dagState = DAGProtos.DAGStatusStateProto.DAG_FAILED;
+ break;
+ case KILLED:
+ dagState = DAGProtos.DAGStatusStateProto.DAG_KILLED;
+ break;
+ case FINISHED:
+ switch(appReport.getFinalApplicationStatus()) {
+ case UNDEFINED:
+ case FAILED:
+ dagState = DAGProtos.DAGStatusStateProto.DAG_FAILED;
+ break;
+ case KILLED:
+ dagState = DAGProtos.DAGStatusStateProto.DAG_KILLED;
+ break;
+ case SUCCEEDED:
+ dagState = DAGProtos.DAGStatusStateProto.DAG_SUCCEEDED;
+ break;
+ default:
+ throw new TezUncheckedException("Encountered unknown final application"
+ + " status from YARN"
+ + ", appState=" + appReport.getYarnApplicationState()
+ + ", finalStatus=" + appReport.getFinalApplicationStatus());
+ }
+ break;
+ default:
+ throw new TezUncheckedException("Encountered unknown application state"
+ + " from YARN, appState=" + appReport.getYarnApplicationState());
+ }
+
+ builder.setState(dagState);
+ if(appReport.getDiagnostics() != null) {
+ builder.addAllDiagnostics(Collections.singleton(appReport.getDiagnostics()));
+ }
+
+ return dagStatus;
+ }
+
+ private DAGStatus _waitForCompletionWithStatusUpdates(boolean vertexUpdates,
+ @Nullable Set<StatusGetOpts> statusGetOpts) throws IOException, TezException, InterruptedException {
+ DAGStatus dagStatus;
+ boolean initPrinted = false;
+ boolean runningPrinted = false;
+ double dagProgress = -1.0; // Print the first one
+ // monitoring
+ while (true) {
+ dagStatus = getDAGStatus(statusGetOpts);
+ if (!initPrinted
+ && (dagStatus.getState() == DAGStatus.State.INITING || dagStatus.getState() == DAGStatus.State.SUBMITTED)) {
+ initPrinted = true; // Print once
+ log("Waiting for DAG to start running");
+ }
+ if (dagStatus.getState() == DAGStatus.State.RUNNING
+ || dagStatus.getState() == DAGStatus.State.SUCCEEDED
+ || dagStatus.getState() == DAGStatus.State.FAILED
+ || dagStatus.getState() == DAGStatus.State.KILLED
+ || dagStatus.getState() == DAGStatus.State.ERROR) {
+ break;
+ }
+ Thread.sleep(SLEEP_FOR_COMPLETION);
+ }// End of while(true)
+
+ Set<String> vertexNames = Collections.emptySet();
+ while (!dagStatus.isCompleted()) {
+ if (!runningPrinted) {
+ log("DAG initialized: CurrentState=Running");
+ runningPrinted = true;
+ }
+ if (vertexUpdates && vertexNames.isEmpty()) {
+ vertexNames = getDAGStatus(statusGetOpts).getVertexProgress().keySet();
+ }
+ dagProgress = monitorProgress(vertexNames, dagProgress, null, dagStatus);
+ Thread.sleep(SLEEP_FOR_COMPLETION);
+ dagStatus = getDAGStatus(statusGetOpts);
+ }// end of while
+ // Always print the last status irrespective of progress change
+ monitorProgress(vertexNames, -1.0, statusGetOpts, dagStatus);
+ log("DAG completed. " + "FinalState=" + dagStatus.getState());
+ return dagStatus;
+ }
+
+ private double monitorProgress(Set<String> vertexNames, double prevDagProgress,
+ Set<StatusGetOpts> opts, DAGStatus dagStatus) throws IOException, TezException {
+ Progress progress = dagStatus.getDAGProgress();
+ double dagProgress = prevDagProgress;
+ if (progress != null) {
+ dagProgress = getProgress(progress);
+ boolean progressChanged = dagProgress > prevDagProgress;
+ long currentTimeMillis = System.currentTimeMillis();
+ long timeSinceLastPrintStatus = currentTimeMillis - lastPrintStatusTimeMillis;
+ boolean printIntervalExpired = timeSinceLastPrintStatus > PRINT_STATUS_INTERVAL_MILLIS;
+ if (progressChanged || printIntervalExpired) {
+ lastPrintStatusTimeMillis = currentTimeMillis;
+ printDAGStatus(vertexNames, opts, dagStatus, progress);
+ }
+ }
+
+ return dagProgress;
+ }
+
+ private void printDAGStatus(Set<String> vertexNames, Set<StatusGetOpts> opts,
+ DAGStatus dagStatus, Progress dagProgress) throws IOException, TezException {
+ double vProgressFloat = 0.0f;
+ log("DAG: State: " + dagStatus.getState() + " Progress: "
+ + formatter.format(getProgress(dagProgress)) + " " + dagProgress);
+ boolean displayCounter = opts != null && opts.contains(StatusGetOpts.GET_COUNTERS);
+ if (displayCounter) {
+ TezCounters counters = dagStatus.getDAGCounters();
+ if (counters != null) {
+ log("DAG Counters:\n" + counters);
+ }
+ }
+ for (String vertex : vertexNames) {
+ VertexStatus vStatus = getVertexStatus(vertex, opts);
+ if (vStatus == null) {
+ log("Could not retrieve status for vertex: " + vertex);
+ continue;
+ }
+ Progress vProgress = vStatus.getProgress();
+ if (vProgress != null) {
+ vProgressFloat = 0.0f;
+ if (vProgress.getTotalTaskCount() == 0) {
+ vProgressFloat = 1.0f;
+ } else if (vProgress.getTotalTaskCount() > 0) {
+ vProgressFloat = getProgress(vProgress);
+ }
+ log("\tVertexStatus:" + " VertexName: " + vertex + " Progress: "
+ + formatter.format(vProgressFloat) + " " + vProgress);
+ }
+ if (displayCounter) {
+ TezCounters counters = vStatus.getVertexCounters();
+ if (counters != null) {
+ log("Vertex Counters for " + vertex + ":\n" + counters);
+ }
+ }
+ } // end of for loop
+ }
+
+ private void checkAndSetDagCompletionStatus() {
+ ApplicationReport appReport = realClient.getApplicationReportInternal();
+ if (appReport != null) {
+ final YarnApplicationState appState = appReport.getYarnApplicationState();
+ if (appState == YarnApplicationState.FINISHED || appState == YarnApplicationState.FAILED ||
+ appState == YarnApplicationState.KILLED) {
+ dagCompleted = true;
+ }
+ }
+ }
+
+ private ApplicationReport getApplicationReport() {
+ ApplicationReport appReport = null;
+ try {
+ appReport = frameworkClient.getApplicationReport(appId);
+ } catch (YarnException e) {
+ // do nothing
+ } catch (IOException e) {
+ // do nothing
+ }
+ return appReport;
+ }
+
+ private void switchToTimelineClient() throws IOException, TezException {
+ realClient.close();
+ realClient = new DAGClientTimelineImpl(appId, dagId, conf, frameworkClient);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("dag completed switching to DAGClientTimelineImpl");
+ }
+ }
+
+ @VisibleForTesting
+ public DAGClient getRealClient() {
+ return realClient;
+ }
+
+ private double getProgress(Progress progress) {
+ return (progress.getTotalTaskCount() == 0 ? 0.0 : (double) (progress.getSucceededTaskCount())
+ / progress.getTotalTaskCount());
+ }
+
+ private void log(String message) {
+ LOG.info(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
new file mode 100644
index 0000000..57453aa
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
@@ -0,0 +1,501 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.api.client;
+
+import javax.annotation.Nullable;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
+import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+import com.sun.jersey.json.impl.provider.entity.JSONRootElementProvider;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.client.FrameworkClient;
+import org.apache.tez.common.ATSConstants;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.records.DAGProtos.DAGStatusProto;
+import org.apache.tez.dag.api.records.DAGProtos.DAGStatusStateProto;
+import org.apache.tez.dag.api.records.DAGProtos.ProgressProto;
+import org.apache.tez.dag.api.records.DAGProtos.StringProgressPairProto;
+import org.apache.tez.dag.api.records.DAGProtos.TezCounterGroupProto;
+import org.apache.tez.dag.api.records.DAGProtos.TezCounterProto;
+import org.apache.tez.dag.api.records.DAGProtos.TezCountersProto;
+import org.apache.tez.dag.api.records.DAGProtos.VertexStatusProto;
+import org.apache.tez.dag.api.records.DAGProtos.VertexStatusStateProto;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+
+@Private
+public class DAGClientTimelineImpl extends DAGClient {
+ private static final Log LOG = LogFactory.getLog(DAGClientTimelineImpl.class);
+
+ // Timeline entity fields requested by every query.
+ private static final String FILTER_BY_FIELDS = "primaryfilters,otherinfo";
+ private static final String HTTPS_SCHEME = "https://";
+ private static final String HTTP_SCHEME = "http://";
+ // Created lazily by getHttpClient(). It is static, so it is shared by all instances, and
+ // close() on any instance destroys it and resets it to null.
+ private static Client httpClient = null;
+ private final ApplicationId appId;
+ private final String dagId;
+ private final TezConfiguration conf;
+ private final FrameworkClient frameworkClient;
+
+ // Per-vertex task counts read from the Timeline server. Filled by parseTaskStatsForVertexes()
+ // and reused by later calls on this instance.
+ private Map<String, VertexTaskStats> vertexTaskStatsCache = null;
+
+ // Timeline REST base URI: scheme + webapp address + resource base, built in the constructor.
+ @VisibleForTesting
+ protected String baseUri;
+
+ public DAGClientTimelineImpl(ApplicationId appId, String dagId, TezConfiguration conf,
+ FrameworkClient frameworkClient)
+ throws TezException {
+ this.appId = appId;
+ this.dagId = dagId;
+ this.conf = conf;
+ this.frameworkClient = frameworkClient;
+
+ String scheme;
+ String webAppAddress;
+ if (webappHttpsOnly(conf)) {
+ scheme = HTTPS_SCHEME;
+ webAppAddress = conf.get(ATSConstants.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS_CONF_NAME);
+ } else {
+ scheme = HTTP_SCHEME;
+ webAppAddress = conf.get(ATSConstants.TIMELINE_SERVICE_WEBAPP_HTTP_ADDRESS_CONF_NAME);
+ }
+ if (webAppAddress == null) {
+ throw new TezException("Failed to get ATS webapp address");
+ }
+
+ baseUri = Joiner.on("").join(scheme, webAppAddress, ATSConstants.RESOURCE_URI_BASE);
+ }
+
+
+  /** Describes where the DAG executes, identified by its YARN application id. */
+  @Override
+  public String getExecutionContext() {
+    final String context = "Executing on YARN cluster with App id " + appId;
+    return context;
+  }
+
+  /**
+   * Fetches the YARN application report for this DAG's application.
+   *
+   * @return the report, or {@code null} if it could not be fetched (best effort; the failure
+   *     is logged at debug level instead of being silently dropped)
+   */
+  @Override
+  protected ApplicationReport getApplicationReportInternal() {
+    try {
+      return frameworkClient.getApplicationReport(appId);
+    } catch (YarnException | IOException e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Failed to get application report for app: " + appId, e);
+      }
+      return null;
+    }
+  }
+
+  /**
+   * Reads the DAG entity from the Timeline server and converts it to a {@link DAGStatus}.
+   *
+   * @throws TezException if the entity cannot be fetched, has no status, or is malformed JSON
+   */
+  @Override
+  public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions)
+      throws IOException, TezException {
+    final String dagUrl = String.format("%s/%s/%s?fields=%s", baseUri, ATSConstants.TEZ_DAG_ID,
+        dagId, FILTER_BY_FIELDS);
+    final DAGStatusProto.Builder builder;
+    try {
+      builder = parseDagStatus(getJsonRootEntity(dagUrl), statusOptions);
+    } catch (JSONException je) {
+      throw new TezException("Failed to parse DagStatus json from YARN Timeline", je);
+    }
+    if (builder == null) {
+      throw new TezException("Failed to get DagStatus from ATS");
+    }
+    return new DAGStatus(builder);
+  }
+
+  /**
+   * Reads the entity for the named vertex of this DAG from the Timeline server.
+   *
+   * @throws TezException if the query does not yield exactly one vertex entity, the entity has
+   *     no status, or the JSON is malformed
+   */
+  @Override
+  public VertexStatus getVertexStatus(String vertexName, Set<StatusGetOpts> statusOptions)
+      throws IOException, TezException {
+    // Vertex names can contain characters that are illegal or meaningful in a URL query
+    // (e.g. spaces as in "Map 1", '&', '#'). Encode the name before embedding it in the filter.
+    final String encodedVertexName = URLEncoder.encode(vertexName, "UTF-8");
+    final String url = String.format(
+        "%s/%s?primaryFilter=%s:%s&secondaryFilter=vertexName:%s&fields=%s", baseUri,
+        ATSConstants.TEZ_VERTEX_ID, ATSConstants.TEZ_DAG_ID, dagId, encodedVertexName,
+        FILTER_BY_FIELDS);
+
+    try {
+      VertexStatusProto.Builder statusBuilder;
+      final JSONObject jsonRoot = getJsonRootEntity(url);
+      JSONArray entitiesNode = jsonRoot.optJSONArray(ATSConstants.ENTITIES);
+      if (entitiesNode == null || entitiesNode.length() != 1) {
+        throw new TezException("Failed to get vertex status YARN Timeline");
+      }
+      JSONObject vertexNode = entitiesNode.getJSONObject(0);
+
+      statusBuilder = parseVertexStatus(vertexNode, statusOptions);
+      if (statusBuilder == null) {
+        throw new TezException("Failed to parse vertex status from YARN Timeline");
+      }
+
+      return new VertexStatus(statusBuilder);
+    } catch (JSONException je) {
+      throw new TezException("Failed to parse VertexStatus json from YARN Timeline", je);
+    }
+  }
+
+  /** Killing a DAG is not possible through the Timeline server; this always throws. */
+  @Override
+  public void tryKillDAG() throws IOException, TezException {
+    final String reason = "tryKillDAG is unsupported for DAGClientTimelineImpl";
+    throw new TezException(reason);
+  }
+
+  /** Returns the current DAG status immediately (no polling, no counters requested). */
+  @Override
+  public DAGStatus waitForCompletion() throws IOException, TezException, InterruptedException {
+    final Set<StatusGetOpts> noOptions = null;
+    return getDAGStatus(noOptions);
+  }
+
+  /** Returns the current DAG status immediately, using the given options; no polling. */
+  @Override
+  public DAGStatus waitForCompletionWithStatusUpdates(
+      @Nullable Set<StatusGetOpts> statusGetOpts) throws IOException, TezException,
+      InterruptedException {
+    final DAGStatus current = getDAGStatus(statusGetOpts);
+    return current;
+  }
+
+  /**
+   * Destroys the HTTP client, if one exists. The client is held in a static field, so this
+   * affects every instance; a later request creates a new one via getHttpClient().
+   */
+  @Override
+  public void close() throws IOException {
+    final Client client = httpClient;
+    if (client == null) {
+      return;
+    }
+    client.destroy();
+    httpClient = null;
+  }
+
+  /**
+   * Builds a DAG status proto from the DAG entity JSON. Task-count progress for the DAG and each
+   * vertex comes from the vertex entities.
+   *
+   * @return the builder, or {@code null} if the entity has no status or an unrecognized one
+   */
+  private DAGStatusProto.Builder parseDagStatus(JSONObject jsonRoot, Set<StatusGetOpts> statusOptions)
+      throws JSONException, TezException {
+    final JSONObject otherInfoNode = jsonRoot.getJSONObject(ATSConstants.OTHER_INFO);
+
+    DAGStatusProto.Builder dagStatusBuilder = DAGStatusProto.newBuilder();
+
+    final String status = otherInfoNode.optString(ATSConstants.STATUS);
+    final String diagnostics = otherInfoNode.optString(ATSConstants.DIAGNOSTICS);
+    if (status.equals("")) {
+      return null;
+    }
+
+    final DAGStatusStateProto state = dagStateProtoMap.get(status);
+    if (state == null) {
+      // Passing null to the generated proto setter would throw a NullPointerException that
+      // escapes callers expecting TezException. Report "no status" instead.
+      LOG.warn("Unknown DAG status from YARN Timeline: " + status);
+      return null;
+    }
+
+    dagStatusBuilder.setState(state)
+        .addAllDiagnostics(Collections.singleton(diagnostics));
+
+    if (statusOptions != null && statusOptions.contains(StatusGetOpts.GET_COUNTERS)) {
+      final TezCountersProto.Builder tezCounterBuilder;
+      final JSONObject countersNode = otherInfoNode.optJSONObject(ATSConstants.COUNTERS);
+      tezCounterBuilder = parseDagCounters(countersNode);
+      if (tezCounterBuilder != null) {
+        dagStatusBuilder.setDagCounters(tezCounterBuilder);
+      }
+    }
+
+    final Map<String, VertexTaskStats> vertexTaskStatsMap = parseTaskStatsForVertexes();
+    if (vertexTaskStatsMap != null && vertexTaskStatsMap.size() > 0) {
+      // A null vertex name aggregates over all vertices, giving the DAG-level progress.
+      ProgressProto.Builder dagProgressBuilder = getProgressBuilder(vertexTaskStatsMap, null);
+      dagStatusBuilder.setDAGProgress(dagProgressBuilder);
+
+      List<StringProgressPairProto> vertexProgressBuilder =
+          new ArrayList<StringProgressPairProto>(vertexTaskStatsMap.size());
+      for (Map.Entry<String, VertexTaskStats> v : vertexTaskStatsMap.entrySet()) {
+        StringProgressPairProto vertexProgressProto = StringProgressPairProto
+            .newBuilder()
+            .setKey(v.getKey())
+            .setProgress(getProgressBuilder(vertexTaskStatsMap, v.getKey()))
+            .build();
+        vertexProgressBuilder.add(vertexProgressProto);
+      }
+      dagStatusBuilder.addAllVertexProgress(vertexProgressBuilder);
+    }
+
+    return dagStatusBuilder;
+  }
+
+  /**
+   * Sums task counts into a progress proto.
+   *
+   * @param vertexName vertex to report on, or {@code null} to aggregate over all vertices
+   */
+  private ProgressProto.Builder getProgressBuilder(Map<String, VertexTaskStats> vertexTaskStatsMap,
+      String vertexName) {
+    int total = 0;
+    int succeeded = 0;
+    int killed = 0;
+    int failed = 0;
+    int running = 0;
+
+    for (Map.Entry<String, VertexTaskStats> entry : vertexTaskStatsMap.entrySet()) {
+      final boolean selected = vertexName == null || vertexName.equals(entry.getKey());
+      if (!selected) {
+        continue;
+      }
+      final VertexTaskStats stats = entry.getValue();
+      total += stats.numTaskCount;
+      succeeded += stats.succeededTaskCount;
+      killed += stats.killedTaskCount;
+      failed += stats.failedTaskCount;
+      // Tasks that have not completed are counted as running.
+      running += stats.numTaskCount - stats.completedTaskCount;
+    }
+
+    return ProgressProto.newBuilder()
+        .setTotalTaskCount(total)
+        .setRunningTaskCount(running)
+        .setSucceededTaskCount(succeeded)
+        .setKilledTaskCount(killed)
+        .setFailedTaskCount(failed);
+  }
+
+  /**
+   * Builds a vertex status proto (state, diagnostics, task progress, optional counters) from a
+   * vertex entity JSON.
+   *
+   * @return the builder, or {@code null} if the entity has no status or an unrecognized one
+   */
+  private VertexStatusProto.Builder parseVertexStatus(JSONObject jsonRoot,
+      Set<StatusGetOpts> statusOptions)
+      throws JSONException {
+    final JSONObject otherInfoNode = jsonRoot.getJSONObject(ATSConstants.OTHER_INFO);
+    final VertexStatusProto.Builder vertexStatusBuilder = VertexStatusProto.newBuilder();
+
+    final String status = otherInfoNode.optString(ATSConstants.STATUS);
+    final String diagnostics = otherInfoNode.optString(ATSConstants.DIAGNOSTICS);
+    if (status.equals("")) {
+      return null;
+    }
+
+    final VertexStatusStateProto state = vertexStateProtoMap.get(status);
+    if (state == null) {
+      // Avoid handing null to the generated proto setter (NullPointerException); the caller
+      // turns a null builder into a TezException.
+      LOG.warn("Unknown vertex status from YARN Timeline: " + status);
+      return null;
+    }
+
+    vertexStatusBuilder.setState(state)
+        .addAllDiagnostics(Collections.singleton(diagnostics));
+
+    final int numTasks = otherInfoNode.optInt(ATSConstants.NUM_TASKS);
+    final int numRunningTasks = numTasks - otherInfoNode.optInt(ATSConstants.NUM_COMPLETED_TASKS);
+    ProgressProto.Builder progressBuilder = ProgressProto.newBuilder();
+    progressBuilder.setTotalTaskCount(numTasks);
+    progressBuilder.setRunningTaskCount(numRunningTasks);
+    progressBuilder.setSucceededTaskCount(otherInfoNode.optInt(ATSConstants.NUM_SUCCEEDED_TASKS));
+    progressBuilder.setKilledTaskCount(otherInfoNode.optInt(ATSConstants.NUM_KILLED_TASKS));
+    progressBuilder.setFailedTaskCount(otherInfoNode.optInt(ATSConstants.NUM_FAILED_TASKS));
+    vertexStatusBuilder.setProgress(progressBuilder);
+
+    if (statusOptions != null && statusOptions.contains(StatusGetOpts.GET_COUNTERS)) {
+      final TezCountersProto.Builder tezCounterBuilder;
+      final JSONObject countersNode = otherInfoNode.optJSONObject(ATSConstants.COUNTERS);
+      tezCounterBuilder = parseDagCounters(countersNode);
+      if (tezCounterBuilder != null) {
+        vertexStatusBuilder.setVertexCounters(tezCounterBuilder);
+      }
+    }
+
+    return vertexStatusBuilder;
+  }
+
+  /**
+   * Converts a counters JSON node into a counters proto.
+   *
+   * @return {@code null} if {@code countersNode} is null; otherwise a builder containing every
+   *     counter group that could be parsed (possibly none)
+   */
+  private TezCountersProto.Builder parseDagCounters(JSONObject countersNode)
+      throws JSONException {
+    if (countersNode == null) {
+      return null;
+    }
+
+    final TezCountersProto.Builder result = TezCountersProto.newBuilder();
+    final JSONArray groups = countersNode.optJSONArray(ATSConstants.COUNTER_GROUPS);
+    if (groups == null) {
+      return result;
+    }
+    for (int idx = 0, count = groups.length(); idx < count; idx++) {
+      final TezCounterGroupProto.Builder group = parseCounterGroup(groups.optJSONObject(idx));
+      if (group != null) {
+        result.addCounterGroups(group);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Converts one counter-group JSON node into a counter-group proto.
+   *
+   * @return {@code null} if {@code counterGroupNode} is null; a group without counters if the
+   *     node has no counters array
+   */
+  private TezCounterGroupProto.Builder parseCounterGroup(JSONObject counterGroupNode)
+      throws JSONException {
+
+    if (counterGroupNode == null) {
+      return null;
+    }
+
+    TezCounterGroupProto.Builder counterGroup = TezCounterGroupProto.newBuilder();
+
+    final String groupName = counterGroupNode.optString(ATSConstants.COUNTER_GROUP_NAME);
+    final String groupDisplayName = counterGroupNode.optString(
+        ATSConstants.COUNTER_GROUP_DISPLAY_NAME);
+    // optJSONArray returns null when the key is absent; treat that as an empty group rather
+    // than failing with a NullPointerException.
+    final JSONArray counterNodes = counterGroupNode.optJSONArray(ATSConstants.COUNTERS);
+    final int numCounters = counterNodes == null ? 0 : counterNodes.length();
+
+    List<TezCounterProto> counters = new ArrayList<TezCounterProto>(numCounters);
+
+    for (int i = 0; i < numCounters; i++) {
+      final JSONObject counterNode = counterNodes.getJSONObject(i);
+      final String counterName = counterNode.getString(ATSConstants.COUNTER_NAME);
+      final String counterDisplayName = counterNode.getString(ATSConstants.COUNTER_DISPLAY_NAME);
+      final long counterValue = counterNode.getLong(ATSConstants.COUNTER_VALUE);
+
+      counters.add(
+          TezCounterProto.newBuilder()
+              .setName(counterName)
+              .setDisplayName(counterDisplayName)
+              .setValue(counterValue)
+              .build());
+    }
+
+    return counterGroup.setName(groupName)
+        .setDisplayName(groupDisplayName)
+        .addAllCounters(counters);
+  }
+
+  /**
+   * Fetches task counts for every vertex of this DAG from the Timeline server, keyed by vertex
+   * name. A successful result is cached on this instance and reused by later calls.
+   *
+   * @return the per-vertex stats; an empty (uncached) map when the response has no entities
+   *     array, so callers never receive {@code null}
+   */
+  @VisibleForTesting
+  protected Map<String, VertexTaskStats> parseTaskStatsForVertexes()
+      throws TezException, JSONException {
+
+    if (vertexTaskStatsCache != null) {
+      return vertexTaskStatsCache;
+    }
+
+    final String url = String.format("%s/%s?primaryFilter=%s:%s&fields=%s", baseUri,
+        ATSConstants.TEZ_VERTEX_ID, ATSConstants.TEZ_DAG_ID, dagId, FILTER_BY_FIELDS);
+
+    final JSONObject jsonRoot = getJsonRootEntity(url);
+    final JSONArray vertexNodes = jsonRoot.optJSONArray(ATSConstants.ENTITIES);
+
+    if (vertexNodes == null) {
+      // Not cached, so a later call retries the query.
+      return Collections.emptyMap();
+    }
+
+    final int numVertexNodes = vertexNodes.length();
+    Map<String, VertexTaskStats> vertexTaskStatsMap =
+        new HashMap<String, VertexTaskStats>(numVertexNodes);
+    for (int i = 0; i < numVertexNodes; i++) {
+      final JSONObject vertexNode = vertexNodes.getJSONObject(i);
+      final JSONObject otherInfoNode = vertexNode.getJSONObject(ATSConstants.OTHER_INFO);
+      final String vertexName = otherInfoNode.getString(ATSConstants.VERTEX_NAME);
+      final VertexTaskStats vertexTaskStats =
+          new VertexTaskStats(otherInfoNode.optInt(ATSConstants.NUM_TASKS),
+              otherInfoNode.optInt(ATSConstants.NUM_COMPLETED_TASKS),
+              otherInfoNode.optInt(ATSConstants.NUM_SUCCEEDED_TASKS),
+              otherInfoNode.optInt(ATSConstants.NUM_KILLED_TASKS),
+              otherInfoNode.optInt(ATSConstants.NUM_FAILED_TASKS));
+      vertexTaskStatsMap.put(vertexName, vertexTaskStats);
+    }
+    vertexTaskStatsCache = vertexTaskStatsMap;
+    return vertexTaskStatsCache;
+  }
+
+  /**
+   * Performs an HTTP GET against the given YARN Timeline REST URL and returns the JSON body.
+   *
+   * @param url fully-built Timeline URL
+   * @return the JSON root object of the response
+   * @throws TezException if the server does not answer 200 OK, the URL is invalid, or the
+   *     request/response cannot be processed
+   */
+  @VisibleForTesting
+  protected JSONObject getJsonRootEntity(String url) throws TezException {
+    try {
+      WebResource wr = getHttpClient().resource(url);
+      ClientResponse response = wr.accept(MediaType.APPLICATION_JSON_TYPE)
+          .type(MediaType.APPLICATION_JSON_TYPE)
+          .get(ClientResponse.class);
+
+      if (response.getClientResponseStatus() != ClientResponse.Status.OK) {
+        // The entity is never read on this path, so release the response (and its underlying
+        // connection) explicitly before failing.
+        response.close();
+        throw new TezException("Failed to get response from YARN Timeline: url: " + url);
+      }
+
+      return response.getEntity(JSONObject.class);
+    } catch (ClientHandlerException e) {
+      throw new TezException("Error processing response from YARN Timeline", e);
+    } catch (UniformInterfaceException e) {
+      throw new TezException("Error accessing content from YARN Timeline - unexpected response", e);
+    } catch (IllegalArgumentException e) {
+      throw new TezException("Error accessing content from YARN Timeline - invalid url", e);
+    }
+  }
+
+  /**
+   * Immutable per-vertex task counts, as built from a vertex entity's otherinfo section by
+   * parseTaskStatsForVertexes.
+   */
+  @VisibleForTesting
+  protected class VertexTaskStats {
+    final int numTaskCount;
+    final int completedTaskCount;
+    final int succeededTaskCount;
+    final int killedTaskCount;
+    final int failedTaskCount;
+
+    public VertexTaskStats(int total, int completed, int succeeded, int killed, int failed) {
+      numTaskCount = total;
+      completedTaskCount = completed;
+      succeededTaskCount = succeeded;
+      killedTaskCount = killed;
+      failedTaskCount = failed;
+    }
+  }
+
+  /**
+   * Reports whether YARN web applications are configured for HTTPS, by reflectively calling
+   * {@code YarnConfiguration.useHttps(Configuration)}.
+   *
+   * @param conf configuration to inspect
+   * @return the value returned by {@code YarnConfiguration.useHttps}
+   * @throws TezException if the class or method cannot be found, or the reflective call fails
+   */
+  private boolean webappHttpsOnly(Configuration conf) throws TezException {
+    final String yarnConfClassName = "org.apache.hadoop.yarn.conf.YarnConfiguration";
+    try {
+      final Method useHttpsMethod =
+          Class.forName(yarnConfClassName).getMethod("useHttps", Configuration.class);
+      final Object httpsOnly = useHttpsMethod.invoke(null, conf);
+      return (Boolean) httpsOnly;
+    } catch (ReflectiveOperationException e) {
+      throw new TezException("error accessing yarn configuration", e);
+    }
+  }
+
+ protected Client getHttpClient() {
+ if (httpClient == null) {
+ ClientConfig config = new DefaultClientConfig(JSONRootElementProvider.App.class);
+ HttpURLConnectionFactory urlFactory = new PseudoAuthenticatedURLConnectionFactory();
+ httpClient = new Client(new URLConnectionClientHandler(urlFactory), config);
+ }
+ return httpClient;
+ }
+
+ private static final Map<String, DAGStatusStateProto> dagStateProtoMap =
+ Collections.unmodifiableMap(new HashMap<String, DAGStatusStateProto>() {{
+ put("NEW", DAGStatusStateProto.DAG_SUBMITTED);
+ put("INITED", DAGStatusStateProto.DAG_SUBMITTED);
+ put("RUNNING", DAGStatusStateProto.DAG_RUNNING);
+ put("SUCCEEDED", DAGStatusStateProto.DAG_SUCCEEDED);
+ put("FAILED", DAGStatusStateProto.DAG_FAILED);
+ put("KILLED", DAGStatusStateProto.DAG_KILLED);
+ put("ERROR", DAGStatusStateProto.DAG_ERROR);
+ put("TERMINATING", DAGStatusStateProto.DAG_TERMINATING);
+ }});
+
+ private static final Map<String, VertexStatusStateProto> vertexStateProtoMap =
+ Collections.unmodifiableMap(new HashMap<String, VertexStatusStateProto>() {{
+ put("NEW", VertexStatusStateProto.VERTEX_NEW);
+ put("INITIALIZING", VertexStatusStateProto.VERTEX_INITIALIZING);
+ put("RECOVERING", VertexStatusStateProto.VERTEX_RECOVERING);
+ put("INITED", VertexStatusStateProto.VERTEX_INITED);
+ put("RUNNING", VertexStatusStateProto.VERTEX_RUNNING);
+ put("SUCCEEDED", VertexStatusStateProto.VERTEX_SUCCEEDED);
+ put("FAILED", VertexStatusStateProto.VERTEX_FAILED);
+ put("KILLED", VertexStatusStateProto.VERTEX_KILLED);
+ put("ERROR", VertexStatusStateProto.VERTEX_ERROR);
+ put("TERMINATING", VertexStatusStateProto.VERTEX_TERMINATING);
+ }});
+
+
+  /**
+   * Opens HTTP connections with the current user's short name appended to the URL as the
+   * {@code user.name} query parameter.
+   */
+  class PseudoAuthenticatedURLConnectionFactory implements HttpURLConnectionFactory {
+    @Override
+    public HttpURLConnection getHttpURLConnection(URL url) throws IOException {
+      final String separator;
+      if (url.getQuery() == null) {
+        separator = "?";
+      } else {
+        separator = "&";
+      }
+      final String encodedUser =
+          URLEncoder.encode(UserGroupInformation.getCurrentUser().getShortUserName(), "UTF8");
+      final URL authenticatedUrl =
+          new URL(url.toString() + separator + "user.name=" + encodedUser);
+      return (HttpURLConnection) authenticatedUrl.openConnection();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
index 3684eef..378cd35 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
@@ -132,7 +132,7 @@ public class VertexStatus {
sb.append("status=" + getState()
+ ", progress=" + getProgress()
+ ", counters="
- + (vertexCounters == null ? "null" : vertexCounters.toString()));
+ + (getVertexCounters() == null ? "null" : getVertexCounters().toString()));
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
index 51f7cfa..09755fd 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -19,9 +19,6 @@
package org.apache.tez.dag.api.client.rpc;
import java.io.IOException;
-import java.text.DecimalFormat;
-import java.util.Collections;
-import java.util.EnumSet;
import java.util.Set;
import javax.annotation.Nullable;
@@ -30,28 +27,24 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.tez.client.FrameworkClient;
import org.apache.tez.client.TezClientUtils;
-import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DAGNotRunningException;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.api.client.Progress;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusRequestProto;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetVertexStatusRequestProto;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TryKillDAGRequestProto;
-import org.apache.tez.dag.api.records.DAGProtos.DAGStatusProto;
-import org.apache.tez.dag.api.records.DAGProtos.DAGStatusStateProto;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ServiceException;
@@ -60,13 +53,11 @@ import com.google.protobuf.ServiceException;
public class DAGClientRPCImpl extends DAGClient {
private static final Log LOG = LogFactory.getLog(DAGClientRPCImpl.class);
- private static final long SLEEP_FOR_COMPLETION = 500;
- private static final long PRINT_STATUS_INTERVAL_MILLIS = 5000;
- private final DecimalFormat formatter = new DecimalFormat("###.##%");
+ private static final String DAG_NOT_RUNNING_CLASS_NAME =
+ DAGNotRunningException.class.getCanonicalName();
private final ApplicationId appId;
private final String dagId;
private final TezConfiguration conf;
- private long lastPrintStatusTimeMillis;
@VisibleForTesting
ApplicationReport appReport;
private final FrameworkClient frameworkClient;
@@ -78,14 +69,7 @@ public class DAGClientRPCImpl extends DAGClient {
this.appId = appId;
this.dagId = dagId;
this.conf = conf;
- if (frameworkClient != null &&
- conf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE, TezConfiguration.TEZ_LOCAL_MODE_DEFAULT)) {
- this.frameworkClient = frameworkClient;
- } else {
- this.frameworkClient = FrameworkClient.createFrameworkClient(conf);
- this.frameworkClient.init(conf, new YarnConfiguration(conf));
- this.frameworkClient.start();
- }
+ this.frameworkClient = frameworkClient;
appReport = null;
}
@@ -99,14 +83,16 @@ public class DAGClientRPCImpl extends DAGClient {
throws IOException, TezException {
if(createAMProxyIfNeeded()) {
try {
- return getDAGStatusViaAM(statusOptions);
+ DAGStatus dagStatus = getDAGStatusViaAM(statusOptions);
+ return dagStatus;
} catch (TezException e) {
resetProxy(e); // create proxy again
+ throw e;
}
}
- // Later maybe from History
- return getDAGStatusViaRM();
+ // either the dag is not running or some exception happened
+ return null;
}
@Override
@@ -118,10 +104,10 @@ public class DAGClientRPCImpl extends DAGClient {
return getVertexStatusViaAM(vertexName, statusOptions);
} catch (TezException e) {
resetProxy(e); // create proxy again
+ throw e;
}
}
- // need AM for this. Later maybe from History
return null;
}
@@ -148,9 +134,6 @@ public class DAGClientRPCImpl extends DAGClient {
if (this.proxy != null) {
RPC.stopProxy(this.proxy);
}
- if(frameworkClient != null) {
- frameworkClient.stop();
- }
}
@Override
@@ -185,75 +168,17 @@ public class DAGClientRPCImpl extends DAGClient {
proxy.getDAGStatus(null,
requestProtoBuilder.build()).getDagStatus());
} catch (ServiceException e) {
- // TEZ-151 retrieve wrapped TezException
- throw new TezException(e);
- }
- }
-
- DAGStatus getDAGStatusViaRM() throws TezException, IOException {
- if(LOG.isDebugEnabled()) {
- LOG.debug("GetDAGStatus via AM for app: " + appId + " dag:" + dagId);
- }
- ApplicationReport appReport;
- try {
- appReport = frameworkClient.getApplicationReport(appId);
- } catch (YarnException e) {
- throw new TezException(e);
- }
-
- if(appReport == null) {
- throw new TezException("Unknown/Invalid appId: " + appId);
- }
-
- DAGStatusProto.Builder builder = DAGStatusProto.newBuilder();
- DAGStatus dagStatus = new DAGStatus(builder);
- DAGStatusStateProto dagState;
- switch (appReport.getYarnApplicationState()) {
- case NEW:
- case NEW_SAVING:
- case SUBMITTED:
- case ACCEPTED:
- dagState = DAGStatusStateProto.DAG_SUBMITTED;
- break;
- case RUNNING:
- dagState = DAGStatusStateProto.DAG_RUNNING;
- break;
- case FAILED:
- dagState = DAGStatusStateProto.DAG_FAILED;
- break;
- case KILLED:
- dagState = DAGStatusStateProto.DAG_KILLED;
- break;
- case FINISHED:
- switch(appReport.getFinalApplicationStatus()) {
- case UNDEFINED:
- case FAILED:
- dagState = DAGStatusStateProto.DAG_FAILED;
- break;
- case KILLED:
- dagState = DAGStatusStateProto.DAG_KILLED;
- break;
- case SUCCEEDED:
- dagState = DAGStatusStateProto.DAG_SUCCEEDED;
- break;
- default:
- throw new TezUncheckedException("Encountered unknown final application"
- + " status from YARN"
- + ", appState=" + appReport.getYarnApplicationState()
- + ", finalStatus=" + appReport.getFinalApplicationStatus());
+ final Throwable cause = e.getCause();
+ if (cause instanceof RemoteException) {
+ RemoteException remoteException = (RemoteException) cause;
+ if (DAG_NOT_RUNNING_CLASS_NAME.equals(remoteException.getClassName())) {
+ throw new DAGNotRunningException(remoteException.getMessage());
+ }
}
- break;
- default:
- throw new TezUncheckedException("Encountered unknown application state"
- + " from YARN, appState=" + appReport.getYarnApplicationState());
- }
- builder.setState(dagState);
- if(appReport.getDiagnostics() != null) {
- builder.addAllDiagnostics(Collections.singleton(appReport.getDiagnostics()));
+ // TEZ-151 retrieve wrapped TezException
+ throw new TezException(e);
}
-
- return dagStatus;
}
VertexStatus getVertexStatusViaAM(String vertexName,
@@ -301,6 +226,7 @@ public class DAGClientRPCImpl extends DAGClient {
// if proxy exist optimistically use it assuming there is no retry
return true;
}
+ appReport = null;
appReport = getAppReport();
if(appReport == null) {
@@ -326,121 +252,15 @@ public class DAGClientRPCImpl extends DAGClient {
@Override
public DAGStatus waitForCompletion() throws IOException, TezException, InterruptedException {
- return _waitForCompletionWithStatusUpdates(false, EnumSet.noneOf(StatusGetOpts.class));
+ // should be used from DAGClientImpl
+ throw new TezException("not supported");
}
@Override
public DAGStatus waitForCompletionWithStatusUpdates(@Nullable Set<StatusGetOpts> statusGetOpts)
throws IOException, TezException, InterruptedException {
- return _waitForCompletionWithStatusUpdates(true, statusGetOpts);
- }
-
- private DAGStatus _waitForCompletionWithStatusUpdates(boolean vertexUpdates,
- @Nullable Set<StatusGetOpts> statusGetOpts) throws IOException, TezException, InterruptedException {
- DAGStatus dagStatus;
- boolean initPrinted = false;
- boolean runningPrinted = false;
- double dagProgress = -1.0; // Print the first one
- // monitoring
- while (true) {
- dagStatus = getDAGStatus(statusGetOpts);
- if (!initPrinted
- && (dagStatus.getState() == DAGStatus.State.INITING || dagStatus.getState() == DAGStatus.State.SUBMITTED)) {
- initPrinted = true; // Print once
- log("Waiting for DAG to start running");
- }
- if (dagStatus.getState() == DAGStatus.State.RUNNING
- || dagStatus.getState() == DAGStatus.State.SUCCEEDED
- || dagStatus.getState() == DAGStatus.State.FAILED
- || dagStatus.getState() == DAGStatus.State.KILLED
- || dagStatus.getState() == DAGStatus.State.ERROR) {
- break;
- }
- Thread.sleep(SLEEP_FOR_COMPLETION);
- }// End of while(true)
-
- Set<String> vertexNames = Collections.emptySet();
- while (!dagStatus.isCompleted()) {
- if (!runningPrinted) {
- log("DAG initialized: CurrentState=Running");
- runningPrinted = true;
- }
- if (vertexUpdates && vertexNames.isEmpty()) {
- vertexNames = getDAGStatus(statusGetOpts).getVertexProgress().keySet();
- }
- dagProgress = monitorProgress(vertexNames, dagProgress, null, dagStatus);
- Thread.sleep(SLEEP_FOR_COMPLETION);
- dagStatus = getDAGStatus(statusGetOpts);
- }// end of while
- // Always print the last status irrespective of progress change
- monitorProgress(vertexNames, -1.0, statusGetOpts, dagStatus);
- log("DAG completed. " + "FinalState=" + dagStatus.getState());
- return dagStatus;
- }
-
- private double monitorProgress(Set<String> vertexNames, double prevDagProgress,
- Set<StatusGetOpts> opts, DAGStatus dagStatus) throws IOException, TezException {
- Progress progress = dagStatus.getDAGProgress();
- double dagProgress = prevDagProgress;
- if (progress != null) {
- dagProgress = getProgress(progress);
- boolean progressChanged = dagProgress > prevDagProgress;
- long currentTimeMillis = System.currentTimeMillis();
- long timeSinceLastPrintStatus = currentTimeMillis - lastPrintStatusTimeMillis;
- boolean printIntervalExpired = timeSinceLastPrintStatus > PRINT_STATUS_INTERVAL_MILLIS;
- if (progressChanged || printIntervalExpired) {
- lastPrintStatusTimeMillis = currentTimeMillis;
- printDAGStatus(vertexNames, opts, dagStatus, progress);
- }
- }
-
- return dagProgress;
+ // should be used from DAGClientImpl
+ throw new TezException("not supported");
}
- private void printDAGStatus(Set<String> vertexNames, Set<StatusGetOpts> opts,
- DAGStatus dagStatus, Progress dagProgress) throws IOException, TezException {
- double vProgressFloat = 0.0f;
- log("DAG: State: " + dagStatus.getState() + " Progress: "
- + formatter.format(getProgress(dagProgress)) + " " + dagProgress);
- boolean displayCounter = opts != null ? opts.contains(StatusGetOpts.GET_COUNTERS) : false;
- if (displayCounter) {
- TezCounters counters = dagStatus.getDAGCounters();
- if (counters != null) {
- log("DAG Counters:\n" + counters);
- }
- }
- for (String vertex : vertexNames) {
- VertexStatus vStatus = getVertexStatus(vertex, opts);
- if (vStatus == null) {
- log("Could not retrieve status for vertex: " + vertex);
- continue;
- }
- Progress vProgress = vStatus.getProgress();
- if (vProgress != null) {
- vProgressFloat = 0.0f;
- if (vProgress.getTotalTaskCount() == 0) {
- vProgressFloat = 1.0f;
- } else if (vProgress.getTotalTaskCount() > 0) {
- vProgressFloat = getProgress(vProgress);
- }
- log("\tVertexStatus:" + " VertexName: " + vertex + " Progress: "
- + formatter.format(vProgressFloat) + " " + vProgress);
- }
- if (displayCounter) {
- TezCounters counters = vStatus.getVertexCounters();
- if (counters != null) {
- log("Vertex Counters for " + vertex + ":\n" + counters);
- }
- }
- } // end of for loop
- }
-
- private double getProgress(Progress progress) {
- return (progress.getTotalTaskCount() == 0 ? 0.0 : (double) (progress.getSucceededTaskCount())
- / progress.getTotalTaskCount());
- }
-
- private void log(String message) {
- LOG.info(message);
- }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java b/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java
new file mode 100644
index 0000000..0e3290f
--- /dev/null
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/client/TestATSHttpClient.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api.client;
+
+import static junit.framework.TestCase.fail;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestATSHttpClient {
+
+  private static final String BASE_URI = "http://yarn.ats.webapp/ws/v1/timeline";
+
+  /**
+   * Builds a Mockito spy of a {@link DAGClientTimelineImpl} for "EXAMPLE_DAG_ID" whose base URI
+   * points at a fake Timeline server, so each test can stub {@code getJsonRootEntity}.
+   */
+  private static DAGClientTimelineImpl createSpyClient() throws TezException {
+    DAGClientTimelineImpl httpClient = new DAGClientTimelineImpl(mock(ApplicationId.class),
+        "EXAMPLE_DAG_ID", new TezConfiguration(), null);
+    DAGClientTimelineImpl spyClient = spy(httpClient);
+    spyClient.baseUri = BASE_URI;
+    return spyClient;
+  }
+
+  @Test
+  public void testGetDagStatusThrowsExceptionOnEmptyJson() throws TezException, IOException {
+    DAGClientTimelineImpl spyClient = createSpyClient();
+    final String expectedDagUrl = "http://yarn.ats.webapp/ws/v1/timeline/TEZ_DAG_ID/EXAMPLE_DAG_ID" +
+        "?fields=primaryfilters,otherinfo";
+
+    doReturn(new JSONObject()).when(spyClient).getJsonRootEntity(expectedDagUrl);
+    try {
+      spyClient.getDAGStatus(null);
+      fail("Expected TezException but did not happen");
+    } catch (TezException expected) {
+      // Expected: an empty JSON response must be rejected. Any IOException now propagates
+      // with its stack trace instead of being replaced by a generic failure message.
+    }
+
+    verify(spyClient).getJsonRootEntity(expectedDagUrl);
+  }
+
+  @Test
+  public void testGetDagStatusSimple() throws TezException, JSONException, IOException {
+    DAGClientTimelineImpl spyClient = createSpyClient();
+    final String expectedDagUrl = "http://yarn.ats.webapp/ws/v1/timeline/TEZ_DAG_ID/EXAMPLE_DAG_ID" +
+        "?fields=primaryfilters,otherinfo";
+    final String expectedVertexUrl = "http://yarn.ats.webapp/ws/v1/timeline/TEZ_VERTEX_ID" +
+        "?primaryFilter=TEZ_DAG_ID:EXAMPLE_DAG_ID&fields=primaryfilters,otherinfo";
+
+    Set<StatusGetOpts> statusOptions = new HashSet<StatusGetOpts>(1);
+    statusOptions.add(StatusGetOpts.GET_COUNTERS);
+
+    final String jsonDagData =
+        "{ " +
+        "  otherinfo: { " +
+        "    status: 'SUCCEEDED'," +
+        "    diagnostics: 'SAMPLE_DIAGNOSTICS'," +
+        "    counters: { counterGroups: [ " +
+        "      { counterGroupName: 'CG1', counterGroupDisplayName: 'CGD1', counters: [" +
+        "        {counterName:'C1', counterDisplayName: 'CD1', counterValue: 1 }," +
+        "        {counterName:'C2', counterDisplayName: 'CD2', counterValue: 2 }" +
+        "      ]}" +
+        "    ]}" +
+        "  }" +
+        "}";
+
+    final String jsonVertexData = "{entities:[ " +
+        "{otherinfo: {vertexName:'v1', numTasks:5,numFailedTasks:1,numSucceededTasks:2," +
+        "numKilledTasks:3,numCompletedTasks:3}}," +
+        "{otherinfo: {vertexName:'v2',numTasks:10,numFailedTasks:1,numSucceededTasks:5," +
+        "numKilledTasks:3,numCompletedTasks:4}}" +
+        "]}";
+
+    doReturn(new JSONObject(jsonDagData)).when(spyClient).getJsonRootEntity(expectedDagUrl);
+    doReturn(new JSONObject(jsonVertexData)).when(spyClient).getJsonRootEntity(expectedVertexUrl);
+
+    DAGStatus dagStatus = spyClient.getDAGStatus(statusOptions);
+
+    Assert.assertEquals("DAG State", DAGStatus.State.SUCCEEDED, dagStatus.getState());
+    Assert.assertEquals("DAG Diagnostics size", 1, dagStatus.getDiagnostics().size());
+    Assert.assertEquals("DAG diagnostics detail", "SAMPLE_DIAGNOSTICS",
+        dagStatus.getDiagnostics().get(0));
+    Assert.assertEquals("Counters Size", 2, dagStatus.getDAGCounters().countCounters());
+    Assert.assertEquals("Counter Value", 1,
+        dagStatus.getDAGCounters().getGroup("CG1").findCounter("C1").getValue());
+    // DAG progress is the sum over v1 and v2; running = total - completed = 15 - 7.
+    Assert.assertEquals("total tasks", 15, dagStatus.getDAGProgress().getTotalTaskCount());
+    Assert.assertEquals("failed tasks", 2, dagStatus.getDAGProgress().getFailedTaskCount());
+    Assert.assertEquals("killed tasks", 6, dagStatus.getDAGProgress().getKilledTaskCount());
+    Assert.assertEquals("succeeded tasks", 7, dagStatus.getDAGProgress().getSucceededTaskCount());
+    Assert.assertEquals("running tasks", 8, dagStatus.getDAGProgress().getRunningTaskCount());
+    final Map<String, Progress> vertexProgress = dagStatus.getVertexProgress();
+    Assert.assertEquals("vertex progress count", 2, vertexProgress.size());
+    Assert.assertTrue("vertex name1", vertexProgress.containsKey("v1"));
+    Assert.assertTrue("vertex name2", vertexProgress.containsKey("v2"));
+  }
+
+  @Test
+  public void testGetVertexStatusSimple() throws JSONException, TezException, IOException {
+    DAGClientTimelineImpl spyClient = createSpyClient();
+    final String expectedVertexUrl = "http://yarn.ats.webapp/ws/v1/timeline/TEZ_VERTEX_ID" +
+        "?primaryFilter=TEZ_DAG_ID:EXAMPLE_DAG_ID&secondaryFilter=vertexName:vertex1name&" +
+        "fields=primaryfilters,otherinfo";
+
+    Set<StatusGetOpts> statusOptions = new HashSet<StatusGetOpts>(1);
+    statusOptions.add(StatusGetOpts.GET_COUNTERS);
+
+    final String jsonData = "{entities:[ {otherinfo:{numFailedTasks:1,numSucceededTasks:2," +
+        "status:'SUCCEEDED', vertexName:'vertex1name', numTasks:4, numKilledTasks: 3, " +
+        "numCompletedTasks: 4, diagnostics: 'diagnostics1', " +
+        "counters: { counterGroups: [ " +
+        "      { counterGroupName: 'CG1', counterGroupDisplayName: 'CGD1', counters: [" +
+        "        {counterName:'C1', counterDisplayName: 'CD1', counterValue: 1 }," +
+        "        {counterName:'C2', counterDisplayName: 'CD2', counterValue: 2 }" +
+        "      ]}" +
+        "    ]}" +
+        "}}]}";
+
+    doReturn(new JSONObject(jsonData)).when(spyClient).getJsonRootEntity(expectedVertexUrl);
+
+    VertexStatus vertexStatus = spyClient.getVertexStatus("vertex1name", statusOptions);
+    Assert.assertEquals("status check", VertexStatus.State.SUCCEEDED, vertexStatus.getState());
+    Assert.assertEquals("diagnostics", "diagnostics1", vertexStatus.getDiagnostics().get(0));
+    final Progress progress = vertexStatus.getProgress();
+    final TezCounters vertexCounters = vertexStatus.getVertexCounters();
+    Assert.assertEquals("failed task count", 1, progress.getFailedTaskCount());
+    Assert.assertEquals("succeeded task count", 2, progress.getSucceededTaskCount());
+    Assert.assertEquals("killed task count", 3, progress.getKilledTaskCount());
+    Assert.assertEquals("total task count", 4, progress.getTotalTaskCount());
+    Assert.assertEquals("Counters Size", 2, vertexCounters.countCounters());
+    Assert.assertEquals("Counter Value", 1,
+        vertexCounters.getGroup("CG1").findCounter("C1").getValue());
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
index 505e044..a23c1d5 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
@@ -33,6 +33,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGClientImpl;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.client.VertexStatus;
@@ -61,7 +63,7 @@ import com.google.protobuf.ServiceException;
public class TestDAGClient {
- private DAGClientRPCImpl dagClient;
+ private DAGClient dagClient;
private ApplicationId mockAppId;
private ApplicationReport mockAppReport;
private String dagIdStr;
@@ -188,17 +190,17 @@ public class TestDAGClient {
when(mockProxy.getVertexStatus(isNull(RpcController.class), argThat(new VertexCounterRequestMatcher())))
.thenReturn(GetVertexStatusResponseProto.newBuilder().setVertexStatus(vertexStatusProtoWithCounters).build());
-
-
- dagClient = new DAGClientRPCImpl(mockAppId, dagIdStr, new TezConfiguration(), null);
- dagClient.appReport = mockAppReport;
- ((DAGClientRPCImpl)dagClient).proxy = mockProxy;
+ dagClient = new DAGClientImpl(mockAppId, dagIdStr, new TezConfiguration(), null);
+ DAGClientRPCImpl realClient = (DAGClientRPCImpl)((DAGClientImpl)dagClient).getRealClient();
+ realClient.appReport = mockAppReport;
+ realClient.proxy = mockProxy;
}
@Test
public void testApp() throws IOException, TezException, ServiceException{
assertTrue(dagClient.getExecutionContext().contains(mockAppId.toString()));
- assertEquals(mockAppReport, dagClient.getApplicationReportInternal());
+ DAGClientRPCImpl realClient = (DAGClientRPCImpl)((DAGClientImpl)dagClient).getRealClient();
+ assertEquals(mockAppReport, realClient.getApplicationReportInternal());
}
@Test
@@ -226,7 +228,8 @@ public class TestDAGClient {
resultVertexStatus = dagClient.getVertexStatus("v1", Sets.newSet(StatusGetOpts.GET_COUNTERS));
verify(mockProxy).getVertexStatus(null, GetVertexStatusRequestProto.newBuilder()
- .setDagId(dagIdStr).setVertexName("v1").addStatusOptions(StatusGetOptsProto.GET_COUNTERS).build());
+ .setDagId(dagIdStr).setVertexName("v1").addStatusOptions(StatusGetOptsProto.GET_COUNTERS)
+ .build());
assertEquals(new VertexStatus(vertexStatusProtoWithCounters), resultVertexStatus);
System.out.println("VertexWithCounter:" + resultVertexStatus);
}
@@ -249,7 +252,7 @@ public class TestDAGClient {
dagClient.waitForCompletion();
verify(mockProxy, times(2)).getDAGStatus(null, GetDAGStatusRequestProto.newBuilder()
- .setDagId(dagIdStr).build());
+ .setDagId(dagIdStr).build());
}
@Test
@@ -257,7 +260,8 @@ public class TestDAGClient {
// first time and second time return DAG_RUNNING, third time return DAG_SUCCEEDED
when(mockProxy.getDAGStatus(isNull(RpcController.class), any(GetDAGStatusRequestProto.class)))
- .thenReturn(GetDAGStatusResponseProto.newBuilder().setDagStatus(dagStatusProtoWithoutCounters).build())
+ .thenReturn(GetDAGStatusResponseProto.newBuilder().setDagStatus(dagStatusProtoWithoutCounters)
+ .build())
.thenReturn(GetDAGStatusResponseProto.newBuilder().setDagStatus(dagStatusProtoWithoutCounters).build())
.thenReturn(GetDAGStatusResponseProto.newBuilder().setDagStatus
(DAGStatusProto.newBuilder(dagStatusProtoWithoutCounters).setState(DAGStatusStateProto.DAG_SUCCEEDED).build())
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
index c4d3497..d14ed2a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
@@ -27,6 +27,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.client.TezAppMasterStatus;
+import org.apache.tez.dag.api.DAGNotRunningException;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.app.DAGAppMaster;
@@ -48,6 +49,10 @@ public class DAGClientHandler {
return dagAppMaster.getContext().getCurrentDAG();
}
+ /**
+ * Returns the DAG IDs reported by the AM context's getAllDAGIDs(). getDAG uses this set to
+ * tell a DAG that exists but is no longer the current one (DAGNotRunningException) apart from
+ * an unknown DAG ID (TezException).
+ */
+ private Set<String> getAllDagIDs() {
+ return dagAppMaster.getContext().getAllDAGIDs();
+ }
+
public List<String> getAllDAGs() throws TezException {
return Collections.singletonList(getCurrentDAG().getID().toString());
}
@@ -78,12 +83,20 @@ public class DAGClientHandler {
if (currentDAG == null) {
throw new TezException("No running dag at present");
}
- if (!currentDAG.getID().toString().equals(dagId.toString())) {
- LOG.warn("Current DAGID : "
- + (currentDAG.getID() == null ? "NULL" : currentDAG.getID())
- + ", Looking for string (not found): " + dagIdStr + ", dagIdObj: "
- + dagId);
- throw new TezException("Unknown dagId: " + dagIdStr);
+
+ final String currentDAGIdStr = currentDAG.getID().toString();
+ if (!currentDAGIdStr.equals(dagIdStr)) {
+ if (getAllDagIDs().contains(dagIdStr)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Looking for finished dagId " + dagIdStr + " current dag is " + currentDAGIdStr);
+ }
+ throw new DAGNotRunningException("DAG " + dagIdStr + " Not running, current dag is " +
+ currentDAGIdStr);
+ } else {
+ LOG.warn("Current DAGID : " + currentDAGIdStr + ", Looking for string (not found): " +
+ dagIdStr + ", dagIdObj: " + dagId);
+ throw new TezException("Unknown dagId: " + dagIdStr);
+ }
}
return currentDAG;
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index 9cc28cb..af244c8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -19,6 +19,7 @@
package org.apache.tez.dag.app;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@@ -65,6 +66,8 @@ public interface AppContext {
void setDAG(DAG dag);
+ /**
+  * Returns the IDs, in string form, of the DAGs this AM knows about.
+  * In DAGAppMaster this covers DAGs created in the current attempt as well as
+  * DAGs found in recovery summary data.
+  * NOTE(review): callers should treat the returned set as read-only; confirm
+  * the thread-safety guarantees of each implementation before iterating it.
+  */
+ Set<String> getAllDAGIDs();
+
@SuppressWarnings("rawtypes")
EventHandler getEventHandler();
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 0e4f78b..e6a1d9c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -245,6 +245,7 @@ public class DAGAppMaster extends AbstractService {
* set of already executed dag names.
*/
Set<String> dagNames = new HashSet<String>();
+ // String form of every DAG ID this AM has created or recovered; read through
+ // AppContext.getAllDAGIDs(). NOTE(review): plain HashSet, not synchronized.
+ Set<String> dagIDs = new HashSet<String>();
protected boolean isLastAMRetry = false;
@@ -1165,6 +1166,11 @@ public class DAGAppMaster extends AbstractService {
}
@Override
+ public Set<String> getAllDAGIDs() {
+ // Return a read-only view so callers (e.g. client request handlers) cannot
+ // add to or remove from the AM's own record of DAG IDs.
+ // NOTE(review): the view is live and the backing HashSet is not synchronized;
+ // confirm reads cannot race with additions made when a DAG is created.
+ return java.util.Collections.unmodifiableSet(dagIDs);
+ }
+
+ @Override
public EventHandler getEventHandler() {
return eventHandler;
}
@@ -1861,6 +1867,7 @@ public class DAGAppMaster extends AbstractService {
throw new TezException(e);
}
+ dagIDs.add(currentDAG.getID().toString());
// End of creating the job.
((RunningAppContext) context).setDAG(currentDAG);
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
index 85851c5..220b5b5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -599,6 +599,7 @@ public class RecoveryParser {
dagAppMaster.setDAGCounter(dagCounter);
for (DAGSummaryData dagSummaryData: dagSummaryDataMap.values()){
dagAppMaster.dagNames.add(dagSummaryData.dagName);
+ dagAppMaster.dagIDs.add(dagSummaryData.dagId.toString());
}
DAGSummaryData lastInProgressDAGData =
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 594c651..ab22099 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tez.common.ATSConstants;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
@@ -1463,19 +1464,25 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
+ /**
+  * Records a SUCCEEDED VertexFinishedEvent for this vertex with empty diagnostics.
+  */
void logJobHistoryVertexFinishedEvent() throws IOException {
this.setFinishTime();
- VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId,
- vertexName, initTimeRequested, initedTime, startTimeRequested,
- startedTime, finishTime, VertexState.SUCCEEDED, "",
- getAllCounters(), getVertexStats());
- this.appContext.getHistoryHandler().handleCriticalEvent(
- new DAGHistoryEvent(getDAGId(), finishEvt));
+ // Passes the finishTime field; presumably populated by setFinishTime() above.
+ logJobHistoryVertexCompletedHelper(VertexState.SUCCEEDED, finishTime, "");
}
+ /**
+  * Records a VertexFinishedEvent for a non-successful terminal state.
+  * NOTE(review): state is presumably FAILED or KILLED; confirm against callers.
+  */
void logJobHistoryVertexFailedEvent(VertexState state) throws IOException {
- VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId,
- vertexName, initTimeRequested, initedTime, startTimeRequested,
- startedTime, clock.getTime(), state, StringUtils.join(getDiagnostics(),
- LINE_SEPARATOR), getAllCounters(), getVertexStats());
+ // Uses the current clock time (not the finishTime field) and the vertex
+ // diagnostics joined with LINE_SEPARATOR.
+ logJobHistoryVertexCompletedHelper(state, clock.getTime(),
+ StringUtils.join(getDiagnostics(), LINE_SEPARATOR));
+ }
+
+ /**
+  * Builds a VertexFinishedEvent for this vertex and hands it to the history
+  * handler as a critical event. The event carries the per-outcome task counts
+  * (completed / succeeded / failed / killed) under the ATSConstants keys.
+  *
+  * @param finalState terminal vertex state recorded in the event
+  * @param vertexFinishTime finish timestamp recorded in the event; deliberately
+  *        not named finishTime so it does not hide the field of that name
+  * @param diagnostics diagnostics text recorded in the event
+  * @throws IOException declared for the history-handler call
+  */
+ private void logJobHistoryVertexCompletedHelper(VertexState finalState, long vertexFinishTime,
+ String diagnostics) throws IOException {
+ Map<String, Integer> taskStats = new HashMap<String, Integer>();
+ taskStats.put(ATSConstants.NUM_COMPLETED_TASKS, completedTaskCount);
+ taskStats.put(ATSConstants.NUM_SUCCEEDED_TASKS, succeededTaskCount);
+ taskStats.put(ATSConstants.NUM_FAILED_TASKS, failedTaskCount);
+ taskStats.put(ATSConstants.NUM_KILLED_TASKS, killedTaskCount);
+
+ VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId, vertexName, initTimeRequested,
+ initedTime, startTimeRequested, startedTime, vertexFinishTime, finalState, diagnostics,
+ getAllCounters(), getVertexStats(), taskStats);
this.appContext.getHistoryHandler().handleCriticalEvent(
new DAGHistoryEvent(getDAGId(), finishEvt));
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
index 450bcc1..d0e0e69 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.records.TezDAGID;
@@ -108,4 +109,7 @@ public class DAGStartedEvent implements HistoryEvent {
return dagName;
}
+ /**
+  * Reports the DAG state implied by this event: a DAG that has emitted a
+  * started event is always reported as RUNNING.
+  */
+ public DAGState getDagState() {
+ final DAGState stateWhenStarted = DAGState.RUNNING;
+ return stateWhenStarted;
+ }
}