Posted to commits@ambari.apache.org by sw...@apache.org on 2014/09/22 20:02:30 UTC

[20/22] AMBARI-5707. Metrics system prototype implementation. (swagle)

http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
new file mode 100644
index 0000000..3f174ef
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import java.io.IOException;
+import java.net.URL;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
+
+public class HBaseTimelineMetricStore extends AbstractService
+  implements TimelineMetricStore {
+
+  static final Log LOG = LogFactory.getLog(HBaseTimelineMetricStore.class);
+  static final String HBASE_CONF = "hbase-site.xml";
+  static final String DEFAULT_CHECKPOINT_LOCATION = "/tmp";
+  static final String AGGREGATOR_CHECKPOINT_FILE =
+    "timeline-metrics-aggregator-checkpoint";
+  static final String MINUTE_AGGREGATE_ROLLUP_CHECKPOINT_FILE =
+    "timeline-metrics-minute-aggregator-checkpoint";
+  static final String HOURLY_AGGREGATE_ROLLUP_CHECKPOINT_FILE =
+    "timeline-metrics-hourly-aggregator-checkpoint";
+  static final String HOURLY_ROLLUP_CHECKPOINT_FILE =
+    "timeline-metrics-hourly-checkpoint";
+  private PhoenixHBaseAccessor hBaseAccessor;
+
+  /**
+   * Construct the service.
+   */
+  public HBaseTimelineMetricStore() {
+    super(HBaseTimelineMetricStore.class.getName());
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    URL resUrl = getClass().getClassLoader().getResource(HBASE_CONF);
+    LOG.info("Found hbase site configuration: " + resUrl);
+    Configuration hbaseConf;
+    if (resUrl != null) {
+      hbaseConf = new Configuration(true);
+      hbaseConf.addResource(resUrl.toURI().toURL());
+      hBaseAccessor = new PhoenixHBaseAccessor(hbaseConf);
+      hBaseAccessor.initMetricSchema();
+
+      String checkpointLocation = FilenameUtils.concat(conf.get(
+        YarnConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR,
+        DEFAULT_CHECKPOINT_LOCATION), AGGREGATOR_CHECKPOINT_FILE);
+
+      // Start the cluster aggregator
+      TimelineMetricClusterAggregator clusterAggregator =
+        new TimelineMetricClusterAggregator(hBaseAccessor, checkpointLocation);
+      Thread aggregatorThread = new Thread(clusterAggregator);
+      aggregatorThread.start();
+
+      // Start the hourly cluster aggregator
+      String clusterAggregatorHourlyCheckpoint = FilenameUtils.concat(conf.get(
+        YarnConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR,
+        DEFAULT_CHECKPOINT_LOCATION), HOURLY_AGGREGATE_ROLLUP_CHECKPOINT_FILE);
+
+      TimelineMetricClusterAggregatorHourly clusterAggregatorHourly = new
+        TimelineMetricClusterAggregatorHourly(hBaseAccessor,
+        clusterAggregatorHourlyCheckpoint);
+      Thread rollupAggregatorThread = new Thread(clusterAggregatorHourly);
+      rollupAggregatorThread.start();
+
+      // Start the 5 minute aggregator
+      String minuteCheckpoint = FilenameUtils.concat(conf.get(
+        YarnConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR,
+        DEFAULT_CHECKPOINT_LOCATION), MINUTE_AGGREGATE_ROLLUP_CHECKPOINT_FILE);
+      TimelineMetricAggregatorMinute minuteAggregator = new
+        TimelineMetricAggregatorMinute(hBaseAccessor, minuteCheckpoint);
+
+      Thread minuteAggregatorThread = new Thread(minuteAggregator);
+      minuteAggregatorThread.start();
+
+      // Start hourly host aggregator
+      String hostAggregatorHourlyCheckpoint = FilenameUtils.concat(conf.get(
+        YarnConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR,
+        DEFAULT_CHECKPOINT_LOCATION), HOURLY_ROLLUP_CHECKPOINT_FILE);
+
+      TimelineMetricAggregatorHourly aggregatorHourly = new
+        TimelineMetricAggregatorHourly(hBaseAccessor, hostAggregatorHourlyCheckpoint);
+      Thread aggregatorHourlyThread = new Thread(aggregatorHourly);
+      aggregatorHourlyThread.start();
+
+    } else {
+      throw new IllegalStateException("Unable to initialize the metrics " +
+        "subsystem. No hbase-site present in the classpath.");
+    }
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    super.serviceStop();
+  }
+
+  @Override
+  public TimelineMetrics getTimelineMetrics(List<String> metricNames,
+                                            String hostname, String applicationId, String instanceId,
+                                            Long startTime, Long endTime, Integer limit,
+                                            boolean groupedByHosts) throws SQLException, IOException {
+
+    Condition condition = new Condition(metricNames, hostname, applicationId,
+      instanceId, startTime, endTime, limit, groupedByHosts);
+
+    if (hostname == null) {
+      return hBaseAccessor.getAggregateMetricRecords(condition);
+    }
+
+    return hBaseAccessor.getMetricRecords(condition);
+  }
+
+  @Override
+  public TimelineMetric getTimelineMetric(String metricName, String hostname,
+                                          String applicationId, String instanceId, Long startTime,
+                                          Long endTime, Integer limit)
+    throws SQLException, IOException {
+
+    TimelineMetrics metrics = hBaseAccessor.getMetricRecords(
+      new Condition(Collections.singletonList(metricName), hostname,
+        applicationId, instanceId, startTime, endTime, limit, true)
+    );
+
+    TimelineMetric metric = new TimelineMetric();
+    List<TimelineMetric> metricList = metrics.getMetrics();
+
+    if (metricList != null && !metricList.isEmpty()) {
+      metric.setMetricName(metricList.get(0).getMetricName());
+      metric.setAppId(metricList.get(0).getAppId());
+      metric.setInstanceId(metricList.get(0).getInstanceId());
+      metric.setHostName(metricList.get(0).getHostName());
+      // Assumption that metrics are ordered by start time
+      metric.setStartTime(metricList.get(0).getStartTime());
+      Map<Long, Double> metricRecords = new HashMap<Long, Double>();
+      for (TimelineMetric timelineMetric : metricList) {
+        metricRecords.putAll(timelineMetric.getMetricValues());
+      }
+      metric.setMetricValues(metricRecords);
+    }
+
+    return metric;
+  }
+
+
+  public TimelinePutResponse putMetrics(TimelineMetrics metrics)
+    throws SQLException, IOException {
+
+    // Errors are indicated by the thrown SQLException, not the response
+    TimelinePutResponse response = new TimelinePutResponse();
+
+    hBaseAccessor.insertMetricRecords(metrics);
+
+    return response;
+  }
+}

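A minimal sketch of how a caller might exercise the store above once
serviceInit() has completed. The metric name, host, and app id are invented
for illustration, and the TimelineMetricStore interface is inferred from the
overrides in this file:

    // Hypothetical caller; exception handling elided for brevity.
    TimelineMetricStore store = new HBaseTimelineMetricStore();
    long endTime = System.currentTimeMillis();
    long startTime = endTime - 60 * 60 * 1000; // last hour
    TimelineMetric cpu = store.getTimelineMetric(
        "cpu_user", "host1.example.com", "HOST", null, startTime, endTime, null);
    System.out.println(cpu.getMetricValues().size() + " data points");
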
http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
new file mode 100644
index 0000000..d39b1cb
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -0,0 +1,522 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtilsExt;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractTimelineAggregator.MetricClusterAggregate;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractTimelineAggregator.MetricHostAggregate;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_METRICS_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricClusterAggregator.TimelineClusterMetric;
+
+/**
+ * Provides a facade over the Phoenix API to access HBase schema
+ */
+public class PhoenixHBaseAccessor {
+
+  private final Configuration conf;
+  static final Log LOG = LogFactory.getLog(PhoenixHBaseAccessor.class);
+  private static final String CONNECTION_URL = "jdbc:phoenix:%s:%s:%s";
+
+  private static final String ZOOKEEPER_CLIENT_PORT =
+    "hbase.zookeeper.property.clientPort";
+  private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
+  private static final String ZNODE_PARENT = "zookeeper.znode.parent";
+  static final int PHOENIX_MAX_MUTATION_STATE_SIZE = 50000;
+
+  public PhoenixHBaseAccessor(Configuration conf) {
+    this.conf = conf;
+    try {
+      Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
+    } catch (ClassNotFoundException e) {
+      LOG.error("Phoenix client jar not found in the classpath.", e);
+    }
+  }
+
+  /**
+   * Get JDBC connection to HBase store. Assumption is that the hbase
+   * configuration is present on the classpath and loaded by the caller into
+   * the Configuration object.
+   * Phoenix already caches the HConnection between the client and HBase
+   * cluster.
+   * @return a {@link java.sql.Connection} to the metric store, or null on failure
+   */
+  protected Connection getConnection() {
+    Connection connection = null;
+    String zookeeperClientPort = conf.getTrimmed(ZOOKEEPER_CLIENT_PORT, "2181");
+    String zookeeperQuorum = conf.getTrimmed(ZOOKEEPER_QUORUM);
+    String znodeParent = conf.getTrimmed(ZNODE_PARENT, "/hbase");
+
+    if (zookeeperQuorum == null || zookeeperQuorum.isEmpty()) {
+      throw new IllegalStateException("Unable to find Zookeeper quorum to " +
+        "access HBase store using Phoenix.");
+    }
+
+    String url = String.format(CONNECTION_URL, zookeeperQuorum,
+      zookeeperClientPort, znodeParent);
+
+    LOG.debug("Metric store connection url: " + url);
+
+    try {
+      connection = DriverManager.getConnection(url);
+    } catch (SQLException e) {
+      LOG.warn("Unable to connect to HBase store using Phoenix.", e);
+    }
+
+    return connection;
+  }
+
+  @SuppressWarnings("unchecked")
+  static TimelineMetric getTimelineMetricFromResultSet(ResultSet rs)
+      throws SQLException, IOException {
+    TimelineMetric metric = new TimelineMetric();
+    metric.setMetricName(rs.getString("METRIC_NAME"));
+    metric.setAppId(rs.getString("APP_ID"));
+    metric.setInstanceId(rs.getString("INSTANCE_ID"));
+    metric.setHostName(rs.getString("HOSTNAME"));
+    metric.setTimestamp(rs.getLong("TIMESTAMP"));
+    metric.setStartTime(rs.getLong("START_TIME"));
+    metric.setType(rs.getString("UNITS"));
+    metric.setMetricValues(
+      (Map<Long, Double>) TimelineUtilsExt.readMetricFromJSON(
+        rs.getString("METRICS")));
+    return metric;
+  }
+
+  static TimelineMetric getTimelineMetricKeyFromResultSet(ResultSet rs)
+      throws SQLException, IOException {
+    TimelineMetric metric = new TimelineMetric();
+    metric.setMetricName(rs.getString("METRIC_NAME"));
+    metric.setAppId(rs.getString("APP_ID"));
+    metric.setInstanceId(rs.getString("INSTANCE_ID"));
+    metric.setHostName(rs.getString("HOSTNAME"));
+    metric.setTimestamp(rs.getLong("TIMESTAMP"));
+    metric.setType(rs.getString("UNITS"));
+    return metric;
+  }
+
+  static MetricHostAggregate getMetricHostAggregateFromResultSet(ResultSet rs)
+      throws SQLException {
+    MetricHostAggregate metricHostAggregate = new MetricHostAggregate();
+    metricHostAggregate.setSum(rs.getDouble("METRIC_AVG"));
+    metricHostAggregate.setMax(rs.getDouble("METRIC_MAX"));
+    metricHostAggregate.setMin(rs.getDouble("METRIC_MIN"));
+    metricHostAggregate.setDeviation(0.0);
+    return metricHostAggregate;
+  }
+
+  protected void initMetricSchema() {
+    Connection conn = getConnection();
+    Statement stmt = null;
+
+    try {
+      LOG.info("Initializing metrics schema...");
+      stmt = conn.createStatement();
+      stmt.executeUpdate(CREATE_METRICS_TABLE_SQL);
+      stmt.executeUpdate(CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL);
+      stmt.executeUpdate(CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL);
+      stmt.executeUpdate(CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL);
+      stmt.executeUpdate(CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL);
+      conn.commit();
+    } catch (SQLException sql) {
+      LOG.warn("Error creating Metrics Schema in HBase using Phoenix.", sql);
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+    }
+  }
+
+  public void insertMetricRecords(TimelineMetrics metrics)
+      throws SQLException, IOException {
+
+    List<TimelineMetric> timelineMetrics = metrics.getMetrics();
+    if (timelineMetrics == null || timelineMetrics.isEmpty()) {
+      LOG.debug("Empty metrics insert request.");
+      return;
+    }
+
+    Connection conn = getConnection();
+    PreparedStatement stmt = null;
+    long currentTime = System.currentTimeMillis();
+
+    try {
+      stmt = conn.prepareStatement(UPSERT_METRICS_SQL);
+
+      for (TimelineMetric metric : timelineMetrics) {
+        stmt.clearParameters();
+
+        LOG.trace("host: " + metric.getHostName() + ", " +
+          "values: " + metric.getMetricValues());
+        Double[] aggregates = calculateAggregates(metric.getMetricValues());
+
+        stmt.setString(1, metric.getMetricName());
+        stmt.setString(2, metric.getHostName());
+        stmt.setString(3, metric.getAppId());
+        stmt.setString(4, metric.getInstanceId());
+        stmt.setLong(5, currentTime);
+        stmt.setLong(6, metric.getStartTime());
+        stmt.setString(7, metric.getType());
+        stmt.setDouble(8, aggregates[2]);  // METRIC_AVG
+        stmt.setDouble(9, aggregates[0]);  // METRIC_MAX
+        stmt.setDouble(10, aggregates[1]); // METRIC_MIN
+        stmt.setString(11,
+          TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues()));
+
+        try {
+          stmt.executeUpdate();
+        } catch (SQLException sql) {
+          LOG.error(sql);
+        }
+      }
+
+      conn.commit();
+
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
+  }
+
+  // Returns {max, min, avg} computed over the non-null metric values.
+  private Double[] calculateAggregates(Map<Long, Double> metricValues) {
+    Double[] values = new Double[3];
+    // Double.MIN_VALUE is the smallest positive double, not the most
+    // negative, so use the infinities as "unset" sentinels instead.
+    Double max = Double.NEGATIVE_INFINITY;
+    Double min = Double.POSITIVE_INFINITY;
+    Double avg = 0.0;
+    int count = 0;
+    if (metricValues != null && !metricValues.isEmpty()) {
+      for (Double value : metricValues.values()) {
+        // TODO: Some nulls in data - need to investigate null values from host
+        if (value != null) {
+          if (value > max) {
+            max = value;
+          }
+          if (value < min) {
+            min = value;
+          }
+          avg += value;
+          count++;
+        }
+      }
+      // Average over the non-null samples only.
+      avg = count > 0 ? avg / count : 0.0;
+    }
+    values[0] = max != Double.NEGATIVE_INFINITY ? max : 0.0;
+    values[1] = min != Double.POSITIVE_INFINITY ? min : 0.0;
+    values[2] = avg;
+    return values;
+  }
+
+  @SuppressWarnings("unchecked")
+  public TimelineMetrics getMetricRecords(final Condition condition)
+      throws SQLException, IOException {
+
+    if (condition.isEmpty()) {
+      throw new SQLException("No filter criteria specified.");
+    }
+
+    Connection conn = getConnection();
+    PreparedStatement stmt = null;
+    TimelineMetrics metrics = new TimelineMetrics();
+
+    try {
+      stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+
+      ResultSet rs = stmt.executeQuery();
+
+      while (rs.next()) {
+        TimelineMetric metric = getTimelineMetricFromResultSet(rs);
+
+        if (condition.isGrouped()) {
+          metrics.addOrMergeTimelineMetric(metric);
+        } else {
+          metrics.getMetrics().add(metric);
+        }
+      }
+
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
+    return metrics;
+  }
+
+  public void saveHostAggregateRecords(Map<TimelineMetric,
+      MetricHostAggregate> hostAggregateMap, String phoenixTableName)
+      throws SQLException {
+
+    if (hostAggregateMap != null && !hostAggregateMap.isEmpty()) {
+      Connection conn = getConnection();
+      PreparedStatement stmt = null;
+
+      long start = System.currentTimeMillis();
+      int rowCount = 0;
+
+      try {
+        stmt = conn.prepareStatement(
+          String.format(UPSERT_AGGREGATE_RECORD_SQL, phoenixTableName));
+
+        for (Map.Entry<TimelineMetric, MetricHostAggregate> metricAggregate :
+            hostAggregateMap.entrySet()) {
+
+          TimelineMetric metric = metricAggregate.getKey();
+          MetricHostAggregate hostAggregate = metricAggregate.getValue();
+
+          rowCount++;
+          stmt.clearParameters();
+          stmt.setString(1, metric.getMetricName());
+          stmt.setString(2, metric.getHostName());
+          stmt.setString(3, metric.getAppId());
+          stmt.setString(4, metric.getInstanceId());
+          stmt.setLong(5, metric.getTimestamp());
+          stmt.setString(6, metric.getType());
+          stmt.setDouble(7, hostAggregate.getSum());
+          stmt.setDouble(8, hostAggregate.getMax());
+          stmt.setDouble(9, hostAggregate.getMin());
+
+          try {
+            stmt.executeUpdate();
+          } catch (SQLException sql) {
+            LOG.error(sql);
+          }
+
+          if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
+            conn.commit();
+            rowCount = 0;
+          }
+
+        }
+
+        conn.commit();
+
+      } finally {
+        if (stmt != null) {
+          try {
+            stmt.close();
+          } catch (SQLException e) {
+            // Ignore
+          }
+        }
+        if (conn != null) {
+          try {
+            conn.close();
+          } catch (SQLException sql) {
+            // Ignore
+          }
+        }
+      }
+
+      long end = System.currentTimeMillis();
+
+      if ((end - start) > 60000L) {
+        LOG.info("Time to save map: " + (end - start) + ", " +
+          "thread = " + Thread.currentThread().getName());
+      }
+    }
+  }
+
+  /**
+   * Save metric cluster aggregate records.
+   *
+   * @throws SQLException on errors writing to the Phoenix store
+   */
+  public void saveClusterAggregateRecords(Map<TimelineClusterMetric,
+      MetricClusterAggregate> records) throws SQLException {
+    if (records == null || records.isEmpty()) {
+      LOG.debug("Empty aggregate records.");
+      return;
+    }
+
+    long start = System.currentTimeMillis();
+
+    Connection conn = getConnection();
+    PreparedStatement stmt = null;
+    try {
+      stmt = conn.prepareStatement(UPSERT_CLUSTER_AGGREGATE_SQL);
+      int rowCount = 0;
+
+      for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate>
+          aggregateEntry : records.entrySet()) {
+        TimelineClusterMetric clusterMetric = aggregateEntry.getKey();
+        MetricClusterAggregate aggregate = aggregateEntry.getValue();
+
+        LOG.trace("clusterMetric = " + clusterMetric + ", " +
+          "aggregate = " + aggregate);
+
+        rowCount++;
+        stmt.clearParameters();
+        stmt.setString(1, clusterMetric.getMetricName());
+        stmt.setString(2, clusterMetric.getAppId());
+        stmt.setString(3, clusterMetric.getInstanceId());
+        stmt.setLong(4, clusterMetric.getTimestamp());
+        stmt.setString(5, clusterMetric.getType());
+        stmt.setDouble(6, aggregate.getSum());
+        stmt.setInt(7, aggregate.getNumberOfHosts());
+        stmt.setDouble(8, aggregate.getMax());
+        stmt.setDouble(9, aggregate.getMin());
+
+        try {
+          stmt.executeUpdate();
+        } catch (SQLException sql) {
+          LOG.error(sql);
+        }
+
+        if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
+          conn.commit();
+          rowCount = 0;
+        }
+      }
+
+      conn.commit();
+
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
+    long end = System.currentTimeMillis();
+    if ((end - start) > 60000L) {
+      LOG.info("Time to save: " + (end - start) + ", " +
+        "thread = " + Thread.currentThread().getName());
+    }
+  }
+
+  public TimelineMetrics getAggregateMetricRecords(final Condition condition)
+      throws SQLException {
+
+    if (condition.isEmpty()) {
+      throw new SQLException("No filter criteria specified.");
+    }
+
+    Connection conn = getConnection();
+    PreparedStatement stmt = null;
+    TimelineMetrics metrics = new TimelineMetrics();
+
+    try {
+      stmt = PhoenixTransactSQL.prepareGetAggregateSqlStmt(conn, condition);
+
+      ResultSet rs = stmt.executeQuery();
+
+      while (rs.next()) {
+        TimelineMetric metric = new TimelineMetric();
+        metric.setMetricName(rs.getString("METRIC_NAME"));
+        metric.setAppId(rs.getString("APP_ID"));
+        metric.setInstanceId(rs.getString("INSTANCE_ID"));
+        metric.setTimestamp(rs.getLong("TIMESTAMP"));
+        metric.setStartTime(rs.getLong("TIMESTAMP"));
+        Map<Long, Double> valueMap = new HashMap<Long, Double>();
+        valueMap.put(rs.getLong("TIMESTAMP"), rs.getDouble("METRIC_SUM") /
+                                              rs.getInt("HOSTS_COUNT"));
+        metric.setMetricValues(valueMap);
+
+        if (condition.isGrouped()) {
+          metrics.addOrMergeTimelineMetric(metric);
+        } else {
+          metrics.getMetrics().add(metric);
+        }
+      }
+
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
+    LOG.info("Aggregate records size: " + metrics.getMetrics().size());
+    return metrics;
+  }
+}

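getConnection() above assembles its Phoenix JDBC URL from three hbase-site
properties. A standalone sketch of the same construction, with placeholder
quorum values (the real ones come from hbase-site.xml):

    // Illustrative values only; throws SQLException on connection failure.
    String quorum = "zk1.example.com,zk2.example.com,zk3.example.com";
    String clientPort = "2181";
    String znodeParent = "/hbase";
    String url = String.format("jdbc:phoenix:%s:%s:%s",
        quorum, clientPort, znodeParent);
    // url -> jdbc:phoenix:zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase
    Connection conn = DriverManager.getConnection(url);
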
http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
new file mode 100644
index 0000000..60a5673
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
@@ -0,0 +1,398 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * Encapsulates all metrics-related SQL queries.
+ */
+public class PhoenixTransactSQL {
+
+  static final Log LOG = LogFactory.getLog(PhoenixTransactSQL.class);
+  // TODO: Configurable TTL values
+  /**
+   * Create table to store individual metric records.
+   */
+  public static final String CREATE_METRICS_TABLE_SQL = "CREATE TABLE IF NOT " +
+    "EXISTS METRIC_RECORD (METRIC_NAME VARCHAR, HOSTNAME VARCHAR, " +
+    "APP_ID VARCHAR, INSTANCE_ID VARCHAR, TIMESTAMP UNSIGNED_LONG NOT NULL, " +
+    "START_TIME UNSIGNED_LONG, UNITS CHAR(20), " +
+    "METRIC_AVG DOUBLE, METRIC_MAX DOUBLE, METRIC_MIN DOUBLE, " +
+    "METRICS VARCHAR CONSTRAINT pk " +
+    "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, TIMESTAMP)) " +
+    "IMMUTABLE_ROWS=true, TTL=86400";
+
+  public static final String CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL =
+    "CREATE TABLE IF NOT EXISTS METRIC_RECORD_HOURLY " +
+    "(METRIC_NAME VARCHAR, HOSTNAME VARCHAR, " +
+    "APP_ID VARCHAR, INSTANCE_ID VARCHAR, TIMESTAMP UNSIGNED_LONG NOT NULL, " +
+    "UNITS CHAR(20), METRIC_AVG DOUBLE, METRIC_MAX DOUBLE," +
+    "METRIC_MIN DOUBLE CONSTRAINT pk " +
+    "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, TIMESTAMP)) " +
+    "IMMUTABLE_ROWS=true, TTL=2592000";
+
+  public static final String CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL =
+    "CREATE TABLE IF NOT EXISTS METRIC_RECORD_MINUTE " +
+    "(METRIC_NAME VARCHAR, HOSTNAME VARCHAR, " +
+    "APP_ID VARCHAR, INSTANCE_ID VARCHAR, TIMESTAMP UNSIGNED_LONG NOT NULL, " +
+    "UNITS CHAR(20), METRIC_AVG DOUBLE, METRIC_MAX DOUBLE," +
+    "METRIC_MIN DOUBLE CONSTRAINT pk " +
+    "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, TIMESTAMP)) " +
+    "IMMUTABLE_ROWS=true, TTL=604800";
+
+  public static final String CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL =
+    "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE " +
+    "(METRIC_NAME VARCHAR, APP_ID VARCHAR, INSTANCE_ID VARCHAR, " +
+    "TIMESTAMP UNSIGNED_LONG NOT NULL, UNITS CHAR(20), METRIC_SUM DOUBLE, " +
+    "HOSTS_COUNT UNSIGNED_INT, METRIC_MAX DOUBLE, METRIC_MIN DOUBLE " +
+    "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, TIMESTAMP)) " +
+    "IMMUTABLE_ROWS=true, TTL=2592000";
+
+  public static final String CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL =
+    "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE_HOURLY " +
+    "(METRIC_NAME VARCHAR, APP_ID VARCHAR, INSTANCE_ID VARCHAR, " +
+    "TIMESTAMP UNSIGNED_LONG NOT NULL, UNITS CHAR(20), METRIC_AVG DOUBLE, " +
+    "METRIC_MAX DOUBLE, METRIC_MIN DOUBLE " +
+    "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, TIMESTAMP)) " +
+    "IMMUTABLE_ROWS=true, TTL=31536000";
+
+  /**
+   * Insert into metric records table.
+   */
+  public static final String UPSERT_METRICS_SQL = "UPSERT INTO METRIC_RECORD " +
+    "(METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, TIMESTAMP, START_TIME, " +
+    "UNITS, METRIC_AVG, METRIC_MAX, METRIC_MIN, METRICS) VALUES " +
+    "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+  public static final String UPSERT_CLUSTER_AGGREGATE_SQL = "UPSERT INTO " +
+    "METRIC_AGGREGATE (METRIC_NAME, APP_ID, INSTANCE_ID, TIMESTAMP, " +
+    "UNITS, METRIC_SUM, HOSTS_COUNT, METRIC_MAX, METRIC_MIN) " +
+    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+  public static final String UPSERT_AGGREGATE_RECORD_SQL = "UPSERT INTO " +
+    "%s (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
+    "TIMESTAMP, UNITS, METRIC_AVG, METRIC_MAX, METRIC_MIN) " +
+    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+  /**
+   * Retrieve a set of rows from metrics records table.
+   */
+  public static final String GET_METRIC_SQL = "SELECT METRIC_NAME, " +
+    "HOSTNAME, APP_ID, INSTANCE_ID, TIMESTAMP, START_TIME, UNITS, METRIC_AVG, " +
+    "METRIC_MAX, METRIC_MIN, METRICS FROM METRIC_RECORD";
+
+  public static final String GET_METRIC_AGGREGATE_ONLY_SQL = "SELECT " +
+    "METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, TIMESTAMP, " +
+    "UNITS, METRIC_AVG, METRIC_MAX, METRIC_MIN FROM %s";
+
+  public static final String GET_CLUSTER_AGGREGATE_SQL =
+    "SELECT METRIC_NAME, APP_ID, " +
+    "INSTANCE_ID, TIMESTAMP, METRIC_SUM, HOSTS_COUNT, METRIC_MAX, " +
+    "METRIC_MIN FROM METRIC_AGGREGATE";
+
+  /**
+   * Default result limit: 4 metric points/min * 60 min * 24 h = 5760 rows,
+   * i.e. one day of data.
+   */
+  public static final Integer DEFAULT_RESULT_LIMIT = 5760;
+  public static final String METRICS_RECORD_TABLE_NAME =
+    "METRIC_RECORD";
+  public static final String METRICS_AGGREGATE_MINUTE_TABLE_NAME =
+    "METRIC_RECORD_MINUTE";
+  public static final String METRICS_AGGREGATE_HOURLY_TABLE_NAME =
+    "METRIC_RECORD_HOURLY";
+
+  public static PreparedStatement prepareGetMetricsSqlStmt(
+      Connection connection, Condition condition) throws SQLException {
+
+    if (condition.isEmpty()) {
+      throw new IllegalArgumentException("Condition is empty.");
+    }
+    String stmtStr = GET_METRIC_SQL;
+    if (condition.getStatement() != null) {
+      stmtStr = condition.getStatement();
+    }
+
+    StringBuilder sb = new StringBuilder(stmtStr);
+    sb.append(" WHERE ");
+    sb.append(condition.getConditionClause());
+    sb.append(" ORDER BY METRIC_NAME, TIMESTAMP");
+    if (condition.getLimit() != null) {
+      sb.append(" LIMIT ").append(condition.getLimit());
+    }
+
+    LOG.debug("SQL: " + sb.toString() + ", condition: " + condition);
+    PreparedStatement stmt = connection.prepareStatement(sb.toString());
+    int pos = 1;
+    if (condition.getMetricNames() != null) {
+      for ( ; pos <= condition.getMetricNames().size(); pos++) {
+        stmt.setString(pos, condition.getMetricNames().get(pos - 1));
+      }
+    }
+    if (condition.getHostname() != null) {
+      stmt.setString(pos++, condition.getHostname());
+    }
+    // TODO: Upper case all strings on POST
+    if (condition.getAppId() != null) {
+      stmt.setString(pos++, condition.getAppId().toLowerCase());
+    }
+    if (condition.getInstanceId() != null) {
+      stmt.setString(pos++, condition.getInstanceId());
+    }
+    if (condition.getStartTime() != null) {
+      stmt.setLong(pos++, condition.getStartTime());
+    }
+    if (condition.getEndTime() != null) {
+      stmt.setLong(pos, condition.getEndTime());
+    }
+    if (condition.getFetchSize() != null) {
+      stmt.setFetchSize(condition.getFetchSize());
+    }
+
+    return stmt;
+  }
+
+  public static PreparedStatement prepareGetAggregateSqlStmt(
+      Connection connection, Condition condition) throws SQLException {
+
+    if (condition.isEmpty()) {
+      throw new IllegalArgumentException("Condition is empty.");
+    }
+
+    StringBuilder sb = new StringBuilder(GET_CLUSTER_AGGREGATE_SQL);
+    sb.append(" WHERE ");
+    sb.append(condition.getConditionClause());
+    sb.append(" ORDER BY METRIC_NAME, TIMESTAMP");
+    if (condition.getLimit() != null) {
+      sb.append(" LIMIT ").append(condition.getLimit());
+    }
+
+    LOG.debug("SQL => " + sb.toString() + ", condition => " + condition);
+    PreparedStatement stmt = connection.prepareStatement(sb.toString());
+    int pos = 1;
+    if (condition.getMetricNames() != null) {
+      for ( ; pos <= condition.getMetricNames().size(); pos++) {
+        stmt.setString(pos, condition.getMetricNames().get(pos - 1));
+      }
+    }
+    // TODO: Upper case all strings on POST
+    if (condition.getAppId() != null) {
+      stmt.setString(pos++, condition.getAppId().toLowerCase());
+    }
+    if (condition.getInstanceId() != null) {
+      stmt.setString(pos++, condition.getInstanceId());
+    }
+    if (condition.getStartTime() != null) {
+      stmt.setLong(pos++, condition.getStartTime());
+    }
+    if (condition.getEndTime() != null) {
+      stmt.setLong(pos, condition.getEndTime());
+    }
+
+    return stmt;
+  }
+
+  static class Condition {
+    List<String> metricNames;
+    String hostname;
+    String appId;
+    String instanceId;
+    Long startTime;
+    Long endTime;
+    Integer limit;
+    boolean grouped;
+    boolean noLimit = false;
+    Integer fetchSize;
+    String statement;
+
+    Condition(List<String> metricNames, String hostname, String appId,
+              String instanceId, Long startTime, Long endTime, Integer limit,
+              boolean grouped) {
+      this.metricNames = metricNames;
+      this.hostname = hostname;
+      this.appId = appId;
+      this.instanceId = instanceId;
+      this.startTime = startTime;
+      this.endTime = endTime;
+      this.limit = limit;
+      this.grouped = grouped;
+    }
+
+    String getStatement() {
+      return statement;
+    }
+
+    void setStatement(String statement) {
+      this.statement = statement;
+    }
+
+    List<String> getMetricNames() {
+      return metricNames == null || metricNames.isEmpty() ? null : metricNames;
+    }
+
+    String getMetricsClause() {
+      if (metricNames == null) {
+        return null;
+      }
+      StringBuilder sb = new StringBuilder("(");
+      for (int i = 0; i < metricNames.size(); i++) {
+        if (i > 0) {
+          sb.append(", ");
+        }
+        sb.append("?");
+      }
+      return sb.append(")").toString();
+    }
+
+    String getConditionClause() {
+      StringBuilder sb = new StringBuilder();
+      boolean appendConjunction = false;
+
+      if (getMetricNames() != null) {
+        sb.append("METRIC_NAME IN ");
+        sb.append(getMetricsClause());
+        appendConjunction = true;
+      }
+      if (getHostname() != null) {
+        // Append the conjunction only when a preceding clause exists, so a
+        // query with a single predicate does not end in a dangling AND.
+        if (appendConjunction) {
+          sb.append(" AND");
+        }
+        sb.append(" HOSTNAME = ?");
+        appendConjunction = true;
+      }
+      if (getAppId() != null) {
+        if (appendConjunction) {
+          sb.append(" AND");
+        }
+        sb.append(" APP_ID = ?");
+        appendConjunction = true;
+      }
+      if (getInstanceId() != null) {
+        if (appendConjunction) {
+          sb.append(" AND");
+        }
+        sb.append(" INSTANCE_ID = ?");
+        appendConjunction = true;
+      }
+      if (getStartTime() != null) {
+        if (appendConjunction) {
+          sb.append(" AND");
+        }
+        sb.append(" TIMESTAMP >= ?");
+        appendConjunction = true;
+      }
+      if (getEndTime() != null) {
+        if (appendConjunction) {
+          sb.append(" AND");
+        }
+        sb.append(" TIMESTAMP < ?");
+      }
+      return sb.toString();
+    }
+
+    String getHostname() {
+      return hostname == null || hostname.isEmpty() ? null : hostname;
+    }
+
+    String getAppId() {
+      return appId == null || appId.isEmpty() ? null : appId;
+    }
+
+    String getInstanceId() {
+      return instanceId == null || instanceId.isEmpty() ? null : instanceId;
+    }
+
+    /**
+     * Convert seconds to millis when a caller passes epoch seconds; may
+     * return null when no time range was specified.
+     */
+    Long getStartTime() {
+      if (startTime == null) {
+        return null;
+      }
+      if (startTime < 9999999999L) {
+        return startTime * 1000;
+      }
+      return startTime;
+    }
+
+    Long getEndTime() {
+      if (endTime == null) {
+        return null;
+      }
+      if (endTime < 9999999999L) {
+        return endTime * 1000;
+      }
+      return endTime;
+    }
+
+    void setNoLimit() {
+      this.noLimit = true;
+    }
+
+    Integer getLimit() {
+      if (noLimit) {
+        return null;
+      }
+      return limit == null ? DEFAULT_RESULT_LIMIT : limit;
+    }
+
+    boolean isGrouped() {
+      return grouped;
+    }
+
+    boolean isEmpty() {
+      return (metricNames == null || metricNames.isEmpty())
+        && (hostname == null || hostname.isEmpty())
+        && (appId == null || appId.isEmpty())
+        && (instanceId == null || instanceId.isEmpty())
+        && startTime == null
+        && endTime == null;
+    }
+
+    Integer getFetchSize() {
+      return fetchSize;
+    }
+
+    void setFetchSize(Integer fetchSize) {
+      this.fetchSize = fetchSize;
+    }
+
+    @Override
+    public String toString() {
+      return "Condition{" +
+        "metricNames=" + metricNames +
+        ", hostname='" + hostname + '\'' +
+        ", appId='" + appId + '\'' +
+        ", instanceId='" + instanceId + '\'' +
+        ", startTime=" + startTime +
+        ", endTime=" + endTime +
+        ", limit=" + limit +
+        ", grouped=" + grouped +
+        ", noLimit=" + noLimit +
+        '}';
+    }
+  }
+}

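To make the clause assembly in Condition concrete, this is what
getConditionClause() produces for a typical query; the metric names, host,
and epoch-millisecond timestamps below are invented:

    Condition condition = new Condition(
        Arrays.asList("cpu_user", "mem_free"), // METRIC_NAME IN (?, ?)
        "host1", "HOST", null,
        1411430400000L, 1411434000000L, null, false);
    // condition.getConditionClause() returns:
    //   METRIC_NAME IN (?, ?) AND HOSTNAME = ? AND APP_ID = ?
    //   AND TIMESTAMP >= ? AND TIMESTAMP < ?
    // prepareGetMetricsSqlStmt() then binds the parameters in that order.
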
http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorHourly.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorHourly.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorHourly.java
new file mode 100644
index 0000000..50a4f63
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorHourly.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+
+public class TimelineMetricAggregatorHourly extends AbstractTimelineAggregator {
+  static final Long SLEEP_INTERVAL = 3600000L; // 1 hour
+  static final Long CHECKPOINT_CUT_OFF_INTERVAL = SLEEP_INTERVAL * 2;
+  static final Integer RESULTSET_FETCH_SIZE = 1000;
+  private static final Log LOG = LogFactory.getLog(TimelineMetricAggregatorHourly.class);
+
+  public TimelineMetricAggregatorHourly(PhoenixHBaseAccessor hBaseAccessor,
+                                        String checkpointLocation) {
+    super(hBaseAccessor, checkpointLocation);
+  }
+
+  @Override
+  protected boolean doWork(long startTime, long endTime) {
+    LOG.info("Start aggregation cycle @ " + new Date());
+
+    boolean success = true;
+    Condition condition = new Condition(null, null, null, null, startTime,
+                                        endTime, null, true);
+    condition.setNoLimit();
+    condition.setFetchSize(RESULTSET_FETCH_SIZE);
+    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
+      METRICS_AGGREGATE_MINUTE_TABLE_NAME));
+
+    Connection conn = null;
+    PreparedStatement stmt = null;
+
+    try {
+      conn = hBaseAccessor.getConnection();
+      stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+
+      ResultSet rs = stmt.executeQuery();
+      TimelineMetric existingMetric = null;
+      MetricHostAggregate hostAggregate = null;
+      Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
+        new HashMap<TimelineMetric, MetricHostAggregate>();
+
+      while (rs.next()) {
+        TimelineMetric currentMetric =
+          PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
+        MetricHostAggregate currentHostAggregate =
+          PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
+
+        if (existingMetric == null) {
+          // First row
+          existingMetric = currentMetric;
+          hostAggregate = new MetricHostAggregate();
+          hostAggregateMap.put(currentMetric, hostAggregate);
+        }
+
+        if (existingMetric.equalsExceptTime(currentMetric)) {
+          // Recalculate totals with current metric
+          hostAggregate.updateAggregates(currentHostAggregate);
+
+        } else {
+          // Switched over to a new metric - save existing
+          hostAggregate = new MetricHostAggregate();
+          hostAggregate.updateAggregates(currentHostAggregate);
+          hostAggregateMap.put(currentMetric, hostAggregate);
+          existingMetric = currentMetric;
+        }
+      }
+
+      LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
+
+      hBaseAccessor.saveHostAggregateRecords(hostAggregateMap,
+        METRICS_AGGREGATE_HOURLY_TABLE_NAME);
+
+    } catch (SQLException e) {
+      LOG.error("Exception while aggregating metrics.", e);
+      success = false;
+    } catch (IOException e) {
+      LOG.error("Exception while aggregating metrics.", e);
+      success = false;
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
+
+    LOG.info("End aggregation cycle @ " + new Date());
+    return success;
+  }
+
+  @Override
+  protected Long getSleepInterval() {
+    return SLEEP_INTERVAL;
+  }
+
+  @Override
+  protected Long getCheckpointCutOffInterval() {
+    return CHECKPOINT_CUT_OFF_INTERVAL;
+  }
+}

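The loop that drives doWork() lives in AbstractTimelineAggregator, which is
not part of this excerpt; the following is only a hedged sketch of the
contract implied by getSleepInterval() and getCheckpointCutOffInterval()
(readCheckpoint and saveCheckpoint are hypothetical helper names):

    // Assumed driver loop: aggregate one SLEEP_INTERVAL-sized window per
    // cycle, skipping ahead when the persisted checkpoint is too stale.
    long lastCheckpoint = readCheckpoint(); // hypothetical helper
    while (true) {
      long now = System.currentTimeMillis();
      if (now - lastCheckpoint > getCheckpointCutOffInterval()) {
        lastCheckpoint = now - getSleepInterval(); // checkpoint too stale
      }
      if (doWork(lastCheckpoint, lastCheckpoint + getSleepInterval())) {
        lastCheckpoint += getSleepInterval();
        saveCheckpoint(lastCheckpoint); // hypothetical helper
      }
      try {
        Thread.sleep(getSleepInterval());
      } catch (InterruptedException e) {
        return; // shut down
      }
    }
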
http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorMinute.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorMinute.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorMinute.java
new file mode 100644
index 0000000..a3909cf
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorMinute.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.prepareGetMetricsSqlStmt;
+
+public class TimelineMetricAggregatorMinute extends AbstractTimelineAggregator {
+  static final Long SLEEP_INTERVAL = 300000L; // 5 mins
+  static final Long CHECKPOINT_CUT_OFF_INTERVAL = SLEEP_INTERVAL * 4;
+  private static final Log LOG = LogFactory.getLog(TimelineMetricAggregatorMinute.class);
+
+  public TimelineMetricAggregatorMinute(PhoenixHBaseAccessor hBaseAccessor,
+                                        String checkpointLocation) {
+    super(hBaseAccessor, checkpointLocation);
+  }
+
+  @Override
+  protected boolean doWork(long startTime, long endTime) {
+    LOG.info("Start aggregation cycle @ " + new Date());
+
+    boolean success = true;
+    Condition condition = new Condition(null, null, null, null, startTime,
+                                        endTime, null, true);
+    condition.setNoLimit();
+    condition.setFetchSize(RESULTSET_FETCH_SIZE);
+    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
+      METRICS_RECORD_TABLE_NAME));
+
+    Connection conn = null;
+    PreparedStatement stmt = null;
+
+    try {
+      conn = hBaseAccessor.getConnection();
+      stmt = prepareGetMetricsSqlStmt(conn, condition);
+      LOG.info("Query issued @: " + new Date());
+      ResultSet rs = stmt.executeQuery();
+      LOG.info("Query returned @: " + new Date());
+      TimelineMetric existingMetric = null;
+      MetricHostAggregate hostAggregate = null;
+
+      Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
+        new HashMap<TimelineMetric, MetricHostAggregate>();
+
+      while (rs.next()) {
+        TimelineMetric currentMetric =
+          PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
+        MetricHostAggregate currentHostAggregate =
+          PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
+
+        if (existingMetric == null) {
+          // First row
+          existingMetric = currentMetric;
+          hostAggregate = new MetricHostAggregate();
+          hostAggregateMap.put(currentMetric, hostAggregate);
+        }
+
+        if (existingMetric.equalsExceptTime(currentMetric)) {
+          // Recalculate totals with current metric
+          hostAggregate.updateAggregates(currentHostAggregate);
+
+        } else {
+          // Switched over to a new metric - save existing
+          hostAggregate = new MetricHostAggregate();
+          hostAggregate.updateAggregates(currentHostAggregate);
+          hostAggregateMap.put(currentMetric, hostAggregate);
+          existingMetric = currentMetric;
+        }
+      }
+
+      LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
+
+      hBaseAccessor.saveHostAggregateRecords(hostAggregateMap,
+        METRICS_AGGREGATE_MINUTE_TABLE_NAME);
+
+    } catch (SQLException e) {
+      LOG.error("Exception while aggregating metrics.", e);
+      success = false;
+    } catch (IOException e) {
+      LOG.error("Exception while aggregating metrics.", e);
+      success = false;
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
+
+    LOG.info("End aggregation cycle @ " + new Date());
+    return success;
+  }
+
+  @Override
+  protected Long getSleepInterval() {
+    return SLEEP_INTERVAL;
+  }
+
+  @Override
+  protected Long getCheckpointCutOffInterval() {
+    return CHECKPOINT_CUT_OFF_INTERVAL;
+  }
+}

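Both host aggregators fold consecutive result-set rows that share a metric
key into one MetricHostAggregate, relying on the ORDER BY METRIC_NAME,
TIMESTAMP clause added by prepareGetMetricsSqlStmt(). A simplified,
self-contained illustration of that streaming fold, with string keys
standing in for the identity compared by equalsExceptTime():

    // Consecutive rows sharing a key are merged into one running sum.
    Map<String, Double> sums = new LinkedHashMap<String, Double>();
    String[][] orderedRows = { // pre-sorted by key, like the query results
        {"cpu_user", "1.0"}, {"cpu_user", "2.5"}, {"mem_free", "512.0"}};
    for (String[] row : orderedRows) {
      String key = row[0];
      double value = Double.parseDouble(row[1]);
      Double current = sums.get(key);
      sums.put(key, current == null ? value : current + value);
    }
    // sums -> {cpu_user=3.5, mem_free=512.0}
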
http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
new file mode 100644
index 0000000..10b2d70
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.prepareGetMetricsSqlStmt;
+
+/**
+ * Aggregates a metric across all hosts in the cluster.
+ */
+public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator {
+  public static final long WAKE_UP_INTERVAL = 120000;
+  public static final int TIME_SLICE_INTERVAL = 15000;
+  private static final Log LOG = LogFactory.getLog(TimelineMetricClusterAggregator.class);
+
+  public TimelineMetricClusterAggregator(PhoenixHBaseAccessor hBaseAccessor,
+                                         String checkpointLocation) {
+    super(hBaseAccessor, checkpointLocation);
+  }
+
+  /**
+   * Read metrics written during the time interval and save the cluster-wide
+   * aggregates (sum, host count, min, max) to the aggregate table.
+   *
+   * @param startTime Sample start time
+   * @param endTime Sample end time
+   */
+  @Override
+  protected boolean doWork(long startTime, long endTime) {
+    LOG.info("Start aggregation cycle @ " + new Date());
+
+    boolean success = true;
+    Condition condition = new Condition(null, null, null, null, startTime,
+                                        endTime, null, true);
+    condition.setFetchSize(RESULTSET_FETCH_SIZE);
+    condition.setNoLimit();
+    condition.setStatement(GET_METRIC_SQL);
+
+    Connection conn = null;
+    PreparedStatement stmt = null;
+
+    try {
+      conn = hBaseAccessor.getConnection();
+      stmt = prepareGetMetricsSqlStmt(conn, condition);
+      LOG.info("Query issued @: " + new Date());
+      ResultSet rs = stmt.executeQuery();
+      LOG.info("Query returned @: " + new Date());
+      Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
+        new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+      List<Long[]> timeSlices = new ArrayList<Long[]>();
+      // Create time slices
+      long sliceStartTime = startTime;
+      while (sliceStartTime < endTime) {
+        timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + TIME_SLICE_INTERVAL });
+        sliceStartTime += TIME_SLICE_INTERVAL;
+      }
+
+      while (rs.next()) {
+        TimelineMetric metric =
+          PhoenixHBaseAccessor.getTimelineMetricFromResultSet(rs);
+
+        Map<TimelineClusterMetric, Double> clusterMetrics =
+          sliceFromTimelineMetric(metric, timeSlices);
+
+        if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
+          for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry :
+              clusterMetrics.entrySet()) {
+            TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
+            MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric);
+            Double avgValue = clusterMetricEntry.getValue();
+
+            if (aggregate == null) {
+              aggregate = new MetricClusterAggregate(avgValue, 1, null,
+                avgValue, avgValue);
+              aggregateClusterMetrics.put(clusterMetric, aggregate);
+            } else {
+              aggregate.updateSum(avgValue);
+              aggregate.updateNumberOfHosts(1);
+              aggregate.updateMax(avgValue);
+              aggregate.updateMin(avgValue);
+            }
+          }
+        }
+      }
+      LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates.");
+
+      hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics);
+
+      LOG.info("End aggregation cycle @ " + new Date());
+
+    } catch (SQLException e) {
+      LOG.error("Exception during metrics aggregation.", e);
+      success = false;
+    } catch (IOException e) {
+      LOG.error("Exception during metrics aggregation.", e);
+      success = false;
+    } finally {
+      if (stmt != null) {
+        try {
+          // Closing the statement also closes its open ResultSet
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+    }
+
+    return success;
+  }
+
+  @Override
+  protected Long getSleepInterval() {
+    return WAKE_UP_INTERVAL;
+  }
+
+  @Override
+  protected Long getCheckpointCutOffInterval() {
+    return 600000L; // 10 minutes
+  }
+
+  private Map<TimelineClusterMetric, Double> sliceFromTimelineMetric(
+        TimelineMetric timelineMetric, List<Long[]> timeSlices) {
+
+    if (timelineMetric.getMetricValues().isEmpty()) {
+      return null;
+    }
+
+    Map<TimelineClusterMetric, Double> timelineClusterMetricMap =
+      new HashMap<TimelineClusterMetric, Double>();
+
+    for (Map.Entry<Long, Double> metric : timelineMetric.getMetricValues().entrySet()) {
+      // TODO: investigate null values - pre filter
+      if (metric.getValue() == null) {
+        continue;
+      }
+      Long timestamp = getSliceTimeForMetric(timeSlices, metric.getKey());
+      if (timestamp != -1) {
+        // Metric is within desired time range
+        TimelineClusterMetric clusterMetric = new TimelineClusterMetric(
+          timelineMetric.getMetricName(), timelineMetric.getAppId(),
+          timelineMetric.getInstanceId(), timestamp, timelineMetric.getType());
+
+        if (!timelineClusterMetricMap.containsKey(clusterMetric)) {
+          timelineClusterMetricMap.put(clusterMetric, metric.getValue());
+        } else {
+          // Slice already has a value - keep a running pairwise average
+          Double oldValue = timelineClusterMetricMap.get(clusterMetric);
+          Double newValue = (oldValue + metric.getValue()) / 2;
+          timelineClusterMetricMap.put(clusterMetric, newValue);
+        }
+      }
+    }
+
+    return timelineClusterMetricMap;
+  }
+
+  /**
+   * Return the beginning of the time slice into which the metric fits,
+   * or -1 if the timestamp falls outside every slice.
+   */
+  private Long getSliceTimeForMetric(List<Long[]> timeSlices, Long timestamp) {
+    for (Long[] timeSlice : timeSlices) {
+      if (timestamp >= timeSlice[0] && timestamp < timeSlice[1]) {
+        return timeSlice[0];
+      }
+    }
+    return -1L;
+  }
+
+  public static class TimelineClusterMetric {
+    private String metricName;
+    private String appId;
+    private String instanceId;
+    private long timestamp;
+    private String type;
+
+    TimelineClusterMetric(String metricName, String appId, String instanceId,
+                          long timestamp, String type) {
+      this.metricName = metricName;
+      this.appId = appId;
+      this.instanceId = instanceId;
+      this.timestamp = timestamp;
+      this.type = type;
+    }
+
+    String getMetricName() {
+      return metricName;
+    }
+
+    String getAppId() {
+      return appId;
+    }
+
+    String getInstanceId() {
+      return instanceId;
+    }
+
+    long getTimestamp() {
+      return timestamp;
+    }
+
+    String getType() {
+      return type;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      TimelineClusterMetric that = (TimelineClusterMetric) o;
+
+      if (timestamp != that.timestamp) return false;
+      if (appId != null ? !appId.equals(that.appId) : that.appId != null)
+        return false;
+      if (instanceId != null ? !instanceId.equals(that.instanceId) : that.instanceId != null)
+        return false;
+      if (!metricName.equals(that.metricName)) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = metricName.hashCode();
+      result = 31 * result + (appId != null ? appId.hashCode() : 0);
+      result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
+      result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+      return result;
+    }
+
+    @Override
+    public String toString() {
+      return "TimelineClusterMetric{" +
+        "metricName='" + metricName + '\'' +
+        ", appId='" + appId + '\'' +
+        ", instanceId='" + instanceId + '\'' +
+        ", timestamp=" + timestamp +
+        '}';
+    }
+  }
+}
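
The aggregator above buckets every datapoint into fixed 15-second slices and
keeps a running average for values that land in the same slice. A
self-contained sketch of just the slicing arithmetic, with each slice covering
[start, start + interval):

import java.util.ArrayList;
import java.util.List;

public class TimeSliceSketch {

  // Build [t, t + interval) slices covering [startTime, endTime)
  static List<long[]> buildSlices(long startTime, long endTime, long interval) {
    List<long[]> slices = new ArrayList<long[]>();
    for (long t = startTime; t < endTime; t += interval) {
      slices.add(new long[] { t, t + interval });
    }
    return slices;
  }

  // Return the start of the slice containing timestamp, or -1 if none does
  static long sliceStartFor(List<long[]> slices, long timestamp) {
    for (long[] slice : slices) {
      if (timestamp >= slice[0] && timestamp < slice[1]) {
        return slice[0];
      }
    }
    return -1L;
  }

  public static void main(String[] args) {
    List<long[]> slices = buildSlices(0, 120000, 15000);
    System.out.println(sliceStartFor(slices, 16500)); // prints 15000
  }
}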

http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
new file mode 100644
index 0000000..1caf809
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class TimelineMetricClusterAggregatorHourly extends AbstractTimelineAggregator {
+  private static final Log LOG = LogFactory.getLog(TimelineMetricClusterAggregatorHourly.class);
+  public static final long SLEEP_INTERVAL = 3600000;
+
+  public TimelineMetricClusterAggregatorHourly(PhoenixHBaseAccessor hBaseAccessor,
+                                               String checkpointLocation) {
+    super(hBaseAccessor, checkpointLocation);
+  }
+
+  @Override
+  protected boolean doWork(long startTime, long endTime) {
+    // TODO: Hourly cluster rollup is not implemented yet - report no work done
+    return false;
+  }
+
+  @Override
+  protected Long getSleepInterval() {
+    return SLEEP_INTERVAL;
+  }
+
+  @Override
+  protected Long getCheckpointCutOffInterval() {
+    return 7200000L; // 2 hours
+  }
+
+}
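
AbstractTimelineAggregator is not part of this excerpt, so the exact driver
contract is not visible here; the sleep and checkpoint-cutoff intervals above
suggest a wake-and-aggregate loop. A hedged sketch of that presumed contract
(illustrative only, not the actual base class):

public abstract class PollingAggregatorSketch implements Runnable {

  protected abstract boolean doWork(long startTime, long endTime);

  protected abstract Long getSleepInterval();

  @Override
  public void run() {
    long lastCheckpoint = System.currentTimeMillis();
    while (!Thread.currentThread().isInterrupted()) {
      try {
        Thread.sleep(getSleepInterval());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      }
      long now = System.currentTimeMillis();
      // Advance the checkpoint only when the aggregation cycle succeeds,
      // so a failed window is retried on the next wake-up
      if (doWork(lastCheckpoint, now)) {
        lastCheckpoint = now;
      }
    }
  }
}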

http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
new file mode 100644
index 0000000..5224450
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+public interface TimelineMetricStore {
+  /**
+   * This method retrieves metrics stored by the Timeline store.
+   *
+   * @param metricNames Names of the metrics, e.g.: cpu_user
+   * @param hostname Name of the host where the metric originated from
+   * @param applicationId Id of the application to which this metric belongs
+   * @param instanceId Application instance id.
+   * @param startTime Start timestamp
+   * @param endTime End timestamp
+   * @param limit Override default result limit
+   * @param groupedByHosts Group {@link TimelineMetric} by metric name, hostname,
+   *                app id and instance id
+   *
+   * @return {@link TimelineMetrics}
+   * @throws java.sql.SQLException
+   */
+  TimelineMetrics getTimelineMetrics(List<String> metricNames, String hostname,
+      String applicationId, String instanceId, Long startTime,
+      Long endTime, Integer limit, boolean groupedByHosts)
+    throws SQLException, IOException;
+
+  /**
+   * Return all records for a single metric satisfying the filter criteria.
+   * @return {@link TimelineMetric}
+   */
+  TimelineMetric getTimelineMetric(String metricName, String hostname,
+      String applicationId, String instanceId, Long startTime,
+      Long endTime, Integer limit)
+      throws SQLException, IOException;
+
+  /**
+   * Stores metric information to the timeline store. Any errors occurring for
+   * individual put request objects will be reported in the response.
+   *
+   * @param metrics A {@link TimelineMetrics} instance.
+   * @return An {@link org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse}.
+   * @throws SQLException
+   * @throws IOException
+   */
+  TimelinePutResponse putMetrics(TimelineMetrics metrics)
+    throws SQLException, IOException;
+}
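
A hedged usage sketch against this interface. The concrete store would be an
implementation such as HBaseTimelineMetricStore, and the TimelineMetric and
TimelineMetrics setter names are assumed to mirror the getters used elsewhere
in this patch:

import java.util.Collections;
import java.util.TreeMap;

import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;

public class MetricStoreClientSketch {

  // Write one datapoint, then read the series back for the last hour
  public static TimelineMetrics writeAndRead(TimelineMetricStore store)
      throws Exception {
    long now = System.currentTimeMillis();

    TimelineMetric metric = new TimelineMetric();
    metric.setMetricName("cpu_user");
    metric.setHostName("host1.example.com");
    metric.setAppId("HOST");
    metric.setStartTime(now);
    TreeMap<Long, Double> values = new TreeMap<Long, Double>();
    values.put(now, 0.5);
    metric.setMetricValues(values);

    TimelineMetrics metrics = new TimelineMetrics();
    metrics.setMetrics(Collections.singletonList(metric));
    store.putMetrics(metrics);

    // null instanceId and limit fall back to the store defaults
    return store.getTimelineMetrics(Collections.singletonList("cpu_user"),
        "host1.example.com", "HOST", null, now - 3600000, now, null, false);
  }
}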

http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptFinishData.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptFinishData.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptFinishData.java
new file mode 100644
index 0000000..7ba51af
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptFinishData.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * The class contains the fields that can be determined when
+ * <code>RMAppAttempt</code> finishes, and that need to be stored persistently.
+ */
+@Public
+@Unstable
+public abstract class ApplicationAttemptFinishData {
+
+  @Public
+  @Unstable
+  public static ApplicationAttemptFinishData newInstance(
+      ApplicationAttemptId appAttemptId, String diagnosticsInfo,
+      String trackingURL, FinalApplicationStatus finalApplicationStatus,
+      YarnApplicationAttemptState yarnApplicationAttemptState) {
+    ApplicationAttemptFinishData appAttemptFD =
+        Records.newRecord(ApplicationAttemptFinishData.class);
+    appAttemptFD.setApplicationAttemptId(appAttemptId);
+    appAttemptFD.setDiagnosticsInfo(diagnosticsInfo);
+    appAttemptFD.setTrackingURL(trackingURL);
+    appAttemptFD.setFinalApplicationStatus(finalApplicationStatus);
+    appAttemptFD.setYarnApplicationAttemptState(yarnApplicationAttemptState);
+    return appAttemptFD;
+  }
+
+  @Public
+  @Unstable
+  public abstract ApplicationAttemptId getApplicationAttemptId();
+
+  @Public
+  @Unstable
+  public abstract void setApplicationAttemptId(
+      ApplicationAttemptId applicationAttemptId);
+
+  @Public
+  @Unstable
+  public abstract String getTrackingURL();
+
+  @Public
+  @Unstable
+  public abstract void setTrackingURL(String trackingURL);
+
+  @Public
+  @Unstable
+  public abstract String getDiagnosticsInfo();
+
+  @Public
+  @Unstable
+  public abstract void setDiagnosticsInfo(String diagnosticsInfo);
+
+  @Public
+  @Unstable
+  public abstract FinalApplicationStatus getFinalApplicationStatus();
+
+  @Public
+  @Unstable
+  public abstract void setFinalApplicationStatus(
+      FinalApplicationStatus finalApplicationStatus);
+
+  @Public
+  @Unstable
+  public abstract YarnApplicationAttemptState getYarnApplicationAttemptState();
+
+  @Public
+  @Unstable
+  public abstract void setYarnApplicationAttemptState(
+      YarnApplicationAttemptState yarnApplicationAttemptState);
+
+}
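
An illustrative use of the newInstance factory above; the same
Records.newRecord(...) pattern applies to the sibling record classes in this
package. The argument values here are made up:

import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;

public class RecordFactorySketch {

  public static ApplicationAttemptFinishData finishDataFor(
      ApplicationAttemptId attemptId) {
    // Records.newRecord(...) resolves a concrete implementation at runtime,
    // so callers only ever code against the abstract record type
    return ApplicationAttemptFinishData.newInstance(
        attemptId,
        "Attempt completed normally",              // diagnosticsInfo
        "http://rm.example.com:8088/proxy/app_1",  // trackingURL
        FinalApplicationStatus.SUCCEEDED,
        YarnApplicationAttemptState.FINISHED);
  }
}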

http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptHistoryData.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptHistoryData.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptHistoryData.java
new file mode 100644
index 0000000..b759ab1
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptHistoryData.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
+
+/**
+ * The class contains all the fields that are stored persistently for
+ * <code>RMAppAttempt</code>.
+ */
+@Public
+@Unstable
+public class ApplicationAttemptHistoryData {
+
+  private ApplicationAttemptId applicationAttemptId;
+
+  private String host;
+
+  private int rpcPort;
+
+  private String trackingURL;
+
+  private String diagnosticsInfo;
+
+  private FinalApplicationStatus finalApplicationStatus;
+
+  private ContainerId masterContainerId;
+
+  private YarnApplicationAttemptState yarnApplicationAttemptState;
+
+  @Public
+  @Unstable
+  public static ApplicationAttemptHistoryData newInstance(
+      ApplicationAttemptId appAttemptId, String host, int rpcPort,
+      ContainerId masterContainerId, String diagnosticsInfo,
+      String trackingURL, FinalApplicationStatus finalApplicationStatus,
+      YarnApplicationAttemptState yarnApplicationAttemptState) {
+    ApplicationAttemptHistoryData appAttemptHD =
+        new ApplicationAttemptHistoryData();
+    appAttemptHD.setApplicationAttemptId(appAttemptId);
+    appAttemptHD.setHost(host);
+    appAttemptHD.setRPCPort(rpcPort);
+    appAttemptHD.setMasterContainerId(masterContainerId);
+    appAttemptHD.setDiagnosticsInfo(diagnosticsInfo);
+    appAttemptHD.setTrackingURL(trackingURL);
+    appAttemptHD.setFinalApplicationStatus(finalApplicationStatus);
+    appAttemptHD.setYarnApplicationAttemptState(yarnApplicationAttemptState);
+    return appAttemptHD;
+  }
+
+  @Public
+  @Unstable
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return applicationAttemptId;
+  }
+
+  @Public
+  @Unstable
+  public void setApplicationAttemptId(ApplicationAttemptId applicationAttemptId) {
+    this.applicationAttemptId = applicationAttemptId;
+  }
+
+  @Public
+  @Unstable
+  public String getHost() {
+    return host;
+  }
+
+  @Public
+  @Unstable
+  public void setHost(String host) {
+    this.host = host;
+  }
+
+  @Public
+  @Unstable
+  public int getRPCPort() {
+    return rpcPort;
+  }
+
+  @Public
+  @Unstable
+  public void setRPCPort(int rpcPort) {
+    this.rpcPort = rpcPort;
+  }
+
+  @Public
+  @Unstable
+  public String getTrackingURL() {
+    return trackingURL;
+  }
+
+  @Public
+  @Unstable
+  public void setTrackingURL(String trackingURL) {
+    this.trackingURL = trackingURL;
+  }
+
+  @Public
+  @Unstable
+  public String getDiagnosticsInfo() {
+    return diagnosticsInfo;
+  }
+
+  @Public
+  @Unstable
+  public void setDiagnosticsInfo(String diagnosticsInfo) {
+    this.diagnosticsInfo = diagnosticsInfo;
+  }
+
+  @Public
+  @Unstable
+  public FinalApplicationStatus getFinalApplicationStatus() {
+    return finalApplicationStatus;
+  }
+
+  @Public
+  @Unstable
+  public void setFinalApplicationStatus(
+      FinalApplicationStatus finalApplicationStatus) {
+    this.finalApplicationStatus = finalApplicationStatus;
+  }
+
+  @Public
+  @Unstable
+  public ContainerId getMasterContainerId() {
+    return masterContainerId;
+  }
+
+  @Public
+  @Unstable
+  public void setMasterContainerId(ContainerId masterContainerId) {
+    this.masterContainerId = masterContainerId;
+  }
+
+  @Public
+  @Unstable
+  public YarnApplicationAttemptState getYarnApplicationAttemptState() {
+    return yarnApplicationAttemptState;
+  }
+
+  @Public
+  @Unstable
+  public void setYarnApplicationAttemptState(
+      YarnApplicationAttemptState yarnApplicationAttemptState) {
+    this.yarnApplicationAttemptState = yarnApplicationAttemptState;
+  }
+
+}
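
Unlike the abstract start/finish records, this history record is a plain
mutable bean covering the whole attempt lifecycle. A hedged sketch of how it
can be assembled from the start and finish records in this package (the merge
helper itself is illustrative, not part of this patch):

public class AttemptHistoryAssemblySketch {

  public static ApplicationAttemptHistoryData merge(
      ApplicationAttemptStartData start, ApplicationAttemptFinishData finish) {
    // Start-time fields come from the start record, terminal fields from
    // the finish record, matching the newInstance signature above
    return ApplicationAttemptHistoryData.newInstance(
        start.getApplicationAttemptId(),
        start.getHost(),
        start.getRPCPort(),
        start.getMasterContainerId(),
        finish.getDiagnosticsInfo(),
        finish.getTrackingURL(),
        finish.getFinalApplicationStatus(),
        finish.getYarnApplicationAttemptState());
  }
}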

http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptStartData.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptStartData.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptStartData.java
new file mode 100644
index 0000000..7ca43fa
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationAttemptStartData.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * The class contains the fields that can be determined when
+ * <code>RMAppAttempt</code> starts, and that need to be stored persistently.
+ */
+@Public
+@Unstable
+public abstract class ApplicationAttemptStartData {
+
+  @Public
+  @Unstable
+  public static ApplicationAttemptStartData newInstance(
+      ApplicationAttemptId appAttemptId, String host, int rpcPort,
+      ContainerId masterContainerId) {
+    ApplicationAttemptStartData appAttemptSD =
+        Records.newRecord(ApplicationAttemptStartData.class);
+    appAttemptSD.setApplicationAttemptId(appAttemptId);
+    appAttemptSD.setHost(host);
+    appAttemptSD.setRPCPort(rpcPort);
+    appAttemptSD.setMasterContainerId(masterContainerId);
+    return appAttemptSD;
+  }
+
+  @Public
+  @Unstable
+  public abstract ApplicationAttemptId getApplicationAttemptId();
+
+  @Public
+  @Unstable
+  public abstract void setApplicationAttemptId(
+      ApplicationAttemptId applicationAttemptId);
+
+  @Public
+  @Unstable
+  public abstract String getHost();
+
+  @Public
+  @Unstable
+  public abstract void setHost(String host);
+
+  @Public
+  @Unstable
+  public abstract int getRPCPort();
+
+  @Public
+  @Unstable
+  public abstract void setRPCPort(int rpcPort);
+
+  @Public
+  @Unstable
+  public abstract ContainerId getMasterContainerId();
+
+  @Public
+  @Unstable
+  public abstract void setMasterContainerId(ContainerId masterContainerId);
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationFinishData.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationFinishData.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationFinishData.java
new file mode 100644
index 0000000..997fa6c
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/records/ApplicationFinishData.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * The class contains the fields that can be determined when <code>RMApp</code>
+ * finishes, and that need to be stored persistently.
+ */
+@Public
+@Unstable
+public abstract class ApplicationFinishData {
+
+  @Public
+  @Unstable
+  public static ApplicationFinishData newInstance(ApplicationId applicationId,
+      long finishTime, String diagnosticsInfo,
+      FinalApplicationStatus finalApplicationStatus,
+      YarnApplicationState yarnApplicationState) {
+    ApplicationFinishData appFD =
+        Records.newRecord(ApplicationFinishData.class);
+    appFD.setApplicationId(applicationId);
+    appFD.setFinishTime(finishTime);
+    appFD.setDiagnosticsInfo(diagnosticsInfo);
+    appFD.setFinalApplicationStatus(finalApplicationStatus);
+    appFD.setYarnApplicationState(yarnApplicationState);
+    return appFD;
+  }
+
+  @Public
+  @Unstable
+  public abstract ApplicationId getApplicationId();
+
+  @Public
+  @Unstable
+  public abstract void setApplicationId(ApplicationId applicationId);
+
+  @Public
+  @Unstable
+  public abstract long getFinishTime();
+
+  @Public
+  @Unstable
+  public abstract void setFinishTime(long finishTime);
+
+  @Public
+  @Unstable
+  public abstract String getDiagnosticsInfo();
+
+  @Public
+  @Unstable
+  public abstract void setDiagnosticsInfo(String diagnosticsInfo);
+
+  @Public
+  @Unstable
+  public abstract FinalApplicationStatus getFinalApplicationStatus();
+
+  @Public
+  @Unstable
+  public abstract void setFinalApplicationStatus(
+      FinalApplicationStatus finalApplicationStatus);
+
+  @Public
+  @Unstable
+  public abstract YarnApplicationState getYarnApplicationState();
+
+  @Public
+  @Unstable
+  public abstract void setYarnApplicationState(
+      YarnApplicationState yarnApplicationState);
+
+}