Posted to commits@tez.apache.org by hi...@apache.org on 2014/05/22 00:13:13 UTC

[1/3] TEZ-1066. Generate events to integrate with YARN timeline server. (hitesh)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 6e07fc7e7 -> bc6579614


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
new file mode 100644
index 0000000..8ab9c91
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.ats;
+
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.AMLaunchedEvent;
+import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.history.events.ContainerStoppedEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskFinishedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.history.logging.EntityTypes;
+import org.apache.tez.dag.history.utils.ATSConstants;
+import org.apache.tez.dag.history.utils.DAGUtils;
+
+public class HistoryEventTimelineConversion {
+
+  public static TimelineEntity convertToTimelineEntity(HistoryEvent historyEvent) {
+    if (!historyEvent.isHistoryEvent()) {
+      throw new UnsupportedOperationException("Invalid Event, does not support history"
+          + ", eventType=" + historyEvent.getEventType());
+    }
+    TimelineEntity timelineEntity = null;
+    switch (historyEvent.getEventType()) {
+      case AM_LAUNCHED:
+        timelineEntity = convertAMLaunchedEvent((AMLaunchedEvent) historyEvent);
+        break;
+      case AM_STARTED:
+        timelineEntity = convertAMStartedEvent((AMStartedEvent) historyEvent);
+        break;
+      case CONTAINER_LAUNCHED:
+        timelineEntity = convertContainerLaunchedEvent((ContainerLaunchedEvent) historyEvent);
+        break;
+      case CONTAINER_STOPPED:
+        timelineEntity = convertContainerStoppedEvent((ContainerStoppedEvent) historyEvent);
+        break;
+      case DAG_SUBMITTED:
+        timelineEntity = convertDAGSubmittedEvent((DAGSubmittedEvent) historyEvent);
+        break;
+      case DAG_INITIALIZED:
+        timelineEntity = convertDAGInitializedEvent((DAGInitializedEvent) historyEvent);
+        break;
+      case DAG_STARTED:
+        timelineEntity = convertDAGStartedEvent((DAGStartedEvent) historyEvent);
+        break;
+      case DAG_FINISHED:
+        timelineEntity = convertDAGFinishedEvent((DAGFinishedEvent) historyEvent);
+        break;
+      case VERTEX_INITIALIZED:
+        timelineEntity = convertVertexInitializedEvent((VertexInitializedEvent) historyEvent);
+        break;
+      case VERTEX_STARTED:
+        timelineEntity = convertVertexStartedEvent((VertexStartedEvent) historyEvent);
+        break;
+      case VERTEX_FINISHED:
+        timelineEntity = convertVertexFinishedEvent((VertexFinishedEvent) historyEvent);
+        break;
+      case TASK_STARTED:
+        timelineEntity = convertTaskStartedEvent((TaskStartedEvent) historyEvent);
+        break;
+      case TASK_FINISHED:
+        timelineEntity = convertTaskFinishedEvent((TaskFinishedEvent) historyEvent);
+        break;
+      case TASK_ATTEMPT_STARTED:
+        timelineEntity = convertTaskAttemptStartedEvent((TaskAttemptStartedEvent) historyEvent);
+        break;
+      case TASK_ATTEMPT_FINISHED:
+        timelineEntity = convertTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) historyEvent);
+        break;
+      case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+      case VERTEX_COMMIT_STARTED:
+      case VERTEX_GROUP_COMMIT_STARTED:
+      case VERTEX_GROUP_COMMIT_FINISHED:
+      case VERTEX_PARALLELISM_UPDATED:
+      case DAG_COMMIT_STARTED:
+        throw new UnsupportedOperationException("Invalid Event, does not support history"
+            + ", eventType=" + historyEvent.getEventType());
+      default:
+        throw new UnsupportedOperationException("Unhandled Event"
+            + ", eventType=" + historyEvent.getEventType());
+    }
+    return timelineEntity;
+  }
+
+  private static TimelineEntity convertAMLaunchedEvent(AMLaunchedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId("tez_"
+        + event.getApplicationAttemptId().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+
+    atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ID,
+        event.getApplicationAttemptId().getApplicationId().toString());
+    atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ATTEMPT_ID,
+        event.getApplicationAttemptId().toString());
+    atsEntity.addRelatedEntity(ATSConstants.USER, event.getUser());
+
+    atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
+
+    atsEntity.setStartTime(event.getLaunchTime());
+
+    TimelineEvent launchEvt = new TimelineEvent();
+    launchEvt.setEventType(HistoryEventType.AM_LAUNCHED.name());
+    launchEvt.setTimestamp(event.getLaunchTime());
+    atsEntity.addEvent(launchEvt);
+
+    atsEntity.addOtherInfo(ATSConstants.APP_SUBMIT_TIME, event.getAppSubmitTime());
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertAMStartedEvent(AMStartedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId("tez_"
+        + event.getApplicationAttemptId().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+
+    atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
+
+    TimelineEvent startEvt = new TimelineEvent();
+    startEvt.setEventType(HistoryEventType.AM_STARTED.name());
+    startEvt.setTimestamp(event.getStartTime());
+    atsEntity.addEvent(startEvt);
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertContainerLaunchedEvent(ContainerLaunchedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId("tez_"
+        + event.getContainerId().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_CONTAINER_ID.name());
+
+    atsEntity.addRelatedEntity(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(),
+        "tez_" + event.getApplicationAttemptId().toString());
+    atsEntity.addRelatedEntity(ATSConstants.CONTAINER_ID,
+        event.getContainerId().toString());
+
+    atsEntity.setStartTime(event.getLaunchTime());
+
+    TimelineEvent launchEvt = new TimelineEvent();
+    launchEvt.setEventType(HistoryEventType.CONTAINER_LAUNCHED.name());
+    launchEvt.setTimestamp(event.getLaunchTime());
+    atsEntity.addEvent(launchEvt);
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertContainerStoppedEvent(ContainerStoppedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId("tez_"
+        + event.getContainerId().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_CONTAINER_ID.name());
+
+    // In case a container is stopped in a different attempt
+    atsEntity.addRelatedEntity(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(),
+        "tez_" + event.getApplicationAttemptId().toString());
+
+    TimelineEvent stoppedEvt = new TimelineEvent();
+    stoppedEvt.setEventType(HistoryEventType.CONTAINER_STOPPED.name());
+    stoppedEvt.setTimestamp(event.getStoppedTime());
+    atsEntity.addEvent(stoppedEvt);
+
+    atsEntity.addOtherInfo(ATSConstants.EXIT_STATUS, event.getExitStatus());
+    atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getStoppedTime());
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertDAGFinishedEvent(DAGFinishedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getDagID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name());
+
+    TimelineEvent finishEvt = new TimelineEvent();
+    finishEvt.setEventType(HistoryEventType.DAG_FINISHED.name());
+    finishEvt.setTimestamp(event.getFinishTime());
+    atsEntity.addEvent(finishEvt);
+
+    atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
+    atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName());
+
+    atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
+    atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime());
+    atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+    atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name());
+    atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
+    atsEntity.addOtherInfo(ATSConstants.COUNTERS,
+        DAGUtils.convertCountersToATSMap(event.getTezCounters()));
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertDAGInitializedEvent(DAGInitializedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getDagID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name());
+
+    TimelineEvent finishEvt = new TimelineEvent();
+    finishEvt.setEventType(HistoryEventType.DAG_INITIALIZED.name());
+    finishEvt.setTimestamp(event.getInitTime());
+    atsEntity.addEvent(finishEvt);
+
+    atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
+    atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName());
+
+    atsEntity.addOtherInfo(ATSConstants.INIT_TIME, event.getInitTime());
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertDAGStartedEvent(DAGStartedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getDagID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name());
+
+    TimelineEvent startEvt = new TimelineEvent();
+    startEvt.setEventType(HistoryEventType.DAG_STARTED.name());
+    startEvt.setTimestamp(event.getStartTime());
+    atsEntity.addEvent(startEvt);
+
+    atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
+    atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName());
+
+    atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertDAGSubmittedEvent(DAGSubmittedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getDagID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name());
+
+    atsEntity.addRelatedEntity(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(),
+        "tez_" + event.getApplicationAttemptId().toString());
+    atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ID,
+        event.getApplicationAttemptId().getApplicationId().toString());
+    atsEntity.addRelatedEntity(ATSConstants.APPLICATION_ATTEMPT_ID,
+        event.getApplicationAttemptId().toString());
+    atsEntity.addRelatedEntity(ATSConstants.USER, event.getUser());
+
+    TimelineEvent submitEvt = new TimelineEvent();
+    submitEvt.setEventType(HistoryEventType.DAG_SUBMITTED.name());
+    submitEvt.setTimestamp(event.getSubmitTime());
+    atsEntity.addEvent(submitEvt);
+
+    atsEntity.setStartTime(event.getSubmitTime());
+
+    atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
+    atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDAGName());
+
+    atsEntity.addOtherInfo(ATSConstants.DAG_PLAN,
+        DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan()));
+    atsEntity.addOtherInfo(ATSConstants.APPLICATION_ID,
+        event.getApplicationAttemptId().getApplicationId().toString());
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getTaskAttemptID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_TASK_ATTEMPT_ID.name());
+
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
+        event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(),
+        event.getTaskAttemptID().getTaskID().getVertexID().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_TASK_ID.name(),
+        event.getTaskAttemptID().getTaskID().toString());
+
+    TimelineEvent finishEvt = new TimelineEvent();
+    finishEvt.setEventType(HistoryEventType.TASK_ATTEMPT_FINISHED.name());
+    finishEvt.setTimestamp(event.getFinishTime());
+    atsEntity.addEvent(finishEvt);
+
+    atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime());
+    atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+    atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name());
+    atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
+    atsEntity.addOtherInfo(ATSConstants.COUNTERS,
+        DAGUtils.convertCountersToATSMap(event.getCounters()));
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getTaskAttemptID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_TASK_ATTEMPT_ID.name());
+
+    atsEntity.setStartTime(event.getStartTime());
+
+    atsEntity.addRelatedEntity(ATSConstants.NODE_ID, event.getNodeId().toString());
+    atsEntity.addRelatedEntity(ATSConstants.CONTAINER_ID, event.getContainerId().toString());
+    atsEntity.addRelatedEntity(EntityTypes.TEZ_TASK_ID.name(),
+        event.getTaskAttemptID().getTaskID().toString());
+
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
+        event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(),
+        event.getTaskAttemptID().getTaskID().getVertexID().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_TASK_ID.name(),
+        event.getTaskAttemptID().getTaskID().toString());
+
+    TimelineEvent startEvt = new TimelineEvent();
+    startEvt.setEventType(HistoryEventType.TASK_ATTEMPT_STARTED.name());
+    startEvt.setTimestamp(event.getStartTime());
+    atsEntity.addEvent(startEvt);
+
+    atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
+    atsEntity.addOtherInfo(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl());
+    atsEntity.addOtherInfo(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl());
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertTaskFinishedEvent(TaskFinishedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getTaskID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_TASK_ID.name());
+
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
+        event.getTaskID().getVertexID().getDAGId().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(),
+        event.getTaskID().getVertexID().toString());
+
+    TimelineEvent finishEvt = new TimelineEvent();
+    finishEvt.setEventType(HistoryEventType.TASK_FINISHED.name());
+    finishEvt.setTimestamp(event.getFinishTime());
+    atsEntity.addEvent(finishEvt);
+
+    atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime());
+    atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+    atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name());
+
+    atsEntity.addOtherInfo(ATSConstants.COUNTERS,
+        DAGUtils.convertCountersToATSMap(event.getTezCounters()));
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertTaskStartedEvent(TaskStartedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getTaskID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_TASK_ID.name());
+
+    atsEntity.addRelatedEntity(EntityTypes.TEZ_VERTEX_ID.name(),
+        event.getTaskID().getVertexID().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
+        event.getTaskID().getVertexID().getDAGId().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_VERTEX_ID.name(),
+        event.getTaskID().getVertexID().toString());
+
+    TimelineEvent startEvt = new TimelineEvent();
+    startEvt.setEventType(HistoryEventType.TASK_STARTED.name());
+    startEvt.setTimestamp(event.getStartTime());
+    atsEntity.addEvent(startEvt);
+
+    atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
+    atsEntity.addOtherInfo(ATSConstants.SCHEDULED_TIME, event.getScheduledTime());
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertVertexFinishedEvent(VertexFinishedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getVertexID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name());
+
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
+        event.getVertexID().getDAGId().toString());
+
+    TimelineEvent finishEvt = new TimelineEvent();
+    finishEvt.setEventType(HistoryEventType.VERTEX_FINISHED.name());
+    finishEvt.setTimestamp(event.getFinishTime());
+    atsEntity.addEvent(finishEvt);
+
+    atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime());
+    atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+    atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name());
+
+    atsEntity.addOtherInfo(ATSConstants.COUNTERS,
+        DAGUtils.convertCountersToATSMap(event.getTezCounters()));
+    atsEntity.addOtherInfo(ATSConstants.STATS,
+        DAGUtils.convertVertexStatsToATSMap(event.getVertexStats()));
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertVertexInitializedEvent(VertexInitializedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getVertexID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name());
+
+    atsEntity.addRelatedEntity(EntityTypes.TEZ_DAG_ID.name(),
+        event.getVertexID().getDAGId().toString());
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
+        event.getVertexID().getDAGId().toString());
+
+    TimelineEvent initEvt = new TimelineEvent();
+    initEvt.setEventType(HistoryEventType.VERTEX_INITIALIZED.name());
+    initEvt.setTimestamp(event.getInitedTime());
+    atsEntity.addEvent(initEvt);
+
+    atsEntity.addOtherInfo(ATSConstants.VERTEX_NAME, event.getVertexName());
+    atsEntity.addOtherInfo(ATSConstants.INIT_REQUESTED_TIME, event.getInitRequestedTime());
+    atsEntity.addOtherInfo(ATSConstants.INIT_TIME, event.getInitedTime());
+    atsEntity.addOtherInfo(ATSConstants.NUM_TASKS, event.getNumTasks());
+    atsEntity.addOtherInfo(ATSConstants.PROCESSOR_CLASS_NAME, event.getProcessorName());
+
+    return atsEntity;
+  }
+
+  private static TimelineEntity convertVertexStartedEvent(VertexStartedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getVertexID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name());
+
+    atsEntity.addPrimaryFilter(EntityTypes.TEZ_DAG_ID.name(),
+        event.getVertexID().getDAGId().toString());
+
+    TimelineEvent startEvt = new TimelineEvent();
+    startEvt.setEventType(HistoryEventType.VERTEX_STARTED.name());
+    startEvt.setTimestamp(event.getStartTime());
+    atsEntity.addEvent(startEvt);
+
+    atsEntity.addOtherInfo(ATSConstants.START_REQUESTED_TIME, event.getStartRequestedTime());
+    atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
+
+    return atsEntity;
+  }
+
+}
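
A minimal sketch of how this converter can be exercised on its own, reusing the
DAGStartedEvent and TezDAGID construction that appears in the tests further down;
an illustration only, not part of the committed patch:

    package org.apache.tez.dag.history.logging.ats;

    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
    import org.apache.tez.dag.history.events.DAGStartedEvent;
    import org.apache.tez.dag.records.TezDAGID;

    // Sketch: convert one history event and inspect the resulting ATS entity.
    public class TimelineConversionSketch {
      public static void main(String[] args) {
        TezDAGID dagId = TezDAGID.getInstance(ApplicationId.newInstance(100L, 1), 1);
        DAGStartedEvent started = new DAGStartedEvent(dagId, 1001L, "user1", "dagName1");
        TimelineEntity entity = HistoryEventTimelineConversion.convertToTimelineEntity(started);
        // Expect entity type TEZ_DAG_ID, one DAG_STARTED timeline event, and
        // primary filters for the user and the DAG name.
        System.out.println(entity.getEntityType() + " " + entity.getEntityId());
      }
    }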

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
new file mode 100644
index 0000000..7bd3b91
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.ats;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.records.TezDAGID;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestATSHistoryLoggingService {
+
+  private ATSHistoryLoggingService atsHistoryLoggingService;
+  private AppContext appContext;
+  private Configuration conf;
+  private int atsInvokeCounter;
+  private SystemClock clock = new SystemClock();
+
+  @Before
+  public void setup() throws Exception {
+    appContext = mock(AppContext.class);
+    atsHistoryLoggingService = new ATSHistoryLoggingService();
+    atsHistoryLoggingService.setAppContext(appContext);
+    conf = new Configuration(false);
+    conf.setLong(TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS,
+        1000l);
+    atsInvokeCounter = 0;
+    atsHistoryLoggingService.init(conf);
+    atsHistoryLoggingService.timelineClient = mock(TimelineClient.class);
+    atsHistoryLoggingService.start();
+    when(appContext.getClock()).thenReturn(clock);
+    when(atsHistoryLoggingService.timelineClient.putEntities(any(TimelineEntity.class))).thenAnswer(
+        new Answer<Object>() {
+          @Override
+          public Object answer(InvocationOnMock invocation) throws Throwable {
+            ++atsInvokeCounter;
+            try {
+              Thread.sleep(500);
+            } catch (InterruptedException e) {
+              // do nothing
+            }
+            return null;
+          }
+        }
+    );
+  }
+
+  @After
+  public void teardown() {
+    atsHistoryLoggingService.stop();
+    atsHistoryLoggingService = null;
+  }
+
+  @Test(timeout=20000)
+  public void testATSHistoryLoggingServiceShutdown() {
+    TezDAGID tezDAGID = TezDAGID.getInstance(
+        ApplicationId.newInstance(100l, 1), 1);
+    DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID,
+        new DAGStartedEvent(tezDAGID, 1001l, "user1", "dagName1"));
+
+    for (int i = 0; i < 20; ++i) {
+      atsHistoryLoggingService.handle(historyEvent);
+    }
+
+    try {
+      Thread.sleep(2500l);
+    } catch (InterruptedException e) {
+      // Do nothing
+    }
+    atsHistoryLoggingService.stop();
+
+    Assert.assertTrue(atsInvokeCounter >= 4);
+    Assert.assertTrue(atsInvokeCounter < 10);
+
+  }
+
+}
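
The putEntities call mocked above is the same TimelineClient API the service uses to
ship converted entities to the timeline server. A rough sketch of that hand-off against
a live timeline server, with error handling trimmed; the helper class and method names
are made up for illustration and are not part of the patch:

    package org.apache.tez.dag.history.logging.ats;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
    import org.apache.hadoop.yarn.client.api.TimelineClient;
    import org.apache.tez.dag.history.HistoryEvent;

    // Sketch: push one converted entity straight through the YARN client API.
    public class TimelinePutSketch {
      public static void putEvent(Configuration conf, HistoryEvent event) throws Exception {
        TimelineClient client = TimelineClient.createTimelineClient();
        client.init(conf);
        client.start();
        try {
          TimelineEntity entity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
          client.putEntities(entity);  // same call that is mocked in the test above
        } finally {
          client.stop();
        }
      }
    }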

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
new file mode 100644
index 0000000..c605585
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.ats;
+
+import java.util.Random;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.AMLaunchedEvent;
+import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.history.events.ContainerStoppedEvent;
+import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskFinishedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
+import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
+import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.codehaus.jettison.json.JSONException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestHistoryEventTimelineConversion {
+
+  private ApplicationAttemptId applicationAttemptId;
+  private ApplicationId applicationId;
+  private String user = "user";
+  private Random random = new Random();
+  private TezDAGID tezDAGID;
+  private TezVertexID tezVertexID;
+  private TezTaskID tezTaskID;
+  private TezTaskAttemptID tezTaskAttemptID;
+  private DAGPlan dagPlan;
+  private ContainerId containerId;
+  private NodeId nodeId;
+
+  @Before
+  public void setup() {
+    applicationId = ApplicationId.newInstance(9999l, 1);
+    applicationAttemptId = ApplicationAttemptId.newInstance(applicationId, 1);
+    tezDAGID = TezDAGID.getInstance(applicationId, random.nextInt());
+    tezVertexID = TezVertexID.getInstance(tezDAGID, random.nextInt());
+    tezTaskID = TezTaskID.getInstance(tezVertexID, random.nextInt());
+    tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID, random.nextInt());
+    dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock").build();
+    containerId = ContainerId.newInstance(applicationAttemptId, 111);
+    nodeId = NodeId.newInstance("node", 13435);
+  }
+
+  @Test
+  public void testHandlerExists() throws JSONException {
+    for (HistoryEventType eventType : HistoryEventType.values()) {
+      HistoryEvent event = null;
+      switch (eventType) {
+        case AM_LAUNCHED:
+          event = new AMLaunchedEvent(applicationAttemptId, random.nextInt(), random.nextInt(),
+              user);
+          break;
+        case AM_STARTED:
+          event = new AMStartedEvent(applicationAttemptId, random.nextInt(), user);
+          break;
+        case DAG_SUBMITTED:
+          event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId,
+              null, user);
+          break;
+        case DAG_INITIALIZED:
+          event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName());
+          break;
+        case DAG_STARTED:
+          event = new DAGStartedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName());
+          break;
+        case DAG_FINISHED:
+          event = new DAGFinishedEvent(tezDAGID, random.nextInt(), random.nextInt(), DAGState.ERROR,
+              null, null, user, dagPlan.getName());
+          break;
+        case VERTEX_INITIALIZED:
+          event = new VertexInitializedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
+              random.nextInt(), "proc", null);
+          break;
+        case VERTEX_STARTED:
+          event = new VertexStartedEvent(tezVertexID, random.nextInt(), random.nextInt());
+          break;
+        case VERTEX_PARALLELISM_UPDATED:
+          event = new VertexParallelismUpdatedEvent();
+          break;
+        case VERTEX_FINISHED:
+          event = new VertexFinishedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
+              random.nextInt(), random.nextInt(), random.nextInt(), VertexState.ERROR,
+              null, null, null);
+          break;
+        case TASK_STARTED:
+          event = new TaskStartedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt());
+          break;
+        case TASK_FINISHED:
+          event = new TaskFinishedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt(),
+              tezTaskAttemptID, TaskState.FAILED, null);
+          break;
+        case TASK_ATTEMPT_STARTED:
+          event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", random.nextInt(), containerId,
+              nodeId, null, null);
+          break;
+        case TASK_ATTEMPT_FINISHED:
+          event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
+              random.nextInt(), TaskAttemptState.FAILED, null, null);
+          break;
+        case CONTAINER_LAUNCHED:
+          event = new ContainerLaunchedEvent(containerId, random.nextInt(),
+              applicationAttemptId);
+          break;
+        case CONTAINER_STOPPED:
+          event = new ContainerStoppedEvent(containerId, random.nextInt(), -1, applicationAttemptId);
+          break;
+        case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+          event = new VertexDataMovementEventsGeneratedEvent();
+          break;
+        case DAG_COMMIT_STARTED:
+          event = new DAGCommitStartedEvent();
+          break;
+        case VERTEX_COMMIT_STARTED:
+          event = new VertexCommitStartedEvent();
+          break;
+        case VERTEX_GROUP_COMMIT_STARTED:
+          event = new VertexGroupCommitStartedEvent();
+          break;
+        case VERTEX_GROUP_COMMIT_FINISHED:
+          event = new VertexGroupCommitFinishedEvent();
+          break;
+        default:
+          Assert.fail("Unhandled event type " + eventType);
+      }
+      if (event == null || !event.isHistoryEvent()) {
+        continue;
+      }
+      HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index 171458f..ffea08c 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.tez.client.AMConfiguration;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.client.TezSession;
@@ -60,6 +61,7 @@ import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService;
 import org.apache.tez.mapreduce.examples.ExampleDriver;
 import org.apache.tez.mapreduce.examples.IntersectDataGen;
 import org.apache.tez.mapreduce.examples.IntersectExample;
@@ -67,6 +69,7 @@ import org.apache.tez.mapreduce.examples.IntersectValidate;
 import org.apache.tez.runtime.library.processor.SleepProcessor;
 import org.apache.tez.runtime.library.processor.SleepProcessor.SleepProcessorConfig;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -362,5 +365,59 @@ public class TestTezJobs {
 
   }
 
+  // Submits a single-vertex sleep DAG and verifies that history events are
+  // written out by the SimpleHistoryLoggingService.
+  @Test(timeout = 60000)
+  public void testHistoryLogging() throws IOException,
+      InterruptedException, TezException, ClassNotFoundException, YarnException {
+    SleepProcessorConfig spConf = new SleepProcessorConfig(1);
+
+    DAG dag = new DAG("TezSleepProcessorHistoryLogging");
+    Vertex vertex = new Vertex("SleepVertex", new ProcessorDescriptor(
+        SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 2,
+        Resource.newInstance(1024, 1));
+    dag.addVertex(vertex);
+
+    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+    Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
+        .nextInt(100000))));
+    remoteFs.mkdirs(remoteStagingDir);
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
+
+    FileSystem localFs = FileSystem.getLocal(tezConf);
+    Path historyLogDir = new Path(TEST_ROOT_DIR, "testHistoryLogging");
+    localFs.mkdirs(historyLogDir);
+
+    tezConf.set(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR,
+        localFs.makeQualified(historyLogDir).toString());
+
+    TezClient tezClient = new TezClient(tezConf);
+    AMConfiguration amConf = new AMConfiguration(new HashMap<String, String>(),
+        new HashMap<String, LocalResource>(), tezConf, null);
+
+    DAGClient dagClient = tezClient.submitDAGApplication(dag, amConf);
+
+    DAGStatus dagStatus = dagClient.getDAGStatus(null);
+    while (!dagStatus.isCompleted()) {
+      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
+          + dagStatus.getState());
+      Thread.sleep(500l);
+      dagStatus = dagClient.getDAGStatus(null);
+    }
+    assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
+
+    FileStatus historyLogFileStatus = null;
+    for (FileStatus fileStatus : localFs.listStatus(historyLogDir)) {
+      if (fileStatus.isDirectory()) {
+        continue;
+      }
+      Path p = fileStatus.getPath();
+      if (p.getName().startsWith(SimpleHistoryLoggingService.LOG_FILE_NAME_PREFIX)) {
+        historyLogFileStatus = fileStatus;
+        break;
+      }
+    }
+    Assert.assertNotNull(historyLogFileStatus);
+    Assert.assertTrue(historyLogFileStatus.getLen() > 0);
+  }
 
 }


[2/3] TEZ-1066. Generate events to integrate with YARN timeline server. (hitesh)

Posted by hi...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
index e6023f1..da00b06 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
@@ -18,19 +18,14 @@
 
 package org.apache.tez.dag.history.events;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexStartedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 
 public class VertexStartedEvent implements HistoryEvent {
 
@@ -54,39 +49,6 @@ public class VertexStartedEvent implements HistoryEvent {
   }
 
   @Override
-  public JSONObject convertToATSJSON() throws JSONException {
-    JSONObject jsonObject = new JSONObject();
-    jsonObject.put(ATSConstants.ENTITY, vertexID.toString());
-    jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
-
-    // Related entities
-    JSONArray relatedEntities = new JSONArray();
-    JSONObject vertexEntity = new JSONObject();
-    vertexEntity.put(ATSConstants.ENTITY, vertexID.getDAGId().toString());
-    vertexEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
-    relatedEntities.put(vertexEntity);
-    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
-
-    // Events
-    JSONArray events = new JSONArray();
-    JSONObject startEvent = new JSONObject();
-    startEvent.put(ATSConstants.TIMESTAMP, startTime);
-    startEvent.put(ATSConstants.EVENT_TYPE,
-        HistoryEventType.VERTEX_STARTED.name());
-    events.put(startEvent);
-    jsonObject.put(ATSConstants.EVENTS, events);
-
-    // Other info
-    // TODO fix requested times to be events
-    JSONObject otherInfo = new JSONObject();
-    otherInfo.put(ATSConstants.START_REQUESTED_TIME, startRequestedTime);
-    otherInfo.put(ATSConstants.START_TIME, startTime);
-    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
-    return jsonObject;
-  }
-
-  @Override
   public boolean isRecoveryEvent() {
     return true;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java
new file mode 100644
index 0000000..00cac28
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging;
+
+public enum EntityTypes {
+  TEZ_APPLICATION_ATTEMPT,
+  TEZ_CONTAINER_ID,
+  TEZ_DAG_ID,
+  TEZ_VERTEX_ID,
+  TEZ_TASK_ID,
+  TEZ_TASK_ATTEMPT_ID,
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/logging/HistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/HistoryLoggingService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/HistoryLoggingService.java
new file mode 100644
index 0000000..44efad7
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/HistoryLoggingService.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging;
+
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+
+public abstract class HistoryLoggingService extends AbstractService {
+
+  protected AppContext appContext;
+
+  public void setAppContext(AppContext appContext) {
+    this.appContext = appContext;
+  }
+
+  public HistoryLoggingService(String name) {
+    super(name);
+  }
+
+  /**
+   * Handle logging of history event
+   * @param event History event to be logged
+   */
+  public abstract void handle(DAGHistoryEvent event);
+
+}
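
For illustration, the smallest useful subclass of this base service: one that only logs
each incoming event. A sketch only, not part of the patch, assuming DAGHistoryEvent
exposes the wrapped event via getHistoryEvent():

    package org.apache.tez.dag.history.logging;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.tez.dag.history.DAGHistoryEvent;

    // Sketch: a trivial HistoryLoggingService that only logs incoming events,
    // relying on AbstractService's default init/start/stop lifecycle.
    public class LoggingOnlyHistoryService extends HistoryLoggingService {

      private static final Log LOG = LogFactory.getLog(LoggingOnlyHistoryService.class);

      public LoggingOnlyHistoryService() {
        super(LoggingOnlyHistoryService.class.getName());
      }

      @Override
      public void handle(DAGHistoryEvent event) {
        // A real implementation would queue and persist the event, as the
        // ATS and simple-file services in this patch do.
        LOG.info("History event received: " + event.getHistoryEvent().getEventType());
      }
    }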

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
new file mode 100644
index 0000000..e5bb1e5
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -0,0 +1,633 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.impl;
+
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.AMLaunchedEvent;
+import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.history.events.ContainerStoppedEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskFinishedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.history.logging.EntityTypes;
+import org.apache.tez.dag.history.utils.ATSConstants;
+import org.apache.tez.dag.history.utils.DAGUtils;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+public class HistoryEventJsonConversion {
+
+  public static JSONObject convertToJson(HistoryEvent historyEvent) throws JSONException {
+    if (!historyEvent.isHistoryEvent()) {
+      throw new UnsupportedOperationException("Invalid Event, does not support history"
+          + ", eventType=" + historyEvent.getEventType());
+    }
+    JSONObject jsonObject = null;
+    switch (historyEvent.getEventType()) {
+      case AM_LAUNCHED:
+        jsonObject = convertAMLaunchedEvent((AMLaunchedEvent) historyEvent);
+        break;
+      case AM_STARTED:
+        jsonObject = convertAMStartedEvent((AMStartedEvent) historyEvent);
+        break;
+      case CONTAINER_LAUNCHED:
+        jsonObject = convertContainerLaunchedEvent((ContainerLaunchedEvent) historyEvent);
+        break;
+      case CONTAINER_STOPPED:
+        jsonObject = convertContainerStoppedEvent((ContainerStoppedEvent) historyEvent);
+        break;
+      case DAG_SUBMITTED:
+        jsonObject = convertDAGSubmittedEvent((DAGSubmittedEvent) historyEvent);
+        break;
+      case DAG_INITIALIZED:
+        jsonObject = convertDAGInitializedEvent((DAGInitializedEvent) historyEvent);
+        break;
+      case DAG_STARTED:
+        jsonObject = convertDAGStartedEvent((DAGStartedEvent) historyEvent);
+        break;
+      case DAG_FINISHED:
+        jsonObject = convertDAGFinishedEvent((DAGFinishedEvent) historyEvent);
+        break;
+      case VERTEX_INITIALIZED:
+        jsonObject = convertVertexInitializedEvent((VertexInitializedEvent) historyEvent);
+        break;
+      case VERTEX_STARTED:
+        jsonObject = convertVertexStartedEvent((VertexStartedEvent) historyEvent);
+        break;
+      case VERTEX_FINISHED:
+        jsonObject = convertVertexFinishedEvent((VertexFinishedEvent) historyEvent);
+        break;
+      case TASK_STARTED:
+        jsonObject = convertTaskStartedEvent((TaskStartedEvent) historyEvent);
+        break;
+      case TASK_FINISHED:
+        jsonObject = convertTaskFinishedEvent((TaskFinishedEvent) historyEvent);
+        break;
+      case TASK_ATTEMPT_STARTED:
+        jsonObject = convertTaskAttemptStartedEvent((TaskAttemptStartedEvent) historyEvent);
+        break;
+      case TASK_ATTEMPT_FINISHED:
+        jsonObject = convertTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) historyEvent);
+        break;
+      case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+      case VERTEX_COMMIT_STARTED:
+      case VERTEX_GROUP_COMMIT_STARTED:
+      case VERTEX_GROUP_COMMIT_FINISHED:
+      case VERTEX_PARALLELISM_UPDATED:
+      case DAG_COMMIT_STARTED:
+        throw new UnsupportedOperationException("Invalid Event, does not support history"
+            + ", eventType=" + historyEvent.getEventType());
+      default:
+        throw new UnsupportedOperationException("Unhandled Event"
+            + ", eventType=" + historyEvent.getEventType());
+    }
+    return jsonObject;
+  }
+
+  private static JSONObject convertAMLaunchedEvent(AMLaunchedEvent event) throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY,
+        "tez_" + event.getApplicationAttemptId().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE,
+            EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+
+    // Related Entities
+    JSONArray relatedEntities = new JSONArray();
+    JSONObject appEntity = new JSONObject();
+    appEntity.put(ATSConstants.ENTITY,
+        event.getApplicationAttemptId().getApplicationId().toString());
+    appEntity.put(ATSConstants.ENTITY_TYPE,
+        ATSConstants.APPLICATION_ID);
+    JSONObject appAttemptEntity = new JSONObject();
+    appAttemptEntity.put(ATSConstants.ENTITY,
+        event.getApplicationAttemptId().toString());
+    appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
+            ATSConstants.APPLICATION_ATTEMPT_ID);
+    relatedEntities.put(appEntity);
+    relatedEntities.put(appAttemptEntity);
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // TODO decide whether this goes into different events,
+    // event info or other info.
+    JSONArray events = new JSONArray();
+    JSONObject initEvent = new JSONObject();
+    initEvent.put(ATSConstants.TIMESTAMP, event.getLaunchTime());
+    initEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.AM_LAUNCHED.name());
+    events.put(initEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    // Other info to tag with Tez AM
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.APP_SUBMIT_TIME, event.getAppSubmitTime());
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+    
+    return jsonObject;
+  }
+
+  private static JSONObject convertAMStartedEvent(AMStartedEvent event) throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY,
+        "tez_" + event.getApplicationAttemptId().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE,
+        EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+
+    // Related Entities
+    JSONArray relatedEntities = new JSONArray();
+    JSONObject appEntity = new JSONObject();
+    appEntity.put(ATSConstants.ENTITY,
+        event.getApplicationAttemptId().getApplicationId().toString());
+    appEntity.put(ATSConstants.ENTITY_TYPE,
+        ATSConstants.APPLICATION_ID);
+    JSONObject appAttemptEntity = new JSONObject();
+    appAttemptEntity.put(ATSConstants.ENTITY,
+        event.getApplicationAttemptId().toString());
+    appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
+        ATSConstants.APPLICATION_ATTEMPT_ID);
+    relatedEntities.put(appEntity);
+    relatedEntities.put(appAttemptEntity);
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // TODO decide whether this goes into different events,
+    // event info or other info.
+    JSONArray events = new JSONArray();
+    JSONObject startEvent = new JSONObject();
+    startEvent.put(ATSConstants.TIMESTAMP, event.getStartTime());
+    startEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.AM_STARTED.name());
+    events.put(startEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertContainerLaunchedEvent(ContainerLaunchedEvent event) throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY,
+        "tez_" + event.getContainerId().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE,
+        EntityTypes.TEZ_CONTAINER_ID.name());
+
+    JSONArray relatedEntities = new JSONArray();
+    JSONObject appAttemptEntity = new JSONObject();
+    appAttemptEntity.put(ATSConstants.ENTITY,
+        event.getApplicationAttemptId().toString());
+    appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
+        EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+
+    JSONObject containerEntity = new JSONObject();
+    containerEntity.put(ATSConstants.ENTITY, event.getContainerId().toString());
+    containerEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.CONTAINER_ID);
+
+    relatedEntities.put(appAttemptEntity);
+    relatedEntities.put(containerEntity);
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // TODO decide whether this goes into different events,
+    // event info or other info.
+    JSONArray events = new JSONArray();
+    JSONObject launchEvent = new JSONObject();
+    launchEvent.put(ATSConstants.TIMESTAMP, event.getLaunchTime());
+    launchEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.CONTAINER_LAUNCHED.name());
+    events.put(launchEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    // TODO add other container info here? or assume AHS will have this?
+    // TODO container logs?
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertContainerStoppedEvent(ContainerStoppedEvent event) throws JSONException {
+    // structure is identical to ContainerLaunchedEvent
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY,
+        "tez_" + event.getContainerId().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE,
+        EntityTypes.TEZ_CONTAINER_ID.name());
+
+    JSONArray relatedEntities = new JSONArray();
+    JSONObject appAttemptEntity = new JSONObject();
+    appAttemptEntity.put(ATSConstants.ENTITY,
+        event.getApplicationAttemptId().toString());
+    appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
+        EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+
+    JSONObject containerEntity = new JSONObject();
+    containerEntity.put(ATSConstants.ENTITY, event.getContainerId().toString());
+    containerEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.CONTAINER_ID);
+
+    relatedEntities.put(appAttemptEntity);
+    relatedEntities.put(containerEntity);
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // TODO decide whether this goes into different events,
+    // event info or other info.
+    JSONArray events = new JSONArray();
+    JSONObject stopEvent = new JSONObject();
+    stopEvent.put(ATSConstants.TIMESTAMP, event.getStoppedTime());
+    stopEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.CONTAINER_STOPPED.name());
+    events.put(stopEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    // TODO add other container info here? or assume AHS will have this?
+    // TODO container logs?
+
+    // Other info
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.EXIT_STATUS, event.getExitStatus());
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertDAGFinishedEvent(DAGFinishedEvent event) throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY,
+        event.getDagID().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE,
+        EntityTypes.TEZ_DAG_ID.name());
+
+    // Related Entities not needed as should have been done in
+    // dag submission event
+
+    // TODO decide whether this goes into different events,
+    // event info or other info.
+    JSONArray events = new JSONArray();
+    JSONObject finishEvent = new JSONObject();
+    finishEvent.put(ATSConstants.TIMESTAMP, event.getFinishTime());
+    finishEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.DAG_FINISHED.name());
+    events.put(finishEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.START_TIME, event.getStartTime());
+    otherInfo.put(ATSConstants.FINISH_TIME, event.getFinishTime());
+    otherInfo.put(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+    otherInfo.put(ATSConstants.STATUS, event.getState().name());
+    otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
+    otherInfo.put(ATSConstants.COUNTERS,
+        DAGUtils.convertCountersToJSON(event.getTezCounters()));
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertDAGInitializedEvent(DAGInitializedEvent event) throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY,
+        event.getDagID().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE,
+        EntityTypes.TEZ_DAG_ID.name());
+
+    // Related Entities not needed as should have been done in
+    // dag submission event
+
+    JSONArray events = new JSONArray();
+    JSONObject initEvent = new JSONObject();
+    initEvent.put(ATSConstants.TIMESTAMP, event.getInitTime());
+    initEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.DAG_INITIALIZED.name());
+    events.put(initEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertDAGStartedEvent(DAGStartedEvent event) throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY,
+        event.getDagID().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE,
+        EntityTypes.TEZ_DAG_ID.name());
+
+    // Related Entities not needed as should have been done in
+    // dag submission event
+
+    // TODO decide whether this goes into different events,
+    // event info or other info.
+    JSONArray events = new JSONArray();
+    JSONObject startEvent = new JSONObject();
+    startEvent.put(ATSConstants.TIMESTAMP, event.getStartTime());
+    startEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.DAG_STARTED.name());
+    events.put(startEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertDAGSubmittedEvent(DAGSubmittedEvent event) throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY,
+        event.getDagID().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE,
+        EntityTypes.TEZ_DAG_ID.name());
+
+    // Related Entities
+    JSONArray relatedEntities = new JSONArray();
+    JSONObject tezAppEntity = new JSONObject();
+    tezAppEntity.put(ATSConstants.ENTITY,
+        "tez_" + event.getApplicationAttemptId().toString());
+    tezAppEntity.put(ATSConstants.ENTITY_TYPE,
+        EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+    JSONObject appEntity = new JSONObject();
+    appEntity.put(ATSConstants.ENTITY,
+        event.getApplicationAttemptId().getApplicationId().toString());
+    appEntity.put(ATSConstants.ENTITY_TYPE,
+        ATSConstants.APPLICATION_ID);
+    JSONObject appAttemptEntity = new JSONObject();
+    appAttemptEntity.put(ATSConstants.ENTITY,
+        event.getApplicationAttemptId().toString());
+    appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
+        ATSConstants.APPLICATION_ATTEMPT_ID);
+    JSONObject userEntity = new JSONObject();
+    userEntity.put(ATSConstants.ENTITY,
+        event.getUser());
+    userEntity.put(ATSConstants.ENTITY_TYPE,
+        ATSConstants.USER);
+
+    relatedEntities.put(tezAppEntity);
+    relatedEntities.put(appEntity);
+    relatedEntities.put(appAttemptEntity);
+    relatedEntities.put(userEntity);
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // filters
+    JSONObject primaryFilters = new JSONObject();
+    primaryFilters.put(ATSConstants.DAG_NAME,
+        event.getDAGName());
+    jsonObject.put(ATSConstants.PRIMARY_FILTERS, primaryFilters);
+
+    // TODO decide whether this goes into different events,
+    // event info or other info.
+    JSONArray events = new JSONArray();
+    JSONObject submitEvent = new JSONObject();
+    submitEvent.put(ATSConstants.TIMESTAMP, event.getSubmitTime());
+    submitEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.DAG_SUBMITTED.name());
+    events.put(submitEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    // Other info such as dag plan
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.DAG_PLAN,
+        DAGUtils.generateSimpleJSONPlan(event.getDAGPlan()));
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY, event.getTaskAttemptID().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE,
+        EntityTypes.TEZ_TASK_ATTEMPT_ID.name());
+
+    // Events
+    JSONArray events = new JSONArray();
+    JSONObject finishEvent = new JSONObject();
+    finishEvent.put(ATSConstants.TIMESTAMP, event.getFinishTime());
+    finishEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.TASK_ATTEMPT_FINISHED.name());
+    events.put(finishEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.START_TIME, event.getStartTime());
+    otherInfo.put(ATSConstants.FINISH_TIME, event.getFinishTime());
+    otherInfo.put(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+    otherInfo.put(ATSConstants.STATUS, event.getState().name());
+    otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
+    otherInfo.put(ATSConstants.COUNTERS,
+        DAGUtils.convertCountersToJSON(event.getCounters()));
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertTaskAttemptStartedEvent(TaskAttemptStartedEvent event) throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY, event.getTaskAttemptID().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE,
+        EntityTypes.TEZ_TASK_ATTEMPT_ID.name());
+
+    // Related entities
+    JSONArray relatedEntities = new JSONArray();
+    JSONObject nodeEntity = new JSONObject();
+    nodeEntity.put(ATSConstants.ENTITY, event.getNodeId().toString());
+    nodeEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.NODE_ID);
+
+    JSONObject containerEntity = new JSONObject();
+    containerEntity.put(ATSConstants.ENTITY, event.getContainerId().toString());
+    containerEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.CONTAINER_ID);
+
+    JSONObject taskEntity = new JSONObject();
+    taskEntity.put(ATSConstants.ENTITY, event.getTaskAttemptID().getTaskID().toString());
+    taskEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ID.name());
+
+    relatedEntities.put(nodeEntity);
+    relatedEntities.put(containerEntity);
+    relatedEntities.put(taskEntity);
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // Events
+    JSONArray events = new JSONArray();
+    JSONObject startEvent = new JSONObject();
+    startEvent.put(ATSConstants.TIMESTAMP, event.getStartTime());
+    startEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.TASK_ATTEMPT_STARTED.name());
+    events.put(startEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    // Other info
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl());
+    otherInfo.put(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl());
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertTaskFinishedEvent(TaskFinishedEvent event) throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY, event.getTaskID().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ID.name());
+
+    // Events
+    JSONArray events = new JSONArray();
+    JSONObject finishEvent = new JSONObject();
+    finishEvent.put(ATSConstants.TIMESTAMP, event.getFinishTime());
+    finishEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.TASK_FINISHED.name());
+    events.put(finishEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.START_TIME, event.getStartTime());
+    otherInfo.put(ATSConstants.FINISH_TIME, event.getFinishTime());
+    otherInfo.put(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+    otherInfo.put(ATSConstants.STATUS, event.getState().name());
+    otherInfo.put(ATSConstants.COUNTERS,
+        DAGUtils.convertCountersToJSON(event.getTezCounters()));
+
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertTaskStartedEvent(TaskStartedEvent event) throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY, event.getTaskID().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ID.name());
+
+    // Related entities
+    JSONArray relatedEntities = new JSONArray();
+    JSONObject vertexEntity = new JSONObject();
+    vertexEntity.put(ATSConstants.ENTITY, event.getTaskID().getVertexID().toString());
+    vertexEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
+    relatedEntities.put(vertexEntity);
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // Events
+    JSONArray events = new JSONArray();
+    JSONObject startEvent = new JSONObject();
+    startEvent.put(ATSConstants.TIMESTAMP, event.getStartTime());
+    startEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.TASK_STARTED.name());
+    events.put(startEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    // Other info
+    // TODO fix schedule/launch time to be events
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.START_TIME, event.getStartTime());
+    otherInfo.put(ATSConstants.SCHEDULED_TIME, event.getScheduledTime());
+
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertVertexFinishedEvent(VertexFinishedEvent event) throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY, event.getVertexID().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
+
+    // Events
+    JSONArray events = new JSONArray();
+    JSONObject finishEvent = new JSONObject();
+    finishEvent.put(ATSConstants.TIMESTAMP, event.getFinishTime());
+    finishEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.VERTEX_FINISHED.name());
+    events.put(finishEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.FINISH_TIME, event.getFinishTime());
+    otherInfo.put(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+    otherInfo.put(ATSConstants.STATUS, event.getState().name());
+    otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
+    otherInfo.put(ATSConstants.COUNTERS,
+        DAGUtils.convertCountersToJSON(event.getTezCounters()));
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertVertexInitializedEvent(VertexInitializedEvent event) throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY, event.getVertexID().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
+
+    // Related entities
+    JSONArray relatedEntities = new JSONArray();
+    JSONObject dagEntity = new JSONObject();
+    dagEntity.put(ATSConstants.ENTITY, event.getVertexID().getDAGId().toString());
+    dagEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
+    relatedEntities.put(dagEntity);
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // Events
+    JSONArray events = new JSONArray();
+    JSONObject initEvent = new JSONObject();
+    initEvent.put(ATSConstants.TIMESTAMP, event.getInitedTime());
+    initEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.VERTEX_INITIALIZED.name());
+    events.put(initEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    // Other info
+    // TODO fix requested times to be events
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.VERTEX_NAME, event.getVertexName());
+    otherInfo.put(ATSConstants.INIT_REQUESTED_TIME, event.getInitRequestedTime());
+    otherInfo.put(ATSConstants.INIT_TIME, event.getInitedTime());
+    otherInfo.put(ATSConstants.NUM_TASKS, event.getNumTasks());
+    otherInfo.put(ATSConstants.PROCESSOR_CLASS_NAME, event.getProcessorName());
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertVertexStartedEvent(VertexStartedEvent event) throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY, event.getVertexID().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
+
+    // Related entities
+    JSONArray relatedEntities = new JSONArray();
+    JSONObject dagEntity = new JSONObject();
+    dagEntity.put(ATSConstants.ENTITY, event.getVertexID().getDAGId().toString());
+    dagEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
+    relatedEntities.put(dagEntity);
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // Events
+    JSONArray events = new JSONArray();
+    JSONObject startEvent = new JSONObject();
+    startEvent.put(ATSConstants.TIMESTAMP, event.getStartTime());
+    startEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.VERTEX_STARTED.name());
+    events.put(startEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    // Other info
+    // TODO fix requested times to be events
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.START_REQUESTED_TIME, event.getStartRequestedTime());
+    otherInfo.put(ATSConstants.START_TIME, event.getStartTime());
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+}
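
For quick reference, a minimal usage sketch of the JSON conversion above (a fragment, not part of the patch; it assumes an ApplicationAttemptId named attemptId plus submit/launch timestamps, matching the constructor shape exercised in the tests further down):

    AMLaunchedEvent event = new AMLaunchedEvent(attemptId, appSubmitTime, launchTime, user);
    // convertToJson (the public entry point used by SimpleHistoryLoggingService below)
    // returns a JSONObject built from the ATSConstants entity/events/otherInfo keys seen above.
    JSONObject json = HistoryEventJsonConversion.convertToJson(event);
    System.out.println(json.toString());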

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
new file mode 100644
index 0000000..257b6c8
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.impl;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.logging.HistoryLoggingService;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+public class SimpleHistoryLoggingService extends HistoryLoggingService {
+
+  private static final Log LOG = LogFactory.getLog(SimpleHistoryLoggingService.class);
+  private Path logFileLocation;
+  private FileSystem logFileFS;
+  private FSDataOutputStream outputStream;
+  private LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
+      new LinkedBlockingQueue<DAGHistoryEvent>();
+  public static final String RECORD_SEPARATOR = System.getProperty("line.separator") + "\u0001";
+  public static final String LOG_FILE_NAME_PREFIX = "history.txt";
+
+  private Thread eventHandlingThread;
+  private AtomicBoolean stopped = new AtomicBoolean(false);
+
+  private int consecutiveErrors = 0;
+  private int maxErrors;
+  private boolean loggingDisabled = false;
+
+  public SimpleHistoryLoggingService() {
+    super(SimpleHistoryLoggingService.class.getName());
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    String logDirPath = conf.get(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR);
+    final String logFileName = LOG_FILE_NAME_PREFIX + "." + appContext.getApplicationAttemptId();
+    if (logDirPath == null || logDirPath.isEmpty()) {
+      String logDir = TezUtils.getContainerLogDir();
+      Path p;
+      logFileFS = FileSystem.getLocal(conf);
+      if (logDir != null) {
+        p = new Path(logDir, logFileName);
+      } else {
+        p = new Path(logFileName);
+      }
+      logFileLocation = p;
+    } else {
+      Path p = new Path(logDirPath);
+      logFileFS = p.getFileSystem(conf);
+      if (!logFileFS.exists(p)) {
+        logFileFS.mkdirs(p);
+      }
+      logFileLocation = new Path(logFileFS.resolvePath(p), logFileName);
+    }
+    maxErrors = conf.getInt(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_MAX_ERRORS,
+        TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_MAX_ERRORS_DEFAULT);
+    LOG.info("Initializing SimpleHistoryLoggingService, logFileLocation=" + logFileLocation
+        + ", maxErrors=" + maxErrors);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    LOG.info("Starting SimpleHistoryLoggingService");
+    outputStream = logFileFS.create(logFileLocation, true);
+    eventHandlingThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        DAGHistoryEvent event;
+        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+          try {
+            event = eventQueue.take();
+          } catch (InterruptedException e) {
+            LOG.info("EventQueue take interrupted. Returning");
+            return;
+          }
+          handleEvent(event);
+        }
+      }
+    }, "HistoryEventHandlingThread");
+    eventHandlingThread.start();
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    LOG.info("Stopping SimpleHistoryLoggingService"
+        + ", eventQueueBacklog=" + eventQueue.size());
+    stopped.set(true);
+    if (eventHandlingThread != null) {
+      eventHandlingThread.interrupt();
+    }
+    while (!eventQueue.isEmpty()) {
+      DAGHistoryEvent event = eventQueue.poll();
+      if (event == null) {
+        break;
+      }
+      handleEvent(event);
+    }
+    try {
+      if (outputStream != null) {
+        outputStream.hsync();
+        outputStream.close();
+      }
+    } catch (IOException ioe) {
+      LOG.warn("Failed to close output stream", ioe);
+    }
+    super.serviceStop();
+  }
+
+  @Override
+  public void handle(DAGHistoryEvent event) {
+    eventQueue.add(event);
+  }
+
+  private synchronized void handleEvent(DAGHistoryEvent event) {
+    if (loggingDisabled) {
+      return;
+    }
+    LOG.info("Writing event " + event.getHistoryEvent().getEventType() + " to history file");
+    try {
+      try {
+        JSONObject eventJson = HistoryEventJsonConversion.convertToJson(event.getHistoryEvent());
+        outputStream.writeBytes(eventJson.toString());
+        outputStream.writeBytes(RECORD_SEPARATOR);
+      } catch (JSONException e) {
+        LOG.warn("Failed to convert event to json", e);
+      }
+      consecutiveErrors = 0;
+    } catch (IOException ioe) {
+      ++consecutiveErrors;
+      if (consecutiveErrors < maxErrors) {
+        LOG.error("Failed to write to output stream, consecutiveErrorCount=" + consecutiveErrors, ioe);
+      } else {
+        loggingDisabled = true;
+        LOG.error("Disabling SimpleHistoryLoggingService due to multiple errors, " +
+            "consecutive max errors reached, maxErrors=" + maxErrors);
+      }
+    }
+
+  }
+}
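
A small configuration sketch for the service above (illustrative only; the directory path and error threshold are made-up values, but the TezConfiguration constants are the ones read in serviceInit()):

    Configuration conf = new Configuration();
    // Directory for the JSON history file; when unset the service falls back to the container log dir.
    conf.set(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR, "/tmp/tez-history");
    // After this many consecutive write failures the service disables itself.
    conf.setInt(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_MAX_ERRORS, 10);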

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
index f53cc7d..0188a8e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
@@ -39,6 +39,7 @@ public class ATSConstants {
   public static final String APPLICATION_ATTEMPT_ID = "applicationAttemptId";
   public static final String CONTAINER_ID = "containerId";
   public static final String NODE_ID = "nodeId";
+  public static final String USER = "user";
 
   /* Keys used in other info */
   public static final String APP_SUBMIT_TIME = "appSubmitTime";

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
index 782fff1..3997c2f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
@@ -18,10 +18,18 @@
 
 package org.apache.tez.dag.history.utils;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
 import org.apache.tez.common.counters.CounterGroup;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.api.records.DAGProtos.PlanGroupInputEdgeInfo;
+import org.apache.tez.dag.app.dag.impl.VertexStats;
+import org.apache.tez.dag.records.TezTaskID;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 
@@ -149,4 +157,247 @@ public class DAGUtils {
     return jsonObject;
   }
 
+  public static Map<String,Object> convertCountersToATSMap(TezCounters counters) {
+    Map<String,Object> object = new LinkedHashMap<String, Object>();
+    if (counters == null) {
+      return object;
+    }
+    ArrayList<Object> counterGroupsList = new ArrayList<Object>();
+    for (CounterGroup group : counters) {
+      Map<String,Object> counterGroupMap = new LinkedHashMap<String, Object>();
+      counterGroupMap.put(ATSConstants.COUNTER_GROUP_NAME, group.getName());
+      counterGroupMap.put(ATSConstants.COUNTER_GROUP_DISPLAY_NAME,
+          group.getDisplayName());
+      ArrayList<Object> counterList = new ArrayList<Object>();
+      for (TezCounter counter : group) {
+        Map<String,Object> counterMap = new LinkedHashMap<String, Object>();
+        counterMap.put(ATSConstants.COUNTER_NAME, counter.getName());
+        counterMap.put(ATSConstants.COUNTER_DISPLAY_NAME,
+            counter.getDisplayName());
+        counterMap.put(ATSConstants.COUNTER_VALUE, counter.getValue());
+        counterList.add(counterMap);
+      }
+      putInto(counterGroupMap, ATSConstants.COUNTERS, counterList);
+      counterGroupsList.add(counterGroupMap);
+    }
+    putInto(object, ATSConstants.COUNTER_GROUPS, counterGroupsList);
+    return object;
+  }
+
+  public static Map<String,Object> convertDAGPlanToATSMap(
+      DAGProtos.DAGPlan dagPlan) {
+
+    final String VERSION_KEY = "version";
+    final int version = 1;
+    final String DAG_NAME_KEY = "dagName";
+    final String VERTICES_KEY = "vertices";
+    final String EDGES_KEY = "edges";
+    final String VERTEX_GROUPS_KEY = "vertexGroups";
+
+    final String VERTEX_NAME_KEY = "vertexName";
+    final String PROCESSOR_CLASS_KEY = "processorClass";
+    final String IN_EDGE_IDS_KEY = "inEdgeIds";
+    final String OUT_EDGE_IDS_KEY = "outEdgeIds";
+    final String ADDITIONAL_INPUTS_KEY = "additionalInputs";
+    final String ADDITIONAL_OUTPUTS_KEY = "additionalOutputs";
+    final String VERTEX_MANAGER_PLUGIN_CLASS_KEY =
+        "vertexManagerPluginClass";
+
+    final String EDGE_ID_KEY = "edgeId";
+    final String INPUT_VERTEX_NAME_KEY = "inputVertexName";
+    final String OUTPUT_VERTEX_NAME_KEY = "outputVertexName";
+    final String DATA_MOVEMENT_TYPE_KEY = "dataMovementType";
+    final String DATA_SOURCE_TYPE_KEY = "dataSourceType";
+    final String SCHEDULING_TYPE_KEY = "schedulingType";
+    final String EDGE_SOURCE_CLASS_KEY = "edgeSourceClass";
+    final String EDGE_DESTINATION_CLASS_KEY =
+        "edgeDestinationClass";
+
+    final String NAME_KEY = "name";
+    final String CLASS_KEY = "class";
+    final String INITIALIZER_KEY = "initializer";
+
+    final String VERTEX_GROUP_NAME_KEY = "groupName";
+    final String VERTEX_GROUP_MEMBERS_KEY = "groupMembers";
+    final String VERTEX_GROUP_OUTPUTS_KEY = "outputs";
+    final String VERTEX_GROUP_EDGE_MERGED_INPUTS_KEY = "edgeMergedInputs";
+    final String VERTEX_GROUP_DESTINATION_VERTEX_NAME_KEY = "destinationVertexName";
+
+    Map<String,Object> dagMap = new LinkedHashMap<String, Object>();
+    dagMap.put(DAG_NAME_KEY, dagPlan.getName());
+    dagMap.put(VERSION_KEY, version);
+    ArrayList<Object> verticesList = new ArrayList<Object>();
+    for (DAGProtos.VertexPlan vertexPlan : dagPlan.getVertexList()) {
+      Map<String,Object> vertexMap = new LinkedHashMap<String, Object>();
+      vertexMap.put(VERTEX_NAME_KEY, vertexPlan.getName());
+
+      if (vertexPlan.hasProcessorDescriptor()) {
+        vertexMap.put(PROCESSOR_CLASS_KEY,
+            vertexPlan.getProcessorDescriptor().getClassName());
+      }
+
+      ArrayList<Object> inEdgeIdList = new ArrayList<Object>();
+      inEdgeIdList.addAll(vertexPlan.getInEdgeIdList());
+      putInto(vertexMap, IN_EDGE_IDS_KEY, inEdgeIdList);
+
+      ArrayList<Object> outEdgeIdList = new ArrayList<Object>();
+      outEdgeIdList.addAll(vertexPlan.getOutEdgeIdList());
+      putInto(vertexMap, OUT_EDGE_IDS_KEY, outEdgeIdList);
+
+      ArrayList<Object> inputsList = new ArrayList<Object>();
+      for (DAGProtos.RootInputLeafOutputProto input :
+          vertexPlan.getInputsList()) {
+        Map<String,Object> inputMap = new LinkedHashMap<String, Object>();
+        inputMap.put(NAME_KEY, input.getName());
+        inputMap.put(CLASS_KEY, input.getEntityDescriptor().getClassName());
+        if (input.hasInitializerClassName()) {
+          inputMap.put(INITIALIZER_KEY, input.getInitializerClassName());
+        }
+        inputsList.add(inputMap);
+      }
+      putInto(vertexMap, ADDITIONAL_INPUTS_KEY, inputsList);
+
+      ArrayList<Object> outputsList = new ArrayList<Object>();
+      for (DAGProtos.RootInputLeafOutputProto output :
+          vertexPlan.getOutputsList()) {
+        Map<String,Object> outputMap = new LinkedHashMap<String, Object>();
+        outputMap.put(NAME_KEY, output.getName());
+        outputMap.put(CLASS_KEY, output.getEntityDescriptor().getClassName());
+        if (output.hasInitializerClassName()) {
+          outputMap.put(INITIALIZER_KEY, output.getInitializerClassName());
+        }
+        outputsList.add(outputMap);
+      }
+      putInto(vertexMap, ADDITIONAL_OUTPUTS_KEY, outputsList);
+
+      if (vertexPlan.hasVertexManagerPlugin()) {
+        vertexMap.put(VERTEX_MANAGER_PLUGIN_CLASS_KEY,
+            vertexPlan.getVertexManagerPlugin().getClassName());
+      }
+
+      verticesList.add(vertexMap);
+    }
+    putInto(dagMap, VERTICES_KEY, verticesList);
+
+    ArrayList<Object> edgesList = new ArrayList<Object>();
+    for (DAGProtos.EdgePlan edgePlan : dagPlan.getEdgeList()) {
+      Map<String,Object> edgeMap = new LinkedHashMap<String, Object>();
+      edgeMap.put(EDGE_ID_KEY, edgePlan.getId());
+      edgeMap.put(INPUT_VERTEX_NAME_KEY, edgePlan.getInputVertexName());
+      edgeMap.put(OUTPUT_VERTEX_NAME_KEY, edgePlan.getOutputVertexName());
+      edgeMap.put(DATA_MOVEMENT_TYPE_KEY,
+          edgePlan.getDataMovementType().name());
+      edgeMap.put(DATA_SOURCE_TYPE_KEY, edgePlan.getDataSourceType().name());
+      edgeMap.put(SCHEDULING_TYPE_KEY, edgePlan.getSchedulingType().name());
+      edgeMap.put(EDGE_SOURCE_CLASS_KEY,
+          edgePlan.getEdgeSource().getClassName());
+      edgeMap.put(EDGE_DESTINATION_CLASS_KEY,
+          edgePlan.getEdgeDestination().getClassName());
+
+      edgesList.add(edgeMap);
+    }
+    putInto(dagMap, EDGES_KEY, edgesList);
+
+    ArrayList<Object> vertexGroupsList = new ArrayList<Object>();
+    for (DAGProtos.PlanVertexGroupInfo vertexGroupInfo :
+        dagPlan.getVertexGroupsList()) {
+      Map<String,Object> groupMap = new LinkedHashMap<String, Object>();
+      groupMap.put(VERTEX_GROUP_NAME_KEY, vertexGroupInfo.getGroupName());
+      if (vertexGroupInfo.getGroupMembersCount() > 0) {
+        groupMap.put(VERTEX_GROUP_MEMBERS_KEY, vertexGroupInfo.getGroupMembersList());
+      }
+      if (vertexGroupInfo.getOutputsCount() > 0) {
+        groupMap.put(VERTEX_GROUP_OUTPUTS_KEY, vertexGroupInfo.getOutputsList());
+      }
+
+      if (vertexGroupInfo.getEdgeMergedInputsCount() > 0) {
+        ArrayList<Object> edgeMergedInputs = new ArrayList<Object>();
+        for (PlanGroupInputEdgeInfo edgeMergedInputInfo :
+            vertexGroupInfo.getEdgeMergedInputsList()) {
+          Map<String,Object> edgeMergedInput = new LinkedHashMap<String, Object>();
+          edgeMergedInput.put(VERTEX_GROUP_DESTINATION_VERTEX_NAME_KEY,
+              edgeMergedInputInfo.getDestVertexName());
+          if (edgeMergedInputInfo.hasMergedInput()
+            && edgeMergedInputInfo.getMergedInput().hasClassName()) {
+            edgeMergedInput.put(PROCESSOR_CLASS_KEY,
+                edgeMergedInputInfo.getMergedInput().getClassName());
+          }
+          edgeMergedInputs.add(edgeMergedInput);
+        }
+        groupMap.put(VERTEX_GROUP_EDGE_MERGED_INPUTS_KEY, edgeMergedInputs);
+      }
+      vertexGroupsList.add(groupMap);
+    }
+    putInto(dagMap, VERTEX_GROUPS_KEY, vertexGroupsList);
+
+    return dagMap;
+  }
+
+  private static void putInto(Map<String, Object> map, String key,
+      ArrayList<Object> list) {
+    if (list.isEmpty()) {
+      return;
+    }
+    map.put(key, list);
+  }
+
+  private static ArrayList<String> convertToStringArrayList(
+      Collection<TezTaskID> collection) {
+    ArrayList<String> list = new ArrayList<String>(collection.size());
+    for (TezTaskID t : collection) {
+      list.add(t.toString());
+    }
+    return list;
+  }
+
+  public static Map<String,Object> convertVertexStatsToATSMap(
+      VertexStats vertexStats) {
+    Map<String,Object> vertexStatsMap = new LinkedHashMap<String, Object>();
+    if (vertexStats == null) {
+      return vertexStatsMap;
+    }
+
+    final String FIRST_TASK_START_TIME_KEY = "firstTaskStartTime";
+    final String FIRST_TASKS_TO_START_KEY = "firstTasksToStart";
+    final String LAST_TASK_FINISH_TIME_KEY = "lastTaskFinishTime";
+    final String LAST_TASKS_TO_FINISH_KEY = "lastTasksToFinish";
+
+    final String MIN_TASK_DURATION = "minTaskDuration";
+    final String MAX_TASK_DURATION = "maxTaskDuration";
+    final String AVG_TASK_DURATION = "avgTaskDuration";
+
+    final String SHORTEST_DURATION_TASKS = "shortestDurationTasks";
+    final String LONGEST_DURATION_TASKS = "longestDurationTasks";
+
+    vertexStatsMap.put(FIRST_TASK_START_TIME_KEY, vertexStats.getFirstTaskStartTime());
+    if (vertexStats.getFirstTasksToStart() != null
+        && !vertexStats.getFirstTasksToStart().isEmpty()) {
+      vertexStatsMap.put(FIRST_TASKS_TO_START_KEY,
+          convertToStringArrayList(vertexStats.getFirstTasksToStart()));
+    }
+    vertexStatsMap.put(LAST_TASK_FINISH_TIME_KEY, vertexStats.getLastTaskFinishTime());
+    if (vertexStats.getLastTasksToFinish() != null
+        && !vertexStats.getLastTasksToFinish().isEmpty()) {
+      vertexStatsMap.put(LAST_TASKS_TO_FINISH_KEY,
+          convertToStringArrayList(vertexStats.getLastTasksToFinish()));
+    }
+
+    vertexStatsMap.put(MIN_TASK_DURATION, vertexStats.getMinTaskDuration());
+    vertexStatsMap.put(MAX_TASK_DURATION, vertexStats.getMaxTaskDuration());
+    vertexStatsMap.put(AVG_TASK_DURATION, vertexStats.getAvgTaskDuration());
+
+    if (vertexStats.getShortestDurationTasks() != null
+        && !vertexStats.getShortestDurationTasks().isEmpty()) {
+      vertexStatsMap.put(SHORTEST_DURATION_TASKS,
+          convertToStringArrayList(vertexStats.getShortestDurationTasks()));
+    }
+    if (vertexStats.getLongestDurationTasks() != null
+        && !vertexStats.getLongestDurationTasks().isEmpty()) {
+      vertexStatsMap.put(LONGEST_DURATION_TASKS,
+          convertToStringArrayList(vertexStats.getLongestDurationTasks()));
+    }
+
+    return vertexStatsMap;
+  }
+
 }
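
As a usage sketch of the new ATS helpers (illustrative fragment; assumes a populated TezCounters instance named counters):

    Map<String, Object> atsCounters = DAGUtils.convertCountersToATSMap(counters);
    // Top-level key ATSConstants.COUNTER_GROUPS -> list of per-group maps, each carrying
    // the group name/display name and a nested ATSConstants.COUNTERS list of counter maps,
    // as built in convertCountersToATSMap above.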

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index b22b162..164bd2f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -112,7 +112,7 @@ public class TestHistoryEventsProtoConversion {
     AMLaunchedEvent event = new AMLaunchedEvent(
         ApplicationAttemptId.newInstance(
             ApplicationId.newInstance(0, 1), 1),
-        100, 100);
+        100, 100, null);
     AMLaunchedEvent deserializedEvent = (AMLaunchedEvent)
         testProtoConversion(event);
     Assert.assertEquals(event.getApplicationAttemptId(),
@@ -127,7 +127,7 @@ public class TestHistoryEventsProtoConversion {
   private void testAMStartedEvent() throws Exception {
     AMStartedEvent event = new AMStartedEvent(
         ApplicationAttemptId.newInstance(
-            ApplicationId.newInstance(0, 1), 1), 100);
+            ApplicationId.newInstance(0, 1), 1), 100, "");
     AMStartedEvent deserializedEvent = (AMStartedEvent)
         testProtoConversion(event);
     Assert.assertEquals(event.getApplicationAttemptId(),
@@ -142,7 +142,7 @@ public class TestHistoryEventsProtoConversion {
         ApplicationId.newInstance(0, 1), 1), 1001l,
         DAGPlan.newBuilder().setName("foo").build(),
         ApplicationAttemptId.newInstance(
-            ApplicationId.newInstance(0, 1), 1), null);
+            ApplicationId.newInstance(0, 1), 1), null, "");
     DAGSubmittedEvent deserializedEvent = (DAGSubmittedEvent)
         testProtoConversion(event);
     Assert.assertEquals(event.getApplicationAttemptId(),
@@ -160,7 +160,8 @@ public class TestHistoryEventsProtoConversion {
 
   private void testDAGInitializedEvent() throws Exception {
     DAGInitializedEvent event = new DAGInitializedEvent(
-        TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 100334l);
+        TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 100334l,
+        "user", "dagName");
     DAGInitializedEvent deserializedEvent = (DAGInitializedEvent)
         testProtoConversion(event);
     Assert.assertEquals(event.getDagID(),
@@ -171,7 +172,8 @@ public class TestHistoryEventsProtoConversion {
 
   private void testDAGStartedEvent() throws Exception {
     DAGStartedEvent event = new DAGStartedEvent(
-        TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 100334l);
+        TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 100334l,
+        "user", "dagName");
     DAGStartedEvent deserializedEvent = (DAGStartedEvent)
         testProtoConversion(event);
     Assert.assertEquals(event.getDagID(),
@@ -184,7 +186,7 @@ public class TestHistoryEventsProtoConversion {
     {
       DAGFinishedEvent event = new DAGFinishedEvent(
           TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1000l, 20000l,
-          DAGState.FAILED, null, null);
+          DAGState.FAILED, null, null, "user", "dagName");
       DAGFinishedEvent deserializedEvent = (DAGFinishedEvent)
           testProtoConversion(event);
       Assert.assertEquals(
@@ -204,7 +206,8 @@ public class TestHistoryEventsProtoConversion {
       tezCounters.getGroup("foo").findCounter("c1").increment(1);
       DAGFinishedEvent event = new DAGFinishedEvent(
           TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1000l, 20000l,
-          DAGState.FAILED, "bad diagnostics", tezCounters);
+          DAGState.FAILED, "bad diagnostics", tezCounters,
+          "user", "dagName");
       DAGFinishedEvent deserializedEvent = (DAGFinishedEvent)
           testProtoConversion(event);
       Assert.assertEquals(

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
new file mode 100644
index 0000000..67be9f5
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.impl;
+
+import java.util.Random;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.AMLaunchedEvent;
+import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.history.events.ContainerStoppedEvent;
+import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskFinishedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
+import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
+import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.codehaus.jettison.json.JSONException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestHistoryEventJsonConversion {
+
+  private ApplicationAttemptId applicationAttemptId;
+  private ApplicationId applicationId;
+  private String user = "user";
+  private Random random = new Random();
+  private TezDAGID tezDAGID;
+  private TezVertexID tezVertexID;
+  private TezTaskID tezTaskID;
+  private TezTaskAttemptID tezTaskAttemptID;
+  private DAGPlan dagPlan;
+  private ContainerId containerId;
+  private NodeId nodeId;
+
+  @Before
+  public void setup() {
+    applicationId = ApplicationId.newInstance(9999l, 1);
+    applicationAttemptId = ApplicationAttemptId.newInstance(applicationId, 1);
+    tezDAGID = TezDAGID.getInstance(applicationId, random.nextInt());
+    tezVertexID = TezVertexID.getInstance(tezDAGID, random.nextInt());
+    tezTaskID = TezTaskID.getInstance(tezVertexID, random.nextInt());
+    tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID, random.nextInt());
+    dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock").build();
+    containerId = ContainerId.newInstance(applicationAttemptId, 111);
+    nodeId = NodeId.newInstance("node", 13435);
+  }
+
+  @Test
+  public void testHandlerExists() throws JSONException {
+    for (HistoryEventType eventType : HistoryEventType.values()) {
+      HistoryEvent event = null;
+      switch (eventType) {
+        case AM_LAUNCHED:
+          event = new AMLaunchedEvent(applicationAttemptId, random.nextInt(), random.nextInt(),
+              user);
+          break;
+        case AM_STARTED:
+          event = new AMStartedEvent(applicationAttemptId, random.nextInt(), user);
+          break;
+        case DAG_SUBMITTED:
+          event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId,
+              null, user);
+          break;
+        case DAG_INITIALIZED:
+          event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName());
+          break;
+        case DAG_STARTED:
+          event = new DAGStartedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName());
+          break;
+        case DAG_FINISHED:
+          event = new DAGFinishedEvent(tezDAGID, random.nextInt(), random.nextInt(), DAGState.ERROR,
+              null, null, user, dagPlan.getName());
+          break;
+        case VERTEX_INITIALIZED:
+          event = new VertexInitializedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
+              random.nextInt(), "proc", null);
+          break;
+        case VERTEX_STARTED:
+          event = new VertexStartedEvent(tezVertexID, random.nextInt(), random.nextInt());
+          break;
+        case VERTEX_PARALLELISM_UPDATED:
+          event = new VertexParallelismUpdatedEvent();
+          break;
+        case VERTEX_FINISHED:
+          event = new VertexFinishedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
+              random.nextInt(), random.nextInt(), random.nextInt(), VertexState.ERROR,
+              null, null, null);
+          break;
+        case TASK_STARTED:
+          event = new TaskStartedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt());
+          break;
+        case TASK_FINISHED:
+          event = new TaskFinishedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt(),
+              tezTaskAttemptID, TaskState.FAILED, null);
+          break;
+        case TASK_ATTEMPT_STARTED:
+          event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", random.nextInt(), containerId,
+              nodeId, null, null);
+          break;
+        case TASK_ATTEMPT_FINISHED:
+          event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
+              random.nextInt(), TaskAttemptState.FAILED, null, null);
+          break;
+        case CONTAINER_LAUNCHED:
+          event = new ContainerLaunchedEvent(containerId, random.nextInt(),
+              applicationAttemptId);
+          break;
+        case CONTAINER_STOPPED:
+          event = new ContainerStoppedEvent(containerId, random.nextInt(), -1, applicationAttemptId);
+          break;
+        case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+          event = new VertexDataMovementEventsGeneratedEvent();
+          break;
+        case DAG_COMMIT_STARTED:
+          event = new DAGCommitStartedEvent();
+          break;
+        case VERTEX_COMMIT_STARTED:
+          event = new VertexCommitStartedEvent();
+          break;
+        case VERTEX_GROUP_COMMIT_STARTED:
+          event = new VertexGroupCommitStartedEvent();
+          break;
+        case VERTEX_GROUP_COMMIT_FINISHED:
+          event = new VertexGroupCommitFinishedEvent();
+          break;
+        default:
+          Assert.fail("Unhandled event type " + eventType);
+      }
+      if (event == null || !event.isHistoryEvent()) {
+        continue;
+      }
+      HistoryEventJsonConversion.convertToJson(event);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dist/pom.xml
----------------------------------------------------------------------
diff --git a/tez-dist/pom.xml b/tez-dist/pom.xml
index bbe3256..95668a9 100644
--- a/tez-dist/pom.xml
+++ b/tez-dist/pom.xml
@@ -54,6 +54,19 @@
 
   <profiles>
     <profile>
+      <id>hadoop24</id>
+      <activation>
+         <activeByDefault>true</activeByDefault>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.tez</groupId>
+          <artifactId>tez-yarn-timeline-history</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
       <id>dist-tar</id>
       <activation>
         <activeByDefault>false</activeByDefault>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-plugins/pom.xml
----------------------------------------------------------------------
diff --git a/tez-plugins/pom.xml b/tez-plugins/pom.xml
new file mode 100644
index 0000000..393de62
--- /dev/null
+++ b/tez-plugins/pom.xml
@@ -0,0 +1,50 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.tez</groupId>
+    <artifactId>tez</artifactId>
+    <version>0.5.0-incubating-SNAPSHOT</version>
+  </parent>
+  <artifactId>tez-plugins</artifactId>
+  <packaging>pom</packaging>
+
+  <profiles>
+    <profile>
+      <id>hadoop24</id>
+      <activation>
+         <activeByDefault>true</activeByDefault>
+      </activation>
+      <modules>
+        <module>tez-yarn-timeline-history</module>
+      </modules>
+    </profile>
+  </profiles>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-plugins/tez-yarn-timeline-history/pom.xml
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/pom.xml b/tez-plugins/tez-yarn-timeline-history/pom.xml
new file mode 100644
index 0000000..150a149
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history/pom.xml
@@ -0,0 +1,71 @@
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.tez</groupId>
+    <artifactId>tez-plugins</artifactId>
+    <version>0.5.0-incubating-SNAPSHOT</version>
+  </parent>
+  <artifactId>tez-yarn-timeline-history</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-internals</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jettison</groupId>
+      <artifactId>jettison</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
new file mode 100644
index 0000000..11114b6
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.ats;
+
+import java.util.HashSet;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.logging.HistoryLoggingService;
+import org.apache.tez.dag.records.TezDAGID;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class ATSHistoryLoggingService extends HistoryLoggingService {
+
+  private static final Log LOG = LogFactory.getLog(ATSHistoryLoggingService.class);
+
+  private LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
+      new LinkedBlockingQueue<DAGHistoryEvent>();
+
+  private Thread eventHandlingThread;
+  private AtomicBoolean stopped = new AtomicBoolean(false);
+  private int eventCounter = 0;
+  private int eventsProcessed = 0;
+  private final Object lock = new Object();
+
+  @VisibleForTesting
+  TimelineClient timelineClient;
+
+  private HashSet<TezDAGID> skippedDAGs = new HashSet<TezDAGID>();
+  private long maxTimeToWaitOnShutdown;
+
+  public ATSHistoryLoggingService() {
+    super(ATSHistoryLoggingService.class.getName());
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    LOG.info("Initializing ATSService");
+    timelineClient = TimelineClient.createTimelineClient();
+    timelineClient.init(conf);
+    maxTimeToWaitOnShutdown = conf.getLong(
+        TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS,
+        TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS_DEFAULT);
+  }
+
+  @Override
+  public void serviceStart() {
+    LOG.info("Starting ATSService");
+    timelineClient.start();
+    eventHandlingThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        DAGHistoryEvent event;
+        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+
+          // Log the size of the event-queue every so often.
+          if (eventCounter != 0 && eventCounter % 1000 == 0) {
+            LOG.info("Event queue stats"
+                + ", eventsProcessedSinceLastUpdate=" + eventsProcessed
+                + ", eventQueueSize=" + eventQueue.size());
+            eventCounter = 0;
+            eventsProcessed = 0;
+          } else {
+            ++eventCounter;
+          }
+
+          try {
+            event = eventQueue.take();
+          } catch (InterruptedException e) {
+            LOG.info("EventQueue take interrupted. Returning");
+            return;
+          }
+
+          synchronized (lock) {
+            ++eventsProcessed;
+            try {
+              handleEvent(event);
+            } catch (Exception e) {
+              // TODO handle failures - treat as fatal or ignore?
+              LOG.warn("Error handling event", e);
+            }
+          }
+        }
+      }
+    }, "HistoryEventHandlingThread");
+    eventHandlingThread.start();
+  }
+
+  @Override
+  public void serviceStop() {
+    LOG.info("Stopping ATSService"
+        + ", eventQueueBacklog=" + eventQueue.size());
+    stopped.set(true);
+    if (eventHandlingThread != null) {
+      eventHandlingThread.interrupt();
+    }
+    synchronized (lock) {
+      if (!eventQueue.isEmpty()) {
+        LOG.warn("ATSService being stopped"
+            + ", eventQueueBacklog=" + eventQueue.size()
+            + ", maxTimeLeftToFlush=" + maxTimeToWaitOnShutdown);
+        long startTime = appContext.getClock().getTime();
+        if (maxTimeToWaitOnShutdown > 0) {
+          long endTime = startTime + maxTimeToWaitOnShutdown;
+          while (endTime >= appContext.getClock().getTime()) {
+            DAGHistoryEvent event = eventQueue.poll();
+            if (event == null) {
+              break;
+            }
+            try {
+              handleEvent(event);
+            } catch (Exception e) {
+              LOG.warn("Error handling event", e);
+              break;
+            }
+          }
+        }
+      }
+    }
+    if (!eventQueue.isEmpty()) {
+      LOG.warn("Did not finish flushing eventQueue before stopping ATSService"
+          + ", eventQueueBacklog=" + eventQueue.size());
+    }
+    timelineClient.stop();
+  }
+
+  public void handle(DAGHistoryEvent event) {
+    eventQueue.add(event);
+  }
+
+  private void handleEvent(DAGHistoryEvent event) {
+    HistoryEventType eventType = event.getHistoryEvent().getEventType();
+
+    TezDAGID dagId = event.getDagID();
+
+    if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
+      DAGSubmittedEvent dagSubmittedEvent =
+          (DAGSubmittedEvent) event.getHistoryEvent();
+      String dagName = dagSubmittedEvent.getDAGName();
+      if (dagName != null
+          && dagName.startsWith(
+          TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX)) {
+        // Skip recording pre-warm DAG events
+        skippedDAGs.add(dagId);
+        return;
+      }
+    }
+    if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
+      // Remove from set to keep size small
+      // No more events should be seen after this point.
+      if (skippedDAGs.remove(dagId)) {
+        return;
+      }
+    }
+
+    if (dagId != null && skippedDAGs.contains(dagId)) {
+      // Skip pre-warm DAGs
+      return;
+    }
+
+    try {
+      TimelinePutResponse response =
+          timelineClient.putEntities(
+              HistoryEventTimelineConversion.convertToTimelineEntity(event.getHistoryEvent()));
+      if (response != null
+        && !response.getErrors().isEmpty()) {
+        TimelinePutError err = response.getErrors().get(0);
+        if (err.getErrorCode() != 0) {
+          LOG.warn("Could not post history event to ATS, eventType="
+              + eventType
+              + ", atsPutError=" + err.getErrorCode());
+        }
+      }
+      // Do nothing additional, ATS client library should handle throttling
+      // or auto-disable as needed
+    } catch (Exception e) {
+      LOG.warn("Could not handle history event, eventType="
+          + eventType, e);
+    }
+  }
+
+}
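
For reference only (not part of this patch): a minimal sketch of how an
application could select this ATS-backed service once the pluggable history
logging keys from this change are in place. The helper class name below is
hypothetical; the configuration keys and the ATSHistoryLoggingService class
name are the ones introduced in this commit.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.tez.dag.api.TezConfiguration;

  public class EnableATSLoggingExample {
    public static Configuration withATSLogging(Configuration base) {
      Configuration conf = new Configuration(base);
      // Route history events to the YARN timeline server via the new plugin.
      conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
          "org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService");
      // Optionally allow more time to flush queued events on AM shutdown
      // (the default introduced in this commit is 3000 ms).
      conf.setLong(TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS, 10000L);
      return conf;
    }
  }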


[3/3] git commit: TEZ-1066. Generate events to integrate with YARN timeline server. (hitesh)

Posted by hi...@apache.org.
TEZ-1066. Generate events to integrate with YARN timeline server. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/bc657961
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/bc657961
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/bc657961

Branch: refs/heads/master
Commit: bc65796146fca5d7d6eb49e85e20fda5bade0b57
Parents: 6e07fc7
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed May 21 14:54:08 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Wed May 21 14:54:08 2014 -0700

----------------------------------------------------------------------
 BUILDING.txt                                    |  14 +-
 pom.xml                                         |   8 +-
 .../apache/tez/dag/api/TezConfiguration.java    |  19 +-
 .../java/org/apache/tez/common/TezUtils.java    |  18 +
 .../org/apache/tez/dag/app/DAGAppMaster.java    |   8 +-
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |   9 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |   2 +
 .../dag/app/launcher/ContainerLauncherImpl.java |   2 +-
 .../apache/tez/dag/history/HistoryEvent.java    |   6 +-
 .../tez/dag/history/HistoryEventHandler.java    |  32 +-
 .../apache/tez/dag/history/ats/ATSService.java  | 124 ----
 .../apache/tez/dag/history/ats/EntityTypes.java |  28 -
 .../tez/dag/history/events/AMLaunchedEvent.java |  63 +-
 .../tez/dag/history/events/AMStartedEvent.java  |  57 +-
 .../history/events/ContainerLaunchedEvent.java  |  52 +-
 .../history/events/ContainerStoppedEvent.java   |  60 +-
 .../history/events/DAGCommitStartedEvent.java   |   8 -
 .../dag/history/events/DAGFinishedEvent.java    |  58 +-
 .../dag/history/events/DAGInitializedEvent.java |  33 +-
 .../tez/dag/history/events/DAGStartedEvent.java |  53 +-
 .../dag/history/events/DAGSubmittedEvent.java   |  81 +--
 .../events/TaskAttemptFinishedEvent.java        |  51 +-
 .../history/events/TaskAttemptStartedEvent.java |  66 +-
 .../dag/history/events/TaskFinishedEvent.java   |  51 +-
 .../dag/history/events/TaskStartedEvent.java    |  47 +-
 .../events/VertexCommitStartedEvent.java        |   8 -
 .../VertexDataMovementEventsGeneratedEvent.java |  20 +-
 .../dag/history/events/VertexFinishedEvent.java |  49 +-
 .../events/VertexGroupCommitFinishedEvent.java  |   8 -
 .../events/VertexGroupCommitStartedEvent.java   |   8 -
 .../history/events/VertexInitializedEvent.java  |  58 +-
 .../events/VertexParallelismUpdatedEvent.java   |  22 +-
 .../dag/history/events/VertexStartedEvent.java  |  46 +-
 .../tez/dag/history/logging/EntityTypes.java    |  28 +
 .../history/logging/HistoryLoggingService.java  |  43 ++
 .../impl/HistoryEventJsonConversion.java        | 633 +++++++++++++++++++
 .../impl/SimpleHistoryLoggingService.java       | 169 +++++
 .../tez/dag/history/utils/ATSConstants.java     |   1 +
 .../apache/tez/dag/history/utils/DAGUtils.java  | 251 ++++++++
 .../TestHistoryEventsProtoConversion.java       |  17 +-
 .../impl/TestHistoryEventJsonConversion.java    | 179 ++++++
 tez-dist/pom.xml                                |  13 +
 tez-plugins/pom.xml                             |  50 ++
 tez-plugins/tez-yarn-timeline-history/pom.xml   |  71 +++
 .../logging/ats/ATSHistoryLoggingService.java   | 209 ++++++
 .../ats/HistoryEventTimelineConversion.java     | 462 ++++++++++++++
 .../ats/TestATSHistoryLoggingService.java       | 108 ++++
 .../ats/TestHistoryEventTimelineConversion.java | 179 ++++++
 .../java/org/apache/tez/test/TestTezJobs.java   |  57 ++
 49 files changed, 2736 insertions(+), 903 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/BUILDING.txt
----------------------------------------------------------------------
diff --git a/BUILDING.txt b/BUILDING.txt
index 5245101..4f2f44e 100644
--- a/BUILDING.txt
+++ b/BUILDING.txt
@@ -20,6 +20,7 @@ Maven main modules:
         - tez-mapreduce ...............(Tez mapreduce)
         - tez-dag .....................(Tez dag)
         - tez-mapreduce-examples ......(Tez mapreduce examples)
+        - tez-plugins .................(Tez plugins)
         - tez-tests ...................(Tez tests)
         - tez-dist ....................(Tez dist)
 
@@ -37,7 +38,7 @@ Maven build goals:
  * Run clover                : mvn test -Pclover [-Dclover.license=${user.home}/clover.license]
  * Run Rat                   : mvn apache-rat:check
  * Build javadocs            : mvn javadoc:javadoc
- * Build distribution        : mvn package[-Dtar][-Dhadoop.version=2.2.0]
+ * Build distribution        : mvn package[-Dtar][-Dhadoop.version=2.4.0]
  * Visualize state machines  : mvn compile -Pvisualize -DskipTests=true
  
 Build options:
@@ -58,11 +59,18 @@ Tests options:
 ----------------------------------------------------------------------------------
 Building against a specific version of hadoop:
 
-Tez runs on top of Apache Hadoop YARN and requires hadoop version 2.2.0 or higher
-For example to build tez against hadoop 3.0.0-SNAPSHOT 
+Tez runs on top of Apache Hadoop YARN and requires hadoop version 2.2.0 or higher.
+
+By default, Tez can be compiled against hadoop versions 2.4.0 and higher simply by
+specifying hadoop.version. For example, to build tez against hadoop 3.0.0-SNAPSHOT
 
  $ mvn package -Dtar -Dhadoop.version=3.0.0-SNAPSHOT
  
+However, to compile against hadoop versions lower than 2.4.0, the hadoop24 profile needs
+to be disabled
+
+ $ mvn package -Dtar -Dhadoop.version=2.2.0 -P\!hadoop24
+
 To skip Tests and java docs
 
  $ mvn package -Dtar -Dhadoop.version=3.0.0-SNAPSHOT -DskipTests -Dmaven.javadoc.skip=true

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d1d8b6d..6b0b591 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,7 +37,7 @@
   <properties>
     <maven.test.redirectTestOutputToFile>true</maven.test.redirectTestOutputToFile>
     <clover.license>${user.home}/clover.license</clover.license>
-    <hadoop.version>2.2.0</hadoop.version>
+    <hadoop.version>2.4.0</hadoop.version>
     <jetty.version>7.6.10.v20130312</jetty.version>
     <distMgmtSnapshotsId>apache.snapshots.https</distMgmtSnapshotsId>
     <distMgmtSnapshotsName>Apache Development Snapshot Repository</distMgmtSnapshotsName>
@@ -140,6 +140,11 @@
         <version>${project.version}</version>
       </dependency>
       <dependency>
+        <groupId>org.apache.tez</groupId>
+        <artifactId>tez-plugins</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-common</artifactId>
         <version>${hadoop.version}</version>
@@ -326,6 +331,7 @@
     <module>tez-mapreduce-examples</module>
     <module>tez-tests</module>
     <module>tez-dag</module>
+    <module>tez-plugins</module>
     <module>tez-dist</module>
     <module>docs</module>
   </modules>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index ba735fa..36fdd59 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -365,9 +365,22 @@ public class TezConfiguration extends Configuration {
   @Private
   public static final String TEZ_PREWARM_DAG_NAME_PREFIX = "TezPreWarmDAG";
 
-  public static final String YARN_ATS_ENABLED =
-      TEZ_PREFIX + "yarn.ats.enabled";
-  public static final boolean YARN_ATS_ENABLED_DEFAULT = false;
+  public static final String TEZ_HISTORY_LOGGING_SERVICE_CLASS =
+      TEZ_PREFIX + "history.logging.service.class";
+
+  public static final String TEZ_HISTORY_LOGGING_SERVICE_CLASS_DEFAULT =
+      "org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService";
+
+  public static final String TEZ_SIMPLE_HISTORY_LOGGING_DIR =
+      TEZ_PREFIX + "simple.history.logging.dir";
+  public static final String TEZ_SIMPLE_HISTORY_LOGGING_MAX_ERRORS =
+      TEZ_PREFIX + "simple.history.max.errors";
+  public static final int TEZ_SIMPLE_HISTORY_LOGGING_MAX_ERRORS_DEFAULT = 10;
+
+  public static final String YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS =
+      TEZ_PREFIX + "yarn.ats.event.flush.timeout.millis";
+  public static final long YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS_DEFAULT =
+      3000l;
 
   public static final String DAG_RECOVERY_ENABLED =
       TEZ_PREFIX + "dag.recovery.enabled";

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
index b900527..3e3f5eb 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
@@ -28,6 +28,7 @@ import java.util.BitSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.Random;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.zip.DataFormatException;
@@ -41,6 +42,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.log4j.Appender;
 import org.apache.log4j.Logger;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -54,6 +57,7 @@ import com.google.protobuf.ByteString;
 public class TezUtils {
 
   private static final Log LOG = LogFactory.getLog(TezUtils.class);
+  private static final Random RANDOM = new Random();
 
   public static void addUserSpecifiedTezConfiguration(Configuration conf) throws IOException {
     FileInputStream confPBBinaryStream = null;
@@ -329,4 +333,18 @@ public class TezUtils {
     }
     return bytes;
   }
+
+  public static String getContainerLogDir() {
+    String logDirsStr  = System.getenv(Environment.LOG_DIRS.name());
+    if (logDirsStr == null || logDirsStr.isEmpty()) {
+      return null;
+    }
+    String[] logDirs = StringUtils.split(logDirsStr, ',');
+    if (logDirs.length == 0) {
+      return null;
+    }
+    int logIndex = RANDOM.nextInt(logDirs.length);
+    return logDirs[logIndex];
+  }
+
 }
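
As an aside (not part of this patch), a hypothetical caller of the new helper
needs to handle the null return when LOG_DIRS is unset, e.g. when not running
inside a YARN container. The class and method names below are made up for
illustration; only TezUtils.getContainerLogDir() comes from this change.

  import java.io.File;
  import org.apache.tez.common.TezUtils;

  public class ContainerLogDirExample {
    public static File fileInContainerLogDir(String fileName) {
      String logDir = TezUtils.getContainerLogDir();
      if (logDir == null) {
        // LOG_DIRS env is unset or empty; fall back to the current working dir.
        return new File(".", fileName);
      }
      // getContainerLogDir() picks one of the configured log dirs at random.
      return new File(logDir, fileName);
    }
  }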

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index e16064a..aed9aa7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -416,7 +416,7 @@ public class DAGAppMaster extends AbstractService {
     super.serviceInit(conf);
 
     AMLaunchedEvent launchedEvent = new AMLaunchedEvent(appAttemptID,
-        startTime, appSubmitTime);
+        startTime, appSubmitTime, appMasterUgi.getShortUserName());
     historyEventHandler.handle(
         new DAGHistoryEvent(launchedEvent));
 
@@ -1553,7 +1553,8 @@ public class DAGAppMaster extends AbstractService {
     DefaultMetricsSystem.initialize("DAGAppMaster");
 
     this.appsStartTime = clock.getTime();
-    AMStartedEvent startEvent = new AMStartedEvent(appAttemptID, appsStartTime);
+    AMStartedEvent startEvent = new AMStartedEvent(appAttemptID,
+        appsStartTime, appMasterUgi.getShortUserName());
     historyEventHandler.handle(
         new DAGHistoryEvent(startEvent));
 
@@ -1891,7 +1892,8 @@ public class DAGAppMaster extends AbstractService {
     // Job name is the same as the app name until we support multiple dags
     // for an app later
     DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(newDAG.getID(),
-        submitTime, dagPlan, this.appAttemptID, cumulativeAdditionalResources);
+        submitTime, dagPlan, this.appAttemptID, cumulativeAdditionalResources,
+        newDAG.getUserName());
     try {
       historyEventHandler.handleCriticalEvent(
           new DAGHistoryEvent(newDAG.getID(), submittedEvent));

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index c92d51b..71d92b0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -895,21 +895,22 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   void logJobHistoryFinishedEvent() throws IOException {
     this.setFinishTime();
     DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
-        finishTime, DAGState.SUCCEEDED, "", getAllCounters());
+        finishTime, DAGState.SUCCEEDED, "", getAllCounters(),
+        this.userName, this.dagName);
     this.appContext.getHistoryHandler().handleCriticalEvent(
         new DAGHistoryEvent(dagId, finishEvt));
   }
 
   void logJobHistoryInitedEvent() {
     DAGInitializedEvent initEvt = new DAGInitializedEvent(this.dagId,
-        this.initTime);
+        this.initTime, this.userName, this.dagName);
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(dagId, initEvt));
   }
 
   void logJobHistoryStartedEvent() {
     DAGStartedEvent startEvt = new DAGStartedEvent(this.dagId,
-        this.startTime);
+        this.startTime, this.userName, this.dagName);
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(dagId, startEvt));
   }
@@ -918,7 +919,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
         clock.getTime(), state,
         StringUtils.join(LINE_SEPARATOR, getDiagnostics()),
-        getAllCounters());
+        getAllCounters(), this.userName, this.dagName);
     this.appContext.getHistoryHandler().handleCriticalEvent(
         new DAGHistoryEvent(dagId, finishEvt));
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 9701be4..17257ea 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
@@ -56,6 +57,7 @@ import org.apache.tez.common.RuntimeUtils;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeManagerDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
index 437d057..8ea7c8c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
@@ -174,7 +174,7 @@ public class ContainerLauncherImpl extends AbstractService implements
         ContainerLaunchedEvent lEvt = new ContainerLaunchedEvent(
             containerID, clock.getTime(), context.getApplicationAttemptId());
         context.getHistoryHandler().handle(new DAGHistoryEvent(
-            context.getCurrentDAGID(), lEvt));
+            null, lEvt));
 
         this.state = ContainerState.RUNNING;
       } catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
index 3f756c0..1ca0d5f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
@@ -22,15 +22,10 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
 public interface HistoryEvent {
 
   public HistoryEventType getEventType();
 
-  public JSONObject convertToATSJSON() throws JSONException;
-
   public boolean isRecoveryEvent();
 
   public boolean isHistoryEvent();
@@ -38,4 +33,5 @@ public interface HistoryEvent {
   public void toProtoStream(OutputStream outputStream) throws IOException;
 
   public void fromProtoStream(InputStream inputStream) throws IOException;
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
index 4eb094f..82e063a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
@@ -18,28 +18,27 @@
 
 package org.apache.tez.dag.history;
 
+import java.io.IOException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.CompositeService;
+import org.apache.tez.common.RuntimeUtils;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.history.ats.ATSService;
+import org.apache.tez.dag.history.logging.HistoryLoggingService;
 import org.apache.tez.dag.history.recovery.RecoveryService;
 import org.apache.tez.dag.records.TezDAGID;
 
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 public class HistoryEventHandler extends CompositeService {
 
   private static Log LOG = LogFactory.getLog(HistoryEventHandler.class);
 
   private final AppContext context;
-  private boolean yarnATSEnabled;
-  private ATSService atsService;
   private RecoveryService recoveryService;
   private boolean recoveryEnabled;
+  private HistoryLoggingService historyLoggingService;
 
   public HistoryEventHandler(AppContext context) {
     super(HistoryEventHandler.class.getName());
@@ -49,14 +48,19 @@ public class HistoryEventHandler extends CompositeService {
   @Override
   public void serviceInit(Configuration conf) throws Exception {
     LOG.info("Initializing HistoryEventHandler");
-    this.yarnATSEnabled = context.getAMConf().getBoolean(TezConfiguration.YARN_ATS_ENABLED,
-        TezConfiguration.YARN_ATS_ENABLED_DEFAULT);
+
     this.recoveryEnabled = context.getAMConf().getBoolean(TezConfiguration.DAG_RECOVERY_ENABLED,
         TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT);
-    if (yarnATSEnabled) {
-      atsService = new ATSService();
-      addService(atsService);
-    }
+
+    String historyServiceClassName = context.getAMConf().get(
+        TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+        TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS_DEFAULT);
+
+    historyLoggingService =
+        RuntimeUtils.createClazzInstance(historyServiceClassName);
+    historyLoggingService.setAppContext(context);
+    addService(historyLoggingService);
+
     if (recoveryEnabled) {
       recoveryService = new RecoveryService(context);
       addService(recoveryService);
@@ -97,8 +101,8 @@ public class HistoryEventHandler extends CompositeService {
     if (recoveryEnabled && event.getHistoryEvent().isRecoveryEvent()) {
       recoveryService.handle(event);
     }
-    if (yarnATSEnabled && event.getHistoryEvent().isHistoryEvent()) {
-      atsService.handle(event);
+    if (event.getHistoryEvent().isHistoryEvent()) {
+      historyLoggingService.handle(event);
     }
 
     // TODO at some point we should look at removing this once
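
For context only (not part of this patch): because the logging backend is now
instantiated reflectively from tez.history.logging.service.class, a custom
backend would roughly take the shape below. This is a sketch under the
assumption that HistoryLoggingService (added elsewhere in this commit) extends
AbstractService and exposes a String-name constructor plus a
handle(DAGHistoryEvent) method, as its use by HistoryEventHandler and
ATSHistoryLoggingService suggests; the class and package names are invented.

  package org.example.tez.history;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.tez.dag.history.DAGHistoryEvent;
  import org.apache.tez.dag.history.logging.HistoryLoggingService;

  public class LoggingOnlyHistoryService extends HistoryLoggingService {

    public LoggingOnlyHistoryService() {
      super(LoggingOnlyHistoryService.class.getName());
    }

    @Override
    public void serviceInit(Configuration conf) throws Exception {
      // Read any backend-specific configuration here.
    }

    public void handle(DAGHistoryEvent event) {
      // For example, forward event.getHistoryEvent() to an external store.
    }
  }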

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java
deleted file mode 100644
index 690e850..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.history.ats;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.tez.dag.history.DAGHistoryEvent;
-import org.apache.tez.dag.history.HistoryEventType;
-
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class ATSService extends AbstractService {
-
-  private static final Log LOG = LogFactory.getLog(ATSService.class);
-
-  private LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
-      new LinkedBlockingQueue<DAGHistoryEvent>();
-
-  private final AtomicInteger historyCounter =
-      new AtomicInteger(0);
-  private String outputFilePrefix;
-  private Thread eventHandlingThread;
-  private AtomicBoolean stopped = new AtomicBoolean(false);
-  private int eventCounter = 0;
-  private int eventsProcessed = 0;
-  private final Object lock = new Object();
-
-  public ATSService() {
-    super(ATSService.class.getName());
-  }
-
-  @Override
-  public void serviceInit(Configuration conf) throws Exception {
-    LOG.info("Initializing ATSService");
-
-  }
-
-  @Override
-  public void serviceStart() {
-    LOG.info("Starting ATSService");
-    eventHandlingThread = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        DAGHistoryEvent event;
-        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
-
-          // Log the size of the event-queue every so often.
-          if (eventCounter != 0 && eventCounter % 1000 == 0) {
-            LOG.info("Event queue stats"
-                + ", eventsProcessedSinceLastUpdate=" + eventsProcessed
-                + ", eventQueueSize=" + eventQueue.size());
-            eventCounter = 0;
-            eventsProcessed = 0;
-          } else {
-            ++eventCounter;
-          }
-
-          try {
-            event = eventQueue.take();
-          } catch (InterruptedException e) {
-            LOG.info("EventQueue take interrupted. Returning");
-            return;
-          }
-
-          synchronized (lock) {
-            ++eventsProcessed;
-            try {
-              handleEvent(event);
-            } catch (Exception e) {
-              // TODO handle failures - treat as fatal or ignore?
-              LOG.warn("Error handling event", e);
-            }
-          }
-        }
-      }
-    }, "HistoryEventHandlingThread");
-    eventHandlingThread.start();
-  }
-
-  @Override
-  public void serviceStop() {
-    LOG.info("Stopping ATSService");
-    stopped.set(true);
-    if (eventHandlingThread != null) {
-      eventHandlingThread.interrupt();
-    }
-  }
-
-  public void handle(DAGHistoryEvent event) {
-    eventQueue.add(event);
-  }
-
-  private void handleEvent(DAGHistoryEvent event) {
-    HistoryEventType eventType = event.getHistoryEvent().getEventType();
-    try {
-      // TODO integrate with ATS
-    } catch (Exception e) {
-      LOG.warn("Could not handle history event, eventType="
-          + eventType, e);
-      // TODO handle error as a fatal event or ignore/skip?
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/ats/EntityTypes.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/ats/EntityTypes.java b/tez-dag/src/main/java/org/apache/tez/dag/history/ats/EntityTypes.java
deleted file mode 100644
index a7f0208..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/ats/EntityTypes.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.history.ats;
-
-public enum EntityTypes {
-  TEZ_APPLICATION_ATTEMPT,
-  TEZ_CONTAINER_ID,
-  TEZ_DAG_ID,
-  TEZ_VERTEX_ID,
-  TEZ_TASK_ID,
-  TEZ_TASK_ATTEMPT_ID,
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
index 54bc658..049d340 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
@@ -18,35 +18,32 @@
 
 package org.apache.tez.dag.history.events;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.AMLaunchedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 
 public class AMLaunchedEvent implements HistoryEvent {
 
   private ApplicationAttemptId applicationAttemptId;
   private long launchTime;
   private long appSubmitTime;
+  private String user;
 
   public AMLaunchedEvent() {
   }
 
   public AMLaunchedEvent(ApplicationAttemptId appAttemptId,
-      long launchTime, long appSubmitTime) {
+      long launchTime, long appSubmitTime, String user) {
     this.applicationAttemptId = appAttemptId;
     this.launchTime = launchTime;
     this.appSubmitTime = appSubmitTime;
+    this.user = user;
   }
 
   @Override
@@ -55,48 +52,6 @@ public class AMLaunchedEvent implements HistoryEvent {
   }
 
   @Override
-  public JSONObject convertToATSJSON() throws JSONException {
-    JSONObject jsonObject = new JSONObject();
-    jsonObject.put(ATSConstants.ENTITY,
-        "tez_" + applicationAttemptId.toString());
-    jsonObject.put(ATSConstants.ENTITY_TYPE,
-        EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
-
-    // Related Entities
-    JSONArray relatedEntities = new JSONArray();
-    JSONObject appEntity = new JSONObject();
-    appEntity.put(ATSConstants.ENTITY,
-        applicationAttemptId.getApplicationId().toString());
-    appEntity.put(ATSConstants.ENTITY_TYPE,
-        ATSConstants.APPLICATION_ID);
-    JSONObject appAttemptEntity = new JSONObject();
-    appAttemptEntity.put(ATSConstants.ENTITY,
-        applicationAttemptId.toString());
-    appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
-        ATSConstants.APPLICATION_ATTEMPT_ID);
-    relatedEntities.put(appEntity);
-    relatedEntities.put(appAttemptEntity);
-    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
-
-    // TODO decide whether this goes into different events,
-    // event info or other info.
-    JSONArray events = new JSONArray();
-    JSONObject initEvent = new JSONObject();
-    initEvent.put(ATSConstants.TIMESTAMP, launchTime);
-    initEvent.put(ATSConstants.EVENT_TYPE,
-        HistoryEventType.AM_LAUNCHED.name());
-    events.put(initEvent);
-    jsonObject.put(ATSConstants.EVENTS, events);
-
-    // Other info to tag with Tez AM
-    JSONObject otherInfo = new JSONObject();
-    otherInfo.put(ATSConstants.APP_SUBMIT_TIME, appSubmitTime);
-    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
-    return jsonObject;
-  }
-
-  @Override
   public boolean isRecoveryEvent() {
     return false;
   }
@@ -151,4 +106,8 @@ public class AMLaunchedEvent implements HistoryEvent {
     return appSubmitTime;
   }
 
+  public String getUser() {
+    return user;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
index e66141b..aa7b3f2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
@@ -18,33 +18,30 @@
 
 package org.apache.tez.dag.history.events;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.AMStartedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 
 public class AMStartedEvent implements HistoryEvent {
 
   private ApplicationAttemptId applicationAttemptId;
   private long startTime;
+  private String user;
 
   public AMStartedEvent() {
   }
 
   public AMStartedEvent(ApplicationAttemptId appAttemptId,
-      long startTime) {
+      long startTime, String user) {
     this.applicationAttemptId = appAttemptId;
     this.startTime = startTime;
+    this.user = user;
   }
 
   @Override
@@ -53,43 +50,6 @@ public class AMStartedEvent implements HistoryEvent {
   }
 
   @Override
-  public JSONObject convertToATSJSON() throws JSONException {
-    JSONObject jsonObject = new JSONObject();
-    jsonObject.put(ATSConstants.ENTITY,
-        "tez_" + applicationAttemptId.toString());
-    jsonObject.put(ATSConstants.ENTITY_TYPE,
-        EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
-
-    // Related Entities
-    JSONArray relatedEntities = new JSONArray();
-    JSONObject appEntity = new JSONObject();
-    appEntity.put(ATSConstants.ENTITY,
-        applicationAttemptId.getApplicationId().toString());
-    appEntity.put(ATSConstants.ENTITY_TYPE,
-        ATSConstants.APPLICATION_ID);
-    JSONObject appAttemptEntity = new JSONObject();
-    appAttemptEntity.put(ATSConstants.ENTITY,
-        applicationAttemptId.toString());
-    appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
-        ATSConstants.APPLICATION_ATTEMPT_ID);
-    relatedEntities.put(appEntity);
-    relatedEntities.put(appAttemptEntity);
-    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
-
-    // TODO decide whether this goes into different events,
-    // event info or other info.
-    JSONArray events = new JSONArray();
-    JSONObject startEvent = new JSONObject();
-    startEvent.put(ATSConstants.TIMESTAMP, startTime);
-    startEvent.put(ATSConstants.EVENT_TYPE,
-        HistoryEventType.AM_STARTED.name());
-    events.put(startEvent);
-    jsonObject.put(ATSConstants.EVENTS, events);
-
-    return jsonObject;
-  }
-
-  @Override
   public boolean isRecoveryEvent() {
     return true;
   }
@@ -137,5 +97,8 @@ public class AMStartedEvent implements HistoryEvent {
     return startTime;
   }
 
+  public String getUser() {
+    return user;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
index 471ddd1..c37b7f1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
@@ -18,21 +18,16 @@
 
 package org.apache.tez.dag.history.events;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.ContainerLaunchedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 
 public class ContainerLaunchedEvent implements HistoryEvent {
 
@@ -57,45 +52,6 @@ public class ContainerLaunchedEvent implements HistoryEvent {
   }
 
   @Override
-  public JSONObject convertToATSJSON() throws JSONException {
-    JSONObject jsonObject = new JSONObject();
-    jsonObject.put(ATSConstants.ENTITY,
-        "tez_" + containerId.toString());
-    jsonObject.put(ATSConstants.ENTITY_TYPE,
-        EntityTypes.TEZ_CONTAINER_ID.name());
-
-    JSONArray relatedEntities = new JSONArray();
-    JSONObject appAttemptEntity = new JSONObject();
-    appAttemptEntity.put(ATSConstants.ENTITY,
-        applicationAttemptId.toString());
-    appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
-        EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
-
-    JSONObject containerEntity = new JSONObject();
-    containerEntity.put(ATSConstants.ENTITY, containerId.toString());
-    containerEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.CONTAINER_ID);
-
-    relatedEntities.put(appAttemptEntity);
-    relatedEntities.put(containerEntity);
-    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
-
-    // TODO decide whether this goes into different events,
-    // event info or other info.
-    JSONArray events = new JSONArray();
-    JSONObject launchEvent = new JSONObject();
-    launchEvent.put(ATSConstants.TIMESTAMP, launchTime);
-    launchEvent.put(ATSConstants.EVENT_TYPE,
-        HistoryEventType.CONTAINER_LAUNCHED.name());
-    events.put(launchEvent);
-    jsonObject.put(ATSConstants.EVENTS, events);
-
-    // TODO add other container info here? or assume AHS will have this?
-    // TODO container logs?
-
-    return jsonObject;
-  }
-
-  @Override
   public boolean isRecoveryEvent() {
     return false;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java
index a544354..549720a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java
@@ -18,23 +18,16 @@
 
 package org.apache.tez.dag.history.events;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.ContainerStoppedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 
 public class ContainerStoppedEvent implements HistoryEvent {
 
@@ -62,51 +55,6 @@ public class ContainerStoppedEvent implements HistoryEvent {
   }
 
   @Override
-  public JSONObject convertToATSJSON() throws JSONException {
-    // structure is identical to ContainerLaunchedEvent
-    JSONObject jsonObject = new JSONObject();
-    jsonObject.put(ATSConstants.ENTITY,
-        "tez_" + containerId.toString());
-    jsonObject.put(ATSConstants.ENTITY_TYPE,
-        EntityTypes.TEZ_CONTAINER_ID.name());
-
-    JSONArray relatedEntities = new JSONArray();
-    JSONObject appAttemptEntity = new JSONObject();
-    appAttemptEntity.put(ATSConstants.ENTITY,
-        applicationAttemptId.toString());
-    appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
-        EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
-
-    JSONObject containerEntity = new JSONObject();
-    containerEntity.put(ATSConstants.ENTITY, containerId.toString());
-    containerEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.CONTAINER_ID);
-
-    relatedEntities.put(appAttemptEntity);
-    relatedEntities.put(containerEntity);
-    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
-
-    // TODO decide whether this goes into different events,
-    // event info or other info.
-    JSONArray events = new JSONArray();
-    JSONObject stopEvent = new JSONObject();
-    stopEvent.put(ATSConstants.TIMESTAMP, stopTime);
-    stopEvent.put(ATSConstants.EVENT_TYPE,
-        HistoryEventType.CONTAINER_STOPPED.name());
-    events.put(stopEvent);
-    jsonObject.put(ATSConstants.EVENTS, events);
-    
-    // TODO add other container info here? or assume AHS will have this?
-    // TODO container logs?
-
-    // Other info
-    JSONObject otherInfo = new JSONObject();
-    otherInfo.put(ATSConstants.EXIT_STATUS, exitStatus);
-    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-    
-    return jsonObject;
-  }
-
-  @Override
   public boolean isRecoveryEvent() {
     return false;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
index 627751a..95e8630 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
@@ -29,8 +29,6 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGCommitStartedProto;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
 import org.apache.tez.dag.utils.ProtoUtils;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
 
 public class DAGCommitStartedEvent implements HistoryEvent, SummaryEvent {
 
@@ -51,12 +49,6 @@ public class DAGCommitStartedEvent implements HistoryEvent, SummaryEvent {
   }
 
   @Override
-  public JSONObject convertToATSJSON() throws JSONException {
-    // TODO
-    return null;
-  }
-
-  @Override
   public boolean isRecoveryEvent() {
     return true;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
index 14381b3..1cfc36e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
@@ -22,47 +22,50 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.SummaryEvent;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
-import org.apache.tez.dag.history.utils.DAGUtils;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGFinishedProto;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
 
 import com.google.common.primitives.Ints;
 import com.google.protobuf.ByteString;
 
 public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
 
+  private static final Log LOG = LogFactory.getLog(DAGFinishedEvent.class);
+
   private TezDAGID dagID;
   private long startTime;
   private long finishTime;
   private DAGState state;
   private String diagnostics;
   private TezCounters tezCounters;
+  private String user;
+  private String dagName;
 
   public DAGFinishedEvent() {
   }
 
   public DAGFinishedEvent(TezDAGID dagId, long startTime,
       long finishTime, DAGState state,
-      String diagnostics, TezCounters counters) {
+      String diagnostics, TezCounters counters,
+      String user, String dagName) {
     this.dagID = dagId;
     this.startTime = startTime;
     this.finishTime = finishTime;
     this.state = state;
     this.diagnostics = diagnostics;
     this.tezCounters = counters;
+    this.user = user;
+    this.dagName = dagName;
   }
 
   @Override
@@ -71,40 +74,6 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
   }
 
   @Override
-  public JSONObject convertToATSJSON() throws JSONException {
-    JSONObject jsonObject = new JSONObject();
-    jsonObject.put(ATSConstants.ENTITY,
-        dagID.toString());
-    jsonObject.put(ATSConstants.ENTITY_TYPE,
-        EntityTypes.TEZ_DAG_ID.name());
-
-    // Related Entities not needed as should have been done in
-    // dag submission event
-
-    // TODO decide whether this goes into different events,
-    // event info or other info.
-    JSONArray events = new JSONArray();
-    JSONObject finishEvent = new JSONObject();
-    finishEvent.put(ATSConstants.TIMESTAMP, finishTime);
-    finishEvent.put(ATSConstants.EVENT_TYPE,
-        HistoryEventType.DAG_FINISHED.name());
-    events.put(finishEvent);
-    jsonObject.put(ATSConstants.EVENTS, events);
-
-    JSONObject otherInfo = new JSONObject();
-    otherInfo.put(ATSConstants.START_TIME, startTime);
-    otherInfo.put(ATSConstants.FINISH_TIME, finishTime);
-    otherInfo.put(ATSConstants.TIME_TAKEN, (finishTime - startTime));
-    otherInfo.put(ATSConstants.STATUS, state.name());
-    otherInfo.put(ATSConstants.DIAGNOSTICS, diagnostics);
-    otherInfo.put(ATSConstants.COUNTERS,
-        DAGUtils.convertCountersToJSON(this.tezCounters));
-    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
-    return jsonObject;
-  }
-
-  @Override
   public boolean isRecoveryEvent() {
     return true;
   }
@@ -215,4 +184,11 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
     return tezCounters;
   }
 
+  public String getUser() {
+    return user;
+  }
+
+  public String getDagName() {
+    return dagName;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
index 9b001b6..b3a0165 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
@@ -18,29 +18,31 @@
 
 package org.apache.tez.dag.history.events;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 
-// TODO fix class
 public class DAGInitializedEvent implements HistoryEvent {
 
   private TezDAGID dagID;
   private long initTime;
+  private String user;
+  private String dagName;
 
   public DAGInitializedEvent() {
   }
 
-  public DAGInitializedEvent(TezDAGID dagID, long initTime) {
+  public DAGInitializedEvent(TezDAGID dagID, long initTime,
+      String user, String dagName) {
     this.dagID = dagID;
     this.initTime = initTime;
+    this.user = user;
+    this.dagName = dagName;
   }
 
   @Override
@@ -49,12 +51,6 @@ public class DAGInitializedEvent implements HistoryEvent {
   }
 
   @Override
-  public JSONObject convertToATSJSON() throws JSONException {
-    // TODO
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
   public boolean isRecoveryEvent() {
     return true;
   }
@@ -101,4 +97,13 @@ public class DAGInitializedEvent implements HistoryEvent {
   public TezDAGID getDagID() {
     return dagID;
   }
+
+  public String getUser() {
+    return user;
+  }
+
+  public String getDagName() {
+    return dagName;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
index a1bcdf2..4e7b880 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
@@ -18,31 +18,31 @@
 
 package org.apache.tez.dag.history.events;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGStartedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 
 public class DAGStartedEvent implements HistoryEvent {
 
   private TezDAGID dagID;
   private long startTime;
+  private String user;
+  private String dagName;
 
   public DAGStartedEvent() {
   }
 
-  public DAGStartedEvent(TezDAGID dagID, long startTime) {
+  public DAGStartedEvent(TezDAGID dagID, long startTime,
+      String user, String dagName) {
     this.dagID = dagID;
     this.startTime = startTime;
+    this.user = user;
+    this.dagName = dagName;
   }
 
   @Override
@@ -51,30 +51,6 @@ public class DAGStartedEvent implements HistoryEvent {
   }
 
   @Override
-  public JSONObject convertToATSJSON() throws JSONException {
-    JSONObject jsonObject = new JSONObject();
-    jsonObject.put(ATSConstants.ENTITY,
-        dagID.toString());
-    jsonObject.put(ATSConstants.ENTITY_TYPE,
-        EntityTypes.TEZ_DAG_ID.name());
-
-    // Related Entities not needed as should have been done in
-    // dag submission event
-
-    // TODO decide whether this goes into different events,
-    // event info or other info.
-    JSONArray events = new JSONArray();
-    JSONObject startEvent = new JSONObject();
-    startEvent.put(ATSConstants.TIMESTAMP, startTime);
-    startEvent.put(ATSConstants.EVENT_TYPE,
-        HistoryEventType.DAG_STARTED.name());
-    events.put(startEvent);
-    jsonObject.put(ATSConstants.EVENTS, events);
-
-    return jsonObject;
-  }
-
-  @Override
   public boolean isRecoveryEvent() {
     return true;
   }
@@ -120,4 +96,13 @@ public class DAGStartedEvent implements HistoryEvent {
   public TezDAGID getDagID() {
     return dagID;
   }
+
+  public String getUser() {
+    return user;
+  }
+
+  public String getDagName() {
+    return dagName;
+  }
+
 }
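
(Not part of the patch.) For context on the change above: the convertToATSJSON() logic is removed from the event class, and the new getUser()/getDagName() accessors let an external converter build the equivalent timeline record. The sketch below is a hypothetical illustration only, mirroring the removed JSON fields; it assumes the YARN timeline API (TimelineEntity/TimelineEvent) and the ATSConstants/EntityTypes names seen in this diff, and it is not the converter class shipped in this commit.

import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.DAGStartedEvent;
import org.apache.tez.dag.history.logging.EntityTypes; // package location assumed as of this commit
import org.apache.tez.dag.history.utils.ATSConstants;

public class DagStartedConversionSketch {

  // Hypothetical helper: builds a TimelineEntity for a DAGStartedEvent,
  // carrying the same information the removed JSON block produced.
  public static TimelineEntity toTimelineEntity(DAGStartedEvent event) {
    TimelineEntity atsEntity = new TimelineEntity();
    atsEntity.setEntityId(event.getDagID().toString());
    atsEntity.setEntityType(EntityTypes.TEZ_DAG_ID.name());

    // DAG_STARTED becomes a timeline event instead of an entry in a JSON "events" array.
    TimelineEvent startEvent = new TimelineEvent();
    startEvent.setEventType(HistoryEventType.DAG_STARTED.name());
    startEvent.setTimestamp(event.getStartTime()); // getStartTime() assumed present on the event
    atsEntity.addEvent(startEvent);

    // The accessors added by this patch supply filterable metadata;
    // the "user" key is a placeholder, not necessarily the constant Tez uses.
    atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDagName());
    atsEntity.addPrimaryFilter("user", event.getUser());

    return atsEntity;
  }
}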

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
index 18f2205..f04afa4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
@@ -23,31 +23,32 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.SummaryEvent;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
-import org.apache.tez.dag.history.utils.DAGUtils;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGSubmittedProto;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
 import org.apache.tez.dag.utils.ProtoUtils;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
+
 
 public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
 
+  private static final Log LOG = LogFactory.getLog(DAGSubmittedEvent.class);
+
   private TezDAGID dagID;
   private long submitTime;
   private DAGProtos.DAGPlan dagPlan;
   private ApplicationAttemptId applicationAttemptId;
+  private String user;
   private Map<String, LocalResource> cumulativeAdditionalLocalResources;
 
   public DAGSubmittedEvent() {
@@ -55,12 +56,14 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
 
   public DAGSubmittedEvent(TezDAGID dagID, long submitTime,
       DAGProtos.DAGPlan dagPlan, ApplicationAttemptId applicationAttemptId,
-      Map<String, LocalResource> cumulativeAdditionalLocalResources) {
+      Map<String, LocalResource> cumulativeAdditionalLocalResources,
+      String user) {
     this.dagID = dagID;
     this.submitTime = submitTime;
     this.dagPlan = dagPlan;
     this.applicationAttemptId = applicationAttemptId;
     this.cumulativeAdditionalLocalResources = cumulativeAdditionalLocalResources;
+    this.user = user;
   }
 
   @Override
@@ -69,62 +72,6 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
   }
 
   @Override
-  public JSONObject convertToATSJSON() throws JSONException {
-    JSONObject jsonObject = new JSONObject();
-    jsonObject.put(ATSConstants.ENTITY,
-        dagID.toString());
-    jsonObject.put(ATSConstants.ENTITY_TYPE,
-        EntityTypes.TEZ_DAG_ID.name());
-
-    // Related Entities
-    JSONArray relatedEntities = new JSONArray();
-    JSONObject tezAppEntity = new JSONObject();
-    tezAppEntity.put(ATSConstants.ENTITY,
-        "tez_" + applicationAttemptId.toString());
-    tezAppEntity.put(ATSConstants.ENTITY_TYPE,
-        EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
-    JSONObject appEntity = new JSONObject();
-    appEntity.put(ATSConstants.ENTITY,
-        applicationAttemptId.getApplicationId().toString());
-    appEntity.put(ATSConstants.ENTITY_TYPE,
-        ATSConstants.APPLICATION_ID);
-    JSONObject appAttemptEntity = new JSONObject();
-    appAttemptEntity.put(ATSConstants.ENTITY,
-        applicationAttemptId.toString());
-    appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
-        ATSConstants.APPLICATION_ATTEMPT_ID);
-
-    relatedEntities.put(tezAppEntity);
-    relatedEntities.put(appEntity);
-    relatedEntities.put(appAttemptEntity);
-    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
-
-    // filters
-    JSONObject primaryFilters = new JSONObject();
-    primaryFilters.put(ATSConstants.DAG_NAME,
-        dagPlan.getName());
-    jsonObject.put(ATSConstants.PRIMARY_FILTERS, primaryFilters);
-
-    // TODO decide whether this goes into different events,
-    // event info or other info.
-    JSONArray events = new JSONArray();
-    JSONObject submitEvent = new JSONObject();
-    submitEvent.put(ATSConstants.TIMESTAMP, submitTime);
-    submitEvent.put(ATSConstants.EVENT_TYPE,
-        HistoryEventType.DAG_SUBMITTED.name());
-    events.put(submitEvent);
-    jsonObject.put(ATSConstants.EVENTS, events);
-
-    // Other info such as dag plan
-    JSONObject otherInfo = new JSONObject();
-    otherInfo.put(ATSConstants.DAG_PLAN,
-        DAGUtils.generateSimpleJSONPlan(dagPlan));
-    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
-    return jsonObject;
-  }
-
-  @Override
   public boolean isRecoveryEvent() {
     return true;
   }
@@ -220,4 +167,12 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
     return submitTime;
   }
 
+  public DAGPlan getDagPlan() {
+    return dagPlan;
+  }
+
+  public String getUser() {
+    return user;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
index ecb6818..aa85efb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
@@ -18,26 +18,24 @@
 
 package org.apache.tez.dag.history.events;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
-import org.apache.tez.dag.history.utils.DAGUtils;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.TaskAttemptFinishedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 
 public class TaskAttemptFinishedEvent implements HistoryEvent {
 
+  private static final Log LOG = LogFactory.getLog(TaskAttemptFinishedEvent.class);
+
   private TezTaskAttemptID taskAttemptId;
   private String vertexName;
   private long startTime;
@@ -71,35 +69,6 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
   }
 
   @Override
-  public JSONObject convertToATSJSON() throws JSONException {
-    JSONObject jsonObject = new JSONObject();
-    jsonObject.put(ATSConstants.ENTITY, taskAttemptId.toString());
-    jsonObject.put(ATSConstants.ENTITY_TYPE,
-        EntityTypes.TEZ_TASK_ATTEMPT_ID.name());
-
-    // Events
-    JSONArray events = new JSONArray();
-    JSONObject finishEvent = new JSONObject();
-    finishEvent.put(ATSConstants.TIMESTAMP, finishTime);
-    finishEvent.put(ATSConstants.EVENT_TYPE,
-        HistoryEventType.TASK_ATTEMPT_FINISHED.name());
-    events.put(finishEvent);
-    jsonObject.put(ATSConstants.EVENTS, events);
-
-    JSONObject otherInfo = new JSONObject();
-    otherInfo.put(ATSConstants.START_TIME, startTime);
-    otherInfo.put(ATSConstants.FINISH_TIME, finishTime);
-    otherInfo.put(ATSConstants.TIME_TAKEN, (finishTime - startTime));
-    otherInfo.put(ATSConstants.STATUS, state.name());
-    otherInfo.put(ATSConstants.DIAGNOSTICS, diagnostics);
-    otherInfo.put(ATSConstants.COUNTERS,
-        DAGUtils.convertCountersToJSON(this.tezCounters));
-    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
-    return jsonObject;
-  }
-
-  @Override
   public boolean isRecoveryEvent() {
     return true;
   }
@@ -183,4 +152,8 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
     return state;
   }
 
+  public long getStartTime() {
+    return startTime;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
index ba91db8..76109ca 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
@@ -18,22 +18,17 @@
 
 package org.apache.tez.dag.history.events;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.TaskAttemptStartedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 
 public class TaskAttemptStartedEvent implements HistoryEvent {
 
@@ -67,50 +62,6 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
   }
 
   @Override
-  public JSONObject convertToATSJSON() throws JSONException {
-    JSONObject jsonObject = new JSONObject();
-    jsonObject.put(ATSConstants.ENTITY, taskAttemptId.toString());
-    jsonObject.put(ATSConstants.ENTITY_TYPE,
-        EntityTypes.TEZ_TASK_ATTEMPT_ID.name());
-
-    // Related entities
-    JSONArray relatedEntities = new JSONArray();
-    JSONObject nodeEntity = new JSONObject();
-    nodeEntity.put(ATSConstants.ENTITY, nodeId.toString());
-    nodeEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.NODE_ID);
-
-    JSONObject containerEntity = new JSONObject();
-    containerEntity.put(ATSConstants.ENTITY, containerId.toString());
-    containerEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.CONTAINER_ID);
-
-    JSONObject taskEntity = new JSONObject();
-    taskEntity.put(ATSConstants.ENTITY, taskAttemptId.getTaskID().toString());
-    taskEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ID.name());
-
-    relatedEntities.put(nodeEntity);
-    relatedEntities.put(containerEntity);
-    relatedEntities.put(taskEntity);
-    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
-
-    // Events
-    JSONArray events = new JSONArray();
-    JSONObject startEvent = new JSONObject();
-    startEvent.put(ATSConstants.TIMESTAMP, startTime);
-    startEvent.put(ATSConstants.EVENT_TYPE,
-        HistoryEventType.TASK_ATTEMPT_STARTED.name());
-    events.put(startEvent);
-    jsonObject.put(ATSConstants.EVENTS, events);
-
-    // Other info
-    JSONObject otherInfo = new JSONObject();
-    otherInfo.put(ATSConstants.IN_PROGRESS_LOGS_URL, inProgressLogsUrl);
-    otherInfo.put(ATSConstants.COMPLETED_LOGS_URL, completedLogsUrl);
-    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
-    return jsonObject;
-  }
-
-  @Override
   public boolean isRecoveryEvent() {
     return true;
   }
@@ -173,4 +124,13 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
   public NodeId getNodeId() {
     return nodeId;
   }
+
+  public String getInProgressLogsUrl() {
+    return inProgressLogsUrl;
+  }
+
+  public String getCompletedLogsUrl() {
+    return completedLogsUrl;
+  }
+
 }
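
(Not part of the patch.) Similarly, TaskAttemptStartedEvent now exposes the log URLs that the removed JSON block published, and the node/container/task references map onto TimelineEntity related entities. A hypothetical sketch, assuming the YARN timeline API and that getTaskAttemptID(), getContainerId(), and getStartTime() accessors exist on the event alongside the getters shown in this diff:

import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
import org.apache.tez.dag.history.logging.EntityTypes; // package location assumed as of this commit
import org.apache.tez.dag.history.utils.ATSConstants;

public class TaskAttemptStartedConversionSketch {

  // Hypothetical helper mirroring the removed related-entities and
  // other-info JSON for a task attempt start.
  public static TimelineEntity toTimelineEntity(TaskAttemptStartedEvent event) {
    TimelineEntity atsEntity = new TimelineEntity();
    atsEntity.setEntityId(event.getTaskAttemptID().toString());
    atsEntity.setEntityType(EntityTypes.TEZ_TASK_ATTEMPT_ID.name());

    // Node, container, and the owning task become related entities.
    atsEntity.addRelatedEntity(ATSConstants.NODE_ID, event.getNodeId().toString());
    atsEntity.addRelatedEntity(ATSConstants.CONTAINER_ID, event.getContainerId().toString());
    atsEntity.addRelatedEntity(EntityTypes.TEZ_TASK_ID.name(),
        event.getTaskAttemptID().getTaskID().toString());

    TimelineEvent startEvent = new TimelineEvent();
    startEvent.setEventType(HistoryEventType.TASK_ATTEMPT_STARTED.name());
    startEvent.setTimestamp(event.getStartTime());
    atsEntity.addEvent(startEvent);

    // The new accessors replace direct field access for the log locations.
    atsEntity.addOtherInfo(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl());
    atsEntity.addOtherInfo(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl());

    return atsEntity;
  }
}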

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
index 713ecd8..0940987 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
@@ -18,27 +18,25 @@
 
 package org.apache.tez.dag.history.events;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
-import org.apache.tez.dag.history.utils.DAGUtils;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.TaskFinishedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 
 public class TaskFinishedEvent implements HistoryEvent {
 
+  private static final Log LOG = LogFactory.getLog(TaskFinishedEvent.class);
+
   private TezTaskID taskID;
   private String vertexName;
   private long startTime;
@@ -68,34 +66,6 @@ public class TaskFinishedEvent implements HistoryEvent {
   }
 
   @Override
-  public JSONObject convertToATSJSON() throws JSONException {
-    JSONObject jsonObject = new JSONObject();
-    jsonObject.put(ATSConstants.ENTITY, taskID.toString());
-    jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ID.name());
-
-    // Events
-    JSONArray events = new JSONArray();
-    JSONObject finishEvent = new JSONObject();
-    finishEvent.put(ATSConstants.TIMESTAMP, finishTime);
-    finishEvent.put(ATSConstants.EVENT_TYPE,
-        HistoryEventType.TASK_FINISHED.name());
-    events.put(finishEvent);
-    jsonObject.put(ATSConstants.EVENTS, events);
-
-    JSONObject otherInfo = new JSONObject();
-    otherInfo.put(ATSConstants.START_TIME, startTime);
-    otherInfo.put(ATSConstants.FINISH_TIME, finishTime);
-    otherInfo.put(ATSConstants.TIME_TAKEN, (finishTime - startTime));
-    otherInfo.put(ATSConstants.STATUS, state.name());
-    otherInfo.put(ATSConstants.COUNTERS,
-        DAGUtils.convertCountersToJSON(this.tezCounters));
-
-    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
-    return jsonObject;
-  }
-
-  @Override
   public boolean isRecoveryEvent() {
     return true;
   }
@@ -178,4 +148,9 @@ public class TaskFinishedEvent implements HistoryEvent {
   public TezTaskAttemptID getSuccessfulAttemptID() {
     return successfulAttemptID;
   }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
index c2a380b..e93f3b3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
@@ -18,19 +18,14 @@
 
 package org.apache.tez.dag.history.events;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.TaskStartedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 
 public class TaskStartedEvent implements HistoryEvent {
 
@@ -56,40 +51,6 @@ public class TaskStartedEvent implements HistoryEvent {
   }
 
   @Override
-  public JSONObject convertToATSJSON() throws JSONException {
-    JSONObject jsonObject = new JSONObject();
-    jsonObject.put(ATSConstants.ENTITY, taskID.toString());
-    jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ID.name());
-
-    // Related entities
-    JSONArray relatedEntities = new JSONArray();
-    JSONObject vertexEntity = new JSONObject();
-    vertexEntity.put(ATSConstants.ENTITY, taskID.getVertexID().toString());
-    vertexEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
-    relatedEntities.put(vertexEntity);
-    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
-
-    // Events
-    JSONArray events = new JSONArray();
-    JSONObject startEvent = new JSONObject();
-    startEvent.put(ATSConstants.TIMESTAMP, startTime);
-    startEvent.put(ATSConstants.EVENT_TYPE,
-        HistoryEventType.TASK_STARTED.name());
-    events.put(startEvent);
-    jsonObject.put(ATSConstants.EVENTS, events);
-    
-    // Other info
-    // TODO fix schedule/launch time to be events
-    JSONObject otherInfo = new JSONObject();
-    otherInfo.put(ATSConstants.START_TIME, startTime);
-    otherInfo.put(ATSConstants.SCHEDULED_TIME, scheduledTime);
-
-    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
-    return jsonObject;
-  }
-
-  @Override
   public boolean isRecoveryEvent() {
     return true;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
index 387bff1..a7e74e6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
@@ -29,8 +29,6 @@ import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexCommitStartedProto;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
 
 import com.google.protobuf.ByteString;
 
@@ -53,12 +51,6 @@ public class VertexCommitStartedEvent implements HistoryEvent, SummaryEvent {
   }
 
   @Override
-  public JSONObject convertToATSJSON() throws JSONException {
-    // TODO
-    return null;
-  }
-
-  @Override
   public boolean isRecoveryEvent() {
     return true;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
index 035c9ca..03a3cdf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
@@ -18,7 +18,12 @@
 
 package org.apache.tez.dag.history.events;
 
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.EnumSet;
+import java.util.List;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tez.common.ProtoConverters;
@@ -36,14 +41,8 @@ import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.EventType;
 import org.apache.tez.runtime.api.impl.TezEvent;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.EnumSet;
-import java.util.List;
+import com.google.common.collect.Lists;
 
 public class VertexDataMovementEventsGeneratedEvent implements HistoryEvent {
 
@@ -79,11 +78,6 @@ public class VertexDataMovementEventsGeneratedEvent implements HistoryEvent {
   }
 
   @Override
-  public JSONObject convertToATSJSON() throws JSONException {
-    return null;
-  }
-
-  @Override
   public boolean isRecoveryEvent() {
     return true;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
index 8321a38..6c18d9f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.app.dag.VertexState;
@@ -29,20 +31,16 @@ import org.apache.tez.dag.app.dag.impl.VertexStats;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.SummaryEvent;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
-import org.apache.tez.dag.history.utils.DAGUtils;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexFinishStateProto;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexFinishedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
 
 public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
 
+  private static final Log LOG = LogFactory.getLog(VertexFinishedEvent.class);
+
   private TezVertexID vertexID;
   private String vertexName;
   private long initRequestedTime;
@@ -83,33 +81,6 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
   }
 
   @Override
-  public JSONObject convertToATSJSON() throws JSONException {
-    JSONObject jsonObject = new JSONObject();
-    jsonObject.put(ATSConstants.ENTITY, vertexID.toString());
-    jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
-
-    // Events
-    JSONArray events = new JSONArray();
-    JSONObject finishEvent = new JSONObject();
-    finishEvent.put(ATSConstants.TIMESTAMP, finishTime);
-    finishEvent.put(ATSConstants.EVENT_TYPE,
-        HistoryEventType.VERTEX_FINISHED.name());
-    events.put(finishEvent);
-    jsonObject.put(ATSConstants.EVENTS, events);
-
-    JSONObject otherInfo = new JSONObject();
-    otherInfo.put(ATSConstants.FINISH_TIME, finishTime);
-    otherInfo.put(ATSConstants.TIME_TAKEN, (finishTime - startTime));
-    otherInfo.put(ATSConstants.STATUS, state.name());
-    otherInfo.put(ATSConstants.DIAGNOSTICS, diagnostics);
-    otherInfo.put(ATSConstants.COUNTERS,
-        DAGUtils.convertCountersToJSON(this.tezCounters));
-    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
-    return jsonObject;
-  }
-
-  @Override
   public boolean isRecoveryEvent() {
     return true;
   }
@@ -198,6 +169,18 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
     return tezCounters;
   }
 
+  public VertexStats getVertexStats() {
+    return vertexStats;
+  }
+
+  public String getVertexName() {
+    return vertexName;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
   @Override
   public void toSummaryProtoStream(OutputStream outputStream) throws IOException {
     VertexFinishStateProto finishStateProto =

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
index 99a5288..4c042e1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
@@ -29,8 +29,6 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexGroupCommitFinishedProto;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
 
 public class VertexGroupCommitFinishedEvent implements HistoryEvent, SummaryEvent {
 
@@ -54,12 +52,6 @@ public class VertexGroupCommitFinishedEvent implements HistoryEvent, SummaryEven
   }
 
   @Override
-  public JSONObject convertToATSJSON() throws JSONException {
-    // TODO
-    return null;
-  }
-
-  @Override
   public boolean isRecoveryEvent() {
     return true;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
index 04d6276..d7df13a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
@@ -29,8 +29,6 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexGroupCommitStartedProto;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
 
 public class VertexGroupCommitStartedEvent implements HistoryEvent, SummaryEvent {
 
@@ -54,12 +52,6 @@ public class VertexGroupCommitStartedEvent implements HistoryEvent, SummaryEvent
   }
 
   @Override
-  public JSONObject convertToATSJSON() throws JSONException {
-    // TODO
-    return null;
-  }
-
-  @Override
   public boolean isRecoveryEvent() {
     return true;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
index e9e4a8c..2ae1b3a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
@@ -18,6 +18,12 @@
 
 package org.apache.tez.dag.history.events;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tez.dag.api.DagTypeConverters;
@@ -26,20 +32,9 @@ import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
 import org.apache.tez.dag.app.dag.impl.RootInputLeafOutputDescriptor;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexInitializedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.LinkedHashMap;
-import java.util.Map;
 
 public class VertexInitializedEvent implements HistoryEvent {
 
@@ -75,42 +70,6 @@ public class VertexInitializedEvent implements HistoryEvent {
   }
 
   @Override
-  public JSONObject convertToATSJSON() throws JSONException {
-    JSONObject jsonObject = new JSONObject();
-    jsonObject.put(ATSConstants.ENTITY, vertexID.toString());
-    jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
-
-    // Related entities
-    JSONArray relatedEntities = new JSONArray();
-    JSONObject vertexEntity = new JSONObject();
-    vertexEntity.put(ATSConstants.ENTITY, vertexID.getDAGId().toString());
-    vertexEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
-    relatedEntities.put(vertexEntity);
-    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
-
-    // Events
-    JSONArray events = new JSONArray();
-    JSONObject initEvent = new JSONObject();
-    initEvent.put(ATSConstants.TIMESTAMP, initedTime);
-    initEvent.put(ATSConstants.EVENT_TYPE,
-        HistoryEventType.VERTEX_INITIALIZED.name());
-    events.put(initEvent);
-    jsonObject.put(ATSConstants.EVENTS, events);
-
-    // Other info
-    // TODO fix requested times to be events
-    JSONObject otherInfo = new JSONObject();
-    otherInfo.put(ATSConstants.VERTEX_NAME, vertexName);
-    otherInfo.put(ATSConstants.INIT_REQUESTED_TIME, initRequestedTime);
-    otherInfo.put(ATSConstants.INIT_TIME, initedTime);
-    otherInfo.put(ATSConstants.NUM_TASKS, numTasks);
-    otherInfo.put(ATSConstants.PROCESSOR_CLASS_NAME, processorName);
-    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
-    return jsonObject;
-  }
-
-  @Override
   public boolean isRecoveryEvent() {
     return true;
   }
@@ -214,4 +173,9 @@ public class VertexInitializedEvent implements HistoryEvent {
   public String getProcessorName() {
     return processorName;
   }
+
+  public String getVertexName() {
+    return vertexName;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
index 43cc787..39748ec 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
@@ -18,6 +18,13 @@
 
 package org.apache.tez.dag.history.events;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeManagerDescriptor;
 import org.apache.tez.dag.api.VertexLocationHint;
@@ -26,15 +33,6 @@ import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.EdgeManagerDescriptorProto;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexParallelismUpdatedProto;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
 
 public class VertexParallelismUpdatedEvent implements HistoryEvent {
 
@@ -61,12 +59,6 @@ public class VertexParallelismUpdatedEvent implements HistoryEvent {
   }
 
   @Override
-  public JSONObject convertToATSJSON() throws JSONException {
-    throw new UnsupportedOperationException("VertexParallelismUpdatedEvent"
-        + " not a History event");
-  }
-
-  @Override
   public boolean isRecoveryEvent() {
     return true;
   }