You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2016/07/21 21:36:02 UTC
tez git commit: TEZ-3358. Support using the same TimelineGroupId for
multiple DAGs. (Harish Jaiprakash via hitesh)
Repository: tez
Updated Branches:
refs/heads/master 9930011b0 -> e610b00d3
TEZ-3358. Support using the same TimelineGroupId for multiple DAGs. (Harish Jaiprakash via hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e610b00d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e610b00d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e610b00d
Branch: refs/heads/master
Commit: e610b00d388b61c2cc66a60b78d26e1fb4ce74de
Parents: 9930011
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Jul 21 14:35:51 2016 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Jul 21 14:35:51 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../ats/ATSV15HistoryLoggingService.java | 13 +-
.../ats/TestATSV15HistoryLoggingService.java | 286 +++++++++++++++++++
3 files changed, 298 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/e610b00d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a925104..364ff2c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3358. Support using the same TimelineGroupId for multiple DAGs.
TEZ-3357. Change TimelineCachePlugin to handle DAG grouping.
TEZ-3348. NullPointerException in Tez MROutput while trying to write using Parquet's DeprecatedParquetOutputFormat.
TEZ-3356. Fix initializing of stats when custom ShuffleVertexManager is used.
@@ -86,6 +87,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3358. Support using the same TimelineGroupId for multiple DAGs.
TEZ-3357. Change TimelineCachePlugin to handle DAG grouping.
TEZ-3348. NullPointerException in Tez MROutput while trying to write using Parquet's DeprecatedParquetOutputFormat.
TEZ-3356. Fix initializing of stats when custom ShuffleVertexManager is used.
http://git-wip-us.apache.org/repos/asf/tez/blob/e610b00d/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java
index fc7e97a..dd21d2d 100644
--- a/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java
@@ -54,7 +54,8 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService {
private static final Logger LOG = LoggerFactory.getLogger(ATSV15HistoryLoggingService.class);
- private LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
+ @VisibleForTesting
+ LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
new LinkedBlockingQueue<DAGHistoryEvent>();
private Thread eventHandlingThread;
@@ -81,6 +82,8 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService {
"org.apache.tez.dag.history.ats.acls.ATSV15HistoryACLPolicyManager";
private HistoryACLPolicyManager historyACLPolicyManager;
+ private int numDagsPerGroup;
+
public ATSV15HistoryLoggingService() {
super(ATSV15HistoryLoggingService.class.getName());
}
@@ -151,6 +154,8 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService {
historyACLPolicyManager = null;
}
+ numDagsPerGroup = conf.getInt(TezConfiguration.TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP,
+ TezConfiguration.TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP_DEFAULT);
}
@Override
@@ -290,8 +295,10 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService {
case VERTEX_GROUP_COMMIT_STARTED:
case VERTEX_GROUP_COMMIT_FINISHED:
case DAG_RECOVERED:
- return TimelineEntityGroupId.newInstance(event.getDagID().getApplicationId(),
- event.getDagID().toString());
+ String entityGroupId = numDagsPerGroup > 1
+ ? event.getDagID().getGroupId(numDagsPerGroup)
+ : event.getDagID().toString();
+ return TimelineEntityGroupId.newInstance(event.getDagID().getApplicationId(), entityGroupId);
case APP_LAUNCHED:
case AM_LAUNCHED:
case AM_STARTED:
http://git-wip-us.apache.org/repos/asf/tez/blob/e610b00d/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
new file mode 100644
index 0000000..87a48f6
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.ats;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.hadoop.shim.HadoopShim;
+import org.junit.Test;
+
+/**
+ * Checks which {@link TimelineEntityGroupId} {@link ATSV15HistoryLoggingService} attaches to
+ * the entities it logs, for the default, the disabled and the enabled setting of
+ * {@link TezConfiguration#TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP}.
+ *
+ * <p>The service is wired to an in-memory {@link TimelineClient} that records every entity
+ * per group id, so the tests can inspect what was logged without a timeline server.
+ */
+public class TestATSV15HistoryLoggingService {
+  private static final ApplicationId appId = ApplicationId.newInstance(1000L, 1);
+  private static final ApplicationAttemptId attemptId =
+      ApplicationAttemptId.newInstance(appId, 1);
+  private static final String user = "TEST_USER";
+
+  // Shape of one batch built by makeHistoryEvents(): a single AM event (no DAG id) followed
+  // by four events that carry the DAG id. The assertions below expect one logged entity per
+  // event.
+  private static final int AM_EVENTS_PER_BATCH = 1;
+  private static final int DAG_EVENTS_PER_BATCH = 4;
+  private static final int EVENTS_PER_BATCH = AM_EVENTS_PER_BATCH + DAG_EVENTS_PER_BATCH;
+
+  private static final long POLL_INTERVAL_MS = 10;
+
+  // Replaced by createService() for every test.
+  private InMemoryTimelineClient timelineClient;
+
+  /** With the setting left unset, each DAG must be logged under its own DAG id. */
+  @Test(timeout=2000)
+  public void testDAGGroupingDefault() throws Exception {
+    // -1 tells createService() not to set the property at all.
+    verifyDagIsNotGrouped(-1);
+  }
+
+  /** With one DAG per group, each DAG must be logged under its own DAG id. */
+  @Test(timeout=2000)
+  public void testDAGGroupingDisabled() throws Exception {
+    verifyDagIsNotGrouped(1);
+  }
+
+  /**
+   * With 100 DAGs per group, DAG 0 and DAG 99 must share one group while DAG 100 must land
+   * in a different one, and nothing may be logged under a plain DAG id.
+   */
+  @Test(timeout=2000)
+  public void testDAGGroupingGroupingEnabled() throws Exception {
+    int numDagsPerGroup = 100;
+    ATSV15HistoryLoggingService service = createService(numDagsPerGroup);
+    try {
+      TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+      TezDAGID dagId2 = TezDAGID.getInstance(appId, numDagsPerGroup - 1);
+      TezDAGID dagId3 = TezDAGID.getInstance(appId, numDagsPerGroup);
+      submitEvents(service, dagId1);
+      submitEvents(service, dagId2);
+      submitEvents(service, dagId3);
+      awaitLoggedEntities(service, 3 * EVENTS_PER_BATCH);
+
+      assertEquals(dagId1.getGroupId(numDagsPerGroup), dagId2.getGroupId(numDagsPerGroup));
+      assertNotEquals(dagId2.getGroupId(numDagsPerGroup), dagId3.getGroupId(numDagsPerGroup));
+
+      // One group for the application plus two DAG groups.
+      assertEquals(3, timelineClient.groupCount());
+      assertGroupSize(3 * AM_EVENTS_PER_BATCH, appId.toString());
+
+      assertNoGroup(dagId1.toString());
+      assertGroupSize(2 * DAG_EVENTS_PER_BATCH, dagId1.getGroupId(numDagsPerGroup));
+
+      assertNoGroup(dagId3.toString());
+      assertGroupSize(DAG_EVENTS_PER_BATCH, dagId3.getGroupId(numDagsPerGroup));
+    } finally {
+      // Stop the service even when an assertion fails so no started service is left behind.
+      service.stop();
+    }
+  }
+
+  /**
+   * Logs one batch of events for DAG 0 and verifies that the AM event ends up in the
+   * application group and the four DAG events in a group named after the DAG id.
+   *
+   * @param numDagsPerGroup value for the grouping property, or -1 to leave it unset
+   */
+  private void verifyDagIsNotGrouped(int numDagsPerGroup) throws Exception {
+    ATSV15HistoryLoggingService service = createService(numDagsPerGroup);
+    try {
+      TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+      submitEvents(service, dagId1);
+      awaitLoggedEntities(service, EVENTS_PER_BATCH);
+
+      assertEquals(2, timelineClient.groupCount());
+      assertGroupSize(AM_EVENTS_PER_BATCH, appId.toString());
+      assertGroupSize(DAG_EVENTS_PER_BATCH, dagId1.toString());
+    } finally {
+      service.stop();
+    }
+  }
+
+  /** Hands one batch of history events for {@code dagId} to the service. */
+  private void submitEvents(ATSV15HistoryLoggingService service, TezDAGID dagId) {
+    for (DAGHistoryEvent event : makeHistoryEvents(dagId, service)) {
+      service.handle(event);
+    }
+  }
+
+  /**
+   * Blocks until the service queue is empty and at least {@code expectedEntities} entities
+   * have been recorded by the in-memory client.
+   *
+   * <p>An empty queue alone only shows that the events were dequeued; it does not show that
+   * they have already been passed on to the timeline client, so the recorded count is
+   * checked as well. The enclosing test's timeout bounds the wait.
+   */
+  private void awaitLoggedEntities(ATSV15HistoryLoggingService service, int expectedEntities)
+      throws InterruptedException {
+    while (!service.eventQueue.isEmpty()
+        || timelineClient.loggedEntityCount() < expectedEntities) {
+      Thread.sleep(POLL_INTERVAL_MS);
+    }
+  }
+
+  /** Asserts that the group {@code groupName} exists and holds {@code expectedEntities}. */
+  private void assertGroupSize(int expectedEntities, String groupName) {
+    List<TimelineEntity> logged = timelineClient.entitiesFor(
+        TimelineEntityGroupId.newInstance(appId, groupName));
+    assertNotNull(logged);
+    assertEquals(expectedEntities, logged.size());
+  }
+
+  /** Asserts that nothing was logged under the group {@code groupName}. */
+  private void assertNoGroup(String groupName) {
+    assertNull(timelineClient.entitiesFor(
+        TimelineEntityGroupId.newInstance(appId, groupName)));
+  }
+
+  /**
+   * Builds, initialises and starts a service backed by a fresh {@link InMemoryTimelineClient},
+   * which is also stored in {@link #timelineClient}.
+   *
+   * @param numDagsPerGroup value for the grouping property, or -1 to leave it unset
+   * @return the started service; the caller is responsible for stopping it
+   */
+  private ATSV15HistoryLoggingService createService(int numDagsPerGroup) {
+    ATSV15HistoryLoggingService service = new ATSV15HistoryLoggingService();
+    AppContext appContext = mock(AppContext.class);
+    when(appContext.getApplicationID()).thenReturn(appId);
+    when(appContext.getHadoopShim()).thenReturn(new HadoopShim() {});
+    service.setAppContext(appContext);
+
+    Configuration conf = new Configuration();
+    if (numDagsPerGroup != -1) {
+      conf.setInt(TezConfiguration.TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP,
+          numDagsPerGroup);
+    }
+    service.init(conf);
+
+    // Swap in the recording client after init() and before start().
+    timelineClient = new InMemoryTimelineClient();
+    timelineClient.init(conf);
+    service.timelineClient = timelineClient;
+
+    service.start();
+    return service;
+  }
+
+  /**
+   * Creates one batch of five history events: an AM started event without a DAG id, then a
+   * DAG submitted, vertex started, task started and task attempt started event for
+   * {@code dagId}.
+   */
+  private List<DAGHistoryEvent> makeHistoryEvents(TezDAGID dagId,
+      ATSV15HistoryLoggingService service) {
+    List<DAGHistoryEvent> historyEvents = new ArrayList<>();
+
+    long time = System.currentTimeMillis();
+    Configuration conf = new Configuration(service.getConfig());
+    historyEvents.add(new DAGHistoryEvent(null,
+        new AMStartedEvent(attemptId, time, user)));
+    historyEvents.add(new DAGHistoryEvent(dagId,
+        new DAGSubmittedEvent(dagId, time, DAGPlan.getDefaultInstance(), attemptId, null, user,
+            conf, null)));
+    TezVertexID vertexID = TezVertexID.getInstance(dagId, 1);
+    historyEvents.add(new DAGHistoryEvent(dagId,
+        new VertexStartedEvent(vertexID, time, time)));
+    TezTaskID tezTaskID = TezTaskID.getInstance(vertexID, 1);
+    historyEvents.add(new DAGHistoryEvent(dagId,
+        new TaskStartedEvent(tezTaskID, "test", time, time)));
+    historyEvents.add(new DAGHistoryEvent(dagId,
+        new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(tezTaskID, 1), "test", time,
+            ContainerId.newContainerId(attemptId, 1), NodeId.newInstance("localhost", 8765),
+            null, null, null)));
+    return historyEvents;
+  }
+
+  /**
+   * {@link TimelineClient} that keeps every entity in memory, keyed by the group id it was
+   * put under.
+   *
+   * <p>All access to the recorded entities goes through synchronized methods, because the
+   * entities are written by whichever thread calls {@code putEntities} while the test thread
+   * reads them.
+   */
+  private static class InMemoryTimelineClient extends TimelineClient {
+    public static final ApplicationId DEFAULT_APP_ID = ApplicationId.newInstance(0, -1);
+    public static final TimelineEntityGroupId DEFAULT_GROUP_ID =
+        TimelineEntityGroupId.newInstance(DEFAULT_APP_ID, "");
+
+    // Guarded by "this".
+    private final Map<TimelineEntityGroupId, List<TimelineEntity>> entityLog = new HashMap<>();
+
+    protected InMemoryTimelineClient() {
+      super("InMemoryTimelineClient");
+    }
+
+    /** Returns the number of distinct group ids that entities were recorded under. */
+    synchronized int groupCount() {
+      return entityLog.size();
+    }
+
+    /** Returns the total number of entities recorded across all groups. */
+    synchronized int loggedEntityCount() {
+      int total = 0;
+      for (List<TimelineEntity> groupEntities : entityLog.values()) {
+        total += groupEntities.size();
+      }
+      return total;
+    }
+
+    /**
+     * Returns a snapshot of the entities recorded under {@code groupId}, or {@code null}
+     * when nothing was recorded for that group.
+     */
+    synchronized List<TimelineEntity> entitiesFor(TimelineEntityGroupId groupId) {
+      List<TimelineEntity> groupEntities = entityLog.get(groupId);
+      return groupEntities == null ? null : new ArrayList<>(groupEntities);
+    }
+
+    @Override
+    public void flush() throws IOException {
+      // Nothing is buffered.
+    }
+
+    /** Records {@code entities} under {@link #DEFAULT_GROUP_ID}. */
+    @Override
+    public synchronized TimelinePutResponse putEntities(TimelineEntity... entities)
+        throws IOException, YarnException {
+      return putEntities(null, DEFAULT_GROUP_ID, entities);
+    }
+
+    /**
+     * Appends {@code entities} to the list kept for {@code groupId}; {@code appAttemptId} is
+     * ignored.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public synchronized TimelinePutResponse putEntities(ApplicationAttemptId appAttemptId,
+        TimelineEntityGroupId groupId,
+        TimelineEntity... entities) throws IOException, YarnException {
+      List<TimelineEntity> groupEntities = entityLog.get(groupId);
+      if (groupEntities == null) {
+        groupEntities = new ArrayList<>();
+        entityLog.put(groupId, groupEntities);
+      }
+      for (TimelineEntity entity : entities) {
+        groupEntities.add(entity);
+      }
+      return null;
+    }
+
+    @Override
+    public void putDomain(TimelineDomain domain) throws IOException, YarnException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void putDomain(ApplicationAttemptId appAttemptId, TimelineDomain domain)
+        throws IOException, YarnException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Token<TimelineDelegationTokenIdentifier> getDelegationToken(String renewer)
+        throws IOException, YarnException {
+      return null;
+    }
+
+    @Override
+    public long renewDelegationToken(Token<TimelineDelegationTokenIdentifier> timelineDT)
+        throws IOException, YarnException {
+      return 0;
+    }
+
+    @Override
+    public void cancelDelegationToken(Token<TimelineDelegationTokenIdentifier> timelineDT)
+        throws IOException, YarnException {
+      // No tokens are issued, so there is nothing to cancel.
+    }
+  }
+}