You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/05/22 00:13:14 UTC
[2/3] TEZ-1066. Generate events to integrate with YARN timeline
server. (hitesh)
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
index e6023f1..da00b06 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
@@ -18,19 +18,14 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexStartedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
public class VertexStartedEvent implements HistoryEvent {
@@ -54,39 +49,6 @@ public class VertexStartedEvent implements HistoryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(ATSConstants.ENTITY, vertexID.toString());
- jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
-
- // Related entities
- JSONArray relatedEntities = new JSONArray();
- JSONObject vertexEntity = new JSONObject();
- vertexEntity.put(ATSConstants.ENTITY, vertexID.getDAGId().toString());
- vertexEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
- relatedEntities.put(vertexEntity);
- jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
-
- // Events
- JSONArray events = new JSONArray();
- JSONObject startEvent = new JSONObject();
- startEvent.put(ATSConstants.TIMESTAMP, startTime);
- startEvent.put(ATSConstants.EVENT_TYPE,
- HistoryEventType.VERTEX_STARTED.name());
- events.put(startEvent);
- jsonObject.put(ATSConstants.EVENTS, events);
-
- // Other info
- // TODO fix requested times to be events
- JSONObject otherInfo = new JSONObject();
- otherInfo.put(ATSConstants.START_REQUESTED_TIME, startRequestedTime);
- otherInfo.put(ATSConstants.START_TIME, startTime);
- jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
- return jsonObject;
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java
new file mode 100644
index 0000000..00cac28
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging;
+
+public enum EntityTypes {
+ TEZ_APPLICATION_ATTEMPT,
+ TEZ_CONTAINER_ID,
+ TEZ_DAG_ID,
+ TEZ_VERTEX_ID,
+ TEZ_TASK_ID,
+ TEZ_TASK_ATTEMPT_ID,
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/logging/HistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/HistoryLoggingService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/HistoryLoggingService.java
new file mode 100644
index 0000000..44efad7
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/HistoryLoggingService.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging;
+
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+
+public abstract class HistoryLoggingService extends AbstractService {
+
+ protected AppContext appContext;
+
+ public void setAppContext(AppContext appContext) {
+ this.appContext = appContext;
+ }
+
+ public HistoryLoggingService(String name) {
+ super(name);
+ }
+
+ /**
+ * Handle logging of history event
+ * @param event History event to be logged
+ */
+ public abstract void handle(DAGHistoryEvent event);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
new file mode 100644
index 0000000..e5bb1e5
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -0,0 +1,633 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.impl;
+
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.AMLaunchedEvent;
+import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.history.events.ContainerStoppedEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskFinishedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.history.logging.EntityTypes;
+import org.apache.tez.dag.history.utils.ATSConstants;
+import org.apache.tez.dag.history.utils.DAGUtils;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+public class HistoryEventJsonConversion {
+
+ public static JSONObject convertToJson(HistoryEvent historyEvent) throws JSONException {
+ if (!historyEvent.isHistoryEvent()) {
+ throw new UnsupportedOperationException("Invalid Event, does not support history"
+ + ", eventType=" + historyEvent.getEventType());
+ }
+ JSONObject jsonObject = null;
+ switch (historyEvent.getEventType()) {
+ case AM_LAUNCHED:
+ jsonObject = convertAMLaunchedEvent((AMLaunchedEvent) historyEvent);
+ break;
+ case AM_STARTED:
+ jsonObject = convertAMStartedEvent((AMStartedEvent) historyEvent);
+ break;
+ case CONTAINER_LAUNCHED:
+ jsonObject = convertContainerLaunchedEvent((ContainerLaunchedEvent) historyEvent);
+ break;
+ case CONTAINER_STOPPED:
+ jsonObject = convertContainerStoppedEvent((ContainerStoppedEvent) historyEvent);
+ break;
+ case DAG_SUBMITTED:
+ jsonObject = convertDAGSubmittedEvent((DAGSubmittedEvent) historyEvent);
+ break;
+ case DAG_INITIALIZED:
+ jsonObject = convertDAGInitializedEvent((DAGInitializedEvent) historyEvent);
+ break;
+ case DAG_STARTED:
+ jsonObject = convertDAGStartedEvent((DAGStartedEvent) historyEvent);
+ break;
+ case DAG_FINISHED:
+ jsonObject = convertDAGFinishedEvent((DAGFinishedEvent) historyEvent);
+ break;
+ case VERTEX_INITIALIZED:
+ jsonObject = convertVertexInitializedEvent((VertexInitializedEvent) historyEvent);
+ break;
+ case VERTEX_STARTED:
+ jsonObject = convertVertexStartedEvent((VertexStartedEvent) historyEvent);
+ break;
+ case VERTEX_FINISHED:
+ jsonObject = convertVertexFinishedEvent((VertexFinishedEvent) historyEvent);
+ break;
+ case TASK_STARTED:
+ jsonObject = convertTaskStartedEvent((TaskStartedEvent) historyEvent);
+ break;
+ case TASK_FINISHED:
+ jsonObject = convertTaskFinishedEvent((TaskFinishedEvent) historyEvent);
+ break;
+ case TASK_ATTEMPT_STARTED:
+ jsonObject = convertTaskAttemptStartedEvent((TaskAttemptStartedEvent) historyEvent);
+ break;
+ case TASK_ATTEMPT_FINISHED:
+ jsonObject = convertTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) historyEvent);
+ break;
+ case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+ case VERTEX_COMMIT_STARTED:
+ case VERTEX_GROUP_COMMIT_STARTED:
+ case VERTEX_GROUP_COMMIT_FINISHED:
+ case VERTEX_PARALLELISM_UPDATED:
+ case DAG_COMMIT_STARTED:
+ throw new UnsupportedOperationException("Invalid Event, does not support history"
+ + ", eventType=" + historyEvent.getEventType());
+ default:
+ throw new UnsupportedOperationException("Unhandled Event"
+ + ", eventType=" + historyEvent.getEventType());
+ }
+ return jsonObject;
+ }
+
+ private static JSONObject convertAMLaunchedEvent(AMLaunchedEvent event) throws JSONException {
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put(ATSConstants.ENTITY,
+ "tez_" + event.getApplicationAttemptId().toString());
+ jsonObject.put(ATSConstants.ENTITY_TYPE,
+ EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+
+ // Related Entities
+ JSONArray relatedEntities = new JSONArray();
+ JSONObject appEntity = new JSONObject();
+ appEntity.put(ATSConstants.ENTITY,
+ event.getApplicationAttemptId().getApplicationId().toString());
+ appEntity.put(ATSConstants.ENTITY_TYPE,
+ ATSConstants.APPLICATION_ID);
+ JSONObject appAttemptEntity = new JSONObject();
+ appAttemptEntity.put(ATSConstants.ENTITY,
+ event.getApplicationAttemptId().toString());
+ appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
+ ATSConstants.APPLICATION_ATTEMPT_ID);
+ relatedEntities.put(appEntity);
+ relatedEntities.put(appAttemptEntity);
+ jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+ // TODO decide whether this goes into different events,
+ // event info or other info.
+ JSONArray events = new JSONArray();
+ JSONObject initEvent = new JSONObject();
+ initEvent.put(ATSConstants.TIMESTAMP, event.getLaunchTime());
+ initEvent.put(ATSConstants.EVENT_TYPE,
+ HistoryEventType.AM_LAUNCHED.name());
+ events.put(initEvent);
+ jsonObject.put(ATSConstants.EVENTS, events);
+
+ // Other info to tag with Tez AM
+ JSONObject otherInfo = new JSONObject();
+ otherInfo.put(ATSConstants.APP_SUBMIT_TIME, event.getAppSubmitTime());
+ jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+ return jsonObject;
+ }
+
+ private static JSONObject convertAMStartedEvent(AMStartedEvent event) throws JSONException {
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put(ATSConstants.ENTITY,
+ "tez_" + event.getApplicationAttemptId().toString());
+ jsonObject.put(ATSConstants.ENTITY_TYPE,
+ EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+
+ // Related Entities
+ JSONArray relatedEntities = new JSONArray();
+ JSONObject appEntity = new JSONObject();
+ appEntity.put(ATSConstants.ENTITY,
+ event.getApplicationAttemptId().getApplicationId().toString());
+ appEntity.put(ATSConstants.ENTITY_TYPE,
+ ATSConstants.APPLICATION_ID);
+ JSONObject appAttemptEntity = new JSONObject();
+ appAttemptEntity.put(ATSConstants.ENTITY,
+ event.getApplicationAttemptId().toString());
+ appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
+ ATSConstants.APPLICATION_ATTEMPT_ID);
+ relatedEntities.put(appEntity);
+ relatedEntities.put(appAttemptEntity);
+ jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+ // TODO decide whether this goes into different events,
+ // event info or other info.
+ JSONArray events = new JSONArray();
+ JSONObject startEvent = new JSONObject();
+ startEvent.put(ATSConstants.TIMESTAMP, event.getStartTime());
+ startEvent.put(ATSConstants.EVENT_TYPE,
+ HistoryEventType.AM_STARTED.name());
+ events.put(startEvent);
+ jsonObject.put(ATSConstants.EVENTS, events);
+
+ return jsonObject; }
+
+ private static JSONObject convertContainerLaunchedEvent(ContainerLaunchedEvent event) throws JSONException {
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put(ATSConstants.ENTITY,
+ "tez_" + event.getContainerId().toString());
+ jsonObject.put(ATSConstants.ENTITY_TYPE,
+ EntityTypes.TEZ_CONTAINER_ID.name());
+
+ JSONArray relatedEntities = new JSONArray();
+ JSONObject appAttemptEntity = new JSONObject();
+ appAttemptEntity.put(ATSConstants.ENTITY,
+ event.getApplicationAttemptId().toString());
+ appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
+ EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+
+ JSONObject containerEntity = new JSONObject();
+ containerEntity.put(ATSConstants.ENTITY, event.getContainerId().toString());
+ containerEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.CONTAINER_ID);
+
+ relatedEntities.put(appAttemptEntity);
+ relatedEntities.put(containerEntity);
+ jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+ // TODO decide whether this goes into different events,
+ // event info or other info.
+ JSONArray events = new JSONArray();
+ JSONObject launchEvent = new JSONObject();
+ launchEvent.put(ATSConstants.TIMESTAMP, event.getLaunchTime());
+ launchEvent.put(ATSConstants.EVENT_TYPE,
+ HistoryEventType.CONTAINER_LAUNCHED.name());
+ events.put(launchEvent);
+ jsonObject.put(ATSConstants.EVENTS, events);
+
+ // TODO add other container info here? or assume AHS will have this?
+ // TODO container logs?
+
+ return jsonObject;
+ }
+
+ private static JSONObject convertContainerStoppedEvent(ContainerStoppedEvent event) throws JSONException {
+ // structure is identical to ContainerLaunchedEvent
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put(ATSConstants.ENTITY,
+ "tez_" + event.getContainerId().toString());
+ jsonObject.put(ATSConstants.ENTITY_TYPE,
+ EntityTypes.TEZ_CONTAINER_ID.name());
+
+ JSONArray relatedEntities = new JSONArray();
+ JSONObject appAttemptEntity = new JSONObject();
+ appAttemptEntity.put(ATSConstants.ENTITY,
+ event.getApplicationAttemptId().toString());
+ appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
+ EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+
+ JSONObject containerEntity = new JSONObject();
+ containerEntity.put(ATSConstants.ENTITY, event.getContainerId().toString());
+ containerEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.CONTAINER_ID);
+
+ relatedEntities.put(appAttemptEntity);
+ relatedEntities.put(containerEntity);
+ jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+ // TODO decide whether this goes into different events,
+ // event info or other info.
+ JSONArray events = new JSONArray();
+ JSONObject stopEvent = new JSONObject();
+ stopEvent.put(ATSConstants.TIMESTAMP, event.getStoppedTime());
+ stopEvent.put(ATSConstants.EVENT_TYPE,
+ HistoryEventType.CONTAINER_STOPPED.name());
+ events.put(stopEvent);
+ jsonObject.put(ATSConstants.EVENTS, events);
+
+ // TODO add other container info here? or assume AHS will have this?
+ // TODO container logs?
+
+ // Other info
+ JSONObject otherInfo = new JSONObject();
+ otherInfo.put(ATSConstants.EXIT_STATUS, event.getExitStatus());
+ jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+ return jsonObject; }
+
+ private static JSONObject convertDAGFinishedEvent(DAGFinishedEvent event) throws JSONException {
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put(ATSConstants.ENTITY,
+ event.getDagID().toString());
+ jsonObject.put(ATSConstants.ENTITY_TYPE,
+ EntityTypes.TEZ_DAG_ID.name());
+
+ // Related Entities not needed as should have been done in
+ // dag submission event
+
+ // TODO decide whether this goes into different events,
+ // event info or other info.
+ JSONArray events = new JSONArray();
+ JSONObject finishEvent = new JSONObject();
+ finishEvent.put(ATSConstants.TIMESTAMP, event.getFinishTime());
+ finishEvent.put(ATSConstants.EVENT_TYPE,
+ HistoryEventType.DAG_FINISHED.name());
+ events.put(finishEvent);
+ jsonObject.put(ATSConstants.EVENTS, events);
+
+ JSONObject otherInfo = new JSONObject();
+ otherInfo.put(ATSConstants.START_TIME, event.getStartTime());
+ otherInfo.put(ATSConstants.FINISH_TIME, event.getFinishTime());
+ otherInfo.put(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+ otherInfo.put(ATSConstants.STATUS, event.getState().name());
+ otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
+ otherInfo.put(ATSConstants.COUNTERS,
+ DAGUtils.convertCountersToJSON(event.getTezCounters()));
+ jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+ return jsonObject;
+ }
+
+ private static JSONObject convertDAGInitializedEvent(DAGInitializedEvent event) throws JSONException {
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put(ATSConstants.ENTITY,
+ event.getDagID().toString());
+ jsonObject.put(ATSConstants.ENTITY_TYPE,
+ EntityTypes.TEZ_DAG_ID.name());
+
+ // Related Entities not needed as should have been done in
+ // dag submission event
+
+ JSONArray events = new JSONArray();
+ JSONObject initEvent = new JSONObject();
+ initEvent.put(ATSConstants.TIMESTAMP, event.getInitTime());
+ initEvent.put(ATSConstants.EVENT_TYPE,
+ HistoryEventType.DAG_INITIALIZED.name());
+ events.put(initEvent);
+ jsonObject.put(ATSConstants.EVENTS, events);
+
+ return jsonObject;
+ }
+
+ private static JSONObject convertDAGStartedEvent(DAGStartedEvent event) throws JSONException {
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put(ATSConstants.ENTITY,
+ event.getDagID().toString());
+ jsonObject.put(ATSConstants.ENTITY_TYPE,
+ EntityTypes.TEZ_DAG_ID.name());
+
+ // Related Entities not needed as should have been done in
+ // dag submission event
+
+ // TODO decide whether this goes into different events,
+ // event info or other info.
+ JSONArray events = new JSONArray();
+ JSONObject startEvent = new JSONObject();
+ startEvent.put(ATSConstants.TIMESTAMP, event.getStartTime());
+ startEvent.put(ATSConstants.EVENT_TYPE,
+ HistoryEventType.DAG_STARTED.name());
+ events.put(startEvent);
+ jsonObject.put(ATSConstants.EVENTS, events);
+
+ return jsonObject;
+ }
+
+ private static JSONObject convertDAGSubmittedEvent(DAGSubmittedEvent event) throws JSONException {
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put(ATSConstants.ENTITY,
+ event.getDagID().toString());
+ jsonObject.put(ATSConstants.ENTITY_TYPE,
+ EntityTypes.TEZ_DAG_ID.name());
+
+ // Related Entities
+ JSONArray relatedEntities = new JSONArray();
+ JSONObject tezAppEntity = new JSONObject();
+ tezAppEntity.put(ATSConstants.ENTITY,
+ "tez_" + event.getApplicationAttemptId().toString());
+ tezAppEntity.put(ATSConstants.ENTITY_TYPE,
+ EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+ JSONObject appEntity = new JSONObject();
+ appEntity.put(ATSConstants.ENTITY,
+ event.getApplicationAttemptId().getApplicationId().toString());
+ appEntity.put(ATSConstants.ENTITY_TYPE,
+ ATSConstants.APPLICATION_ID);
+ JSONObject appAttemptEntity = new JSONObject();
+ appAttemptEntity.put(ATSConstants.ENTITY,
+ event.getApplicationAttemptId().toString());
+ appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
+ ATSConstants.APPLICATION_ATTEMPT_ID);
+ JSONObject userEntity = new JSONObject();
+ userEntity.put(ATSConstants.ENTITY,
+ event.getUser());
+ userEntity.put(ATSConstants.ENTITY_TYPE,
+ ATSConstants.USER);
+
+ relatedEntities.put(tezAppEntity);
+ relatedEntities.put(appEntity);
+ relatedEntities.put(appAttemptEntity);
+ relatedEntities.put(userEntity);
+ jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+ // filters
+ JSONObject primaryFilters = new JSONObject();
+ primaryFilters.put(ATSConstants.DAG_NAME,
+ event.getDAGName());
+ jsonObject.put(ATSConstants.PRIMARY_FILTERS, primaryFilters);
+
+ // TODO decide whether this goes into different events,
+ // event info or other info.
+ JSONArray events = new JSONArray();
+ JSONObject submitEvent = new JSONObject();
+ submitEvent.put(ATSConstants.TIMESTAMP, event.getSubmitTime());
+ submitEvent.put(ATSConstants.EVENT_TYPE,
+ HistoryEventType.DAG_SUBMITTED.name());
+ events.put(submitEvent);
+ jsonObject.put(ATSConstants.EVENTS, events);
+
+ // Other info such as dag plan
+ JSONObject otherInfo = new JSONObject();
+ otherInfo.put(ATSConstants.DAG_PLAN,
+ DAGUtils.generateSimpleJSONPlan(event.getDAGPlan()));
+ jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+ return jsonObject;
+ }
+
+ private static JSONObject convertTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) throws JSONException {
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put(ATSConstants.ENTITY, event.getTaskAttemptID().toString());
+ jsonObject.put(ATSConstants.ENTITY_TYPE,
+ EntityTypes.TEZ_TASK_ATTEMPT_ID.name());
+
+ // Events
+ JSONArray events = new JSONArray();
+ JSONObject finishEvent = new JSONObject();
+ finishEvent.put(ATSConstants.TIMESTAMP, event.getFinishTime());
+ finishEvent.put(ATSConstants.EVENT_TYPE,
+ HistoryEventType.TASK_ATTEMPT_FINISHED.name());
+ events.put(finishEvent);
+ jsonObject.put(ATSConstants.EVENTS, events);
+
+ JSONObject otherInfo = new JSONObject();
+ otherInfo.put(ATSConstants.START_TIME, event.getStartTime());
+ otherInfo.put(ATSConstants.FINISH_TIME, event.getFinishTime());
+ otherInfo.put(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+ otherInfo.put(ATSConstants.STATUS, event.getState().name());
+ otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
+ otherInfo.put(ATSConstants.COUNTERS,
+ DAGUtils.convertCountersToJSON(event.getCounters()));
+ jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+ return jsonObject;
+ }
+
+ private static JSONObject convertTaskAttemptStartedEvent(TaskAttemptStartedEvent event) throws JSONException {
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put(ATSConstants.ENTITY, event.getTaskAttemptID().toString());
+ jsonObject.put(ATSConstants.ENTITY_TYPE,
+ EntityTypes.TEZ_TASK_ATTEMPT_ID.name());
+
+ // Related entities
+ JSONArray relatedEntities = new JSONArray();
+ JSONObject nodeEntity = new JSONObject();
+ nodeEntity.put(ATSConstants.ENTITY, event.getNodeId().toString());
+ nodeEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.NODE_ID);
+
+ JSONObject containerEntity = new JSONObject();
+ containerEntity.put(ATSConstants.ENTITY, event.getContainerId().toString());
+ containerEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.CONTAINER_ID);
+
+ JSONObject taskEntity = new JSONObject();
+ taskEntity.put(ATSConstants.ENTITY, event.getTaskAttemptID().getTaskID().toString());
+ taskEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ID.name());
+
+ relatedEntities.put(nodeEntity);
+ relatedEntities.put(containerEntity);
+ relatedEntities.put(taskEntity);
+ jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+ // Events
+ JSONArray events = new JSONArray();
+ JSONObject startEvent = new JSONObject();
+ startEvent.put(ATSConstants.TIMESTAMP, event.getStartTime());
+ startEvent.put(ATSConstants.EVENT_TYPE,
+ HistoryEventType.TASK_ATTEMPT_STARTED.name());
+ events.put(startEvent);
+ jsonObject.put(ATSConstants.EVENTS, events);
+
+ // Other info
+ JSONObject otherInfo = new JSONObject();
+ otherInfo.put(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl());
+ otherInfo.put(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl());
+ jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+ return jsonObject;
+ }
+
+ private static JSONObject convertTaskFinishedEvent(TaskFinishedEvent event) throws JSONException {
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put(ATSConstants.ENTITY, event.getTaskID().toString());
+ jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ID.name());
+
+ // Events
+ JSONArray events = new JSONArray();
+ JSONObject finishEvent = new JSONObject();
+ finishEvent.put(ATSConstants.TIMESTAMP, event.getFinishTime());
+ finishEvent.put(ATSConstants.EVENT_TYPE,
+ HistoryEventType.TASK_FINISHED.name());
+ events.put(finishEvent);
+ jsonObject.put(ATSConstants.EVENTS, events);
+
+ JSONObject otherInfo = new JSONObject();
+ otherInfo.put(ATSConstants.START_TIME, event.getStartTime());
+ otherInfo.put(ATSConstants.FINISH_TIME, event.getFinishTime());
+ otherInfo.put(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+ otherInfo.put(ATSConstants.STATUS, event.getState().name());
+ otherInfo.put(ATSConstants.COUNTERS,
+ DAGUtils.convertCountersToJSON(event.getTezCounters()));
+
+ jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+ return jsonObject;
+ }
+
+ private static JSONObject convertTaskStartedEvent(TaskStartedEvent event) throws JSONException {
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put(ATSConstants.ENTITY, event.getTaskID().toString());
+ jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ID.name());
+
+ // Related entities
+ JSONArray relatedEntities = new JSONArray();
+ JSONObject vertexEntity = new JSONObject();
+ vertexEntity.put(ATSConstants.ENTITY, event.getTaskID().getVertexID().toString());
+ vertexEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
+ relatedEntities.put(vertexEntity);
+ jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+ // Events
+ JSONArray events = new JSONArray();
+ JSONObject startEvent = new JSONObject();
+ startEvent.put(ATSConstants.TIMESTAMP, event.getStartTime());
+ startEvent.put(ATSConstants.EVENT_TYPE,
+ HistoryEventType.TASK_STARTED.name());
+ events.put(startEvent);
+ jsonObject.put(ATSConstants.EVENTS, events);
+
+ // Other info
+ // TODO fix schedule/launch time to be events
+ JSONObject otherInfo = new JSONObject();
+ otherInfo.put(ATSConstants.START_TIME, event.getStartTime());
+ otherInfo.put(ATSConstants.SCHEDULED_TIME, event.getScheduledTime());
+
+ jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+ return jsonObject;
+ }
+
+ private static JSONObject convertVertexFinishedEvent(VertexFinishedEvent event) throws JSONException {
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put(ATSConstants.ENTITY, event.getVertexID().toString());
+ jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
+
+ // Events
+ JSONArray events = new JSONArray();
+ JSONObject finishEvent = new JSONObject();
+ finishEvent.put(ATSConstants.TIMESTAMP, event.getFinishTime());
+ finishEvent.put(ATSConstants.EVENT_TYPE,
+ HistoryEventType.VERTEX_FINISHED.name());
+ events.put(finishEvent);
+ jsonObject.put(ATSConstants.EVENTS, events);
+
+ JSONObject otherInfo = new JSONObject();
+ otherInfo.put(ATSConstants.FINISH_TIME, event.getFinishTime());
+ otherInfo.put(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+ otherInfo.put(ATSConstants.STATUS, event.getState().name());
+ otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
+ otherInfo.put(ATSConstants.COUNTERS,
+ DAGUtils.convertCountersToJSON(event.getTezCounters()));
+ jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+ return jsonObject;
+ }
+
+ private static JSONObject convertVertexInitializedEvent(VertexInitializedEvent event) throws JSONException {
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put(ATSConstants.ENTITY, event.getVertexID().toString());
+ jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
+
+ // Related entities
+ JSONArray relatedEntities = new JSONArray();
+ JSONObject vertexEntity = new JSONObject();
+ vertexEntity.put(ATSConstants.ENTITY, event.getVertexID().getDAGId().toString());
+ vertexEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
+ relatedEntities.put(vertexEntity);
+ jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+ // Events
+ JSONArray events = new JSONArray();
+ JSONObject initEvent = new JSONObject();
+ initEvent.put(ATSConstants.TIMESTAMP, event.getInitedTime());
+ initEvent.put(ATSConstants.EVENT_TYPE,
+ HistoryEventType.VERTEX_INITIALIZED.name());
+ events.put(initEvent);
+ jsonObject.put(ATSConstants.EVENTS, events);
+
+ // Other info
+ // TODO fix requested times to be events
+ JSONObject otherInfo = new JSONObject();
+ otherInfo.put(ATSConstants.VERTEX_NAME, event.getVertexName());
+ otherInfo.put(ATSConstants.INIT_REQUESTED_TIME, event.getInitRequestedTime());
+ otherInfo.put(ATSConstants.INIT_TIME, event.getInitedTime());
+ otherInfo.put(ATSConstants.NUM_TASKS, event.getNumTasks());
+ otherInfo.put(ATSConstants.PROCESSOR_CLASS_NAME, event.getProcessorName());
+ jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+ return jsonObject;
+ }
+
+ private static JSONObject convertVertexStartedEvent(VertexStartedEvent event) throws JSONException {
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put(ATSConstants.ENTITY, event.getVertexID().toString());
+ jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
+
+ // Related entities
+ JSONArray relatedEntities = new JSONArray();
+ JSONObject vertexEntity = new JSONObject();
+ vertexEntity.put(ATSConstants.ENTITY, event.getVertexID().getDAGId().toString());
+ vertexEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
+ relatedEntities.put(vertexEntity);
+ jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+ // Events
+ JSONArray events = new JSONArray();
+ JSONObject startEvent = new JSONObject();
+ startEvent.put(ATSConstants.TIMESTAMP, event.getStartTime());
+ startEvent.put(ATSConstants.EVENT_TYPE,
+ HistoryEventType.VERTEX_STARTED.name());
+ events.put(startEvent);
+ jsonObject.put(ATSConstants.EVENTS, events);
+
+ // Other info
+ // TODO fix requested times to be events
+ JSONObject otherInfo = new JSONObject();
+ otherInfo.put(ATSConstants.START_REQUESTED_TIME, event.getStartRequestedTime());
+ otherInfo.put(ATSConstants.START_TIME, event.getStartTime());
+ jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+ return jsonObject;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
new file mode 100644
index 0000000..257b6c8
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.impl;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.logging.HistoryLoggingService;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+public class SimpleHistoryLoggingService extends HistoryLoggingService {
+
+ private static final Log LOG = LogFactory.getLog(SimpleHistoryLoggingService.class);
+ private Path logFileLocation;
+ private FileSystem logFileFS;
+ private FSDataOutputStream outputStream;
+ private LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
+ new LinkedBlockingQueue<DAGHistoryEvent>();
+ public static final String RECORD_SEPARATOR = System.getProperty("line.separator") + "\u0001";
+ public static final String LOG_FILE_NAME_PREFIX = "history.txt";
+
+ private Thread eventHandlingThread;
+ private AtomicBoolean stopped = new AtomicBoolean(false);
+
+ private int consecutiveErrors = 0;
+ private int maxErrors;
+ private boolean loggingDisabled = false;
+
+ public SimpleHistoryLoggingService() {
+ super(SimpleHistoryLoggingService.class.getName());
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ String logDirPath = conf.get(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR);
+ final String logFileName = LOG_FILE_NAME_PREFIX + "." + appContext.getApplicationAttemptId();
+ if (logDirPath == null || logDirPath.isEmpty()) {
+ String logDir = TezUtils.getContainerLogDir();
+ Path p;
+ logFileFS = FileSystem.getLocal(conf);
+ if (logDir != null) {
+ p = new Path(logDir, logFileName);
+ } else {
+ p = new Path(logFileName);
+ }
+ logFileLocation = p;
+ } else {
+ Path p = new Path(logDirPath);
+ logFileFS = p.getFileSystem(conf);
+ if (!logFileFS.exists(p)) {
+ logFileFS.mkdirs(p);
+ }
+ logFileLocation = new Path(logFileFS.resolvePath(p), logFileName);
+ }
+ maxErrors = conf.getInt(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_MAX_ERRORS,
+ TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_MAX_ERRORS_DEFAULT);
+ LOG.info("Initializing SimpleHistoryLoggingService, logFileLocation=" + logFileLocation
+ + ", maxErrors=" + maxErrors);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ LOG.info("Starting SimpleHistoryLoggingService");
+ outputStream = logFileFS.create(logFileLocation, true);
+ eventHandlingThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ DAGHistoryEvent event;
+ while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+ try {
+ event = eventQueue.take();
+ } catch (InterruptedException e) {
+ LOG.info("EventQueue take interrupted. Returning");
+ return;
+ }
+ handleEvent(event);
+ }
+ }
+ }, "HistoryEventHandlingThread");
+ eventHandlingThread.start();
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ LOG.info("Stopping SimpleHistoryLoggingService"
+ + ", eventQueueBacklog=" + eventQueue.size());
+ stopped.set(true);
+ if (eventHandlingThread != null) {
+ eventHandlingThread.interrupt();
+ }
+ while (!eventQueue.isEmpty()) {
+ DAGHistoryEvent event = eventQueue.poll();
+ if (event == null) {
+ break;
+ }
+ handleEvent(event);
+ }
+ try {
+ if (outputStream != null) {
+ outputStream.hsync();
+ outputStream.close();
+ }
+ } catch (IOException ioe) {
+ LOG.warn("Failed to close output stream", ioe);
+ }
+ super.serviceStop();
+ }
+
+ @Override
+ public void handle(DAGHistoryEvent event) {
+ eventQueue.add(event);
+ }
+
+ private synchronized void handleEvent(DAGHistoryEvent event) {
+ if (loggingDisabled) {
+ return;
+ }
+ LOG.info("Writing event " + event.getHistoryEvent().getEventType() + " to history file");
+ try {
+ try {
+ JSONObject eventJson = HistoryEventJsonConversion.convertToJson(event.getHistoryEvent());
+ outputStream.writeBytes(eventJson.toString());
+ outputStream.writeBytes(RECORD_SEPARATOR);
+ } catch (JSONException e) {
+ LOG.warn("Failed to convert event to json", e);
+ }
+ consecutiveErrors = 0;
+ } catch (IOException ioe) {
+ ++consecutiveErrors;
+ if (consecutiveErrors < maxErrors) {
+ LOG.error("Failed to write to output stream, consecutiveErrorCount=" + consecutiveErrors, ioe);
+ } else {
+ loggingDisabled = true;
+ LOG.error("Disabling SimpleHistoryLoggingService due to multiple errors," +
+ "consecutive max errors reached, maxErrors=" + maxErrors);
+ }
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
index f53cc7d..0188a8e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
@@ -39,6 +39,7 @@ public class ATSConstants {
public static final String APPLICATION_ATTEMPT_ID = "applicationAttemptId";
public static final String CONTAINER_ID = "containerId";
public static final String NODE_ID = "nodeId";
+ public static final String USER = "user";
/* Keys used in other info */
public static final String APP_SUBMIT_TIME = "appSubmitTime";
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
index 782fff1..3997c2f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
@@ -18,10 +18,18 @@
package org.apache.tez.dag.history.utils;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
import org.apache.tez.common.counters.CounterGroup;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.api.records.DAGProtos.PlanGroupInputEdgeInfo;
+import org.apache.tez.dag.app.dag.impl.VertexStats;
+import org.apache.tez.dag.records.TezTaskID;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
@@ -149,4 +157,247 @@ public class DAGUtils {
return jsonObject;
}
+ public static Map<String,Object> convertCountersToATSMap(TezCounters counters) {
+ Map<String,Object> object = new LinkedHashMap<String, Object>();
+ if (counters == null) {
+ return object;
+ }
+ ArrayList<Object> counterGroupsList = new ArrayList<Object>();
+ for (CounterGroup group : counters) {
+ Map<String,Object> counterGroupMap = new LinkedHashMap<String, Object>();
+ counterGroupMap.put(ATSConstants.COUNTER_GROUP_NAME, group.getName());
+ counterGroupMap.put(ATSConstants.COUNTER_GROUP_DISPLAY_NAME,
+ group.getDisplayName());
+ ArrayList<Object> counterList = new ArrayList<Object>();
+ for (TezCounter counter : group) {
+ Map<String,Object> counterMap = new LinkedHashMap<String, Object>();
+ counterMap.put(ATSConstants.COUNTER_NAME, counter.getName());
+ counterMap.put(ATSConstants.COUNTER_DISPLAY_NAME,
+ counter.getDisplayName());
+ counterMap.put(ATSConstants.COUNTER_VALUE, counter.getValue());
+ counterList.add(counterMap);
+ }
+ putInto(counterGroupMap, ATSConstants.COUNTERS, counterList);
+ counterGroupsList.add(counterGroupMap);
+ }
+ putInto(object, ATSConstants.COUNTER_GROUPS, counterGroupsList);
+ return object;
+ }
+
+ public static Map<String,Object> convertDAGPlanToATSMap(
+ DAGProtos.DAGPlan dagPlan) {
+
+ final String VERSION_KEY = "version";
+ final int version = 1;
+ final String DAG_NAME_KEY = "dagName";
+ final String VERTICES_KEY = "vertices";
+ final String EDGES_KEY = "edges";
+ final String VERTEX_GROUPS_KEY = "vertexGroups";
+
+ final String VERTEX_NAME_KEY = "vertexName";
+ final String PROCESSOR_CLASS_KEY = "processorClass";
+ final String IN_EDGE_IDS_KEY = "inEdgeIds";
+ final String OUT_EDGE_IDS_KEY = "outEdgeIds";
+ final String ADDITIONAL_INPUTS_KEY = "additionalInputs";
+ final String ADDITIONAL_OUTPUTS_KEY = "additionalOutputs";
+ final String VERTEX_MANAGER_PLUGIN_CLASS_KEY =
+ "vertexManagerPluginClass";
+
+ final String EDGE_ID_KEY = "edgeId";
+ final String INPUT_VERTEX_NAME_KEY = "inputVertexName";
+ final String OUTPUT_VERTEX_NAME_KEY = "outputVertexName";
+ final String DATA_MOVEMENT_TYPE_KEY = "dataMovementType";
+ final String DATA_SOURCE_TYPE_KEY = "dataSourceType";
+ final String SCHEDULING_TYPE_KEY = "schedulingType";
+ final String EDGE_SOURCE_CLASS_KEY = "edgeSourceClass";
+ final String EDGE_DESTINATION_CLASS_KEY =
+ "edgeDestinationClass";
+
+ final String NAME_KEY = "name";
+ final String CLASS_KEY = "class";
+ final String INITIALIZER_KEY = "initializer";
+
+ final String VERTEX_GROUP_NAME_KEY = "groupName";
+ final String VERTEX_GROUP_MEMBERS_KEY = "groupMembers";
+ final String VERTEX_GROUP_OUTPUTS_KEY = "outputs";
+ final String VERTEX_GROUP_EDGE_MERGED_INPUTS_KEY = "edgeMergedInputs";
+ final String VERTEX_GROUP_DESTINATION_VERTEX_NAME_KEY = "destinationVertexName";
+
+ Map<String,Object> dagMap = new LinkedHashMap<String, Object>();
+ dagMap.put(DAG_NAME_KEY, dagPlan.getName());
+ dagMap.put(VERSION_KEY, version);
+ ArrayList<Object> verticesList = new ArrayList<Object>();
+ for (DAGProtos.VertexPlan vertexPlan : dagPlan.getVertexList()) {
+ Map<String,Object> vertexMap = new LinkedHashMap<String, Object>();
+ vertexMap.put(VERTEX_NAME_KEY, vertexPlan.getName());
+
+ if (vertexPlan.hasProcessorDescriptor()) {
+ vertexMap.put(PROCESSOR_CLASS_KEY,
+ vertexPlan.getProcessorDescriptor().getClassName());
+ }
+
+ ArrayList<Object> inEdgeIdList = new ArrayList<Object>();
+ inEdgeIdList.addAll(vertexPlan.getInEdgeIdList());
+ putInto(vertexMap, IN_EDGE_IDS_KEY, inEdgeIdList);
+
+ ArrayList<Object> outEdgeIdList = new ArrayList<Object>();
+ outEdgeIdList.addAll(vertexPlan.getOutEdgeIdList());
+ putInto(vertexMap, OUT_EDGE_IDS_KEY, outEdgeIdList);
+
+ ArrayList<Object> inputsList = new ArrayList<Object>();
+ for (DAGProtos.RootInputLeafOutputProto input :
+ vertexPlan.getInputsList()) {
+ Map<String,Object> inputMap = new LinkedHashMap<String, Object>();
+ inputMap.put(NAME_KEY, input.getName());
+ inputMap.put(CLASS_KEY, input.getEntityDescriptor().getClassName());
+ if (input.hasInitializerClassName()) {
+ inputMap.put(INITIALIZER_KEY, input.getInitializerClassName());
+ }
+ inputsList.add(inputMap);
+ }
+ putInto(vertexMap, ADDITIONAL_INPUTS_KEY, inputsList);
+
+ ArrayList<Object> outputsList = new ArrayList<Object>();
+ for (DAGProtos.RootInputLeafOutputProto output :
+ vertexPlan.getOutputsList()) {
+ Map<String,Object> outputMap = new LinkedHashMap<String, Object>();
+ outputMap.put(NAME_KEY, output.getName());
+ outputMap.put(CLASS_KEY, output.getEntityDescriptor().getClassName());
+ if (output.hasInitializerClassName()) {
+ outputMap.put(INITIALIZER_KEY, output.getInitializerClassName());
+ }
+ outputsList.add(outputMap);
+ }
+ putInto(vertexMap, ADDITIONAL_OUTPUTS_KEY, outputsList);
+
+ if (vertexPlan.hasVertexManagerPlugin()) {
+ vertexMap.put(VERTEX_MANAGER_PLUGIN_CLASS_KEY,
+ vertexPlan.getVertexManagerPlugin().getClassName());
+ }
+
+ verticesList.add(vertexMap);
+ }
+ putInto(dagMap, VERTICES_KEY, verticesList);
+
+ ArrayList<Object> edgesList = new ArrayList<Object>();
+ for (DAGProtos.EdgePlan edgePlan : dagPlan.getEdgeList()) {
+ Map<String,Object> edgeMap = new LinkedHashMap<String, Object>();
+ edgeMap.put(EDGE_ID_KEY, edgePlan.getId());
+ edgeMap.put(INPUT_VERTEX_NAME_KEY, edgePlan.getInputVertexName());
+ edgeMap.put(OUTPUT_VERTEX_NAME_KEY, edgePlan.getOutputVertexName());
+ edgeMap.put(DATA_MOVEMENT_TYPE_KEY,
+ edgePlan.getDataMovementType().name());
+ edgeMap.put(DATA_SOURCE_TYPE_KEY, edgePlan.getDataSourceType().name());
+ edgeMap.put(SCHEDULING_TYPE_KEY, edgePlan.getSchedulingType().name());
+ edgeMap.put(EDGE_SOURCE_CLASS_KEY,
+ edgePlan.getEdgeSource().getClassName());
+ edgeMap.put(EDGE_DESTINATION_CLASS_KEY,
+ edgePlan.getEdgeDestination().getClassName());
+
+ edgesList.add(edgeMap);
+ }
+ putInto(dagMap, EDGES_KEY, edgesList);
+
+ ArrayList<Object> vertexGroupsList = new ArrayList<Object>();
+ for (DAGProtos.PlanVertexGroupInfo vertexGroupInfo :
+ dagPlan.getVertexGroupsList()) {
+ Map<String,Object> groupMap = new LinkedHashMap<String, Object>();
+ groupMap.put(VERTEX_GROUP_NAME_KEY, vertexGroupInfo.getGroupName());
+ if (vertexGroupInfo.getGroupMembersCount() > 0 ) {
+ groupMap.put(VERTEX_GROUP_MEMBERS_KEY, vertexGroupInfo.getGroupMembersList());
+ }
+ if (vertexGroupInfo.getOutputsCount() > 0) {
+ groupMap.put(VERTEX_GROUP_OUTPUTS_KEY, vertexGroupInfo.getOutputsList());
+ }
+
+ if (vertexGroupInfo.getEdgeMergedInputsCount() > 0) {
+ ArrayList<Object> edgeMergedInputs = new ArrayList<Object>();
+ for (PlanGroupInputEdgeInfo edgeMergedInputInfo :
+ vertexGroupInfo.getEdgeMergedInputsList()) {
+ Map<String,Object> edgeMergedInput = new LinkedHashMap<String, Object>();
+ edgeMergedInput.put(VERTEX_GROUP_DESTINATION_VERTEX_NAME_KEY,
+ edgeMergedInputInfo.getDestVertexName());
+ if (edgeMergedInputInfo.hasMergedInput()
+ && edgeMergedInputInfo.getMergedInput().hasClassName()) {
+ edgeMergedInput.put(PROCESSOR_CLASS_KEY,
+ edgeMergedInputInfo.getMergedInput().getClassName());
+ }
+ edgeMergedInputs.add(edgeMergedInput);
+ }
+ groupMap.put(VERTEX_GROUP_EDGE_MERGED_INPUTS_KEY, edgeMergedInputs);
+ }
+ vertexGroupsList.add(groupMap);
+ }
+ putInto(dagMap, VERTEX_GROUPS_KEY, vertexGroupsList);
+
+ return dagMap;
+ }
+
+ private static void putInto(Map<String, Object> map, String key,
+ ArrayList<Object> list) {
+ if (list.isEmpty()) {
+ return;
+ }
+ map.put(key, list);
+ }
+
+ private static ArrayList<String> convertToStringArrayList(
+ Collection<TezTaskID> collection) {
+ ArrayList<String> list = new ArrayList<String>(collection.size());
+ for (TezTaskID t : collection) {
+ list.add(t.toString());
+ }
+ return list;
+ }
+
+ public static Map<String,Object> convertVertexStatsToATSMap(
+ VertexStats vertexStats) {
+ Map<String,Object> vertexStatsMap = new LinkedHashMap<String, Object>();
+ if (vertexStats == null) {
+ return vertexStatsMap;
+ }
+
+ final String FIRST_TASK_START_TIME_KEY = "firstTaskStartTime";
+ final String FIRST_TASKS_TO_START_KEY = "firstTasksToStart";
+ final String LAST_TASK_FINISH_TIME_KEY = "lastTaskFinishTime";
+ final String LAST_TASKS_TO_FINISH_KEY = "lastTasksToFinish";
+
+ final String MIN_TASK_DURATION = "minTaskDuration";
+ final String MAX_TASK_DURATION = "maxTaskDuration";
+ final String AVG_TASK_DURATION = "avgTaskDuration";
+
+ final String SHORTEST_DURATION_TASKS = "shortestDurationTasks";
+ final String LONGEST_DURATION_TASKS = "longestDurationTasks";
+
+ vertexStatsMap.put(FIRST_TASK_START_TIME_KEY, vertexStats.getFirstTaskStartTime());
+ if (vertexStats.getFirstTasksToStart() != null
+ && !vertexStats.getFirstTasksToStart().isEmpty()) {
+ vertexStatsMap.put(FIRST_TASKS_TO_START_KEY,
+ convertToStringArrayList(vertexStats.getFirstTasksToStart()));
+ }
+ vertexStatsMap.put(LAST_TASK_FINISH_TIME_KEY, vertexStats.getLastTaskFinishTime());
+ if (vertexStats.getLastTasksToFinish() != null
+ && !vertexStats.getLastTasksToFinish().isEmpty()) {
+ vertexStatsMap.put(LAST_TASKS_TO_FINISH_KEY,
+ convertToStringArrayList(vertexStats.getLastTasksToFinish()));
+ }
+
+ vertexStatsMap.put(MIN_TASK_DURATION, vertexStats.getMinTaskDuration());
+ vertexStatsMap.put(MAX_TASK_DURATION, vertexStats.getMaxTaskDuration());
+ vertexStatsMap.put(AVG_TASK_DURATION, vertexStats.getAvgTaskDuration());
+
+ if (vertexStats.getShortestDurationTasks() != null
+ && !vertexStats.getShortestDurationTasks().isEmpty()) {
+ vertexStatsMap.put(SHORTEST_DURATION_TASKS,
+ convertToStringArrayList(vertexStats.getShortestDurationTasks()));
+ }
+ if (vertexStats.getLongestDurationTasks() != null
+ && !vertexStats.getLongestDurationTasks().isEmpty()) {
+ vertexStatsMap.put(LONGEST_DURATION_TASKS,
+ convertToStringArrayList(vertexStats.getLongestDurationTasks()));
+ }
+
+ return vertexStatsMap;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index b22b162..164bd2f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -112,7 +112,7 @@ public class TestHistoryEventsProtoConversion {
AMLaunchedEvent event = new AMLaunchedEvent(
ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0, 1), 1),
- 100, 100);
+ 100, 100, null);
AMLaunchedEvent deserializedEvent = (AMLaunchedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getApplicationAttemptId(),
@@ -127,7 +127,7 @@ public class TestHistoryEventsProtoConversion {
private void testAMStartedEvent() throws Exception {
AMStartedEvent event = new AMStartedEvent(
ApplicationAttemptId.newInstance(
- ApplicationId.newInstance(0, 1), 1), 100);
+ ApplicationId.newInstance(0, 1), 1), 100, "");
AMStartedEvent deserializedEvent = (AMStartedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getApplicationAttemptId(),
@@ -142,7 +142,7 @@ public class TestHistoryEventsProtoConversion {
ApplicationId.newInstance(0, 1), 1), 1001l,
DAGPlan.newBuilder().setName("foo").build(),
ApplicationAttemptId.newInstance(
- ApplicationId.newInstance(0, 1), 1), null);
+ ApplicationId.newInstance(0, 1), 1), null, "");
DAGSubmittedEvent deserializedEvent = (DAGSubmittedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getApplicationAttemptId(),
@@ -160,7 +160,8 @@ public class TestHistoryEventsProtoConversion {
private void testDAGInitializedEvent() throws Exception {
DAGInitializedEvent event = new DAGInitializedEvent(
- TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 100334l);
+ TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 100334l,
+ "user", "dagName");
DAGInitializedEvent deserializedEvent = (DAGInitializedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getDagID(),
@@ -171,7 +172,8 @@ public class TestHistoryEventsProtoConversion {
private void testDAGStartedEvent() throws Exception {
DAGStartedEvent event = new DAGStartedEvent(
- TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 100334l);
+ TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 100334l,
+ "user", "dagName");
DAGStartedEvent deserializedEvent = (DAGStartedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getDagID(),
@@ -184,7 +186,7 @@ public class TestHistoryEventsProtoConversion {
{
DAGFinishedEvent event = new DAGFinishedEvent(
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1000l, 20000l,
- DAGState.FAILED, null, null);
+ DAGState.FAILED, null, null, "user", "dagName");
DAGFinishedEvent deserializedEvent = (DAGFinishedEvent)
testProtoConversion(event);
Assert.assertEquals(
@@ -204,7 +206,8 @@ public class TestHistoryEventsProtoConversion {
tezCounters.getGroup("foo").findCounter("c1").increment(1);
DAGFinishedEvent event = new DAGFinishedEvent(
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1000l, 20000l,
- DAGState.FAILED, "bad diagnostics", tezCounters);
+ DAGState.FAILED, "bad diagnostics", tezCounters,
+ "user", "dagName");
DAGFinishedEvent deserializedEvent = (DAGFinishedEvent)
testProtoConversion(event);
Assert.assertEquals(
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
new file mode 100644
index 0000000..67be9f5
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.impl;
+
+import java.util.Random;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.AMLaunchedEvent;
+import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.history.events.ContainerStoppedEvent;
+import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskFinishedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
+import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
+import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.codehaus.jettison.json.JSONException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestHistoryEventJsonConversion {
+
+ private ApplicationAttemptId applicationAttemptId;
+ private ApplicationId applicationId;
+ private String user = "user";
+ private Random random = new Random();
+ private TezDAGID tezDAGID;
+ private TezVertexID tezVertexID;
+ private TezTaskID tezTaskID;
+ private TezTaskAttemptID tezTaskAttemptID;
+ private DAGPlan dagPlan;
+ private ContainerId containerId;
+ private NodeId nodeId;
+
+ @Before
+ public void setup() {
+ applicationId = ApplicationId.newInstance(9999l, 1);
+ applicationAttemptId = ApplicationAttemptId.newInstance(applicationId, 1);
+ tezDAGID = TezDAGID.getInstance(applicationId, random.nextInt());
+ tezVertexID = TezVertexID.getInstance(tezDAGID, random.nextInt());
+ tezTaskID = TezTaskID.getInstance(tezVertexID, random.nextInt());
+ tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID, random.nextInt());
+ dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock").build();
+ containerId = ContainerId.newInstance(applicationAttemptId, 111);
+ nodeId = NodeId.newInstance("node", 13435);
+ }
+
+ @Test
+ public void testHandlerExists() throws JSONException {
+ for (HistoryEventType eventType : HistoryEventType.values()) {
+ HistoryEvent event = null;
+ switch (eventType) {
+ case AM_LAUNCHED:
+ event = new AMLaunchedEvent(applicationAttemptId, random.nextInt(), random.nextInt(),
+ user);
+ break;
+ case AM_STARTED:
+ event = new AMStartedEvent(applicationAttemptId, random.nextInt(), user);
+ break;
+ case DAG_SUBMITTED:
+ event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId,
+ null, user);
+ break;
+ case DAG_INITIALIZED:
+ event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName());
+ break;
+ case DAG_STARTED:
+ event = new DAGStartedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName());
+ break;
+ case DAG_FINISHED:
+ event = new DAGFinishedEvent(tezDAGID, random.nextInt(), random.nextInt(), DAGState.ERROR,
+ null, null, user, dagPlan.getName());
+ break;
+ case VERTEX_INITIALIZED:
+ event = new VertexInitializedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
+ random.nextInt(), "proc", null);
+ break;
+ case VERTEX_STARTED:
+ event = new VertexStartedEvent(tezVertexID, random.nextInt(), random.nextInt());
+ break;
+ case VERTEX_PARALLELISM_UPDATED:
+ event = new VertexParallelismUpdatedEvent();
+ break;
+ case VERTEX_FINISHED:
+ event = new VertexFinishedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
+ random.nextInt(), random.nextInt(), random.nextInt(), VertexState.ERROR,
+ null, null, null);
+ break;
+ case TASK_STARTED:
+ event = new TaskStartedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt());
+ break;
+ case TASK_FINISHED:
+ event = new TaskFinishedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt(),
+ tezTaskAttemptID, TaskState.FAILED, null);
+ break;
+ case TASK_ATTEMPT_STARTED:
+ event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", random.nextInt(), containerId,
+ nodeId, null, null);
+ break;
+ case TASK_ATTEMPT_FINISHED:
+ event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
+ random.nextInt(), TaskAttemptState.FAILED, null, null);
+ break;
+ case CONTAINER_LAUNCHED:
+ event = new ContainerLaunchedEvent(containerId, random.nextInt(),
+ applicationAttemptId);
+ break;
+ case CONTAINER_STOPPED:
+ event = new ContainerStoppedEvent(containerId, random.nextInt(), -1, applicationAttemptId);
+ break;
+ case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+ event = new VertexDataMovementEventsGeneratedEvent();
+ break;
+ case DAG_COMMIT_STARTED:
+ event = new DAGCommitStartedEvent();
+ break;
+ case VERTEX_COMMIT_STARTED:
+ event = new VertexCommitStartedEvent();
+ break;
+ case VERTEX_GROUP_COMMIT_STARTED:
+ event = new VertexGroupCommitStartedEvent();
+ break;
+ case VERTEX_GROUP_COMMIT_FINISHED:
+ event = new VertexGroupCommitFinishedEvent();
+ break;
+ default:
+ Assert.fail("Unhandled event type " + eventType);
+ }
+ if (event == null || !event.isHistoryEvent()) {
+ continue;
+ }
+ HistoryEventJsonConversion.convertToJson(event);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dist/pom.xml
----------------------------------------------------------------------
diff --git a/tez-dist/pom.xml b/tez-dist/pom.xml
index bbe3256..95668a9 100644
--- a/tez-dist/pom.xml
+++ b/tez-dist/pom.xml
@@ -54,6 +54,19 @@
<profiles>
<profile>
+ <id>hadoop24</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-yarn-timeline-history</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
<id>dist-tar</id>
<activation>
<activeByDefault>false</activeByDefault>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-plugins/pom.xml
----------------------------------------------------------------------
diff --git a/tez-plugins/pom.xml b/tez-plugins/pom.xml
new file mode 100644
index 0000000..393de62
--- /dev/null
+++ b/tez-plugins/pom.xml
@@ -0,0 +1,50 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez</artifactId>
+ <version>0.5.0-incubating-SNAPSHOT</version>
+ </parent>
+ <artifactId>tez-plugins</artifactId>
+ <packaging>pom</packaging>
+
+ <profiles>
+ <profile>
+ <id>hadoop24</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <modules>
+ <module>tez-yarn-timeline-history</module>
+ </modules>
+ </profile>
+ </profiles>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-plugins/tez-yarn-timeline-history/pom.xml
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/pom.xml b/tez-plugins/tez-yarn-timeline-history/pom.xml
new file mode 100644
index 0000000..150a149
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history/pom.xml
@@ -0,0 +1,71 @@
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-plugins</artifactId>
+ <version>0.5.0-incubating-SNAPSHOT</version>
+ </parent>
+ <artifactId>tez-yarn-timeline-history</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-runtime-internals</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-dag</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jettison</groupId>
+ <artifactId>jettison</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
new file mode 100644
index 0000000..11114b6
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.ats;
+
+import java.util.HashSet;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.logging.HistoryLoggingService;
+import org.apache.tez.dag.records.TezDAGID;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class ATSHistoryLoggingService extends HistoryLoggingService {
+
+ private static final Log LOG = LogFactory.getLog(ATSHistoryLoggingService.class);
+
+ private LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
+ new LinkedBlockingQueue<DAGHistoryEvent>();
+
+ private Thread eventHandlingThread;
+ private AtomicBoolean stopped = new AtomicBoolean(false);
+ private int eventCounter = 0;
+ private int eventsProcessed = 0;
+ private final Object lock = new Object();
+
+ @VisibleForTesting
+ TimelineClient timelineClient;
+
+ private HashSet<TezDAGID> skippedDAGs = new HashSet<TezDAGID>();
+ private long maxTimeToWaitOnShutdown;
+
+ public ATSHistoryLoggingService() {
+ super(ATSHistoryLoggingService.class.getName());
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ LOG.info("Initializing ATSService");
+ timelineClient = TimelineClient.createTimelineClient();
+ timelineClient.init(conf);
+ maxTimeToWaitOnShutdown = conf.getLong(
+ TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS,
+ TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS_DEFAULT);
+ }
+
+ @Override
+ public void serviceStart() {
+ LOG.info("Starting ATSService");
+ timelineClient.start();
+ eventHandlingThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ DAGHistoryEvent event;
+ while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+
+ // Log the size of the event-queue every so often.
+ if (eventCounter != 0 && eventCounter % 1000 == 0) {
+ LOG.info("Event queue stats"
+ + ", eventsProcessedSinceLastUpdate=" + eventsProcessed
+ + ", eventQueueSize=" + eventQueue.size());
+ eventCounter = 0;
+ eventsProcessed = 0;
+ } else {
+ ++eventCounter;
+ }
+
+ try {
+ event = eventQueue.take();
+ } catch (InterruptedException e) {
+ LOG.info("EventQueue take interrupted. Returning");
+ return;
+ }
+
+ synchronized (lock) {
+ ++eventsProcessed;
+ try {
+ handleEvent(event);
+ } catch (Exception e) {
+ // TODO handle failures - treat as fatal or ignore?
+ LOG.warn("Error handling event", e);
+ }
+ }
+ }
+ }
+ }, "HistoryEventHandlingThread");
+ eventHandlingThread.start();
+ }
+
+ @Override
+ public void serviceStop() {
+ LOG.info("Stopping ATSService"
+ + ", eventQueueBacklog=" + eventQueue.size());
+ stopped.set(true);
+ if (eventHandlingThread != null) {
+ eventHandlingThread.interrupt();
+ }
+ synchronized (lock) {
+ if (!eventQueue.isEmpty()) {
+ LOG.warn("ATSService being stopped"
+ + ", eventQueueBacklog=" + eventQueue.size()
+ + ", maxTimeLeftToFlush=" + maxTimeToWaitOnShutdown);
+ long startTime = appContext.getClock().getTime();
+ if (maxTimeToWaitOnShutdown > 0) {
+ long endTime = startTime + maxTimeToWaitOnShutdown;
+ while (endTime >= appContext.getClock().getTime()) {
+ DAGHistoryEvent event = eventQueue.poll();
+ if (event == null) {
+ break;
+ }
+ try {
+ handleEvent(event);
+ } catch (Exception e) {
+ LOG.warn("Error handling event", e);
+ break;
+ }
+ }
+ }
+ }
+ }
+ if (!eventQueue.isEmpty()) {
+ LOG.warn("Did not finish flushing eventQueue before stopping ATSService"
+ + ", eventQueueBacklog=" + eventQueue.size());
+ }
+ timelineClient.stop();
+ }
+
+ public void handle(DAGHistoryEvent event) {
+ eventQueue.add(event);
+ }
+
+ private void handleEvent(DAGHistoryEvent event) {
+ HistoryEventType eventType = event.getHistoryEvent().getEventType();
+
+ TezDAGID dagId = event.getDagID();
+
+ if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
+ DAGSubmittedEvent dagSubmittedEvent =
+ (DAGSubmittedEvent) event.getHistoryEvent();
+ String dagName = dagSubmittedEvent.getDAGName();
+ if (dagName != null
+ && dagName.startsWith(
+ TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX)) {
+ // Skip recording pre-warm DAG events
+ skippedDAGs.add(dagId);
+ return;
+ }
+ }
+ if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
+ // Remove from set to keep size small
+ // No more events should be seen after this point.
+ if (skippedDAGs.remove(dagId)) {
+ return;
+ }
+ }
+
+ if (dagId != null && skippedDAGs.contains(dagId)) {
+ // Skip pre-warm DAGs
+ return;
+ }
+
+ try {
+ TimelinePutResponse response =
+ timelineClient.putEntities(
+ HistoryEventTimelineConversion.convertToTimelineEntity(event.getHistoryEvent()));
+ if (response != null
+ && !response.getErrors().isEmpty()) {
+ TimelinePutError err = response.getErrors().get(0);
+ if (err.getErrorCode() != 0) {
+ LOG.warn("Could not post history event to ATS, eventType="
+ + eventType
+ + ", atsPutError=" + err.getErrorCode());
+ }
+ }
+ // Do nothing additional, ATS client library should handle throttling
+ // or auto-disable as needed
+ } catch (Exception e) {
+ LOG.warn("Could not handle history event, eventType="
+ + eventType, e);
+ }
+ }
+
+}