You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zh...@apache.org on 2017/03/28 19:47:47 UTC
[06/50] [abbrv] tez git commit: TEZ-3267. Publish queue name to ATS
as part of dag summary. Contributed by Harish Jaiprakash.
TEZ-3267. Publish queue name to ATS as part of dag summary. Contributed by Harish Jaiprakash.
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/16b93de8
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/16b93de8
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/16b93de8
Branch: refs/heads/TEZ-1190
Commit: 16b93de8f31a815cab63e0be0dc563549a688566
Parents: 11815a7
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Feb 8 18:32:26 2017 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Feb 8 18:32:26 2017 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/tez/common/ATSConstants.java | 1 +
.../org/apache/tez/dag/app/DAGAppMaster.java | 11 ++++++++-
.../dag/history/events/DAGSubmittedEvent.java | 17 +++++++++++--
.../impl/HistoryEventJsonConversion.java | 3 +++
tez-dag/src/main/proto/HistoryEvents.proto | 1 +
.../apache/tez/dag/app/TestRecoveryParser.java | 26 ++++++++++----------
.../dag/history/TestHistoryEventHandler.java | 2 +-
.../TestHistoryEventsProtoConversion.java | 4 ++-
.../impl/TestHistoryEventJsonConversion.java | 12 +++++++--
.../history/recovery/TestRecoveryService.java | 2 +-
.../ats/acls/TestATSHistoryWithACLs.java | 4 +--
.../ats/TestATSV15HistoryLoggingService.java | 2 +-
.../ats/HistoryEventTimelineConversion.java | 3 +++
.../ats/TestATSHistoryLoggingService.java | 2 +-
.../ats/TestHistoryEventTimelineConversion.java | 10 +++++---
16 files changed, 72 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0949339..215cb08 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3267. Publish queue name to ATS as part of dag summary.
TEZ-3609. Improve ATSv15 performance for DAG entities read calls.
TEZ-3244. Allow overlap of input and output memory when they are not concurrent
TEZ-3581. Add different logger to enable suppressing logs for specific lines.
http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
index c56582c..03c9fa1 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
@@ -55,6 +55,7 @@ public class ATSConstants {
public static final String DAG_PLAN = "dagPlan";
public static final String DAG_NAME = "dagName";
public static final String DAG_STATE = "dagState";
+ public static final String DAG_SUBMITTED_QUEUE_NAME = "submittedQueueName";
public static final String DAG_AM_WEB_SERVICE_VERSION = "amWebServiceVersion";
public static final String RECOVERY_FAILURE_REASON = "recoveryFailureReason";
public static final String VERTEX_NAME = "vertexName";
http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index eaaf18b..7f27064 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -2578,7 +2578,7 @@ public class DAGAppMaster extends AbstractService {
// for an app later
final DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(newDAG.getID(),
submitTime, dagPlan, this.appAttemptID, cumulativeAdditionalResources,
- newDAG.getUserName(), newDAG.getConf(), containerLogs);
+ newDAG.getUserName(), newDAG.getConf(), containerLogs, getSubmittedQueueName());
boolean dagLoggingEnabled = newDAG.getConf().getBoolean(
TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED,
TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED_DEFAULT);
@@ -2671,6 +2671,15 @@ public class DAGAppMaster extends AbstractService {
});
}
+ private String getSubmittedQueueName() {
+ // TODO: Replace this with constant once the yarn patch is backported. (JIRA: TEZ-3279)
+ String submittedQueueName = System.getenv("YARN_RESOURCEMANAGER_APPLICATION_QUEUE");
+ if (submittedQueueName == null) {
+ submittedQueueName = amConf.get(TezConfiguration.TEZ_QUEUE_NAME);
+ }
+ return submittedQueueName;
+ }
+
@SuppressWarnings("unchecked")
private void sendEvent(Event<?> event) {
dispatcher.getEventHandler().handle(event);
http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
index 07d7c07..1b1fdf3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
@@ -57,6 +57,7 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
private boolean historyLoggingEnabled = true;
private Configuration conf;
private String containerLogs;
+ private String queueName;
public DAGSubmittedEvent() {
}
@@ -64,7 +65,7 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
public DAGSubmittedEvent(TezDAGID dagID, long submitTime,
DAGProtos.DAGPlan dagPlan, ApplicationAttemptId applicationAttemptId,
Map<String, LocalResource> cumulativeAdditionalLocalResources,
- String user, Configuration conf, String containerLogs) {
+ String user, Configuration conf, String containerLogs, String queueName) {
this.dagID = dagID;
this.dagName = dagPlan.getName();
this.submitTime = submitTime;
@@ -74,6 +75,7 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
this.user = user;
this.conf = conf;
this.containerLogs = containerLogs;
+ this.queueName = queueName;
}
@Override
@@ -97,6 +99,9 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
.setApplicationAttemptId(applicationAttemptId.toString())
.setDagPlan(dagPlan)
.setSubmitTime(submitTime);
+ if (queueName != null) {
+ builder.setQueueName(queueName);
+ }
if (cumulativeAdditionalLocalResources != null && !cumulativeAdditionalLocalResources.isEmpty()) {
builder.setCumulativeAdditionalAmResources(DagTypeConverters
.convertFromLocalResources(cumulativeAdditionalLocalResources));
@@ -111,6 +116,9 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
this.submitTime = proto.getSubmitTime();
this.applicationAttemptId = ConverterUtils.toApplicationAttemptId(
proto.getApplicationAttemptId());
+ if (proto.hasQueueName()) {
+ this.queueName = proto.getQueueName();
+ }
if (proto.hasCumulativeAdditionalAmResources()) {
this.cumulativeAdditionalLocalResources = DagTypeConverters.convertFromPlanLocalResources(proto
.getCumulativeAdditionalAmResources());
@@ -134,7 +142,8 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
@Override
public String toString() {
return "dagID=" + dagID
- + ", submitTime=" + submitTime;
+ + ", submitTime=" + submitTime
+ + ", queueName=" + queueName;
}
@Override
@@ -203,4 +212,8 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
public String getContainerLogs() {
return containerLogs;
}
+
+ public String getQueueName() {
+ return queueName;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index a767fbf..69c40e4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -515,6 +515,9 @@ public class HistoryEventJsonConversion {
otherInfo.put(ATSConstants.CALLER_CONTEXT_TYPE,
event.getDAGPlan().getCallerContext().getCallerType());
}
+ if (event.getQueueName() != null) {
+ otherInfo.put(ATSConstants.DAG_SUBMITTED_QUEUE_NAME, event.getQueueName());
+ }
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
return jsonObject;
http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index ff3707d..7671469 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -54,6 +54,7 @@ message DAGSubmittedProto {
optional int64 submit_time = 3;
optional string application_attempt_id = 4;
optional PlanLocalResourcesProto cumulative_additional_am_resources = 5;
+ optional string queue_name = 6;
}
message DAGInitializedProto {
http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
index f4edf9e..6673b39 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
@@ -169,7 +169,7 @@ public class TestRecoveryParser {
rService.start();
rService.handle(new DAGHistoryEvent(dagID,
new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
- null, "user", new Configuration(), null)));
+ null, "user", new Configuration(), null, null)));
rService.handle(new DAGHistoryEvent(dagID,
new DAGInitializedEvent(dagID, 1L, "user", dagPlan.getName(), null)));
// only for testing, DAGCommitStartedEvent is not supposed to happen at this time.
@@ -215,7 +215,7 @@ public class TestRecoveryParser {
rService.start();
rService.handle(new DAGHistoryEvent(dagID,
new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
- null, "user", new Configuration(), null)));
+ null, "user", new Configuration(), null, null)));
rService.handle(new DAGHistoryEvent(dagID,
new DAGInitializedEvent(dagID, 1L, "user", dagPlan.getName(), null)));
rService.handle(new DAGHistoryEvent(dagID,
@@ -264,7 +264,7 @@ public class TestRecoveryParser {
rService.start();
rService.handle(new DAGHistoryEvent(dagID,
new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
- null, "user", new Configuration(), null)));
+ null, "user", new Configuration(), null, null)));
// wait until DAGSubmittedEvent is handled in the RecoveryEventHandling thread
rService.await();
rService.outputStreamMap.get(dagID).writeUTF("INVALID_DATA");
@@ -310,7 +310,7 @@ public class TestRecoveryParser {
// write a DAGSubmittedEvent first to initialize summaryStream
rService.handle(new DAGHistoryEvent(dagID,
new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
- null, "user", new Configuration(), null)));
+ null, "user", new Configuration(), null, null)));
// write an corrupted SummaryEvent
rService.summaryStream.writeChars("INVALID_DATA");
rService.stop();
@@ -344,7 +344,7 @@ public class TestRecoveryParser {
// write a DAGSubmittedEvent first to initialize summaryStream
rService.handle(new DAGHistoryEvent(dagID,
new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
- null, "user", new Configuration(), null)));
+ null, "user", new Configuration(), null, null)));
// It should be fine to skip other events, just for testing.
rService.handle(new DAGHistoryEvent(dagID,
new DAGCommitStartedEvent(dagID, 0L)));
@@ -376,7 +376,7 @@ public class TestRecoveryParser {
// write a DAGSubmittedEvent first to initialize summaryStream
rService.handle(new DAGHistoryEvent(dagID,
new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
- null, "user", new Configuration(), null)));
+ null, "user", new Configuration(), null, null)));
// It should be fine to skip other events, just for testing.
rService.handle(new DAGHistoryEvent(dagID,
new DAGCommitStartedEvent(dagID, 0L)));
@@ -412,7 +412,7 @@ public class TestRecoveryParser {
// write a DAGSubmittedEvent first to initialize summaryStream
rService.handle(new DAGHistoryEvent(dagID,
new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
- null, "user", new Configuration(), null)));
+ null, "user", new Configuration(), null, null)));
// It should be fine to skip other events, just for testing.
rService.handle(new DAGHistoryEvent(dagID,
new VertexCommitStartedEvent(TezVertexID.getInstance(dagID, 0), 0L)));
@@ -445,7 +445,7 @@ public class TestRecoveryParser {
// write a DAGSubmittedEvent first to initialize summaryStream
rService.handle(new DAGHistoryEvent(dagID,
new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
- null, "user", new Configuration(), null)));
+ null, "user", new Configuration(), null, null)));
// It should be fine to skip other events, just for testing.
TezVertexID vertexId = TezVertexID.getInstance(dagID, 0);
rService.handle(new DAGHistoryEvent(dagID,
@@ -482,7 +482,7 @@ public class TestRecoveryParser {
// write a DAGSubmittedEvent first to initialize summaryStream
rService.handle(new DAGHistoryEvent(dagID,
new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
- null, "user", new Configuration(), null)));
+ null, "user", new Configuration(), null, null)));
// It should be fine to skip other events, just for testing.
rService.handle(new DAGHistoryEvent(dagID,
new VertexGroupCommitStartedEvent(dagID, "group_1",
@@ -516,7 +516,7 @@ public class TestRecoveryParser {
// write a DAGSubmittedEvent first to initialize summaryStream
rService.handle(new DAGHistoryEvent(dagID,
new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
- null, "user", new Configuration(), null)));
+ null, "user", new Configuration(), null, null)));
// It should be fine to skip other events, just for testing.
TezVertexID v0 = TezVertexID.getInstance(dagID, 0);
TezVertexID v1 = TezVertexID.getInstance(dagID, 1);
@@ -565,7 +565,7 @@ public class TestRecoveryParser {
// write a DAGSubmittedEvent first to initialize summaryStream
rService.handle(new DAGHistoryEvent(dagID,
new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
- null, "user", new Configuration(), null)));
+ null, "user", new Configuration(), null, null)));
// It should be fine to skip other events, just for testing.
TezVertexID vertexId = TezVertexID.getInstance(dagID, 0);
rService.handle(new DAGHistoryEvent(dagID,
@@ -601,7 +601,7 @@ public class TestRecoveryParser {
// write a DAGSubmittedEvent first to initialize summaryStream
rService.handle(new DAGHistoryEvent(dagID,
new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
- null, "user", new Configuration(), null)));
+ null, "user", new Configuration(), null, null)));
// It should be fine to skip other events, just for testing.
TezVertexID vertexId = TezVertexID.getInstance(dagID, 0);
rService.handle(new DAGHistoryEvent(dagID,
@@ -640,7 +640,7 @@ public class TestRecoveryParser {
// DAG DAGSubmittedEvent -> DAGInitializedEvent -> DAGStartedEvent
rService.handle(new DAGHistoryEvent(dagID,
new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
- null, "user", new Configuration(), null)));
+ null, "user", new Configuration(), null, null)));
DAGInitializedEvent dagInitedEvent = new DAGInitializedEvent(dagID, 100L,
"user", "dagName", null);
DAGStartedEvent dagStartedEvent = new DAGStartedEvent(dagID, 0L, "user", "dagName");
http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java
index 4c0fe3f..5a71a42 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java
@@ -219,7 +219,7 @@ public class TestHistoryEventHandler {
new AMStartedEvent(attemptId, time, user)));
historyEvents.add(new DAGHistoryEvent(dagId,
new DAGSubmittedEvent(dagId, time, DAGPlan.getDefaultInstance(), attemptId, null, user,
- conf, null)));
+ conf, null, "default")));
TezVertexID vertexID = TezVertexID.getInstance(dagId, 1);
historyEvents.add(new DAGHistoryEvent(dagId,
new VertexStartedEvent(vertexID, time, time)));
http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index 3d29a5d..47d8389 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -171,12 +171,13 @@ public class TestHistoryEventsProtoConversion {
logEvents(event, deserializedEvent);
}
+ private final String QUEUE_NAME = "TEST_QUEUE_NAME";
private void testDAGSubmittedEvent() throws Exception {
DAGSubmittedEvent event = new DAGSubmittedEvent(TezDAGID.getInstance(
ApplicationId.newInstance(0, 1), 1), 1001l,
DAGPlan.newBuilder().setName("foo").build(),
ApplicationAttemptId.newInstance(
- ApplicationId.newInstance(0, 1), 1), null, "", null, null);
+ ApplicationId.newInstance(0, 1), 1), null, "", null, null, QUEUE_NAME);
DAGSubmittedEvent deserializedEvent = (DAGSubmittedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getApplicationAttemptId(),
@@ -189,6 +190,7 @@ public class TestHistoryEventsProtoConversion {
deserializedEvent.getSubmitTime());
Assert.assertEquals(event.getDAGPlan(),
deserializedEvent.getDAGPlan());
+ Assert.assertEquals(event.getQueueName(), deserializedEvent.getQueueName());
logEvents(event, deserializedEvent);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index 9477118..1bbecd3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -124,7 +124,7 @@ public class TestHistoryEventJsonConversion {
break;
case DAG_SUBMITTED:
event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId,
- null, user, null, null);
+ null, user, null, null, "Q_" + eventType.name());
break;
case DAG_INITIALIZED:
event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName(), null);
@@ -200,7 +200,15 @@ public class TestHistoryEventJsonConversion {
if (event == null || !event.isHistoryEvent()) {
continue;
}
- HistoryEventJsonConversion.convertToJson(event);
+ JSONObject json = HistoryEventJsonConversion.convertToJson(event);
+ if (eventType == HistoryEventType.DAG_SUBMITTED) {
+ try {
+ Assert.assertEquals("Q_" + eventType.name(), json.getJSONObject(ATSConstants.OTHER_INFO)
+ .getString(ATSConstants.DAG_SUBMITTED_QUEUE_NAME));
+ } catch (JSONException ex) {
+ Assert.fail("Exception: " + ex.getMessage() + " for type: " + eventType);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java b/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java
index 3dec1d7..790e2d8 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java
@@ -333,7 +333,7 @@ public class TestRecoveryService {
DAGPlan dagPlan = DAGPlan.newBuilder().setName("test_dag").build();
// This writes to recovery immediately.
recoveryService.handle(new DAGHistoryEvent(dagId, new DAGSubmittedEvent(
- dagId, startTime, dagPlan, appAttemptId, null, "nobody", conf, null)));
+ dagId, startTime, dagPlan, appAttemptId, null, "nobody", conf, null, "default")));
waitForDrain(-1);
verify(summaryFos, times(1)).hflush();
verify(dagFos, times(1)).hflush();
http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java
index 6b3ebd7..8e5c95c 100644
--- a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java
+++ b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java
@@ -401,7 +401,7 @@ public class TestATSHistoryWithACLs {
DAGPlan dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock").build();
DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(tezDAGID,
1, dagPlan, appAttemptId, null,
- "usr", tezConf, null);
+ "usr", tezConf, null, null);
submittedEvent.setHistoryLoggingEnabled(false);
DAGHistoryEvent event = new DAGHistoryEvent(tezDAGID, submittedEvent);
historyLoggingService.handle(new DAGHistoryEvent(tezDAGID, submittedEvent));
@@ -446,7 +446,7 @@ public class TestATSHistoryWithACLs {
DAGPlan dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock").build();
DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(tezDAGID,
1, dagPlan, appAttemptId, null,
- "usr", tezConf, null);
+ "usr", tezConf, null, null);
submittedEvent.setHistoryLoggingEnabled(true);
DAGHistoryEvent event = new DAGHistoryEvent(tezDAGID, submittedEvent);
historyLoggingService.handle(new DAGHistoryEvent(tezDAGID, submittedEvent));
http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
index 9111195..cbded35 100644
--- a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
@@ -450,7 +450,7 @@ public class TestATSV15HistoryLoggingService {
new AMStartedEvent(attemptId, time, user)));
historyEvents.add(new DAGHistoryEvent(dagId,
new DAGSubmittedEvent(dagId, time, DAGPlan.getDefaultInstance(), attemptId, null, user,
- conf, null)));
+ conf, null, "default")));
TezVertexID vertexID = TezVertexID.getInstance(dagId, 1);
historyEvents.add(new DAGHistoryEvent(dagId,
new VertexStartedEvent(vertexID, time, time)));
http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index 96239c3..8d0c547 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -423,6 +423,9 @@ public class HistoryEventTimelineConversion {
atsEntity.addOtherInfo(ATSConstants.CALLER_CONTEXT_TYPE,
event.getDAGPlan().getCallerContext().getCallerType());
}
+ if (event.getQueueName() != null) {
+ atsEntity.addOtherInfo(ATSConstants.DAG_SUBMITTED_QUEUE_NAME, event.getQueueName());
+ }
return atsEntity;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
index da57eb2..a641cda 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
@@ -444,7 +444,7 @@ public class TestATSHistoryLoggingService {
Configuration conf = new Configuration(service.getConfig());
historyEvents.add(new DAGHistoryEvent(null, new AMStartedEvent(attemptId, time, "user")));
historyEvents.add(new DAGHistoryEvent(dagId, new DAGSubmittedEvent(dagId, time,
- DAGPlan.getDefaultInstance(), attemptId, null, "user", conf, null)));
+ DAGPlan.getDefaultInstance(), attemptId, null, "user", conf, null, "default")));
TezVertexID vertexID = TezVertexID.getInstance(dagId, 1);
historyEvents.add(new DAGHistoryEvent(dagId, new VertexStartedEvent(vertexID, time, time)));
TezTaskID tezTaskID = TezTaskID.getInstance(vertexID, 1);
http://git-wip-us.apache.org/repos/asf/tez/blob/16b93de8/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index 62fb335..bb189d3 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -145,7 +145,7 @@ public class TestHistoryEventTimelineConversion {
break;
case DAG_SUBMITTED:
event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId,
- null, user, null, containerLogs);
+ null, user, null, containerLogs, null);
break;
case DAG_INITIALIZED:
event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName(), null);
@@ -473,8 +473,9 @@ public class TestHistoryEventTimelineConversion {
public void testConvertDAGSubmittedEvent() {
long submitTime = random.nextLong();
+ final String queueName = "TEST_DAG_SUBMITTED";
DAGSubmittedEvent event = new DAGSubmittedEvent(tezDAGID, submitTime, dagPlan,
- applicationAttemptId, null, user, null, containerLogs);
+ applicationAttemptId, null, user, null, containerLogs, queueName);
TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
@@ -509,7 +510,7 @@ public class TestHistoryEventTimelineConversion {
Assert.assertTrue(
timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user));
- Assert.assertEquals(8, timelineEntity.getOtherInfo().size());
+ Assert.assertEquals(9, timelineEntity.getOtherInfo().size());
Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.DAG_PLAN));
Assert.assertEquals(applicationId.toString(),
timelineEntity.getOtherInfo().get(ATSConstants.APPLICATION_ID));
@@ -530,7 +531,8 @@ public class TestHistoryEventTimelineConversion {
Assert.assertEquals(
timelineEntity.getOtherInfo().get(ATSConstants.CALLER_CONTEXT_TYPE),
dagPlan.getCallerContext().getCallerType());
-
+ Assert.assertEquals(
+ queueName, timelineEntity.getOtherInfo().get(ATSConstants.DAG_SUBMITTED_QUEUE_NAME));
}