You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/09/29 02:35:33 UTC

[27/50] [abbrv] TEZ-1495. ATS integration for TezClient (Prakash Ramachandran via bikas)

http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
index 8057714..d9cafc7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.history.events;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -52,12 +53,13 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
   private TezCounters tezCounters;
   private boolean fromSummary = false;
   private VertexStats vertexStats;
+  private Map<String, Integer> vertexTaskStats;
 
-  public VertexFinishedEvent(TezVertexID vertexId,
-      String vertexName, long initRequestedTime, long initedTime,
-      long startRequestedTime, long startedTime, long finishTime,
-      VertexState state, String diagnostics, TezCounters counters,
-      VertexStats vertexStats) {
+  public VertexFinishedEvent(TezVertexID vertexId, String vertexName, long initRequestedTime,
+                             long initedTime, long startRequestedTime, long startedTime,
+                             long finishTime, VertexState state, String diagnostics,
+                             TezCounters counters, VertexStats vertexStats,
+                             Map<String, Integer> vertexTaskStats) {
     this.vertexName = vertexName;
     this.vertexID = vertexId;
     this.initRequestedTime = initRequestedTime;
@@ -69,6 +71,7 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
     this.diagnostics = diagnostics;
     this.tezCounters = counters;
     this.vertexStats = vertexStats;
+    this.vertexTaskStats = vertexTaskStats;
   }
 
   public VertexFinishedEvent() {
@@ -138,10 +141,9 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
         + ", status=" + state.name()
         + ", diagnostics=" + diagnostics
         + ", counters=" + ( tezCounters == null ? "null" :
-          tezCounters.toString()
-            .replaceAll("\\n", ", ").replaceAll("\\s+", " "))
-        + ", vertexStats=" + (vertexStats == null ? "null"
-              : vertexStats.toString());
+          tezCounters.toString().replaceAll("\\n", ", ").replaceAll("\\s+", " "))
+        + ", vertexStats=" + (vertexStats == null ? "null" : vertexStats.toString())
+        + ", vertexTaskStats=" + (vertexTaskStats == null ? "null" : vertexTaskStats.toString());
   }
 
   public TezVertexID getVertexID() {
@@ -176,6 +178,10 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
     return startTime;
   }
 
+  public Map<String, Integer> getVertexTaskStats() {
+    return vertexTaskStats;
+  }
+
   @Override
   public void toSummaryProtoStream(OutputStream outputStream) throws IOException {
     VertexFinishStateProto finishStateProto =

http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
index 136c5f1..a8bd21e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
+import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.records.TezVertexID;
@@ -105,4 +106,7 @@ public class VertexStartedEvent implements HistoryEvent {
     return startTime;
   }
 
+  public VertexState getVertexState() {
+    return VertexState.RUNNING;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index 12dcf1c..a9987d6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.dag.history.logging.impl;
 
+import org.apache.tez.common.ATSConstants;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.events.AMLaunchedEvent;
@@ -36,7 +37,6 @@ import org.apache.tez.dag.history.events.VertexFinishedEvent;
 import org.apache.tez.dag.history.events.VertexInitializedEvent;
 import org.apache.tez.dag.history.events.VertexStartedEvent;
 import org.apache.tez.dag.history.logging.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
 import org.apache.tez.dag.history.utils.DAGUtils;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;

http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
deleted file mode 100644
index 0188a8e..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.history.utils;
-
-public class ATSConstants {
-
-  // TODO remove once YARN exposes proper constants
-
-  /* Top level keys */
-  public static final String ENTITY = "entity";
-  public static final String ENTITY_TYPE = "entitytype";
-  public static final String EVENTS = "events";
-  public static final String EVENT_TYPE = "eventtype";
-  public static final String TIMESTAMP = "ts";
-  public static final String EVENT_INFO = "eventinfo";
-  public static final String RELATED_ENTITIES = "relatedEntities";
-  public static final String PRIMARY_FILTERS = "primaryfilters";
-  public static final String SECONDARY_FILTERS = "secondaryfilters";
-  public static final String OTHER_INFO = "otherinfo";
-
-  /* Section for related entities */
-  public static final String APPLICATION_ID = "applicationId";
-  public static final String APPLICATION_ATTEMPT_ID = "applicationAttemptId";
-  public static final String CONTAINER_ID = "containerId";
-  public static final String NODE_ID = "nodeId";
-  public static final String USER = "user";
-
-  /* Keys used in other info */
-  public static final String APP_SUBMIT_TIME = "appSubmitTime";
-
-  /* Tez-specific info */
-  public static final String DAG_PLAN = "dagPlan";
-  public static final String DAG_NAME = "dagName";
-  public static final String VERTEX_NAME = "vertexName";
-  public static final String SCHEDULED_TIME = "scheduledTime";
-  public static final String INIT_REQUESTED_TIME = "initRequestedTime";
-  public static final String INIT_TIME = "initTime";
-  public static final String START_REQUESTED_TIME = "startRequestedTime";
-  public static final String START_TIME = "startTime";
-  public static final String FINISH_TIME = "endTime";
-  public static final String TIME_TAKEN = "timeTaken";
-  public static final String STATUS = "status";
-  public static final String DIAGNOSTICS = "diagnostics";
-  public static final String COUNTERS = "counters";
-  public static final String STATS = "stats";
-  public static final String NUM_TASKS = "numTasks";
-  public static final String PROCESSOR_CLASS_NAME = "processorClassName";
-  public static final String IN_PROGRESS_LOGS_URL = "inProgressLogsURL";
-  public static final String COMPLETED_LOGS_URL = "completedLogsURL";
-  public static final String EXIT_STATUS = "exitStatus";
-
-  /* Counters-related keys */
-  public static final String COUNTER_GROUPS = "counterGroups";
-  public static final String COUNTER_GROUP_NAME = "counterGroupName";
-  public static final String COUNTER_GROUP_DISPLAY_NAME = "counterGroupDisplayName";
-  public static final String COUNTER_NAME = "counterName";
-  public static final String COUNTER_DISPLAY_NAME = "counterDisplayName";
-  public static final String COUNTER_VALUE = "counterValue";
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
index 232a3b2..cab3c83 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import org.apache.tez.common.ATSConstants;
 import org.apache.tez.common.counters.CounterGroup;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;

http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
index 9042a93..b278d8f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
@@ -572,7 +572,7 @@ public class TestVertexRecovery {
         vertex1.restoreFromEvent(new VertexFinishedEvent(vertex1.getVertexId(),
             "vertex1", initRequestedTime, initedTime, startRequestedTime,
             startTime, finishTime, VertexState.SUCCEEDED, "",
-            new TezCounters(), new VertexStats()));
+            new TezCounters(), new VertexStats(), null));
     assertEquals(finishTime, vertex1.finishTime);
     assertEquals(VertexState.SUCCEEDED, recoveredState);
     assertEquals(false, vertex1.recoveryCommitInProgress);
@@ -801,7 +801,7 @@ public class TestVertexRecovery {
     recoveredState = vertex1.restoreFromEvent(new VertexFinishedEvent(vertex1.getVertexId(),
         "vertex1", initRequestedTime, initedTime, initRequestedTime + 300L,
         initRequestedTime + 400L, initRequestedTime + 500L,
-        VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats()));
+        VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(), null));
     assertEquals(VertexState.SUCCEEDED, recoveredState);
 
     vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),

