You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/09/29 02:35:33 UTC
[27/50] [abbrv] TEZ-1495. ATS integration for TezClient (Prakash
Ramachandran via bikas)
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
index 8057714..d9cafc7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.history.events;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -52,12 +53,13 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
private TezCounters tezCounters;
private boolean fromSummary = false;
private VertexStats vertexStats;
+ private Map<String, Integer> vertexTaskStats;
- public VertexFinishedEvent(TezVertexID vertexId,
- String vertexName, long initRequestedTime, long initedTime,
- long startRequestedTime, long startedTime, long finishTime,
- VertexState state, String diagnostics, TezCounters counters,
- VertexStats vertexStats) {
+ public VertexFinishedEvent(TezVertexID vertexId, String vertexName, long initRequestedTime,
+ long initedTime, long startRequestedTime, long startedTime,
+ long finishTime, VertexState state, String diagnostics,
+ TezCounters counters, VertexStats vertexStats,
+ Map<String, Integer> vertexTaskStats) {
this.vertexName = vertexName;
this.vertexID = vertexId;
this.initRequestedTime = initRequestedTime;
@@ -69,6 +71,7 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
this.diagnostics = diagnostics;
this.tezCounters = counters;
this.vertexStats = vertexStats;
+ this.vertexTaskStats = vertexTaskStats;
}
public VertexFinishedEvent() {
@@ -138,10 +141,9 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
+ ", status=" + state.name()
+ ", diagnostics=" + diagnostics
+ ", counters=" + ( tezCounters == null ? "null" :
- tezCounters.toString()
- .replaceAll("\\n", ", ").replaceAll("\\s+", " "))
- + ", vertexStats=" + (vertexStats == null ? "null"
- : vertexStats.toString());
+ tezCounters.toString().replaceAll("\\n", ", ").replaceAll("\\s+", " "))
+ + ", vertexStats=" + (vertexStats == null ? "null" : vertexStats.toString())
+ + ", vertexTaskStats=" + (vertexTaskStats == null ? "null" : vertexTaskStats.toString());
}
public TezVertexID getVertexID() {
@@ -176,6 +178,10 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
return startTime;
}
+ public Map<String, Integer> getVertexTaskStats() {
+ return vertexTaskStats;
+ }
+
@Override
public void toSummaryProtoStream(OutputStream outputStream) throws IOException {
VertexFinishStateProto finishStateProto =
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
index 136c5f1..a8bd21e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.records.TezVertexID;
@@ -105,4 +106,7 @@ public class VertexStartedEvent implements HistoryEvent {
return startTime;
}
+ public VertexState getVertexState() {
+ return VertexState.RUNNING;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index 12dcf1c..a9987d6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -18,6 +18,7 @@
package org.apache.tez.dag.history.logging.impl;
+import org.apache.tez.common.ATSConstants;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.AMLaunchedEvent;
@@ -36,7 +37,6 @@ import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexInitializedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.history.logging.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
import org.apache.tez.dag.history.utils.DAGUtils;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
deleted file mode 100644
index 0188a8e..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.history.utils;
-
-public class ATSConstants {
-
- // TODO remove once YARN exposes proper constants
-
- /* Top level keys */
- public static final String ENTITY = "entity";
- public static final String ENTITY_TYPE = "entitytype";
- public static final String EVENTS = "events";
- public static final String EVENT_TYPE = "eventtype";
- public static final String TIMESTAMP = "ts";
- public static final String EVENT_INFO = "eventinfo";
- public static final String RELATED_ENTITIES = "relatedEntities";
- public static final String PRIMARY_FILTERS = "primaryfilters";
- public static final String SECONDARY_FILTERS = "secondaryfilters";
- public static final String OTHER_INFO = "otherinfo";
-
- /* Section for related entities */
- public static final String APPLICATION_ID = "applicationId";
- public static final String APPLICATION_ATTEMPT_ID = "applicationAttemptId";
- public static final String CONTAINER_ID = "containerId";
- public static final String NODE_ID = "nodeId";
- public static final String USER = "user";
-
- /* Keys used in other info */
- public static final String APP_SUBMIT_TIME = "appSubmitTime";
-
- /* Tez-specific info */
- public static final String DAG_PLAN = "dagPlan";
- public static final String DAG_NAME = "dagName";
- public static final String VERTEX_NAME = "vertexName";
- public static final String SCHEDULED_TIME = "scheduledTime";
- public static final String INIT_REQUESTED_TIME = "initRequestedTime";
- public static final String INIT_TIME = "initTime";
- public static final String START_REQUESTED_TIME = "startRequestedTime";
- public static final String START_TIME = "startTime";
- public static final String FINISH_TIME = "endTime";
- public static final String TIME_TAKEN = "timeTaken";
- public static final String STATUS = "status";
- public static final String DIAGNOSTICS = "diagnostics";
- public static final String COUNTERS = "counters";
- public static final String STATS = "stats";
- public static final String NUM_TASKS = "numTasks";
- public static final String PROCESSOR_CLASS_NAME = "processorClassName";
- public static final String IN_PROGRESS_LOGS_URL = "inProgressLogsURL";
- public static final String COMPLETED_LOGS_URL = "completedLogsURL";
- public static final String EXIT_STATUS = "exitStatus";
-
- /* Counters-related keys */
- public static final String COUNTER_GROUPS = "counterGroups";
- public static final String COUNTER_GROUP_NAME = "counterGroupName";
- public static final String COUNTER_GROUP_DISPLAY_NAME = "counterGroupDisplayName";
- public static final String COUNTER_NAME = "counterName";
- public static final String COUNTER_DISPLAY_NAME = "counterDisplayName";
- public static final String COUNTER_VALUE = "counterValue";
-
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
index 232a3b2..cab3c83 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
@@ -24,6 +24,7 @@ import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
+import org.apache.tez.common.ATSConstants;
import org.apache.tez.common.counters.CounterGroup;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
index 9042a93..b278d8f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
@@ -572,7 +572,7 @@ public class TestVertexRecovery {
vertex1.restoreFromEvent(new VertexFinishedEvent(vertex1.getVertexId(),
"vertex1", initRequestedTime, initedTime, startRequestedTime,
startTime, finishTime, VertexState.SUCCEEDED, "",
- new TezCounters(), new VertexStats()));
+ new TezCounters(), new VertexStats(), null));
assertEquals(finishTime, vertex1.finishTime);
assertEquals(VertexState.SUCCEEDED, recoveredState);
assertEquals(false, vertex1.recoveryCommitInProgress);
@@ -801,7 +801,7 @@ public class TestVertexRecovery {
recoveredState = vertex1.restoreFromEvent(new VertexFinishedEvent(vertex1.getVertexId(),
"vertex1", initRequestedTime, initedTime, initRequestedTime + 300L,
initRequestedTime + 400L, initRequestedTime + 500L,
- VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats()));
+ VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(), null));
assertEquals(VertexState.SUCCEEDED, recoveredState);
vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index bcbe6f1..f52671c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -346,7 +346,7 @@ public class TestHistoryEventsProtoConversion {
new VertexFinishedEvent(TezVertexID.getInstance(
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
"vertex1", 1000l, 15000l, 16000l, 20000l, 1344400l, VertexState.ERROR,
- null, null, null);
+ null, null, null, null);
VertexFinishedEvent deserializedEvent = (VertexFinishedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
@@ -361,7 +361,7 @@ public class TestHistoryEventsProtoConversion {
new VertexFinishedEvent(TezVertexID.getInstance(
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
"vertex1", 1000l, 15000l, 16000l, 20000l, 1344400l, VertexState.ERROR,
- "diagnose", new TezCounters(), new VertexStats());
+ "diagnose", new TezCounters(), new VertexStats(), null);
VertexFinishedEvent deserializedEvent = (VertexFinishedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index f674fc0..2149053 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -127,7 +127,7 @@ public class TestHistoryEventJsonConversion {
case VERTEX_FINISHED:
event = new VertexFinishedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
random.nextInt(), random.nextInt(), random.nextInt(), VertexState.ERROR,
- null, null, null);
+ null, null, null, null);
break;
case TASK_STARTED:
event = new TaskStartedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt());
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index f078f1d..a81bdf4 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -19,9 +19,11 @@
package org.apache.tez.dag.history.logging.ats;
import java.io.IOException;
+import java.util.Map;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.tez.common.ATSConstants;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
@@ -41,7 +43,6 @@ import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexInitializedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.history.logging.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
import org.apache.tez.dag.history.utils.DAGUtils;
public class HistoryEventTimelineConversion {
@@ -253,6 +254,7 @@ public class HistoryEventTimelineConversion {
atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName());
atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
+ atsEntity.addOtherInfo(ATSConstants.STATUS, event.getDagState().toString());
return atsEntity;
}
@@ -422,6 +424,13 @@ public class HistoryEventTimelineConversion {
atsEntity.addOtherInfo(ATSConstants.STATS,
DAGUtils.convertVertexStatsToATSMap(event.getVertexStats()));
+ final Map<String, Integer> vertexTaskStats = event.getVertexTaskStats();
+ if (vertexTaskStats != null) {
+ for(String key : vertexTaskStats.keySet()) {
+ atsEntity.addOtherInfo(key, vertexTaskStats.get(key));
+ }
+ }
+
return atsEntity;
}
@@ -464,6 +473,7 @@ public class HistoryEventTimelineConversion {
atsEntity.addOtherInfo(ATSConstants.START_REQUESTED_TIME, event.getStartRequestedTime());
atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
+ atsEntity.addOtherInfo(ATSConstants.STATUS, event.getVertexState().toString());
return atsEntity;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4cf6472e/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index b04b8d4..d2e366d 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -127,7 +127,7 @@ public class TestHistoryEventTimelineConversion {
case VERTEX_FINISHED:
event = new VertexFinishedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
random.nextInt(), random.nextInt(), random.nextInt(), VertexState.ERROR,
- null, null, null);
+ null, null, null, null);
break;
case TASK_STARTED:
event = new TaskStartedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt());