You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2015/04/21 18:52:14 UTC

[2/2] ambari git commit: AMBARI-10622. Add daily aggregation to AMS (useful for reporting over months of data). (swagle)

AMBARI-10622. Add daily aggregation to AMS (useful for reporting over months of data). (swagle)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/67c425ac
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/67c425ac
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/67c425ac

Branch: refs/heads/trunk
Commit: 67c425acfd22dcd701d05eab16f22d423db045dd
Parents: 2557d9a
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Tue Apr 21 09:51:58 2015 -0700
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Tue Apr 21 09:51:58 2015 -0700

----------------------------------------------------------------------
 .../ambari-metrics-timelineservice/pom.xml      |   1 +
 .../timeline/HBaseTimelineMetricStore.java      |  40 ++--
 .../metrics/timeline/PhoenixHBaseAccessor.java  | 144 ++++++------
 .../metrics/timeline/Precision.java             |   6 +-
 .../timeline/TimelineMetricConfiguration.java   |  29 ++-
 .../aggregators/AbstractTimelineAggregator.java |  58 ++++-
 .../TimelineClusterMetricReader.java            |  42 ----
 .../aggregators/TimelineMetricAggregator.java   | 151 ++----------
 .../TimelineMetricAggregatorFactory.java        | 188 ++++++++++++++-
 .../TimelineMetricClusterAggregator.java        | 235 ++++++-------------
 .../TimelineMetricClusterAggregatorHourly.java  | 175 --------------
 .../TimelineMetricClusterAggregatorMinute.java  | 201 ++++++++++++++++
 .../TimelineMetricHostAggregator.java           | 113 +++++++++
 .../aggregators/TimelineMetricReadHelper.java   |  36 +++
 .../timeline/query/PhoenixTransactSQL.java      |  82 ++++---
 .../timeline/AbstractMiniHBaseClusterTest.java  |   5 +-
 .../metrics/timeline/ITClusterAggregator.java   |  99 ++++++--
 .../metrics/timeline/ITMetricAggregator.java    |  82 ++++++-
 .../timeline/ITPhoenixHBaseAccessor.java        |  34 ++-
 .../metrics/timeline/MetricTestHelper.java      |   7 +-
 .../timeline/TestMetricHostAggregate.java       |   4 +-
 .../0.1.0/configuration/ams-site.xml            |  70 +++++-
 22 files changed, 1065 insertions(+), 737 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/pom.xml b/ambari-metrics/ambari-metrics-timelineservice/pom.xml
index 4ec730e..2485661 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/pom.xml
+++ b/ambari-metrics/ambari-metrics-timelineservice/pom.xml
@@ -249,6 +249,7 @@
         <configuration>
           <redirectTestOutputToFile>true</redirectTestOutputToFile>
           <forkMode>always</forkMode>
+          <argLine>-XX:-UseSplitVerifier</argLine>
         </configuration>
       </plugin>
     </plugins>

http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index 1fac404..447f6f9 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -28,8 +28,6 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricClusterAggregator;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricClusterAggregatorHourly;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
 
@@ -69,39 +67,53 @@ public class HBaseTimelineMetricStore extends AbstractService
     hBaseAccessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf);
     hBaseAccessor.initMetricSchema();
 
-    // Start the cluster aggregator
-    TimelineMetricClusterAggregator minuteClusterAggregator =
-      new TimelineMetricClusterAggregator(hBaseAccessor, metricsConf);
+    // Start the cluster aggregator minute
+    TimelineMetricAggregator minuteClusterAggregator =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hBaseAccessor, metricsConf);
     if (!minuteClusterAggregator.isDisabled()) {
       Thread aggregatorThread = new Thread(minuteClusterAggregator);
       aggregatorThread.start();
     }
 
-    // Start the cluster aggregator hourly
-    TimelineMetricClusterAggregatorHourly hourlyClusterAggregator =
-      new TimelineMetricClusterAggregatorHourly(hBaseAccessor, metricsConf);
+    // Start the hourly cluster aggregator
+    TimelineMetricAggregator hourlyClusterAggregator =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hBaseAccessor, metricsConf);
     if (!hourlyClusterAggregator.isDisabled()) {
       Thread aggregatorThread = new Thread(hourlyClusterAggregator);
       aggregatorThread.start();
     }
 
-    // Start the 5 minute aggregator
+    // Start the daily cluster aggregator
+    TimelineMetricAggregator dailyClusterAggregator =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hBaseAccessor, metricsConf);
+    if (!dailyClusterAggregator.isDisabled()) {
+      Thread aggregatorThread = new Thread(dailyClusterAggregator);
+      aggregatorThread.start();
+    }
+
+    // Start the minute host aggregator
     TimelineMetricAggregator minuteHostAggregator =
-      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute
-        (hBaseAccessor, metricsConf);
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hBaseAccessor, metricsConf);
     if (!minuteHostAggregator.isDisabled()) {
       Thread minuteAggregatorThread = new Thread(minuteHostAggregator);
       minuteAggregatorThread.start();
     }
 
-    // Start hourly host aggregator
+    // Start the hourly host aggregator
     TimelineMetricAggregator hourlyHostAggregator =
-      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly
-        (hBaseAccessor, metricsConf);
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hBaseAccessor, metricsConf);
     if (!hourlyHostAggregator.isDisabled()) {
       Thread aggregatorHourlyThread = new Thread(hourlyHostAggregator);
       aggregatorHourlyThread.start();
     }