http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index bcbe6f1..f52671c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -346,7 +346,7 @@ public class TestHistoryEventsProtoConversion {
           new VertexFinishedEvent(TezVertexID.getInstance(
               TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
               "vertex1", 1000l, 15000l, 16000l, 20000l, 1344400l, VertexState.ERROR,
-              null, null, null);
+              null, null, null, null);
       VertexFinishedEvent deserializedEvent = (VertexFinishedEvent)
           testProtoConversion(event);
       Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
@@ -361,7 +361,7 @@ public class TestHistoryEventsProtoConversion {
           new VertexFinishedEvent(TezVertexID.getInstance(
               TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
               "vertex1", 1000l, 15000l, 16000l, 20000l, 1344400l, VertexState.ERROR,
-              "diagnose", new TezCounters(), new VertexStats());
+              "diagnose", new TezCounters(), new VertexStats(), null);
       VertexFinishedEvent deserializedEvent = (VertexFinishedEvent)
           testProtoConversion(event);
       Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());

http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index f674fc0..2149053 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -127,7 +127,7 @@ public class TestHistoryEventJsonConversion {
         case VERTEX_FINISHED:
           event = new VertexFinishedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
               random.nextInt(), random.nextInt(), random.nextInt(), VertexState.ERROR,
-              null, null, null);
+              null, null, null, null);
           break;
         case TASK_STARTED:
           event = new TaskStartedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt());

http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index f078f1d..a81bdf4 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -19,9 +19,11 @@
 package org.apache.tez.dag.history.logging.ats;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.tez.common.ATSConstants;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
@@ -41,7 +43,6 @@ import org.apache.tez.dag.history.events.VertexFinishedEvent;
 import org.apache.tez.dag.history.events.VertexInitializedEvent;
 import org.apache.tez.dag.history.events.VertexStartedEvent;
 import org.apache.tez.dag.history.logging.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
 import org.apache.tez.dag.history.utils.DAGUtils;
 
 public class HistoryEventTimelineConversion {
@@ -253,6 +254,7 @@ public class HistoryEventTimelineConversion {
     atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName());
 
     atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
+    atsEntity.addOtherInfo(ATSConstants.STATUS, event.getDagState().toString());
 
     return atsEntity;
   }
@@ -422,6 +424,13 @@ public class HistoryEventTimelineConversion {
     atsEntity.addOtherInfo(ATSConstants.STATS,
         DAGUtils.convertVertexStatsToATSMap(event.getVertexStats()));
 
+    final Map<String, Integer> vertexTaskStats = event.getVertexTaskStats();
+    if (vertexTaskStats != null) {
+      for(String key : vertexTaskStats.keySet()) {
+        atsEntity.addOtherInfo(key, vertexTaskStats.get(key));
+      }
+    }
+
     return atsEntity;
   }
 
@@ -464,6 +473,7 @@ public class HistoryEventTimelineConversion {
 
     atsEntity.addOtherInfo(ATSConstants.START_REQUESTED_TIME, event.getStartRequestedTime());
     atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
+    atsEntity.addOtherInfo(ATSConstants.STATUS, event.getVertexState().toString());
 
     return atsEntity;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index b04b8d4..d2e366d 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -127,7 +127,7 @@ public class TestHistoryEventTimelineConversion {
         case VERTEX_FINISHED:
           event = new VertexFinishedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
               random.nextInt(), random.nextInt(), random.nextInt(), VertexState.ERROR,
-              null, null, null);
+              null, null, null, null);
           break;
         case TASK_STARTED:
           event = new TaskStartedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt());