You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/05/22 00:13:14 UTC

[2/3] TEZ-1066. Generate events to integrate with YARN timeline server. (hitesh)

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
index e6023f1..da00b06 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
@@ -18,19 +18,14 @@
 
 package org.apache.tez.dag.history.events;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexStartedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 
 public class VertexStartedEvent implements HistoryEvent {
 
@@ -54,39 +49,6 @@ public class VertexStartedEvent implements HistoryEvent {
   }
 
   @Override
-  public JSONObject convertToATSJSON() throws JSONException {
-    JSONObject jsonObject = new JSONObject();
-    jsonObject.put(ATSConstants.ENTITY, vertexID.toString());
-    jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
-
-    // Related entities
-    JSONArray relatedEntities = new JSONArray();
-    JSONObject vertexEntity = new JSONObject();
-    vertexEntity.put(ATSConstants.ENTITY, vertexID.getDAGId().toString());
-    vertexEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
-    relatedEntities.put(vertexEntity);
-    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
-
-    // Events
-    JSONArray events = new JSONArray();
-    JSONObject startEvent = new JSONObject();
-    startEvent.put(ATSConstants.TIMESTAMP, startTime);
-    startEvent.put(ATSConstants.EVENT_TYPE,
-        HistoryEventType.VERTEX_STARTED.name());
-    events.put(startEvent);
-    jsonObject.put(ATSConstants.EVENTS, events);
-
-    // Other info
-    // TODO fix requested times to be events
-    JSONObject otherInfo = new JSONObject();
-    otherInfo.put(ATSConstants.START_REQUESTED_TIME, startRequestedTime);
-    otherInfo.put(ATSConstants.START_TIME, startTime);
-    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
-    return jsonObject;
-  }
-
-  @Override
   public boolean isRecoveryEvent() {
     return true;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java
new file mode 100644
index 0000000..00cac28
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/EntityTypes.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging;
+
+public enum EntityTypes {
+  TEZ_APPLICATION_ATTEMPT,
+  TEZ_CONTAINER_ID,
+  TEZ_DAG_ID,
+  TEZ_VERTEX_ID,
+  TEZ_TASK_ID,
+  TEZ_TASK_ATTEMPT_ID,
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/logging/HistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/HistoryLoggingService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/HistoryLoggingService.java
new file mode 100644
index 0000000..44efad7
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/HistoryLoggingService.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging;
+
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+
+public abstract class HistoryLoggingService extends AbstractService {
+
+  protected AppContext appContext;
+
+  public void setAppContext(AppContext appContext) {
+    this.appContext = appContext;
+  }
+
+  public HistoryLoggingService(String name) {
+    super(name);
+  }
+
+  /**
+   * Handle logging of history event
+   * @param event History event to be logged
+   */
+  public abstract void handle(DAGHistoryEvent event);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
new file mode 100644
index 0000000..e5bb1e5
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -0,0 +1,633 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.impl;
+
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.AMLaunchedEvent;
+import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.history.events.ContainerStoppedEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskFinishedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.history.logging.EntityTypes;
+import org.apache.tez.dag.history.utils.ATSConstants;
+import org.apache.tez.dag.history.utils.DAGUtils;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+public class HistoryEventJsonConversion {
+
+  public static JSONObject convertToJson(HistoryEvent historyEvent) throws JSONException {
+    if (!historyEvent.isHistoryEvent()) {
+      throw new UnsupportedOperationException("Invalid Event, does not support history"
+          + ", eventType=" + historyEvent.getEventType());
+    }
+    JSONObject jsonObject = null;
+    switch (historyEvent.getEventType()) {
+      case AM_LAUNCHED:
+        jsonObject = convertAMLaunchedEvent((AMLaunchedEvent) historyEvent);
+        break;
+      case AM_STARTED:
+        jsonObject = convertAMStartedEvent((AMStartedEvent) historyEvent);
+        break;
+      case CONTAINER_LAUNCHED:
+        jsonObject = convertContainerLaunchedEvent((ContainerLaunchedEvent) historyEvent);
+        break;
+      case CONTAINER_STOPPED:
+        jsonObject = convertContainerStoppedEvent((ContainerStoppedEvent) historyEvent);
+        break;
+      case DAG_SUBMITTED:
+        jsonObject = convertDAGSubmittedEvent((DAGSubmittedEvent) historyEvent);
+        break;
+      case DAG_INITIALIZED:
+        jsonObject = convertDAGInitializedEvent((DAGInitializedEvent) historyEvent);
+        break;
+      case DAG_STARTED:
+        jsonObject = convertDAGStartedEvent((DAGStartedEvent) historyEvent);
+        break;
+      case DAG_FINISHED:
+        jsonObject = convertDAGFinishedEvent((DAGFinishedEvent) historyEvent);
+        break;
+      case VERTEX_INITIALIZED:
+        jsonObject = convertVertexInitializedEvent((VertexInitializedEvent) historyEvent);
+        break;
+      case VERTEX_STARTED:
+        jsonObject = convertVertexStartedEvent((VertexStartedEvent) historyEvent);
+        break;
+      case VERTEX_FINISHED:
+        jsonObject = convertVertexFinishedEvent((VertexFinishedEvent) historyEvent);
+      break;
+      case TASK_STARTED:
+        jsonObject = convertTaskStartedEvent((TaskStartedEvent) historyEvent);
+        break;
+      case TASK_FINISHED:
+        jsonObject = convertTaskFinishedEvent((TaskFinishedEvent) historyEvent);
+        break;
+      case TASK_ATTEMPT_STARTED:
+        jsonObject = convertTaskAttemptStartedEvent((TaskAttemptStartedEvent) historyEvent);
+        break;
+      case TASK_ATTEMPT_FINISHED:
+        jsonObject = convertTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) historyEvent);
+        break;
+      case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+      case VERTEX_COMMIT_STARTED:
+      case VERTEX_GROUP_COMMIT_STARTED:
+      case VERTEX_GROUP_COMMIT_FINISHED:
+      case VERTEX_PARALLELISM_UPDATED:
+      case DAG_COMMIT_STARTED:
+        throw new UnsupportedOperationException("Invalid Event, does not support history"
+            + ", eventType=" + historyEvent.getEventType());
+      default:
+        throw new UnsupportedOperationException("Unhandled Event"
+            + ", eventType=" + historyEvent.getEventType());
+    }
+    return jsonObject;
+  }
+
+  private static JSONObject convertAMLaunchedEvent(AMLaunchedEvent event) throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY,
+        "tez_" + event.getApplicationAttemptId().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE,
+            EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+
+    // Related Entities
+    JSONArray relatedEntities = new JSONArray();
+    JSONObject appEntity = new JSONObject();
+    appEntity.put(ATSConstants.ENTITY,
+        event.getApplicationAttemptId().getApplicationId().toString());
+    appEntity.put(ATSConstants.ENTITY_TYPE,
+        ATSConstants.APPLICATION_ID);
+    JSONObject appAttemptEntity = new JSONObject();
+    appAttemptEntity.put(ATSConstants.ENTITY,
+        event.getApplicationAttemptId().toString());
+    appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
+            ATSConstants.APPLICATION_ATTEMPT_ID);
+    relatedEntities.put(appEntity);
+    relatedEntities.put(appAttemptEntity);
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // TODO decide whether this goes into different events,
+    // event info or other info.
+    JSONArray events = new JSONArray();
+    JSONObject initEvent = new JSONObject();
+    initEvent.put(ATSConstants.TIMESTAMP, event.getLaunchTime());
+    initEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.AM_LAUNCHED.name());
+    events.put(initEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    // Other info to tag with Tez AM
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.APP_SUBMIT_TIME, event.getAppSubmitTime());
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+    
+    return jsonObject;
+  }
+
+  private static JSONObject convertAMStartedEvent(AMStartedEvent event) throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY,
+        "tez_" + event.getApplicationAttemptId().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE,
+        EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+
+    // Related Entities
+    JSONArray relatedEntities = new JSONArray();
+    JSONObject appEntity = new JSONObject();
+    appEntity.put(ATSConstants.ENTITY,
+        event.getApplicationAttemptId().getApplicationId().toString());
+    appEntity.put(ATSConstants.ENTITY_TYPE,
+        ATSConstants.APPLICATION_ID);
+    JSONObject appAttemptEntity = new JSONObject();
+    appAttemptEntity.put(ATSConstants.ENTITY,
+        event.getApplicationAttemptId().toString());
+    appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
+        ATSConstants.APPLICATION_ATTEMPT_ID);
+    relatedEntities.put(appEntity);
+    relatedEntities.put(appAttemptEntity);
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // TODO decide whether this goes into different events,
+    // event info or other info.
+    JSONArray events = new JSONArray();
+    JSONObject startEvent = new JSONObject();
+    startEvent.put(ATSConstants.TIMESTAMP, event.getStartTime());
+    startEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.AM_STARTED.name());
+    events.put(startEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    return jsonObject;  }
+
+  private static JSONObject convertContainerLaunchedEvent(ContainerLaunchedEvent event) throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY,
+        "tez_" + event.getContainerId().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE,
+        EntityTypes.TEZ_CONTAINER_ID.name());
+
+    JSONArray relatedEntities = new JSONArray();
+    JSONObject appAttemptEntity = new JSONObject();
+    appAttemptEntity.put(ATSConstants.ENTITY,
+        event.getApplicationAttemptId().toString());
+    appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
+        EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+
+    JSONObject containerEntity = new JSONObject();
+    containerEntity.put(ATSConstants.ENTITY, event.getContainerId().toString());
+    containerEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.CONTAINER_ID);
+
+    relatedEntities.put(appAttemptEntity);
+    relatedEntities.put(containerEntity);
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // TODO decide whether this goes into different events,
+    // event info or other info.
+    JSONArray events = new JSONArray();
+    JSONObject launchEvent = new JSONObject();
+    launchEvent.put(ATSConstants.TIMESTAMP, event.getLaunchTime());
+    launchEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.CONTAINER_LAUNCHED.name());
+    events.put(launchEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    // TODO add other container info here? or assume AHS will have this?
+    // TODO container logs?
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertContainerStoppedEvent(ContainerStoppedEvent event) throws JSONException {
+    // structure is identical to ContainerLaunchedEvent
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY,
+        "tez_" + event.getContainerId().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE,
+        EntityTypes.TEZ_CONTAINER_ID.name());
+
+    JSONArray relatedEntities = new JSONArray();
+    JSONObject appAttemptEntity = new JSONObject();
+    appAttemptEntity.put(ATSConstants.ENTITY,
+        event.getApplicationAttemptId().toString());
+    appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
+        EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+
+    JSONObject containerEntity = new JSONObject();
+    containerEntity.put(ATSConstants.ENTITY, event.getContainerId().toString());
+    containerEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.CONTAINER_ID);
+
+    relatedEntities.put(appAttemptEntity);
+    relatedEntities.put(containerEntity);
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // TODO decide whether this goes into different events,
+    // event info or other info.
+    JSONArray events = new JSONArray();
+    JSONObject stopEvent = new JSONObject();
+    stopEvent.put(ATSConstants.TIMESTAMP, event.getStoppedTime());
+    stopEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.CONTAINER_STOPPED.name());
+    events.put(stopEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    // TODO add other container info here? or assume AHS will have this?
+    // TODO container logs?
+
+    // Other info
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.EXIT_STATUS, event.getExitStatus());
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;  }
+
+  private static JSONObject convertDAGFinishedEvent(DAGFinishedEvent event) throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY,
+        event.getDagID().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE,
+        EntityTypes.TEZ_DAG_ID.name());
+
+    // Related Entities not needed as should have been done in
+    // dag submission event
+
+    // TODO decide whether this goes into different events,
+    // event info or other info.
+    JSONArray events = new JSONArray();
+    JSONObject finishEvent = new JSONObject();
+    finishEvent.put(ATSConstants.TIMESTAMP, event.getFinishTime());
+    finishEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.DAG_FINISHED.name());
+    events.put(finishEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.START_TIME, event.getStartTime());
+    otherInfo.put(ATSConstants.FINISH_TIME, event.getFinishTime());
+    otherInfo.put(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+    otherInfo.put(ATSConstants.STATUS, event.getState().name());
+    otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
+    otherInfo.put(ATSConstants.COUNTERS,
+        DAGUtils.convertCountersToJSON(event.getTezCounters()));
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertDAGInitializedEvent(DAGInitializedEvent event) throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY,
+        event.getDagID().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE,
+        EntityTypes.TEZ_DAG_ID.name());
+
+    // Related Entities not needed as should have been done in
+    // dag submission event
+
+    JSONArray events = new JSONArray();
+    JSONObject initEvent = new JSONObject();
+    initEvent.put(ATSConstants.TIMESTAMP, event.getInitTime());
+    initEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.DAG_INITIALIZED.name());
+    events.put(initEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertDAGStartedEvent(DAGStartedEvent event) throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY,
+        event.getDagID().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE,
+        EntityTypes.TEZ_DAG_ID.name());
+
+    // Related Entities not needed as should have been done in
+    // dag submission event
+
+    // TODO decide whether this goes into different events,
+    // event info or other info.
+    JSONArray events = new JSONArray();
+    JSONObject startEvent = new JSONObject();
+    startEvent.put(ATSConstants.TIMESTAMP, event.getStartTime());
+    startEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.DAG_STARTED.name());
+    events.put(startEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertDAGSubmittedEvent(DAGSubmittedEvent event) throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY,
+        event.getDagID().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE,
+        EntityTypes.TEZ_DAG_ID.name());
+
+    // Related Entities
+    JSONArray relatedEntities = new JSONArray();
+    JSONObject tezAppEntity = new JSONObject();
+    tezAppEntity.put(ATSConstants.ENTITY,
+        "tez_" + event.getApplicationAttemptId().toString());
+    tezAppEntity.put(ATSConstants.ENTITY_TYPE,
+        EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+    JSONObject appEntity = new JSONObject();
+    appEntity.put(ATSConstants.ENTITY,
+        event.getApplicationAttemptId().getApplicationId().toString());
+    appEntity.put(ATSConstants.ENTITY_TYPE,
+        ATSConstants.APPLICATION_ID);
+    JSONObject appAttemptEntity = new JSONObject();
+    appAttemptEntity.put(ATSConstants.ENTITY,
+        event.getApplicationAttemptId().toString());
+    appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
+        ATSConstants.APPLICATION_ATTEMPT_ID);
+    JSONObject userEntity = new JSONObject();
+    userEntity.put(ATSConstants.ENTITY,
+        event.getUser());
+    userEntity.put(ATSConstants.ENTITY_TYPE,
+        ATSConstants.USER);
+
+    relatedEntities.put(tezAppEntity);
+    relatedEntities.put(appEntity);
+    relatedEntities.put(appAttemptEntity);
+    relatedEntities.put(userEntity);
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // filters
+    JSONObject primaryFilters = new JSONObject();
+    primaryFilters.put(ATSConstants.DAG_NAME,
+        event.getDAGName());
+    jsonObject.put(ATSConstants.PRIMARY_FILTERS, primaryFilters);
+
+    // TODO decide whether this goes into different events,
+    // event info or other info.
+    JSONArray events = new JSONArray();
+    JSONObject submitEvent = new JSONObject();
+    submitEvent.put(ATSConstants.TIMESTAMP, event.getSubmitTime());
+    submitEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.DAG_SUBMITTED.name());
+    events.put(submitEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    // Other info such as dag plan
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.DAG_PLAN,
+        DAGUtils.generateSimpleJSONPlan(event.getDAGPlan()));
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY, event.getTaskAttemptID().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE,
+        EntityTypes.TEZ_TASK_ATTEMPT_ID.name());
+
+    // Events
+    JSONArray events = new JSONArray();
+    JSONObject finishEvent = new JSONObject();
+    finishEvent.put(ATSConstants.TIMESTAMP, event.getFinishTime());
+    finishEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.TASK_ATTEMPT_FINISHED.name());
+    events.put(finishEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.START_TIME, event.getStartTime());
+    otherInfo.put(ATSConstants.FINISH_TIME, event.getFinishTime());
+    otherInfo.put(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+    otherInfo.put(ATSConstants.STATUS, event.getState().name());
+    otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
+    otherInfo.put(ATSConstants.COUNTERS,
+        DAGUtils.convertCountersToJSON(event.getCounters()));
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertTaskAttemptStartedEvent(TaskAttemptStartedEvent event) throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY, event.getTaskAttemptID().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE,
+        EntityTypes.TEZ_TASK_ATTEMPT_ID.name());
+
+    // Related entities
+    JSONArray relatedEntities = new JSONArray();
+    JSONObject nodeEntity = new JSONObject();
+    nodeEntity.put(ATSConstants.ENTITY, event.getNodeId().toString());
+    nodeEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.NODE_ID);
+
+    JSONObject containerEntity = new JSONObject();
+    containerEntity.put(ATSConstants.ENTITY, event.getContainerId().toString());
+    containerEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.CONTAINER_ID);
+
+    JSONObject taskEntity = new JSONObject();
+    taskEntity.put(ATSConstants.ENTITY, event.getTaskAttemptID().getTaskID().toString());
+    taskEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ID.name());
+
+    relatedEntities.put(nodeEntity);
+    relatedEntities.put(containerEntity);
+    relatedEntities.put(taskEntity);
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // Events
+    JSONArray events = new JSONArray();
+    JSONObject startEvent = new JSONObject();
+    startEvent.put(ATSConstants.TIMESTAMP, event.getStartTime());
+    startEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.TASK_ATTEMPT_STARTED.name());
+    events.put(startEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    // Other info
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl());
+    otherInfo.put(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl());
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertTaskFinishedEvent(TaskFinishedEvent event) throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY, event.getTaskID().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ID.name());
+
+    // Events
+    JSONArray events = new JSONArray();
+    JSONObject finishEvent = new JSONObject();
+    finishEvent.put(ATSConstants.TIMESTAMP, event.getFinishTime());
+    finishEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.TASK_FINISHED.name());
+    events.put(finishEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.START_TIME, event.getStartTime());
+    otherInfo.put(ATSConstants.FINISH_TIME, event.getFinishTime());
+    otherInfo.put(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+    otherInfo.put(ATSConstants.STATUS, event.getState().name());
+    otherInfo.put(ATSConstants.COUNTERS,
+        DAGUtils.convertCountersToJSON(event.getTezCounters()));
+
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertTaskStartedEvent(TaskStartedEvent event) throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY, event.getTaskID().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ID.name());
+
+    // Related entities
+    JSONArray relatedEntities = new JSONArray();
+    JSONObject vertexEntity = new JSONObject();
+    vertexEntity.put(ATSConstants.ENTITY, event.getTaskID().getVertexID().toString());
+    vertexEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
+    relatedEntities.put(vertexEntity);
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // Events
+    JSONArray events = new JSONArray();
+    JSONObject startEvent = new JSONObject();
+    startEvent.put(ATSConstants.TIMESTAMP, event.getStartTime());
+    startEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.TASK_STARTED.name());
+    events.put(startEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    // Other info
+    // TODO fix schedule/launch time to be events
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.START_TIME, event.getStartTime());
+    otherInfo.put(ATSConstants.SCHEDULED_TIME, event.getScheduledTime());
+
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertVertexFinishedEvent(VertexFinishedEvent event) throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY, event.getVertexID().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
+
+    // Events
+    JSONArray events = new JSONArray();
+    JSONObject finishEvent = new JSONObject();
+    finishEvent.put(ATSConstants.TIMESTAMP, event.getFinishTime());
+    finishEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.VERTEX_FINISHED.name());
+    events.put(finishEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.FINISH_TIME, event.getFinishTime());
+    otherInfo.put(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
+    otherInfo.put(ATSConstants.STATUS, event.getState().name());
+    otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
+    otherInfo.put(ATSConstants.COUNTERS,
+        DAGUtils.convertCountersToJSON(event.getTezCounters()));
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertVertexInitializedEvent(VertexInitializedEvent event) throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY, event.getVertexID().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
+
+    // Related entities
+    JSONArray relatedEntities = new JSONArray();
+    JSONObject vertexEntity = new JSONObject();
+    vertexEntity.put(ATSConstants.ENTITY, event.getVertexID().getDAGId().toString());
+    vertexEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
+    relatedEntities.put(vertexEntity);
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // Events
+    JSONArray events = new JSONArray();
+    JSONObject initEvent = new JSONObject();
+    initEvent.put(ATSConstants.TIMESTAMP, event.getInitedTime());
+    initEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.VERTEX_INITIALIZED.name());
+    events.put(initEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    // Other info
+    // TODO fix requested times to be events
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.VERTEX_NAME, event.getVertexName());
+    otherInfo.put(ATSConstants.INIT_REQUESTED_TIME, event.getInitRequestedTime());
+    otherInfo.put(ATSConstants.INIT_TIME, event.getInitedTime());
+    otherInfo.put(ATSConstants.NUM_TASKS, event.getNumTasks());
+    otherInfo.put(ATSConstants.PROCESSOR_CLASS_NAME, event.getProcessorName());
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertVertexStartedEvent(VertexStartedEvent event) throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY, event.getVertexID().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
+
+    // Related entities
+    JSONArray relatedEntities = new JSONArray();
+    JSONObject vertexEntity = new JSONObject();
+    vertexEntity.put(ATSConstants.ENTITY, event.getVertexID().getDAGId().toString());
+    vertexEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
+    relatedEntities.put(vertexEntity);
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // Events
+    JSONArray events = new JSONArray();
+    JSONObject startEvent = new JSONObject();
+    startEvent.put(ATSConstants.TIMESTAMP, event.getStartTime());
+    startEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.VERTEX_STARTED.name());
+    events.put(startEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    // Other info
+    // TODO fix requested times to be events
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.START_REQUESTED_TIME, event.getStartRequestedTime());
+    otherInfo.put(ATSConstants.START_TIME, event.getStartTime());
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
new file mode 100644
index 0000000..257b6c8
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.impl;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.logging.HistoryLoggingService;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+public class SimpleHistoryLoggingService extends HistoryLoggingService {
+
+  private static final Log LOG = LogFactory.getLog(SimpleHistoryLoggingService.class);
+  private Path logFileLocation;
+  private FileSystem logFileFS;
+  private FSDataOutputStream outputStream;
+  private LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
+      new LinkedBlockingQueue<DAGHistoryEvent>();
+  public static final String RECORD_SEPARATOR = System.getProperty("line.separator") + "\u0001";
+  public static final String LOG_FILE_NAME_PREFIX = "history.txt";
+
+  private Thread eventHandlingThread;
+  private AtomicBoolean stopped = new AtomicBoolean(false);
+
+  private int consecutiveErrors = 0;
+  private int maxErrors;
+  private boolean loggingDisabled = false;
+
+  public SimpleHistoryLoggingService() {
+    super(SimpleHistoryLoggingService.class.getName());
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    String logDirPath = conf.get(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR);
+    final String logFileName = LOG_FILE_NAME_PREFIX + "." + appContext.getApplicationAttemptId();
+    if (logDirPath == null || logDirPath.isEmpty()) {
+      String logDir = TezUtils.getContainerLogDir();
+      Path p;
+      logFileFS = FileSystem.getLocal(conf);
+      if (logDir != null) {
+        p = new Path(logDir, logFileName);
+      } else {
+        p = new Path(logFileName);
+      }
+      logFileLocation = p;
+    } else {
+      Path p = new Path(logDirPath);
+      logFileFS = p.getFileSystem(conf);
+      if (!logFileFS.exists(p)) {
+        logFileFS.mkdirs(p);
+      }
+      logFileLocation = new Path(logFileFS.resolvePath(p), logFileName);
+    }
+    maxErrors = conf.getInt(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_MAX_ERRORS,
+        TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_MAX_ERRORS_DEFAULT);
+    LOG.info("Initializing SimpleHistoryLoggingService, logFileLocation=" + logFileLocation
+        + ", maxErrors=" + maxErrors);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    LOG.info("Starting SimpleHistoryLoggingService");
+    outputStream = logFileFS.create(logFileLocation, true);
+    eventHandlingThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        DAGHistoryEvent event;
+        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+          try {
+            event = eventQueue.take();
+          } catch (InterruptedException e) {
+            LOG.info("EventQueue take interrupted. Returning");
+            return;
+          }
+          handleEvent(event);
+        }
+      }
+    }, "HistoryEventHandlingThread");
+    eventHandlingThread.start();
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    LOG.info("Stopping SimpleHistoryLoggingService"
+        + ", eventQueueBacklog=" + eventQueue.size());
+    stopped.set(true);
+    if (eventHandlingThread != null) {
+      eventHandlingThread.interrupt();
+    }
+    while (!eventQueue.isEmpty()) {
+      DAGHistoryEvent event = eventQueue.poll();
+      if (event == null) {
+        break;
+      }
+      handleEvent(event);
+    }
+    try {
+      if (outputStream != null) {
+        outputStream.hsync();
+        outputStream.close();
+      }
+    } catch (IOException ioe) {
+      LOG.warn("Failed to close output stream", ioe);
+    }
+    super.serviceStop();
+  }
+
+  @Override
+  public void handle(DAGHistoryEvent event) {
+    eventQueue.add(event);
+  }
+
+  private synchronized void handleEvent(DAGHistoryEvent event) {
+    if (loggingDisabled) {
+      return;
+    }
+    LOG.info("Writing event " + event.getHistoryEvent().getEventType() + " to history file");
+    try {
+      try {
+        JSONObject eventJson = HistoryEventJsonConversion.convertToJson(event.getHistoryEvent());
+        outputStream.writeBytes(eventJson.toString());
+        outputStream.writeBytes(RECORD_SEPARATOR);
+      } catch (JSONException e) {
+        LOG.warn("Failed to convert event to json", e);
+      }
+      consecutiveErrors = 0;
+    } catch (IOException ioe) {
+      ++consecutiveErrors;
+      if (consecutiveErrors < maxErrors) {
+        LOG.error("Failed to write to output stream, consecutiveErrorCount=" + consecutiveErrors, ioe);
+      } else {
+        loggingDisabled = true;
+        LOG.error("Disabling SimpleHistoryLoggingService due to multiple errors," +
+            "consecutive max errors reached, maxErrors=" + maxErrors);
+      }
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
index f53cc7d..0188a8e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/ATSConstants.java
@@ -39,6 +39,7 @@ public class ATSConstants {
   public static final String APPLICATION_ATTEMPT_ID = "applicationAttemptId";
   public static final String CONTAINER_ID = "containerId";
   public static final String NODE_ID = "nodeId";
+  public static final String USER = "user";
 
   /* Keys used in other info */
   public static final String APP_SUBMIT_TIME = "appSubmitTime";

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
index 782fff1..3997c2f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
@@ -18,10 +18,18 @@
 
 package org.apache.tez.dag.history.utils;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
 import org.apache.tez.common.counters.CounterGroup;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.api.records.DAGProtos.PlanGroupInputEdgeInfo;
+import org.apache.tez.dag.app.dag.impl.VertexStats;
+import org.apache.tez.dag.records.TezTaskID;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 
@@ -149,4 +157,247 @@ public class DAGUtils {
     return jsonObject;
   }
 
+  public static Map<String,Object> convertCountersToATSMap(TezCounters counters) {
+    Map<String,Object> object = new LinkedHashMap<String, Object>();
+    if (counters == null) {
+        return object;
+      }
+    ArrayList<Object> counterGroupsList = new ArrayList<Object>();
+    for (CounterGroup group : counters) {
+        Map<String,Object> counterGroupMap = new LinkedHashMap<String, Object>();
+        counterGroupMap.put(ATSConstants.COUNTER_GROUP_NAME, group.getName());
+        counterGroupMap.put(ATSConstants.COUNTER_GROUP_DISPLAY_NAME,
+                group.getDisplayName());
+        ArrayList<Object> counterList = new ArrayList<Object>();
+        for (TezCounter counter : group) {
+            Map<String,Object> counterMap = new LinkedHashMap<String, Object>();
+            counterMap.put(ATSConstants.COUNTER_NAME, counter.getName());
+            counterMap.put(ATSConstants.COUNTER_DISPLAY_NAME,
+                    counter.getDisplayName());
+            counterMap.put(ATSConstants.COUNTER_VALUE, counter.getValue());
+            counterList.add(counterMap);
+          }
+        putInto(counterGroupMap, ATSConstants.COUNTERS, counterList);
+        counterGroupsList.add(counterGroupMap);
+      }
+    putInto(object, ATSConstants.COUNTER_GROUPS, counterGroupsList);
+    return object;
+  }
+
+  public static Map<String,Object> convertDAGPlanToATSMap(
+      DAGProtos.DAGPlan dagPlan) {
+
+    final String VERSION_KEY = "version";
+    final int version = 1;
+    final String DAG_NAME_KEY = "dagName";
+    final String VERTICES_KEY = "vertices";
+    final String EDGES_KEY = "edges";
+    final String VERTEX_GROUPS_KEY = "vertexGroups";
+
+    final String VERTEX_NAME_KEY = "vertexName";
+    final String PROCESSOR_CLASS_KEY = "processorClass";
+    final String IN_EDGE_IDS_KEY = "inEdgeIds";
+    final String OUT_EDGE_IDS_KEY = "outEdgeIds";
+    final String ADDITIONAL_INPUTS_KEY = "additionalInputs";
+    final String ADDITIONAL_OUTPUTS_KEY = "additionalOutputs";
+    final String VERTEX_MANAGER_PLUGIN_CLASS_KEY =
+        "vertexManagerPluginClass";
+
+    final String EDGE_ID_KEY = "edgeId";
+    final String INPUT_VERTEX_NAME_KEY = "inputVertexName";
+    final String OUTPUT_VERTEX_NAME_KEY = "outputVertexName";
+    final String DATA_MOVEMENT_TYPE_KEY = "dataMovementType";
+    final String DATA_SOURCE_TYPE_KEY = "dataSourceType";
+    final String SCHEDULING_TYPE_KEY = "schedulingType";
+    final String EDGE_SOURCE_CLASS_KEY = "edgeSourceClass";
+    final String EDGE_DESTINATION_CLASS_KEY =
+        "edgeDestinationClass";
+
+    final String NAME_KEY = "name";
+    final String CLASS_KEY = "class";
+    final String INITIALIZER_KEY = "initializer";
+
+    final String VERTEX_GROUP_NAME_KEY = "groupName";
+    final String VERTEX_GROUP_MEMBERS_KEY = "groupMembers";
+    final String VERTEX_GROUP_OUTPUTS_KEY = "outputs";
+    final String VERTEX_GROUP_EDGE_MERGED_INPUTS_KEY = "edgeMergedInputs";
+    final String VERTEX_GROUP_DESTINATION_VERTEX_NAME_KEY = "destinationVertexName";
+
+    Map<String,Object> dagMap = new LinkedHashMap<String, Object>();
+    dagMap.put(DAG_NAME_KEY, dagPlan.getName());
+    dagMap.put(VERSION_KEY, version);
+    ArrayList<Object> verticesList = new ArrayList<Object>();
+    for (DAGProtos.VertexPlan vertexPlan : dagPlan.getVertexList()) {
+      Map<String,Object> vertexMap = new LinkedHashMap<String, Object>();
+      vertexMap.put(VERTEX_NAME_KEY, vertexPlan.getName());
+
+      if (vertexPlan.hasProcessorDescriptor()) {
+        vertexMap.put(PROCESSOR_CLASS_KEY,
+            vertexPlan.getProcessorDescriptor().getClassName());
+      }
+
+      ArrayList<Object> inEdgeIdList = new ArrayList<Object>();
+      inEdgeIdList.addAll(vertexPlan.getInEdgeIdList());
+      putInto(vertexMap, IN_EDGE_IDS_KEY, inEdgeIdList);
+
+      ArrayList<Object> outEdgeIdList = new ArrayList<Object>();
+      outEdgeIdList.addAll(vertexPlan.getOutEdgeIdList());
+      putInto(vertexMap, OUT_EDGE_IDS_KEY, outEdgeIdList);
+
+      ArrayList<Object> inputsList = new ArrayList<Object>();
+      for (DAGProtos.RootInputLeafOutputProto input :
+          vertexPlan.getInputsList()) {
+        Map<String,Object> inputMap = new LinkedHashMap<String, Object>();
+        inputMap.put(NAME_KEY, input.getName());
+        inputMap.put(CLASS_KEY, input.getEntityDescriptor().getClassName());
+        if (input.hasInitializerClassName()) {
+          inputMap.put(INITIALIZER_KEY, input.getInitializerClassName());
+        }
+        inputsList.add(inputMap);
+      }
+      putInto(vertexMap, ADDITIONAL_INPUTS_KEY, inputsList);
+
+      ArrayList<Object> outputsList = new ArrayList<Object>();
+      for (DAGProtos.RootInputLeafOutputProto output :
+          vertexPlan.getOutputsList()) {
+        Map<String,Object> outputMap = new LinkedHashMap<String, Object>();
+        outputMap.put(NAME_KEY, output.getName());
+        outputMap.put(CLASS_KEY, output.getEntityDescriptor().getClassName());
+        if (output.hasInitializerClassName()) {
+          outputMap.put(INITIALIZER_KEY, output.getInitializerClassName());
+        }
+        outputsList.add(outputMap);
+      }
+      putInto(vertexMap, ADDITIONAL_OUTPUTS_KEY, outputsList);
+
+      if (vertexPlan.hasVertexManagerPlugin()) {
+        vertexMap.put(VERTEX_MANAGER_PLUGIN_CLASS_KEY,
+            vertexPlan.getVertexManagerPlugin().getClassName());
+      }
+
+      verticesList.add(vertexMap);
+    }
+    putInto(dagMap, VERTICES_KEY, verticesList);
+
+    ArrayList<Object> edgesList = new ArrayList<Object>();
+    for (DAGProtos.EdgePlan edgePlan : dagPlan.getEdgeList()) {
+      Map<String,Object> edgeMap = new LinkedHashMap<String, Object>();
+      edgeMap.put(EDGE_ID_KEY, edgePlan.getId());
+      edgeMap.put(INPUT_VERTEX_NAME_KEY, edgePlan.getInputVertexName());
+      edgeMap.put(OUTPUT_VERTEX_NAME_KEY, edgePlan.getOutputVertexName());
+      edgeMap.put(DATA_MOVEMENT_TYPE_KEY,
+          edgePlan.getDataMovementType().name());
+      edgeMap.put(DATA_SOURCE_TYPE_KEY, edgePlan.getDataSourceType().name());
+      edgeMap.put(SCHEDULING_TYPE_KEY, edgePlan.getSchedulingType().name());
+      edgeMap.put(EDGE_SOURCE_CLASS_KEY,
+          edgePlan.getEdgeSource().getClassName());
+      edgeMap.put(EDGE_DESTINATION_CLASS_KEY,
+          edgePlan.getEdgeDestination().getClassName());
+
+      edgesList.add(edgeMap);
+    }
+    putInto(dagMap, EDGES_KEY, edgesList);
+
+    ArrayList<Object> vertexGroupsList = new ArrayList<Object>();
+    for (DAGProtos.PlanVertexGroupInfo vertexGroupInfo :
+        dagPlan.getVertexGroupsList()) {
+      Map<String,Object> groupMap = new LinkedHashMap<String, Object>();
+      groupMap.put(VERTEX_GROUP_NAME_KEY, vertexGroupInfo.getGroupName());
+      if (vertexGroupInfo.getGroupMembersCount() > 0 ) {
+        groupMap.put(VERTEX_GROUP_MEMBERS_KEY, vertexGroupInfo.getGroupMembersList());
+      }
+      if (vertexGroupInfo.getOutputsCount() > 0) {
+        groupMap.put(VERTEX_GROUP_OUTPUTS_KEY, vertexGroupInfo.getOutputsList());
+      }
+
+      if (vertexGroupInfo.getEdgeMergedInputsCount() > 0) {
+        ArrayList<Object> edgeMergedInputs = new ArrayList<Object>();
+        for (PlanGroupInputEdgeInfo edgeMergedInputInfo :
+            vertexGroupInfo.getEdgeMergedInputsList()) {
+          Map<String,Object> edgeMergedInput = new LinkedHashMap<String, Object>();
+          edgeMergedInput.put(VERTEX_GROUP_DESTINATION_VERTEX_NAME_KEY,
+              edgeMergedInputInfo.getDestVertexName());
+          if (edgeMergedInputInfo.hasMergedInput()
+            && edgeMergedInputInfo.getMergedInput().hasClassName()) {
+            edgeMergedInput.put(PROCESSOR_CLASS_KEY,
+                edgeMergedInputInfo.getMergedInput().getClassName());
+          }
+          edgeMergedInputs.add(edgeMergedInput);
+        }
+        groupMap.put(VERTEX_GROUP_EDGE_MERGED_INPUTS_KEY, edgeMergedInputs);
+      }
+      vertexGroupsList.add(groupMap);
+    }
+    putInto(dagMap, VERTEX_GROUPS_KEY, vertexGroupsList);
+
+    return dagMap;
+  }
+
+  private static void putInto(Map<String, Object> map, String key,
+      ArrayList<Object> list) {
+    if (list.isEmpty()) {
+      return;
+    }
+    map.put(key, list);
+  }
+
+  private static ArrayList<String> convertToStringArrayList(
+      Collection<TezTaskID> collection) {
+    ArrayList<String> list = new ArrayList<String>(collection.size());
+    for (TezTaskID t : collection) {
+      list.add(t.toString());
+    }
+    return list;
+  }
+
+  public static Map<String,Object> convertVertexStatsToATSMap(
+      VertexStats vertexStats) {
+    Map<String,Object> vertexStatsMap = new LinkedHashMap<String, Object>();
+    if (vertexStats == null) {
+      return vertexStatsMap;
+    }
+
+    final String FIRST_TASK_START_TIME_KEY = "firstTaskStartTime";
+    final String FIRST_TASKS_TO_START_KEY = "firstTasksToStart";
+    final String LAST_TASK_FINISH_TIME_KEY = "lastTaskFinishTime";
+    final String LAST_TASKS_TO_FINISH_KEY = "lastTasksToFinish";
+
+    final String MIN_TASK_DURATION = "minTaskDuration";
+    final String MAX_TASK_DURATION = "maxTaskDuration";
+    final String AVG_TASK_DURATION = "avgTaskDuration";
+
+    final String SHORTEST_DURATION_TASKS = "shortestDurationTasks";
+    final String LONGEST_DURATION_TASKS = "longestDurationTasks";
+
+    vertexStatsMap.put(FIRST_TASK_START_TIME_KEY, vertexStats.getFirstTaskStartTime());
+    if (vertexStats.getFirstTasksToStart() != null
+        && !vertexStats.getFirstTasksToStart().isEmpty()) {
+      vertexStatsMap.put(FIRST_TASKS_TO_START_KEY,
+          convertToStringArrayList(vertexStats.getFirstTasksToStart()));
+    }
+    vertexStatsMap.put(LAST_TASK_FINISH_TIME_KEY, vertexStats.getLastTaskFinishTime());
+    if (vertexStats.getLastTasksToFinish() != null
+        && !vertexStats.getLastTasksToFinish().isEmpty()) {
+      vertexStatsMap.put(LAST_TASKS_TO_FINISH_KEY,
+          convertToStringArrayList(vertexStats.getLastTasksToFinish()));
+    }
+
+    vertexStatsMap.put(MIN_TASK_DURATION, vertexStats.getMinTaskDuration());
+    vertexStatsMap.put(MAX_TASK_DURATION, vertexStats.getMaxTaskDuration());
+    vertexStatsMap.put(AVG_TASK_DURATION, vertexStats.getAvgTaskDuration());
+
+    if (vertexStats.getShortestDurationTasks() != null
+        && !vertexStats.getShortestDurationTasks().isEmpty()) {
+      vertexStatsMap.put(SHORTEST_DURATION_TASKS,
+          convertToStringArrayList(vertexStats.getShortestDurationTasks()));
+    }
+    if (vertexStats.getLongestDurationTasks() != null
+        && !vertexStats.getLongestDurationTasks().isEmpty()) {
+      vertexStatsMap.put(LONGEST_DURATION_TASKS,
+          convertToStringArrayList(vertexStats.getLongestDurationTasks()));
+    }
+
+    return vertexStatsMap;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index b22b162..164bd2f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -112,7 +112,7 @@ public class TestHistoryEventsProtoConversion {
     AMLaunchedEvent event = new AMLaunchedEvent(
         ApplicationAttemptId.newInstance(
             ApplicationId.newInstance(0, 1), 1),
-        100, 100);
+        100, 100, null);
     AMLaunchedEvent deserializedEvent = (AMLaunchedEvent)
         testProtoConversion(event);
     Assert.assertEquals(event.getApplicationAttemptId(),
@@ -127,7 +127,7 @@ public class TestHistoryEventsProtoConversion {
   private void testAMStartedEvent() throws Exception {
     AMStartedEvent event = new AMStartedEvent(
         ApplicationAttemptId.newInstance(
-            ApplicationId.newInstance(0, 1), 1), 100);
+            ApplicationId.newInstance(0, 1), 1), 100, "");
     AMStartedEvent deserializedEvent = (AMStartedEvent)
         testProtoConversion(event);
     Assert.assertEquals(event.getApplicationAttemptId(),
@@ -142,7 +142,7 @@ public class TestHistoryEventsProtoConversion {
         ApplicationId.newInstance(0, 1), 1), 1001l,
         DAGPlan.newBuilder().setName("foo").build(),
         ApplicationAttemptId.newInstance(
-            ApplicationId.newInstance(0, 1), 1), null);
+            ApplicationId.newInstance(0, 1), 1), null, "");
     DAGSubmittedEvent deserializedEvent = (DAGSubmittedEvent)
         testProtoConversion(event);
     Assert.assertEquals(event.getApplicationAttemptId(),
@@ -160,7 +160,8 @@ public class TestHistoryEventsProtoConversion {
 
   private void testDAGInitializedEvent() throws Exception {
     DAGInitializedEvent event = new DAGInitializedEvent(
-        TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 100334l);
+        TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 100334l,
+        "user", "dagName");
     DAGInitializedEvent deserializedEvent = (DAGInitializedEvent)
         testProtoConversion(event);
     Assert.assertEquals(event.getDagID(),
@@ -171,7 +172,8 @@ public class TestHistoryEventsProtoConversion {
 
   private void testDAGStartedEvent() throws Exception {
     DAGStartedEvent event = new DAGStartedEvent(
-        TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 100334l);
+        TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 100334l,
+        "user", "dagName");
     DAGStartedEvent deserializedEvent = (DAGStartedEvent)
         testProtoConversion(event);
     Assert.assertEquals(event.getDagID(),
@@ -184,7 +186,7 @@ public class TestHistoryEventsProtoConversion {
     {
       DAGFinishedEvent event = new DAGFinishedEvent(
           TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1000l, 20000l,
-          DAGState.FAILED, null, null);
+          DAGState.FAILED, null, null, "user", "dagName");
       DAGFinishedEvent deserializedEvent = (DAGFinishedEvent)
           testProtoConversion(event);
       Assert.assertEquals(
@@ -204,7 +206,8 @@ public class TestHistoryEventsProtoConversion {
       tezCounters.getGroup("foo").findCounter("c1").increment(1);
       DAGFinishedEvent event = new DAGFinishedEvent(
           TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1000l, 20000l,
-          DAGState.FAILED, "bad diagnostics", tezCounters);
+          DAGState.FAILED, "bad diagnostics", tezCounters,
+          "user", "dagName");
       DAGFinishedEvent deserializedEvent = (DAGFinishedEvent)
           testProtoConversion(event);
       Assert.assertEquals(

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
new file mode 100644
index 0000000..67be9f5
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.impl;
+
+import java.util.Random;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.AMLaunchedEvent;
+import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.history.events.ContainerStoppedEvent;
+import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskFinishedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
+import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
+import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.codehaus.jettison.json.JSONException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestHistoryEventJsonConversion {
+
+  private ApplicationAttemptId applicationAttemptId;
+  private ApplicationId applicationId;
+  private String user = "user";
+  private Random random = new Random();
+  private TezDAGID tezDAGID;
+  private TezVertexID tezVertexID;
+  private TezTaskID tezTaskID;
+  private TezTaskAttemptID tezTaskAttemptID;
+  private DAGPlan dagPlan;
+  private ContainerId containerId;
+  private NodeId nodeId;
+
+  @Before
+  public void setup() {
+    applicationId = ApplicationId.newInstance(9999l, 1);
+    applicationAttemptId = ApplicationAttemptId.newInstance(applicationId, 1);
+    tezDAGID = TezDAGID.getInstance(applicationId, random.nextInt());
+    tezVertexID = TezVertexID.getInstance(tezDAGID, random.nextInt());
+    tezTaskID = TezTaskID.getInstance(tezVertexID, random.nextInt());
+    tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID, random.nextInt());
+    dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock").build();
+    containerId = ContainerId.newInstance(applicationAttemptId, 111);
+    nodeId = NodeId.newInstance("node", 13435);
+  }
+
+  @Test
+  public void testHandlerExists() throws JSONException {
+    for (HistoryEventType eventType : HistoryEventType.values()) {
+      HistoryEvent event = null;
+      switch (eventType) {
+        case AM_LAUNCHED:
+          event = new AMLaunchedEvent(applicationAttemptId, random.nextInt(), random.nextInt(),
+              user);
+          break;
+        case AM_STARTED:
+          event = new AMStartedEvent(applicationAttemptId, random.nextInt(), user);
+          break;
+        case DAG_SUBMITTED:
+          event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId,
+              null, user);
+          break;
+        case DAG_INITIALIZED:
+          event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName());
+          break;
+        case DAG_STARTED:
+          event = new DAGStartedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName());
+          break;
+        case DAG_FINISHED:
+          event = new DAGFinishedEvent(tezDAGID, random.nextInt(), random.nextInt(), DAGState.ERROR,
+              null, null, user, dagPlan.getName());
+          break;
+        case VERTEX_INITIALIZED:
+          event = new VertexInitializedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
+              random.nextInt(), "proc", null);
+          break;
+        case VERTEX_STARTED:
+          event = new VertexStartedEvent(tezVertexID, random.nextInt(), random.nextInt());
+          break;
+        case VERTEX_PARALLELISM_UPDATED:
+          event = new VertexParallelismUpdatedEvent();
+          break;
+        case VERTEX_FINISHED:
+          event = new VertexFinishedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
+              random.nextInt(), random.nextInt(), random.nextInt(), VertexState.ERROR,
+              null, null, null);
+          break;
+        case TASK_STARTED:
+          event = new TaskStartedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt());
+          break;
+        case TASK_FINISHED:
+          event = new TaskFinishedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt(),
+              tezTaskAttemptID, TaskState.FAILED, null);
+          break;
+        case TASK_ATTEMPT_STARTED:
+          event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", random.nextInt(), containerId,
+              nodeId, null, null);
+          break;
+        case TASK_ATTEMPT_FINISHED:
+          event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
+              random.nextInt(), TaskAttemptState.FAILED, null, null);
+          break;
+        case CONTAINER_LAUNCHED:
+          event = new ContainerLaunchedEvent(containerId, random.nextInt(),
+              applicationAttemptId);
+          break;
+        case CONTAINER_STOPPED:
+          event = new ContainerStoppedEvent(containerId, random.nextInt(), -1, applicationAttemptId);
+          break;
+        case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+          event = new VertexDataMovementEventsGeneratedEvent();
+          break;
+        case DAG_COMMIT_STARTED:
+          event = new DAGCommitStartedEvent();
+          break;
+        case VERTEX_COMMIT_STARTED:
+          event = new VertexCommitStartedEvent();
+          break;
+        case VERTEX_GROUP_COMMIT_STARTED:
+          event = new VertexGroupCommitStartedEvent();
+          break;
+        case VERTEX_GROUP_COMMIT_FINISHED:
+          event = new VertexGroupCommitFinishedEvent();
+          break;
+        default:
+          Assert.fail("Unhandled event type " + eventType);
+      }
+      if (event == null || !event.isHistoryEvent()) {
+        continue;
+      }
+      HistoryEventJsonConversion.convertToJson(event);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dist/pom.xml
----------------------------------------------------------------------
diff --git a/tez-dist/pom.xml b/tez-dist/pom.xml
index bbe3256..95668a9 100644
--- a/tez-dist/pom.xml
+++ b/tez-dist/pom.xml
@@ -54,6 +54,19 @@
 
   <profiles>
     <profile>
+      <id>hadoop24</id>
+      <activation>
+         <activeByDefault>true</activeByDefault>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.tez</groupId>
+          <artifactId>tez-yarn-timeline-history</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
       <id>dist-tar</id>
       <activation>
         <activeByDefault>false</activeByDefault>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-plugins/pom.xml
----------------------------------------------------------------------
diff --git a/tez-plugins/pom.xml b/tez-plugins/pom.xml
new file mode 100644
index 0000000..393de62
--- /dev/null
+++ b/tez-plugins/pom.xml
@@ -0,0 +1,50 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.tez</groupId>
+    <artifactId>tez</artifactId>
+    <version>0.5.0-incubating-SNAPSHOT</version>
+  </parent>
+  <artifactId>tez-plugins</artifactId>
+  <packaging>pom</packaging>
+
+  <profiles>
+    <profile>
+      <id>hadoop24</id>
+      <activation>
+         <activeByDefault>true</activeByDefault>
+      </activation>
+      <modules>
+        <module>tez-yarn-timeline-history</module>
+      </modules>
+    </profile>
+  </profiles>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-plugins/tez-yarn-timeline-history/pom.xml
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/pom.xml b/tez-plugins/tez-yarn-timeline-history/pom.xml
new file mode 100644
index 0000000..150a149
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history/pom.xml
@@ -0,0 +1,71 @@
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.tez</groupId>
+    <artifactId>tez-plugins</artifactId>
+    <version>0.5.0-incubating-SNAPSHOT</version>
+  </parent>
+  <artifactId>tez-yarn-timeline-history</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-internals</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jettison</groupId>
+      <artifactId>jettison</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
new file mode 100644
index 0000000..11114b6
--- /dev/null
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.ats;
+
+import java.util.HashSet;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.logging.HistoryLoggingService;
+import org.apache.tez.dag.records.TezDAGID;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class ATSHistoryLoggingService extends HistoryLoggingService {
+
+  private static final Log LOG = LogFactory.getLog(ATSHistoryLoggingService.class);
+
+  private LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
+      new LinkedBlockingQueue<DAGHistoryEvent>();
+
+  private Thread eventHandlingThread;
+  private AtomicBoolean stopped = new AtomicBoolean(false);
+  private int eventCounter = 0;
+  private int eventsProcessed = 0;
+  private final Object lock = new Object();
+
+  @VisibleForTesting
+  TimelineClient timelineClient;
+
+  private HashSet<TezDAGID> skippedDAGs = new HashSet<TezDAGID>();
+  private long maxTimeToWaitOnShutdown;
+
+  public ATSHistoryLoggingService() {
+    super(ATSHistoryLoggingService.class.getName());
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    LOG.info("Initializing ATSService");
+    timelineClient = TimelineClient.createTimelineClient();
+    timelineClient.init(conf);
+    maxTimeToWaitOnShutdown = conf.getLong(
+        TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS,
+        TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS_DEFAULT);
+  }
+
+  @Override
+  public void serviceStart() {
+    LOG.info("Starting ATSService");
+    timelineClient.start();
+    eventHandlingThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        DAGHistoryEvent event;
+        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+
+          // Log the size of the event-queue every so often.
+          if (eventCounter != 0 && eventCounter % 1000 == 0) {
+            LOG.info("Event queue stats"
+                + ", eventsProcessedSinceLastUpdate=" + eventsProcessed
+                + ", eventQueueSize=" + eventQueue.size());
+            eventCounter = 0;
+            eventsProcessed = 0;
+          } else {
+            ++eventCounter;
+          }
+
+          try {
+            event = eventQueue.take();
+          } catch (InterruptedException e) {
+            LOG.info("EventQueue take interrupted. Returning");
+            return;
+          }
+
+          synchronized (lock) {
+            ++eventsProcessed;
+            try {
+              handleEvent(event);
+            } catch (Exception e) {
+              // TODO handle failures - treat as fatal or ignore?
+              LOG.warn("Error handling event", e);
+            }
+          }
+        }
+      }
+    }, "HistoryEventHandlingThread");
+    eventHandlingThread.start();
+  }
+
+  @Override
+  public void serviceStop() {
+    LOG.info("Stopping ATSService"
+        + ", eventQueueBacklog=" + eventQueue.size());
+    stopped.set(true);
+    if (eventHandlingThread != null) {
+      eventHandlingThread.interrupt();
+    }
+    synchronized (lock) {
+      if (!eventQueue.isEmpty()) {
+        LOG.warn("ATSService being stopped"
+            + ", eventQueueBacklog=" + eventQueue.size()
+            + ", maxTimeLeftToFlush=" + maxTimeToWaitOnShutdown);
+        long startTime = appContext.getClock().getTime();
+        if (maxTimeToWaitOnShutdown > 0) {
+          long endTime = startTime + maxTimeToWaitOnShutdown;
+          while (endTime >= appContext.getClock().getTime()) {
+            DAGHistoryEvent event = eventQueue.poll();
+            if (event == null) {
+              break;
+            }
+            try {
+              handleEvent(event);
+            } catch (Exception e) {
+              LOG.warn("Error handling event", e);
+              break;
+            }
+          }
+        }
+      }
+    }
+    if (!eventQueue.isEmpty()) {
+      LOG.warn("Did not finish flushing eventQueue before stopping ATSService"
+          + ", eventQueueBacklog=" + eventQueue.size());
+    }
+    timelineClient.stop();
+  }
+
+  public void handle(DAGHistoryEvent event) {
+    eventQueue.add(event);
+  }
+
+  private void handleEvent(DAGHistoryEvent event) {
+    HistoryEventType eventType = event.getHistoryEvent().getEventType();
+
+    TezDAGID dagId = event.getDagID();
+
+    if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
+      DAGSubmittedEvent dagSubmittedEvent =
+          (DAGSubmittedEvent) event.getHistoryEvent();
+      String dagName = dagSubmittedEvent.getDAGName();
+      if (dagName != null
+          && dagName.startsWith(
+          TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX)) {
+        // Skip recording pre-warm DAG events
+        skippedDAGs.add(dagId);
+        return;
+      }
+    }
+    if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
+      // Remove from set to keep size small
+      // No more events should be seen after this point.
+      if (skippedDAGs.remove(dagId)) {
+        return;
+      }
+    }
+
+    if (dagId != null && skippedDAGs.contains(dagId)) {
+      // Skip pre-warm DAGs
+      return;
+    }
+
+    try {
+      TimelinePutResponse response =
+          timelineClient.putEntities(
+              HistoryEventTimelineConversion.convertToTimelineEntity(event.getHistoryEvent()));
+      if (response != null
+        && !response.getErrors().isEmpty()) {
+        TimelinePutError err = response.getErrors().get(0);
+        if (err.getErrorCode() != 0) {
+          LOG.warn("Could not post history event to ATS, eventType="
+              + eventType
+              + ", atsPutError=" + err.getErrorCode());
+        }
+      }
+      // Do nothing additional, ATS client library should handle throttling
+      // or auto-disable as needed
+    } catch (Exception e) {
+      LOG.warn("Could not handle history event, eventType="
+          + eventType, e);
+    }
+  }
+
+}