+
+    // Start the daily host aggregator
+    TimelineMetricAggregator dailyHostAggregator =
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hBaseAccessor, metricsConf);
+    if (!dailyHostAggregator.isDisabled()) {
+      Thread aggregatorDailyThread = new Thread(dailyHostAggregator);
+      aggregatorDailyThread.start();
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 2e78912..e27d9a9 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.type.TypeReference;
+
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -51,7 +52,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
+
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_DAILY_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_MINUTE_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_MAX_RETRIES;
@@ -59,19 +62,21 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RETRY_INTERVAL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_ENCODING_SCHEME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_DAILY_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_HOUR_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_MINUTE_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.ALTER_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_TABLE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_ENCODING;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
@@ -85,13 +90,9 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
  */
 public class PhoenixHBaseAccessor {
 
+  static final int PHOENIX_MAX_MUTATION_STATE_SIZE = 50000;
   private static final Log LOG = LogFactory.getLog(PhoenixHBaseAccessor.class);
   private static final TimelineMetricReadHelper TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper();
-  private final Configuration hbaseConf;
-  private final Configuration metricsConf;
-  private final RetryCounterFactory retryCounterFactory;
-
-  static final int PHOENIX_MAX_MUTATION_STATE_SIZE = 50000;
   /**
    * 4 metrics/min * 60 * 24: Retrieve data for 1 day.
    */
@@ -99,9 +100,11 @@ public class PhoenixHBaseAccessor {
   public static int RESULTSET_LIMIT = (int)TimeUnit.DAYS.toMinutes(1) *
     METRICS_PER_MINUTE;
   private static ObjectMapper mapper = new ObjectMapper();
-
   private static TypeReference<Map<Long, Double>> metricValuesTypeRef =
     new TypeReference<Map<Long, Double>>() {};
+  private final Configuration hbaseConf;
+  private final Configuration metricsConf;
+  private final RetryCounterFactory retryCounterFactory;
   private final ConnectionProvider dataSource;
 
   public PhoenixHBaseAccessor(Configuration hbaseConf,
@@ -127,37 +130,6 @@ public class PhoenixHBaseAccessor {
       (int) SECONDS.toMillis(metricsConf.getInt(GLOBAL_RETRY_INTERVAL, 5)));
   }
 
-
-  private Connection getConnectionRetryingOnException()
-    throws SQLException, InterruptedException {
-    RetryCounter retryCounter = retryCounterFactory.create();
-    while (true) {
-      try{
-        return getConnection();
-      } catch (SQLException e) {
-        if(!retryCounter.shouldRetry()){
-          LOG.error("HBaseAccessor getConnection failed after "
-            + retryCounter.getMaxAttempts() + " attempts");
-          throw e;
-        }
-      }
-      retryCounter.sleepUntilNextRetry();
-    }
-  }
-
-  /**
-   * Get JDBC connection to HBase store. Assumption is that the hbase
-   * configuration is present on the classpath and loaded by the caller into
-   * the Configuration object.
-   * Phoenix already caches the HConnection between the client and HBase
-   * cluster.
-   *
-   * @return @java.sql.Connection
-   */
-  public Connection getConnection() throws SQLException {
-    return dataSource.getConnection();
-  }
-
   private static TimelineMetric getLastTimelineMetricFromResultSet(ResultSet rs)
     throws SQLException, IOException {
     TimelineMetric metric = TIMELINE_METRIC_READ_HELPER
@@ -218,8 +190,7 @@ public class PhoenixHBaseAccessor {
   }
 
   @SuppressWarnings("unchecked")
-  public static Map<Long, Double>  readMetricFromJSON(String json)
-    throws IOException {
+  public static Map<Long, Double>  readMetricFromJSON(String json) throws IOException {
     return (Map<Long, Double>) mapper.readValue(json, metricValuesTypeRef);
   }
 
@@ -247,17 +218,34 @@ public class PhoenixHBaseAccessor {
     return metricHostAggregate;
   }
 
-  public static MetricClusterAggregate getMetricClusterAggregateFromResultSet(ResultSet rs)
-    throws SQLException {
-    MetricClusterAggregate agg = new MetricClusterAggregate();
-    agg.setSum(rs.getDouble("METRIC_SUM"));
-    agg.setMax(rs.getDouble("METRIC_MAX"));
-    agg.setMin(rs.getDouble("METRIC_MIN"));
-    agg.setNumberOfHosts(rs.getInt("HOSTS_COUNT"));
-
-    agg.setDeviation(0.0);
+  private Connection getConnectionRetryingOnException()
+    throws SQLException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try{
+        return getConnection();
+      } catch (SQLException e) {
+        if(!retryCounter.shouldRetry()){
+          LOG.error("HBaseAccessor getConnection failed after "
+            + retryCounter.getMaxAttempts() + " attempts");
+          throw e;
+        }
+      }
+      retryCounter.sleepUntilNextRetry();
+    }
+  }
 
-    return agg;
+  /**
+   * Get JDBC connection to HBase store. Assumption is that the hbase
+   * configuration is present on the classpath and loaded by the caller into
+   * the Configuration object.
+   * Phoenix already caches the HConnection between the client and HBase
+   * cluster.
+   *
+   * @return @java.sql.Connection
+   */
+  public Connection getConnection() throws SQLException {
+    return dataSource.getConnection();
   }
 
   protected void initMetricSchema() {
@@ -269,24 +257,33 @@ public class PhoenixHBaseAccessor {
     String precisionTtl = metricsConf.get(PRECISION_TABLE_TTL, "86400");
     String hostMinTtl = metricsConf.get(HOST_MINUTE_TABLE_TTL, "604800");
     String hostHourTtl = metricsConf.get(HOST_HOUR_TABLE_TTL, "2592000");
+    String hostDailyTtl = metricsConf.get(HOST_DAILY_TABLE_TTL, "31536000");
     String clusterMinTtl = metricsConf.get(CLUSTER_MINUTE_TABLE_TTL, "2592000");
     String clusterHourTtl = metricsConf.get(CLUSTER_HOUR_TABLE_TTL, "31536000");
+    String clusterDailyTtl = metricsConf.get(CLUSTER_DAILY_TABLE_TTL, "31536000");
 
     try {
       LOG.info("Initializing metrics schema...");
       conn = getConnectionRetryingOnException();
       stmt = conn.createStatement();
 
+      // Host level
       stmt.executeUpdate(String.format(CREATE_METRICS_TABLE_SQL,
         encoding, precisionTtl, compression));
-      stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL,
-        encoding, hostHourTtl, compression));
-      stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL,
-        encoding, hostMinTtl, compression));
+      stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL,
+        METRICS_AGGREGATE_MINUTE_TABLE_NAME, encoding, hostMinTtl, compression));
+      stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL,
+        METRICS_AGGREGATE_HOURLY_TABLE_NAME, encoding, hostHourTtl, compression));
+      stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL,
+        METRICS_AGGREGATE_DAILY_TABLE_NAME, encoding, hostDailyTtl, compression));
+
+      // Cluster level
       stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL,
-        encoding, clusterMinTtl, compression));
+        METRICS_CLUSTER_AGGREGATE_TABLE_NAME, encoding, clusterMinTtl, compression));
+      stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL,
+        METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME, encoding, clusterHourTtl, compression));
       stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL,
-        encoding, clusterHourTtl, compression));
+        METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME, encoding, clusterDailyTtl, compression));
 
       //alter TTL options to update tables
       stmt.executeUpdate(String.format(ALTER_SQL,
@@ -299,11 +296,17 @@ public class PhoenixHBaseAccessor {
         METRICS_AGGREGATE_HOURLY_TABLE_NAME,
         hostHourTtl));
       stmt.executeUpdate(String.format(ALTER_SQL,
+        METRICS_AGGREGATE_DAILY_TABLE_NAME,
+        hostDailyTtl));
+      stmt.executeUpdate(String.format(ALTER_SQL,
         METRICS_CLUSTER_AGGREGATE_TABLE_NAME,
         clusterMinTtl));
       stmt.executeUpdate(String.format(ALTER_SQL,
         METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME,
         clusterHourTtl));
+      stmt.executeUpdate(String.format(ALTER_SQL,
+        METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME,
+        clusterDailyTtl));
 
       conn.commit();
     } catch (SQLException sql) {
@@ -726,9 +729,8 @@ public class PhoenixHBaseAccessor {
     }
   }
 
