You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2014/09/22 20:02:31 UTC

[21/22] AMBARI-5707. Metrics system prototype implementation. (swagle)

http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryReader.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryReader.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryReader.java
new file mode 100644
index 0000000..590853a
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryReader.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
+
+/** Read-side contract for looking up stored application history records. */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface ApplicationHistoryReader {
+
+  /**
+   * Returns the {@link ApplicationHistoryData} recorded for the specified
+   * {@link ApplicationId}.
+   *
+   * @param appId the {@link ApplicationId} to look up
+   * @return {@link ApplicationHistoryData} for the ApplicationId.
+   * @throws IOException if an I/O error occurs while reading the history
+   */
+  ApplicationHistoryData getApplication(ApplicationId appId) throws IOException;
+
+  /**
+   * Returns the {@link ApplicationHistoryData} of all applications.
+   *
+   * @return map of {@link ApplicationId} to {@link ApplicationHistoryData}s.
+   * @throws IOException if an I/O error occurs while reading the history
+   */
+  Map<ApplicationId, ApplicationHistoryData> getAllApplications()
+      throws IOException;
+
+  /**
+   * An application can have multiple application attempts, each described by
+   * an {@link ApplicationAttemptHistoryData}. This method returns all
+   * {@link ApplicationAttemptHistoryData}s for the application.
+   *
+   * @param appId the {@link ApplicationId} whose attempts are requested
+   *
+   * @return all {@link ApplicationAttemptHistoryData}s for the Application.
+   * @throws IOException if an I/O error occurs while reading the history
+   */
+  Map<ApplicationAttemptId, ApplicationAttemptHistoryData>
+      getApplicationAttempts(ApplicationId appId) throws IOException;
+
+  /**
+   * Returns the {@link ApplicationAttemptHistoryData} for the specified
+   * {@link ApplicationAttemptId}.
+   *
+   * @param appAttemptId
+   *          {@link ApplicationAttemptId}
+   * @return {@link ApplicationAttemptHistoryData} for ApplicationAttemptId
+   * @throws IOException if an I/O error occurs while reading the history
+   */
+  ApplicationAttemptHistoryData getApplicationAttempt(
+      ApplicationAttemptId appAttemptId) throws IOException;
+
+  /**
+   * Returns the {@link ContainerHistoryData} for the specified
+   * {@link ContainerId}.
+   *
+   * @param containerId
+   *          {@link ContainerId}
+   * @return {@link ContainerHistoryData} for ContainerId
+   * @throws IOException if an I/O error occurs while reading the history
+   */
+  ContainerHistoryData getContainer(ContainerId containerId) throws IOException;
+
+  /**
+   * Returns the {@link ContainerHistoryData} of the AM container for the
+   * specified {@link ApplicationAttemptId}.
+   *
+   * @param appAttemptId
+   *          {@link ApplicationAttemptId}
+   * @return {@link ContainerHistoryData} for ApplicationAttemptId
+   * @throws IOException if an I/O error occurs while reading the history
+   */
+  ContainerHistoryData getAMContainer(ApplicationAttemptId appAttemptId)
+      throws IOException;
+
+  /**
+   * Returns a map of {@link ContainerId} to {@link ContainerHistoryData}
+   * for the specified {@link ApplicationAttemptId}.
+   *
+   * @param appAttemptId
+   *          {@link ApplicationAttemptId}
+   * @return map of {@link ContainerId} to {@link ContainerHistoryData} for
+   *         ApplicationAttemptId
+   * @throws IOException if an I/O error occurs while reading the history
+   */
+  Map<ContainerId, ContainerHistoryData> getContainers(
+      ApplicationAttemptId appAttemptId) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
new file mode 100644
index 0000000..f622153
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.HBaseTimelineMetricStore;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricStore;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineStore;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.LeveldbTimelineStore;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp;
+import org.apache.hadoop.yarn.webapp.WebApp;
+import org.apache.hadoop.yarn.webapp.WebApps;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * History server that keeps track of all types of history in the cluster.
+ * Application specific history to start with, plus the timeline stores.
+ */
+public class ApplicationHistoryServer extends CompositeService {
+
+  public static final int SHUTDOWN_HOOK_PRIORITY = 30;
+  private static final Log LOG = LogFactory.getLog(ApplicationHistoryServer.class);
+
+  ApplicationHistoryClientService ahsClientService;
+  ApplicationHistoryManager historyManager;
+  TimelineStore timelineStore;
+  TimelineMetricStore timelineMetricStore;
+  private WebApp webApp;
+
+  public ApplicationHistoryServer() {
+    super(ApplicationHistoryServer.class.getName());
+  }
+
+  /** Creates the manager, client service and stores, then inits the services. */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    historyManager = createApplicationHistory();
+    ahsClientService = createApplicationHistoryClientService(historyManager);
+    addService(ahsClientService);
+    addService((Service) historyManager);
+    timelineStore = createTimelineStore(conf);
+    timelineMetricStore = createTimelineMetricStore(conf);
+    addIfService(timelineStore);
+    addIfService(timelineMetricStore);
+    super.serviceInit(conf);
+  }
+
+  /** Initializes metrics, starts the web app, then starts the services. */
+  @Override
+  protected void serviceStart() throws Exception {
+    DefaultMetricsSystem.initialize("ApplicationHistoryServer");
+    JvmMetrics.initSingleton("ApplicationHistoryServer", null);
+
+    startWebApp();
+    super.serviceStart();
+  }
+
+  /** Stops the web app, then always shuts down metrics and the services. */
+  @Override
+  protected void serviceStop() throws Exception {
+    try {
+      if (webApp != null) {
+        webApp.stop();
+      }
+    } finally {
+      // A failing web app stop must not leave metrics and services running.
+      DefaultMetricsSystem.shutdown();
+      super.serviceStop();
+    }
+  }
+
+  @Private
+  @VisibleForTesting
+  public ApplicationHistoryClientService getClientService() {
+    return this.ahsClientService;
+  }
+
+  protected ApplicationHistoryClientService createApplicationHistoryClientService(
+      ApplicationHistoryManager historyManager) {
+    return new ApplicationHistoryClientService(historyManager);
+  }
+
+  protected ApplicationHistoryManager createApplicationHistory() {
+    return new ApplicationHistoryManagerImpl();
+  }
+
+  protected ApplicationHistoryManager getApplicationHistory() {
+    return this.historyManager;
+  }
+
+  /** Inits and starts a new server; calls ExitUtil.terminate(-1) on failure. */
+  static ApplicationHistoryServer launchAppHistoryServer(String[] args) {
+    Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+    StringUtils.startupShutdownMessage(ApplicationHistoryServer.class, args, LOG);
+    ApplicationHistoryServer appHistoryServer = null;
+    try {
+      appHistoryServer = new ApplicationHistoryServer();
+      ShutdownHookManager.get().addShutdownHook(
+        new CompositeServiceShutdownHook(appHistoryServer), SHUTDOWN_HOOK_PRIORITY);
+      appHistoryServer.init(new YarnConfiguration());
+      appHistoryServer.start();
+    } catch (Throwable t) {
+      LOG.fatal("Error starting ApplicationHistoryServer", t);
+      ExitUtil.terminate(-1, "Error starting ApplicationHistoryServer");
+    }
+    return appHistoryServer;
+  }
+
+  public static void main(String[] args) {
+    launchAppHistoryServer(args);
+  }
+
+  // Not called in this class; serviceInit() uses createApplicationHistory().
+  protected ApplicationHistoryManager createApplicationHistoryManager(
+      Configuration conf) {
+    return new ApplicationHistoryManagerImpl();
+  }
+
+  protected TimelineStore createTimelineStore(Configuration conf) {
+    return ReflectionUtils.newInstance(conf.getClass(
+        YarnConfiguration.TIMELINE_SERVICE_STORE, LeveldbTimelineStore.class,
+        TimelineStore.class), conf);
+  }
+
+  protected TimelineMetricStore createTimelineMetricStore(Configuration conf) {
+    LOG.info("Creating metrics store.");
+    return ReflectionUtils.newInstance(HBaseTimelineMetricStore.class, conf);
+  }
+
+  /** Starts the AHS web app; a failure is rethrown as YarnRuntimeException. */
+  protected void startWebApp() {
+    String bindAddress = WebAppUtils.getAHSWebAppURLWithoutScheme(getConfig());
+    LOG.info("Instantiating AHSWebApp at " + bindAddress);
+    try {
+      webApp = WebApps
+            .$for("applicationhistory", ApplicationHistoryClientService.class,
+              ahsClientService, "ws")
+            .with(getConfig())
+            .withHttpSpnegoPrincipalKey(
+              YarnConfiguration.TIMELINE_SERVICE_WEBAPP_SPNEGO_USER_NAME_KEY)
+            .withHttpSpnegoKeytabKey(
+              YarnConfiguration.TIMELINE_SERVICE_WEBAPP_SPNEGO_KEYTAB_FILE_KEY)
+            .at(bindAddress)
+            .start(new AHSWebApp(historyManager, timelineStore, timelineMetricStore));
+    } catch (Exception e) {
+      String msg = "AHSWebApp failed to start.";
+      LOG.error(msg, e);
+      throw new YarnRuntimeException(msg, e);
+    }
+  }
+
+  /** @return the {@link TimelineStore} created in serviceInit */
+  @Private
+  @VisibleForTesting
+  public TimelineStore getTimelineStore() {
+    return timelineStore;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryStore.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryStore.java
new file mode 100644
index 0000000..c26faef
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryStore.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.service.Service;
+
+/**
+ * This interface is the abstraction of the storage of the application history
+ * data. It is a {@link Service}, so that an implementation can make use
+ * of the service life cycle to initialize and clean up the storage. Users can
+ * access the storage via the {@link ApplicationHistoryReader} and
+ * {@link ApplicationHistoryWriter} interfaces.
+ *
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface ApplicationHistoryStore extends Service,
+    ApplicationHistoryReader, ApplicationHistoryWriter {
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryWriter.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryWriter.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryWriter.java
new file mode 100644
index 0000000..09ba36d
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryWriter.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData;
+
+/**
+ * The interface for writing the application history, exposing the methods
+ * for writing {@link ApplicationStartData}, {@link ApplicationFinishData},
+ * {@link ApplicationAttemptStartData}, {@link ApplicationAttemptFinishData},
+ * {@link ContainerStartData} and {@link ContainerFinishData}.
+ */
+@Private
+@Unstable
+public interface ApplicationHistoryWriter {
+
+  /**
+   * This method writes the information of <code>RMApp</code> that is available
+   * when it starts.
+   *
+   * @param appStart
+   *          the record of the information of <code>RMApp</code> that is
+   *          available when it starts
+   * @throws IOException if the record cannot be written
+   */
+  void applicationStarted(ApplicationStartData appStart) throws IOException;
+
+  /**
+   * This method writes the information of <code>RMApp</code> that is available
+   * when it finishes.
+   *
+   * @param appFinish
+   *          the record of the information of <code>RMApp</code> that is
+   *          available when it finishes
+   * @throws IOException if the record cannot be written
+   */
+  void applicationFinished(ApplicationFinishData appFinish) throws IOException;
+
+  /**
+   * This method writes the information of <code>RMAppAttempt</code> that is
+   * available when it starts.
+   *
+   * @param appAttemptStart
+   *          the record of the information of <code>RMAppAttempt</code> that is
+   *          available when it starts
+   * @throws IOException if the record cannot be written
+   */
+  void applicationAttemptStarted(ApplicationAttemptStartData appAttemptStart)
+      throws IOException;
+
+  /**
+   * This method writes the information of <code>RMAppAttempt</code> that is
+   * available when it finishes.
+   *
+   * @param appAttemptFinish
+   *          the record of the information of <code>RMAppAttempt</code> that is
+   *          available when it finishes
+   * @throws IOException if the record cannot be written
+   */
+  void
+      applicationAttemptFinished(ApplicationAttemptFinishData appAttemptFinish)
+          throws IOException;
+
+  /**
+   * This method writes the information of <code>RMContainer</code> that is
+   * available when it starts.
+   *
+   * @param containerStart
+   *          the record of the information of <code>RMContainer</code> that is
+   *          available when it starts
+   * @throws IOException if the record cannot be written
+   */
+  void containerStarted(ContainerStartData containerStart) throws IOException;
+
+  /**
+   * This method writes the information of <code>RMContainer</code> that is
+   * available when it finishes.
+   *
+   * @param containerFinish
+   *          the record of the information of <code>RMContainer</code> that is
+   *          available when it finishes
+   * @throws IOException if the record cannot be written
+   */
+  void containerFinished(ContainerFinishData containerFinish)
+      throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java
new file mode 100644
index 0000000..4c8d745
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java
@@ -0,0 +1,784 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.file.tfile.TFile;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationAttemptFinishDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationAttemptStartDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationFinishDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationStartDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ContainerFinishDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ContainerStartDataProto;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationAttemptFinishDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationAttemptStartDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationFinishDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationStartDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ContainerFinishDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ContainerStartDataPBImpl;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * File system implementation of {@link ApplicationHistoryStore}. In this
+ * implementation, one application will have just one file in the file system,
+ * which contains all the history data of one application, and its attempts and
+ * containers. {@link #applicationStarted(ApplicationStartData)} is supposed to
+ * be invoked first when writing any history data of one application and it will
+ * open a file, while {@link #applicationFinished(ApplicationFinishData)} is
+ * supposed to be last writing operation and will close the file.
+ */
+@Public
+@Unstable
+public class FileSystemApplicationHistoryStore extends AbstractService
+    implements ApplicationHistoryStore {
+
+  private static final Log LOG = LogFactory
+    .getLog(FileSystemApplicationHistoryStore.class);
+  // Name of the directory that serviceInit() creates under the store URI.
+  private static final String ROOT_DIR_NAME = "ApplicationHistoryDataRoot";
+  private static final int MIN_BLOCK_SIZE = 256 * 1024;
+  private static final String START_DATA_SUFFIX = "_start"; // start record key
+  private static final String FINISH_DATA_SUFFIX = "_finish"; // finish record key
+  private static final FsPermission ROOT_DIR_UMASK = FsPermission
+    .createImmutable((short) 0740); // set on the root dir via setPermission()
+  private static final FsPermission HISTORY_FILE_UMASK = FsPermission
+    .createImmutable((short) 0640);
+  // Both are assigned in serviceInit(); serviceStop() cleans up fs again.
+  private FileSystem fs;
+  private Path rootDirPath;
+  // Writers per application; serviceStop() closes each and clears the map.
+  private ConcurrentMap<ApplicationId, HistoryFileWriter> outstandingWriters =
+      new ConcurrentHashMap<ApplicationId, HistoryFileWriter>();
+  /** Creates the store and names the service after this class. */
+  public FileSystemApplicationHistoryStore() {
+    super(FileSystemApplicationHistoryStore.class.getName());
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    String storeUri = conf.get(YarnConfiguration.FS_APPLICATION_HISTORY_STORE_URI);
+    Path workingDir = new Path(storeUri);
+    rootDirPath = new Path(workingDir, ROOT_DIR_NAME); // history root directory
+    try {
+      fs = workingDir.getFileSystem(conf);
+      fs.mkdirs(rootDirPath);
+      fs.setPermission(rootDirPath, ROOT_DIR_UMASK);
+    } catch (IOException ioe) {
+      LOG.error("Error when initializing FileSystemHistoryStorage", ioe);
+      throw ioe;
+    }
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    try {
+      // Close every writer that is still open before dropping them all.
+      for (HistoryFileWriter writer : outstandingWriters.values()) {
+        writer.close();
+      }
+      outstandingWriters.clear();
+    } finally {
+      IOUtils.cleanup(LOG, fs);
+    }
+    super.serviceStop();
+  }
+
+  @Override
+  public ApplicationHistoryData getApplication(ApplicationId appId)
+      throws IOException {
+    HistoryFileReader reader = getHistoryFileReader(appId);
+    try {
+      ApplicationHistoryData merged =
+          ApplicationHistoryData.newInstance(appId, null, null, null, null,
+            Long.MIN_VALUE, Long.MIN_VALUE, Long.MAX_VALUE, null,
+            FinalApplicationStatus.UNDEFINED, null);
+      boolean sawStart = false;
+      boolean sawFinish = false;
+      // Stop scanning once both records of this application have been read.
+      while (!(sawStart && sawFinish) && reader.hasNext()) {
+        HistoryFileReader.Entry record = reader.next();
+        if (!record.key.id.equals(appId.toString())) {
+          continue;
+        }
+        if (record.key.suffix.equals(START_DATA_SUFFIX)) {
+          ApplicationStartData start = parseApplicationStartData(record.value);
+          mergeApplicationHistoryData(merged, start);
+          sawStart = true;
+        } else if (record.key.suffix.equals(FINISH_DATA_SUFFIX)) {
+          ApplicationFinishData finish = parseApplicationFinishData(record.value);
+          mergeApplicationHistoryData(merged, finish);
+          sawFinish = true;
+        }
+      }
+      if (!sawStart && !sawFinish) {
+        return null;
+      }
+      if (!sawStart) {
+        LOG.warn("Start information is missing for application " + appId);
+      }
+      if (!sawFinish) {
+        LOG.warn("Finish information is missing for application " + appId);
+      }
+      LOG.info("Completed reading history information of application " + appId);
+      return merged;
+    } catch (IOException e) {
+      LOG.error("Error when reading history file of application " + appId);
+      throw e;
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Override
+  public Map<ApplicationId, ApplicationHistoryData> getAllApplications()
+      throws IOException {
+    Map<ApplicationId, ApplicationHistoryData> result =
+        new HashMap<ApplicationId, ApplicationHistoryData>();
+    // The name of each file under the root dir is parsed as an application id.
+    for (FileStatus status : fs.listStatus(rootDirPath)) {
+      String fileName = status.getPath().getName();
+      ApplicationId appId = ConverterUtils.toApplicationId(fileName);
+      try {
+        ApplicationHistoryData data = getApplication(appId);
+        if (data != null) {
+          result.put(appId, data);
+        }
+      } catch (IOException e) {
+        // Deliberately not rethrown, so that one unreadable history file
+        // does not prevent the remaining applications from being returned
+        LOG.error("History information of application " + appId
+            + " is not included into the result due to the exception", e);
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public Map<ApplicationAttemptId, ApplicationAttemptHistoryData>
+      getApplicationAttempts(ApplicationId appId) throws IOException {
+    Map<ApplicationAttemptId, ApplicationAttemptHistoryData> historyDataMap =
+        new HashMap<ApplicationAttemptId, ApplicationAttemptHistoryData>();
+    HistoryFileReader hfReader = getHistoryFileReader(appId);
+    try {
+      while (hfReader.hasNext()) {
+        HistoryFileReader.Entry entry = hfReader.next();
+        if (!entry.key.id.startsWith(ConverterUtils.APPLICATION_ATTEMPT_PREFIX)) {
+          continue; // not an application attempt record
+        }
+        ApplicationAttemptId appAttemptId =
+            ConverterUtils.toApplicationAttemptId(entry.key.id);
+        if (!appAttemptId.getApplicationId().equals(appId)) {
+          continue; // attempt of a different application
+        }
+        ApplicationAttemptHistoryData historyData = historyDataMap.get(appAttemptId);
+        if (historyData == null) {
+          historyData = ApplicationAttemptHistoryData.newInstance(appAttemptId,
+              null, -1, null, null, null, FinalApplicationStatus.UNDEFINED, null);
+          historyDataMap.put(appAttemptId, historyData);
+        }
+        if (entry.key.suffix.equals(START_DATA_SUFFIX)) {
+          mergeApplicationAttemptHistoryData(historyData,
+              parseApplicationAttemptStartData(entry.value));
+        } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) {
+          mergeApplicationAttemptHistoryData(historyData,
+              parseApplicationAttemptFinishData(entry.value));
+        }
+      }
+      LOG.info("Completed reading history information of all application"
+          + " attempts of application " + appId);
+    } catch (IOException e) {
+      // Best effort: return what was read so far, but log the actual cause.
+      LOG.error("Error when reading history information of some application"
+          + " attempts of application " + appId, e);
+    } finally {
+      hfReader.close();
+    }
+    return historyDataMap;
+  }
+
+  /**
+   * Reads the history of a single application attempt from the history file
+   * of its application.
+   * <p>
+   * The scan stops as soon as both the start and the finish record of the
+   * attempt have been merged, or when the file is exhausted.
+   *
+   * @param appAttemptId the attempt to look up
+   * @return the merged history data, or {@code null} if the file contains
+   *         neither a start nor a finish record for the attempt
+   * @throws IOException if the history file cannot be opened or read
+   */
+  @Override
+  public ApplicationAttemptHistoryData getApplicationAttempt(
+      ApplicationAttemptId appAttemptId) throws IOException {
+    HistoryFileReader hfReader =
+        getHistoryFileReader(appAttemptId.getApplicationId());
+    try {
+      boolean readStartData = false;
+      boolean readFinishData = false;
+      ApplicationAttemptHistoryData historyData =
+          ApplicationAttemptHistoryData.newInstance(appAttemptId, null, -1,
+            null, null, null, FinalApplicationStatus.UNDEFINED, null);
+      while ((!readStartData || !readFinishData) && hfReader.hasNext()) {
+        HistoryFileReader.Entry entry = hfReader.next();
+        if (entry.key.id.equals(appAttemptId.toString())) {
+          if (entry.key.suffix.equals(START_DATA_SUFFIX)) {
+            ApplicationAttemptStartData startData =
+                parseApplicationAttemptStartData(entry.value);
+            mergeApplicationAttemptHistoryData(historyData, startData);
+            readStartData = true;
+          } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) {
+            ApplicationAttemptFinishData finishData =
+                parseApplicationAttemptFinishData(entry.value);
+            mergeApplicationAttemptHistoryData(historyData, finishData);
+            readFinishData = true;
+          }
+        }
+      }
+      if (!readStartData && !readFinishData) {
+        return null;
+      }
+      // A half-written history is still returned, but flagged in the log.
+      if (!readStartData) {
+        LOG.warn("Start information is missing for application attempt "
+            + appAttemptId);
+      }
+      if (!readFinishData) {
+        LOG.warn("Finish information is missing for application attempt "
+            + appAttemptId);
+      }
+      LOG.info("Completed reading history information of application attempt "
+          + appAttemptId);
+      return historyData;
+    } catch (IOException e) {
+      // Trailing space added so the id is not glued to the word "attempt".
+      LOG.error("Error when reading history file of application attempt "
+          + appAttemptId);
+      throw e;
+    } finally {
+      hfReader.close();
+    }
+  }
+
+  /**
+   * Reads the history of a single container from the history file of the
+   * application the container belongs to.
+   *
+   * @param containerId the container to look up
+   * @return the merged history data, or {@code null} if the file contains
+   *         neither a start nor a finish record for the container
+   * @throws IOException if the history file cannot be opened or read
+   */
+  @Override
+  public ContainerHistoryData getContainer(ContainerId containerId)
+      throws IOException {
+    HistoryFileReader hfReader =
+        getHistoryFileReader(containerId.getApplicationAttemptId()
+          .getApplicationId());
+    try {
+      ContainerHistoryData result =
+          ContainerHistoryData.newInstance(containerId, null, null, null,
+            Long.MIN_VALUE, Long.MAX_VALUE, null, Integer.MAX_VALUE, null);
+      boolean sawStart = false;
+      boolean sawFinish = false;
+      // Stop once both records were merged or the file is exhausted.
+      while (!(sawStart && sawFinish) && hfReader.hasNext()) {
+        HistoryFileReader.Entry record = hfReader.next();
+        if (!record.key.id.equals(containerId.toString())) {
+          continue;
+        }
+        String suffix = record.key.suffix;
+        if (suffix.equals(START_DATA_SUFFIX)) {
+          mergeContainerHistoryData(result,
+              parseContainerStartData(record.value));
+          sawStart = true;
+        } else if (suffix.equals(FINISH_DATA_SUFFIX)) {
+          mergeContainerHistoryData(result,
+              parseContainerFinishData(record.value));
+          sawFinish = true;
+        }
+      }
+      if (!(sawStart || sawFinish)) {
+        return null;
+      }
+      if (!sawStart) {
+        LOG.warn("Start information is missing for container " + containerId);
+      }
+      if (!sawFinish) {
+        LOG.warn("Finish information is missing for container " + containerId);
+      }
+      LOG.info("Completed reading history information of container "
+          + containerId);
+      return result;
+    } catch (IOException e) {
+      LOG.error("Error when reading history file of container " + containerId);
+      throw e;
+    } finally {
+      hfReader.close();
+    }
+  }
+
+  /**
+   * Reads the history of the master container of an application attempt.
+   *
+   * @param appAttemptId the attempt whose master container is requested
+   * @return the container history, or {@code null} if the attempt has no
+   *         history or no master container id recorded
+   * @throws IOException if reading the attempt or the container fails
+   */
+  @Override
+  public ContainerHistoryData getAMContainer(ApplicationAttemptId appAttemptId)
+      throws IOException {
+    ApplicationAttemptHistoryData attempt = getApplicationAttempt(appAttemptId);
+    if (attempt == null) {
+      return null;
+    }
+    if (attempt.getMasterContainerId() == null) {
+      return null;
+    }
+    return getContainer(attempt.getMasterContainerId());
+  }
+
+  /**
+   * Collects the history of every container of the given application
+   * attempt by scanning the history file of its application.
+   * <p>
+   * Scanning is best-effort: if an {@link IOException} occurs while reading
+   * entries, the error is logged together with its cause and whatever was
+   * merged so far is returned. A failure to obtain the history file reader is
+   * propagated to the caller.
+   *
+   * @param appAttemptId the attempt whose containers are requested
+   * @return a map from container id to its (possibly partial) history data
+   * @throws IOException if the history file reader cannot be obtained
+   */
+  @Override
+  public Map<ContainerId, ContainerHistoryData> getContainers(
+      ApplicationAttemptId appAttemptId) throws IOException {
+    Map<ContainerId, ContainerHistoryData> historyDataMap =
+        new HashMap<ContainerId, ContainerHistoryData>();
+    HistoryFileReader hfReader =
+        getHistoryFileReader(appAttemptId.getApplicationId());
+    try {
+      while (hfReader.hasNext()) {
+        HistoryFileReader.Entry entry = hfReader.next();
+        // Only entries keyed by a container id are relevant here.
+        if (!entry.key.id.startsWith(ConverterUtils.CONTAINER_PREFIX)) {
+          continue;
+        }
+        ContainerId containerId = ConverterUtils.toContainerId(entry.key.id);
+        if (!containerId.getApplicationAttemptId().equals(appAttemptId)) {
+          continue;
+        }
+        ContainerHistoryData historyData = historyDataMap.get(containerId);
+        if (historyData == null) {
+          // First record seen for this container: start from a blank record.
+          historyData = ContainerHistoryData.newInstance(
+              containerId, null, null, null, Long.MIN_VALUE,
+              Long.MAX_VALUE, null, Integer.MAX_VALUE, null);
+          historyDataMap.put(containerId, historyData);
+        }
+        if (entry.key.suffix.equals(START_DATA_SUFFIX)) {
+          mergeContainerHistoryData(historyData,
+              parseContainerStartData(entry.value));
+        } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) {
+          mergeContainerHistoryData(historyData,
+              parseContainerFinishData(entry.value));
+        }
+      }
+      LOG.info("Completed reading history information of all containers"
+          + " of application attempt " + appAttemptId);
+    } catch (IOException e) {
+      // Keep returning the partial result, but do not hide the failure: log
+      // it as an error together with its cause.
+      LOG.error("Error when reading history information of some containers"
+          + " of application attempt " + appAttemptId, e);
+    } finally {
+      hfReader.close();
+    }
+    return historyDataMap;
+  }
+
+  @Override
+  public void applicationStarted(ApplicationStartData appStart)
+      throws IOException {
+    HistoryFileWriter hfWriter =
+        outstandingWriters.get(appStart.getApplicationId());
+    if (hfWriter == null) {
+      Path applicationHistoryFile =
+          new Path(rootDirPath, appStart.getApplicationId().toString());
+      try {
+        hfWriter = new HistoryFileWriter(applicationHistoryFile);
+        LOG.info("Opened history file of application "
+            + appStart.getApplicationId());
+      } catch (IOException e) {
+        LOG.error("Error when openning history file of application "
+            + appStart.getApplicationId());
+        throw e;
+      }
+      outstandingWriters.put(appStart.getApplicationId(), hfWriter);
+    } else {
+      throw new IOException("History file of application "
+          + appStart.getApplicationId() + " is already opened");
+    }
+    assert appStart instanceof ApplicationStartDataPBImpl;
+    try {
+      hfWriter.writeHistoryData(new HistoryDataKey(appStart.getApplicationId()
+        .toString(), START_DATA_SUFFIX),
+        ((ApplicationStartDataPBImpl) appStart).getProto().toByteArray());
+      LOG.info("Start information of application "
+          + appStart.getApplicationId() + " is written");
+    } catch (IOException e) {
+      LOG.error("Error when writing start information of application "
+          + appStart.getApplicationId());
+      throw e;
+    }
+  }
+
+  @Override
+  public void applicationFinished(ApplicationFinishData appFinish)
+      throws IOException {
+    HistoryFileWriter hfWriter =
+        getHistoryFileWriter(appFinish.getApplicationId());
+    assert appFinish instanceof ApplicationFinishDataPBImpl;
+    try {
+      hfWriter.writeHistoryData(new HistoryDataKey(appFinish.getApplicationId()
+        .toString(), FINISH_DATA_SUFFIX),
+        ((ApplicationFinishDataPBImpl) appFinish).getProto().toByteArray());
+      LOG.info("Finish information of application "
+          + appFinish.getApplicationId() + " is written");
+    } catch (IOException e) {
+      LOG.error("Error when writing finish information of application "
+          + appFinish.getApplicationId());
+      throw e;
+    } finally {
+      hfWriter.close();
+      outstandingWriters.remove(appFinish.getApplicationId());
+    }
+  }
+
+  @Override
+  public void applicationAttemptStarted(
+      ApplicationAttemptStartData appAttemptStart) throws IOException {
+    HistoryFileWriter hfWriter =
+        getHistoryFileWriter(appAttemptStart.getApplicationAttemptId()
+          .getApplicationId());
+    assert appAttemptStart instanceof ApplicationAttemptStartDataPBImpl;
+    try {
+      hfWriter.writeHistoryData(new HistoryDataKey(appAttemptStart
+        .getApplicationAttemptId().toString(), START_DATA_SUFFIX),
+        ((ApplicationAttemptStartDataPBImpl) appAttemptStart).getProto()
+          .toByteArray());
+      LOG.info("Start information of application attempt "
+          + appAttemptStart.getApplicationAttemptId() + " is written");
+    } catch (IOException e) {
+      LOG.error("Error when writing start information of application attempt "
+          + appAttemptStart.getApplicationAttemptId());
+      throw e;
+    }
+  }
+
+  @Override
+  public void applicationAttemptFinished(
+      ApplicationAttemptFinishData appAttemptFinish) throws IOException {
+    HistoryFileWriter hfWriter =
+        getHistoryFileWriter(appAttemptFinish.getApplicationAttemptId()
+          .getApplicationId());
+    assert appAttemptFinish instanceof ApplicationAttemptFinishDataPBImpl;
+    try {
+      hfWriter.writeHistoryData(new HistoryDataKey(appAttemptFinish
+        .getApplicationAttemptId().toString(), FINISH_DATA_SUFFIX),
+        ((ApplicationAttemptFinishDataPBImpl) appAttemptFinish).getProto()
+          .toByteArray());
+      LOG.info("Finish information of application attempt "
+          + appAttemptFinish.getApplicationAttemptId() + " is written");
+    } catch (IOException e) {
+      LOG.error("Error when writing finish information of application attempt "
+          + appAttemptFinish.getApplicationAttemptId());
+      throw e;
+    }
+  }
+
+  @Override
+  public void containerStarted(ContainerStartData containerStart)
+      throws IOException {
+    HistoryFileWriter hfWriter =
+        getHistoryFileWriter(containerStart.getContainerId()
+          .getApplicationAttemptId().getApplicationId());
+    assert containerStart instanceof ContainerStartDataPBImpl;
+    try {
+      hfWriter.writeHistoryData(new HistoryDataKey(containerStart
+        .getContainerId().toString(), START_DATA_SUFFIX),
+        ((ContainerStartDataPBImpl) containerStart).getProto().toByteArray());
+      LOG.info("Start information of container "
+          + containerStart.getContainerId() + " is written");
+    } catch (IOException e) {
+      LOG.error("Error when writing start information of container "
+          + containerStart.getContainerId());
+      throw e;
+    }
+  }
+
+  /**
+   * Appends the finish record of a container to the history file of the
+   * application the container belongs to.
+   * <p>
+   * A write failure is logged with its cause and rethrown, matching the other
+   * write methods of this store, so the caller learns that the record was
+   * not persisted.
+   *
+   * @param containerFinish the finish information of the container
+   * @throws IOException if no writer is open for the application or the
+   *           write fails
+   */
+  @Override
+  public void containerFinished(ContainerFinishData containerFinish)
+      throws IOException {
+    HistoryFileWriter hfWriter =
+        getHistoryFileWriter(containerFinish.getContainerId()
+          .getApplicationAttemptId().getApplicationId());
+    assert containerFinish instanceof ContainerFinishDataPBImpl;
+    try {
+      hfWriter.writeHistoryData(new HistoryDataKey(containerFinish
+        .getContainerId().toString(), FINISH_DATA_SUFFIX),
+        ((ContainerFinishDataPBImpl) containerFinish).getProto().toByteArray());
+      LOG.info("Finish information of container "
+          + containerFinish.getContainerId() + " is written");
+    } catch (IOException e) {
+      LOG.error("Error when writing finish information of container "
+          + containerFinish.getContainerId(), e);
+      // Propagate instead of silently dropping the failure.
+      throw e;
+    }
+  }
+
+  /**
+   * Decodes a serialized application start record.
+   *
+   * @param value the protobuf-encoded bytes read from the history file
+   * @return the record wrapping the parsed protobuf message
+   * @throws InvalidProtocolBufferException if the bytes cannot be parsed
+   */
+  private static ApplicationStartData parseApplicationStartData(byte[] value)
+      throws InvalidProtocolBufferException {
+    ApplicationStartDataProto proto =
+        ApplicationStartDataProto.parseFrom(value);
+    return new ApplicationStartDataPBImpl(proto);
+  }
+
+  /**
+   * Decodes a serialized application finish record.
+   *
+   * @param value the protobuf-encoded bytes read from the history file
+   * @return the record wrapping the parsed protobuf message
+   * @throws InvalidProtocolBufferException if the bytes cannot be parsed
+   */
+  private static ApplicationFinishData parseApplicationFinishData(byte[] value)
+      throws InvalidProtocolBufferException {
+    ApplicationFinishDataProto proto =
+        ApplicationFinishDataProto.parseFrom(value);
+    return new ApplicationFinishDataPBImpl(proto);
+  }
+
+  /**
+   * Decodes a serialized application attempt start record.
+   *
+   * @param value the protobuf-encoded bytes read from the history file
+   * @return the record wrapping the parsed protobuf message
+   * @throws InvalidProtocolBufferException if the bytes cannot be parsed
+   */
+  private static ApplicationAttemptStartData parseApplicationAttemptStartData(
+      byte[] value) throws InvalidProtocolBufferException {
+    ApplicationAttemptStartDataProto proto =
+        ApplicationAttemptStartDataProto.parseFrom(value);
+    return new ApplicationAttemptStartDataPBImpl(proto);
+  }
+
+  /**
+   * Decodes a serialized application attempt finish record.
+   *
+   * @param value the protobuf-encoded bytes read from the history file
+   * @return the record wrapping the parsed protobuf message
+   * @throws InvalidProtocolBufferException if the bytes cannot be parsed
+   */
+  private static ApplicationAttemptFinishData
+      parseApplicationAttemptFinishData(byte[] value)
+          throws InvalidProtocolBufferException {
+    ApplicationAttemptFinishDataProto proto =
+        ApplicationAttemptFinishDataProto.parseFrom(value);
+    return new ApplicationAttemptFinishDataPBImpl(proto);
+  }
+
+  /**
+   * Decodes a serialized container start record.
+   *
+   * @param value the protobuf-encoded bytes read from the history file
+   * @return the record wrapping the parsed protobuf message
+   * @throws InvalidProtocolBufferException if the bytes cannot be parsed
+   */
+  private static ContainerStartData parseContainerStartData(byte[] value)
+      throws InvalidProtocolBufferException {
+    ContainerStartDataProto proto = ContainerStartDataProto.parseFrom(value);
+    return new ContainerStartDataPBImpl(proto);
+  }
+
+  /**
+   * Decodes a serialized container finish record.
+   *
+   * @param value the protobuf-encoded bytes read from the history file
+   * @return the record wrapping the parsed protobuf message
+   * @throws InvalidProtocolBufferException if the bytes cannot be parsed
+   */
+  private static ContainerFinishData parseContainerFinishData(byte[] value)
+      throws InvalidProtocolBufferException {
+    ContainerFinishDataProto proto = ContainerFinishDataProto.parseFrom(value);
+    return new ContainerFinishDataPBImpl(proto);
+  }
+
+  /**
+   * Copies the fields carried by an application start record into the
+   * aggregated application history.
+   *
+   * @param historyData the history record to update
+   * @param startData the start record to copy from
+   */
+  private static void mergeApplicationHistoryData(
+      ApplicationHistoryData historyData, ApplicationStartData startData) {
+    // Descriptive fields.
+    historyData.setApplicationName(startData.getApplicationName());
+    historyData.setApplicationType(startData.getApplicationType());
+    // Submission context.
+    historyData.setQueue(startData.getQueue());
+    historyData.setUser(startData.getUser());
+    // Timestamps.
+    historyData.setSubmitTime(startData.getSubmitTime());
+    historyData.setStartTime(startData.getStartTime());
+  }
+
+  /**
+   * Copies the fields carried by an application finish record into the
+   * aggregated application history.
+   *
+   * @param historyData the history record to update
+   * @param finishData the finish record to copy from
+   */
+  private static void mergeApplicationHistoryData(
+      ApplicationHistoryData historyData, ApplicationFinishData finishData) {
+    historyData.setFinishTime(finishData.getFinishTime());
+    historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo());
+    // Final outcome of the application.
+    historyData.setFinalApplicationStatus(
+        finishData.getFinalApplicationStatus());
+    historyData.setYarnApplicationState(
+        finishData.getYarnApplicationState());
+  }
+
+  /**
+   * Copies the fields carried by an application attempt start record into
+   * the aggregated attempt history.
+   *
+   * @param historyData the history record to update
+   * @param startData the start record to copy from
+   */
+  private static void mergeApplicationAttemptHistoryData(
+      ApplicationAttemptHistoryData historyData,
+      ApplicationAttemptStartData startData) {
+    // Host and RPC port recorded for the attempt.
+    historyData.setHost(startData.getHost());
+    historyData.setRPCPort(startData.getRPCPort());
+    historyData.setMasterContainerId(
+        startData.getMasterContainerId());
+  }
+
+  /**
+   * Copies the fields carried by an application attempt finish record into
+   * the aggregated attempt history.
+   *
+   * @param historyData the history record to update
+   * @param finishData the finish record to copy from
+   */
+  private static void mergeApplicationAttemptHistoryData(
+      ApplicationAttemptHistoryData historyData,
+      ApplicationAttemptFinishData finishData) {
+    historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo());
+    historyData.setTrackingURL(finishData.getTrackingURL());
+    // Final outcome of the attempt.
+    historyData.setFinalApplicationStatus(
+        finishData.getFinalApplicationStatus());
+    historyData.setYarnApplicationAttemptState(
+        finishData.getYarnApplicationAttemptState());
+  }
+
+  /**
+   * Copies the fields carried by a container start record into the
+   * aggregated container history.
+   *
+   * @param historyData the history record to update
+   * @param startData the start record to copy from
+   */
+  private static void mergeContainerHistoryData(
+      ContainerHistoryData historyData, ContainerStartData startData) {
+    // Allocation details.
+    historyData.setAllocatedResource(startData.getAllocatedResource());
+    historyData.setAssignedNode(startData.getAssignedNode());
+    historyData.setPriority(startData.getPriority());
+    // Timestamp.
+    historyData.setStartTime(startData.getStartTime());
+  }
+
+  /**
+   * Copies the fields carried by a container finish record into the
+   * aggregated container history.
+   *
+   * @param historyData the history record to update
+   * @param finishData the finish record to copy from
+   */
+  private static void mergeContainerHistoryData(
+      ContainerHistoryData historyData, ContainerFinishData finishData) {
+    historyData.setFinishTime(finishData.getFinishTime());
+    historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo());
+    // Final outcome of the container.
+    historyData.setContainerExitStatus(
+        finishData.getContainerExitStatus());
+    historyData.setContainerState(
+        finishData.getContainerState());
+  }
+
+  /**
+   * Returns the writer currently registered for the given application.
+   *
+   * @param appId the application whose writer is requested
+   * @return the outstanding writer of the application
+   * @throws IOException if no writer is registered for the application
+   */
+  private HistoryFileWriter getHistoryFileWriter(ApplicationId appId)
+      throws IOException {
+    HistoryFileWriter registered = outstandingWriters.get(appId);
+    if (registered != null) {
+      return registered;
+    }
+    throw new IOException("History file of application " + appId
+        + " is not opened");
+  }
+
+  /**
+   * Opens a reader on the history file of the given application.
+   *
+   * @param appId the application whose history file should be read
+   * @return a new reader positioned at the beginning of the file
+   * @throws IOException if the file does not exist, if a writer is still
+   *           registered for the application, or if opening the file fails
+   */
+  private HistoryFileReader getHistoryFileReader(ApplicationId appId)
+      throws IOException {
+    Path historyFile = new Path(rootDirPath, appId.toString());
+    boolean present = fs.exists(historyFile);
+    if (!present) {
+      throw new IOException("History file for application " + appId
+          + " is not found");
+    }
+    // A registered writer means the file is still being written.
+    boolean underWriting = outstandingWriters.containsKey(appId);
+    if (underWriting) {
+      throw new IOException("History file for application " + appId
+          + " is under writing");
+    }
+    return new HistoryFileReader(historyFile);
+  }
+
+  /**
+   * Sequential reader over the key/value entries of one history file.
+   * <p>
+   * The reader keeps the underlying input stream so that {@link #close()}
+   * can release it together with the TFile reader and scanner.
+   */
+  private class HistoryFileReader {
+
+    /** One decoded entry of the history file: its key and raw value bytes. */
+    private class Entry {
+
+      private final HistoryDataKey key;
+      private final byte[] value;
+
+      public Entry(HistoryDataKey key, byte[] value) {
+        this.key = key;
+        this.value = value;
+      }
+    }
+
+    private FSDataInputStream fsdis;
+    private TFile.Reader reader;
+    private TFile.Reader.Scanner scanner;
+
+    /**
+     * Opens the given history file and positions a scanner at its start.
+     *
+     * @param historyFile the file to read
+     * @throws IOException if the file cannot be opened or the TFile reader
+     *           or scanner cannot be created; anything opened so far is
+     *           closed before the exception propagates
+     */
+    public HistoryFileReader(Path historyFile) throws IOException {
+      boolean opened = false;
+      try {
+        fsdis = fs.open(historyFile);
+        reader =
+            new TFile.Reader(fsdis, fs.getFileStatus(historyFile).getLen(),
+              getConfig());
+        reset();
+        opened = true;
+      } finally {
+        if (!opened) {
+          // Do not leak the stream or the reader when construction fails.
+          close();
+        }
+      }
+    }
+
+    /** @return {@code true} while the scanner has not reached the end */
+    public boolean hasNext() {
+      return !scanner.atEnd();
+    }
+
+    /**
+     * Decodes the entry under the scanner's cursor and advances the cursor.
+     *
+     * @return the decoded key and the complete value bytes
+     * @throws IOException if the entry cannot be read completely
+     */
+    public Entry next() throws IOException {
+      TFile.Reader.Scanner.Entry entry = scanner.entry();
+      DataInputStream dis = entry.getKeyStream();
+      HistoryDataKey key = new HistoryDataKey();
+      key.readFields(dis);
+      dis = entry.getValueStream();
+      byte[] value = new byte[entry.getValueLength()];
+      // read(byte[]) may return fewer bytes than requested; readFully
+      // fills the whole buffer or fails.
+      dis.readFully(value);
+      scanner.advance();
+      return new Entry(key, value);
+    }
+
+    /**
+     * Closes the current scanner, if any, and creates a new one.
+     *
+     * @throws IOException if the new scanner cannot be created
+     */
+    public void reset() throws IOException {
+      IOUtils.cleanup(LOG, scanner);
+      scanner = reader.createScanner();
+    }
+
+    /**
+     * Releases the scanner, the TFile reader and the underlying input
+     * stream.
+     */
+    public void close() {
+      // The stream is closed explicitly; NOTE(review): this assumes the
+      // TFile reader does not close it itself, and that a second close on
+      // the stream is harmless if it does - confirm against the TFile docs.
+      IOUtils.cleanup(LOG, scanner, reader, fsdis);
+    }
+
+  }
+
+  private class HistoryFileWriter {
+
+    private FSDataOutputStream fsdos;
+    private TFile.Writer writer;
+
+    public HistoryFileWriter(Path historyFile) throws IOException {
+      if (fs.exists(historyFile)) {
+        fsdos = fs.append(historyFile);
+      } else {
+        fsdos = fs.create(historyFile);
+      }
+      fs.setPermission(historyFile, HISTORY_FILE_UMASK);
+      writer =
+          new TFile.Writer(fsdos, MIN_BLOCK_SIZE, getConfig().get(
+            YarnConfiguration.FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE,
+            YarnConfiguration.DEFAULT_FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE), null,
+            getConfig());
+    }
+
+    public synchronized void close() {
+      IOUtils.cleanup(LOG, writer, fsdos);
+    }
+
+    public synchronized void writeHistoryData(HistoryDataKey key, byte[] value)
+        throws IOException {
+      DataOutputStream dos = null;
+      try {
+        dos = writer.prepareAppendKey(-1);
+        key.write(dos);
+      } finally {
+        IOUtils.cleanup(LOG, dos);
+      }
+      try {
+        dos = writer.prepareAppendValue(value.length);
+        dos.write(value);
+      } finally {
+        IOUtils.cleanup(LOG, dos);
+      }
+    }
+
+  }
+
+  /**
+   * Key of one history file entry: the string form of the entity id plus a
+   * suffix telling which kind of record the value holds. Both parts are
+   * serialized as UTF strings, id first.
+   */
+  private static class HistoryDataKey implements Writable {
+
+    private String id;
+
+    private String suffix;
+
+    /** Creates an empty key, to be filled in by {@link #readFields}. */
+    public HistoryDataKey() {
+      this(null, null);
+    }
+
+    public HistoryDataKey(String id, String suffix) {
+      this.id = id;
+      this.suffix = suffix;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeUTF(this.id);
+      out.writeUTF(this.suffix);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      this.id = in.readUTF();
+      this.suffix = in.readUTF();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/MemoryApplicationHistoryStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/MemoryApplicationHistoryStore.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/MemoryApplicationHistoryStore.java
new file mode 100644
index 0000000..c226ad3
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/MemoryApplicationHistoryStore.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData;
+
+/**
+ * In-memory implementation of {@link ApplicationHistoryStore}. This
+ * implementation is for test purpose only. If users improperly instantiate it,
+ * they may end up reading and writing history data in different memory
+ * stores.
+ *
+ */
+@Private
+@Unstable
+public class MemoryApplicationHistoryStore extends AbstractService implements
+    ApplicationHistoryStore {
+
+  /** Application history keyed by application id. */
+  private final ConcurrentMap<ApplicationId, ApplicationHistoryData> applicationData =
+      new ConcurrentHashMap<ApplicationId, ApplicationHistoryData>();
+  /** Attempt history, grouped by the owning application. */
+  private final ConcurrentMap<ApplicationId, ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData>> applicationAttemptData =
+      new ConcurrentHashMap<ApplicationId, ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData>>();
+  /** Container history, grouped by the owning application attempt. */
+  private final ConcurrentMap<ApplicationAttemptId, ConcurrentMap<ContainerId, ContainerHistoryData>> containerData =
+      new ConcurrentHashMap<ApplicationAttemptId, ConcurrentMap<ContainerId, ContainerHistoryData>>();
+
+  public MemoryApplicationHistoryStore() {
+    super(MemoryApplicationHistoryStore.class.getName());
+  }
+
+  /** Returns a copy of the stored application history. */
+  @Override
+  public Map<ApplicationId, ApplicationHistoryData> getAllApplications() {
+    Map<ApplicationId, ApplicationHistoryData> snapshot =
+        new HashMap<ApplicationId, ApplicationHistoryData>(applicationData);
+    return snapshot;
+  }
+
+  /** Returns the stored history of the application, or {@code null}. */
+  @Override
+  public ApplicationHistoryData getApplication(ApplicationId appId) {
+    return applicationData.get(appId);
+  }
+
+  /**
+   * Returns a copy of the attempts stored for the application, or an empty
+   * map if nothing is stored for it.
+   */
+  @Override
+  public Map<ApplicationAttemptId, ApplicationAttemptHistoryData>
+      getApplicationAttempts(ApplicationId appId) {
+    ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData> attempts =
+        applicationAttemptData.get(appId);
+    if (attempts != null) {
+      return new HashMap<ApplicationAttemptId, ApplicationAttemptHistoryData>(
+        attempts);
+    }
+    return Collections
+      .<ApplicationAttemptId, ApplicationAttemptHistoryData> emptyMap();
+  }
+
+  /** Returns the stored history of the attempt, or {@code null}. */
+  @Override
+  public ApplicationAttemptHistoryData getApplicationAttempt(
+      ApplicationAttemptId appAttemptId) {
+    ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData> attempts =
+        applicationAttemptData.get(appAttemptId.getApplicationId());
+    return attempts == null ? null : attempts.get(appAttemptId);
+  }
+
+  /**
+   * Returns the history of the attempt's master container, or {@code null}
+   * if the attempt or its master container id is not stored.
+   */
+  @Override
+  public ContainerHistoryData getAMContainer(ApplicationAttemptId appAttemptId) {
+    ApplicationAttemptHistoryData attempt = getApplicationAttempt(appAttemptId);
+    if (attempt != null && attempt.getMasterContainerId() != null) {
+      return getContainer(attempt.getMasterContainerId());
+    }
+    return null;
+  }
+
+  /** Returns the stored history of the container, or {@code null}. */
+  @Override
+  public ContainerHistoryData getContainer(ContainerId containerId) {
+    Map<ContainerId, ContainerHistoryData> containers =
+        containerData.get(containerId.getApplicationAttemptId());
+    return containers == null ? null : containers.get(containerId);
+  }
+
+  /**
+   * Returns a copy of the containers stored for the attempt, or an empty map
+   * if nothing is stored for it.
+   */
+  @Override
+  public Map<ContainerId, ContainerHistoryData> getContainers(
+      ApplicationAttemptId appAttemptId) throws IOException {
+    ConcurrentMap<ContainerId, ContainerHistoryData> containers =
+        containerData.get(appAttemptId);
+    if (containers != null) {
+      return new HashMap<ContainerId, ContainerHistoryData>(containers);
+    }
+    return Collections.<ContainerId, ContainerHistoryData> emptyMap();
+  }
+
+  /**
+   * Stores the start information of an application.
+   *
+   * @throws IOException if start information is already stored for it
+   */
+  @Override
+  public void applicationStarted(ApplicationStartData appStart)
+      throws IOException {
+    ApplicationHistoryData fresh =
+        ApplicationHistoryData.newInstance(appStart.getApplicationId(),
+          appStart.getApplicationName(), appStart.getApplicationType(),
+          appStart.getQueue(), appStart.getUser(), appStart.getSubmitTime(),
+          appStart.getStartTime(), Long.MAX_VALUE, null, null, null);
+    ApplicationHistoryData previous =
+        applicationData.putIfAbsent(appStart.getApplicationId(), fresh);
+    if (previous != null) {
+      throw new IOException("The start information of application "
+          + appStart.getApplicationId() + " is already stored.");
+    }
+  }
+
+  /**
+   * Adds the finish information to an already started application.
+   *
+   * @throws IOException if the application has no start information or its
+   *           finish information is already stored
+   */
+  @Override
+  public void applicationFinished(ApplicationFinishData appFinish)
+      throws IOException {
+    ApplicationHistoryData stored =
+        applicationData.get(appFinish.getApplicationId());
+    if (stored == null) {
+      throw new IOException("The finish information of application "
+          + appFinish.getApplicationId() + " is stored before the start"
+          + " information.");
+    }
+    // A non-null YarnApplicationState is taken to mean that the finish
+    // information was recorded before.
+    if (stored.getYarnApplicationState() != null) {
+      throw new IOException("The finish information of application "
+          + appFinish.getApplicationId() + " is already stored.");
+    }
+    stored.setFinishTime(appFinish.getFinishTime());
+    stored.setDiagnosticsInfo(appFinish.getDiagnosticsInfo());
+    stored.setFinalApplicationStatus(appFinish.getFinalApplicationStatus());
+    stored.setYarnApplicationState(appFinish.getYarnApplicationState());
+  }
+
+  /**
+   * Stores the start information of an application attempt.
+   *
+   * @throws IOException if start information is already stored for it
+   */
+  @Override
+  public void applicationAttemptStarted(
+      ApplicationAttemptStartData appAttemptStart) throws IOException {
+    ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData> attempts =
+        getSubMap(appAttemptStart.getApplicationAttemptId().getApplicationId());
+    ApplicationAttemptHistoryData fresh =
+        ApplicationAttemptHistoryData.newInstance(
+          appAttemptStart.getApplicationAttemptId(),
+          appAttemptStart.getHost(), appAttemptStart.getRPCPort(),
+          appAttemptStart.getMasterContainerId(), null, null, null, null);
+    ApplicationAttemptHistoryData previous =
+        attempts.putIfAbsent(appAttemptStart.getApplicationAttemptId(), fresh);
+    if (previous != null) {
+      throw new IOException("The start information of application attempt "
+          + appAttemptStart.getApplicationAttemptId() + " is already stored.");
+    }
+  }
+
+  /**
+   * Adds the finish information to an already started application attempt.
+   *
+   * @throws IOException if the attempt has no start information or its
+   *           finish information is already stored
+   */
+  @Override
+  public void applicationAttemptFinished(
+      ApplicationAttemptFinishData appAttemptFinish) throws IOException {
+    ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData> attempts =
+        getSubMap(appAttemptFinish.getApplicationAttemptId().getApplicationId());
+    ApplicationAttemptHistoryData stored =
+        attempts.get(appAttemptFinish.getApplicationAttemptId());
+    if (stored == null) {
+      throw new IOException("The finish information of application attempt "
+          + appAttemptFinish.getApplicationAttemptId() + " is stored before"
+          + " the start information.");
+    }
+    // A non-null YarnApplicationAttemptState is taken to mean that the
+    // finish information was recorded before.
+    if (stored.getYarnApplicationAttemptState() != null) {
+      throw new IOException("The finish information of application attempt "
+          + appAttemptFinish.getApplicationAttemptId() + " is already stored.");
+    }
+    stored.setTrackingURL(appAttemptFinish.getTrackingURL());
+    stored.setDiagnosticsInfo(appAttemptFinish.getDiagnosticsInfo());
+    stored.setFinalApplicationStatus(
+        appAttemptFinish.getFinalApplicationStatus());
+    stored.setYarnApplicationAttemptState(
+        appAttemptFinish.getYarnApplicationAttemptState());
+  }
+
+  /**
+   * Returns the attempt map of the application, registering an empty one
+   * first if none exists yet.
+   */
+  private ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData>
+      getSubMap(ApplicationId appId) {
+    ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData> fresh =
+        new ConcurrentHashMap<ApplicationAttemptId, ApplicationAttemptHistoryData>();
+    ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData> existing =
+        applicationAttemptData.putIfAbsent(appId, fresh);
+    // Entries are never removed from the outer map, so the registered map is
+    // either the one that was already there or the one just inserted.
+    return existing != null ? existing : fresh;
+  }
+
+  /**
+   * Stores the start information of a container.
+   *
+   * @throws IOException if start information is already stored for it
+   */
+  @Override
+  public void containerStarted(ContainerStartData containerStart)
+      throws IOException {
+    ConcurrentMap<ContainerId, ContainerHistoryData> containers =
+        getSubMap(containerStart.getContainerId().getApplicationAttemptId());
+    ContainerHistoryData fresh =
+        ContainerHistoryData.newInstance(containerStart.getContainerId(),
+          containerStart.getAllocatedResource(),
+          containerStart.getAssignedNode(), containerStart.getPriority(),
+          containerStart.getStartTime(), Long.MAX_VALUE, null,
+          Integer.MAX_VALUE, null);
+    ContainerHistoryData previous =
+        containers.putIfAbsent(containerStart.getContainerId(), fresh);
+    if (previous != null) {
+      throw new IOException("The start information of container "
+          + containerStart.getContainerId() + " is already stored.");
+    }
+  }
+
+  /**
+   * Adds the finish information to an already started container.
+   *
+   * @throws IOException if the container has no start information or its
+   *           finish information is already stored
+   */
+  @Override
+  public void containerFinished(ContainerFinishData containerFinish)
+      throws IOException {
+    ConcurrentMap<ContainerId, ContainerHistoryData> containers =
+        getSubMap(containerFinish.getContainerId().getApplicationAttemptId());
+    ContainerHistoryData stored =
+        containers.get(containerFinish.getContainerId());
+    if (stored == null) {
+      throw new IOException("The finish information of container "
+          + containerFinish.getContainerId() + " is stored before"
+          + " the start information.");
+    }
+    // A non-null ContainerState is taken to mean that the finish
+    // information was recorded before.
+    if (stored.getContainerState() != null) {
+      throw new IOException("The finish information of container "
+          + containerFinish.getContainerId() + " is already stored.");
+    }
+    stored.setFinishTime(containerFinish.getFinishTime());
+    stored.setDiagnosticsInfo(containerFinish.getDiagnosticsInfo());
+    stored.setContainerExitStatus(containerFinish.getContainerExitStatus());
+    stored.setContainerState(containerFinish.getContainerState());
+  }
+
+  /**
+   * Returns the container map of the attempt, registering an empty one
+   * first if none exists yet.
+   */
+  private ConcurrentMap<ContainerId, ContainerHistoryData> getSubMap(
+      ApplicationAttemptId appAttemptId) {
+    ConcurrentMap<ContainerId, ContainerHistoryData> fresh =
+        new ConcurrentHashMap<ContainerId, ContainerHistoryData>();
+    ConcurrentMap<ContainerId, ContainerHistoryData> existing =
+        containerData.putIfAbsent(appAttemptId, fresh);
+    // Entries are never removed from the outer map, so the registered map is
+    // either the one that was already there or the one just inserted.
+    return existing != null ? existing : fresh;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/NullApplicationHistoryStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/NullApplicationHistoryStore.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/NullApplicationHistoryStore.java
new file mode 100644
index 0000000..3660c10
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/NullApplicationHistoryStore.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData;
+
+/**
+ * No-op {@link ApplicationHistoryStore}: every write is discarded and every
+ * read reports that nothing is stored. Selecting this implementation means
+ * no history data is persisted.
+ */
+@Unstable
+@Private
+public class NullApplicationHistoryStore extends AbstractService implements
+    ApplicationHistoryStore {
+
+  /** Registers the service under this class's fully qualified name. */
+  public NullApplicationHistoryStore() {
+    super(NullApplicationHistoryStore.class.getName());
+  }
+
+  // ---- writers: all events are dropped -----------------------------------
+
+  @Override
+  public void applicationStarted(ApplicationStartData appStart)
+      throws IOException {
+    // nothing is recorded
+  }
+
+  @Override
+  public void applicationFinished(ApplicationFinishData appFinish)
+      throws IOException {
+    // nothing is recorded
+  }
+
+  @Override
+  public void applicationAttemptStarted(
+      ApplicationAttemptStartData appAttemptStart) throws IOException {
+    // nothing is recorded
+  }
+
+  @Override
+  public void applicationAttemptFinished(
+      ApplicationAttemptFinishData appAttemptFinish) throws IOException {
+    // nothing is recorded
+  }
+
+  @Override
+  public void containerStarted(ContainerStartData containerStart)
+      throws IOException {
+    // nothing is recorded
+  }
+
+  @Override
+  public void containerFinished(ContainerFinishData containerFinish)
+      throws IOException {
+    // nothing is recorded
+  }
+
+  // ---- readers: single lookups yield null, bulk lookups an empty map -----
+
+  @Override
+  public ApplicationHistoryData getApplication(ApplicationId appId)
+      throws IOException {
+    return null;
+  }
+
+  @Override
+  public Map<ApplicationId, ApplicationHistoryData> getAllApplications()
+      throws IOException {
+    return Collections.<ApplicationId, ApplicationHistoryData>emptyMap();
+  }
+
+  @Override
+  public Map<ApplicationAttemptId, ApplicationAttemptHistoryData>
+      getApplicationAttempts(ApplicationId appId) throws IOException {
+    return Collections
+        .<ApplicationAttemptId, ApplicationAttemptHistoryData>emptyMap();
+  }
+
+  @Override
+  public ApplicationAttemptHistoryData getApplicationAttempt(
+      ApplicationAttemptId appAttemptId) throws IOException {
+    return null;
+  }
+
+  @Override
+  public ContainerHistoryData getContainer(ContainerId containerId)
+      throws IOException {
+    return null;
+  }
+
+  @Override
+  public ContainerHistoryData getAMContainer(ApplicationAttemptId appAttemptId)
+      throws IOException {
+    return null;
+  }
+
+  @Override
+  public Map<ContainerId, ContainerHistoryData> getContainers(
+      ApplicationAttemptId appAttemptId) throws IOException {
+    return Collections.<ContainerId, ContainerHistoryData>emptyMap();
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
new file mode 100644
index 0000000..e702fc0
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.annotate.JsonSubTypes;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Date;
+
+public abstract class AbstractTimelineAggregator implements Runnable {
+  /** Accessor supplied at construction time; visible to subclasses. */
+  protected final PhoenixHBaseAccessor hBaseAccessor;
+  /** Path of the file in which the last checkpoint time is kept. */
+  protected final String CHECKPOINT_LOCATION;
+  /** Logger obtained for the concrete (sub)class of this instance. */
+  private final Log LOG;
+  /** Milliseconds subtracted from "now" for the first checkpoint (2 min). */
+  static final long checkpointDelay = 120000;
+  static final Integer RESULTSET_FETCH_SIZE = 5000;
+  /**
+   * Mapper used by {@code MetricAggregate#toJSON()}. No custom serializer
+   * module is registered on it.
+   */
+  private static final ObjectMapper mapper = new ObjectMapper();
+
+  /**
+   * @param hBaseAccessor accessor stored for use by subclasses
+   * @param checkpointLocation path of the checkpoint file
+   */
+  public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor,
+                                    String checkpointLocation) {
+    LOG = LogFactory.getLog(getClass());
+    CHECKPOINT_LOCATION = checkpointLocation;
+    this.hBaseAccessor = hBaseAccessor;
+  }
+
+  /**
+   * Aggregation loop; never returns normally.
+   * <p>
+   * Each pass reads the checkpoint. When there is none, or it is older than
+   * {@link #getCheckpointCutOffInterval()}, a fresh checkpoint set
+   * {@code checkpointDelay} milliseconds in the past is written and no work
+   * is done on that pass. Otherwise {@link #doWork(long, long)} is called
+   * for the window starting at the checkpoint and one sleep interval long,
+   * and the checkpoint is advanced only if it reports success. The thread
+   * then sleeps for {@link #getSleepInterval()} milliseconds.
+   */
+  @Override
+  public void run() {
+    LOG.info("Started Timeline aggregator thread @ " + new Date());
+    // Unbox once rather than on every arithmetic use inside the loop.
+    final long sleepInterval = getSleepInterval();
+
+    while (true) {
+      long currentTime = System.currentTimeMillis();
+      long lastCheckPointTime = -1;
+
+      try {
+        lastCheckPointTime = readCheckPoint();
+        if (isLastCheckPointTooOld(lastCheckPointTime)) {
+          LOG.warn("Last Checkpoint is too old, discarding last checkpoint. " +
+            "lastCheckPointTime = " + lastCheckPointTime);
+          lastCheckPointTime = -1;
+        }
+        if (lastCheckPointTime == -1) {
+          // Assuming first run, save checkpoint and sleep.
+          // Set checkpoint to 2 minutes in the past to allow the
+          // agents/collectors to catch up
+          saveCheckPoint(currentTime - checkpointDelay);
+        }
+      } catch (IOException io) {
+        LOG.warn("Unable to write last checkpoint time. Resuming sleep.", io);
+      }
+
+      if (lastCheckPointTime != -1) {
+        LOG.info("Last check point time: " + lastCheckPointTime + ", " +
+          "lagBy: " + ((System.currentTimeMillis() - lastCheckPointTime)) / 1000);
+        long windowEnd = lastCheckPointTime + sleepInterval;
+        boolean success = doWork(lastCheckPointTime, windowEnd);
+        if (success) {
+          try {
+            saveCheckPoint(windowEnd);
+          } catch (IOException io) {
+            // Keep the cause in the log; without it the failure cannot be
+            // diagnosed. The same window is retried on the next pass.
+            LOG.warn("Error saving checkpoint, restarting aggregation at " +
+              "previous checkpoint.", io);
+          }
+        }
+      }
+
+      try {
+        Thread.sleep(sleepInterval);
+      } catch (InterruptedException e) {
+        // An interrupt is treated as an early wake-up: the loop carries on,
+        // so interrupting this thread does not stop the aggregator.
+        LOG.info("Sleep interrupted, continuing with aggregation.");
+      }
+    }
+  }
+
+  /**
+   * Tells whether a checkpoint lies further in the past than
+   * {@link #getCheckpointCutOffInterval()} allows.
+   *
+   * @param checkpoint checkpoint time in epoch milliseconds, or -1 for none
+   * @return {@code false} for -1, otherwise whether the checkpoint's age
+   *         exceeds the cut-off interval
+   */
+  private boolean isLastCheckPointTooOld(long checkpoint) {
+    if (checkpoint == -1) {
+      return false;
+    }
+    long age = System.currentTimeMillis() - checkpoint;
+    return age > getCheckpointCutOffInterval();
+  }
+
+  /**
+   * Reads the checkpoint time stored at {@code CHECKPOINT_LOCATION}.
+   *
+   * @return the stored time, or -1 when the file is missing, empty,
+   *         unreadable, or does not contain a valid long
+   */
+  private long readCheckPoint() {
+    File checkpoint = new File(CHECKPOINT_LOCATION);
+    try {
+      if (checkpoint.exists()) {
+        String contents = FileUtils.readFileToString(checkpoint);
+        if (contents != null) {
+          // Tolerate surrounding whitespace such as a trailing newline left
+          // by an editor; Long.parseLong rejects it.
+          String trimmed = contents.trim();
+          if (!trimmed.isEmpty()) {
+            return Long.parseLong(trimmed);
+          }
+        }
+      }
+    } catch (IOException io) {
+      LOG.debug("Unable to read checkpoint at " + CHECKPOINT_LOCATION, io);
+    } catch (NumberFormatException nfe) {
+      // A corrupt checkpoint must not kill the aggregator thread; treat it
+      // like a missing one so the caller writes a fresh checkpoint.
+      LOG.warn("Ignoring unparseable checkpoint at " + CHECKPOINT_LOCATION,
+        nfe);
+    }
+    return -1;
+  }
+
+  /**
+   * Overwrites the checkpoint file at {@code CHECKPOINT_LOCATION} with the
+   * decimal representation of {@code checkpointTime}.
+   *
+   * @param checkpointTime the checkpoint time to persist
+   * @throws IOException if the file cannot be created or written
+   */
+  private void saveCheckPoint(long checkpointTime) throws IOException {
+    // writeStringToFile creates the file (and any missing parent
+    // directories) itself, so a separate exists()/createNewFile() step is
+    // unnecessary and only adds a check-then-act race.
+    FileUtils.writeStringToFile(new File(CHECKPOINT_LOCATION),
+      String.valueOf(checkpointTime));
+  }
+
+  // TODO: Abstract out doWork implementation for cluster and host levels
+  /**
+   * Performs one aggregation pass. {@link #run()} calls this with the last
+   * checkpoint time as {@code startTime} and that time plus the sleep
+   * interval as {@code endTime}.
+   *
+   * @param startTime start of the window
+   * @param endTime end of the window
+   * @return {@code true} on success; {@link #run()} advances the checkpoint
+   *         to {@code endTime} only in that case
+   */
+  protected abstract boolean doWork(long startTime, long endTime);
+
+  /**
+   * @return the pause between passes of {@link #run()}, in milliseconds (it
+   *         is handed to {@code Thread.sleep}); also used as the length of
+   *         the window given to {@link #doWork(long, long)}
+   */
+  protected abstract Long getSleepInterval();
+
+  /**
+   * @return the maximum age, in milliseconds, a stored checkpoint may have
+   *         before {@link #run()} discards it
+   */
+  protected abstract Long getCheckpointCutOffInterval();
+
+  /**
+   * Mutable holder for the sum, deviation, maximum and minimum of a metric,
+   * serializable to JSON through {@link #toJSON()}.
+   */
+  @JsonSubTypes({ @JsonSubTypes.Type(value = MetricClusterAggregate.class),
+                @JsonSubTypes.Type(value = MetricHostAggregate.class) })
+  @InterfaceAudience.Public
+  @InterfaceStability.Unstable
+  public static class MetricAggregate {
+    protected Double sum = 0.0;
+    protected Double deviation;
+    // Seed with the most negative finite double. Double.MIN_VALUE is the
+    // smallest *positive* double, so seeding with it would make updateMax
+    // ignore every sample that is zero or negative.
+    protected Double max = -Double.MAX_VALUE;
+    protected Double min = Double.MAX_VALUE;
+
+    /** Creates an aggregate with a zero sum and no samples seen. */
+    public MetricAggregate() {}
+
+    /** Creates an aggregate holding exactly the given values. */
+    protected MetricAggregate(Double sum, Double deviation, Double max, Double min) {
+      this.sum = sum;
+      this.deviation = deviation;
+      this.max = max;
+      this.min = min;
+    }
+
+    /** Adds {@code sum} to the running sum. */
+    void updateSum(Double sum) {
+      this.sum += sum;
+    }
+
+    /** Raises the stored maximum to {@code max} if it is larger. */
+    void updateMax(Double max) {
+      if (max > this.max) {
+        this.max = max;
+      }
+    }
+
+    /** Lowers the stored minimum to {@code min} if it is smaller. */
+    void updateMin(Double min) {
+      if (min < this.min) {
+        this.min = min;
+      }
+    }
+
+    @JsonProperty("sum")
+    Double getSum() {
+      return sum;
+    }
+
+    @JsonProperty("deviation")
+    Double getDeviation() {
+      return deviation;
+    }
+
+    @JsonProperty("max")
+    Double getMax() {
+      return max;
+    }
+
+    @JsonProperty("min")
+    Double getMin() {
+      return min;
+    }
+
+    public void setSum(Double sum) {
+      this.sum = sum;
+    }
+
+    public void setDeviation(Double deviation) {
+      this.deviation = deviation;
+    }
+
+    public void setMax(Double max) {
+      this.max = max;
+    }
+
+    public void setMin(Double min) {
+      this.min = min;
+    }
+
+    /**
+     * @return this aggregate rendered as a JSON string by the enclosing
+     *         class's shared {@code ObjectMapper}
+     * @throws IOException if serialization fails
+     */
+    public String toJSON() throws IOException {
+      return mapper.writeValueAsString(this);
+    }
+  }
+
+  /**
+   * A {@link MetricAggregate} that additionally carries the number of hosts
+   * that contributed to it.
+   */
+  public static class MetricClusterAggregate extends MetricAggregate {
+    private int numberOfHosts;
+
+    @JsonCreator
+    public MetricClusterAggregate() {}
+
+    MetricClusterAggregate(Double sum, int numberOfHosts, Double deviation,
+                           Double max, Double min) {
+      super(sum, deviation, max, min);
+      this.numberOfHosts = numberOfHosts;
+    }
+
+    @JsonProperty("numberOfHosts")
+    int getNumberOfHosts() {
+      return numberOfHosts;
+    }
+
+    /** Adds {@code count} to the stored host count. */
+    void updateNumberOfHosts(int count) {
+      this.numberOfHosts += count;
+    }
+
+    public void setNumberOfHosts(int numberOfHosts) {
+      this.numberOfHosts = numberOfHosts;
+    }
+
+    @Override
+    public String toString() {
+      // Label with this class's own name so that log output can be told
+      // apart from other aggregate types.
+      return "MetricClusterAggregate{" +
+        "sum=" + sum +
+        ", numberOfHosts=" + numberOfHosts +
+        ", deviation=" + deviation +
+        ", max=" + max +
+        ", min=" + min +
+        '}';
+    }
+  }
+
+  /**
+   * Represents a collection of minute based aggregation of values for
+   * resolution greater than a minute.
+   */
+  public static class MetricHostAggregate extends MetricAggregate {
+
+    /**
+     * Creates an empty aggregate: zero sum and deviation, and max/min seeded
+     * so that the first sample replaces them.
+     */
+    @JsonCreator
+    public MetricHostAggregate() {
+      // -Double.MAX_VALUE, not Double.MIN_VALUE: the latter is the smallest
+      // positive double and would hide every sample that is <= 0.
+      super(0.0, 0.0, -Double.MAX_VALUE, Double.MAX_VALUE);
+    }
+
+    /**
+     * Find and update min, max and avg for a minute
+     */
+    void updateAggregates(MetricHostAggregate hostAggregate) {
+      updateMax(hostAggregate.getMax());
+      updateMin(hostAggregate.getMin());
+      updateSum(hostAggregate.getSum());
+    }
+
+    /**
+     * Reuse sum to indicate average for a host for the hour.
+     * <p>
+     * NOTE(review): each call replaces the stored value with the mean of the
+     * previous value and the new one. Starting from 0.0 this halves the
+     * first sample and weights later samples more heavily than earlier ones;
+     * confirm this approximation is intended.
+     */
+    @Override
+    void updateSum(Double sum) {
+      this.sum = (this.sum + sum) / 2;
+    }
+
+    @Override
+    public String toString() {
+      return "MetricHostAggregate{" +
+        "sum=" + sum +
+        ", deviation=" + deviation +
+        ", max=" + max +
+        ", min=" + min +
+        '}';
+    }
+  }
+}