You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2014/11/17 20:22:16 UTC
[07/50] [abbrv] tez git commit: TEZ-1716. Additional ATS data for UI.
(hitesh)
TEZ-1716. Additional ATS data for UI. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ede0e645
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ede0e645
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ede0e645
Branch: refs/heads/TEZ-8
Commit: ede0e645a9f4fdda44842a46b7fcf9edecdf50b2
Parents: a6c8006
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Oct 30 08:06:29 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Oct 30 08:06:29 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/tez/common/ATSConstants.java | 5 +
.../org/apache/tez/dag/app/DAGAppMaster.java | 11 +-
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 56 +++++-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 2 +
.../tez/dag/history/HistoryEventType.java | 1 +
.../dag/history/events/AppLaunchedEvent.java | 104 ++++++++++
.../dag/history/events/DAGFinishedEvent.java | 11 +-
.../dag/history/events/DAGInitializedEvent.java | 10 +-
.../dag/history/events/DAGSubmittedEvent.java | 8 +-
.../dag/history/events/TaskFinishedEvent.java | 1 +
.../tez/dag/history/logging/EntityTypes.java | 1 +
.../impl/HistoryEventJsonConversion.java | 51 ++++-
.../apache/tez/dag/history/utils/DAGUtils.java | 28 +--
.../tez/dag/app/dag/impl/TestDAGRecovery.java | 4 +-
.../TestHistoryEventsProtoConversion.java | 30 ++-
.../impl/TestHistoryEventJsonConversion.java | 12 +-
.../tez/dag/history/utils/TestDAGUtils.java | 5 +-
.../ats/HistoryEventTimelineConversion.java | 57 +++++-
.../ats/TestHistoryEventTimelineConversion.java | 188 ++++++++++++++++++-
20 files changed, 533 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ca1da2e..5d6ebf5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -89,6 +89,7 @@ ALL CHANGES:
invocations
TEZ-1700. Replace containerId from TaskLocationHint with [TaskIndex+Vertex]
based affinity
+ TEZ-1716. Additional ATS data for UI.
Release 0.5.1: 2014-10-02
http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
index ab81683..58761d5 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
@@ -50,9 +50,11 @@ public class ATSConstants {
public static final String APP_SUBMIT_TIME = "appSubmitTime";
/* Tez-specific info */
+ public static final String CONFIG = "config";
public static final String DAG_PLAN = "dagPlan";
public static final String DAG_NAME = "dagName";
public static final String VERTEX_NAME = "vertexName";
+ public static final String VERTEX_NAME_ID_MAPPING = "vertexNameIdMapping";
public static final String SCHEDULED_TIME = "scheduledTime";
public static final String INIT_REQUESTED_TIME = "initRequestedTime";
public static final String INIT_TIME = "initTime";
@@ -62,6 +64,7 @@ public class ATSConstants {
public static final String TIME_TAKEN = "timeTaken";
public static final String STATUS = "status";
public static final String DIAGNOSTICS = "diagnostics";
+ public static final String SUCCESSFUL_ATTEMPT_ID = "successfulAttemptId";
public static final String COUNTERS = "counters";
public static final String STATS = "stats";
public static final String NUM_TASKS = "numTasks";
@@ -70,6 +73,8 @@ public class ATSConstants {
public static final String NUM_SUCCEEDED_TASKS = "numSucceededTasks";
public static final String NUM_FAILED_TASKS = "numFailedTasks";
public static final String NUM_KILLED_TASKS = "numKilledTasks";
+ public static final String NUM_FAILED_TASKS_ATTEMPTS = "numFailedTaskAttempts";
+ public static final String NUM_KILLED_TASKS_ATTEMPTS = "numKilledTaskAttempts";
public static final String PROCESSOR_CLASS_NAME = "processorClassName";
public static final String IN_PROGRESS_LOGS_URL = "inProgressLogsURL";
public static final String COMPLETED_LOGS_URL = "completedLogsURL";
http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 20da85b..789de24 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -145,6 +145,7 @@ import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.history.events.AMLaunchedEvent;
import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.AppLaunchedEvent;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.history.utils.DAGUtils;
import org.apache.tez.dag.records.TezDAGID;
@@ -459,6 +460,12 @@ public class DAGAppMaster extends AbstractService {
super.serviceInit(conf);
if (!versionMismatch) {
+ if (this.appAttemptID.getAttemptId() == 1) {
+ AppLaunchedEvent appLaunchedEvent = new AppLaunchedEvent(appAttemptID.getApplicationId(),
+ startTime, appSubmitTime, appMasterUgi.getShortUserName(), this.amConf);
+ historyEventHandler.handle(
+ new DAGHistoryEvent(appLaunchedEvent));
+ }
AMLaunchedEvent launchedEvent = new AMLaunchedEvent(appAttemptID,
startTime, appSubmitTime, appMasterUgi.getShortUserName());
historyEventHandler.handle(
@@ -749,7 +756,7 @@ public class DAGAppMaster extends AbstractService {
if (LOG.isDebugEnabled()) {
LOG.info("JSON dump for submitted DAG, dagId=" + dagId.toString()
+ ", json="
- + DAGUtils.generateSimpleJSONPlan(dagPB, newDag.getVertexNameIDMapping()).toString());
+ + DAGUtils.generateSimpleJSONPlan(dagPB).toString());
}
} catch (JSONException e) {
LOG.warn("Failed to generate json for DAG", e);
@@ -1915,7 +1922,7 @@ public class DAGAppMaster extends AbstractService {
// for an app later
DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(newDAG.getID(),
submitTime, dagPlan, this.appAttemptID, cumulativeAdditionalResources,
- newDAG.getUserName(), newDAG.getVertexNameIDMapping());
+ newDAG.getUserName());
try {
historyEventHandler.handleCriticalEvent(
new DAGHistoryEvent(newDAG.getID(), submittedEvent));
http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index d3aecd4..6dccf3a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tez.common.ATSConstants;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.EdgeProperty;
@@ -716,6 +717,40 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
}
+ private ProgressBuilder getDAGProgress() {
+ int totalTaskCount = 0;
+ int totalSucceededTaskCount = 0;
+ int totalRunningTaskCount = 0;
+ int totalFailedTaskCount = 0;
+ int totalKilledTaskCount = 0;
+ int totalFailedTaskAttemptCount = 0;
+ int totalKilledTaskAttemptCount = 0;
+ readLock.lock();
+ try {
+ for(Map.Entry<String, Vertex> entry : vertexMap.entrySet()) {
+ ProgressBuilder progress = entry.getValue().getVertexProgress();
+ totalTaskCount += progress.getTotalTaskCount();
+ totalSucceededTaskCount += progress.getSucceededTaskCount();
+ totalRunningTaskCount += progress.getRunningTaskCount();
+ totalFailedTaskCount += progress.getFailedTaskCount();
+ totalKilledTaskCount += progress.getKilledTaskCount();
+ totalFailedTaskAttemptCount += progress.getFailedTaskAttemptCount();
+ totalKilledTaskAttemptCount += progress.getKilledTaskAttemptCount();
+ }
+ ProgressBuilder dagProgress = new ProgressBuilder();
+ dagProgress.setTotalTaskCount(totalTaskCount);
+ dagProgress.setSucceededTaskCount(totalSucceededTaskCount);
+ dagProgress.setRunningTaskCount(totalRunningTaskCount);
+ dagProgress.setFailedTaskCount(totalFailedTaskCount);
+ dagProgress.setKilledTaskCount(totalKilledTaskCount);
+ dagProgress.setFailedTaskAttemptCount(totalFailedTaskAttemptCount);
+ dagProgress.setKilledTaskAttemptCount(totalKilledTaskAttemptCount);
+ return dagProgress;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
@Override
public VertexStatusBuilder getVertexStatus(String vertexName,
Set<StatusGetOpts> statusOptions) {
@@ -940,18 +975,32 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
finishTime = clock.getTime();
}
+ private Map<String, Integer> constructTaskStats(ProgressBuilder progressBuilder) {
+ Map<String, Integer> taskStats = new HashMap<String, Integer>();
+ taskStats.put(ATSConstants.NUM_COMPLETED_TASKS, progressBuilder.getTotalTaskCount());
+ taskStats.put(ATSConstants.NUM_SUCCEEDED_TASKS, progressBuilder.getSucceededTaskCount());
+ taskStats.put(ATSConstants.NUM_FAILED_TASKS, progressBuilder.getFailedTaskCount());
+ taskStats.put(ATSConstants.NUM_KILLED_TASKS, progressBuilder.getKilledTaskCount());
+ taskStats.put(ATSConstants.NUM_FAILED_TASKS_ATTEMPTS,
+ progressBuilder.getFailedTaskAttemptCount());
+ taskStats.put(ATSConstants.NUM_KILLED_TASKS_ATTEMPTS,
+ progressBuilder.getKilledTaskAttemptCount());
+ return taskStats;
+ }
+
void logJobHistoryFinishedEvent() throws IOException {
this.setFinishTime();
+ Map<String, Integer> taskStats = constructTaskStats(getDAGProgress());
DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
finishTime, DAGState.SUCCEEDED, "", getAllCounters(),
- this.userName, this.dagName);
+ this.userName, this.dagName, taskStats);
this.appContext.getHistoryHandler().handleCriticalEvent(
new DAGHistoryEvent(dagId, finishEvt));
}
void logJobHistoryInitedEvent() {
DAGInitializedEvent initEvt = new DAGInitializedEvent(this.dagId,
- this.initTime, this.userName, this.dagName);
+ this.initTime, this.userName, this.dagName, this.getVertexNameIDMapping());
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(dagId, initEvt));
}
@@ -964,10 +1013,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
void logJobHistoryUnsuccesfulEvent(DAGState state) throws IOException {
+ Map<String, Integer> taskStats = constructTaskStats(getDAGProgress());
DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
clock.getTime(), state,
StringUtils.join(getDiagnostics(), LINE_SEPARATOR),
- getAllCounters(), this.userName, this.dagName);
+ getAllCounters(), this.userName, this.dagName, taskStats);
this.appContext.getHistoryHandler().handleCriticalEvent(
new DAGHistoryEvent(dagId, finishEvt));
}
http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 5c76a77..4edd12b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1520,6 +1520,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
taskStats.put(ATSConstants.NUM_SUCCEEDED_TASKS, succeededTaskCount);
taskStats.put(ATSConstants.NUM_FAILED_TASKS, failedTaskCount);
taskStats.put(ATSConstants.NUM_KILLED_TASKS, killedTaskCount);
+ taskStats.put(ATSConstants.NUM_FAILED_TASKS_ATTEMPTS, failedTaskAttemptCount.get());
+ taskStats.put(ATSConstants.NUM_KILLED_TASKS_ATTEMPTS, killedTaskAttemptCount.get());
VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId, vertexName, initTimeRequested,
initedTime, startTimeRequested, startedTime, finishTime, finalState, diagnostics,
http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
index fd747e0..17df58f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
@@ -19,6 +19,7 @@
package org.apache.tez.dag.history;
public enum HistoryEventType {
+ APP_LAUNCHED,
AM_LAUNCHED,
AM_STARTED,
DAG_SUBMITTED,
http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-dag/src/main/java/org/apache/tez/dag/history/events/AppLaunchedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AppLaunchedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AppLaunchedEvent.java
new file mode 100644
index 0000000..4c79c53
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AppLaunchedEvent.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.events;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+
+public class AppLaunchedEvent implements HistoryEvent {
+
+ private ApplicationId applicationId;
+ private long launchTime;
+ private long appSubmitTime;
+ private String user;
+ private Configuration conf;
+
+ public AppLaunchedEvent() {
+ }
+
+ public AppLaunchedEvent(ApplicationId appId,
+ long launchTime, long appSubmitTime, String user,
+ Configuration conf) {
+ this.applicationId = appId;
+ this.launchTime = launchTime;
+ this.appSubmitTime = appSubmitTime;
+ this.user = user;
+ this.conf = conf;
+ }
+
+ @Override
+ public HistoryEventType getEventType() {
+ return HistoryEventType.APP_LAUNCHED;
+ }
+
+ @Override
+ public boolean isRecoveryEvent() {
+ return false;
+ }
+
+ @Override
+ public boolean isHistoryEvent() {
+ return true;
+ }
+
+ @Override
+ public void toProtoStream(OutputStream outputStream) throws IOException {
+ throw new UnsupportedOperationException("Not a recovery event");
+ }
+
+ @Override
+ public void fromProtoStream(InputStream inputStream) throws IOException {
+ throw new UnsupportedOperationException("Not a recovery event");
+ }
+
+ @Override
+ public String toString() {
+ return "applicationId=" + applicationId
+ + ", appSubmitTime=" + appSubmitTime
+ + ", launchTime=" + launchTime;
+ }
+
+ public ApplicationId getApplicationId() {
+ return applicationId;
+ }
+
+ public long getLaunchTime() {
+ return launchTime;
+ }
+
+ public long getAppSubmitTime() {
+ return appSubmitTime;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
index 21199f4..e05f043 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.history.events;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -51,13 +52,15 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
private String user;
private String dagName;
+ Map<String, Integer> dagTaskStats;
+
public DAGFinishedEvent() {
}
public DAGFinishedEvent(TezDAGID dagId, long startTime,
long finishTime, DAGState state,
String diagnostics, TezCounters counters,
- String user, String dagName) {
+ String user, String dagName, Map<String, Integer> dagTaskStats) {
this.dagID = dagId;
this.startTime = startTime;
this.finishTime = finishTime;
@@ -66,6 +69,7 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
this.tezCounters = counters;
this.user = user;
this.dagName = dagName;
+ this.dagTaskStats = dagTaskStats;
}
@Override
@@ -194,4 +198,9 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
public String getDagName() {
return dagName;
}
+
+ public Map<String, Integer> getDagTaskStats() {
+ return dagTaskStats;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
index 6e17da8..98d64d3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
@@ -21,10 +21,12 @@ package org.apache.tez.dag.history.events;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Map;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.recovery.records.RecoveryProtos;
public class DAGInitializedEvent implements HistoryEvent {
@@ -33,16 +35,18 @@ public class DAGInitializedEvent implements HistoryEvent {
private long initTime;
private String user;
private String dagName;
+ private Map<String, TezVertexID> vertexNameIDMap;
public DAGInitializedEvent() {
}
public DAGInitializedEvent(TezDAGID dagID, long initTime,
- String user, String dagName) {
+ String user, String dagName, Map<String, TezVertexID> vertexNameIDMap) {
this.dagID = dagID;
this.initTime = initTime;
this.user = user;
this.dagName = dagName;
+ this.vertexNameIDMap = vertexNameIDMap;
}
@Override
@@ -109,4 +113,8 @@ public class DAGInitializedEvent implements HistoryEvent {
return dagName;
}
+ public Map<String, TezVertexID> getVertexNameIDMap() {
+ return vertexNameIDMap;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
index 0074a4e..7f0fab3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
@@ -54,7 +54,6 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
private ApplicationAttemptId applicationAttemptId;
private String user;
private Map<String, LocalResource> cumulativeAdditionalLocalResources;
- private Map<String, TezVertexID> vertexNameIDMap;
public DAGSubmittedEvent() {
}
@@ -62,7 +61,7 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
public DAGSubmittedEvent(TezDAGID dagID, long submitTime,
DAGProtos.DAGPlan dagPlan, ApplicationAttemptId applicationAttemptId,
Map<String, LocalResource> cumulativeAdditionalLocalResources,
- String user, Map<String, TezVertexID> vertexNameIDMap) {
+ String user) {
this.dagID = dagID;
this.dagName = dagPlan.getName();
this.submitTime = submitTime;
@@ -70,7 +69,6 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
this.applicationAttemptId = applicationAttemptId;
this.cumulativeAdditionalLocalResources = cumulativeAdditionalLocalResources;
this.user = user;
- this.vertexNameIDMap = vertexNameIDMap;
}
@Override
@@ -185,8 +183,4 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
return user;
}
- public Map<String, TezVertexID> getVertexNameIDMap() {
- return vertexNameIDMap;
- }
-
}
http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
index 9323270..c367d5c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
@@ -56,6 +56,7 @@ public class TaskFinishedEvent implements HistoryEvent {
this.state = state;
this.diagnostics = diagnostics;
this.tezCounters = counters;
+ this.successfulAttemptID = successfulAttemptID;
}
public TaskFinishedEvent() {
http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-dag/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java
index 00cac28..e2f0882 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java
@@ -19,6 +19,7 @@
package org.apache.tez.dag.history.logging;
public enum EntityTypes {
+ TEZ_APPLICATION,
TEZ_APPLICATION_ATTEMPT,
TEZ_CONTAINER_ID,
TEZ_DAG_ID,
http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index ad42392..0b6f9d2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Map.Entry;
+import java.util.TreeMap;
import org.apache.tez.common.ATSConstants;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
@@ -30,6 +31,7 @@ import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.AMLaunchedEvent;
import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.AppLaunchedEvent;
import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
import org.apache.tez.dag.history.events.ContainerStoppedEvent;
import org.apache.tez.dag.history.events.DAGFinishedEvent;
@@ -60,6 +62,9 @@ public class HistoryEventJsonConversion {
}
JSONObject jsonObject = null;
switch (historyEvent.getEventType()) {
+ case APP_LAUNCHED:
+ jsonObject = convertAppLaunchedEvent((AppLaunchedEvent) historyEvent);
+ break;
case AM_LAUNCHED:
jsonObject = convertAMLaunchedEvent((AMLaunchedEvent) historyEvent);
break;
@@ -122,6 +127,24 @@ public class HistoryEventJsonConversion {
return jsonObject;
}
+ private static JSONObject convertAppLaunchedEvent(AppLaunchedEvent event) throws JSONException {
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put(ATSConstants.ENTITY,
+ "tez_" + event.getApplicationId().toString());
+ jsonObject.put(ATSConstants.ENTITY_TYPE,
+ EntityTypes.TEZ_APPLICATION.name());
+
+ // Other info to tag with Tez App
+ JSONObject otherInfo = new JSONObject();
+ otherInfo.put(ATSConstants.USER, event.getUser());
+ otherInfo.put(ATSConstants.CONFIG, new JSONObject(
+ DAGUtils.convertConfigurationToATSMap(event.getConf())));
+
+ jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+ return jsonObject;
+ }
+
private static JSONObject convertAMLaunchedEvent(AMLaunchedEvent event) throws JSONException {
JSONObject jsonObject = new JSONObject();
jsonObject.put(ATSConstants.ENTITY,
@@ -307,6 +330,14 @@ public class HistoryEventJsonConversion {
otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
otherInfo.put(ATSConstants.COUNTERS,
DAGUtils.convertCountersToJSON(event.getTezCounters()));
+
+ final Map<String, Integer> dagTaskStats = event.getDagTaskStats();
+ if (dagTaskStats != null) {
+ for(Entry<String, Integer> entry : dagTaskStats.entrySet()) {
+ otherInfo.put(entry.getKey(), entry.getValue().intValue());
+ }
+ }
+
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
@@ -330,6 +361,17 @@ public class HistoryEventJsonConversion {
events.put(initEvent);
jsonObject.put(ATSConstants.EVENTS, events);
+ JSONObject otherInfo = new JSONObject();
+
+ if (event.getVertexNameIDMap() != null) {
+ Map<String, String> nameIdStrMap = new TreeMap<String, String>();
+ for (Entry<String, TezVertexID> entry : event.getVertexNameIDMap().entrySet()) {
+ nameIdStrMap.put(entry.getKey(), entry.getValue().toString());
+ }
+ otherInfo.put(ATSConstants.VERTEX_NAME_ID_MAPPING, nameIdStrMap);
+ }
+ jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
return jsonObject;
}
@@ -411,7 +453,7 @@ public class HistoryEventJsonConversion {
// Other info such as dag plan
JSONObject otherInfo = new JSONObject();
otherInfo.put(ATSConstants.DAG_PLAN,
- DAGUtils.generateSimpleJSONPlan(event.getDAGPlan(), event.getVertexNameIDMap()));
+ DAGUtils.generateSimpleJSONPlan(event.getDAGPlan()));
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
@@ -510,6 +552,9 @@ public class HistoryEventJsonConversion {
otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
otherInfo.put(ATSConstants.COUNTERS,
DAGUtils.convertCountersToJSON(event.getTezCounters()));
+ if (event.getSuccessfulAttemptID() != null) {
+ otherInfo.put(ATSConstants.SUCCESSFUL_ATTEMPT_ID, event.getSuccessfulAttemptID().toString());
+ }
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
@@ -576,8 +621,8 @@ public class HistoryEventJsonConversion {
final Map<String, Integer> vertexTaskStats = event.getVertexTaskStats();
if (vertexTaskStats != null) {
- for(String key : vertexTaskStats.keySet()) {
- otherInfo.put(key, vertexTaskStats.get(key));
+ for(Entry<String, Integer> entry : vertexTaskStats.entrySet()) {
+ otherInfo.put(entry.getKey(), entry.getValue().intValue());
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
index 115f739..0bcbcbe 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
@@ -22,9 +22,13 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.ATSConstants;
import org.apache.tez.common.counters.CounterGroup;
import org.apache.tez.common.counters.TezCounter;
@@ -85,11 +89,10 @@ public class DAGUtils {
- public static JSONObject generateSimpleJSONPlan(DAGPlan dagPlan,
- Map<String, TezVertexID> vertexNameIDMap) throws JSONException {
+ public static JSONObject generateSimpleJSONPlan(DAGPlan dagPlan) throws JSONException {
JSONObject dagJson;
try {
- dagJson = new JSONObject(convertDAGPlanToATSMap(dagPlan, vertexNameIDMap));
+ dagJson = new JSONObject(convertDAGPlanToATSMap(dagPlan));
} catch (IOException e) {
throw new TezUncheckedException(e);
}
@@ -129,8 +132,7 @@ public class DAGUtils {
return object;
}
- public static Map<String,Object> convertDAGPlanToATSMap(
- DAGPlan dagPlan, Map<String, TezVertexID> vertexNameIDMap) throws IOException {
+ public static Map<String,Object> convertDAGPlanToATSMap(DAGPlan dagPlan) throws IOException {
final String VERSION_KEY = "version";
final int version = 1;
@@ -141,12 +143,6 @@ public class DAGUtils {
for (DAGProtos.VertexPlan vertexPlan : dagPlan.getVertexList()) {
Map<String,Object> vertexMap = new LinkedHashMap<String, Object>();
vertexMap.put(VERTEX_NAME_KEY, vertexPlan.getName());
- if (vertexNameIDMap != null && !vertexNameIDMap.isEmpty()) {
- TezVertexID vertexID = vertexNameIDMap.get(vertexPlan.getName());
- if (vertexID != null) {
- vertexMap.put(VERTEX_ID_KEY, vertexID.toString());
- }
- }
if (vertexPlan.hasProcessorDescriptor()) {
vertexMap.put(PROCESSOR_CLASS_KEY,
vertexPlan.getProcessorDescriptor().getClassName());
@@ -361,4 +357,14 @@ public class DAGUtils {
return jsonDescriptor;
}
+ public static Map<String, String> convertConfigurationToATSMap(Configuration conf) {
+ Iterator<Entry<String, String>> iter = conf.iterator();
+ Map<String, String> atsConf = new TreeMap<String, String>();
+ while (iter.hasNext()) {
+ Entry<String, String> entry = iter.next();
+ atsConf.put(entry.getKey(), entry.getValue());
+ }
+ return atsConf;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
index f05a330..1edefa7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
@@ -100,7 +100,7 @@ public class TestDAGRecovery {
private void restoreFromDAGInitializedEvent() {
DAGState recoveredState =
dag.restoreFromEvent(new DAGInitializedEvent(dagId, initTime, user,
- dagName));
+ dagName, null));
assertEquals(DAGState.INITED, recoveredState);
assertEquals(initTime, dag.initTime);
assertEquals(6, dag.getVertices().size());
@@ -144,7 +144,7 @@ public class TestDAGRecovery {
private void restoreFromDAGFinishedEvent(DAGState finalState) {
DAGState recoveredState =
dag.restoreFromEvent(new DAGFinishedEvent(dagId, startTime, finishTime,
- finalState, "", tezCounters, user, dagName));
+ finalState, "", tezCounters, user, dagName, null));
assertEquals(finishTime, dag.finishTime);
assertFalse(dag.recoveryCommitInProgress);
assertEquals(finalState, recoveredState);
http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index ad508b6..a7a23db 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -18,9 +18,12 @@
package org.apache.tez.dag.history.events;
+import static org.junit.Assert.fail;
+
import java.nio.ByteBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
@@ -114,6 +117,20 @@ public class TestHistoryEventsProtoConversion {
LOG.info("Deserialized Event toString: " + deserializedEvent.toString());
}
+ private void testAppLaunchedEvent() throws Exception {
+ AppLaunchedEvent event = new AppLaunchedEvent(ApplicationId.newInstance(0, 1),
+ 100, 100, null, new Configuration(false));
+ try {
+ testProtoConversion(event);
+ fail("Expected to fail on conversion");
+ } catch (UnsupportedOperationException e) {
+ // Expected
+ }
+
+ LOG.info("Initial Event toString: " + event.toString());
+
+ }
+
private void testAMLaunchedEvent() throws Exception {
AMLaunchedEvent event = new AMLaunchedEvent(
ApplicationAttemptId.newInstance(
@@ -148,7 +165,7 @@ public class TestHistoryEventsProtoConversion {
ApplicationId.newInstance(0, 1), 1), 1001l,
DAGPlan.newBuilder().setName("foo").build(),
ApplicationAttemptId.newInstance(
- ApplicationId.newInstance(0, 1), 1), null, "", null);
+ ApplicationId.newInstance(0, 1), 1), null, "");
DAGSubmittedEvent deserializedEvent = (DAGSubmittedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getApplicationAttemptId(),
@@ -167,7 +184,7 @@ public class TestHistoryEventsProtoConversion {
private void testDAGInitializedEvent() throws Exception {
DAGInitializedEvent event = new DAGInitializedEvent(
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 100334l,
- "user", "dagName");
+ "user", "dagName", null);
DAGInitializedEvent deserializedEvent = (DAGInitializedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getDagID(),
@@ -192,7 +209,7 @@ public class TestHistoryEventsProtoConversion {
{
DAGFinishedEvent event = new DAGFinishedEvent(
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1000l, 20000l,
- DAGState.FAILED, null, null, "user", "dagName");
+ DAGState.FAILED, null, null, "user", "dagName", null);
DAGFinishedEvent deserializedEvent = (DAGFinishedEvent)
testProtoConversion(event);
Assert.assertEquals(
@@ -213,7 +230,7 @@ public class TestHistoryEventsProtoConversion {
DAGFinishedEvent event = new DAGFinishedEvent(
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1000l, 20000l,
DAGState.FAILED, "bad diagnostics", tezCounters,
- "user", "dagName");
+ "user", "dagName", null);
DAGFinishedEvent deserializedEvent = (DAGFinishedEvent)
testProtoConversion(event);
Assert.assertEquals(
@@ -531,7 +548,7 @@ public class TestHistoryEventsProtoConversion {
event = new VertexRecoverableEventsGeneratedEvent(
TezVertexID.getInstance(
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1), null);
- Assert.fail("Invalid creation should have errored out");
+ fail("Invalid creation should have errored out");
} catch (RuntimeException e) {
// Expected
}
@@ -617,6 +634,9 @@ public class TestHistoryEventsProtoConversion {
public void testDefaultProtoConversion() throws Exception {
for (HistoryEventType eventType : HistoryEventType.values()) {
switch (eventType) {
+ case APP_LAUNCHED:
+ testAppLaunchedEvent();
+ break;
case AM_LAUNCHED:
testAMLaunchedEvent();
break;
http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index c9384e1..c3d51c3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -38,6 +39,7 @@ import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.AMLaunchedEvent;
import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.AppLaunchedEvent;
import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
import org.apache.tez.dag.history.events.ContainerStoppedEvent;
import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
@@ -101,6 +103,10 @@ public class TestHistoryEventJsonConversion {
for (HistoryEventType eventType : HistoryEventType.values()) {
HistoryEvent event = null;
switch (eventType) {
+ case APP_LAUNCHED:
+ event = new AppLaunchedEvent(applicationId, random.nextInt(), random.nextInt(),
+ user, new Configuration(false));
+ break;
case AM_LAUNCHED:
event = new AMLaunchedEvent(applicationAttemptId, random.nextInt(), random.nextInt(),
user);
@@ -110,17 +116,17 @@ public class TestHistoryEventJsonConversion {
break;
case DAG_SUBMITTED:
event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId,
- null, user, null);
+ null, user);
break;
case DAG_INITIALIZED:
- event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName());
+ event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName(), null);
break;
case DAG_STARTED:
event = new DAGStartedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName());
break;
case DAG_FINISHED:
event = new DAGFinishedEvent(tezDAGID, random.nextInt(), random.nextInt(), DAGState.ERROR,
- null, null, user, dagPlan.getName());
+ null, null, user, dagPlan.getName(), null);
break;
case VERTEX_INITIALIZED:
event = new VertexInitializedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
index e50c67c..6f4202a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
@@ -108,7 +108,7 @@ public class TestDAGUtils {
idNameMap.put("vertex2", vId2);
idNameMap.put("vertex3", vId3);
- Map<String, Object> atsMap = DAGUtils.convertDAGPlanToATSMap(dagPlan, idNameMap);
+ Map<String, Object> atsMap = DAGUtils.convertDAGPlanToATSMap(dagPlan);
Assert.assertTrue(atsMap.containsKey(DAGUtils.DAG_NAME_KEY));
Assert.assertEquals(dagPlan.getName(), atsMap.get(DAGUtils.DAG_NAME_KEY));
Assert.assertTrue(atsMap.containsKey("version"));
@@ -128,10 +128,7 @@ public class TestDAGUtils {
for (Object o : ((Collection<?>) atsMap.get(DAGUtils.VERTICES_KEY))) {
Map<String, Object> v = (Map<String, Object>) o;
Assert.assertTrue(v.containsKey(DAGUtils.VERTEX_NAME_KEY));
- Assert.assertTrue(v.containsKey(DAGUtils.VERTEX_ID_KEY));
- String vId = (String)v.get(DAGUtils.VERTEX_ID_KEY);
String vName = (String)v.get(DAGUtils.VERTEX_NAME_KEY);
- Assert.assertEquals(idNameMap.get(vName).toString(), vId);
Assert.assertTrue(v.containsKey(DAGUtils.PROCESSOR_CLASS_KEY));
Assert.assertTrue(v.containsKey(DAGUtils.USER_PAYLOAD_AS_TEXT));
http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index 1d569d6..a492408 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.TreeMap;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
@@ -32,6 +33,7 @@ import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.AMLaunchedEvent;
import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.AppLaunchedEvent;
import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
import org.apache.tez.dag.history.events.ContainerStoppedEvent;
import org.apache.tez.dag.history.events.DAGFinishedEvent;
@@ -48,6 +50,7 @@ import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.history.logging.EntityTypes;
import org.apache.tez.dag.history.utils.DAGUtils;
+import org.apache.tez.dag.records.TezVertexID;
public class HistoryEventTimelineConversion {
@@ -58,6 +61,9 @@ public class HistoryEventTimelineConversion {
}
TimelineEntity timelineEntity = null;
switch (historyEvent.getEventType()) {
+ case APP_LAUNCHED:
+ timelineEntity = convertAppLaunchedEvent((AppLaunchedEvent) historyEvent);
+ break;
case AM_LAUNCHED:
timelineEntity = convertAMLaunchedEvent((AMLaunchedEvent) historyEvent);
break;
@@ -121,6 +127,24 @@ public class HistoryEventTimelineConversion {
return timelineEntity;
}
+ private static TimelineEntity convertAppLaunchedEvent(AppLaunchedEvent event) {
+ TimelineEntity atsEntity = new TimelineEntity();
+ atsEntity.setEntityId("tez_"
+ + event.getApplicationId().toString());
+ atsEntity.setEntityType(EntityTypes.TEZ_APPLICATION.name());
+
+ atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ID,
+ event.getApplicationId().toString());
+ atsEntity.addRelatedEntity(ATSConstants.USER, event.getUser());
+
+ atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
+
+ atsEntity.addOtherInfo(ATSConstants.CONFIG,
+ DAGUtils.convertConfigurationToATSMap(event.getConf()));
+
+ return atsEntity;
+ }
+
private static TimelineEntity convertAMLaunchedEvent(AMLaunchedEvent event) {
TimelineEntity atsEntity = new TimelineEntity();
atsEntity.setEntityId("tez_"
@@ -234,6 +258,13 @@ public class HistoryEventTimelineConversion {
atsEntity.addOtherInfo(ATSConstants.COUNTERS,
DAGUtils.convertCountersToATSMap(event.getTezCounters()));
+ final Map<String, Integer> dagTaskStats = event.getDagTaskStats();
+ if (dagTaskStats != null) {
+ for(Entry<String, Integer> entry : dagTaskStats.entrySet()) {
+ atsEntity.addOtherInfo(entry.getKey(), entry.getValue().intValue());
+ }
+ }
+
return atsEntity;
}
@@ -242,16 +273,24 @@ public class HistoryEventTimelineConversion {
atsEntity.setEntityId(event.getDagID().toString());
atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name());
- TimelineEvent finishEvt = new TimelineEvent();
- finishEvt.setEventType(HistoryEventType.DAG_INITIALIZED.name());
- finishEvt.setTimestamp(event.getInitTime());
- atsEntity.addEvent(finishEvt);
+ TimelineEvent initEvt = new TimelineEvent();
+ initEvt.setEventType(HistoryEventType.DAG_INITIALIZED.name());
+ initEvt.setTimestamp(event.getInitTime());
+ atsEntity.addEvent(initEvt);
atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName());
atsEntity.addOtherInfo(ATSConstants.INIT_TIME, event.getInitTime());
+ if (event.getVertexNameIDMap() != null) {
+ Map<String, String> nameIdStrMap = new TreeMap<String, String>();
+ for (Entry<String, TezVertexID> entry : event.getVertexNameIDMap().entrySet()) {
+ nameIdStrMap.put(entry.getKey(), entry.getValue().toString());
+ }
+ atsEntity.addOtherInfo(ATSConstants.VERTEX_NAME_ID_MAPPING, nameIdStrMap);
+ }
+
return atsEntity;
}
@@ -301,7 +340,7 @@ public class HistoryEventTimelineConversion {
try {
atsEntity.addOtherInfo(ATSConstants.DAG_PLAN,
- DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan(), event.getVertexNameIDMap()));
+ DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan()));
} catch (IOException e) {
throw new TezUncheckedException(e);
}
@@ -396,6 +435,10 @@ public class HistoryEventTimelineConversion {
atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime());
atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name());
+ if (event.getSuccessfulAttemptID() != null) {
+ atsEntity.addOtherInfo(ATSConstants.SUCCESSFUL_ATTEMPT_ID,
+ event.getSuccessfulAttemptID().toString());
+ }
atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
atsEntity.addOtherInfo(ATSConstants.COUNTERS,
@@ -459,8 +502,8 @@ public class HistoryEventTimelineConversion {
final Map<String, Integer> vertexTaskStats = event.getVertexTaskStats();
if (vertexTaskStats != null) {
- for(String key : vertexTaskStats.keySet()) {
- atsEntity.addOtherInfo(key, vertexTaskStats.get(key));
+ for(Entry<String, Integer> entry : vertexTaskStats.entrySet()) {
+ atsEntity.addOtherInfo(entry.getKey(), entry.getValue().intValue());
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/ede0e645/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index 8f95c1e..ba71d46 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -35,10 +36,12 @@ import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.impl.VertexStats;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.AMLaunchedEvent;
import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.AppLaunchedEvent;
import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
import org.apache.tez.dag.history.events.ContainerStoppedEvent;
import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
@@ -101,6 +104,10 @@ public class TestHistoryEventTimelineConversion {
for (HistoryEventType eventType : HistoryEventType.values()) {
HistoryEvent event = null;
switch (eventType) {
+ case APP_LAUNCHED:
+ event = new AppLaunchedEvent(applicationId, random.nextInt(), random.nextInt(),
+ user, new Configuration(false));
+ break;
case AM_LAUNCHED:
event = new AMLaunchedEvent(applicationAttemptId, random.nextInt(), random.nextInt(),
user);
@@ -110,17 +117,17 @@ public class TestHistoryEventTimelineConversion {
break;
case DAG_SUBMITTED:
event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId,
- null, user, null);
+ null, user);
break;
case DAG_INITIALIZED:
- event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName());
+ event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName(), null);
break;
case DAG_STARTED:
event = new DAGStartedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName());
break;
case DAG_FINISHED:
event = new DAGFinishedEvent(tezDAGID, random.nextInt(), random.nextInt(), DAGState.ERROR,
- null, null, user, dagPlan.getName());
+ null, null, user, dagPlan.getName(), null);
break;
case VERTEX_INITIALIZED:
event = new VertexInitializedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
@@ -185,6 +192,41 @@ public class TestHistoryEventTimelineConversion {
}
@Test
+ public void testConvertAppLaunchedEvent() {
+ long launchTime = random.nextLong();
+ long submitTime = random.nextLong();
+ Configuration conf = new Configuration(false);
+ conf.set("foo", "bar");
+ conf.set("applicationId", "1234");
+
+
+ AppLaunchedEvent event = new AppLaunchedEvent(applicationId, launchTime,
+ submitTime, user, conf);
+
+ TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+
+ Assert.assertEquals(EntityTypes.TEZ_APPLICATION.name(), timelineEntity.getEntityType());
+ Assert.assertEquals("tez_" + applicationId.toString(), timelineEntity.getEntityId());
+
+ Assert.assertEquals(2, timelineEntity.getRelatedEntities().size());
+ Assert.assertTrue(timelineEntity.getRelatedEntities().get(ATSConstants.USER).contains(user));
+ Assert.assertTrue(
+ timelineEntity.getRelatedEntities().get(ATSConstants.APPLICATION_ID).contains(
+ applicationId.toString()));
+
+ Assert.assertEquals(1, timelineEntity.getPrimaryFilters().size());
+ Assert.assertTrue(timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user));
+
+ Assert.assertEquals(1, timelineEntity.getOtherInfo().size());
+ Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.CONFIG));
+
+ Map<String, String> config =
+ (Map<String, String>)timelineEntity.getOtherInfo().get(ATSConstants.CONFIG);
+ Assert.assertEquals(conf.get("foo"), config.get("foo"));
+ Assert.assertEquals(conf.get("applicationId"), config.get("applicationId"));
+ }
+
+ @Test
public void testConvertContainerLaunchedEvent() {
long launchTime = random.nextLong();
ContainerLaunchedEvent event = new ContainerLaunchedEvent(containerId, launchTime,
@@ -219,7 +261,7 @@ public class TestHistoryEventTimelineConversion {
long submitTime = random.nextLong();
DAGSubmittedEvent event = new DAGSubmittedEvent(tezDAGID, submitTime, dagPlan,
- applicationAttemptId, null, user, null);
+ applicationAttemptId, null, user);
TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
@@ -263,6 +305,92 @@ public class TestHistoryEventTimelineConversion {
}
@Test
+ public void testConvertDAGInitializedEvent() {
+ long initTime = random.nextLong();
+
+ Map<String,TezVertexID> nameIdMap = new HashMap<String, TezVertexID>();
+ nameIdMap.put("foo", tezVertexID);
+
+ DAGInitializedEvent event = new DAGInitializedEvent(tezDAGID, initTime, "user", "dagName",
+ nameIdMap);
+
+ TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+ Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
+ Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
+
+ Assert.assertEquals(0, timelineEntity.getRelatedEntities().size());
+
+ Assert.assertEquals(1, timelineEntity.getEvents().size());
+ TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
+ Assert.assertEquals(HistoryEventType.DAG_INITIALIZED.name(), timelineEvent.getEventType());
+ Assert.assertEquals(initTime, timelineEvent.getTimestamp());
+
+ Assert.assertEquals(2, timelineEntity.getPrimaryFilters().size());
+ Assert.assertTrue(
+ timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains("dagName"));
+ Assert.assertTrue(
+ timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user));
+
+ Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(
+ ATSConstants.VERTEX_NAME_ID_MAPPING));
+ Map<String, String> vIdMap = (Map<String, String>) timelineEntity.getOtherInfo().get(
+ ATSConstants.VERTEX_NAME_ID_MAPPING);
+ Assert.assertEquals(1, vIdMap.size());
+ Assert.assertNotNull(vIdMap.containsKey("foo"));
+ Assert.assertEquals(tezVertexID.toString(), vIdMap.get("foo"));
+
+ }
+
+ @Test
+ public void testConvertDAGFinishedEvent() {
+ long finishTime = random.nextLong();
+ long startTime = random.nextLong();
+ Map<String,Integer> taskStats = new HashMap<String, Integer>();
+ taskStats.put("FOO", 100);
+ taskStats.put("BAR", 200);
+
+ DAGFinishedEvent event = new DAGFinishedEvent(tezDAGID, startTime, finishTime, DAGState.ERROR,
+ "diagnostics", null, user, dagPlan.getName(), taskStats);
+
+ TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+ Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
+ Assert.assertEquals(tezDAGID.toString(), timelineEntity.getEntityId());
+
+ Assert.assertEquals(0, timelineEntity.getRelatedEntities().size());
+
+ Assert.assertEquals(1, timelineEntity.getEvents().size());
+ TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
+ Assert.assertEquals(HistoryEventType.DAG_FINISHED.name(), timelineEvent.getEventType());
+ Assert.assertEquals(finishTime, timelineEvent.getTimestamp());
+
+ Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size());
+ Assert.assertTrue(
+ timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains(dagPlan.getName()));
+ Assert.assertTrue(
+ timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user));
+ Assert.assertTrue(
+ timelineEntity.getPrimaryFilters().get(ATSConstants.STATUS).contains(
+ DAGState.ERROR.name()));
+
+ Assert.assertEquals(startTime,
+ ((Long)timelineEntity.getOtherInfo().get(ATSConstants.START_TIME)).longValue());
+ Assert.assertEquals(finishTime,
+ ((Long)timelineEntity.getOtherInfo().get(ATSConstants.FINISH_TIME)).longValue());
+ Assert.assertEquals(finishTime - startTime,
+ ((Long)timelineEntity.getOtherInfo().get(ATSConstants.TIME_TAKEN)).longValue());
+ Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.COUNTERS));
+ Assert.assertEquals(DAGState.ERROR.name(),
+ timelineEntity.getOtherInfo().get(ATSConstants.STATUS));
+ Assert.assertEquals("diagnostics",
+ timelineEntity.getOtherInfo().get(ATSConstants.DIAGNOSTICS));
+
+ Assert.assertEquals(100,
+ ((Integer)timelineEntity.getOtherInfo().get("FOO")).intValue());
+ Assert.assertEquals(200,
+ ((Integer)timelineEntity.getOtherInfo().get("BAR")).intValue());
+ }
+
+ @Test
public void testConvertVertexInitializedEvent() {
long initRequestedTime = random.nextLong();
long initedTime = random.nextLong();
@@ -306,6 +434,58 @@ public class TestHistoryEventTimelineConversion {
}
@Test
+ public void testConvertVertexFinishedEvent() {
+ long initRequestedTime = random.nextLong();
+ long initedTime = random.nextLong();
+ long startRequestedTime = random.nextLong();
+ long startTime = random.nextLong();
+ long finishTime = random.nextLong();
+ Map<String,Integer> taskStats = new HashMap<String, Integer>();
+ taskStats.put("FOO", 100);
+ taskStats.put("BAR", 200);
+ VertexStats vertexStats = new VertexStats();
+
+ VertexFinishedEvent event = new VertexFinishedEvent(tezVertexID, "v1", initRequestedTime,
+ initedTime, startRequestedTime, startTime, finishTime, VertexState.ERROR,
+ "diagnostics", null, vertexStats, taskStats);
+
+ TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+ Assert.assertEquals(EntityTypes.TEZ_VERTEX_ID.name(), timelineEntity.getEntityType());
+ Assert.assertEquals(tezVertexID.toString(), timelineEntity.getEntityId());
+
+ Assert.assertEquals(0, timelineEntity.getRelatedEntities().size());
+
+ Assert.assertEquals(2, timelineEntity.getPrimaryFilters().size());
+ Assert.assertTrue(
+ timelineEntity.getPrimaryFilters().get(EntityTypes.TEZ_DAG_ID.name()).contains(
+ tezDAGID.toString()));
+ Assert.assertTrue(
+ timelineEntity.getPrimaryFilters().get(ATSConstants.STATUS).contains(
+ VertexState.ERROR.name()));
+
+ Assert.assertEquals(1, timelineEntity.getEvents().size());
+ TimelineEvent timelineEvent = timelineEntity.getEvents().get(0);
+ Assert.assertEquals(HistoryEventType.VERTEX_FINISHED.name(), timelineEvent.getEventType());
+ Assert.assertEquals(finishTime, timelineEvent.getTimestamp());
+
+ Assert.assertEquals(finishTime,
+ ((Long)timelineEntity.getOtherInfo().get(ATSConstants.FINISH_TIME)).longValue());
+ Assert.assertEquals(finishTime - startTime,
+ ((Long)timelineEntity.getOtherInfo().get(ATSConstants.TIME_TAKEN)).longValue());
+ Assert.assertEquals(VertexState.ERROR.name(),
+ timelineEntity.getOtherInfo().get(ATSConstants.STATUS));
+ Assert.assertEquals("diagnostics",
+ timelineEntity.getOtherInfo().get(ATSConstants.DIAGNOSTICS));
+
+ Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.STATS));
+
+ Assert.assertEquals(100,
+ ((Integer)timelineEntity.getOtherInfo().get("FOO")).intValue());
+ Assert.assertEquals(200,
+ ((Integer)timelineEntity.getOtherInfo().get("BAR")).intValue());
+ }
+
+ @Test
public void testConvertTaskStartedEvent() {
long scheduleTime = random.nextLong();
long startTime = random.nextLong();