-  public void saveHostAggregateRecords(Map<TimelineMetric,
-    MetricHostAggregate> hostAggregateMap, String phoenixTableName)
-    throws SQLException {
+  public void saveHostAggregateRecords(Map<TimelineMetric, MetricHostAggregate> hostAggregateMap,
+                                       String phoenixTableName) throws SQLException {
 
     if (hostAggregateMap == null || hostAggregateMap.isEmpty()) {
       LOG.debug("Empty aggregate records.");
@@ -809,9 +811,8 @@ public class PhoenixHBaseAccessor {
    *
    * @throws SQLException
    */
-  public void saveClusterAggregateRecords(
-    Map<TimelineClusterMetric, MetricClusterAggregate> records)
-    throws SQLException {
+  public void saveClusterAggregateRecords(Map<TimelineClusterMetric, MetricClusterAggregate> records)
+      throws SQLException {
 
     if (records == null || records.isEmpty()) {
       LOG.debug("Empty aggregate records.");
@@ -819,11 +820,11 @@ public class PhoenixHBaseAccessor {
     }
 
     long start = System.currentTimeMillis();
-
+    String sqlStr = String.format(UPSERT_CLUSTER_AGGREGATE_SQL, METRICS_CLUSTER_AGGREGATE_TABLE_NAME);
     Connection conn = getConnection();
     PreparedStatement stmt = null;
     try {
-      stmt = conn.prepareStatement(UPSERT_CLUSTER_AGGREGATE_SQL);
+      stmt = conn.prepareStatement(sqlStr);
       int rowCount = 0;
 
       for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate>
@@ -892,10 +893,8 @@ public class PhoenixHBaseAccessor {
    *
    * @throws SQLException
    */
-  public void saveClusterAggregateHourlyRecords(
-    Map<TimelineClusterMetric, MetricHostAggregate> records,
-    String tableName)
-    throws SQLException {
+  public void saveClusterTimeAggregateRecords(Map<TimelineClusterMetric, MetricHostAggregate> records,
+                                              String tableName) throws SQLException {
     if (records == null || records.isEmpty()) {
       LOG.debug("Empty aggregate records.");
       return;
@@ -906,12 +905,10 @@ public class PhoenixHBaseAccessor {
     Connection conn = getConnection();
     PreparedStatement stmt = null;
     try {
-      stmt = conn.prepareStatement(String.format
-        (UPSERT_CLUSTER_AGGREGATE_TIME_SQL, tableName));
+      stmt = conn.prepareStatement(String.format(UPSERT_CLUSTER_AGGREGATE_TIME_SQL, tableName));
       int rowCount = 0;
 
-      for (Map.Entry<TimelineClusterMetric, MetricHostAggregate>
-        aggregateEntry : records.entrySet()) {
+      for (Map.Entry<TimelineClusterMetric, MetricHostAggregate> aggregateEntry : records.entrySet()) {
         TimelineClusterMetric clusterMetric = aggregateEntry.getKey();
         MetricHostAggregate aggregate = aggregateEntry.getValue();
 
@@ -928,7 +925,6 @@ public class PhoenixHBaseAccessor {
         stmt.setLong(4, clusterMetric.getTimestamp());
         stmt.setString(5, clusterMetric.getType());
         stmt.setDouble(6, aggregate.getSum());
-//        stmt.setInt(7, aggregate.getNumberOfHosts());
         stmt.setLong(7, aggregate.getNumberOfSamples());
         stmt.setDouble(8, aggregate.getMax());
         stmt.setDouble(9, aggregate.getMin());

http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Precision.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Precision.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Precision.java
index c0e1ebc..ee0e87c 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Precision.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Precision.java
@@ -26,7 +26,8 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 public enum Precision {
   SECONDS,
   MINUTES,
-  HOURS;
+  HOURS,
+  DAYS;
 
   public static class PrecisionFormatException extends IllegalArgumentException {
     public PrecisionFormatException(String message, Throwable cause) {
@@ -41,7 +42,8 @@ public enum Precision {
     try {
       return Precision.valueOf(precision.toUpperCase());
     } catch (IllegalArgumentException e) {
-      throw new PrecisionFormatException("precision should be seconds, minutes or hours", e);
+      throw new PrecisionFormatException("precision should be seconds, " +
+        "minutes, hours or days", e);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index b72aa64..0595c20 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -54,15 +54,25 @@ public class TimelineMetricConfiguration {
 
   public static final String PRECISION_TABLE_TTL =
     "timeline.metrics.host.aggregator.ttl";
+
   public static final String HOST_MINUTE_TABLE_TTL =
     "timeline.metrics.host.aggregator.minute.ttl";
+
+  public static final String HOST_DAILY_TABLE_TTL =
+    "timeline.metrics.host.aggregator.daily.ttl";
+
   public static final String HOST_HOUR_TABLE_TTL =
     "timeline.metrics.host.aggregator.hourly.ttl";
+
   public static final String CLUSTER_MINUTE_TABLE_TTL =
     "timeline.metrics.cluster.aggregator.minute.ttl";
+
   public static final String CLUSTER_HOUR_TABLE_TTL =
     "timeline.metrics.cluster.aggregator.hourly.ttl";
 
+  public static final String CLUSTER_DAILY_TABLE_TTL =
+    "timeline.metrics.cluster.aggregator.daily.ttl";
+
   public static final String CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL =
     "timeline.metrics.cluster.aggregator.minute.timeslice.interval";
 
@@ -78,26 +88,35 @@ public class TimelineMetricConfiguration {
   public static final String HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL =
     "timeline.metrics.host.aggregator.hourly.interval";
 
+  public static final String HOST_AGGREGATOR_DAILY_SLEEP_INTERVAL =
+    "timeline.metrics.host.aggregator.daily.interval";
+
   public static final String CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL =
     "timeline.metrics.cluster.aggregator.minute.interval";
 
   public static final String CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL =
     "timeline.metrics.cluster.aggregator.hourly.interval";
 
+  public static final String CLUSTER_AGGREGATOR_DAILY_SLEEP_INTERVAL =
+    "timeline.metrics.cluster.aggregator.daily.interval";
+
   public static final String HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER =
     "timeline.metrics.host.aggregator.minute.checkpointCutOffMultiplier";
 
   public static final String HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER =
     "timeline.metrics.host.aggregator.hourly.checkpointCutOffMultiplier";
 
+  public static final String HOST_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER =
+    "timeline.metrics.host.aggregator.daily.checkpointCutOffMultiplier";
+
   public static final String CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER =
     "timeline.metrics.cluster.aggregator.minute.checkpointCutOffMultiplier";
 
   public static final String CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER =
     "timeline.metrics.cluster.aggregator.hourly.checkpointCutOffMultiplier";
 
-  public static final String CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL =
-    "timeline.metrics.cluster.aggregator.hourly.checkpointCutOffInterval";
+  public static final String CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER =
+    "timeline.metrics.cluster.aggregator.daily.checkpointCutOffMultiplier";
 
   public static final String GLOBAL_RESULT_LIMIT =
     "timeline.metrics.service.default.result.limit";
@@ -114,12 +133,18 @@ public class TimelineMetricConfiguration {
   public static final String HOST_AGGREGATOR_HOUR_DISABLED =
     "timeline.metrics.host.aggregator.hourly.disabled";
 
+  public static final String HOST_AGGREGATOR_DAILY_DISABLED =
+    "timeline.metrics.host.aggregator.hourly.disabled";
+
   public static final String CLUSTER_AGGREGATOR_MINUTE_DISABLED =
     "timeline.metrics.cluster.aggregator.minute.disabled";
 
   public static final String CLUSTER_AGGREGATOR_HOUR_DISABLED =
     "timeline.metrics.cluster.aggregator.hourly.disabled";
 
+  public static final String CLUSTER_AGGREGATOR_DAILY_DISABLED =
+    "timeline.metrics.cluster.aggregator.daily.disabled";
+
   public static final String DISABLE_APPLICATION_TIMELINE_STORE =
     "timeline.service.disable.application.timeline.store";
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
index 07717a8..415471d 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
@@ -41,19 +41,21 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
  * Base class for all runnable aggregators. Provides common functions like
  * check pointing and scheduling.
  */
-public abstract class AbstractTimelineAggregator implements Runnable {
+public abstract class AbstractTimelineAggregator implements TimelineMetricAggregator {
   protected final PhoenixHBaseAccessor hBaseAccessor;
   private final Log LOG;
-
   private Clock clock;
   protected final long checkpointDelayMillis;
   protected final Integer resultsetFetchSize;
   protected Configuration metricsConf;
 
-  public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor,
-                                    Configuration metricsConf) {
-    this(hBaseAccessor, metricsConf, new SystemClock());
-  }
+  private String checkpointLocation;
+  private Long sleepIntervalMillis;
+  private Integer checkpointCutOffMultiplier;
+  private String aggregatorDisableParam;
+  protected String tableName;
+  protected String outputTableName;
+  protected Long nativeTimeRangeDelay;
 
   public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor,
                                     Configuration metricsConf, Clock clk) {
@@ -66,6 +68,30 @@ public abstract class AbstractTimelineAggregator implements Runnable {
     this.clock = clk;
   }
 
+  public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor,
+                                    Configuration metricsConf) {
+    this(hBaseAccessor, metricsConf, new SystemClock());
+  }
+
+  public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor,
+                                    Configuration metricsConf,
+                                    String checkpointLocation,
+                                    Long sleepIntervalMillis,
+                                    Integer checkpointCutOffMultiplier,
+                                    String aggregatorDisableParam,
+                                    String tableName,
+                                    String outputTableName,
+                                    Long nativeTimeRangeDelay) {
+    this(hBaseAccessor, metricsConf);
+    this.checkpointLocation = checkpointLocation;
+    this.sleepIntervalMillis = sleepIntervalMillis;
+    this.checkpointCutOffMultiplier = checkpointCutOffMultiplier;
+    this.aggregatorDisableParam = aggregatorDisableParam;
+    this.tableName = tableName;
+    this.outputTableName = outputTableName;
+    this.nativeTimeRangeDelay =  nativeTimeRangeDelay;
+  }
+
   @Override
   public void run() {
     LOG.info("Started Timeline aggregator thread @ " + new Date());
@@ -198,6 +224,7 @@ public abstract class AbstractTimelineAggregator implements Runnable {
    * @param startTime Sample start time
    * @param endTime Sample end time
    */
+  @Override
   public boolean doWork(long startTime, long endTime) {
     LOG.info("Start aggregation cycle @ " + new Date() + ", " +
       "startTime = " + new Date(startTime) + ", endTime = " + new Date(endTime));
@@ -257,18 +284,25 @@ public abstract class AbstractTimelineAggregator implements Runnable {
 
   protected abstract Condition prepareMetricQueryCondition(long startTime, long endTime);
 
-  protected abstract void aggregate(ResultSet rs, long startTime, long endTime)
-    throws IOException, SQLException;
+  protected abstract void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException;
 
-  protected abstract Long getSleepIntervalMillis();
+  protected Long getSleepIntervalMillis() {
+    return sleepIntervalMillis;
+  }
 
-  protected abstract Integer getCheckpointCutOffMultiplier();
+  protected Integer getCheckpointCutOffMultiplier() {
+    return checkpointCutOffMultiplier;
+  }
 
   protected Long getCheckpointCutOffIntervalMillis() {
     return getCheckpointCutOffMultiplier() * getSleepIntervalMillis();
   }
 
-  public abstract boolean isDisabled();
+  public boolean isDisabled() {
+    return metricsConf.getBoolean(aggregatorDisableParam, false);
+  }
 
-  protected abstract String getCheckpointLocation();
+  protected String getCheckpointLocation() {
+    return checkpointLocation;
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetricReader.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetricReader.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetricReader.java
deleted file mode 100644
index 3df88d2..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetricReader.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
-public class TimelineClusterMetricReader {
-
-  private boolean ignoreInstance;
-
-  public TimelineClusterMetricReader(boolean ignoreInstance) {
-    this.ignoreInstance = ignoreInstance;
-  }
-
-  public TimelineClusterMetric fromResultSet(ResultSet rs)
-    throws SQLException {
-
-    return new TimelineClusterMetric(
-      rs.getString("METRIC_NAME"),
-      rs.getString("APP_ID"),
-      ignoreInstance ? null : rs.getString("INSTANCE_ID"),
-      rs.getLong("SERVER_TIME"),
-      rs.getString("UNITS"));
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
index a2887ea..96be48d 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
@@ -1,3 +1,5 @@
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -6,142 +8,27 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
-import java.io.IOException;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.HashMap;
-import java.util.Map;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
-
-public class TimelineMetricAggregator extends AbstractTimelineAggregator {
-  private static final Log LOG = LogFactory.getLog
-    (TimelineMetricAggregator.class);
-
-  private final String checkpointLocation;
-  private final Long sleepIntervalMillis;
-  private final Integer checkpointCutOffMultiplier;
-  private final String hostAggregatorDisabledParam;
-  private final String tableName;
-  private final String outputTableName;
-  private final Long nativeTimeRangeDelay;
-
-  public TimelineMetricAggregator(PhoenixHBaseAccessor hBaseAccessor,
-                                  Configuration metricsConf,
-                                  String checkpointLocation,
-                                  Long sleepIntervalMillis,
-                                  Integer checkpointCutOffMultiplier,
-                                  String hostAggregatorDisabledParam,
-                                  String tableName,
-                                  String outputTableName,
-                                  Long nativeTimeRangeDelay) {
-    super(hBaseAccessor, metricsConf);
-    this.checkpointLocation = checkpointLocation;
-    this.sleepIntervalMillis = sleepIntervalMillis;
-    this.checkpointCutOffMultiplier = checkpointCutOffMultiplier;
-    this.hostAggregatorDisabledParam = hostAggregatorDisabledParam;
-    this.tableName = tableName;
-    this.outputTableName = outputTableName;
-    this.nativeTimeRangeDelay =  nativeTimeRangeDelay;
-  }
-
-  @Override
-  protected String getCheckpointLocation() {
-    return checkpointLocation;
-  }
-
-  @Override
-  protected void aggregate(ResultSet rs, long startTime, long endTime)
-    throws IOException, SQLException {
-    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
-      aggregateMetricsFromResultSet(rs);
-
-    LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
-    hBaseAccessor.saveHostAggregateRecords(hostAggregateMap,
-      outputTableName);
-  }
-
-  @Override
-  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
-    Condition condition = new DefaultCondition(null, null, null, null, startTime,
-      endTime, null, null, true);
-    condition.setNoLimit();
-    condition.setFetchSize(resultsetFetchSize);
-    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
-      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, nativeTimeRangeDelay),
-      tableName));
-    condition.addOrderByColumn("METRIC_NAME");
-    condition.addOrderByColumn("HOSTNAME");
-    condition.addOrderByColumn("APP_ID");
-    condition.addOrderByColumn("INSTANCE_ID");
-    condition.addOrderByColumn("SERVER_TIME");
-    return condition;
-  }
-
-  private Map<TimelineMetric, MetricHostAggregate> aggregateMetricsFromResultSet
-      (ResultSet rs) throws IOException, SQLException {
-    TimelineMetric existingMetric = null;
-    MetricHostAggregate hostAggregate = null;
-    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
-      new HashMap<TimelineMetric, MetricHostAggregate>();
-
-    while (rs.next()) {
-      TimelineMetric currentMetric =
-        PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
-      MetricHostAggregate currentHostAggregate =
-        PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
-
-      if (existingMetric == null) {
-        // First row
-        existingMetric = currentMetric;
-        hostAggregate = new MetricHostAggregate();
-        hostAggregateMap.put(currentMetric, hostAggregate);
-      }
-
-      if (existingMetric.equalsExceptTime(currentMetric)) {
-        // Recalculate totals with current metric
-        hostAggregate.updateAggregates(currentHostAggregate);
-      } else {
-        // Switched over to a new metric - save existing - create new aggregate
-        hostAggregate = new MetricHostAggregate();
-        hostAggregate.updateAggregates(currentHostAggregate);
-        hostAggregateMap.put(currentMetric, hostAggregate);
-        existingMetric = currentMetric;
-      }
-    }
-    return hostAggregateMap;
-  }
-
-  @Override
-  protected Long getSleepIntervalMillis() {
-    return sleepIntervalMillis;
-  }
-
-  @Override
-  protected Integer getCheckpointCutOffMultiplier() {
-    return checkpointCutOffMultiplier;
-  }
-
-  @Override
-  public boolean isDisabled() {
-    return metricsConf.getBoolean(hostAggregatorDisabledParam, false);
-  }
+public interface TimelineMetricAggregator extends Runnable {
+  /**
+   * Aggregate metric data within the time bounds.
+   * @param startTime start time millis
+   * @param endTime end time millis
+   * @return success
+   */
+  public boolean doWork(long startTime, long endTime);
+
+  /**
+   * Is aggregator is disabled by configuration.
+   * @return true/false
+   */
+  public boolean isDisabled();
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
index a0e4e32..642fcfe 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
@@ -22,10 +22,20 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_DAILY_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_DAILY_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_SLEEP_INTERVAL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_DISABLED;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL;
@@ -33,20 +43,42 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_DISABLED;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
 
+/**
+ * Factory class that knows how to create a aggregator instance using
+ * @TimelineMetricConfiguration
+ */
 public class TimelineMetricAggregatorFactory {
-  private static final String MINUTE_AGGREGATE_CHECKPOINT_FILE =
+  private static final String HOST_AGGREGATE_MINUTE_CHECKPOINT_FILE =
     "timeline-metrics-host-aggregator-checkpoint";
-  private static final String MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE =
+  private static final String HOST_AGGREGATE_HOURLY_CHECKPOINT_FILE =
     "timeline-metrics-host-aggregator-hourly-checkpoint";
+  private static final String HOST_AGGREGATE_DAILY_CHECKPOINT_FILE =
+    "timeline-metrics-host-aggregator-daily-checkpoint";
+  private static final String CLUSTER_AGGREGATOR_CHECKPOINT_FILE =
+    "timeline-metrics-cluster-aggregator-checkpoint";
+  private static final String CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE =
+    "timeline-metrics-cluster-aggregator-hourly-checkpoint";
+  private static final String CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_FILE =
+    "timeline-metrics-cluster-aggregator-daily-checkpoint";
 
+  /**
+   * Minute based aggregation for hosts.
+   */
   public static TimelineMetricAggregator createTimelineMetricAggregatorMinute
     (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
 
     String checkpointDir = metricsConf.get(
       TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
     String checkpointLocation = FilenameUtils.concat(checkpointDir,
-      MINUTE_AGGREGATE_CHECKPOINT_FILE);
+      HOST_AGGREGATE_MINUTE_CHECKPOINT_FILE);
     long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
       (HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300l));  // 5 mins
 
@@ -57,7 +89,7 @@ public class TimelineMetricAggregatorFactory {
     String inputTableName = METRICS_RECORD_TABLE_NAME;
     String outputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
 
-    return new TimelineMetricAggregator(hBaseAccessor, metricsConf,
+    return new TimelineMetricHostAggregator(hBaseAccessor, metricsConf,
       checkpointLocation,
       sleepIntervalMillis,
       checkpointCutOffMultiplier,
@@ -67,13 +99,16 @@ public class TimelineMetricAggregatorFactory {
       120000l);
   }
 
+  /**
+   * Hourly aggregation for hosts.
+   */
   public static TimelineMetricAggregator createTimelineMetricAggregatorHourly
     (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
 
     String checkpointDir = metricsConf.get(
       TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
     String checkpointLocation = FilenameUtils.concat(checkpointDir,
-      MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE);
+      HOST_AGGREGATE_HOURLY_CHECKPOINT_FILE);
     long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
       (HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l));
 
@@ -84,7 +119,37 @@ public class TimelineMetricAggregatorFactory {
     String inputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
     String outputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
 
-    return new TimelineMetricAggregator(hBaseAccessor, metricsConf,
+    return new TimelineMetricHostAggregator(hBaseAccessor, metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      hostAggregatorDisabledParam,
+      inputTableName,
+      outputTableName,
+      3600000l);
+  }
+
+  /**
+   * Daily aggregation for hosts.
+   */
+  public static TimelineMetricAggregator createTimelineMetricAggregatorDaily
+    (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+
+    String checkpointDir = metricsConf.get(
+      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+    String checkpointLocation = FilenameUtils.concat(checkpointDir,
+      HOST_AGGREGATE_DAILY_CHECKPOINT_FILE);
+    long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+      (HOST_AGGREGATOR_DAILY_SLEEP_INTERVAL, 86400l));
+
+    int checkpointCutOffMultiplier = metricsConf.getInt
+      (HOST_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER, 1);
+    String hostAggregatorDisabledParam = HOST_AGGREGATOR_DAILY_DISABLED;
+
+    String inputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+    String outputTableName = METRICS_AGGREGATE_DAILY_TABLE_NAME;
+
+    return new TimelineMetricHostAggregator(hBaseAccessor, metricsConf,
       checkpointLocation,
       sleepIntervalMillis,
       checkpointCutOffMultiplier,
@@ -94,5 +159,110 @@ public class TimelineMetricAggregatorFactory {
       3600000l);
   }
 
+  /**
+   * Minute based aggregation for cluster.
+   */
+  public static TimelineMetricAggregator createTimelineClusterAggregatorMinute(
+      PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+
+    String checkpointDir = metricsConf.get(
+      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+
+    String checkpointLocation = FilenameUtils.concat(checkpointDir,
+      CLUSTER_AGGREGATOR_CHECKPOINT_FILE);
+
+    long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+      (CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 120l));
+
+    long timeSliceIntervalMillis = SECONDS.toMillis(metricsConf.getInt
+      (CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 15));
+
+    int checkpointCutOffMultiplier =
+      metricsConf.getInt(CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+
+    String inputTableName = METRICS_RECORD_TABLE_NAME;
+    String outputTableName = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+    String aggregatorDisabledParam = CLUSTER_AGGREGATOR_MINUTE_DISABLED;
+
+    // Minute based aggregation have added responsibility of time slicing
+    return new TimelineMetricClusterAggregatorMinute(
+      hBaseAccessor, metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      aggregatorDisabledParam,
+      inputTableName,
+      outputTableName,
+      120000l,
+      timeSliceIntervalMillis
+    );
+  }
+
+  /**
+   * Hourly aggregation for cluster.
+   */
+  public static TimelineMetricAggregator createTimelineClusterAggregatorHourly(
+    PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+
+    String checkpointDir = metricsConf.get(
+      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+
+    String checkpointLocation = FilenameUtils.concat(checkpointDir,
+      CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE);
+
+    long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+      (CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l));
+
+    int checkpointCutOffMultiplier = metricsConf.getInt
+      (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+
+    String inputTableName = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+    String outputTableName = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+    String aggregatorDisabledParam = CLUSTER_AGGREGATOR_HOUR_DISABLED;
+
+    return new TimelineMetricClusterAggregator(
+      hBaseAccessor, metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      aggregatorDisabledParam,
+      inputTableName,
+      outputTableName,
+      120000l
+    );
+  }
+
+  /**
+   * Daily aggregation for cluster.
+   */
+  public static TimelineMetricAggregator createTimelineClusterAggregatorDaily(
+    PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+
+    String checkpointDir = metricsConf.get(
+      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+
+    String checkpointLocation = FilenameUtils.concat(checkpointDir,
+      CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_FILE);
+
+    long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+      (CLUSTER_AGGREGATOR_DAILY_SLEEP_INTERVAL, 86400l));
+
+    int checkpointCutOffMultiplier = metricsConf.getInt
+      (CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER, 1);
 
+    String inputTableName = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+    String outputTableName = METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
+    String aggregatorDisabledParam = CLUSTER_AGGREGATOR_DAILY_DISABLED;
+
+    return new TimelineMetricClusterAggregator(
+      hBaseAccessor, metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      aggregatorDisabledParam,
+      inputTableName,
+      outputTableName,
+      120000l
+    );
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
index 68b2ba9..9ed11e1 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
@@ -17,12 +17,9 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
-
-import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
@@ -30,74 +27,32 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
 import java.io.IOException;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_DISABLED;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_TIME_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
 
-/**
- * Aggregates a metric across all hosts in the cluster. Reads metrics from
- * the precision table and saves into the aggregate.
- */
 public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator {
+  private final TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(true);
   private static final Log LOG = LogFactory.getLog(TimelineMetricClusterAggregator.class);
-  private static final String CLUSTER_AGGREGATOR_CHECKPOINT_FILE =
-    "timeline-metrics-cluster-aggregator-checkpoint";
-  private final String checkpointLocation;
-  private final Long sleepIntervalMillis;
-  public final int timeSliceIntervalMillis;
-  private final Integer checkpointCutOffMultiplier;
-  private TimelineMetricReadHelper timelineMetricReadHelper = new TimelineMetricReadHelper(true);
-  // Aggregator to perform app-level aggregates for host metrics
-  private final TimelineMetricAppAggregator appAggregator;
+  private final boolean isClusterPrecisionInputTable;
 
   public TimelineMetricClusterAggregator(PhoenixHBaseAccessor hBaseAccessor,
-                                         Configuration metricsConf) {
-    super(hBaseAccessor, metricsConf);
-
-    String checkpointDir = metricsConf.get(
-      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
-
-    checkpointLocation = FilenameUtils.concat(checkpointDir,
-      CLUSTER_AGGREGATOR_CHECKPOINT_FILE);
-
-    sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
-      (CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 120l));
-    timeSliceIntervalMillis = (int)SECONDS.toMillis(metricsConf.getInt
-      (CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 15));
-    checkpointCutOffMultiplier =
-      metricsConf.getInt(CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
-
-    appAggregator = new TimelineMetricAppAggregator(metricsConf);
-  }
-
-  @Override
-  protected String getCheckpointLocation() {
-    return checkpointLocation;
-  }
-
-  @Override
-  protected void aggregate(ResultSet rs, long startTime, long endTime)
-    throws SQLException, IOException {
-    List<Long[]> timeSlices = getTimeSlices(startTime, endTime);
-    // Initialize app aggregates for host metrics
-    appAggregator.init();
-    Map<TimelineClusterMetric, MetricClusterAggregate>
-      aggregateClusterMetrics = aggregateMetricsFromResultSet(rs, timeSlices);
-
-    LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates.");
-    hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics);
-    appAggregator.cleanup();
+                                         Configuration metricsConf,
+                                         String checkpointLocation,
+                                         Long sleepIntervalMillis,
+                                         Integer checkpointCutOffMultiplier,
+                                         String hostAggregatorDisabledParam,
+                                         String inputTableName,
+                                         String outputTableName,
+                                         Long nativeTimeRangeDelay) {
+    super(hBaseAccessor, metricsConf, checkpointLocation,
+      sleepIntervalMillis, checkpointCutOffMultiplier,
+      hostAggregatorDisabledParam, inputTableName, outputTableName,
+      nativeTimeRangeDelay);
+    isClusterPrecisionInputTable = inputTableName.equals(METRICS_CLUSTER_AGGREGATE_TABLE_NAME);
   }
 
   @Override
@@ -106,9 +61,17 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
       endTime, null, null, true);
     condition.setNoLimit();
     condition.setFetchSize(resultsetFetchSize);
-    condition.setStatement(String.format(GET_METRIC_SQL,
+    String sqlStr = String.format(GET_CLUSTER_AGGREGATE_TIME_SQL,
       PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
-      METRICS_RECORD_TABLE_NAME));
+      tableName);
+    // HOST_COUNT vs METRIC_COUNT
+    if (isClusterPrecisionInputTable) {
+      sqlStr = String.format(GET_CLUSTER_AGGREGATE_SQL,
+        PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
+        tableName);
+    }
+
+    condition.setStatement(sqlStr);
     condition.addOrderByColumn("METRIC_NAME");
     condition.addOrderByColumn("APP_ID");
     condition.addOrderByColumn("INSTANCE_ID");
@@ -116,120 +79,58 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
     return condition;
   }
 
-  private List<Long[]> getTimeSlices(long startTime, long endTime) {
-    List<Long[]> timeSlices = new ArrayList<Long[]>();
-    long sliceStartTime = startTime;
-    while (sliceStartTime < endTime) {
-      timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis});
-      sliceStartTime += timeSliceIntervalMillis;
-    }
-    return timeSlices;
-  }
-
-  private Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
-      throws SQLException, IOException {
-    Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
-      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
-    // Create time slices
-
-    while (rs.next()) {
-      TimelineMetric metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
-
-      Map<TimelineClusterMetric, Double> clusterMetrics =
-        sliceFromTimelineMetric(metric, timeSlices);
-
-      if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
-        for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry :
-            clusterMetrics.entrySet()) {
-
-          TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
-          Double avgValue = clusterMetricEntry.getValue();
-
-          MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric);
-
-          if (aggregate == null) {
-            aggregate = new MetricClusterAggregate(avgValue, 1, null, avgValue, avgValue);
-            aggregateClusterMetrics.put(clusterMetric, aggregate);
-          } else {
-            aggregate.updateSum(avgValue);
-            aggregate.updateNumberOfHosts(1);
-            aggregate.updateMax(avgValue);
-            aggregate.updateMin(avgValue);
-          }
-          // Update app level aggregates
-          appAggregator.processTimelineClusterMetric(clusterMetric,
-            metric.getHostName(), avgValue);
-        }
-      }
-    }
-    // Add app level aggregates to save
-    aggregateClusterMetrics.putAll(appAggregator.getAggregateClusterMetrics());
-    return aggregateClusterMetrics;
-  }
-
   @Override
-  protected Long getSleepIntervalMillis() {
-    return sleepIntervalMillis;
-  }
+  protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException {
+    Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap = aggregateMetricsFromResultSet(rs);
 
-  @Override
-  protected Integer getCheckpointCutOffMultiplier() {
-    return checkpointCutOffMultiplier;
+    LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
+    hBaseAccessor.saveClusterTimeAggregateRecords(hostAggregateMap, outputTableName);
   }
 
-  @Override
-  public boolean isDisabled() {
-    return metricsConf.getBoolean(CLUSTER_AGGREGATOR_MINUTE_DISABLED, false);
-  }
+  private Map<TimelineClusterMetric, MetricHostAggregate> aggregateMetricsFromResultSet(ResultSet rs)
+    throws IOException, SQLException {
 
-  private Map<TimelineClusterMetric, Double> sliceFromTimelineMetric(
-        TimelineMetric timelineMetric, List<Long[]> timeSlices) {
+    TimelineClusterMetric existingMetric = null;
+    MetricHostAggregate hostAggregate = null;
+    Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap =
+      new HashMap<TimelineClusterMetric, MetricHostAggregate>();
 
-    if (timelineMetric.getMetricValues().isEmpty()) {
-      return null;
-    }
+    while (rs.next()) {
+      TimelineClusterMetric currentMetric = readHelper.fromResultSet(rs);
+
+      MetricClusterAggregate currentHostAggregate =
+        isClusterPrecisionInputTable ?
+          readHelper.getMetricClusterAggregateFromResultSet(rs) :
+          readHelper.getMetricClusterTimeAggregateFromResultSet(rs);
+
+      if (existingMetric == null) {
+        // First row
+        existingMetric = currentMetric;
+        hostAggregate = new MetricHostAggregate();
+        hostAggregateMap.put(currentMetric, hostAggregate);
+      }
 
-    Map<TimelineClusterMetric, Double> timelineClusterMetricMap =
-      new HashMap<TimelineClusterMetric, Double>();
+      if (existingMetric.equalsExceptTime(currentMetric)) {
+        // Recalculate totals with current metric
+        updateAggregatesFromHost(hostAggregate, currentHostAggregate);
 
-    for (Map.Entry<Long, Double> metric : timelineMetric.getMetricValues().entrySet()) {
-      // TODO: investigate null values - pre filter
-      if (metric.getValue() == null) {
-        continue;
-      }
-      Long timestamp = getSliceTimeForMetric(timeSlices,
-                       Long.parseLong(metric.getKey().toString()));
-      if (timestamp != -1) {
-        // Metric is within desired time range
-        TimelineClusterMetric clusterMetric = new TimelineClusterMetric(
-          timelineMetric.getMetricName(),
-          timelineMetric.getAppId(),
-          timelineMetric.getInstanceId(),
-          timestamp,
-          timelineMetric.getType());
-        if (!timelineClusterMetricMap.containsKey(clusterMetric)) {
-          timelineClusterMetricMap.put(clusterMetric, metric.getValue());
-        } else {
-          Double oldValue = timelineClusterMetricMap.get(clusterMetric);
-          Double newValue = (oldValue + metric.getValue()) / 2;
-          timelineClusterMetricMap.put(clusterMetric, newValue);
-        }
+      } else {
+        // Switched over to a new metric - save existing
+        hostAggregate = new MetricHostAggregate();
+        updateAggregatesFromHost(hostAggregate, currentHostAggregate);
+        hostAggregateMap.put(currentMetric, hostAggregate);
+        existingMetric = currentMetric;
       }
+
     }
 
-    return timelineClusterMetricMap;
+    return hostAggregateMap;
   }
 
-  /**
-   * Return beginning of the time slice into which the metric fits.
-   */
-  private Long getSliceTimeForMetric(List<Long[]> timeSlices, Long timestamp) {
-    for (Long[] timeSlice : timeSlices) {
-      if (timestamp >= timeSlice[0] && timestamp < timeSlice[1]) {
-        return timeSlice[0];
-      }
-    }
-    return -1l;
+  private void updateAggregatesFromHost(MetricHostAggregate agg, MetricClusterAggregate currentClusterAggregate) {
+    agg.updateMax(currentClusterAggregate.getMax());
+    agg.updateMin(currentClusterAggregate.getMin());
+    agg.updateSum(currentClusterAggregate.getSum());
+    agg.updateNumberOfSamples(currentClusterAggregate.getNumberOfHosts());
   }
-
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorHourly.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorHourly.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorHourly.java
deleted file mode 100644
index 264e4e6..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorHourly.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-
-import org.apache.commons.io.FilenameUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
-import java.io.IOException;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.HashMap;
-import java.util.Map;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_DISABLED;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
-
-public class TimelineMetricClusterAggregatorHourly extends AbstractTimelineAggregator {
-  private static final Log LOG = LogFactory.getLog
-    (TimelineMetricClusterAggregatorHourly.class);
-  private static final String CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE =
-    "timeline-metrics-cluster-aggregator-hourly-checkpoint";
-  private final String checkpointLocation;
-  private final long sleepIntervalMillis;
-  private final Integer checkpointCutOffMultiplier;
-  private long checkpointCutOffIntervalMillis;
-  private static final Long NATIVE_TIME_RANGE_DELTA = 3600000l; // 1 hour
-  private final TimelineClusterMetricReader timelineClusterMetricReader
-     = new TimelineClusterMetricReader(true);
-
-  public TimelineMetricClusterAggregatorHourly(
-    PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
-    super(hBaseAccessor, metricsConf);
-
-    String checkpointDir = metricsConf.get(
-      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
-
-    checkpointLocation = FilenameUtils.concat(checkpointDir,
-      CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE);
-
-    sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
-      (CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l));
-    checkpointCutOffIntervalMillis =  SECONDS.toMillis(metricsConf.getLong
-      (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL, 7200l));
-    checkpointCutOffMultiplier = metricsConf.getInt
-      (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
-  }
-
-  @Override
-  protected String getCheckpointLocation() {
-    return checkpointLocation;
-  }
-
-  @Override
-  protected void aggregate(ResultSet rs, long startTime, long endTime)
-    throws SQLException, IOException {
-      Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap =
-        aggregateMetricsFromResultSet(rs);
-
-    LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
-    hBaseAccessor.saveClusterAggregateHourlyRecords(hostAggregateMap,
-      METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
-  }
-
-  @Override
-  protected Condition prepareMetricQueryCondition(long startTime,
-                                                  long endTime) {
-    Condition condition = new DefaultCondition(null, null, null, null, startTime,
-      endTime, null, null, true);
-    condition.setNoLimit();
-    condition.setFetchSize(resultsetFetchSize);
-    condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
-      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
-        METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
-    condition.addOrderByColumn("METRIC_NAME");
-    condition.addOrderByColumn("APP_ID");
-    condition.addOrderByColumn("INSTANCE_ID");
-    condition.addOrderByColumn("SERVER_TIME");
-    return condition;
-  }
-
-  private Map<TimelineClusterMetric, MetricHostAggregate> aggregateMetricsFromResultSet(ResultSet rs)
-      throws IOException, SQLException {
-
-    TimelineClusterMetric existingMetric = null;
-    MetricHostAggregate hostAggregate = null;
-    Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap =
-      new HashMap<TimelineClusterMetric, MetricHostAggregate>();
-
-    while (rs.next()) {
-      TimelineClusterMetric currentMetric =
-        timelineClusterMetricReader.fromResultSet(rs);
-      MetricClusterAggregate currentHostAggregate =
-        getMetricClusterAggregateFromResultSet(rs);
-
-      if (existingMetric == null) {
-        // First row
-        existingMetric = currentMetric;
-        hostAggregate = new MetricHostAggregate();
-        hostAggregateMap.put(currentMetric, hostAggregate);
-      }
-
-      if (existingMetric.equalsExceptTime(currentMetric)) {
-        // Recalculate totals with current metric
-        updateAggregatesFromHost(hostAggregate, currentHostAggregate);
-
-      } else {
-        // Switched over to a new metric - save existing
-        hostAggregate = new MetricHostAggregate();
-        updateAggregatesFromHost(hostAggregate, currentHostAggregate);
-        hostAggregateMap.put(currentMetric, hostAggregate);
-        existingMetric = currentMetric;
-      }
-
-    }
-
-    return hostAggregateMap;
-  }
-
-  private void updateAggregatesFromHost(MetricHostAggregate agg, MetricClusterAggregate currentClusterAggregate) {
-    agg.updateMax(currentClusterAggregate.getMax());
-    agg.updateMin(currentClusterAggregate.getMin());
-    agg.updateSum(currentClusterAggregate.getSum());
-    agg.updateNumberOfSamples(currentClusterAggregate.getNumberOfHosts());
-  }
-
-  @Override
-  protected Long getSleepIntervalMillis() {
-    return sleepIntervalMillis;
-  }
-
-  @Override
-  protected Integer getCheckpointCutOffMultiplier() {
-    return checkpointCutOffMultiplier;
-  }
-
-  @Override
-  protected Long getCheckpointCutOffIntervalMillis() {
-    return checkpointCutOffIntervalMillis;
-  }
-
-  @Override
-  public boolean isDisabled() {
-    return metricsConf.getBoolean(CLUSTER_AGGREGATOR_HOUR_DISABLED, false);
-  }
-
-
-}