You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2016/07/21 21:36:02 UTC

tez git commit: TEZ-3358. Support using the same TimelineGroupId for multiple DAGs. (Harish Jaiprakash via hitesh)

Repository: tez
Updated Branches:
  refs/heads/master 9930011b0 -> e610b00d3


TEZ-3358. Support using the same TimelineGroupId for multiple DAGs. (Harish Jaiprakash via hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e610b00d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e610b00d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e610b00d

Branch: refs/heads/master
Commit: e610b00d388b61c2cc66a60b78d26e1fb4ce74de
Parents: 9930011
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Jul 21 14:35:51 2016 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Jul 21 14:35:51 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../ats/ATSV15HistoryLoggingService.java        |  13 +-
 .../ats/TestATSV15HistoryLoggingService.java    | 286 +++++++++++++++++++
 3 files changed, 298 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/e610b00d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a925104..364ff2c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3358. Support using the same TimelineGroupId for multiple DAGs.
   TEZ-3357. Change TimelineCachePlugin to handle DAG grouping.
   TEZ-3348. NullPointerException in Tez MROutput while trying to write using Parquet's DeprecatedParquetOutputFormat.
   TEZ-3356. Fix initializing of stats when custom ShuffleVertexManager is used.
@@ -86,6 +87,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3358. Support using the same TimelineGroupId for multiple DAGs.
   TEZ-3357. Change TimelineCachePlugin to handle DAG grouping.
   TEZ-3348. NullPointerException in Tez MROutput while trying to write using Parquet's DeprecatedParquetOutputFormat.
   TEZ-3356. Fix initializing of stats when custom ShuffleVertexManager is used.

http://git-wip-us.apache.org/repos/asf/tez/blob/e610b00d/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java
index fc7e97a..dd21d2d 100644
--- a/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV15HistoryLoggingService.java
@@ -54,7 +54,8 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService {
 
   private static final Logger LOG = LoggerFactory.getLogger(ATSV15HistoryLoggingService.class);
 
-  private LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
+  @VisibleForTesting
+  LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
       new LinkedBlockingQueue<DAGHistoryEvent>();
 
   private Thread eventHandlingThread;
@@ -81,6 +82,8 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService {
       "org.apache.tez.dag.history.ats.acls.ATSV15HistoryACLPolicyManager";
   private HistoryACLPolicyManager historyACLPolicyManager;
 
+  private int numDagsPerGroup;
+
   public ATSV15HistoryLoggingService() {
     super(ATSV15HistoryLoggingService.class.getName());
   }
@@ -151,6 +154,8 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService {
       historyACLPolicyManager = null;
     }
 
+    numDagsPerGroup = conf.getInt(TezConfiguration.TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP,
+        TezConfiguration.TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP_DEFAULT);
   }
 
   @Override
@@ -290,8 +295,10 @@ public class ATSV15HistoryLoggingService extends HistoryLoggingService {
       case VERTEX_GROUP_COMMIT_STARTED:
       case VERTEX_GROUP_COMMIT_FINISHED:
       case DAG_RECOVERED:
-        return TimelineEntityGroupId.newInstance(event.getDagID().getApplicationId(),
-            event.getDagID().toString());
+        String entityGroupId = numDagsPerGroup > 1
+            ? event.getDagID().getGroupId(numDagsPerGroup)
+            : event.getDagID().toString();
+        return TimelineEntityGroupId.newInstance(event.getDagID().getApplicationId(), entityGroupId);
       case APP_LAUNCHED:
       case AM_LAUNCHED:
       case AM_STARTED:

http://git-wip-us.apache.org/repos/asf/tez/blob/e610b00d/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
new file mode 100644
index 0000000..87a48f6
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.ats;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.hadoop.shim.HadoopShim;
+import org.junit.Test;
+
+/**
+ * Tests for {@link ATSV15HistoryLoggingService}, focused on which {@link TimelineEntityGroupId}
+ * each history event is written to, with and without DAG grouping
+ * ({@link TezConfiguration#TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP}).
+ */
+public class TestATSV15HistoryLoggingService {
+  private static final ApplicationId appId = ApplicationId.newInstance(1000L, 1);
+  private static final ApplicationAttemptId attemptId =
+      ApplicationAttemptId.newInstance(appId, 1);
+  private static final String user = "TEST_USER";
+
+  private InMemoryTimelineClient timelineClient;
+
+  @Test(timeout=2000)
+  public void testDAGGroupingDefault() throws Exception {
+    // -1 leaves the config key unset, so the service falls back to its default.
+    verifyDagEventsNotGrouped(-1);
+  }
+
+  @Test(timeout=2000)
+  public void testDAGGroupingDisabled() throws Exception {
+    // A group size of 1 must behave exactly like grouping being switched off.
+    verifyDagEventsNotGrouped(1);
+  }
+
+  @Test(timeout=2000)
+  public void testDAGGroupingGroupingEnabled() throws Exception {
+    int numDagsPerGroup = 100;
+    ATSV15HistoryLoggingService service = createService(numDagsPerGroup);
+    try {
+      // dagId1 and dagId2 are expected to share one group; dagId3 should start the next one.
+      TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+      TezDAGID dagId2 = TezDAGID.getInstance(appId, numDagsPerGroup - 1);
+      TezDAGID dagId3 = TezDAGID.getInstance(appId, numDagsPerGroup);
+      int numEvents = handleEvents(service, dagId1);
+      numEvents += handleEvents(service, dagId2);
+      numEvents += handleEvents(service, dagId3);
+      waitForEntities(numEvents);
+
+      assertEquals(dagId1.getGroupId(numDagsPerGroup), dagId2.getGroupId(numDagsPerGroup));
+      assertNotEquals(dagId2.getGroupId(numDagsPerGroup), dagId3.getGroupId(numDagsPerGroup));
+
+      // One application-level group plus the two DAG groups.
+      assertEquals(3, timelineClient.getNumGroups());
+
+      List<TimelineEntity> amEvents = timelineClient.getEntities(
+          TimelineEntityGroupId.newInstance(appId, appId.toString()));
+      assertNotNull(amEvents);
+      assertEquals(3, amEvents.size());
+
+      // With grouping enabled, nothing may be written under a plain per-DAG group id.
+      assertNull(timelineClient.getEntities(
+          TimelineEntityGroupId.newInstance(appId, dagId1.toString())));
+      assertNull(timelineClient.getEntities(
+          TimelineEntityGroupId.newInstance(appId, dagId3.toString())));
+
+      // dagId1 and dagId2 share a group, so it holds the DAG-scoped events of both DAGs.
+      List<TimelineEntity> groupedDagEvents = timelineClient.getEntities(
+          TimelineEntityGroupId.newInstance(appId, dagId1.getGroupId(numDagsPerGroup)));
+      assertNotNull(groupedDagEvents);
+      assertEquals(8, groupedDagEvents.size());
+
+      groupedDagEvents = timelineClient.getEntities(
+          TimelineEntityGroupId.newInstance(appId, dagId3.getGroupId(numDagsPerGroup)));
+      assertNotNull(groupedDagEvents);
+      assertEquals(4, groupedDagEvents.size());
+    } finally {
+      service.stop();
+    }
+  }
+
+  /**
+   * Runs one DAG through a service created with {@code numDagsPerGroup} and checks that its
+   * events are written under a group keyed by the DAG id itself rather than a shared DAG group.
+   */
+  private void verifyDagEventsNotGrouped(int numDagsPerGroup) throws Exception {
+    ATSV15HistoryLoggingService service = createService(numDagsPerGroup);
+    try {
+      TezDAGID dagId1 = TezDAGID.getInstance(appId, 0);
+      waitForEntities(handleEvents(service, dagId1));
+
+      // One application-level group plus the single per-DAG group.
+      assertEquals(2, timelineClient.getNumGroups());
+
+      List<TimelineEntity> amEvents = timelineClient.getEntities(
+          TimelineEntityGroupId.newInstance(appId, appId.toString()));
+      assertNotNull(amEvents);
+      assertEquals(1, amEvents.size());
+
+      List<TimelineEntity> nonGroupedDagEvents = timelineClient.getEntities(
+          TimelineEntityGroupId.newInstance(appId, dagId1.toString()));
+      assertNotNull(nonGroupedDagEvents);
+      assertEquals(4, nonGroupedDagEvents.size());
+    } finally {
+      service.stop();
+    }
+  }
+
+  /**
+   * Hands every history event generated for {@code dagId} to {@code service}.
+   *
+   * @return the number of events handed over
+   */
+  private int handleEvents(ATSV15HistoryLoggingService service, TezDAGID dagId) {
+    List<DAGHistoryEvent> events = makeHistoryEvents(dagId, service);
+    for (DAGHistoryEvent event : events) {
+      service.handle(event);
+    }
+    return events.size();
+  }
+
+  /**
+   * Waits until the in-memory timeline client holds at least {@code expected} entities.
+   * Polling for an empty {@code eventQueue} is not enough: the handler thread presumably
+   * dequeues an event before writing it, so the final write may still be in flight. Each event
+   * from {@link #makeHistoryEvents} is expected to yield exactly one entity; the JUnit timeout
+   * bounds the wait if fewer ever arrive.
+   */
+  private void waitForEntities(int expected) throws InterruptedException {
+    while (timelineClient.getNumEntities() < expected) {
+      Thread.sleep(10);
+    }
+  }
+
+  /**
+   * Creates and starts a service wired to a fresh {@link InMemoryTimelineClient}.
+   *
+   * @param numDagsPerGroup value for the DAGs-per-group setting, or -1 to leave it unset
+   */
+  private ATSV15HistoryLoggingService createService(int numDagsPerGroup) {
+    ATSV15HistoryLoggingService service = new ATSV15HistoryLoggingService();
+    AppContext appContext = mock(AppContext.class);
+    when(appContext.getApplicationID()).thenReturn(appId);
+    when(appContext.getHadoopShim()).thenReturn(new HadoopShim() {});
+    service.setAppContext(appContext);
+
+    Configuration conf = new Configuration();
+    if (numDagsPerGroup != -1) {
+      conf.setInt(TezConfiguration.TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP,
+          numDagsPerGroup);
+    }
+    service.init(conf);
+
+    // Replace the service's timeline client with the in-memory recorder before it starts.
+    timelineClient = new InMemoryTimelineClient();
+    timelineClient.init(conf);
+    service.timelineClient = timelineClient;
+
+    service.start();
+    return service;
+  }
+
+  /**
+   * Builds one AM-level event (with a null DAG id) followed by four events scoped to
+   * {@code dagId}: DAG submitted, vertex started, task started and task attempt started.
+   */
+  private List<DAGHistoryEvent> makeHistoryEvents(TezDAGID dagId,
+                                                  ATSV15HistoryLoggingService service) {
+    List<DAGHistoryEvent> historyEvents = new ArrayList<>();
+
+    long time = System.currentTimeMillis();
+    Configuration conf = new Configuration(service.getConfig());
+    historyEvents.add(new DAGHistoryEvent(null,
+        new AMStartedEvent(attemptId, time, user)));
+    historyEvents.add(new DAGHistoryEvent(dagId,
+        new DAGSubmittedEvent(dagId, time, DAGPlan.getDefaultInstance(), attemptId, null, user,
+            conf, null)));
+    TezVertexID vertexID = TezVertexID.getInstance(dagId, 1);
+    historyEvents.add(new DAGHistoryEvent(dagId,
+        new VertexStartedEvent(vertexID, time, time)));
+    TezTaskID tezTaskID = TezTaskID.getInstance(vertexID, 1);
+    historyEvents.add(new DAGHistoryEvent(dagId,
+        new TaskStartedEvent(tezTaskID, "test", time, time)));
+    historyEvents.add(new DAGHistoryEvent(dagId,
+        new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(tezTaskID, 1), "test", time,
+            ContainerId.newContainerId(attemptId, 1), NodeId.newInstance("localhost", 8765), null,
+            null, null)));
+    return historyEvents;
+  }
+
+  /**
+   * {@link TimelineClient} that records every entity it receives, keyed by entity group, so the
+   * tests can check which group each history event was routed to. Writes are expected to come
+   * from the service's event-handling thread while the test thread reads, so every access to
+   * the recorded state is synchronized on this client.
+   */
+  private static class InMemoryTimelineClient extends TimelineClient {
+    public static final ApplicationId DEFAULT_APP_ID = ApplicationId.newInstance(0, -1);
+    public static final TimelineEntityGroupId DEFAULT_GROUP_ID =
+        TimelineEntityGroupId.newInstance(DEFAULT_APP_ID, "");
+
+    private final Map<TimelineEntityGroupId, List<TimelineEntity>> entityLog = new HashMap<>();
+    private int numEntities = 0;
+
+    protected InMemoryTimelineClient() {
+      super("InMemoryTimelineClient");
+    }
+
+    /** Returns a copy of the entities logged under {@code groupId}, or null if there are none. */
+    synchronized List<TimelineEntity> getEntities(TimelineEntityGroupId groupId) {
+      List<TimelineEntity> groupEntities = entityLog.get(groupId);
+      if (groupEntities == null) {
+        return null;
+      }
+      return new ArrayList<>(groupEntities);
+    }
+
+    /** Returns the number of distinct groups that have received at least one entity. */
+    synchronized int getNumGroups() {
+      return entityLog.size();
+    }
+
+    /** Returns the total number of entities received across all groups. */
+    synchronized int getNumEntities() {
+      return numEntities;
+    }
+
+    @Override
+    public void flush() throws IOException {
+    }
+
+    @Override
+    public synchronized TimelinePutResponse putEntities(TimelineEntity... entities)
+        throws IOException, YarnException {
+      return putEntities(null, DEFAULT_GROUP_ID, entities);
+    }
+
+    @Override
+    public synchronized TimelinePutResponse putEntities(ApplicationAttemptId appAttemptId,
+        TimelineEntityGroupId groupId,
+        TimelineEntity... entities) throws IOException, YarnException {
+      List<TimelineEntity> groupEntities = entityLog.get(groupId);
+      if (groupEntities == null) {
+        groupEntities = new ArrayList<>();
+        entityLog.put(groupId, groupEntities);
+      }
+      for (TimelineEntity entity : entities) {
+        groupEntities.add(entity);
+      }
+      numEntities += entities.length;
+      return null;
+    }
+
+    @Override
+    public void putDomain(TimelineDomain domain) throws IOException, YarnException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void putDomain(ApplicationAttemptId appAttemptId, TimelineDomain domain)
+        throws IOException, YarnException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Token<TimelineDelegationTokenIdentifier> getDelegationToken(String renewer)
+        throws IOException, YarnException {
+      return null;
+    }
+
+    @Override
+    public long renewDelegationToken(Token<TimelineDelegationTokenIdentifier> timelineDT)
+        throws IOException, YarnException {
+      return 0;
+    }
+
+    @Override
+    public void cancelDelegationToken(Token<TimelineDelegationTokenIdentifier> timelineDT)
+        throws IOException, YarnException {
+    }
+  }
+}