You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/05/22 00:13:13 UTC
[1/3] TEZ-1066. Generate events to integrate with YARN timeline
server. (hitesh)
Repository: incubator-tez
Updated Branches:
refs/heads/master 6e07fc7e7 -> bc6579614
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
new file mode 100644
index 0000000..8ab9c91
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.ats;
+
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.AMLaunchedEvent;
+import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.history.events.ContainerStoppedEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskFinishedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.history.logging.EntityTypes;
+import org.apache.tez.dag.history.utils.ATSConstants;
+import org.apache.tez.dag.history.utils.DAGUtils;
+
+public class HistoryEventTimelineConversion {
+
+ /**
+  * Converts a Tez history event into the timeline entity that represents it.
+  *
+  * @param historyEvent the event to convert
+  * @return the entity produced by the converter matching the event's type
+  * @throws UnsupportedOperationException if the event reports that it is not a history event,
+  *     or if its type has no converter
+  */
+ public static TimelineEntity convertToTimelineEntity(HistoryEvent historyEvent) {
+   if (!historyEvent.isHistoryEvent()) {
+     throw new UnsupportedOperationException("Invalid Event, does not support history"
+         + ", eventType=" + historyEvent.getEventType());
+   }
+
+   // Each supported type is cast to its concrete event class and handed to its converter.
+   switch (historyEvent.getEventType()) {
+     // Application master
+     case AM_LAUNCHED:
+       return convertAMLaunchedEvent((AMLaunchedEvent) historyEvent);
+     case AM_STARTED:
+       return convertAMStartedEvent((AMStartedEvent) historyEvent);
+     // Containers
+     case CONTAINER_LAUNCHED:
+       return convertContainerLaunchedEvent((ContainerLaunchedEvent) historyEvent);
+     case CONTAINER_STOPPED:
+       return convertContainerStoppedEvent((ContainerStoppedEvent) historyEvent);
+     // DAGs
+     case DAG_SUBMITTED:
+       return convertDAGSubmittedEvent((DAGSubmittedEvent) historyEvent);
+     case DAG_INITIALIZED:
+       return convertDAGInitializedEvent((DAGInitializedEvent) historyEvent);
+     case DAG_STARTED:
+       return convertDAGStartedEvent((DAGStartedEvent) historyEvent);
+     case DAG_FINISHED:
+       return convertDAGFinishedEvent((DAGFinishedEvent) historyEvent);
+     // Vertices
+     case VERTEX_INITIALIZED:
+       return convertVertexInitializedEvent((VertexInitializedEvent) historyEvent);
+     case VERTEX_STARTED:
+       return convertVertexStartedEvent((VertexStartedEvent) historyEvent);
+     case VERTEX_FINISHED:
+       return convertVertexFinishedEvent((VertexFinishedEvent) historyEvent);
+     // Tasks
+     case TASK_STARTED:
+       return convertTaskStartedEvent((TaskStartedEvent) historyEvent);
+     case TASK_FINISHED:
+       return convertTaskFinishedEvent((TaskFinishedEvent) historyEvent);
+     // Task attempts
+     case TASK_ATTEMPT_STARTED:
+       return convertTaskAttemptStartedEvent((TaskAttemptStartedEvent) historyEvent);
+     case TASK_ATTEMPT_FINISHED:
+       return convertTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) historyEvent);
+     // Known event types that have no timeline representation.
+     case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+     case VERTEX_COMMIT_STARTED:
+     case VERTEX_GROUP_COMMIT_STARTED:
+     case VERTEX_GROUP_COMMIT_FINISHED:
+     case VERTEX_PARALLELISM_UPDATED:
+     case DAG_COMMIT_STARTED:
+       throw new UnsupportedOperationException("Invalid Event, does not support history"
+           + ", eventType=" + historyEvent.getEventType());
+     default:
+       throw new UnsupportedOperationException("Unhandled Event"
+           + ", eventType=" + historyEvent.getEventType());
+   }
+ }
+
+ /**
+  * Builds the TEZ_APPLICATION_ATTEMPT timeline entity for an AM launch.
+  *
+  * @param event the AM launched event
+  * @return an entity whose id is the application attempt id prefixed with "tez_"
+  */
+ private static TimelineEntity convertAMLaunchedEvent(AMLaunchedEvent event) {
+   final String attemptId = event.getApplicationAttemptId().toString();
+   final String user = event.getUser();
+
+   TimelineEntity entity = new TimelineEntity();
+   entity.setEntityId("tez_" + attemptId);
+   entity.setEntityType(EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+   // The launch time is both the entity start time and the AM_LAUNCHED event timestamp.
+   entity.setStartTime(event.getLaunchTime());
+
+   // Relate the attempt to its application id, its unprefixed attempt id and the user.
+   entity.addRelatedEntity(ATSConstants.APPLICATION_ID,
+       event.getApplicationAttemptId().getApplicationId().toString());
+   entity.addRelatedEntity(ATSConstants.APPLICATION_ATTEMPT_ID, attemptId);
+   entity.addRelatedEntity(ATSConstants.USER, user);
+
+   entity.addPrimaryFilter(ATSConstants.USER, user);
+
+   TimelineEvent launched = new TimelineEvent();
+   launched.setEventType(HistoryEventType.AM_LAUNCHED.name());
+   launched.setTimestamp(event.getLaunchTime());
+   entity.addEvent(launched);
+
+   entity.addOtherInfo(ATSConstants.APP_SUBMIT_TIME, event.getAppSubmitTime());
+
+   return entity;
+ }
+
+ /**
+  * Builds a TEZ_APPLICATION_ATTEMPT timeline entity carrying the AM_STARTED event.
+  *
+  * @param event the AM started event
+  * @return an entity whose id is the application attempt id prefixed with "tez_"
+  */
+ private static TimelineEntity convertAMStartedEvent(AMStartedEvent event) {
+   final String attemptId = event.getApplicationAttemptId().toString();
+
+   TimelineEntity entity = new TimelineEntity();
+   entity.setEntityId("tez_" + attemptId);
+   entity.setEntityType(EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+
+   entity.addPrimaryFilter(ATSConstants.USER, event.getUser());
+
+   TimelineEvent started = new TimelineEvent();
+   started.setEventType(HistoryEventType.AM_STARTED.name());
+   started.setTimestamp(event.getStartTime());
+   entity.addEvent(started);
+
+   return entity;
+ }
+
+ /**
+  * Builds the TEZ_CONTAINER_ID timeline entity for a container launch.
+  *
+  * @param event the container launched event
+  * @return an entity whose id is the container id prefixed with "tez_"
+  */
+ private static TimelineEntity convertContainerLaunchedEvent(ContainerLaunchedEvent event) {
+   final String containerId = event.getContainerId().toString();
+
+   TimelineEntity entity = new TimelineEntity();
+   entity.setEntityId("tez_" + containerId);
+   entity.setEntityType(EntityTypes.TEZ_CONTAINER_ID.name());
+   // The launch time is both the entity start time and the CONTAINER_LAUNCHED event timestamp.
+   entity.setStartTime(event.getLaunchTime());
+
+   // Relate the container to the "tez_"-prefixed attempt entity and to its unprefixed id.
+   entity.addRelatedEntity(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(),
+       "tez_" + event.getApplicationAttemptId().toString());
+   entity.addRelatedEntity(ATSConstants.CONTAINER_ID, containerId);
+
+   TimelineEvent launched = new TimelineEvent();
+   launched.setEventType(HistoryEventType.CONTAINER_LAUNCHED.name());
+   launched.setTimestamp(event.getLaunchTime());
+   entity.addEvent(launched);
+
+   return entity;
+ }
+
+ /**
+  * Builds a TEZ_CONTAINER_ID timeline entity carrying the CONTAINER_STOPPED event.
+  *
+  * @param event the container stopped event
+  * @return an entity whose id is the container id prefixed with "tez_"
+  */
+ private static TimelineEntity convertContainerStoppedEvent(ContainerStoppedEvent event) {
+   final String containerId = event.getContainerId().toString();
+
+   TimelineEntity entity = new TimelineEntity();
+   entity.setEntityId("tez_" + containerId);
+   entity.setEntityType(EntityTypes.TEZ_CONTAINER_ID.name());
+
+   // A container may be stopped in a different attempt, so the attempt reported by this
+   // event is related as well.
+   entity.addRelatedEntity(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(),
+       "tez_" + event.getApplicationAttemptId().toString());
+
+   TimelineEvent stopped = new TimelineEvent();
+   stopped.setEventType(HistoryEventType.CONTAINER_STOPPED.name());
+   stopped.setTimestamp(event.getStoppedTime());
+   entity.addEvent(stopped);
+
+   entity.addOtherInfo(ATSConstants.EXIT_STATUS, event.getExitStatus());
+   entity.addOtherInfo(ATSConstants.FINISH_TIME, event.getStoppedTime());
+
+   return entity;
+ }
+
+ /**
+  * Builds a TEZ_DAG_ID timeline entity carrying the DAG_FINISHED event.
+  *
+  * @param event the DAG finished event
+  * @return an entity keyed by the DAG id, with timing, status, diagnostics and counters
+  *     recorded as other-info
+  */
+ private static TimelineEntity convertDAGFinishedEvent(DAGFinishedEvent event) {
+   final String dagId = event.getDagID().toString();
+
+   TimelineEntity entity = new TimelineEntity();
+   entity.setEntityId(dagId);
+   entity.setEntityType(EntityTypes.TEZ_DAG_ID.name());
+
+   TimelineEvent finished = new TimelineEvent();
+   finished.setEventType(HistoryEventType.DAG_FINISHED.name());
+   finished.setTimestamp(event.getFinishTime());
+   entity.addEvent(finished);
+
+   entity.addPrimaryFilter(ATSConstants.USER, event.getUser());
+   entity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName());
+
+   entity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
+   entity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime());
+   // Time taken is the difference between the finish and start times.
+   entity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+   entity.addOtherInfo(ATSConstants.STATUS, event.getState().name());
+   entity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
+   entity.addOtherInfo(ATSConstants.COUNTERS,
+       DAGUtils.convertCountersToATSMap(event.getTezCounters()));
+
+   return entity;
+ }
+
+ /**
+  * Builds a TEZ_DAG_ID timeline entity carrying the DAG_INITIALIZED event.
+  *
+  * @param event the DAG initialized event
+  * @return an entity keyed by the DAG id, with the init time recorded as other-info
+  */
+ private static TimelineEntity convertDAGInitializedEvent(DAGInitializedEvent event) {
+   final String dagId = event.getDagID().toString();
+
+   TimelineEntity entity = new TimelineEntity();
+   entity.setEntityId(dagId);
+   entity.setEntityType(EntityTypes.TEZ_DAG_ID.name());
+
+   TimelineEvent initialized = new TimelineEvent();
+   initialized.setEventType(HistoryEventType.DAG_INITIALIZED.name());
+   initialized.setTimestamp(event.getInitTime());
+   entity.addEvent(initialized);
+
+   entity.addPrimaryFilter(ATSConstants.USER, event.getUser());
+   entity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName());
+
+   entity.addOtherInfo(ATSConstants.INIT_TIME, event.getInitTime());
+
+   return entity;
+ }
+
+ /**
+  * Builds a TEZ_DAG_ID timeline entity carrying the DAG_STARTED event.
+  *
+  * @param event the DAG started event
+  * @return an entity keyed by the DAG id, with the start time recorded as other-info
+  */
+ private static TimelineEntity convertDAGStartedEvent(DAGStartedEvent event) {
+   final String dagId = event.getDagID().toString();
+
+   TimelineEntity entity = new TimelineEntity();
+   entity.setEntityId(dagId);
+   entity.setEntityType(EntityTypes.TEZ_DAG_ID.name());
+
+   TimelineEvent started = new TimelineEvent();
+   started.setEventType(HistoryEventType.DAG_STARTED.name());
+   started.setTimestamp(event.getStartTime());
+   entity.addEvent(started);
+
+   entity.addPrimaryFilter(ATSConstants.USER, event.getUser());
+   entity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName());
+
+   entity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
+
+   return entity;
+ }
+
+ /**
+  * Builds the TEZ_DAG_ID timeline entity for a DAG submission.
+  *
+  * @param event the DAG submitted event
+  * @return an entity keyed by the DAG id, started at the submit time, with the converted DAG
+  *     plan and the application id recorded as other-info
+  */
+ private static TimelineEntity convertDAGSubmittedEvent(DAGSubmittedEvent event) {
+   final String dagId = event.getDagID().toString();
+   final String attemptId = event.getApplicationAttemptId().toString();
+   final String applicationId = event.getApplicationAttemptId().getApplicationId().toString();
+   final String user = event.getUser();
+
+   TimelineEntity entity = new TimelineEntity();
+   entity.setEntityId(dagId);
+   entity.setEntityType(EntityTypes.TEZ_DAG_ID.name());
+   entity.setStartTime(event.getSubmitTime());
+
+   // Relate the DAG to the "tez_"-prefixed attempt entity, the application id, the
+   // unprefixed attempt id and the user.
+   entity.addRelatedEntity(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(), "tez_" + attemptId);
+   entity.addRelatedEntity(ATSConstants.APPLICATION_ID, applicationId);
+   entity.addRelatedEntity(ATSConstants.APPLICATION_ATTEMPT_ID, attemptId);
+   entity.addRelatedEntity(ATSConstants.USER, user);
+
+   TimelineEvent submitted = new TimelineEvent();
+   submitted.setEventType(HistoryEventType.DAG_SUBMITTED.name());
+   submitted.setTimestamp(event.getSubmitTime());
+   entity.addEvent(submitted);
+
+   entity.addPrimaryFilter(ATSConstants.USER, user);
+   entity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDAGName());
+
+   entity.addOtherInfo(ATSConstants.DAG_PLAN,
+       DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan()));
+   entity.addOtherInfo(ATSConstants.APPLICATION_ID, applicationId);
+
+   return entity;
+ }
+
+ /**
+  * Builds a TEZ_TASK_ATTEMPT_ID timeline entity carrying the TASK_ATTEMPT_FINISHED event.
+  *
+  * @param event the task attempt finished event
+  * @return an entity keyed by the task attempt id and filterable by DAG, vertex and task id
+  */
+ private static TimelineEntity convertTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
+   final String attemptId = event.getTaskAttemptID().toString();
+   final String dagId =
+       event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().toString();
+   final String vertexId = event.getTaskAttemptID().getTaskID().getVertexID().toString();
+   final String taskId = event.getTaskAttemptID().getTaskID().toString();
+
+   TimelineEntity entity = new TimelineEntity();
+   entity.setEntityId(attemptId);
+   entity.setEntityType(EntityTypes.TEZ_TASK_ATTEMPT_ID.name());
+
+   entity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), dagId);
+   entity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(), vertexId);
+   entity.addPrimaryFilter(EntityTypes.TEZ_TASK_ID.name(), taskId);
+
+   TimelineEvent finished = new TimelineEvent();
+   finished.setEventType(HistoryEventType.TASK_ATTEMPT_FINISHED.name());
+   finished.setTimestamp(event.getFinishTime());
+   entity.addEvent(finished);
+
+   entity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime());
+   // Time taken is the difference between the finish and start times.
+   entity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+   entity.addOtherInfo(ATSConstants.STATUS, event.getState().name());
+   entity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
+   entity.addOtherInfo(ATSConstants.COUNTERS,
+       DAGUtils.convertCountersToATSMap(event.getCounters()));
+
+   return entity;
+ }
+
+ /**
+  * Builds the TEZ_TASK_ATTEMPT_ID timeline entity for a task attempt start.
+  *
+  * @param event the task attempt started event
+  * @return an entity keyed by the task attempt id, related to its node, container and task,
+  *     and filterable by DAG, vertex and task id
+  */
+ private static TimelineEntity convertTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {
+   final String attemptId = event.getTaskAttemptID().toString();
+   final String taskId = event.getTaskAttemptID().getTaskID().toString();
+   final String vertexId = event.getTaskAttemptID().getTaskID().getVertexID().toString();
+   final String dagId =
+       event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().toString();
+
+   TimelineEntity entity = new TimelineEntity();
+   entity.setEntityId(attemptId);
+   entity.setEntityType(EntityTypes.TEZ_TASK_ATTEMPT_ID.name());
+   entity.setStartTime(event.getStartTime());
+
+   entity.addRelatedEntity(ATSConstants.NODE_ID, event.getNodeId().toString());
+   entity.addRelatedEntity(ATSConstants.CONTAINER_ID, event.getContainerId().toString());
+   entity.addRelatedEntity(EntityTypes.TEZ_TASK_ID.name(), taskId);
+
+   entity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), dagId);
+   entity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(), vertexId);
+   entity.addPrimaryFilter(EntityTypes.TEZ_TASK_ID.name(), taskId);
+
+   TimelineEvent started = new TimelineEvent();
+   started.setEventType(HistoryEventType.TASK_ATTEMPT_STARTED.name());
+   started.setTimestamp(event.getStartTime());
+   entity.addEvent(started);
+
+   entity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
+   entity.addOtherInfo(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl());
+   entity.addOtherInfo(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl());
+
+   return entity;
+ }
+
+ /**
+  * Builds a TEZ_TASK_ID timeline entity carrying the TASK_FINISHED event.
+  *
+  * @param event the task finished event
+  * @return an entity keyed by the task id and filterable by DAG and vertex id
+  */
+ private static TimelineEntity convertTaskFinishedEvent(TaskFinishedEvent event) {
+   final String taskId = event.getTaskID().toString();
+   final String dagId = event.getTaskID().getVertexID().getDAGId().toString();
+   final String vertexId = event.getTaskID().getVertexID().toString();
+
+   TimelineEntity entity = new TimelineEntity();
+   entity.setEntityId(taskId);
+   entity.setEntityType(EntityTypes.TEZ_TASK_ID.name());
+
+   entity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), dagId);
+   entity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(), vertexId);
+
+   TimelineEvent finished = new TimelineEvent();
+   finished.setEventType(HistoryEventType.TASK_FINISHED.name());
+   finished.setTimestamp(event.getFinishTime());
+   entity.addEvent(finished);
+
+   entity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime());
+   // Time taken is the difference between the finish and start times.
+   entity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+   entity.addOtherInfo(ATSConstants.STATUS, event.getState().name());
+   entity.addOtherInfo(ATSConstants.COUNTERS,
+       DAGUtils.convertCountersToATSMap(event.getTezCounters()));
+
+   return entity;
+ }
+
+ /**
+  * Builds a TEZ_TASK_ID timeline entity carrying the TASK_STARTED event.
+  *
+  * @param event the task started event
+  * @return an entity keyed by the task id, related to its vertex and filterable by DAG and
+  *     vertex id
+  */
+ private static TimelineEntity convertTaskStartedEvent(TaskStartedEvent event) {
+   final String taskId = event.getTaskID().toString();
+   final String vertexId = event.getTaskID().getVertexID().toString();
+   final String dagId = event.getTaskID().getVertexID().getDAGId().toString();
+
+   TimelineEntity entity = new TimelineEntity();
+   entity.setEntityId(taskId);
+   entity.setEntityType(EntityTypes.TEZ_TASK_ID.name());
+
+   entity.addRelatedEntity(EntityTypes.TEZ_VERTEX_ID.name(), vertexId);
+
+   entity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), dagId);
+   entity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(), vertexId);
+
+   TimelineEvent started = new TimelineEvent();
+   started.setEventType(HistoryEventType.TASK_STARTED.name());
+   started.setTimestamp(event.getStartTime());
+   entity.addEvent(started);
+
+   entity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
+   entity.addOtherInfo(ATSConstants.SCHEDULED_TIME, event.getScheduledTime());
+
+   return entity;
+ }
+
+ /**
+  * Builds a TEZ_VERTEX_ID timeline entity carrying the VERTEX_FINISHED event.
+  *
+  * @param event the vertex finished event
+  * @return an entity keyed by the vertex id and filterable by DAG id, with timing, status,
+  *     counters and vertex stats recorded as other-info
+  */
+ private static TimelineEntity convertVertexFinishedEvent(VertexFinishedEvent event) {
+   final String vertexId = event.getVertexID().toString();
+   final String dagId = event.getVertexID().getDAGId().toString();
+
+   TimelineEntity entity = new TimelineEntity();
+   entity.setEntityId(vertexId);
+   entity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name());
+
+   entity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), dagId);
+
+   TimelineEvent finished = new TimelineEvent();
+   finished.setEventType(HistoryEventType.VERTEX_FINISHED.name());
+   finished.setTimestamp(event.getFinishTime());
+   entity.addEvent(finished);
+
+   entity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime());
+   // Time taken is the difference between the finish and start times.
+   entity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+   entity.addOtherInfo(ATSConstants.STATUS, event.getState().name());
+   entity.addOtherInfo(ATSConstants.COUNTERS,
+       DAGUtils.convertCountersToATSMap(event.getTezCounters()));
+   entity.addOtherInfo(ATSConstants.STATS,
+       DAGUtils.convertVertexStatsToATSMap(event.getVertexStats()));
+
+   return entity;
+ }
+
+ /**
+  * Builds a TEZ_VERTEX_ID timeline entity carrying the VERTEX_INITIALIZED event.
+  *
+  * @param event the vertex initialized event
+  * @return an entity keyed by the vertex id, related to and filterable by its DAG id, with
+  *     the vertex name, init times, task count and processor name recorded as other-info
+  */
+ private static TimelineEntity convertVertexInitializedEvent(VertexInitializedEvent event) {
+   final String vertexId = event.getVertexID().toString();
+   final String dagId = event.getVertexID().getDAGId().toString();
+
+   TimelineEntity entity = new TimelineEntity();
+   entity.setEntityId(vertexId);
+   entity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name());
+
+   entity.addRelatedEntity(EntityTypes.TEZ_DAG_ID.name(), dagId);
+   entity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), dagId);
+
+   TimelineEvent initialized = new TimelineEvent();
+   initialized.setEventType(HistoryEventType.VERTEX_INITIALIZED.name());
+   initialized.setTimestamp(event.getInitedTime());
+   entity.addEvent(initialized);
+
+   entity.addOtherInfo(ATSConstants.VERTEX_NAME, event.getVertexName());
+   entity.addOtherInfo(ATSConstants.INIT_REQUESTED_TIME, event.getInitRequestedTime());
+   entity.addOtherInfo(ATSConstants.INIT_TIME, event.getInitedTime());
+   entity.addOtherInfo(ATSConstants.NUM_TASKS, event.getNumTasks());
+   entity.addOtherInfo(ATSConstants.PROCESSOR_CLASS_NAME, event.getProcessorName());
+
+   return entity;
+ }
+
+ /**
+  * Builds a TEZ_VERTEX_ID timeline entity carrying the VERTEX_STARTED event.
+  *
+  * @param event the vertex started event
+  * @return an entity keyed by the vertex id and filterable by DAG id, with the requested and
+  *     actual start times recorded as other-info
+  */
+ private static TimelineEntity convertVertexStartedEvent(VertexStartedEvent event) {
+   final String vertexId = event.getVertexID().toString();
+   final String dagId = event.getVertexID().getDAGId().toString();
+
+   TimelineEntity entity = new TimelineEntity();
+   entity.setEntityId(vertexId);
+   entity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name());
+
+   entity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(), dagId);
+
+   TimelineEvent started = new TimelineEvent();
+   started.setEventType(HistoryEventType.VERTEX_STARTED.name());
+   started.setTimestamp(event.getStartTime());
+   entity.addEvent(started);
+
+   entity.addOtherInfo(ATSConstants.START_REQUESTED_TIME, event.getStartRequestedTime());
+   entity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
+
+   return entity;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
new file mode 100644
index 0000000..7bd3b91
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.ats;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.records.TezDAGID;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Exercises ATSHistoryLoggingService against a mocked TimelineClient whose putEntities call
+ * is counted and takes 500 ms.
+ */
+public class TestATSHistoryLoggingService {
+
+  private ATSHistoryLoggingService atsHistoryLoggingService;
+  private AppContext appContext;
+  private Configuration conf;
+  // Number of times the mocked TimelineClient.putEntities has been invoked.
+  private int atsInvokeCounter;
+  private SystemClock clock = new SystemClock();
+
+  /** Mock answer that counts the invocation, sleeps for 500 ms and returns null. */
+  private class SlowCountingAnswer implements Answer<Object> {
+    @Override
+    public Object answer(InvocationOnMock invocation) throws Throwable {
+      ++atsInvokeCounter;
+      sleepIgnoringInterrupt(500);
+      return null;
+    }
+  }
+
+  /** Sleeps for the given number of milliseconds; an interrupt ends the sleep silently. */
+  private static void sleepIgnoringInterrupt(long millis) {
+    try {
+      Thread.sleep(millis);
+    } catch (InterruptedException e) {
+      // do nothing
+    }
+  }
+
+  /**
+   * Creates the service with a 1000 ms flush timeout, replaces its timeline client with a mock
+   * after init, starts it, and then stubs the clock and the putEntities call.
+   */
+  @Before
+  public void setup() throws Exception {
+    appContext = mock(AppContext.class);
+    atsHistoryLoggingService = new ATSHistoryLoggingService();
+    atsHistoryLoggingService.setAppContext(appContext);
+    conf = new Configuration(false);
+    conf.setLong(TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS, 1000L);
+    atsInvokeCounter = 0;
+    atsHistoryLoggingService.init(conf);
+    atsHistoryLoggingService.timelineClient = mock(TimelineClient.class);
+    atsHistoryLoggingService.start();
+    when(appContext.getClock()).thenReturn(clock);
+    when(atsHistoryLoggingService.timelineClient.putEntities(any(TimelineEntity.class)))
+        .thenAnswer(new SlowCountingAnswer());
+  }
+
+  @After
+  public void teardown() {
+    atsHistoryLoggingService.stop();
+    atsHistoryLoggingService = null;
+  }
+
+  /**
+   * Hands 20 DAG started events to the service, waits 2.5 s, stops the service and expects
+   * the mocked putEntities to have been invoked at least 4 and fewer than 10 times.
+   */
+  @Test(timeout=20000)
+  public void testATSHistoryLoggingServiceShutdown() {
+    TezDAGID dagId = TezDAGID.getInstance(ApplicationId.newInstance(100L, 1), 1);
+    DAGHistoryEvent dagStarted = new DAGHistoryEvent(dagId,
+        new DAGStartedEvent(dagId, 1001L, "user1", "dagName1"));
+
+    for (int sent = 0; sent < 20; ++sent) {
+      atsHistoryLoggingService.handle(dagStarted);
+    }
+
+    sleepIgnoringInterrupt(2500L);
+    atsHistoryLoggingService.stop();
+
+    Assert.assertTrue(atsInvokeCounter >= 4);
+    Assert.assertTrue(atsInvokeCounter < 10);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
new file mode 100644
index 0000000..c605585
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.ats;
+
+import java.util.Random;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.AMLaunchedEvent;
+import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.history.events.ContainerStoppedEvent;
+import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskFinishedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
+import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
+import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.codehaus.jettison.json.JSONException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that HistoryEventTimelineConversion can convert an event of every HistoryEventType
+ * whose event reports that it is a history event.
+ */
+public class TestHistoryEventTimelineConversion {
+
+  private ApplicationAttemptId applicationAttemptId;
+  private ApplicationId applicationId;
+  private String user = "user";
+  private Random random = new Random();
+  private TezDAGID tezDAGID;
+  private TezVertexID tezVertexID;
+  private TezTaskID tezTaskID;
+  private TezTaskAttemptID tezTaskAttemptID;
+  private DAGPlan dagPlan;
+  private ContainerId containerId;
+  private NodeId nodeId;
+
+  /** Creates the ids, DAG plan, container and node shared by the events built in the test. */
+  @Before
+  public void setup() {
+    applicationId = ApplicationId.newInstance(9999L, 1);
+    applicationAttemptId = ApplicationAttemptId.newInstance(applicationId, 1);
+    tezDAGID = TezDAGID.getInstance(applicationId, random.nextInt());
+    tezVertexID = TezVertexID.getInstance(tezDAGID, random.nextInt());
+    tezTaskID = TezTaskID.getInstance(tezVertexID, random.nextInt());
+    tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID, random.nextInt());
+    dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock").build();
+    containerId = ContainerId.newInstance(applicationAttemptId, 111);
+    nodeId = NodeId.newInstance("node", 13435);
+  }
+
+  /**
+   * Creates an event instance for the given type; fails the test for a type that has no case
+   * here.
+   */
+  private HistoryEvent createEvent(HistoryEventType eventType) {
+    switch (eventType) {
+      case AM_LAUNCHED:
+        return new AMLaunchedEvent(applicationAttemptId, random.nextInt(), random.nextInt(),
+            user);
+      case AM_STARTED:
+        return new AMStartedEvent(applicationAttemptId, random.nextInt(), user);
+      case DAG_SUBMITTED:
+        return new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId,
+            null, user);
+      case DAG_INITIALIZED:
+        return new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName());
+      case DAG_STARTED:
+        return new DAGStartedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName());
+      case DAG_FINISHED:
+        return new DAGFinishedEvent(tezDAGID, random.nextInt(), random.nextInt(), DAGState.ERROR,
+            null, null, user, dagPlan.getName());
+      case VERTEX_INITIALIZED:
+        return new VertexInitializedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
+            random.nextInt(), "proc", null);
+      case VERTEX_STARTED:
+        return new VertexStartedEvent(tezVertexID, random.nextInt(), random.nextInt());
+      case VERTEX_PARALLELISM_UPDATED:
+        return new VertexParallelismUpdatedEvent();
+      case VERTEX_FINISHED:
+        return new VertexFinishedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
+            random.nextInt(), random.nextInt(), random.nextInt(), VertexState.ERROR,
+            null, null, null);
+      case TASK_STARTED:
+        return new TaskStartedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt());
+      case TASK_FINISHED:
+        return new TaskFinishedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt(),
+            tezTaskAttemptID, TaskState.FAILED, null);
+      case TASK_ATTEMPT_STARTED:
+        return new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", random.nextInt(), containerId,
+            nodeId, null, null);
+      case TASK_ATTEMPT_FINISHED:
+        return new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
+            random.nextInt(), TaskAttemptState.FAILED, null, null);
+      case CONTAINER_LAUNCHED:
+        return new ContainerLaunchedEvent(containerId, random.nextInt(),
+            applicationAttemptId);
+      case CONTAINER_STOPPED:
+        return new ContainerStoppedEvent(containerId, random.nextInt(), -1,
+            applicationAttemptId);
+      case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+        return new VertexDataMovementEventsGeneratedEvent();
+      case DAG_COMMIT_STARTED:
+        return new DAGCommitStartedEvent();
+      case VERTEX_COMMIT_STARTED:
+        return new VertexCommitStartedEvent();
+      case VERTEX_GROUP_COMMIT_STARTED:
+        return new VertexGroupCommitStartedEvent();
+      case VERTEX_GROUP_COMMIT_FINISHED:
+        return new VertexGroupCommitFinishedEvent();
+      default:
+        Assert.fail("Unhandled event type " + eventType);
+        // Only needed for compilation; Assert.fail throws.
+        return null;
+    }
+  }
+
+  /**
+   * Builds one event per HistoryEventType and converts every event that reports itself as a
+   * history event; a conversion that throws fails the test.
+   */
+  @Test
+  public void testHandlerExists() throws JSONException {
+    for (HistoryEventType eventType : HistoryEventType.values()) {
+      HistoryEvent event = createEvent(eventType);
+      if (event != null && event.isHistoryEvent()) {
+        HistoryEventTimelineConversion.convertToTimelineEntity(event);
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index 171458f..ffea08c 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.tez.client.AMConfiguration;
import org.apache.tez.client.TezClient;
import org.apache.tez.client.TezSession;
@@ -60,6 +61,7 @@ import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService;
import org.apache.tez.mapreduce.examples.ExampleDriver;
import org.apache.tez.mapreduce.examples.IntersectDataGen;
import org.apache.tez.mapreduce.examples.IntersectExample;
@@ -67,6 +69,7 @@ import org.apache.tez.mapreduce.examples.IntersectValidate;
import org.apache.tez.runtime.library.processor.SleepProcessor;
import org.apache.tez.runtime.library.processor.SleepProcessor.SleepProcessorConfig;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -362,5 +365,59 @@ public class TestTezJobs {
}
+ /**
+  * Submits a DAG made of a single SleepProcessor vertex through
+  * TezClient.submitDAGApplication, with TEZ_SIMPLE_HISTORY_LOGGING_DIR pointing at a local
+  * test directory. Polls until the DAG completes, expects it to succeed, and then verifies
+  * that a non-empty file whose name starts with the simple history logger's file name
+  * prefix exists in that directory.
+  */
+ @Test(timeout = 60000)
+ public void testHistoryLogging() throws IOException,
+     InterruptedException, TezException, ClassNotFoundException, YarnException {
+   SleepProcessorConfig spConf = new SleepProcessorConfig(1);
+
+   DAG dag = new DAG("TezSleepProcessorHistoryLogging");
+   Vertex vertex = new Vertex("SleepVertex", new ProcessorDescriptor(
+       SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 2,
+       Resource.newInstance(1024, 1));
+   dag.addVertex(vertex);
+
+   TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+   Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
+       .nextInt(100000))));
+   remoteFs.mkdirs(remoteStagingDir);
+   tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
+
+   // History logs are written to the local file system so the test can inspect them directly.
+   FileSystem localFs = FileSystem.getLocal(tezConf);
+   Path historyLogDir = new Path(TEST_ROOT_DIR, "testHistoryLogging");
+   localFs.mkdirs(historyLogDir);
+
+   tezConf.set(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR,
+       localFs.makeQualified(historyLogDir).toString());
+
+   TezClient tezClient = new TezClient(tezConf);
+   AMConfiguration amConf = new AMConfiguration(new HashMap<String, String>(),
+       new HashMap<String, LocalResource>(), tezConf, null);
+
+   DAGClient dagClient = tezClient.submitDAGApplication(dag, amConf);
+
+   // Poll until the DAG reaches a completed state; the @Test timeout bounds the wait.
+   DAGStatus dagStatus = dagClient.getDAGStatus(null);
+   while (!dagStatus.isCompleted()) {
+     LOG.info("Waiting for job to complete. Sleeping for 500ms. Current state: "
+         + dagStatus.getState());
+     Thread.sleep(500L);
+     dagStatus = dagClient.getDAGStatus(null);
+   }
+   assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
+
+   // Pick the first regular file carrying the history log file name prefix.
+   FileStatus historyLogFileStatus = null;
+   for (FileStatus fileStatus : localFs.listStatus(historyLogDir)) {
+     if (fileStatus.isDirectory()) {
+       continue;
+     }
+     Path p = fileStatus.getPath();
+     if (p.getName().startsWith(SimpleHistoryLoggingService.LOG_FILE_NAME_PREFIX)) {
+       historyLogFileStatus = fileStatus;
+       break;
+     }
+   }
+   Assert.assertNotNull("No history log file found in " + historyLogDir,
+       historyLogFileStatus);
+   Assert.assertTrue("History log file is empty: " + historyLogFileStatus.getPath(),
+       historyLogFileStatus.getLen() > 0);
+ }
}
[2/3] TEZ-1066. Generate events to integrate with YARN timeline
server. (hitesh)
Posted by hi...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
index e6023f1..da00b06 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
@@ -18,19 +18,14 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexStartedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
public class VertexStartedEvent implements HistoryEvent {
@@ -54,39 +49,6 @@ public class VertexStartedEvent implements HistoryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(ATSConstants.ENTITY, vertexID.toString());
- jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
-
- // Related entities
- JSONArray relatedEntities = new JSONArray();
- JSONObject vertexEntity = new JSONObject();
- vertexEntity.put(ATSConstants.ENTITY, vertexID.getDAGId().toString());
- vertexEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
- relatedEntities.put(vertexEntity);
- jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
-
- // Events
- JSONArray events = new JSONArray();
- JSONObject startEvent = new JSONObject();
- startEvent.put(ATSConstants.TIMESTAMP, startTime);
- startEvent.put(ATSConstants.EVENT_TYPE,
- HistoryEventType.VERTEX_STARTED.name());
- events.put(startEvent);
- jsonObject.put(ATSConstants.EVENTS, events);
-
- // Other info
- // TODO fix requested times to be events
- JSONObject otherInfo = new JSONObject();
- otherInfo.put(ATSConstants.START_REQUESTED_TIME, startRequestedTime);
- otherInfo.put(ATSConstants.START_TIME, startTime);
- jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
- return jsonObject;
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java
new file mode 100644
index 0000000..00cac28
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging;
+
+ /**
+  * Entity type names used when Tez history events are converted into entities.
+  * The declaration order is part of the enum's contract and must not be changed.
+  */
+ public enum EntityTypes {
+   /** Tez-side entity for an application attempt. */
+   TEZ_APPLICATION_ATTEMPT,
+   /** Tez-side entity for a container. */
+   TEZ_CONTAINER_ID,
+   /** Entity for a DAG. */
+   TEZ_DAG_ID,
+   /** Entity for a vertex. */
+   TEZ_VERTEX_ID,
+   /** Entity for a task. */
+   TEZ_TASK_ID,
+   /** Entity for a task attempt. */
+   TEZ_TASK_ATTEMPT_ID
+ }
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/logging/HistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/HistoryLoggingService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/HistoryLoggingService.java
new file mode 100644
index 0000000..44efad7
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/HistoryLoggingService.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging;
+
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+
+ /**
+  * Base class for services that log DAG history events. Concrete services implement
+  * {@link #handle(DAGHistoryEvent)}; the application context is supplied separately through
+  * {@link #setAppContext(AppContext)}.
+  */
+ public abstract class HistoryLoggingService extends AbstractService {
+
+   /** Application context made available to subclasses; unset until setAppContext is called. */
+   protected AppContext appContext;
+
+   /**
+    * Creates the service.
+    * @param name service name passed on to the AbstractService constructor
+    */
+   public HistoryLoggingService(String name) {
+     super(name);
+   }
+
+   /**
+    * Stores the application context for later use by the logging service.
+    * @param appContext application context to retain
+    */
+   public void setAppContext(AppContext appContext) {
+     this.appContext = appContext;
+   }
+
+   /**
+    * Logs the given history event.
+    * @param event History event to be logged
+    */
+   public abstract void handle(DAGHistoryEvent event);
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
new file mode 100644
index 0000000..e5bb1e5
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -0,0 +1,633 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.impl;
+
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.AMLaunchedEvent;
+import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.history.events.ContainerStoppedEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskFinishedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.history.logging.EntityTypes;
+import org.apache.tez.dag.history.utils.ATSConstants;
+import org.apache.tez.dag.history.utils.DAGUtils;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+public class HistoryEventJsonConversion {
+
+ /**
+  * Converts a history event to JSON by dispatching on its event type to the matching
+  * per-event converter.
+  *
+  * @param historyEvent event to convert
+  * @return JSON object produced by the converter for the event's type
+  * @throws JSONException if a converter fails to populate the JSON object
+  * @throws UnsupportedOperationException if the event reports that it is not a history
+  *         event, or if its type has no converter
+  */
+ public static JSONObject convertToJson(HistoryEvent historyEvent) throws JSONException {
+   // Reject events that do not take part in history logging before looking at the type.
+   if (!historyEvent.isHistoryEvent()) {
+     throw new UnsupportedOperationException("Invalid Event, does not support history"
+         + ", eventType=" + historyEvent.getEventType());
+   }
+   final HistoryEventType eventType = historyEvent.getEventType();
+   switch (eventType) {
+     case AM_LAUNCHED:
+       return convertAMLaunchedEvent((AMLaunchedEvent) historyEvent);
+     case AM_STARTED:
+       return convertAMStartedEvent((AMStartedEvent) historyEvent);
+     case CONTAINER_LAUNCHED:
+       return convertContainerLaunchedEvent((ContainerLaunchedEvent) historyEvent);
+     case CONTAINER_STOPPED:
+       return convertContainerStoppedEvent((ContainerStoppedEvent) historyEvent);
+     case DAG_SUBMITTED:
+       return convertDAGSubmittedEvent((DAGSubmittedEvent) historyEvent);
+     case DAG_INITIALIZED:
+       return convertDAGInitializedEvent((DAGInitializedEvent) historyEvent);
+     case DAG_STARTED:
+       return convertDAGStartedEvent((DAGStartedEvent) historyEvent);
+     case DAG_FINISHED:
+       return convertDAGFinishedEvent((DAGFinishedEvent) historyEvent);
+     case VERTEX_INITIALIZED:
+       return convertVertexInitializedEvent((VertexInitializedEvent) historyEvent);
+     case VERTEX_STARTED:
+       return convertVertexStartedEvent((VertexStartedEvent) historyEvent);
+     case VERTEX_FINISHED:
+       return convertVertexFinishedEvent((VertexFinishedEvent) historyEvent);
+     case TASK_STARTED:
+       return convertTaskStartedEvent((TaskStartedEvent) historyEvent);
+     case TASK_FINISHED:
+       return convertTaskFinishedEvent((TaskFinishedEvent) historyEvent);
+     case TASK_ATTEMPT_STARTED:
+       return convertTaskAttemptStartedEvent((TaskAttemptStartedEvent) historyEvent);
+     case TASK_ATTEMPT_FINISHED:
+       return convertTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) historyEvent);
+     // Known event types that have no JSON history representation.
+     case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+     case VERTEX_COMMIT_STARTED:
+     case VERTEX_GROUP_COMMIT_STARTED:
+     case VERTEX_GROUP_COMMIT_FINISHED:
+     case VERTEX_PARALLELISM_UPDATED:
+     case DAG_COMMIT_STARTED:
+       throw new UnsupportedOperationException("Invalid Event, does not support history"
+           + ", eventType=" + eventType);
+     default:
+       throw new UnsupportedOperationException("Unhandled Event"
+           + ", eventType=" + eventType);
+   }
+ }
+
+ /**
+  * Builds the JSON for an AM launched event. The entity is {@code "tez_" + attemptId} of type
+  * TEZ_APPLICATION_ATTEMPT, related to the application id and the application attempt id,
+  * with one AM_LAUNCHED event at the launch time and the app submit time as other info.
+  *
+  * @param event AM launched event to convert
+  * @return JSON representation of the event
+  * @throws JSONException if the JSON object cannot be populated
+  */
+ private static JSONObject convertAMLaunchedEvent(AMLaunchedEvent event) throws JSONException {
+   final String attemptId = event.getApplicationAttemptId().toString();
+   final String applicationId =
+       event.getApplicationAttemptId().getApplicationId().toString();
+
+   JSONObject amJson = new JSONObject();
+   amJson.put(ATSConstants.ENTITY, "tez_" + attemptId);
+   amJson.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+
+   // Related entities: application first, then application attempt.
+   JSONObject applicationRef = new JSONObject();
+   applicationRef.put(ATSConstants.ENTITY, applicationId);
+   applicationRef.put(ATSConstants.ENTITY_TYPE, ATSConstants.APPLICATION_ID);
+   JSONObject attemptRef = new JSONObject();
+   attemptRef.put(ATSConstants.ENTITY, attemptId);
+   attemptRef.put(ATSConstants.ENTITY_TYPE, ATSConstants.APPLICATION_ATTEMPT_ID);
+   JSONArray related = new JSONArray();
+   related.put(applicationRef);
+   related.put(attemptRef);
+   amJson.put(ATSConstants.RELATED_ENTITIES, related);
+
+   // TODO decide whether this goes into different events,
+   // event info or other info.
+   JSONObject launched = new JSONObject();
+   launched.put(ATSConstants.TIMESTAMP, event.getLaunchTime());
+   launched.put(ATSConstants.EVENT_TYPE, HistoryEventType.AM_LAUNCHED.name());
+   JSONArray eventList = new JSONArray();
+   eventList.put(launched);
+   amJson.put(ATSConstants.EVENTS, eventList);
+
+   // Other info to tag with Tez AM
+   JSONObject extras = new JSONObject();
+   extras.put(ATSConstants.APP_SUBMIT_TIME, event.getAppSubmitTime());
+   amJson.put(ATSConstants.OTHER_INFO, extras);
+
+   return amJson;
+ }
+
+ /**
+  * Builds the JSON for an AM started event. The entity is {@code "tez_" + attemptId} of type
+  * TEZ_APPLICATION_ATTEMPT, related to the application id and the application attempt id,
+  * with one AM_STARTED event at the start time.
+  *
+  * @param event AM started event to convert
+  * @return JSON representation of the event
+  * @throws JSONException if the JSON object cannot be populated
+  */
+ private static JSONObject convertAMStartedEvent(AMStartedEvent event) throws JSONException {
+   final String attemptId = event.getApplicationAttemptId().toString();
+   final String applicationId =
+       event.getApplicationAttemptId().getApplicationId().toString();
+
+   JSONObject amJson = new JSONObject();
+   amJson.put(ATSConstants.ENTITY, "tez_" + attemptId);
+   amJson.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+
+   // Related entities: application first, then application attempt.
+   JSONObject applicationRef = new JSONObject();
+   applicationRef.put(ATSConstants.ENTITY, applicationId);
+   applicationRef.put(ATSConstants.ENTITY_TYPE, ATSConstants.APPLICATION_ID);
+   JSONObject attemptRef = new JSONObject();
+   attemptRef.put(ATSConstants.ENTITY, attemptId);
+   attemptRef.put(ATSConstants.ENTITY_TYPE, ATSConstants.APPLICATION_ATTEMPT_ID);
+   JSONArray related = new JSONArray();
+   related.put(applicationRef);
+   related.put(attemptRef);
+   amJson.put(ATSConstants.RELATED_ENTITIES, related);
+
+   // TODO decide whether this goes into different events,
+   // event info or other info.
+   JSONObject started = new JSONObject();
+   started.put(ATSConstants.TIMESTAMP, event.getStartTime());
+   started.put(ATSConstants.EVENT_TYPE, HistoryEventType.AM_STARTED.name());
+   JSONArray eventList = new JSONArray();
+   eventList.put(started);
+   amJson.put(ATSConstants.EVENTS, eventList);
+
+   return amJson;
+ }
+
+ /**
+  * Builds the JSON for a container launched event. The entity is {@code "tez_" + containerId}
+  * of type TEZ_CONTAINER_ID. It is related to the Tez application attempt entity and to the
+  * container id, and carries one CONTAINER_LAUNCHED event at the launch time.
+  *
+  * @param event container launched event to convert
+  * @return JSON representation of the event
+  * @throws JSONException if the JSON object cannot be populated
+  */
+ private static JSONObject convertContainerLaunchedEvent(ContainerLaunchedEvent event) throws JSONException {
+   JSONObject jsonObject = new JSONObject();
+   jsonObject.put(ATSConstants.ENTITY,
+       "tez_" + event.getContainerId().toString());
+   jsonObject.put(ATSConstants.ENTITY_TYPE,
+       EntityTypes.TEZ_APPLICATION_ATTEMPT.name().isEmpty()
+           ? EntityTypes.TEZ_CONTAINER_ID.name() : EntityTypes.TEZ_CONTAINER_ID.name());
+
+   JSONArray relatedEntities = new JSONArray();
+   JSONObject appAttemptEntity = new JSONObject();
+   // TEZ_APPLICATION_ATTEMPT entities are identified by "tez_" + attempt id in the AM
+   // launched/started and DAG submitted conversions; the reference must use the same
+   // prefixed id, otherwise it points at an entity id that is never published.
+   appAttemptEntity.put(ATSConstants.ENTITY,
+       "tez_" + event.getApplicationAttemptId().toString());
+   appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
+       EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+
+   JSONObject containerEntity = new JSONObject();
+   containerEntity.put(ATSConstants.ENTITY, event.getContainerId().toString());
+   containerEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.CONTAINER_ID);
+
+   relatedEntities.put(appAttemptEntity);
+   relatedEntities.put(containerEntity);
+   jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+   // TODO decide whether this goes into different events,
+   // event info or other info.
+   JSONArray events = new JSONArray();
+   JSONObject launchEvent = new JSONObject();
+   launchEvent.put(ATSConstants.TIMESTAMP, event.getLaunchTime());
+   launchEvent.put(ATSConstants.EVENT_TYPE,
+       HistoryEventType.CONTAINER_LAUNCHED.name());
+   events.put(launchEvent);
+   jsonObject.put(ATSConstants.EVENTS, events);
+
+   // TODO add other container info here? or assume AHS will have this?
+   // TODO container logs?
+
+   return jsonObject;
+ }
+
+ /**
+  * Builds the JSON for a container stopped event. The layout mirrors the container launched
+  * conversion: the entity is {@code "tez_" + containerId} of type TEZ_CONTAINER_ID, related to
+  * the Tez application attempt entity and to the container id. It carries one
+  * CONTAINER_STOPPED event at the stopped time, plus the exit status as other info.
+  *
+  * @param event container stopped event to convert
+  * @return JSON representation of the event
+  * @throws JSONException if the JSON object cannot be populated
+  */
+ private static JSONObject convertContainerStoppedEvent(ContainerStoppedEvent event) throws JSONException {
+   // structure is identical to ContainerLaunchedEvent
+   JSONObject jsonObject = new JSONObject();
+   jsonObject.put(ATSConstants.ENTITY,
+       "tez_" + event.getContainerId().toString());
+   jsonObject.put(ATSConstants.ENTITY_TYPE,
+       EntityTypes.TEZ_CONTAINER_ID.name());
+
+   JSONArray relatedEntities = new JSONArray();
+   JSONObject appAttemptEntity = new JSONObject();
+   // TEZ_APPLICATION_ATTEMPT entities are identified by "tez_" + attempt id in the AM
+   // launched/started and DAG submitted conversions; the reference must use the same
+   // prefixed id, otherwise it points at an entity id that is never published.
+   appAttemptEntity.put(ATSConstants.ENTITY,
+       "tez_" + event.getApplicationAttemptId().toString());
+   appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
+       EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+
+   JSONObject containerEntity = new JSONObject();
+   containerEntity.put(ATSConstants.ENTITY, event.getContainerId().toString());
+   containerEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.CONTAINER_ID);
+
+   relatedEntities.put(appAttemptEntity);
+   relatedEntities.put(containerEntity);
+   jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+   // TODO decide whether this goes into different events,
+   // event info or other info.
+   JSONArray events = new JSONArray();
+   JSONObject stopEvent = new JSONObject();
+   stopEvent.put(ATSConstants.TIMESTAMP, event.getStoppedTime());
+   stopEvent.put(ATSConstants.EVENT_TYPE,
+       HistoryEventType.CONTAINER_STOPPED.name());
+   events.put(stopEvent);
+   jsonObject.put(ATSConstants.EVENTS, events);
+
+   // TODO add other container info here? or assume AHS will have this?
+   // TODO container logs?
+
+   // Other info
+   JSONObject otherInfo = new JSONObject();
+   otherInfo.put(ATSConstants.EXIT_STATUS, event.getExitStatus());
+   jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+   return jsonObject;
+ }
+
+ /**
+  * Builds the JSON for a DAG finished event: a TEZ_DAG_ID entity keyed by the DAG id, with one
+  * DAG_FINISHED event at the finish time. Start and finish times, time taken, final state,
+  * diagnostics and counters are recorded as other info.
+  *
+  * @param event DAG finished event to convert
+  * @return JSON representation of the event
+  * @throws JSONException if the JSON object cannot be populated
+  */
+ private static JSONObject convertDAGFinishedEvent(DAGFinishedEvent event) throws JSONException {
+   JSONObject dagJson = new JSONObject();
+   dagJson.put(ATSConstants.ENTITY, event.getDagID().toString());
+   dagJson.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
+
+   // Related Entities not needed as should have been done in
+   // dag submission event
+
+   // TODO decide whether this goes into different events,
+   // event info or other info.
+   JSONObject finished = new JSONObject();
+   finished.put(ATSConstants.TIMESTAMP, event.getFinishTime());
+   finished.put(ATSConstants.EVENT_TYPE, HistoryEventType.DAG_FINISHED.name());
+   JSONArray eventList = new JSONArray();
+   eventList.put(finished);
+   dagJson.put(ATSConstants.EVENTS, eventList);
+
+   JSONObject summary = new JSONObject();
+   summary.put(ATSConstants.START_TIME, event.getStartTime());
+   summary.put(ATSConstants.FINISH_TIME, event.getFinishTime());
+   summary.put(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+   summary.put(ATSConstants.STATUS, event.getState().name());
+   summary.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
+   summary.put(ATSConstants.COUNTERS,
+       DAGUtils.convertCountersToJSON(event.getTezCounters()));
+   dagJson.put(ATSConstants.OTHER_INFO, summary);
+
+   return dagJson;
+ }
+
+ /**
+  * Builds the JSON for a DAG initialized event: a TEZ_DAG_ID entity keyed by the DAG id with
+  * one DAG_INITIALIZED event at the init time.
+  *
+  * @param event DAG initialized event to convert
+  * @return JSON representation of the event
+  * @throws JSONException if the JSON object cannot be populated
+  */
+ private static JSONObject convertDAGInitializedEvent(DAGInitializedEvent event) throws JSONException {
+   JSONObject dagJson = new JSONObject();
+   dagJson.put(ATSConstants.ENTITY, event.getDagID().toString());
+   dagJson.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
+
+   // Related Entities not needed as should have been done in
+   // dag submission event
+
+   JSONObject initialized = new JSONObject();
+   initialized.put(ATSConstants.TIMESTAMP, event.getInitTime());
+   initialized.put(ATSConstants.EVENT_TYPE, HistoryEventType.DAG_INITIALIZED.name());
+   JSONArray eventList = new JSONArray();
+   eventList.put(initialized);
+   dagJson.put(ATSConstants.EVENTS, eventList);
+
+   return dagJson;
+ }
+
+ /**
+  * Builds the JSON for a DAG started event: a TEZ_DAG_ID entity keyed by the DAG id with one
+  * DAG_STARTED event at the start time.
+  *
+  * @param event DAG started event to convert
+  * @return JSON representation of the event
+  * @throws JSONException if the JSON object cannot be populated
+  */
+ private static JSONObject convertDAGStartedEvent(DAGStartedEvent event) throws JSONException {
+   JSONObject dagJson = new JSONObject();
+   dagJson.put(ATSConstants.ENTITY, event.getDagID().toString());
+   dagJson.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
+
+   // Related Entities not needed as should have been done in
+   // dag submission event
+
+   // TODO decide whether this goes into different events,
+   // event info or other info.
+   JSONObject started = new JSONObject();
+   started.put(ATSConstants.TIMESTAMP, event.getStartTime());
+   started.put(ATSConstants.EVENT_TYPE, HistoryEventType.DAG_STARTED.name());
+   JSONArray eventList = new JSONArray();
+   eventList.put(started);
+   dagJson.put(ATSConstants.EVENTS, eventList);
+
+   return dagJson;
+ }
+
+ /**
+  * Builds the JSON for a DAG submitted event: a TEZ_DAG_ID entity keyed by the DAG id. It is
+  * related to the Tez application attempt entity, the application id, the application
+  * attempt id and the submitting user, and the DAG name is a primary filter. It carries one
+  * DAG_SUBMITTED event at the submit time, and the simple JSON form of the DAG plan is
+  * stored as other info.
+  *
+  * @param event DAG submitted event to convert
+  * @return JSON representation of the event
+  * @throws JSONException if the JSON object cannot be populated
+  */
+ private static JSONObject convertDAGSubmittedEvent(DAGSubmittedEvent event) throws JSONException {
+   final String attemptId = event.getApplicationAttemptId().toString();
+   final String applicationId =
+       event.getApplicationAttemptId().getApplicationId().toString();
+
+   JSONObject dagJson = new JSONObject();
+   dagJson.put(ATSConstants.ENTITY, event.getDagID().toString());
+   dagJson.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
+
+   // Related entities, in order: Tez app attempt, application, application attempt, user.
+   JSONObject tezAttemptRef = new JSONObject();
+   tezAttemptRef.put(ATSConstants.ENTITY, "tez_" + attemptId);
+   tezAttemptRef.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+   JSONObject applicationRef = new JSONObject();
+   applicationRef.put(ATSConstants.ENTITY, applicationId);
+   applicationRef.put(ATSConstants.ENTITY_TYPE, ATSConstants.APPLICATION_ID);
+   JSONObject attemptRef = new JSONObject();
+   attemptRef.put(ATSConstants.ENTITY, attemptId);
+   attemptRef.put(ATSConstants.ENTITY_TYPE, ATSConstants.APPLICATION_ATTEMPT_ID);
+   JSONObject userRef = new JSONObject();
+   userRef.put(ATSConstants.ENTITY, event.getUser());
+   userRef.put(ATSConstants.ENTITY_TYPE, ATSConstants.USER);
+
+   JSONArray related = new JSONArray();
+   related.put(tezAttemptRef);
+   related.put(applicationRef);
+   related.put(attemptRef);
+   related.put(userRef);
+   dagJson.put(ATSConstants.RELATED_ENTITIES, related);
+
+   // filters
+   JSONObject filters = new JSONObject();
+   filters.put(ATSConstants.DAG_NAME, event.getDAGName());
+   dagJson.put(ATSConstants.PRIMARY_FILTERS, filters);
+
+   // TODO decide whether this goes into different events,
+   // event info or other info.
+   JSONObject submitted = new JSONObject();
+   submitted.put(ATSConstants.TIMESTAMP, event.getSubmitTime());
+   submitted.put(ATSConstants.EVENT_TYPE, HistoryEventType.DAG_SUBMITTED.name());
+   JSONArray eventList = new JSONArray();
+   eventList.put(submitted);
+   dagJson.put(ATSConstants.EVENTS, eventList);
+
+   // Other info such as dag plan
+   JSONObject extras = new JSONObject();
+   extras.put(ATSConstants.DAG_PLAN,
+       DAGUtils.generateSimpleJSONPlan(event.getDAGPlan()));
+   dagJson.put(ATSConstants.OTHER_INFO, extras);
+
+   return dagJson;
+ }
+
+ /**
+  * Builds the JSON for a task attempt finished event: a TEZ_TASK_ATTEMPT_ID entity keyed by the
+  * task attempt id, with one TASK_ATTEMPT_FINISHED event at the finish time. Start and finish
+  * times, time taken, final state, diagnostics and counters are recorded as other info.
+  *
+  * @param event task attempt finished event to convert
+  * @return JSON representation of the event
+  * @throws JSONException if the JSON object cannot be populated
+  */
+ private static JSONObject convertTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) throws JSONException {
+   JSONObject attemptJson = new JSONObject();
+   attemptJson.put(ATSConstants.ENTITY, event.getTaskAttemptID().toString());
+   attemptJson.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ATTEMPT_ID.name());
+
+   // Events
+   JSONObject finished = new JSONObject();
+   finished.put(ATSConstants.TIMESTAMP, event.getFinishTime());
+   finished.put(ATSConstants.EVENT_TYPE, HistoryEventType.TASK_ATTEMPT_FINISHED.name());
+   JSONArray eventList = new JSONArray();
+   eventList.put(finished);
+   attemptJson.put(ATSConstants.EVENTS, eventList);
+
+   JSONObject summary = new JSONObject();
+   summary.put(ATSConstants.START_TIME, event.getStartTime());
+   summary.put(ATSConstants.FINISH_TIME, event.getFinishTime());
+   summary.put(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+   summary.put(ATSConstants.STATUS, event.getState().name());
+   summary.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
+   summary.put(ATSConstants.COUNTERS,
+       DAGUtils.convertCountersToJSON(event.getCounters()));
+   attemptJson.put(ATSConstants.OTHER_INFO, summary);
+
+   return attemptJson;
+ }
+
+ /**
+  * Builds the JSON for a task attempt started event: a TEZ_TASK_ATTEMPT_ID entity keyed by the
+  * task attempt id, related to the node, the container and the owning task. It carries one
+  * TASK_ATTEMPT_STARTED event at the start time, and the in-progress and completed log URLs
+  * are stored as other info.
+  *
+  * @param event task attempt started event to convert
+  * @return JSON representation of the event
+  * @throws JSONException if the JSON object cannot be populated
+  */
+ private static JSONObject convertTaskAttemptStartedEvent(TaskAttemptStartedEvent event) throws JSONException {
+   JSONObject attemptJson = new JSONObject();
+   attemptJson.put(ATSConstants.ENTITY, event.getTaskAttemptID().toString());
+   attemptJson.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ATTEMPT_ID.name());
+
+   // Related entities, in order: node, container, task.
+   JSONObject nodeRef = new JSONObject();
+   nodeRef.put(ATSConstants.ENTITY, event.getNodeId().toString());
+   nodeRef.put(ATSConstants.ENTITY_TYPE, ATSConstants.NODE_ID);
+
+   JSONObject containerRef = new JSONObject();
+   containerRef.put(ATSConstants.ENTITY, event.getContainerId().toString());
+   containerRef.put(ATSConstants.ENTITY_TYPE, ATSConstants.CONTAINER_ID);
+
+   JSONObject taskRef = new JSONObject();
+   taskRef.put(ATSConstants.ENTITY, event.getTaskAttemptID().getTaskID().toString());
+   taskRef.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ID.name());
+
+   JSONArray related = new JSONArray();
+   related.put(nodeRef);
+   related.put(containerRef);
+   related.put(taskRef);
+   attemptJson.put(ATSConstants.RELATED_ENTITIES, related);
+
+   // Events
+   JSONObject started = new JSONObject();
+   started.put(ATSConstants.TIMESTAMP, event.getStartTime());
+   started.put(ATSConstants.EVENT_TYPE, HistoryEventType.TASK_ATTEMPT_STARTED.name());
+   JSONArray eventList = new JSONArray();
+   eventList.put(started);
+   attemptJson.put(ATSConstants.EVENTS, eventList);
+
+   // Other info
+   JSONObject extras = new JSONObject();
+   extras.put(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl());
+   extras.put(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl());
+   attemptJson.put(ATSConstants.OTHER_INFO, extras);
+
+   return attemptJson;
+ }
+
+ /**
+  * Builds the JSON for a task finished event: a TEZ_TASK_ID entity keyed by the task id, with
+  * one TASK_FINISHED event at the finish time. Start and finish times, time taken, final
+  * state and counters are recorded as other info.
+  *
+  * @param event task finished event to convert
+  * @return JSON representation of the event
+  * @throws JSONException if the JSON object cannot be populated
+  */
+ private static JSONObject convertTaskFinishedEvent(TaskFinishedEvent event) throws JSONException {
+   JSONObject taskJson = new JSONObject();
+   taskJson.put(ATSConstants.ENTITY, event.getTaskID().toString());
+   taskJson.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ID.name());
+
+   // Events
+   JSONObject finished = new JSONObject();
+   finished.put(ATSConstants.TIMESTAMP, event.getFinishTime());
+   finished.put(ATSConstants.EVENT_TYPE, HistoryEventType.TASK_FINISHED.name());
+   JSONArray eventList = new JSONArray();
+   eventList.put(finished);
+   taskJson.put(ATSConstants.EVENTS, eventList);
+
+   JSONObject summary = new JSONObject();
+   summary.put(ATSConstants.START_TIME, event.getStartTime());
+   summary.put(ATSConstants.FINISH_TIME, event.getFinishTime());
+   summary.put(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+   summary.put(ATSConstants.STATUS, event.getState().name());
+   summary.put(ATSConstants.COUNTERS,
+       DAGUtils.convertCountersToJSON(event.getTezCounters()));
+   taskJson.put(ATSConstants.OTHER_INFO, summary);
+
+   return taskJson;
+ }
+
+ /**
+  * Builds the JSON for a task started event: a TEZ_TASK_ID entity keyed by the task id, related
+  * to its vertex, with one TASK_STARTED event at the start time. The start and scheduled
+  * times are recorded as other info.
+  *
+  * @param event task started event to convert
+  * @return JSON representation of the event
+  * @throws JSONException if the JSON object cannot be populated
+  */
+ private static JSONObject convertTaskStartedEvent(TaskStartedEvent event) throws JSONException {
+   JSONObject taskJson = new JSONObject();
+   taskJson.put(ATSConstants.ENTITY, event.getTaskID().toString());
+   taskJson.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ID.name());
+
+   // Related entities: the vertex this task belongs to.
+   JSONObject vertexRef = new JSONObject();
+   vertexRef.put(ATSConstants.ENTITY, event.getTaskID().getVertexID().toString());
+   vertexRef.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
+   JSONArray related = new JSONArray();
+   related.put(vertexRef);
+   taskJson.put(ATSConstants.RELATED_ENTITIES, related);
+
+   // Events
+   JSONObject started = new JSONObject();
+   started.put(ATSConstants.TIMESTAMP, event.getStartTime());
+   started.put(ATSConstants.EVENT_TYPE, HistoryEventType.TASK_STARTED.name());
+   JSONArray eventList = new JSONArray();
+   eventList.put(started);
+   taskJson.put(ATSConstants.EVENTS, eventList);
+
+   // Other info
+   // TODO fix schedule/launch time to be events
+   JSONObject extras = new JSONObject();
+   extras.put(ATSConstants.START_TIME, event.getStartTime());
+   extras.put(ATSConstants.SCHEDULED_TIME, event.getScheduledTime());
+   taskJson.put(ATSConstants.OTHER_INFO, extras);
+
+   return taskJson;
+ }
+
+ /**
+  * Converts a vertex-finished history event into its JSON form: the vertex entity, one
+  * VERTEX_FINISHED event, and finish time, elapsed time, final state, diagnostics and
+  * counters as other info.
+  *
+  * @param event the vertex-finished event to convert
+  * @return the JSON object describing the event
+  * @throws JSONException if a value cannot be stored in the JSON structure
+  */
+ private static JSONObject convertVertexFinishedEvent(VertexFinishedEvent event) throws JSONException {
+   // The single event carried by this record
+   JSONObject vertexFinished = new JSONObject();
+   vertexFinished.put(ATSConstants.TIMESTAMP, event.getFinishTime());
+   vertexFinished.put(ATSConstants.EVENT_TYPE, HistoryEventType.VERTEX_FINISHED.name());
+
+   // Other info
+   JSONObject extraInfo = new JSONObject();
+   extraInfo.put(ATSConstants.FINISH_TIME, event.getFinishTime());
+   extraInfo.put(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+   extraInfo.put(ATSConstants.STATUS, event.getState().name());
+   extraInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
+   extraInfo.put(ATSConstants.COUNTERS, DAGUtils.convertCountersToJSON(event.getTezCounters()));
+
+   // Assemble the top-level object
+   JSONObject result = new JSONObject();
+   result.put(ATSConstants.ENTITY, event.getVertexID().toString());
+   result.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
+   result.put(ATSConstants.EVENTS, new JSONArray().put(vertexFinished));
+   result.put(ATSConstants.OTHER_INFO, extraInfo);
+   return result;
+ }
+
+ /**
+  * Converts a vertex-initialized history event into its JSON form: the vertex entity, the
+  * owning DAG as a related entity, one VERTEX_INITIALIZED event, and the vertex name, init
+  * times, task count and processor name as other info.
+  *
+  * @param event the vertex-initialized event to convert
+  * @return the JSON object describing the event
+  * @throws JSONException if a value cannot be stored in the JSON structure
+  */
+ private static JSONObject convertVertexInitializedEvent(VertexInitializedEvent event) throws JSONException {
+   // Related entity: the DAG this vertex belongs to
+   JSONObject parentDag = new JSONObject();
+   parentDag.put(ATSConstants.ENTITY, event.getVertexID().getDAGId().toString());
+   parentDag.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
+
+   // The single event carried by this record
+   JSONObject vertexInitialized = new JSONObject();
+   vertexInitialized.put(ATSConstants.TIMESTAMP, event.getInitedTime());
+   vertexInitialized.put(ATSConstants.EVENT_TYPE, HistoryEventType.VERTEX_INITIALIZED.name());
+
+   // Other info
+   // TODO fix requested times to be events
+   JSONObject extraInfo = new JSONObject();
+   extraInfo.put(ATSConstants.VERTEX_NAME, event.getVertexName());
+   extraInfo.put(ATSConstants.INIT_REQUESTED_TIME, event.getInitRequestedTime());
+   extraInfo.put(ATSConstants.INIT_TIME, event.getInitedTime());
+   extraInfo.put(ATSConstants.NUM_TASKS, event.getNumTasks());
+   extraInfo.put(ATSConstants.PROCESSOR_CLASS_NAME, event.getProcessorName());
+
+   // Assemble the top-level object
+   JSONObject result = new JSONObject();
+   result.put(ATSConstants.ENTITY, event.getVertexID().toString());
+   result.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
+   result.put(ATSConstants.RELATED_ENTITIES, new JSONArray().put(parentDag));
+   result.put(ATSConstants.EVENTS, new JSONArray().put(vertexInitialized));
+   result.put(ATSConstants.OTHER_INFO, extraInfo);
+   return result;
+ }
+
+ /**
+  * Converts a vertex-started history event into its JSON form: the vertex entity, the
+  * owning DAG as a related entity, one VERTEX_STARTED event, and the requested/actual
+  * start times as other info.
+  *
+  * @param event the vertex-started event to convert
+  * @return the JSON object describing the event
+  * @throws JSONException if a value cannot be stored in the JSON structure
+  */
+ private static JSONObject convertVertexStartedEvent(VertexStartedEvent event) throws JSONException {
+   // Related entity: the DAG this vertex belongs to
+   JSONObject parentDag = new JSONObject();
+   parentDag.put(ATSConstants.ENTITY, event.getVertexID().getDAGId().toString());
+   parentDag.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
+
+   // The single event carried by this record
+   JSONObject vertexStarted = new JSONObject();
+   vertexStarted.put(ATSConstants.TIMESTAMP, event.getStartTime());
+   vertexStarted.put(ATSConstants.EVENT_TYPE, HistoryEventType.VERTEX_STARTED.name());
+
+   // Other info
+   // TODO fix requested times to be events
+   JSONObject extraInfo = new JSONObject();
+   extraInfo.put(ATSConstants.START_REQUESTED_TIME, event.getStartRequestedTime());
+   extraInfo.put(ATSConstants.START_TIME, event.getStartTime());
+
+   // Assemble the top-level object
+   JSONObject result = new JSONObject();
+   result.put(ATSConstants.ENTITY, event.getVertexID().toString());
+   result.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
+   result.put(ATSConstants.RELATED_ENTITIES, new JSONArray().put(parentDag));
+   result.put(ATSConstants.EVENTS, new JSONArray().put(vertexStarted));
+   result.put(ATSConstants.OTHER_INFO, extraInfo);
+   return result;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
new file mode 100644
index 0000000..257b6c8
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.impl;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.logging.HistoryLoggingService;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+public class SimpleHistoryLoggingService extends HistoryLoggingService {
+
+ private static final Log LOG = LogFactory.getLog(SimpleHistoryLoggingService.class);
+ private Path logFileLocation;
+ private FileSystem logFileFS;
+ private FSDataOutputStream outputStream;
+ private LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
+ new LinkedBlockingQueue<DAGHistoryEvent>();
+ public static final String RECORD_SEPARATOR = System.getProperty("line.separator") + "\u0001";
+ public static final String LOG_FILE_NAME_PREFIX = "history.txt";
+
+ private Thread eventHandlingThread;
+ private AtomicBoolean stopped = new AtomicBoolean(false);
+
+ private int consecutiveErrors = 0;
+ private int maxErrors;
+ private boolean loggingDisabled = false;
+
+ public SimpleHistoryLoggingService() {
+ super(SimpleHistoryLoggingService.class.getName());
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ String logDirPath = conf.get(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR);
+ final String logFileName = LOG_FILE_NAME_PREFIX + "." + appContext.getApplicationAttemptId();
+ if (logDirPath == null || logDirPath.isEmpty()) {
+ String logDir = TezUtils.getContainerLogDir();
+ Path p;
+ logFileFS = FileSystem.getLocal(conf);
+ if (logDir != null) {
+ p = new Path(logDir, logFileName);
+ } else {
+ p = new Path(logFileName);
+ }
+ logFileLocation = p;
+ } else {
+ Path p = new Path(logDirPath);
+ logFileFS = p.getFileSystem(conf);
+ if (!logFileFS.exists(p)) {
+ logFileFS.mkdirs(p);
+ }
+ logFileLocation = new Path(logFileFS.resolvePath(p), logFileName);
+ }
+ maxErrors = conf.getInt(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_MAX_ERRORS,
+ TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_MAX_ERRORS_DEFAULT);
+ LOG.info("Initializing SimpleHistoryLoggingService, logFileLocation=" + logFileLocation
+ + ", maxErrors=" + maxErrors);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ LOG.info("Starting SimpleHistoryLoggingService");
+ outputStream = logFileFS.create(logFileLocation, true);
+ eventHandlingThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ DAGHistoryEvent event;
+ while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+ try {
+ event = eventQueue.take();
+ } catch (InterruptedException e) {
+ LOG.info("EventQueue take interrupted. Returning");
+ return;
+ }
+ handleEvent(event);
+ }
+ }
+ }, "HistoryEventHandlingThread");
+ eventHandlingThread.start();
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ LOG.info("Stopping SimpleHistoryLoggingService"
+ + ", eventQueueBacklog=" + eventQueue.size());
+ stopped.set(true);
+ if (eventHandlingThread != null) {
+ eventHandlingThread.interrupt();
+ }
+ while (!eventQueue.isEmpty()) {
+ DAGHistoryEvent event = eventQueue.poll();
+ if (event == null) {
+ break;
+ }
+ handleEvent(event);
+ }
+ try {
+ if (outputStream != null) {
+ outputStream.hsync();
+ outputStream.close();
+ }
+ } catch (IOException ioe) {
+ LOG.warn("Failed to close output stream", ioe);
+ }
+ super.serviceStop();
+ }
+
+ @Override
+ public void handle(DAGHistoryEvent event) {
+ eventQueue.add(event);
+ }
+
+ private synchronized void handleEvent(DAGHistoryEvent event) {
+ if (loggingDisabled) {
+ return;
+ }
+ LOG.info("Writing event " + event.getHistoryEvent().getEventType() + " to history file");
+ try {
+ try {
+ JSONObject eventJson = HistoryEventJsonConversion.convertToJson(event.getHistoryEvent());
+ outputStream.writeBytes(eventJson.toString());
+ outputStream.writeBytes(RECORD_SEPARATOR);
+ } catch (JSONException e) {
+ LOG.warn("Failed to convert event to json", e);
+ }
+ consecutiveErrors = 0;
+ } catch (IOException ioe) {
+ ++consecutiveErrors;
+ if (consecutiveErrors < maxErrors) {
+ LOG.error("Failed to write to output stream, consecutiveErrorCount=" + consecutiveErrors, ioe);
+ } else {
+ loggingDisabled = true;
+ LOG.error("Disabling SimpleHistoryLoggingService due to multiple errors," +
+ "consecutive max errors reached, maxErrors=" + maxErrors);
+ }
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
index f53cc7d..0188a8e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
@@ -39,6 +39,7 @@ public class ATSConstants {
public static final String APPLICATION_ATTEMPT_ID = "applicationAttemptId";
public static final String CONTAINER_ID = "containerId";
public static final String NODE_ID = "nodeId";
+ public static final String USER = "user";
/* Keys used in other info */
public static final String APP_SUBMIT_TIME = "appSubmitTime";
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
index 782fff1..3997c2f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
@@ -18,10 +18,18 @@
package org.apache.tez.dag.history.utils;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
import org.apache.tez.common.counters.CounterGroup;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.api.records.DAGProtos.PlanGroupInputEdgeInfo;
+import org.apache.tez.dag.app.dag.impl.VertexStats;
+import org.apache.tez.dag.records.TezTaskID;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
@@ -149,4 +157,247 @@ public class DAGUtils {
return jsonObject;
}
+ /**
+  * Converts counters into a nested map/list structure: a list of counter groups, each with
+  * its name, display name and list of counters (name, display name, value). Empty lists are
+  * omitted from the result.
+  *
+  * @param counters the counters to convert; may be null
+  * @return the map representation; empty when {@code counters} is null or has no groups
+  */
+ public static Map<String,Object> convertCountersToATSMap(TezCounters counters) {
+   Map<String,Object> result = new LinkedHashMap<String, Object>();
+   if (counters != null) {
+     ArrayList<Object> groupEntries = new ArrayList<Object>();
+     for (CounterGroup counterGroup : counters) {
+       Map<String,Object> groupEntry = new LinkedHashMap<String, Object>();
+       groupEntry.put(ATSConstants.COUNTER_GROUP_NAME, counterGroup.getName());
+       groupEntry.put(ATSConstants.COUNTER_GROUP_DISPLAY_NAME, counterGroup.getDisplayName());
+
+       ArrayList<Object> counterEntries = new ArrayList<Object>();
+       for (TezCounter tezCounter : counterGroup) {
+         Map<String,Object> counterEntry = new LinkedHashMap<String, Object>();
+         counterEntry.put(ATSConstants.COUNTER_NAME, tezCounter.getName());
+         counterEntry.put(ATSConstants.COUNTER_DISPLAY_NAME, tezCounter.getDisplayName());
+         counterEntry.put(ATSConstants.COUNTER_VALUE, tezCounter.getValue());
+         counterEntries.add(counterEntry);
+       }
+       // A group without counters carries no counters key
+       putInto(groupEntry, ATSConstants.COUNTERS, counterEntries);
+       groupEntries.add(groupEntry);
+     }
+     putInto(result, ATSConstants.COUNTER_GROUPS, groupEntries);
+   }
+   return result;
+ }
+
+ /**
+  * Converts a DAG plan into a nested map/list structure holding the DAG name, a format
+  * version, and the plan's vertices, edges and vertex groups. Keys whose list value would be
+  * empty are omitted.
+  *
+  * @param dagPlan the DAG plan to convert
+  * @return the map representation of the plan
+  */
+ public static Map<String,Object> convertDAGPlanToATSMap(
+     DAGProtos.DAGPlan dagPlan) {
+
+   // Top-level keys
+   final String VERSION_KEY = "version";
+   final int version = 1;
+   final String DAG_NAME_KEY = "dagName";
+   final String VERTICES_KEY = "vertices";
+   final String EDGES_KEY = "edges";
+   final String VERTEX_GROUPS_KEY = "vertexGroups";
+
+   // Per-vertex keys
+   final String VERTEX_NAME_KEY = "vertexName";
+   final String PROCESSOR_CLASS_KEY = "processorClass";
+   final String IN_EDGE_IDS_KEY = "inEdgeIds";
+   final String OUT_EDGE_IDS_KEY = "outEdgeIds";
+   final String ADDITIONAL_INPUTS_KEY = "additionalInputs";
+   final String ADDITIONAL_OUTPUTS_KEY = "additionalOutputs";
+   final String VERTEX_MANAGER_PLUGIN_CLASS_KEY =
+       "vertexManagerPluginClass";
+
+   // Per-edge keys
+   final String EDGE_ID_KEY = "edgeId";
+   final String INPUT_VERTEX_NAME_KEY = "inputVertexName";
+   final String OUTPUT_VERTEX_NAME_KEY = "outputVertexName";
+   final String DATA_MOVEMENT_TYPE_KEY = "dataMovementType";
+   final String DATA_SOURCE_TYPE_KEY = "dataSourceType";
+   final String SCHEDULING_TYPE_KEY = "schedulingType";
+   final String EDGE_SOURCE_CLASS_KEY = "edgeSourceClass";
+   final String EDGE_DESTINATION_CLASS_KEY =
+       "edgeDestinationClass";
+
+   // Keys for additional inputs/outputs of a vertex
+   final String NAME_KEY = "name";
+   final String CLASS_KEY = "class";
+   final String INITIALIZER_KEY = "initializer";
+
+   // Per-vertex-group keys
+   final String VERTEX_GROUP_NAME_KEY = "groupName";
+   final String VERTEX_GROUP_MEMBERS_KEY = "groupMembers";
+   final String VERTEX_GROUP_OUTPUTS_KEY = "outputs";
+   final String VERTEX_GROUP_EDGE_MERGED_INPUTS_KEY = "edgeMergedInputs";
+   final String VERTEX_GROUP_DESTINATION_VERTEX_NAME_KEY = "destinationVertexName";
+
+   Map<String,Object> planMap = new LinkedHashMap<String, Object>();
+   planMap.put(DAG_NAME_KEY, dagPlan.getName());
+   planMap.put(VERSION_KEY, version);
+
+   // Vertices
+   ArrayList<Object> vertexEntries = new ArrayList<Object>();
+   for (DAGProtos.VertexPlan vertex : dagPlan.getVertexList()) {
+     Map<String,Object> vertexEntry = new LinkedHashMap<String, Object>();
+     vertexEntry.put(VERTEX_NAME_KEY, vertex.getName());
+
+     if (vertex.hasProcessorDescriptor()) {
+       vertexEntry.put(PROCESSOR_CLASS_KEY,
+           vertex.getProcessorDescriptor().getClassName());
+     }
+
+     putInto(vertexEntry, IN_EDGE_IDS_KEY,
+         new ArrayList<Object>(vertex.getInEdgeIdList()));
+     putInto(vertexEntry, OUT_EDGE_IDS_KEY,
+         new ArrayList<Object>(vertex.getOutEdgeIdList()));
+
+     ArrayList<Object> inputEntries = new ArrayList<Object>();
+     for (DAGProtos.RootInputLeafOutputProto rootInput : vertex.getInputsList()) {
+       Map<String,Object> inputEntry = new LinkedHashMap<String, Object>();
+       inputEntry.put(NAME_KEY, rootInput.getName());
+       inputEntry.put(CLASS_KEY, rootInput.getEntityDescriptor().getClassName());
+       if (rootInput.hasInitializerClassName()) {
+         inputEntry.put(INITIALIZER_KEY, rootInput.getInitializerClassName());
+       }
+       inputEntries.add(inputEntry);
+     }
+     putInto(vertexEntry, ADDITIONAL_INPUTS_KEY, inputEntries);
+
+     ArrayList<Object> outputEntries = new ArrayList<Object>();
+     for (DAGProtos.RootInputLeafOutputProto leafOutput : vertex.getOutputsList()) {
+       Map<String,Object> outputEntry = new LinkedHashMap<String, Object>();
+       outputEntry.put(NAME_KEY, leafOutput.getName());
+       outputEntry.put(CLASS_KEY, leafOutput.getEntityDescriptor().getClassName());
+       if (leafOutput.hasInitializerClassName()) {
+         outputEntry.put(INITIALIZER_KEY, leafOutput.getInitializerClassName());
+       }
+       outputEntries.add(outputEntry);
+     }
+     putInto(vertexEntry, ADDITIONAL_OUTPUTS_KEY, outputEntries);
+
+     if (vertex.hasVertexManagerPlugin()) {
+       vertexEntry.put(VERTEX_MANAGER_PLUGIN_CLASS_KEY,
+           vertex.getVertexManagerPlugin().getClassName());
+     }
+
+     vertexEntries.add(vertexEntry);
+   }
+   putInto(planMap, VERTICES_KEY, vertexEntries);
+
+   // Edges
+   ArrayList<Object> edgeEntries = new ArrayList<Object>();
+   for (DAGProtos.EdgePlan edge : dagPlan.getEdgeList()) {
+     Map<String,Object> edgeEntry = new LinkedHashMap<String, Object>();
+     edgeEntry.put(EDGE_ID_KEY, edge.getId());
+     edgeEntry.put(INPUT_VERTEX_NAME_KEY, edge.getInputVertexName());
+     edgeEntry.put(OUTPUT_VERTEX_NAME_KEY, edge.getOutputVertexName());
+     edgeEntry.put(DATA_MOVEMENT_TYPE_KEY, edge.getDataMovementType().name());
+     edgeEntry.put(DATA_SOURCE_TYPE_KEY, edge.getDataSourceType().name());
+     edgeEntry.put(SCHEDULING_TYPE_KEY, edge.getSchedulingType().name());
+     edgeEntry.put(EDGE_SOURCE_CLASS_KEY, edge.getEdgeSource().getClassName());
+     edgeEntry.put(EDGE_DESTINATION_CLASS_KEY, edge.getEdgeDestination().getClassName());
+     edgeEntries.add(edgeEntry);
+   }
+   putInto(planMap, EDGES_KEY, edgeEntries);
+
+   // Vertex groups
+   ArrayList<Object> groupEntries = new ArrayList<Object>();
+   for (DAGProtos.PlanVertexGroupInfo group : dagPlan.getVertexGroupsList()) {
+     Map<String,Object> groupEntry = new LinkedHashMap<String, Object>();
+     groupEntry.put(VERTEX_GROUP_NAME_KEY, group.getGroupName());
+     if (group.getGroupMembersCount() > 0) {
+       groupEntry.put(VERTEX_GROUP_MEMBERS_KEY, group.getGroupMembersList());
+     }
+     if (group.getOutputsCount() > 0) {
+       groupEntry.put(VERTEX_GROUP_OUTPUTS_KEY, group.getOutputsList());
+     }
+
+     if (group.getEdgeMergedInputsCount() > 0) {
+       ArrayList<Object> mergedInputEntries = new ArrayList<Object>();
+       for (PlanGroupInputEdgeInfo mergedInputInfo : group.getEdgeMergedInputsList()) {
+         Map<String,Object> mergedInputEntry = new LinkedHashMap<String, Object>();
+         mergedInputEntry.put(VERTEX_GROUP_DESTINATION_VERTEX_NAME_KEY,
+             mergedInputInfo.getDestVertexName());
+         // The merged input's class is recorded under the processor-class key
+         boolean hasMergedInputClass = mergedInputInfo.hasMergedInput()
+             && mergedInputInfo.getMergedInput().hasClassName();
+         if (hasMergedInputClass) {
+           mergedInputEntry.put(PROCESSOR_CLASS_KEY,
+               mergedInputInfo.getMergedInput().getClassName());
+         }
+         mergedInputEntries.add(mergedInputEntry);
+       }
+       groupEntry.put(VERTEX_GROUP_EDGE_MERGED_INPUTS_KEY, mergedInputEntries);
+     }
+     groupEntries.add(groupEntry);
+   }
+   putInto(planMap, VERTEX_GROUPS_KEY, groupEntries);
+
+   return planMap;
+ }
+
+ /**
+  * Stores {@code list} under {@code key} in {@code map}, unless the list is empty, in which
+  * case the map is left untouched.
+  *
+  * @param map the map to add to
+  * @param key the key to store the list under
+  * @param list the list to store
+  */
+ private static void putInto(Map<String, Object> map, String key,
+     ArrayList<Object> list) {
+   if (!list.isEmpty()) {
+     map.put(key, list);
+   }
+ }
+
+ /**
+  * Returns the string form of every task id in {@code collection}, in iteration order.
+  *
+  * @param collection the task ids to convert
+  * @return a new list holding {@code toString()} of each task id
+  */
+ private static ArrayList<String> convertToStringArrayList(
+     Collection<TezTaskID> collection) {
+   ArrayList<String> taskIdStrings = new ArrayList<String>(collection.size());
+   for (TezTaskID taskId : collection) {
+     taskIdStrings.add(taskId.toString());
+   }
+   return taskIdStrings;
+ }
+
+ /**
+  * Converts vertex statistics into a map holding the first task start time, last task finish
+  * time, min/max/avg task durations and, where non-empty, the ids of the tasks that started
+  * first, finished last, and had the shortest and longest durations.
+  *
+  * @param vertexStats the statistics to convert; may be null
+  * @return the map representation; empty when {@code vertexStats} is null
+  */
+ public static Map<String,Object> convertVertexStatsToATSMap(
+     VertexStats vertexStats) {
+   Map<String,Object> statsMap = new LinkedHashMap<String, Object>();
+   if (vertexStats == null) {
+     return statsMap;
+   }
+
+   final String FIRST_TASK_START_TIME_KEY = "firstTaskStartTime";
+   final String FIRST_TASKS_TO_START_KEY = "firstTasksToStart";
+   final String LAST_TASK_FINISH_TIME_KEY = "lastTaskFinishTime";
+   final String LAST_TASKS_TO_FINISH_KEY = "lastTasksToFinish";
+
+   final String MIN_TASK_DURATION = "minTaskDuration";
+   final String MAX_TASK_DURATION = "maxTaskDuration";
+   final String AVG_TASK_DURATION = "avgTaskDuration";
+
+   final String SHORTEST_DURATION_TASKS = "shortestDurationTasks";
+   final String LONGEST_DURATION_TASKS = "longestDurationTasks";
+
+   // Start-side statistics
+   statsMap.put(FIRST_TASK_START_TIME_KEY, vertexStats.getFirstTaskStartTime());
+   Collection<TezTaskID> firstStarted = vertexStats.getFirstTasksToStart();
+   if (!(firstStarted == null || firstStarted.isEmpty())) {
+     statsMap.put(FIRST_TASKS_TO_START_KEY, convertToStringArrayList(firstStarted));
+   }
+
+   // Finish-side statistics
+   statsMap.put(LAST_TASK_FINISH_TIME_KEY, vertexStats.getLastTaskFinishTime());
+   Collection<TezTaskID> lastFinished = vertexStats.getLastTasksToFinish();
+   if (!(lastFinished == null || lastFinished.isEmpty())) {
+     statsMap.put(LAST_TASKS_TO_FINISH_KEY, convertToStringArrayList(lastFinished));
+   }
+
+   // Duration statistics
+   statsMap.put(MIN_TASK_DURATION, vertexStats.getMinTaskDuration());
+   statsMap.put(MAX_TASK_DURATION, vertexStats.getMaxTaskDuration());
+   statsMap.put(AVG_TASK_DURATION, vertexStats.getAvgTaskDuration());
+
+   Collection<TezTaskID> shortest = vertexStats.getShortestDurationTasks();
+   if (!(shortest == null || shortest.isEmpty())) {
+     statsMap.put(SHORTEST_DURATION_TASKS, convertToStringArrayList(shortest));
+   }
+   Collection<TezTaskID> longest = vertexStats.getLongestDurationTasks();
+   if (!(longest == null || longest.isEmpty())) {
+     statsMap.put(LONGEST_DURATION_TASKS, convertToStringArrayList(longest));
+   }
+
+   return statsMap;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index b22b162..164bd2f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -112,7 +112,7 @@ public class TestHistoryEventsProtoConversion {
AMLaunchedEvent event = new AMLaunchedEvent(
ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0, 1), 1),
- 100, 100);
+ 100, 100, null);
AMLaunchedEvent deserializedEvent = (AMLaunchedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getApplicationAttemptId(),
@@ -127,7 +127,7 @@ public class TestHistoryEventsProtoConversion {
private void testAMStartedEvent() throws Exception {
AMStartedEvent event = new AMStartedEvent(
ApplicationAttemptId.newInstance(
- ApplicationId.newInstance(0, 1), 1), 100);
+ ApplicationId.newInstance(0, 1), 1), 100, "");
AMStartedEvent deserializedEvent = (AMStartedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getApplicationAttemptId(),
@@ -142,7 +142,7 @@ public class TestHistoryEventsProtoConversion {
ApplicationId.newInstance(0, 1), 1), 1001l,
DAGPlan.newBuilder().setName("foo").build(),
ApplicationAttemptId.newInstance(
- ApplicationId.newInstance(0, 1), 1), null);
+ ApplicationId.newInstance(0, 1), 1), null, "");
DAGSubmittedEvent deserializedEvent = (DAGSubmittedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getApplicationAttemptId(),
@@ -160,7 +160,8 @@ public class TestHistoryEventsProtoConversion {
private void testDAGInitializedEvent() throws Exception {
DAGInitializedEvent event = new DAGInitializedEvent(
- TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 100334l);
+ TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 100334l,
+ "user", "dagName");
DAGInitializedEvent deserializedEvent = (DAGInitializedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getDagID(),
@@ -171,7 +172,8 @@ public class TestHistoryEventsProtoConversion {
private void testDAGStartedEvent() throws Exception {
DAGStartedEvent event = new DAGStartedEvent(
- TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 100334l);
+ TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 100334l,
+ "user", "dagName");
DAGStartedEvent deserializedEvent = (DAGStartedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getDagID(),
@@ -184,7 +186,7 @@ public class TestHistoryEventsProtoConversion {
{
DAGFinishedEvent event = new DAGFinishedEvent(
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1000l, 20000l,
- DAGState.FAILED, null, null);
+ DAGState.FAILED, null, null, "user", "dagName");
DAGFinishedEvent deserializedEvent = (DAGFinishedEvent)
testProtoConversion(event);
Assert.assertEquals(
@@ -204,7 +206,8 @@ public class TestHistoryEventsProtoConversion {
tezCounters.getGroup("foo").findCounter("c1").increment(1);
DAGFinishedEvent event = new DAGFinishedEvent(
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1000l, 20000l,
- DAGState.FAILED, "bad diagnostics", tezCounters);
+ DAGState.FAILED, "bad diagnostics", tezCounters,
+ "user", "dagName");
DAGFinishedEvent deserializedEvent = (DAGFinishedEvent)
testProtoConversion(event);
Assert.assertEquals(
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
new file mode 100644
index 0000000..67be9f5
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.impl;
+
+import java.util.Random;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.AMLaunchedEvent;
+import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.history.events.ContainerStoppedEvent;
+import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskFinishedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
+import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
+import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.codehaus.jettison.json.JSONException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestHistoryEventJsonConversion {
+
+ private ApplicationAttemptId applicationAttemptId;
+ private ApplicationId applicationId;
+ private String user = "user";
+ private Random random = new Random();
+ private TezDAGID tezDAGID;
+ private TezVertexID tezVertexID;
+ private TezTaskID tezTaskID;
+ private TezTaskAttemptID tezTaskAttemptID;
+ private DAGPlan dagPlan;
+ private ContainerId containerId;
+ private NodeId nodeId;
+
+ @Before
+ public void setup() {
+ applicationId = ApplicationId.newInstance(9999l, 1);
+ applicationAttemptId = ApplicationAttemptId.newInstance(applicationId, 1);
+ tezDAGID = TezDAGID.getInstance(applicationId, random.nextInt());
+ tezVertexID = TezVertexID.getInstance(tezDAGID, random.nextInt());
+ tezTaskID = TezTaskID.getInstance(tezVertexID, random.nextInt());
+ tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID, random.nextInt());
+ dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock").build();
+ containerId = ContainerId.newInstance(applicationAttemptId, 111);
+ nodeId = NodeId.newInstance("node", 13435);
+ }
+
+ @Test
+ public void testHandlerExists() throws JSONException {
+ for (HistoryEventType eventType : HistoryEventType.values()) {
+ HistoryEvent event = null;
+ switch (eventType) {
+ case AM_LAUNCHED:
+ event = new AMLaunchedEvent(applicationAttemptId, random.nextInt(), random.nextInt(),
+ user);
+ break;
+ case AM_STARTED:
+ event = new AMStartedEvent(applicationAttemptId, random.nextInt(), user);
+ break;
+ case DAG_SUBMITTED:
+ event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId,
+ null, user);
+ break;
+ case DAG_INITIALIZED:
+ event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName());
+ break;
+ case DAG_STARTED:
+ event = new DAGStartedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName());
+ break;
+ case DAG_FINISHED:
+ event = new DAGFinishedEvent(tezDAGID, random.nextInt(), random.nextInt(), DAGState.ERROR,
+ null, null, user, dagPlan.getName());
+ break;
+ case VERTEX_INITIALIZED:
+ event = new VertexInitializedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
+ random.nextInt(), "proc", null);
+ break;
+ case VERTEX_STARTED:
+ event = new VertexStartedEvent(tezVertexID, random.nextInt(), random.nextInt());
+ break;
+ case VERTEX_PARALLELISM_UPDATED:
+ event = new VertexParallelismUpdatedEvent();
+ break;
+ case VERTEX_FINISHED:
+ event = new VertexFinishedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
+ random.nextInt(), random.nextInt(), random.nextInt(), VertexState.ERROR,
+ null, null, null);
+ break;
+ case TASK_STARTED:
+ event = new TaskStartedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt());
+ break;
+ case TASK_FINISHED:
+ event = new TaskFinishedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt(),
+ tezTaskAttemptID, TaskState.FAILED, null);
+ break;
+ case TASK_ATTEMPT_STARTED:
+ event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", random.nextInt(), containerId,
+ nodeId, null, null);
+ break;
+ case TASK_ATTEMPT_FINISHED:
+ event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
+ random.nextInt(), TaskAttemptState.FAILED, null, null);
+ break;
+ case CONTAINER_LAUNCHED:
+ event = new ContainerLaunchedEvent(containerId, random.nextInt(),
+ applicationAttemptId);
+ break;
+ case CONTAINER_STOPPED:
+ event = new ContainerStoppedEvent(containerId, random.nextInt(), -1, applicationAttemptId);
+ break;
+ case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+ event = new VertexDataMovementEventsGeneratedEvent();
+ break;
+ case DAG_COMMIT_STARTED:
+ event = new DAGCommitStartedEvent();
+ break;
+ case VERTEX_COMMIT_STARTED:
+ event = new VertexCommitStartedEvent();
+ break;
+ case VERTEX_GROUP_COMMIT_STARTED:
+ event = new VertexGroupCommitStartedEvent();
+ break;
+ case VERTEX_GROUP_COMMIT_FINISHED:
+ event = new VertexGroupCommitFinishedEvent();
+ break;
+ default:
+ Assert.fail("Unhandled event type " + eventType);
+ }
+ if (event == null || !event.isHistoryEvent()) {
+ continue;
+ }
+ HistoryEventJsonConversion.convertToJson(event);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dist/pom.xml
----------------------------------------------------------------------
diff --git a/tez-dist/pom.xml b/tez-dist/pom.xml
index bbe3256..95668a9 100644
--- a/tez-dist/pom.xml
+++ b/tez-dist/pom.xml
@@ -54,6 +54,19 @@
<profiles>
<profile>
+ <id>hadoop24</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-yarn-timeline-history</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
<id>dist-tar</id>
<activation>
<activeByDefault>false</activeByDefault>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-plugins/pom.xml
----------------------------------------------------------------------
diff --git a/tez-plugins/pom.xml b/tez-plugins/pom.xml
new file mode 100644
index 0000000..393de62
--- /dev/null
+++ b/tez-plugins/pom.xml
@@ -0,0 +1,50 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez</artifactId>
+ <version>0.5.0-incubating-SNAPSHOT</version>
+ </parent>
+ <artifactId>tez-plugins</artifactId>
+ <packaging>pom</packaging>
+
+ <profiles>
+ <profile>
+ <id>hadoop24</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <modules>
+ <module>tez-yarn-timeline-history</module>
+ </modules>
+ </profile>
+ </profiles>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-plugins/tez-yarn-timeline-history/pom.xml
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/pom.xml b/tez-plugins/tez-yarn-timeline-history/pom.xml
new file mode 100644
index 0000000..150a149
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history/pom.xml
@@ -0,0 +1,71 @@
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-plugins</artifactId>
+ <version>0.5.0-incubating-SNAPSHOT</version>
+ </parent>
+ <artifactId>tez-yarn-timeline-history</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-runtime-internals</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-dag</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jettison</groupId>
+ <artifactId>jettison</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
new file mode 100644
index 0000000..11114b6
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.ats;
+
+import java.util.HashSet;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.logging.HistoryLoggingService;
+import org.apache.tez.dag.records.TezDAGID;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * History logging service that publishes Tez history events to the YARN
+ * timeline server through a {@link TimelineClient}.
+ *
+ * Events handed to {@link #handle(DAGHistoryEvent)} are queued and posted one
+ * at a time by a dedicated thread. Events that belong to pre-warm DAGs (DAG
+ * name starting with {@link TezConfiguration#TEZ_PREWARM_DAG_NAME_PREFIX}) are
+ * dropped. When the service is stopped, the remaining backlog is flushed for
+ * at most the configured
+ * {@link TezConfiguration#YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS} milliseconds.
+ */
+public class ATSHistoryLoggingService extends HistoryLoggingService {
+
+  private static final Log LOG = LogFactory.getLog(ATSHistoryLoggingService.class);
+
+  // Events waiting to be posted to the timeline server.
+  private final LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
+      new LinkedBlockingQueue<DAGHistoryEvent>();
+
+  private Thread eventHandlingThread;
+  private final AtomicBoolean stopped = new AtomicBoolean(false);
+  // Counters behind the periodic queue-stats log line; only the event handling
+  // thread reads or writes them.
+  private int eventCounter = 0;
+  private int eventsProcessed = 0;
+  // Serializes handleEvent() between the event handling thread and the
+  // shutdown flush in serviceStop().
+  private final Object lock = new Object();
+
+  @VisibleForTesting
+  TimelineClient timelineClient;
+
+  // DAGs whose events are not published (pre-warm DAGs).
+  private final HashSet<TezDAGID> skippedDAGs = new HashSet<TezDAGID>();
+  private long maxTimeToWaitOnShutdown;
+
+  public ATSHistoryLoggingService() {
+    super(ATSHistoryLoggingService.class.getName());
+  }
+
+  /**
+   * Creates and initializes the timeline client and reads the shutdown flush
+   * timeout from the configuration.
+   *
+   * @param conf configuration used for the timeline client and the timeout
+   */
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    LOG.info("Initializing ATSService");
+    timelineClient = TimelineClient.createTimelineClient();
+    timelineClient.init(conf);
+    maxTimeToWaitOnShutdown = conf.getLong(
+        TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS,
+        TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS_DEFAULT);
+  }
+
+  /**
+   * Starts the timeline client and the thread that drains the event queue.
+   */
+  @Override
+  public void serviceStart() {
+    LOG.info("Starting ATSService");
+    timelineClient.start();
+    eventHandlingThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        DAGHistoryEvent event;
+        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+
+          // Log the size of the event-queue every so often.
+          if (eventCounter != 0 && eventCounter % 1000 == 0) {
+            LOG.info("Event queue stats"
+                + ", eventsProcessedSinceLastUpdate=" + eventsProcessed
+                + ", eventQueueSize=" + eventQueue.size());
+            eventCounter = 0;
+            eventsProcessed = 0;
+          } else {
+            ++eventCounter;
+          }
+
+          try {
+            event = eventQueue.take();
+          } catch (InterruptedException e) {
+            LOG.info("EventQueue take interrupted. Returning");
+            // Keep the interrupt visible to anything that inspects this thread
+            // after the loop exits.
+            Thread.currentThread().interrupt();
+            return;
+          }
+
+          synchronized (lock) {
+            ++eventsProcessed;
+            try {
+              handleEvent(event);
+            } catch (Exception e) {
+              // TODO handle failures - treat as fatal or ignore?
+              LOG.warn("Error handling event", e);
+            }
+          }
+        }
+      }
+    }, "HistoryEventHandlingThread");
+    eventHandlingThread.start();
+  }
+
+  /**
+   * Stops the event handling thread, flushes as much of the remaining backlog
+   * as fits into the configured timeout and then stops the timeline client.
+   */
+  @Override
+  public void serviceStop() {
+    LOG.info("Stopping ATSService"
+        + ", eventQueueBacklog=" + eventQueue.size());
+    stopped.set(true);
+    if (eventHandlingThread != null) {
+      eventHandlingThread.interrupt();
+    }
+    // Taking the lock waits for an in-flight handleEvent() on the handling
+    // thread before the backlog is drained from this thread.
+    synchronized (lock) {
+      if (!eventQueue.isEmpty()) {
+        LOG.warn("ATSService being stopped"
+            + ", eventQueueBacklog=" + eventQueue.size()
+            + ", maxTimeLeftToFlush=" + maxTimeToWaitOnShutdown);
+        long startTime = appContext.getClock().getTime();
+        if (maxTimeToWaitOnShutdown > 0) {
+          long endTime = startTime + maxTimeToWaitOnShutdown;
+          while (endTime >= appContext.getClock().getTime()) {
+            DAGHistoryEvent event = eventQueue.poll();
+            if (event == null) {
+              break;
+            }
+            try {
+              handleEvent(event);
+            } catch (Exception e) {
+              LOG.warn("Error handling event", e);
+              break;
+            }
+          }
+        }
+      }
+    }
+    if (!eventQueue.isEmpty()) {
+      LOG.warn("Did not finish flushing eventQueue before stopping ATSService"
+          + ", eventQueueBacklog=" + eventQueue.size());
+    }
+    // The client is only assigned in serviceInit(); guard so that stopping a
+    // service whose initialization did not get that far cannot throw a
+    // NullPointerException.
+    if (timelineClient != null) {
+      timelineClient.stop();
+    }
+  }
+
+  /**
+   * Queues an event for asynchronous publishing.
+   *
+   * @param event the history event to publish
+   */
+  public void handle(DAGHistoryEvent event) {
+    eventQueue.add(event);
+  }
+
+  /**
+   * Posts a single event to the timeline server unless it belongs to a
+   * pre-warm DAG. Failures while posting are logged and not propagated.
+   *
+   * @param event the history event to publish
+   */
+  private void handleEvent(DAGHistoryEvent event) {
+    HistoryEventType eventType = event.getHistoryEvent().getEventType();
+
+    TezDAGID dagId = event.getDagID();
+
+    if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
+      DAGSubmittedEvent dagSubmittedEvent =
+          (DAGSubmittedEvent) event.getHistoryEvent();
+      String dagName = dagSubmittedEvent.getDAGName();
+      if (dagName != null
+          && dagName.startsWith(
+          TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX)) {
+        // Skip recording pre-warm DAG events
+        skippedDAGs.add(dagId);
+        return;
+      }
+    }
+    if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
+      // Remove from set to keep size small
+      // No more events should be seen after this point.
+      if (skippedDAGs.remove(dagId)) {
+        return;
+      }
+    }
+
+    if (dagId != null && skippedDAGs.contains(dagId)) {
+      // Skip pre-warm DAGs
+      return;
+    }
+
+    try {
+      TimelinePutResponse response =
+          timelineClient.putEntities(
+              HistoryEventTimelineConversion.convertToTimelineEntity(event.getHistoryEvent()));
+      if (response != null
+          && !response.getErrors().isEmpty()) {
+        // Report every failed put, not only the first entry of the list.
+        for (TimelinePutError err : response.getErrors()) {
+          if (err.getErrorCode() != 0) {
+            LOG.warn("Could not post history event to ATS, eventType="
+                + eventType
+                + ", atsPutError=" + err.getErrorCode());
+          }
+        }
+      }
+      // Do nothing additional, ATS client library should handle throttling
+      // or auto-disable as needed
+    } catch (Exception e) {
+      LOG.warn("Could not handle history event, eventType="
+          + eventType, e);
+    }
+  }
+
+}
[3/3] git commit: TEZ-1066. Generate events to integrate with YARN
timeline server. (hitesh)
Posted by hi...@apache.org.
TEZ-1066. Generate events to integrate with YARN timeline server. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/bc657961
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/bc657961
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/bc657961
Branch: refs/heads/master
Commit: bc65796146fca5d7d6eb49e85e20fda5bade0b57
Parents: 6e07fc7
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed May 21 14:54:08 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Wed May 21 14:54:08 2014 -0700
----------------------------------------------------------------------
BUILDING.txt | 14 +-
pom.xml | 8 +-
.../apache/tez/dag/api/TezConfiguration.java | 19 +-
.../java/org/apache/tez/common/TezUtils.java | 18 +
.../org/apache/tez/dag/app/DAGAppMaster.java | 8 +-
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 9 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 2 +
.../dag/app/launcher/ContainerLauncherImpl.java | 2 +-
.../apache/tez/dag/history/HistoryEvent.java | 6 +-
.../tez/dag/history/HistoryEventHandler.java | 32 +-
.../apache/tez/dag/history/ats/ATSService.java | 124 ----
.../apache/tez/dag/history/ats/EntityTypes.java | 28 -
.../tez/dag/history/events/AMLaunchedEvent.java | 63 +-
.../tez/dag/history/events/AMStartedEvent.java | 57 +-
.../history/events/ContainerLaunchedEvent.java | 52 +-
.../history/events/ContainerStoppedEvent.java | 60 +-
.../history/events/DAGCommitStartedEvent.java | 8 -
.../dag/history/events/DAGFinishedEvent.java | 58 +-
.../dag/history/events/DAGInitializedEvent.java | 33 +-
.../tez/dag/history/events/DAGStartedEvent.java | 53 +-
.../dag/history/events/DAGSubmittedEvent.java | 81 +--
.../events/TaskAttemptFinishedEvent.java | 51 +-
.../history/events/TaskAttemptStartedEvent.java | 66 +-
.../dag/history/events/TaskFinishedEvent.java | 51 +-
.../dag/history/events/TaskStartedEvent.java | 47 +-
.../events/VertexCommitStartedEvent.java | 8 -
.../VertexDataMovementEventsGeneratedEvent.java | 20 +-
.../dag/history/events/VertexFinishedEvent.java | 49 +-
.../events/VertexGroupCommitFinishedEvent.java | 8 -
.../events/VertexGroupCommitStartedEvent.java | 8 -
.../history/events/VertexInitializedEvent.java | 58 +-
.../events/VertexParallelismUpdatedEvent.java | 22 +-
.../dag/history/events/VertexStartedEvent.java | 46 +-
.../tez/dag/history/logging/EntityTypes.java | 28 +
.../history/logging/HistoryLoggingService.java | 43 ++
.../impl/HistoryEventJsonConversion.java | 633 +++++++++++++++++++
.../impl/SimpleHistoryLoggingService.java | 169 +++++
.../tez/dag/history/utils/ATSConstants.java | 1 +
.../apache/tez/dag/history/utils/DAGUtils.java | 251 ++++++++
.../TestHistoryEventsProtoConversion.java | 17 +-
.../impl/TestHistoryEventJsonConversion.java | 179 ++++++
tez-dist/pom.xml | 13 +
tez-plugins/pom.xml | 50 ++
tez-plugins/tez-yarn-timeline-history/pom.xml | 71 +++
.../logging/ats/ATSHistoryLoggingService.java | 209 ++++++
.../ats/HistoryEventTimelineConversion.java | 462 ++++++++++++++
.../ats/TestATSHistoryLoggingService.java | 108 ++++
.../ats/TestHistoryEventTimelineConversion.java | 179 ++++++
.../java/org/apache/tez/test/TestTezJobs.java | 57 ++
49 files changed, 2736 insertions(+), 903 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/BUILDING.txt
----------------------------------------------------------------------
diff --git a/BUILDING.txt b/BUILDING.txt
index 5245101..4f2f44e 100644
--- a/BUILDING.txt
+++ b/BUILDING.txt
@@ -20,6 +20,7 @@ Maven main modules:
- tez-mapreduce ...............(Tez mapreduce)
- tez-dag .....................(Tez dag)
- tez-mapreduce-examples ......(Tez mapreduce examples)
+ - tez-plugins .................(Tez plugins)
- tez-tests ...................(Tez tests)
- tez-dist ....................(Tez dist)
@@ -37,7 +38,7 @@ Maven build goals:
* Run clover : mvn test -Pclover [-Dclover.license=${user.home}/clover.license]
* Run Rat : mvn apache-rat:check
* Build javadocs : mvn javadoc:javadoc
- * Build distribution : mvn package[-Dtar][-Dhadoop.version=2.2.0]
+ * Build distribution : mvn package[-Dtar][-Dhadoop.version=2.4.0]
* Visualize state machines : mvn compile -Pvisualize -DskipTests=true
Build options:
@@ -58,11 +59,18 @@ Tests options:
----------------------------------------------------------------------------------
Building against a specific version of hadoop:
-Tez runs on top of Apache Hadoop YARN and requires hadoop version 2.2.0 or higher
-For example to build tez against hadoop 3.0.0-SNAPSHOT
+Tez runs on top of Apache Hadoop YARN and requires hadoop version 2.2.0 or higher.
+
+By default, it can be compiled against hadoop versions 2.4.0 and higher by just
+specifying the hadoop.version. For example, to build tez against hadoop 3.0.0-SNAPSHOT
$ mvn package -Dtar -Dhadoop.version=3.0.0-SNAPSHOT
+However, to compile against hadoop versions lower than 2.4.0, the hadoop24 profile needs
+to be disabled:
+
+ $ mvn package -Dtar -Dhadoop.version=2.2.0 -P\!hadoop24
+
To skip Tests and java docs
$ mvn package -Dtar -Dhadoop.version=3.0.0-SNAPSHOT -DskipTests -Dmaven.javadoc.skip=true
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d1d8b6d..6b0b591 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,7 +37,7 @@
<properties>
<maven.test.redirectTestOutputToFile>true</maven.test.redirectTestOutputToFile>
<clover.license>${user.home}/clover.license</clover.license>
- <hadoop.version>2.2.0</hadoop.version>
+ <hadoop.version>2.4.0</hadoop.version>
<jetty.version>7.6.10.v20130312</jetty.version>
<distMgmtSnapshotsId>apache.snapshots.https</distMgmtSnapshotsId>
<distMgmtSnapshotsName>Apache Development Snapshot Repository</distMgmtSnapshotsName>
@@ -140,6 +140,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-plugins</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
@@ -326,6 +331,7 @@
<module>tez-mapreduce-examples</module>
<module>tez-tests</module>
<module>tez-dag</module>
+ <module>tez-plugins</module>
<module>tez-dist</module>
<module>docs</module>
</modules>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index ba735fa..36fdd59 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -365,9 +365,22 @@ public class TezConfiguration extends Configuration {
@Private
public static final String TEZ_PREWARM_DAG_NAME_PREFIX = "TezPreWarmDAG";
- public static final String YARN_ATS_ENABLED =
- TEZ_PREFIX + "yarn.ats.enabled";
- public static final boolean YARN_ATS_ENABLED_DEFAULT = false;
+ public static final String TEZ_HISTORY_LOGGING_SERVICE_CLASS =
+ TEZ_PREFIX + "history.logging.service.class";
+
+ public static final String TEZ_HISTORY_LOGGING_SERVICE_CLASS_DEFAULT =
+ "org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService";
+
+ public static final String TEZ_SIMPLE_HISTORY_LOGGING_DIR =
+ TEZ_PREFIX + "simple.history.logging.dir";
+ public static final String TEZ_SIMPLE_HISTORY_LOGGING_MAX_ERRORS =
+ TEZ_PREFIX + "simple.history.max.errors";
+ public static final int TEZ_SIMPLE_HISTORY_LOGGING_MAX_ERRORS_DEFAULT = 10;
+
+ public static final String YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS =
+ TEZ_PREFIX + "yarn.ats.event.flush.timeout.millis";
+ public static final long YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS_DEFAULT =
+ 3000l;
public static final String DAG_RECOVERY_ENABLED =
TEZ_PREFIX + "dag.recovery.enabled";
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
index b900527..3e3f5eb 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
@@ -28,6 +28,7 @@ import java.util.BitSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
+import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.DataFormatException;
@@ -41,6 +42,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.log4j.Appender;
import org.apache.log4j.Logger;
import org.apache.tez.dag.api.TezConfiguration;
@@ -54,6 +57,7 @@ import com.google.protobuf.ByteString;
public class TezUtils {
private static final Log LOG = LogFactory.getLog(TezUtils.class);
+ private static final Random RANDOM = new Random();
public static void addUserSpecifiedTezConfiguration(Configuration conf) throws IOException {
FileInputStream confPBBinaryStream = null;
@@ -329,4 +333,18 @@ public class TezUtils {
}
return bytes;
}
+
+  /**
+   * Picks one of the directories listed in the {@code LOG_DIRS} environment
+   * variable at random.
+   *
+   * @return a randomly chosen entry of the comma-separated {@code LOG_DIRS}
+   *         value, or {@code null} when the variable is unset, empty or
+   *         yields no entries
+   */
+  public static String getContainerLogDir() {
+    final String rawLogDirs = System.getenv(Environment.LOG_DIRS.name());
+    if (rawLogDirs != null && !rawLogDirs.isEmpty()) {
+      final String[] candidates = StringUtils.split(rawLogDirs, ',');
+      if (candidates.length > 0) {
+        return candidates[RANDOM.nextInt(candidates.length)];
+      }
+    }
+    return null;
+  }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index e16064a..aed9aa7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -416,7 +416,7 @@ public class DAGAppMaster extends AbstractService {
super.serviceInit(conf);
AMLaunchedEvent launchedEvent = new AMLaunchedEvent(appAttemptID,
- startTime, appSubmitTime);
+ startTime, appSubmitTime, appMasterUgi.getShortUserName());
historyEventHandler.handle(
new DAGHistoryEvent(launchedEvent));
@@ -1553,7 +1553,8 @@ public class DAGAppMaster extends AbstractService {
DefaultMetricsSystem.initialize("DAGAppMaster");
this.appsStartTime = clock.getTime();
- AMStartedEvent startEvent = new AMStartedEvent(appAttemptID, appsStartTime);
+ AMStartedEvent startEvent = new AMStartedEvent(appAttemptID,
+ appsStartTime, appMasterUgi.getShortUserName());
historyEventHandler.handle(
new DAGHistoryEvent(startEvent));
@@ -1891,7 +1892,8 @@ public class DAGAppMaster extends AbstractService {
// Job name is the same as the app name until we support multiple dags
// for an app later
DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(newDAG.getID(),
- submitTime, dagPlan, this.appAttemptID, cumulativeAdditionalResources);
+ submitTime, dagPlan, this.appAttemptID, cumulativeAdditionalResources,
+ newDAG.getUserName());
try {
historyEventHandler.handleCriticalEvent(
new DAGHistoryEvent(newDAG.getID(), submittedEvent));
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index c92d51b..71d92b0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -895,21 +895,22 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
void logJobHistoryFinishedEvent() throws IOException {
this.setFinishTime();
DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
- finishTime, DAGState.SUCCEEDED, "", getAllCounters());
+ finishTime, DAGState.SUCCEEDED, "", getAllCounters(),
+ this.userName, this.dagName);
this.appContext.getHistoryHandler().handleCriticalEvent(
new DAGHistoryEvent(dagId, finishEvt));
}
void logJobHistoryInitedEvent() {
DAGInitializedEvent initEvt = new DAGInitializedEvent(this.dagId,
- this.initTime);
+ this.initTime, this.userName, this.dagName);
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(dagId, initEvt));
}
void logJobHistoryStartedEvent() {
DAGStartedEvent startEvt = new DAGStartedEvent(this.dagId,
- this.startTime);
+ this.startTime, this.userName, this.dagName);
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(dagId, startEvt));
}
@@ -918,7 +919,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
clock.getTime(), state,
StringUtils.join(LINE_SEPARATOR, getDiagnostics()),
- getAllCounters());
+ getAllCounters(), this.userName, this.dagName);
this.appContext.getHistoryHandler().handleCriticalEvent(
new DAGHistoryEvent(dagId, finishEvt));
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 9701be4..17257ea 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
@@ -56,6 +57,7 @@ import org.apache.tez.common.RuntimeUtils;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.EdgeManagerDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
index 437d057..8ea7c8c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
@@ -174,7 +174,7 @@ public class ContainerLauncherImpl extends AbstractService implements
ContainerLaunchedEvent lEvt = new ContainerLaunchedEvent(
containerID, clock.getTime(), context.getApplicationAttemptId());
context.getHistoryHandler().handle(new DAGHistoryEvent(
- context.getCurrentDAGID(), lEvt));
+ null, lEvt));
this.state = ContainerState.RUNNING;
} catch (Throwable t) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
index 3f756c0..1ca0d5f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
@@ -22,15 +22,10 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
public interface HistoryEvent {
public HistoryEventType getEventType();
- public JSONObject convertToATSJSON() throws JSONException;
-
public boolean isRecoveryEvent();
public boolean isHistoryEvent();
@@ -38,4 +33,5 @@ public interface HistoryEvent {
public void toProtoStream(OutputStream outputStream) throws IOException;
public void fromProtoStream(InputStream inputStream) throws IOException;
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
index 4eb094f..82e063a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
@@ -18,28 +18,27 @@
package org.apache.tez.dag.history;
+import java.io.IOException;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
+import org.apache.tez.common.RuntimeUtils;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.history.ats.ATSService;
+import org.apache.tez.dag.history.logging.HistoryLoggingService;
import org.apache.tez.dag.history.recovery.RecoveryService;
import org.apache.tez.dag.records.TezDAGID;
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
public class HistoryEventHandler extends CompositeService {
private static Log LOG = LogFactory.getLog(HistoryEventHandler.class);
private final AppContext context;
- private boolean yarnATSEnabled;
- private ATSService atsService;
private RecoveryService recoveryService;
private boolean recoveryEnabled;
+ private HistoryLoggingService historyLoggingService;
public HistoryEventHandler(AppContext context) {
super(HistoryEventHandler.class.getName());
@@ -49,14 +48,19 @@ public class HistoryEventHandler extends CompositeService {
@Override
public void serviceInit(Configuration conf) throws Exception {
LOG.info("Initializing HistoryEventHandler");
- this.yarnATSEnabled = context.getAMConf().getBoolean(TezConfiguration.YARN_ATS_ENABLED,
- TezConfiguration.YARN_ATS_ENABLED_DEFAULT);
+
this.recoveryEnabled = context.getAMConf().getBoolean(TezConfiguration.DAG_RECOVERY_ENABLED,
TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT);
- if (yarnATSEnabled) {
- atsService = new ATSService();
- addService(atsService);
- }
+
+ String historyServiceClassName = context.getAMConf().get(
+ TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+ TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS_DEFAULT);
+
+ historyLoggingService =
+ RuntimeUtils.createClazzInstance(historyServiceClassName);
+ historyLoggingService.setAppContext(context);
+ addService(historyLoggingService);
+
if (recoveryEnabled) {
recoveryService = new RecoveryService(context);
addService(recoveryService);
@@ -97,8 +101,8 @@ public class HistoryEventHandler extends CompositeService {
if (recoveryEnabled && event.getHistoryEvent().isRecoveryEvent()) {
recoveryService.handle(event);
}
- if (yarnATSEnabled && event.getHistoryEvent().isHistoryEvent()) {
- atsService.handle(event);
+ if (event.getHistoryEvent().isHistoryEvent()) {
+ historyLoggingService.handle(event);
}
// TODO at some point we should look at removing this once
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java
deleted file mode 100644
index 690e850..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.history.ats;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.tez.dag.history.DAGHistoryEvent;
-import org.apache.tez.dag.history.HistoryEventType;
-
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class ATSService extends AbstractService {
-
- private static final Log LOG = LogFactory.getLog(ATSService.class);
-
- private LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
- new LinkedBlockingQueue<DAGHistoryEvent>();
-
- private final AtomicInteger historyCounter =
- new AtomicInteger(0);
- private String outputFilePrefix;
- private Thread eventHandlingThread;
- private AtomicBoolean stopped = new AtomicBoolean(false);
- private int eventCounter = 0;
- private int eventsProcessed = 0;
- private final Object lock = new Object();
-
- public ATSService() {
- super(ATSService.class.getName());
- }
-
- @Override
- public void serviceInit(Configuration conf) throws Exception {
- LOG.info("Initializing ATSService");
-
- }
-
- @Override
- public void serviceStart() {
- LOG.info("Starting ATSService");
- eventHandlingThread = new Thread(new Runnable() {
- @Override
- public void run() {
- DAGHistoryEvent event;
- while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
-
- // Log the size of the event-queue every so often.
- if (eventCounter != 0 && eventCounter % 1000 == 0) {
- LOG.info("Event queue stats"
- + ", eventsProcessedSinceLastUpdate=" + eventsProcessed
- + ", eventQueueSize=" + eventQueue.size());
- eventCounter = 0;
- eventsProcessed = 0;
- } else {
- ++eventCounter;
- }
-
- try {
- event = eventQueue.take();
- } catch (InterruptedException e) {
- LOG.info("EventQueue take interrupted. Returning");
- return;
- }
-
- synchronized (lock) {
- ++eventsProcessed;
- try {
- handleEvent(event);
- } catch (Exception e) {
- // TODO handle failures - treat as fatal or ignore?
- LOG.warn("Error handling event", e);
- }
- }
- }
- }
- }, "HistoryEventHandlingThread");
- eventHandlingThread.start();
- }
-
- @Override
- public void serviceStop() {
- LOG.info("Stopping ATSService");
- stopped.set(true);
- if (eventHandlingThread != null) {
- eventHandlingThread.interrupt();
- }
- }
-
- public void handle(DAGHistoryEvent event) {
- eventQueue.add(event);
- }
-
- private void handleEvent(DAGHistoryEvent event) {
- HistoryEventType eventType = event.getHistoryEvent().getEventType();
- try {
- // TODO integrate with ATS
- } catch (Exception e) {
- LOG.warn("Could not handle history event, eventType="
- + eventType, e);
- // TODO handle error as a fatal event or ignore/skip?
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/ats/EntityTypes.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/ats/EntityTypes.java b/tez-dag/src/main/java/org/apache/tez/dag/history/ats/EntityTypes.java
deleted file mode 100644
index a7f0208..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/ats/EntityTypes.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.history.ats;
-
-public enum EntityTypes {
- TEZ_APPLICATION_ATTEMPT,
- TEZ_CONTAINER_ID,
- TEZ_DAG_ID,
- TEZ_VERTEX_ID,
- TEZ_TASK_ID,
- TEZ_TASK_ATTEMPT_ID,
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
index 54bc658..049d340 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
@@ -18,35 +18,32 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
import org.apache.tez.dag.recovery.records.RecoveryProtos.AMLaunchedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
public class AMLaunchedEvent implements HistoryEvent {
private ApplicationAttemptId applicationAttemptId;
private long launchTime;
private long appSubmitTime;
+ private String user;
public AMLaunchedEvent() {
}
public AMLaunchedEvent(ApplicationAttemptId appAttemptId,
- long launchTime, long appSubmitTime) {
+ long launchTime, long appSubmitTime, String user) {
this.applicationAttemptId = appAttemptId;
this.launchTime = launchTime;
this.appSubmitTime = appSubmitTime;
+ this.user = user;
}
@Override
@@ -55,48 +52,6 @@ public class AMLaunchedEvent implements HistoryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(ATSConstants.ENTITY,
- "tez_" + applicationAttemptId.toString());
- jsonObject.put(ATSConstants.ENTITY_TYPE,
- EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
-
- // Related Entities
- JSONArray relatedEntities = new JSONArray();
- JSONObject appEntity = new JSONObject();
- appEntity.put(ATSConstants.ENTITY,
- applicationAttemptId.getApplicationId().toString());
- appEntity.put(ATSConstants.ENTITY_TYPE,
- ATSConstants.APPLICATION_ID);
- JSONObject appAttemptEntity = new JSONObject();
- appAttemptEntity.put(ATSConstants.ENTITY,
- applicationAttemptId.toString());
- appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
- ATSConstants.APPLICATION_ATTEMPT_ID);
- relatedEntities.put(appEntity);
- relatedEntities.put(appAttemptEntity);
- jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
-
- // TODO decide whether this goes into different events,
- // event info or other info.
- JSONArray events = new JSONArray();
- JSONObject initEvent = new JSONObject();
- initEvent.put(ATSConstants.TIMESTAMP, launchTime);
- initEvent.put(ATSConstants.EVENT_TYPE,
- HistoryEventType.AM_LAUNCHED.name());
- events.put(initEvent);
- jsonObject.put(ATSConstants.EVENTS, events);
-
- // Other info to tag with Tez AM
- JSONObject otherInfo = new JSONObject();
- otherInfo.put(ATSConstants.APP_SUBMIT_TIME, appSubmitTime);
- jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
- return jsonObject;
- }
-
- @Override
public boolean isRecoveryEvent() {
return false;
}
@@ -151,4 +106,8 @@ public class AMLaunchedEvent implements HistoryEvent {
return appSubmitTime;
}
+ public String getUser() {
+ return user;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
index e66141b..aa7b3f2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
@@ -18,33 +18,30 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
import org.apache.tez.dag.recovery.records.RecoveryProtos.AMStartedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
public class AMStartedEvent implements HistoryEvent {
private ApplicationAttemptId applicationAttemptId;
private long startTime;
+ private String user;
public AMStartedEvent() {
}
public AMStartedEvent(ApplicationAttemptId appAttemptId,
- long startTime) {
+ long startTime, String user) {
this.applicationAttemptId = appAttemptId;
this.startTime = startTime;
+ this.user = user;
}
@Override
@@ -53,43 +50,6 @@ public class AMStartedEvent implements HistoryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(ATSConstants.ENTITY,
- "tez_" + applicationAttemptId.toString());
- jsonObject.put(ATSConstants.ENTITY_TYPE,
- EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
-
- // Related Entities
- JSONArray relatedEntities = new JSONArray();
- JSONObject appEntity = new JSONObject();
- appEntity.put(ATSConstants.ENTITY,
- applicationAttemptId.getApplicationId().toString());
- appEntity.put(ATSConstants.ENTITY_TYPE,
- ATSConstants.APPLICATION_ID);
- JSONObject appAttemptEntity = new JSONObject();
- appAttemptEntity.put(ATSConstants.ENTITY,
- applicationAttemptId.toString());
- appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
- ATSConstants.APPLICATION_ATTEMPT_ID);
- relatedEntities.put(appEntity);
- relatedEntities.put(appAttemptEntity);
- jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
-
- // TODO decide whether this goes into different events,
- // event info or other info.
- JSONArray events = new JSONArray();
- JSONObject startEvent = new JSONObject();
- startEvent.put(ATSConstants.TIMESTAMP, startTime);
- startEvent.put(ATSConstants.EVENT_TYPE,
- HistoryEventType.AM_STARTED.name());
- events.put(startEvent);
- jsonObject.put(ATSConstants.EVENTS, events);
-
- return jsonObject;
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}
@@ -137,5 +97,8 @@ public class AMStartedEvent implements HistoryEvent {
return startTime;
}
+ public String getUser() {
+ return user;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
index 471ddd1..c37b7f1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
@@ -18,21 +18,16 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
import org.apache.tez.dag.recovery.records.RecoveryProtos.ContainerLaunchedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
public class ContainerLaunchedEvent implements HistoryEvent {
@@ -57,45 +52,6 @@ public class ContainerLaunchedEvent implements HistoryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(ATSConstants.ENTITY,
- "tez_" + containerId.toString());
- jsonObject.put(ATSConstants.ENTITY_TYPE,
- EntityTypes.TEZ_CONTAINER_ID.name());
-
- JSONArray relatedEntities = new JSONArray();
- JSONObject appAttemptEntity = new JSONObject();
- appAttemptEntity.put(ATSConstants.ENTITY,
- applicationAttemptId.toString());
- appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
- EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
-
- JSONObject containerEntity = new JSONObject();
- containerEntity.put(ATSConstants.ENTITY, containerId.toString());
- containerEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.CONTAINER_ID);
-
- relatedEntities.put(appAttemptEntity);
- relatedEntities.put(containerEntity);
- jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
-
- // TODO decide whether this goes into different events,
- // event info or other info.
- JSONArray events = new JSONArray();
- JSONObject launchEvent = new JSONObject();
- launchEvent.put(ATSConstants.TIMESTAMP, launchTime);
- launchEvent.put(ATSConstants.EVENT_TYPE,
- HistoryEventType.CONTAINER_LAUNCHED.name());
- events.put(launchEvent);
- jsonObject.put(ATSConstants.EVENTS, events);
-
- // TODO add other container info here? or assume AHS will have this?
- // TODO container logs?
-
- return jsonObject;
- }
-
- @Override
public boolean isRecoveryEvent() {
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java
index a544354..549720a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java
@@ -18,23 +18,16 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
import org.apache.tez.dag.recovery.records.RecoveryProtos.ContainerStoppedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
public class ContainerStoppedEvent implements HistoryEvent {
@@ -62,51 +55,6 @@ public class ContainerStoppedEvent implements HistoryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- // structure is identical to ContainerLaunchedEvent
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(ATSConstants.ENTITY,
- "tez_" + containerId.toString());
- jsonObject.put(ATSConstants.ENTITY_TYPE,
- EntityTypes.TEZ_CONTAINER_ID.name());
-
- JSONArray relatedEntities = new JSONArray();
- JSONObject appAttemptEntity = new JSONObject();
- appAttemptEntity.put(ATSConstants.ENTITY,
- applicationAttemptId.toString());
- appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
- EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
-
- JSONObject containerEntity = new JSONObject();
- containerEntity.put(ATSConstants.ENTITY, containerId.toString());
- containerEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.CONTAINER_ID);
-
- relatedEntities.put(appAttemptEntity);
- relatedEntities.put(containerEntity);
- jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
-
- // TODO decide whether this goes into different events,
- // event info or other info.
- JSONArray events = new JSONArray();
- JSONObject stopEvent = new JSONObject();
- stopEvent.put(ATSConstants.TIMESTAMP, stopTime);
- stopEvent.put(ATSConstants.EVENT_TYPE,
- HistoryEventType.CONTAINER_STOPPED.name());
- events.put(stopEvent);
- jsonObject.put(ATSConstants.EVENTS, events);
-
- // TODO add other container info here? or assume AHS will have this?
- // TODO container logs?
-
- // Other info
- JSONObject otherInfo = new JSONObject();
- otherInfo.put(ATSConstants.EXIT_STATUS, exitStatus);
- jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
- return jsonObject;
- }
-
- @Override
public boolean isRecoveryEvent() {
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
index 627751a..95e8630 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
@@ -29,8 +29,6 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGCommitStartedProto;
import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
import org.apache.tez.dag.utils.ProtoUtils;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
public class DAGCommitStartedEvent implements HistoryEvent, SummaryEvent {
@@ -51,12 +49,6 @@ public class DAGCommitStartedEvent implements HistoryEvent, SummaryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- // TODO
- return null;
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
index 14381b3..1cfc36e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
@@ -22,47 +22,50 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.SummaryEvent;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
-import org.apache.tez.dag.history.utils.DAGUtils;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.recovery.records.RecoveryProtos;
import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGFinishedProto;
import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
import com.google.common.primitives.Ints;
import com.google.protobuf.ByteString;
public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
+ private static final Log LOG = LogFactory.getLog(DAGFinishedEvent.class);
+
private TezDAGID dagID;
private long startTime;
private long finishTime;
private DAGState state;
private String diagnostics;
private TezCounters tezCounters;
+ private String user;
+ private String dagName;
public DAGFinishedEvent() {
}
public DAGFinishedEvent(TezDAGID dagId, long startTime,
long finishTime, DAGState state,
- String diagnostics, TezCounters counters) {
+ String diagnostics, TezCounters counters,
+ String user, String dagName) {
this.dagID = dagId;
this.startTime = startTime;
this.finishTime = finishTime;
this.state = state;
this.diagnostics = diagnostics;
this.tezCounters = counters;
+ this.user = user;
+ this.dagName = dagName;
}
@Override
@@ -71,40 +74,6 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(ATSConstants.ENTITY,
- dagID.toString());
- jsonObject.put(ATSConstants.ENTITY_TYPE,
- EntityTypes.TEZ_DAG_ID.name());
-
- // Related Entities not needed as should have been done in
- // dag submission event
-
- // TODO decide whether this goes into different events,
- // event info or other info.
- JSONArray events = new JSONArray();
- JSONObject finishEvent = new JSONObject();
- finishEvent.put(ATSConstants.TIMESTAMP, finishTime);
- finishEvent.put(ATSConstants.EVENT_TYPE,
- HistoryEventType.DAG_FINISHED.name());
- events.put(finishEvent);
- jsonObject.put(ATSConstants.EVENTS, events);
-
- JSONObject otherInfo = new JSONObject();
- otherInfo.put(ATSConstants.START_TIME, startTime);
- otherInfo.put(ATSConstants.FINISH_TIME, finishTime);
- otherInfo.put(ATSConstants.TIME_TAKEN, (finishTime - startTime));
- otherInfo.put(ATSConstants.STATUS, state.name());
- otherInfo.put(ATSConstants.DIAGNOSTICS, diagnostics);
- otherInfo.put(ATSConstants.COUNTERS,
- DAGUtils.convertCountersToJSON(this.tezCounters));
- jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
- return jsonObject;
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}
@@ -215,4 +184,11 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
return tezCounters;
}
+ public String getUser() {
+ return user;
+ }
+
+ public String getDagName() {
+ return dagName;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
index 9b001b6..b3a0165 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
@@ -18,29 +18,31 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.recovery.records.RecoveryProtos;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-// TODO fix class
public class DAGInitializedEvent implements HistoryEvent {
private TezDAGID dagID;
private long initTime;
+ private String user;
+ private String dagName;
public DAGInitializedEvent() {
}
- public DAGInitializedEvent(TezDAGID dagID, long initTime) {
+ public DAGInitializedEvent(TezDAGID dagID, long initTime,
+ String user, String dagName) {
this.dagID = dagID;
this.initTime = initTime;
+ this.user = user;
+ this.dagName = dagName;
}
@Override
@@ -49,12 +51,6 @@ public class DAGInitializedEvent implements HistoryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- // TODO
- throw new UnsupportedOperationException();
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}
@@ -101,4 +97,13 @@ public class DAGInitializedEvent implements HistoryEvent {
public TezDAGID getDagID() {
return dagID;
}
+
+ public String getUser() {
+ return user;
+ }
+
+ public String getDagName() {
+ return dagName;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
index a1bcdf2..4e7b880 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
@@ -18,31 +18,31 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGStartedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
public class DAGStartedEvent implements HistoryEvent {
private TezDAGID dagID;
private long startTime;
+ private String user;
+ private String dagName;
public DAGStartedEvent() {
}
- public DAGStartedEvent(TezDAGID dagID, long startTime) {
+ public DAGStartedEvent(TezDAGID dagID, long startTime,
+ String user, String dagName) {
this.dagID = dagID;
this.startTime = startTime;
+ this.user = user;
+ this.dagName = dagName;
}
@Override
@@ -51,30 +51,6 @@ public class DAGStartedEvent implements HistoryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(ATSConstants.ENTITY,
- dagID.toString());
- jsonObject.put(ATSConstants.ENTITY_TYPE,
- EntityTypes.TEZ_DAG_ID.name());
-
- // Related Entities not needed as should have been done in
- // dag submission event
-
- // TODO decide whether this goes into different events,
- // event info or other info.
- JSONArray events = new JSONArray();
- JSONObject startEvent = new JSONObject();
- startEvent.put(ATSConstants.TIMESTAMP, startTime);
- startEvent.put(ATSConstants.EVENT_TYPE,
- HistoryEventType.DAG_STARTED.name());
- events.put(startEvent);
- jsonObject.put(ATSConstants.EVENTS, events);
-
- return jsonObject;
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}
@@ -120,4 +96,13 @@ public class DAGStartedEvent implements HistoryEvent {
public TezDAGID getDagID() {
return dagID;
}
+
+ public String getUser() {
+ return user;
+ }
+
+ public String getDagName() {
+ return dagName;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
index 18f2205..f04afa4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
@@ -23,31 +23,32 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.SummaryEvent;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
-import org.apache.tez.dag.history.utils.DAGUtils;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGSubmittedProto;
import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
import org.apache.tez.dag.utils.ProtoUtils;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
+
public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
+ private static final Log LOG = LogFactory.getLog(DAGSubmittedEvent.class);
+
private TezDAGID dagID;
private long submitTime;
private DAGProtos.DAGPlan dagPlan;
private ApplicationAttemptId applicationAttemptId;
+ private String user;
private Map<String, LocalResource> cumulativeAdditionalLocalResources;
public DAGSubmittedEvent() {
@@ -55,12 +56,14 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
public DAGSubmittedEvent(TezDAGID dagID, long submitTime,
DAGProtos.DAGPlan dagPlan, ApplicationAttemptId applicationAttemptId,
- Map<String, LocalResource> cumulativeAdditionalLocalResources) {
+ Map<String, LocalResource> cumulativeAdditionalLocalResources,
+ String user) {
this.dagID = dagID;
this.submitTime = submitTime;
this.dagPlan = dagPlan;
this.applicationAttemptId = applicationAttemptId;
this.cumulativeAdditionalLocalResources = cumulativeAdditionalLocalResources;
+ this.user = user;
}
@Override
@@ -69,62 +72,6 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(ATSConstants.ENTITY,
- dagID.toString());
- jsonObject.put(ATSConstants.ENTITY_TYPE,
- EntityTypes.TEZ_DAG_ID.name());
-
- // Related Entities
- JSONArray relatedEntities = new JSONArray();
- JSONObject tezAppEntity = new JSONObject();
- tezAppEntity.put(ATSConstants.ENTITY,
- "tez_" + applicationAttemptId.toString());
- tezAppEntity.put(ATSConstants.ENTITY_TYPE,
- EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
- JSONObject appEntity = new JSONObject();
- appEntity.put(ATSConstants.ENTITY,
- applicationAttemptId.getApplicationId().toString());
- appEntity.put(ATSConstants.ENTITY_TYPE,
- ATSConstants.APPLICATION_ID);
- JSONObject appAttemptEntity = new JSONObject();
- appAttemptEntity.put(ATSConstants.ENTITY,
- applicationAttemptId.toString());
- appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
- ATSConstants.APPLICATION_ATTEMPT_ID);
-
- relatedEntities.put(tezAppEntity);
- relatedEntities.put(appEntity);
- relatedEntities.put(appAttemptEntity);
- jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
-
- // filters
- JSONObject primaryFilters = new JSONObject();
- primaryFilters.put(ATSConstants.DAG_NAME,
- dagPlan.getName());
- jsonObject.put(ATSConstants.PRIMARY_FILTERS, primaryFilters);
-
- // TODO decide whether this goes into different events,
- // event info or other info.
- JSONArray events = new JSONArray();
- JSONObject submitEvent = new JSONObject();
- submitEvent.put(ATSConstants.TIMESTAMP, submitTime);
- submitEvent.put(ATSConstants.EVENT_TYPE,
- HistoryEventType.DAG_SUBMITTED.name());
- events.put(submitEvent);
- jsonObject.put(ATSConstants.EVENTS, events);
-
- // Other info such as dag plan
- JSONObject otherInfo = new JSONObject();
- otherInfo.put(ATSConstants.DAG_PLAN,
- DAGUtils.generateSimpleJSONPlan(dagPlan));
- jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
- return jsonObject;
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}
@@ -220,4 +167,12 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
return submitTime;
}
+ /**
+ * Returns the DAG plan held by this event.
+ *
+ * @return the {@code dagPlan} field, as assigned by the five-argument
+ * constructor (may be unset after the no-argument constructor; confirm
+ * against the deserialization path)
+ */
+ public DAGPlan getDagPlan() {
+ return this.dagPlan;
+ }
+
+ /**
+ * Returns the user string that was handed to this event's constructor.
+ *
+ * @return the {@code user} field (may be unset after the no-argument
+ * constructor; confirm against the deserialization path)
+ */
+ public String getUser() {
+ return this.user;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
index ecb6818..aa85efb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
@@ -18,26 +18,24 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
-import org.apache.tez.dag.history.utils.DAGUtils;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.recovery.records.RecoveryProtos.TaskAttemptFinishedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
public class TaskAttemptFinishedEvent implements HistoryEvent {
+ private static final Log LOG = LogFactory.getLog(TaskAttemptFinishedEvent.class);
+
private TezTaskAttemptID taskAttemptId;
private String vertexName;
private long startTime;
@@ -71,35 +69,6 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(ATSConstants.ENTITY, taskAttemptId.toString());
- jsonObject.put(ATSConstants.ENTITY_TYPE,
- EntityTypes.TEZ_TASK_ATTEMPT_ID.name());
-
- // Events
- JSONArray events = new JSONArray();
- JSONObject finishEvent = new JSONObject();
- finishEvent.put(ATSConstants.TIMESTAMP, finishTime);
- finishEvent.put(ATSConstants.EVENT_TYPE,
- HistoryEventType.TASK_ATTEMPT_FINISHED.name());
- events.put(finishEvent);
- jsonObject.put(ATSConstants.EVENTS, events);
-
- JSONObject otherInfo = new JSONObject();
- otherInfo.put(ATSConstants.START_TIME, startTime);
- otherInfo.put(ATSConstants.FINISH_TIME, finishTime);
- otherInfo.put(ATSConstants.TIME_TAKEN, (finishTime - startTime));
- otherInfo.put(ATSConstants.STATUS, state.name());
- otherInfo.put(ATSConstants.DIAGNOSTICS, diagnostics);
- otherInfo.put(ATSConstants.COUNTERS,
- DAGUtils.convertCountersToJSON(this.tezCounters));
- jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
- return jsonObject;
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}
@@ -183,4 +152,8 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
return state;
}
+ /**
+ * Returns the start time recorded for this task attempt.
+ *
+ * @return the {@code startTime} field; the time unit is not visible in
+ * this hunk
+ */
+ public long getStartTime() {
+ return this.startTime;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
index ba91db8..76109ca 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
@@ -18,22 +18,17 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.recovery.records.RecoveryProtos.TaskAttemptStartedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
public class TaskAttemptStartedEvent implements HistoryEvent {
@@ -67,50 +62,6 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(ATSConstants.ENTITY, taskAttemptId.toString());
- jsonObject.put(ATSConstants.ENTITY_TYPE,
- EntityTypes.TEZ_TASK_ATTEMPT_ID.name());
-
- // Related entities
- JSONArray relatedEntities = new JSONArray();
- JSONObject nodeEntity = new JSONObject();
- nodeEntity.put(ATSConstants.ENTITY, nodeId.toString());
- nodeEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.NODE_ID);
-
- JSONObject containerEntity = new JSONObject();
- containerEntity.put(ATSConstants.ENTITY, containerId.toString());
- containerEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.CONTAINER_ID);
-
- JSONObject taskEntity = new JSONObject();
- taskEntity.put(ATSConstants.ENTITY, taskAttemptId.getTaskID().toString());
- taskEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ID.name());
-
- relatedEntities.put(nodeEntity);
- relatedEntities.put(containerEntity);
- relatedEntities.put(taskEntity);
- jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
-
- // Events
- JSONArray events = new JSONArray();
- JSONObject startEvent = new JSONObject();
- startEvent.put(ATSConstants.TIMESTAMP, startTime);
- startEvent.put(ATSConstants.EVENT_TYPE,
- HistoryEventType.TASK_ATTEMPT_STARTED.name());
- events.put(startEvent);
- jsonObject.put(ATSConstants.EVENTS, events);
-
- // Other info
- JSONObject otherInfo = new JSONObject();
- otherInfo.put(ATSConstants.IN_PROGRESS_LOGS_URL, inProgressLogsUrl);
- otherInfo.put(ATSConstants.COMPLETED_LOGS_URL, completedLogsUrl);
- jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
- return jsonObject;
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}
@@ -173,4 +124,13 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
public NodeId getNodeId() {
return nodeId;
}
+
+ /**
+ * Returns the in-progress logs URL carried by this task attempt event.
+ *
+ * @return the {@code inProgressLogsUrl} field
+ */
+ public String getInProgressLogsUrl() {
+ return this.inProgressLogsUrl;
+ }
+
+ /**
+ * Returns the completed logs URL carried by this task attempt event.
+ *
+ * @return the {@code completedLogsUrl} field
+ */
+ public String getCompletedLogsUrl() {
+ return this.completedLogsUrl;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
index 713ecd8..0940987 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
@@ -18,27 +18,25 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
-import org.apache.tez.dag.history.utils.DAGUtils;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.recovery.records.RecoveryProtos.TaskFinishedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
public class TaskFinishedEvent implements HistoryEvent {
+ private static final Log LOG = LogFactory.getLog(TaskFinishedEvent.class);
+
private TezTaskID taskID;
private String vertexName;
private long startTime;
@@ -68,34 +66,6 @@ public class TaskFinishedEvent implements HistoryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(ATSConstants.ENTITY, taskID.toString());
- jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ID.name());
-
- // Events
- JSONArray events = new JSONArray();
- JSONObject finishEvent = new JSONObject();
- finishEvent.put(ATSConstants.TIMESTAMP, finishTime);
- finishEvent.put(ATSConstants.EVENT_TYPE,
- HistoryEventType.TASK_FINISHED.name());
- events.put(finishEvent);
- jsonObject.put(ATSConstants.EVENTS, events);
-
- JSONObject otherInfo = new JSONObject();
- otherInfo.put(ATSConstants.START_TIME, startTime);
- otherInfo.put(ATSConstants.FINISH_TIME, finishTime);
- otherInfo.put(ATSConstants.TIME_TAKEN, (finishTime - startTime));
- otherInfo.put(ATSConstants.STATUS, state.name());
- otherInfo.put(ATSConstants.COUNTERS,
- DAGUtils.convertCountersToJSON(this.tezCounters));
-
- jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
- return jsonObject;
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}
@@ -178,4 +148,9 @@ public class TaskFinishedEvent implements HistoryEvent {
public TezTaskAttemptID getSuccessfulAttemptID() {
return successfulAttemptID;
}
+
+ /**
+ * Returns the start time recorded for this task.
+ *
+ * @return the {@code startTime} field; the time unit is not visible in
+ * this hunk
+ */
+ public long getStartTime() {
+ return this.startTime;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
index c2a380b..e93f3b3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
@@ -18,19 +18,14 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.recovery.records.RecoveryProtos.TaskStartedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
public class TaskStartedEvent implements HistoryEvent {
@@ -56,40 +51,6 @@ public class TaskStartedEvent implements HistoryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(ATSConstants.ENTITY, taskID.toString());
- jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ID.name());
-
- // Related entities
- JSONArray relatedEntities = new JSONArray();
- JSONObject vertexEntity = new JSONObject();
- vertexEntity.put(ATSConstants.ENTITY, taskID.getVertexID().toString());
- vertexEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
- relatedEntities.put(vertexEntity);
- jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
-
- // Events
- JSONArray events = new JSONArray();
- JSONObject startEvent = new JSONObject();
- startEvent.put(ATSConstants.TIMESTAMP, startTime);
- startEvent.put(ATSConstants.EVENT_TYPE,
- HistoryEventType.TASK_STARTED.name());
- events.put(startEvent);
- jsonObject.put(ATSConstants.EVENTS, events);
-
- // Other info
- // TODO fix schedule/launch time to be events
- JSONObject otherInfo = new JSONObject();
- otherInfo.put(ATSConstants.START_TIME, startTime);
- otherInfo.put(ATSConstants.SCHEDULED_TIME, scheduledTime);
-
- jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
- return jsonObject;
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
index 387bff1..a7e74e6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
@@ -29,8 +29,6 @@ import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.recovery.records.RecoveryProtos;
import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexCommitStartedProto;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
import com.google.protobuf.ByteString;
@@ -53,12 +51,6 @@ public class VertexCommitStartedEvent implements HistoryEvent, SummaryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- // TODO
- return null;
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
index 035c9ca..03a3cdf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
@@ -18,7 +18,12 @@
package org.apache.tez.dag.history.events;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.EnumSet;
+import java.util.List;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tez.common.ProtoConverters;
@@ -36,14 +41,8 @@ import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.EventType;
import org.apache.tez.runtime.api.impl.TezEvent;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.EnumSet;
-import java.util.List;
+import com.google.common.collect.Lists;
public class VertexDataMovementEventsGeneratedEvent implements HistoryEvent {
@@ -79,11 +78,6 @@ public class VertexDataMovementEventsGeneratedEvent implements HistoryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- return null;
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
index 8321a38..6c18d9f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
@@ -22,6 +22,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.app.dag.VertexState;
@@ -29,20 +31,16 @@ import org.apache.tez.dag.app.dag.impl.VertexStats;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.SummaryEvent;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
-import org.apache.tez.dag.history.utils.DAGUtils;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.recovery.records.RecoveryProtos;
import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexFinishStateProto;
import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexFinishedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
+ private static final Log LOG = LogFactory.getLog(VertexFinishedEvent.class);
+
private TezVertexID vertexID;
private String vertexName;
private long initRequestedTime;
@@ -83,33 +81,6 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(ATSConstants.ENTITY, vertexID.toString());
- jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
-
- // Events
- JSONArray events = new JSONArray();
- JSONObject finishEvent = new JSONObject();
- finishEvent.put(ATSConstants.TIMESTAMP, finishTime);
- finishEvent.put(ATSConstants.EVENT_TYPE,
- HistoryEventType.VERTEX_FINISHED.name());
- events.put(finishEvent);
- jsonObject.put(ATSConstants.EVENTS, events);
-
- JSONObject otherInfo = new JSONObject();
- otherInfo.put(ATSConstants.FINISH_TIME, finishTime);
- otherInfo.put(ATSConstants.TIME_TAKEN, (finishTime - startTime));
- otherInfo.put(ATSConstants.STATUS, state.name());
- otherInfo.put(ATSConstants.DIAGNOSTICS, diagnostics);
- otherInfo.put(ATSConstants.COUNTERS,
- DAGUtils.convertCountersToJSON(this.tezCounters));
- jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
- return jsonObject;
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}
@@ -198,6 +169,18 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
return tezCounters;
}
+ /**
+ * Returns the vertex statistics object attached to this event.
+ *
+ * @return the {@code vertexStats} field
+ */
+ public VertexStats getVertexStats() {
+ return this.vertexStats;
+ }
+
+ /**
+ * Returns the name of the vertex this finish event refers to.
+ *
+ * @return the {@code vertexName} field
+ */
+ public String getVertexName() {
+ return this.vertexName;
+ }
+
+ /**
+ * Returns the start time recorded for this vertex.
+ *
+ * @return the {@code startTime} field; the time unit is not visible in
+ * this hunk
+ */
+ public long getStartTime() {
+ return this.startTime;
+ }
+
@Override
public void toSummaryProtoStream(OutputStream outputStream) throws IOException {
VertexFinishStateProto finishStateProto =
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
index 99a5288..4c042e1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
@@ -29,8 +29,6 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.recovery.records.RecoveryProtos;
import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexGroupCommitFinishedProto;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
public class VertexGroupCommitFinishedEvent implements HistoryEvent, SummaryEvent {
@@ -54,12 +52,6 @@ public class VertexGroupCommitFinishedEvent implements HistoryEvent, SummaryEven
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- // TODO
- return null;
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
index 04d6276..d7df13a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
@@ -29,8 +29,6 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.recovery.records.RecoveryProtos;
import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexGroupCommitStartedProto;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
public class VertexGroupCommitStartedEvent implements HistoryEvent, SummaryEvent {
@@ -54,12 +52,6 @@ public class VertexGroupCommitStartedEvent implements HistoryEvent, SummaryEvent
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- // TODO
- return null;
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
index e9e4a8c..2ae1b3a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
@@ -18,6 +18,12 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tez.dag.api.DagTypeConverters;
@@ -26,20 +32,9 @@ import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
import org.apache.tez.dag.app.dag.impl.RootInputLeafOutputDescriptor;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.recovery.records.RecoveryProtos;
import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexInitializedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.LinkedHashMap;
-import java.util.Map;
public class VertexInitializedEvent implements HistoryEvent {
@@ -75,42 +70,6 @@ public class VertexInitializedEvent implements HistoryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(ATSConstants.ENTITY, vertexID.toString());
- jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
-
- // Related entities
- JSONArray relatedEntities = new JSONArray();
- JSONObject vertexEntity = new JSONObject();
- vertexEntity.put(ATSConstants.ENTITY, vertexID.getDAGId().toString());
- vertexEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
- relatedEntities.put(vertexEntity);
- jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
-
- // Events
- JSONArray events = new JSONArray();
- JSONObject initEvent = new JSONObject();
- initEvent.put(ATSConstants.TIMESTAMP, initedTime);
- initEvent.put(ATSConstants.EVENT_TYPE,
- HistoryEventType.VERTEX_INITIALIZED.name());
- events.put(initEvent);
- jsonObject.put(ATSConstants.EVENTS, events);
-
- // Other info
- // TODO fix requested times to be events
- JSONObject otherInfo = new JSONObject();
- otherInfo.put(ATSConstants.VERTEX_NAME, vertexName);
- otherInfo.put(ATSConstants.INIT_REQUESTED_TIME, initRequestedTime);
- otherInfo.put(ATSConstants.INIT_TIME, initedTime);
- otherInfo.put(ATSConstants.NUM_TASKS, numTasks);
- otherInfo.put(ATSConstants.PROCESSOR_CLASS_NAME, processorName);
- jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
- return jsonObject;
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}
@@ -214,4 +173,9 @@ public class VertexInitializedEvent implements HistoryEvent {
public String getProcessorName() {
return processorName;
}
+
+ /**
+ * Returns the name of the vertex this initialization event refers to.
+ *
+ * @return the {@code vertexName} field
+ */
+ public String getVertexName() {
+ return this.vertexName;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
index 43cc787..39748ec 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
@@ -18,6 +18,13 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.EdgeManagerDescriptor;
import org.apache.tez.dag.api.VertexLocationHint;
@@ -26,15 +33,6 @@ import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.recovery.records.RecoveryProtos.EdgeManagerDescriptorProto;
import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexParallelismUpdatedProto;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
public class VertexParallelismUpdatedEvent implements HistoryEvent {
@@ -61,12 +59,6 @@ public class VertexParallelismUpdatedEvent implements HistoryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- throw new UnsupportedOperationException("VertexParallelismUpdatedEvent"
- + " not a History event");
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}