You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2015/11/04 06:31:20 UTC

ambari git commit: AMBARI-13701. Introduce cluster wide MINUTE aggregator in Ambari Metrics service. (Aravindan Vijayan via swagle)

Repository: ambari
Updated Branches:
  refs/heads/branch-2.1 ee7c3bdc2 -> 2a89def1c


AMBARI-13701. Introduce cluster wide MINUTE aggregator in Ambari Metrics service. (Aravindan Vijayan via swagle)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/2a89def1
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/2a89def1
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/2a89def1

Branch: refs/heads/branch-2.1
Commit: 2a89def1c26e85f65ab441223f1a76c5b6d1c849
Parents: ee7c3bd
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Tue Nov 3 21:31:03 2015 -0800
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Tue Nov 3 21:31:03 2015 -0800

----------------------------------------------------------------------
 .../timeline/HBaseTimelineMetricStore.java      |  10 +-
 .../metrics/timeline/PhoenixHBaseAccessor.java  |  33 ++-
 .../timeline/TimelineMetricConfiguration.java   |  14 +-
 .../TimelineMetricAggregatorFactory.java        |  79 +++++-
 .../TimelineMetricClusterAggregatorMinute.java  | 248 -------------------
 .../TimelineMetricClusterAggregatorSecond.java  | 248 +++++++++++++++++++
 .../timeline/query/PhoenixTransactSQL.java      |   9 +-
 .../metrics/timeline/ITClusterAggregator.java   |  92 ++++++-
 .../timeline/ITPhoenixHBaseAccessor.java        |   7 +-
 .../cache/TimelineMetricCacheEntryFactory.java  |  19 +-
 .../server/upgrade/UpgradeCatalog213.java       |  24 +-
 .../0.1.0/configuration/ams-site.xml            |  54 +++-
 .../0.1.0/package/scripts/split_points.py       |   2 +-
 .../server/upgrade/UpgradeCatalog213Test.java   |  76 ++++++
 14 files changed, 602 insertions(+), 313 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/2a89def1/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index 17df629..aed5fed 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -76,7 +76,15 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
         LOG.info("Using group by aggregators for aggregating host and cluster metrics.");
       }
 
-      // Start the cluster aggregator minute
+      // Start the cluster aggregator second
+      TimelineMetricAggregator secondClusterAggregator =
+        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hBaseAccessor, metricsConf);
+      if (!secondClusterAggregator.isDisabled()) {
+        Thread aggregatorThread = new Thread(secondClusterAggregator);
+        aggregatorThread.start();
+      }
+
+      // Start the minute cluster aggregator
       TimelineMetricAggregator minuteClusterAggregator =
         TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hBaseAccessor, metricsConf);
       if (!minuteClusterAggregator.isDisabled()) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/2a89def1/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 3ce30fd..be06650 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -51,8 +51,6 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -63,6 +61,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_DAILY_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_MINUTE_TABLE_TTL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_SECOND_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_MAX_RETRIES;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RESULT_LIMIT;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RETRY_INTERVAL;
@@ -76,7 +75,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.ALTER_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_ENCODING;
@@ -86,6 +85,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL;
@@ -242,13 +242,14 @@ public class PhoenixHBaseAccessor {
 
     String encoding = metricsConf.get(HBASE_ENCODING_SCHEME, DEFAULT_ENCODING);
     String compression = metricsConf.get(HBASE_COMPRESSION_SCHEME, DEFAULT_TABLE_COMPRESSION);
-    String precisionTtl = metricsConf.get(PRECISION_TABLE_TTL, "86400");
-    String hostMinTtl = metricsConf.get(HOST_MINUTE_TABLE_TTL, "604800");
-    String hostHourTtl = metricsConf.get(HOST_HOUR_TABLE_TTL, "2592000");
-    String hostDailyTtl = metricsConf.get(HOST_DAILY_TABLE_TTL, "31536000");
-    String clusterMinTtl = metricsConf.get(CLUSTER_MINUTE_TABLE_TTL, "2592000");
-    String clusterHourTtl = metricsConf.get(CLUSTER_HOUR_TABLE_TTL, "31536000");
-    String clusterDailyTtl = metricsConf.get(CLUSTER_DAILY_TABLE_TTL, "31536000");
+    String precisionTtl = metricsConf.get(PRECISION_TABLE_TTL, "86400");           //1 day
+    String hostMinTtl = metricsConf.get(HOST_MINUTE_TABLE_TTL, "604800");          //7 days
+    String hostHourTtl = metricsConf.get(HOST_HOUR_TABLE_TTL, "2592000");          //30 days
+    String hostDailyTtl = metricsConf.get(HOST_DAILY_TABLE_TTL, "31536000");       //1 year
+    String clusterSecTtl = metricsConf.get(CLUSTER_SECOND_TABLE_TTL, "2592000");     //7 days
+    String clusterMinTtl = metricsConf.get(CLUSTER_MINUTE_TABLE_TTL, "7776000");   //30 days
+    String clusterHourTtl = metricsConf.get(CLUSTER_HOUR_TABLE_TTL, "31536000");   //1 year
+    String clusterDailyTtl = metricsConf.get(CLUSTER_DAILY_TABLE_TTL, "63072000"); //2 years
 
     try {
       LOG.info("Initializing metrics schema...");
@@ -278,9 +279,11 @@ public class PhoenixHBaseAccessor {
         aggregateSql += getSplitPointsStr(splitPoints);
       }
       stmt.executeUpdate(aggregateSql);
-      stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL,
+      stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL,
+        METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME, encoding, clusterHourTtl, compression));
+      stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL,
         METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME, encoding, clusterHourTtl, compression));
-      stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL,
+      stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL,
         METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME, encoding, clusterDailyTtl, compression));
 
       //alter TTL options to update tables
@@ -298,6 +301,9 @@ public class PhoenixHBaseAccessor {
         hostDailyTtl));
       stmt.executeUpdate(String.format(ALTER_SQL,
         METRICS_CLUSTER_AGGREGATE_TABLE_NAME,
+        clusterSecTtl));
+      stmt.executeUpdate(String.format(ALTER_SQL,
+        METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME,
         clusterMinTtl));
       stmt.executeUpdate(String.format(ALTER_SQL,
         METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME,
@@ -664,7 +670,8 @@ public class PhoenixHBaseAccessor {
     for (Function aggregateFunction : functions) {
       SingleValuedTimelineMetric metric;
 
-      if (condition.getPrecision() == Precision.HOURS
+      if (condition.getPrecision() == Precision.MINUTES
+          || condition.getPrecision() == Precision.HOURS
           || condition.getPrecision() == Precision.DAYS) {
         metric = getAggregateTimelineMetricFromResultSet(rs, aggregateFunction, false);
       } else {

http://git-wip-us.apache.org/repos/asf/ambari/blob/2a89def1/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index 30e42f2..fd51f3d 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -64,6 +64,9 @@ public class TimelineMetricConfiguration {
   public static final String HOST_HOUR_TABLE_TTL =
     "timeline.metrics.host.aggregator.hourly.ttl";
 
+  public static final String CLUSTER_SECOND_TABLE_TTL =
+    "timeline.metrics.cluster.aggregator.second.ttl";
+
   public static final String CLUSTER_MINUTE_TABLE_TTL =
     "timeline.metrics.cluster.aggregator.minute.ttl";
 
@@ -74,7 +77,7 @@ public class TimelineMetricConfiguration {
     "timeline.metrics.cluster.aggregator.daily.ttl";
 
   public static final String CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL =
-    "timeline.metrics.cluster.aggregator.minute.timeslice.interval";
+    "timeline.metrics.cluster.aggregator.second.timeslice.interval";
 
   public static final String AGGREGATOR_CHECKPOINT_DELAY =
     "timeline.metrics.service.checkpointDelay";
@@ -91,6 +94,9 @@ public class TimelineMetricConfiguration {
   public static final String HOST_AGGREGATOR_DAILY_SLEEP_INTERVAL =
     "timeline.metrics.host.aggregator.daily.interval";
 
+  public static final String CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL =
+    "timeline.metrics.cluster.aggregator.second.interval";
+
   public static final String CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL =
     "timeline.metrics.cluster.aggregator.minute.interval";
 
@@ -109,6 +115,9 @@ public class TimelineMetricConfiguration {
   public static final String HOST_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER =
     "timeline.metrics.host.aggregator.daily.checkpointCutOffMultiplier";
 
+  public static final String CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER =
+    "timeline.metrics.cluster.aggregator.second.checkpointCutOffMultiplier";
+
   public static final String CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER =
     "timeline.metrics.cluster.aggregator.minute.checkpointCutOffMultiplier";
 
@@ -136,6 +145,9 @@ public class TimelineMetricConfiguration {
   public static final String HOST_AGGREGATOR_DAILY_DISABLED =
     "timeline.metrics.host.aggregator.hourly.disabled";
 
+  public static final String CLUSTER_AGGREGATOR_SECOND_DISABLED =
+    "timeline.metrics.cluster.aggregator.second.disabled";
+
   public static final String CLUSTER_AGGREGATOR_MINUTE_DISABLED =
     "timeline.metrics.cluster.aggregator.minute.disabled";
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/2a89def1/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
index f07918c..ba019fa 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
@@ -28,8 +28,11 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_DISABLED;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_DISABLED;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
@@ -49,6 +52,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
 
@@ -65,6 +69,8 @@ public class TimelineMetricAggregatorFactory {
     "timeline-metrics-host-aggregator-daily-checkpoint";
   private static final String CLUSTER_AGGREGATOR_CHECKPOINT_FILE =
     "timeline-metrics-cluster-aggregator-checkpoint";
+  private static final String CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_FILE =
+    "timeline-metrics-cluster-aggregator-minute-checkpoint";
   private static final String CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE =
     "timeline-metrics-cluster-aggregator-hourly-checkpoint";
   private static final String CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_FILE =
@@ -76,6 +82,7 @@ public class TimelineMetricAggregatorFactory {
 
   /**
    * Minute based aggregation for hosts.
+   * Interval : 5 mins
    */
   public static TimelineMetricAggregator createTimelineMetricAggregatorMinute
     (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
@@ -119,6 +126,7 @@ public class TimelineMetricAggregatorFactory {
 
   /**
    * Hourly aggregation for hosts.
+   * Interval : 1 hour
    */
   public static TimelineMetricAggregator createTimelineMetricAggregatorHourly
     (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
@@ -162,6 +170,7 @@ public class TimelineMetricAggregatorFactory {
 
   /**
    * Daily aggregation for hosts.
+   * Interval : 1 day
    */
   public static TimelineMetricAggregator createTimelineMetricAggregatorDaily
     (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
@@ -204,10 +213,12 @@ public class TimelineMetricAggregatorFactory {
   }
 
   /**
-   * Minute based aggregation for cluster.
+   * Second aggregation for cluster.
+   * Interval : 2 mins
+   * Timeslice : 30 sec
    */
-  public static TimelineMetricAggregator createTimelineClusterAggregatorMinute(
-      PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+  public static TimelineMetricAggregator createTimelineClusterAggregatorSecond(
+    PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
 
     String checkpointDir = metricsConf.get(
       TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -216,20 +227,20 @@ public class TimelineMetricAggregatorFactory {
       CLUSTER_AGGREGATOR_CHECKPOINT_FILE);
 
     long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
-      (CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 120l));
+      (CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL, 120l));
 
     long timeSliceIntervalMillis = SECONDS.toMillis(metricsConf.getInt
-      (CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 15));
+      (CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 30));
 
     int checkpointCutOffMultiplier =
-      metricsConf.getInt(CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+      metricsConf.getInt(CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
 
     String inputTableName = METRICS_RECORD_TABLE_NAME;
     String outputTableName = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
-    String aggregatorDisabledParam = CLUSTER_AGGREGATOR_MINUTE_DISABLED;
+    String aggregatorDisabledParam = CLUSTER_AGGREGATOR_SECOND_DISABLED;
 
-    // Minute based aggregation have added responsibility of time slicing
-    return new TimelineMetricClusterAggregatorMinute(
+    // Second based aggregation have added responsibility of time slicing
+    return new TimelineMetricClusterAggregatorSecond(
       hBaseAccessor, metricsConf,
       checkpointLocation,
       sleepIntervalMillis,
@@ -243,7 +254,56 @@ public class TimelineMetricAggregatorFactory {
   }
 
   /**
+   * Minute aggregation for cluster.
+   * Interval : 5 mins
+   */
+  public static TimelineMetricAggregator createTimelineClusterAggregatorMinute(
+    PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+
+    String checkpointDir = metricsConf.get(
+      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+
+    String checkpointLocation = FilenameUtils.concat(checkpointDir,
+      CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_FILE);
+
+    long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+      (CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300l));
+
+    int checkpointCutOffMultiplier = metricsConf.getInt
+      (CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+
+    String inputTableName = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+    String outputTableName = METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME;
+    String aggregatorDisabledParam = CLUSTER_AGGREGATOR_MINUTE_DISABLED;
+
+    if (useGroupByAggregator(metricsConf)) {
+      return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator(
+        hBaseAccessor, metricsConf,
+        checkpointLocation,
+        sleepIntervalMillis,
+        checkpointCutOffMultiplier,
+        aggregatorDisabledParam,
+        inputTableName,
+        outputTableName,
+        120000l
+      );
+    }
+
+    return new TimelineMetricClusterAggregator(
+      hBaseAccessor, metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      aggregatorDisabledParam,
+      inputTableName,
+      outputTableName,
+      120000l
+    );
+  }
+
+  /**
    * Hourly aggregation for cluster.
+   * Interval : 1 hour
    */
   public static TimelineMetricAggregator createTimelineClusterAggregatorHourly(
     PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
@@ -291,6 +351,7 @@ public class TimelineMetricAggregatorFactory {
 
   /**
    * Daily aggregation for cluster.
+   * Interval : 1 day
    */
   public static TimelineMetricAggregator createTimelineClusterAggregatorDaily(
     PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/2a89def1/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java
deleted file mode 100644
index 85bdbbc..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
-import java.io.IOException;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.SERVER_SIDE_TIMESIFT_ADJUSTMENT;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
-
-/**
- * Aggregates a metric across all hosts in the cluster. Reads metrics from
- * the precision table and saves into the aggregate.
- */
-public class TimelineMetricClusterAggregatorMinute extends AbstractTimelineAggregator {
-  private static final Log LOG = LogFactory.getLog(TimelineMetricClusterAggregatorMinute.class);
-  public Long timeSliceIntervalMillis;
-  private TimelineMetricReadHelper timelineMetricReadHelper = new TimelineMetricReadHelper(true);
-  // Aggregator to perform app-level aggregates for host metrics
-  private final TimelineMetricAppAggregator appAggregator;
-  // 1 minute client side buffering adjustment
-  private final Long serverTimeShiftAdjustment;
-
-  public TimelineMetricClusterAggregatorMinute(PhoenixHBaseAccessor hBaseAccessor,
-                                               Configuration metricsConf,
-                                               String checkpointLocation,
-                                               Long sleepIntervalMillis,
-                                               Integer checkpointCutOffMultiplier,
-                                               String aggregatorDisabledParam,
-                                               String tableName,
-                                               String outputTableName,
-                                               Long nativeTimeRangeDelay,
-                                               Long timeSliceInterval) {
-    super(hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis,
-      checkpointCutOffMultiplier, aggregatorDisabledParam, tableName,
-      outputTableName, nativeTimeRangeDelay);
-
-    appAggregator = new TimelineMetricAppAggregator(metricsConf);
-    this.timeSliceIntervalMillis = timeSliceInterval;
-    this.serverTimeShiftAdjustment = Long.parseLong(metricsConf.get(SERVER_SIDE_TIMESIFT_ADJUSTMENT, "90000"));
-  }
-
-  @Override
-  protected void aggregate(ResultSet rs, long startTime, long endTime) throws SQLException, IOException {
-    // Account for time shift due to client side buffering by shifting the
-    // timestamps with the difference between server time and series start time
-    List<Long[]> timeSlices = getTimeSlices(startTime - serverTimeShiftAdjustment, endTime);
-    // Initialize app aggregates for host metrics
-    appAggregator.init();
-    Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
-      aggregateMetricsFromResultSet(rs, timeSlices);
-
-    LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates.");
-    hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics);
-    appAggregator.cleanup();
-  }
-
-  @Override
-  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
-    Condition condition = new DefaultCondition(null, null, null, null, startTime,
-      endTime, null, null, true);
-    condition.setNoLimit();
-    condition.setFetchSize(resultsetFetchSize);
-    condition.setStatement(String.format(GET_METRIC_SQL,
-      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
-      METRICS_RECORD_TABLE_NAME));
-    // Retaining order of the row-key avoids client side merge sort.
-    condition.addOrderByColumn("METRIC_NAME");
-    condition.addOrderByColumn("HOSTNAME");
-    condition.addOrderByColumn("SERVER_TIME");
-    condition.addOrderByColumn("APP_ID");
-    return condition;
-  }
-
-  /**
-   * Return time slices to normalize the timeseries data.
-   */
-  private List<Long[]> getTimeSlices(long startTime, long endTime) {
-    List<Long[]> timeSlices = new ArrayList<Long[]>();
-    long sliceStartTime = startTime;
-    while (sliceStartTime < endTime) {
-      timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis });
-      sliceStartTime += timeSliceIntervalMillis;
-    }
-    return timeSlices;
-  }
-
-  private Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
-      throws SQLException, IOException {
-    Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
-      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
-
-    TimelineMetric metric = null;
-    if (rs.next()) {
-      metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
-
-      // Call slice after all rows for a host are read
-      while (rs.next()) {
-        TimelineMetric nextMetric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
-        // If rows belong to same host combine them before slicing. This
-        // avoids issues across rows that belong to same hosts but get
-        // counted as coming from different ones.
-        if (metric.equalsExceptTime(nextMetric)) {
-          metric.addMetricValues(nextMetric.getMetricValues());
-        } else {
-          // Process the current metric
-          processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
-          metric = nextMetric;
-        }
-      }
-    }
-    // Process last metric
-    if (metric != null) {
-      processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
-    }
-
-    // Add app level aggregates to save
-    aggregateClusterMetrics.putAll(appAggregator.getAggregateClusterMetrics());
-    return aggregateClusterMetrics;
-  }
-
-  /**
-   * Slice metric values into interval specified by :
-   * timeline.metrics.cluster.aggregator.minute.timeslice.interval
-   * Normalize value by averaging them within the interval
-   */
-  private void processAggregateClusterMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
-                                              TimelineMetric metric, List<Long[]> timeSlices) {
-    // Create time slices
-    Map<TimelineClusterMetric, Double> clusterMetrics = sliceFromTimelineMetric(metric, timeSlices);
-
-    if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
-      for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry :
-        clusterMetrics.entrySet()) {
-
-        TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
-        Double avgValue = clusterMetricEntry.getValue();
-
-        MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric);
-
-        if (aggregate == null) {
-          aggregate = new MetricClusterAggregate(avgValue, 1, null, avgValue, avgValue);
-          aggregateClusterMetrics.put(clusterMetric, aggregate);
-        } else {
-          aggregate.updateSum(avgValue);
-          aggregate.updateNumberOfHosts(1);
-          aggregate.updateMax(avgValue);
-          aggregate.updateMin(avgValue);
-        }
-        // Update app level aggregates
-        appAggregator.processTimelineClusterMetric(clusterMetric, metric.getHostName(), avgValue);
-      }
-    }
-  }
-
-  private Map<TimelineClusterMetric, Double> sliceFromTimelineMetric(
-      TimelineMetric timelineMetric, List<Long[]> timeSlices) {
-
-    if (timelineMetric.getMetricValues().isEmpty()) {
-      return null;
-    }
-
-    Map<TimelineClusterMetric, Double> timelineClusterMetricMap =
-      new HashMap<TimelineClusterMetric, Double>();
-
-    Long timeShift = timelineMetric.getTimestamp() - timelineMetric.getStartTime();
-    if (timeShift < 0) {
-      LOG.debug("Invalid time shift found, possible discrepancy in clocks. " +
-        "timeShift = " + timeShift);
-      timeShift = 0l;
-    }
-
-    for (Map.Entry<Long, Double> metric : timelineMetric.getMetricValues().entrySet()) {
-      // TODO: investigate null values - pre filter
-      if (metric.getValue() == null) {
-        continue;
-      }
-
-      Long timestamp = getSliceTimeForMetric(timeSlices, Long.parseLong(metric.getKey().toString()));
-      if (timestamp != -1) {
-        // Metric is within desired time range
-        TimelineClusterMetric clusterMetric = new TimelineClusterMetric(
-          timelineMetric.getMetricName(),
-          timelineMetric.getAppId(),
-          timelineMetric.getInstanceId(),
-          timestamp,
-          timelineMetric.getType());
-
-        // do a sum / count here to get average for all points in a slice
-        int count = 1;
-        Double sum;
-        if (!timelineClusterMetricMap.containsKey(clusterMetric)) {
-          sum = metric.getValue();
-        } else {
-          count++;
-          Double oldValue = timelineClusterMetricMap.get(clusterMetric);
-          sum = oldValue + metric.getValue();
-        }
-        timelineClusterMetricMap.put(clusterMetric, (sum / count));
-      }
-    }
-
-    return timelineClusterMetricMap;
-  }
-
-  /**
-   * Return beginning of the time slice into which the metric fits.
-   */
-  private Long getSliceTimeForMetric(List<Long[]> timeSlices, Long timestamp) {
-    for (Long[] timeSlice : timeSlices) {
-      if (timestamp >= timeSlice[0] && timestamp < timeSlice[1]) {
-        return timeSlice[0];
-      }
-    }
-    return -1l;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/2a89def1/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
new file mode 100644
index 0000000..1c7bf7f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.SERVER_SIDE_TIMESIFT_ADJUSTMENT;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
+
+/**
+ * Aggregates a metric across all hosts in the cluster. Reads metrics from
+ * the precision table and saves into the aggregate.
+ */
+public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggregator {
+  private static final Log LOG = LogFactory.getLog(TimelineMetricClusterAggregatorSecond.class);
+  public Long timeSliceIntervalMillis;
+  private TimelineMetricReadHelper timelineMetricReadHelper = new TimelineMetricReadHelper(true);
+  // Aggregator to perform app-level aggregates for host metrics
+  private final TimelineMetricAppAggregator appAggregator;
+  // 1 minute client side buffering adjustment
+  private final Long serverTimeShiftAdjustment;
+
+  public TimelineMetricClusterAggregatorSecond(PhoenixHBaseAccessor hBaseAccessor,
+                                               Configuration metricsConf,
+                                               String checkpointLocation,
+                                               Long sleepIntervalMillis,
+                                               Integer checkpointCutOffMultiplier,
+                                               String aggregatorDisabledParam,
+                                               String tableName,
+                                               String outputTableName,
+                                               Long nativeTimeRangeDelay,
+                                               Long timeSliceInterval) {
+    super(hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis,
+      checkpointCutOffMultiplier, aggregatorDisabledParam, tableName,
+      outputTableName, nativeTimeRangeDelay);
+
+    appAggregator = new TimelineMetricAppAggregator(metricsConf);
+    this.timeSliceIntervalMillis = timeSliceInterval;
+    this.serverTimeShiftAdjustment = Long.parseLong(metricsConf.get(SERVER_SIDE_TIMESIFT_ADJUSTMENT, "90000"));
+  }
+
+  @Override
+  protected void aggregate(ResultSet rs, long startTime, long endTime) throws SQLException, IOException {
+    // Account for time shift due to client side buffering by shifting the
+    // timestamps with the difference between server time and series start time
+    List<Long[]> timeSlices = getTimeSlices(startTime - serverTimeShiftAdjustment, endTime);
+    // Initialize app aggregates for host metrics
+    appAggregator.init();
+    Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
+      aggregateMetricsFromResultSet(rs, timeSlices);
+
+    LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates.");
+    hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics);
+    appAggregator.cleanup();
+  }
+
+  @Override
+  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
+    Condition condition = new DefaultCondition(null, null, null, null, startTime,
+      endTime, null, null, true);
+    condition.setNoLimit();
+    condition.setFetchSize(resultsetFetchSize);
+    condition.setStatement(String.format(GET_METRIC_SQL,
+      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
+      METRICS_RECORD_TABLE_NAME));
+    // Retaining order of the row-key avoids client side merge sort.
+    condition.addOrderByColumn("METRIC_NAME");
+    condition.addOrderByColumn("HOSTNAME");
+    condition.addOrderByColumn("SERVER_TIME");
+    condition.addOrderByColumn("APP_ID");
+    return condition;
+  }
+
+  /**
+   * Return time slices to normalize the timeseries data.
+   */
+  private List<Long[]> getTimeSlices(long startTime, long endTime) {
+    List<Long[]> timeSlices = new ArrayList<Long[]>();
+    long sliceStartTime = startTime;
+    while (sliceStartTime < endTime) {
+      timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis });
+      sliceStartTime += timeSliceIntervalMillis;
+    }
+    return timeSlices;
+  }
+
+  private Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
+      throws SQLException, IOException {
+    Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
+      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+
+    TimelineMetric metric = null;
+    if (rs.next()) {
+      metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
+
+      // Call slice after all rows for a host are read
+      while (rs.next()) {
+        TimelineMetric nextMetric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
+        // If rows belong to same host combine them before slicing. This
+        // avoids issues across rows that belong to same hosts but get
+        // counted as coming from different ones.
+        if (metric.equalsExceptTime(nextMetric)) {
+          metric.addMetricValues(nextMetric.getMetricValues());
+        } else {
+          // Process the current metric
+          processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
+          metric = nextMetric;
+        }
+      }
+    }
+    // Process last metric
+    if (metric != null) {
+      processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
+    }
+
+    // Add app level aggregates to save
+    aggregateClusterMetrics.putAll(appAggregator.getAggregateClusterMetrics());
+    return aggregateClusterMetrics;
+  }
+
+  /**
+   * Slice metric values into interval specified by :
+   * timeline.metrics.cluster.aggregator.minute.timeslice.interval
+   * Normalize value by averaging them within the interval
+   */
+  private void processAggregateClusterMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
+                                              TimelineMetric metric, List<Long[]> timeSlices) {
+    // Create time slices
+    Map<TimelineClusterMetric, Double> clusterMetrics = sliceFromTimelineMetric(metric, timeSlices);
+
+    if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
+      for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry :
+        clusterMetrics.entrySet()) {
+
+        TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
+        Double avgValue = clusterMetricEntry.getValue();
+
+        MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric);
+
+        if (aggregate == null) {
+          aggregate = new MetricClusterAggregate(avgValue, 1, null, avgValue, avgValue);
+          aggregateClusterMetrics.put(clusterMetric, aggregate);
+        } else {
+          aggregate.updateSum(avgValue);
+          aggregate.updateNumberOfHosts(1);
+          aggregate.updateMax(avgValue);
+          aggregate.updateMin(avgValue);
+        }
+        // Update app level aggregates
+        appAggregator.processTimelineClusterMetric(clusterMetric, metric.getHostName(), avgValue);
+      }
+    }
+  }
+
+  private Map<TimelineClusterMetric, Double> sliceFromTimelineMetric(
+      TimelineMetric timelineMetric, List<Long[]> timeSlices) {
+
+    if (timelineMetric.getMetricValues().isEmpty()) {
+      return null;
+    }
+
+    Map<TimelineClusterMetric, Double> timelineClusterMetricMap =
+      new HashMap<TimelineClusterMetric, Double>();
+
+    Long timeShift = timelineMetric.getTimestamp() - timelineMetric.getStartTime();
+    if (timeShift < 0) {
+      LOG.debug("Invalid time shift found, possible discrepancy in clocks. " +
+        "timeShift = " + timeShift);
+      timeShift = 0l;
+    }
+
+    for (Map.Entry<Long, Double> metric : timelineMetric.getMetricValues().entrySet()) {
+      // TODO: investigate null values - pre filter
+      if (metric.getValue() == null) {
+        continue;
+      }
+
+      Long timestamp = getSliceTimeForMetric(timeSlices, Long.parseLong(metric.getKey().toString()));
+      if (timestamp != -1) {
+        // Metric is within desired time range
+        TimelineClusterMetric clusterMetric = new TimelineClusterMetric(
+          timelineMetric.getMetricName(),
+          timelineMetric.getAppId(),
+          timelineMetric.getInstanceId(),
+          timestamp,
+          timelineMetric.getType());
+
+        // do a sum / count here to get average for all points in a slice
+        int count = 1;
+        Double sum;
+        if (!timelineClusterMetricMap.containsKey(clusterMetric)) {
+          sum = metric.getValue();
+        } else {
+          count++;
+          Double oldValue = timelineClusterMetricMap.get(clusterMetric);
+          sum = oldValue + metric.getValue();
+        }
+        timelineClusterMetricMap.put(clusterMetric, (sum / count));
+      }
+    }
+
+    return timelineClusterMetricMap;
+  }
+
+  /**
+   * Return beginning of the time slice into which the metric fits.
+   */
+  private Long getSliceTimeForMetric(List<Long[]> timeSlices, Long timestamp) {
+    for (Long[] timeSlice : timeSlices) {
+      if (timestamp >= timeSlice[0] && timestamp < timeSlice[1]) {
+        return timeSlice[0];
+      }
+    }
+    return -1l;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/2a89def1/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
index 092c983..92d59e2 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
@@ -86,7 +86,7 @@ public class PhoenixTransactSQL {
       "TTL=%s, COMPRESSION='%s'";
 
   // HOSTS_COUNT vs METRIC_COUNT
-  public static final String CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL =
+  public static final String CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL =
     "CREATE TABLE IF NOT EXISTS %s " +
       "(METRIC_NAME VARCHAR, " +
       "APP_ID VARCHAR, " +
@@ -248,6 +248,8 @@ public class PhoenixTransactSQL {
     "METRIC_RECORD_DAILY";
   public static final String METRICS_CLUSTER_AGGREGATE_TABLE_NAME =
     "METRIC_AGGREGATE";
+  public static final String METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME =
+    "METRIC_AGGREGATE_MINUTE";
   public static final String METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME =
     "METRIC_AGGREGATE_HOURLY";
   public static final String METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME =
@@ -555,7 +557,10 @@ public class PhoenixTransactSQL {
         metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
         queryStmt = GET_CLUSTER_AGGREGATE_TIME_SQL;
         break;
-      //TODO : Include MINUTE case after introducing CLUSTER_AGGREGATOR_MINUTE
+      case MINUTES:
+        metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME;
+        queryStmt = GET_CLUSTER_AGGREGATE_TIME_SQL;
+        break;
       default:
         metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
         queryStmt = GET_CLUSTER_AGGREGATE_SQL;

http://git-wip-us.apache.org/repos/asf/ambari/blob/2a89def1/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
index cbf0233..4ddecdc 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
@@ -42,15 +42,14 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
 import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
 import static junit.framework.Assert.fail;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineClusterMetric;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric;
@@ -101,7 +100,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
   public void testShouldAggregateClusterProperly() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false));
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, getConfigurationForTest(false));
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();
@@ -111,7 +110,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
       "disk_free", 1));
     hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
       "disk_free", 2));
-    ctime += minute;
+    ctime += 2*minute;
     hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
       "disk_free", 2));
     hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
@@ -153,7 +152,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
   public void testShouldAggregateClusterIgnoringInstance() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false));
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, getConfigurationForTest(false));
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();
@@ -214,21 +213,21 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
       if ("disk_free".equals(currentMetric.getMetricName())) {
         System.out.println("OUTPUT: " + currentMetric + " - " + currentHostAggregate);
         assertEquals(2, currentHostAggregate.getNumberOfHosts());
-        assertEquals(5.0, currentHostAggregate.getSum());
+        assertEquals(5.0, Math.floor(currentHostAggregate.getSum()));
         recordCount++;
       } else {
         fail("Unexpected entry");
       }
     }
 
-    Assert.assertEquals(8, recordCount);
+    Assert.assertEquals(5, recordCount);
   }
 
   @Test
   public void testShouldAggregateDifferentMetricsOnClusterProperly() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false));
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, getConfigurationForTest(false));
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     // here we put some metrics tha will be aggregated
@@ -242,7 +241,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
     hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
       "disk_used", 1));
 
-    ctime += minute;
+    ctime += 2*minute;
     hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
       "disk_free", 2));
     hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
@@ -334,6 +333,73 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
   }
 
   @Test
+  public void testShouldAggregateClusterOnMinuteProperly() throws Exception {
+
+    TimelineMetricAggregator agg =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false));
+
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long second = 1000;
+    long minute = 60*second;
+
+    Map<TimelineClusterMetric, MetricClusterAggregate> records =
+      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+
+    records.put(createEmptyTimelineClusterMetric(ctime),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric(ctime += second),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric(ctime += second),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric(ctime += second),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+
+    hdb.saveClusterAggregateRecords(records);
+    agg.doWork(startTime, ctime + second);
+    long oldCtime = ctime + second;
+
+    //Next minute
+    ctime = startTime + minute;
+
+    records.put(createEmptyTimelineClusterMetric(ctime),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric(ctime += second),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric(ctime += second),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric(ctime += second),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+
+    hdb.saveClusterAggregateRecords(records);
+    agg.doWork(oldCtime, ctime + second);
+
+    ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_MINUTE");
+    int count = 0;
+    long diff = 0 ;
+    while (rs.next()) {
+      assertEquals("METRIC_NAME", "disk_used", rs.getString("METRIC_NAME"));
+      assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
+      assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
+      assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+      assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
+      assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
+      if (count == 0) {
+        diff+=rs.getLong("SERVER_TIME");
+      } else {
+        diff-=rs.getLong("SERVER_TIME");
+        if (diff < 0) {
+          diff*=-1;
+        }
+        assertTrue(diff == minute);
+      }
+      count++;
+    }
+
+    assertEquals("One hourly aggregated row expected ", 2, count);
+  }
+
+  @Test
   public void testShouldAggregateClusterOnHourProperly() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
@@ -444,7 +510,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
     Configuration conf = getConfigurationForTest(false);
     conf.set(CLUSTER_AGGREGATOR_APP_IDS, "app1");
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, conf);
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, conf);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();
@@ -483,7 +549,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
       currentHostAggregate = readHelper.getMetricClusterAggregateFromResultSet(rs);
       recordCount++;
     }
-    assertEquals(4, recordCount);
+    assertEquals(3, recordCount);
     assertNotNull(currentMetric);
     assertEquals("cpu_user", currentMetric.getMetricName());
     assertEquals("app1", currentMetric.getAppId());
@@ -495,7 +561,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
   @Test
   public void testClusterAggregateMetricNormalization() throws Exception {
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false));
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, getConfigurationForTest(false));
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     // Sample data
@@ -565,7 +631,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
         fail("Unexpected entry");
       }
     }
-    Assert.assertEquals(9, recordCount);
+    Assert.assertEquals(5, recordCount);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/ambari/blob/2a89def1/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
index 89fee7c..5e7234c 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
@@ -48,7 +48,6 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createMetricHostAggregate;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
 
 
 public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
@@ -205,7 +204,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
   public void testGetClusterMetricRecordsSeconds() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, new Configuration());
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, new Configuration());
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime + 1;
@@ -236,7 +235,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
     TimelineMetric metric = timelineMetrics.getMetrics().get(0);
 
     assertEquals("disk_free", metric.getMetricName());
-    assertEquals(8, metric.getMetricValues().size());
+    assertEquals(5, metric.getMetricValues().size());
     assertEquals(1.5, metric.getMetricValues().values().iterator().next(), 0.00001);
   }
 
@@ -244,7 +243,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
   public void testGetClusterMetricRecordLatestWithFunction() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, new Configuration());
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, new Configuration());
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime + 1;

http://git-wip-us.apache.org/repos/asf/ambari/blob/2a89def1/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java
index 7c7db9f..9100afd 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java
@@ -40,6 +40,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedMap;
 import java.util.TreeMap;
 
 @Singleton
@@ -250,24 +251,16 @@ public class TimelineMetricCacheEntryFactory implements UpdatingCacheEntryFactor
       Long requestedStartTime, Long requestedEndTime, boolean removeAll) {
 
     for (TimelineMetric existingMetric : existingMetrics.getMetrics()) {
-      if(removeAll) {
+      if (removeAll) {
         existingMetric.setMetricValues(new TreeMap<Long, Double>());
       } else {
-        Map<Long, Double> existingMetricValues = existingMetric.getMetricValues();
+        TreeMap<Long, Double> existingMetricValues = existingMetric.getMetricValues();
         LOG.trace("Existing metric: " + existingMetric.getMetricName() +
           " # " + existingMetricValues.size());
 
-        Iterator<Map.Entry<Long, Double>> valueIterator = existingMetricValues.entrySet().iterator();
-
-        // Remove old values
-        // Assumption: All return value are millis
-        while (valueIterator.hasNext()) {
-          Map.Entry<Long, Double> metricEntry = valueIterator.next();
-          if (metricEntry.getKey() < requestedStartTime
-            || metricEntry.getKey() > requestedEndTime) {
-            valueIterator.remove();
-          }
-        }
+        // Retain only the values that are within the [requestStartTime, requestedEndTime] window
+        existingMetricValues.headMap(requestedStartTime,false).clear();
+        existingMetricValues.tailMap(requestedEndTime, false).clear();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/2a89def1/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
index 8f1a481..5d1e934 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
@@ -85,6 +85,7 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
   private static final String KAFKA_BROKER = "kafka-broker";
   private static final String AMS_ENV = "ams-env";
   private static final String AMS_HBASE_ENV = "ams-hbase-env";
+  private static final String AMS_SITE = "ams-site";
   private static final String HBASE_ENV_CONFIG = "hbase-env";
   private static final String HIVE_SITE_CONFIG = "hive-site";
   private static final String RANGER_ENV_CONFIG = "ranger-env";
@@ -647,7 +648,7 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
 
       if (hostDiskUsageAlertDefinitionEntity != null) {
         hostDiskUsageAlertDefinitionEntity.setDescription("This host-level alert is triggered if the amount of disk " +
-            "space used goes above specific thresholds. The default threshold values are 50% for WARNING and 80% for CRITICAL");
+          "space used goes above specific thresholds. The default threshold values are 50% for WARNING and 80% for CRITICAL");
         hostDiskUsageAlertDefinitionEntity.setLabel("Host Disk Usage");
 
         alertDefinitionDAO.merge(hostDiskUsageAlertDefinitionEntity);
@@ -832,6 +833,27 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
             newProperties.put("content", updateAmsHbaseEnvContent(content));
             updateConfigurationPropertiesForCluster(cluster, AMS_HBASE_ENV, newProperties, true, true);
           }
+          Config amsSite = cluster.getDesiredConfigByType(AMS_SITE);
+          if (amsSite != null) {
+            Map<String, String> newProperties = new HashMap<>();
+
+            //Interval
+            newProperties.put("timeline.metrics.cluster.aggregator.second.interval",String.valueOf(120));
+            newProperties.put("timeline.metrics.cluster.aggregator.minute.interval",String.valueOf(300));
+            newProperties.put("timeline.metrics.host.aggregator.minute.interval",String.valueOf(300));
+
+            //ttl
+            newProperties.put("timeline.metrics.cluster.aggregator.second.ttl", String.valueOf(2592000));
+            newProperties.put("timeline.metrics.cluster.aggregator.minute.ttl", String.valueOf(7776000));
+
+            //checkpoint
+            newProperties.put("timeline.metrics.cluster.aggregator.second.checkpointCutOffMultiplier", String.valueOf(2));
+
+            //disabled
+            newProperties.put("timeline.metrics.cluster.aggregator.second.disabled", String.valueOf(false));
+
+            updateConfigurationPropertiesForCluster(cluster, AMS_SITE, newProperties, true, true);
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/ambari/blob/2a89def1/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
index 89b584b..c73a401 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
@@ -59,11 +59,11 @@
   </property>
   <property>
     <name>timeline.metrics.host.aggregator.minute.interval</name>
-    <value>120</value>
+    <value>300</value>
     <display-name>Minute host aggregator interval</display-name>
     <description>
       Time in seconds to sleep for the minute resolution host based
-      aggregator. Default resolution is 2 minutes.
+      aggregator. Default resolution is 5 minutes.
     </description>
     <value-attributes>
       <type>int</type>
@@ -111,10 +111,22 @@
   </property>
   <property>
     <name>timeline.metrics.cluster.aggregator.minute.interval</name>
-    <value>120</value>
+    <value>300</value>
     <display-name>Minute cluster aggregator interval</display-name>
     <description>
       Time in seconds to sleep for the minute resolution cluster wide
+      aggregator. Default resolution is 5 minutes.
+    </description>
+    <value-attributes>
+      <type>int</type>
+    </value-attributes>
+  </property>
+  <property>
+    <name>timeline.metrics.cluster.aggregator.second.interval</name>
+    <value>120</value>
+    <display-name>Second cluster aggregator interval</display-name>
+    <description>
+      Time in seconds to sleep for the second resolution cluster wide
       aggregator. Default resolution is 2 minutes.
     </description>
     <value-attributes>
@@ -170,6 +182,19 @@
     </value-attributes>
   </property>
   <property>
+    <name>timeline.metrics.cluster.aggregator.second.checkpointCutOffMultiplier</name>
+    <value>2</value>
+    <display-name>Second cluster aggregator checkpoint cutOff multiplier</display-name>
+    <description>
+      Multiplier value * interval = Max allowed checkpoint lag. Effectively
+      if aggregator checkpoint is greater than max allowed checkpoint delay,
+      the checkpoint will be discarded by the aggregator.
+    </description>
+    <value-attributes>
+      <type>int</type>
+    </value-attributes>
+  </property>
+  <property>
     <name>timeline.metrics.cluster.aggregator.minute.checkpointCutOffMultiplier</name>
     <value>2</value>
     <display-name>Minute cluster aggregator checkpoint cutOff multiplier</display-name>
@@ -238,11 +263,19 @@
     </description>
   </property>
   <property>
-    <name>timeline.metrics.cluster.aggregator.minute.timeslice.interval</name>
+    <name>timeline.metrics.cluster.aggregator.second.disabled</name>
+    <value>false</value>
+    <display-name>Disable second cluster aggregator</display-name>
+    <description>
+      Disable cluster based second aggregations.
+    </description>
+  </property>
+  <property>
+    <name>timeline.metrics.cluster.aggregator.second.timeslice.interval</name>
     <value>30</value>
-    <display-name>Minute cluster aggregator timeslice interval</display-name>
+    <display-name>Second cluster aggregator timeslice interval</display-name>
     <description>
-      Lowest resolution of desired data for cluster level minute aggregates.
+      Lowest resolution of desired data for cluster level second aggregates.
     </description>
     <value-attributes>
       <type>int</type>
@@ -270,9 +303,16 @@
     </description>
   </property>
   <property>
-    <name>timeline.metrics.cluster.aggregator.minute.ttl</name>
+    <name>timeline.metrics.cluster.aggregator.second.ttl</name>
     <value>2592000</value>
     <description>
+      Cluster wide second resolution data purge interval. Default is 7 days.
+    </description>
+  </property>
+  <property>
+    <name>timeline.metrics.cluster.aggregator.minute.ttl</name>
+    <value>7776000</value>
+    <description>
       Cluster wide minute resolution data purge interval. Default is 30 days.
     </description>
   </property>

http://git-wip-us.apache.org/repos/asf/ambari/blob/2a89def1/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/split_points.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/split_points.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/split_points.py
index cd9c844..fa4deaf 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/split_points.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/split_points.py
@@ -27,7 +27,7 @@ import ast
 
 metric_filename_ext = '.txt'
 # 5 regions for higher order aggregate tables
-other_region_static_count = 5
+other_region_static_count = 6
 # Max equidistant points to return per service
 max_equidistant_points = 50
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/2a89def1/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
index eb4d677..64b7af0 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
@@ -18,6 +18,8 @@
 
 package org.apache.ambari.server.upgrade;
 
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
 import com.google.inject.AbstractModule;
 import com.google.inject.Binder;
 import com.google.inject.Guice;
@@ -26,9 +28,15 @@ import com.google.inject.Module;
 import com.google.inject.Provider;
 import com.google.inject.persist.PersistService;
 import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.actionmanager.ActionManager;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.AmbariManagementController;
+import org.apache.ambari.server.controller.AmbariManagementControllerImpl;
+import org.apache.ambari.server.controller.ConfigurationRequest;
+import org.apache.ambari.server.controller.ConfigurationResponse;
+import org.apache.ambari.server.controller.KerberosHelper;
+import org.apache.ambari.server.controller.MaintenanceStateHelper;
 import org.apache.ambari.server.orm.DBAccessor;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
@@ -78,6 +86,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static org.easymock.EasyMock.anyLong;
 import static org.easymock.EasyMock.anyObject;
@@ -91,6 +100,8 @@ import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.reset;
 import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * {@link org.apache.ambari.server.upgrade.UpgradeCatalog213} unit tests.
@@ -667,6 +678,71 @@ public class UpgradeCatalog213Test {
   }
 
   @Test
+  public void testAmsSiteUpdateConfigs() throws Exception{
+
+    Map<String, String> oldPropertiesAmsSite = new HashMap<String, String>() {
+      {
+        //Including only those properties that might be present in an older version.
+        put("timeline.metrics.cluster.aggregator.minute.interval",String.valueOf(1000));
+        put("timeline.metrics.host.aggregator.minute.interval",String.valueOf(1000));
+        put("timeline.metrics.cluster.aggregator.minute.ttl", String.valueOf(1000));
+      }
+    };
+    Map<String, String> newPropertiesAmsSite = new HashMap<String, String>() {
+      {
+        put("timeline.metrics.cluster.aggregator.second.interval",String.valueOf(120));
+        put("timeline.metrics.cluster.aggregator.minute.interval",String.valueOf(300));
+        put("timeline.metrics.host.aggregator.minute.interval",String.valueOf(300));
+        put("timeline.metrics.cluster.aggregator.second.ttl", String.valueOf(2592000));
+        put("timeline.metrics.cluster.aggregator.minute.ttl", String.valueOf(7776000));
+        put("timeline.metrics.cluster.aggregator.second.checkpointCutOffMultiplier", String.valueOf(2));
+        put("timeline.metrics.cluster.aggregator.second.disabled", String.valueOf(false));
+      }
+    };
+    EasyMockSupport easyMockSupport = new EasyMockSupport();
+
+    Clusters clusters = easyMockSupport.createNiceMock(Clusters.class);
+    final Cluster cluster = easyMockSupport.createNiceMock(Cluster.class);
+    Config mockAmsSite = easyMockSupport.createNiceMock(Config.class);
+
+    expect(clusters.getClusters()).andReturn(new HashMap<String, Cluster>() {{
+      put("normal", cluster);
+    }}).once();
+    expect(cluster.getDesiredConfigByType("ams-site")).andReturn(mockAmsSite).atLeastOnce();
+    expect(mockAmsSite.getProperties()).andReturn(oldPropertiesAmsSite).times(1);
+
+    Injector injector = easyMockSupport.createNiceMock(Injector.class);
+    expect(injector.getInstance(Gson.class)).andReturn(null).anyTimes();
+    expect(injector.getInstance(MaintenanceStateHelper.class)).andReturn(null).anyTimes();
+    expect(injector.getInstance(KerberosHelper.class)).andReturn(createNiceMock(KerberosHelper.class)).anyTimes();
+
+    replay(injector, clusters, mockAmsSite, cluster);
+
+    AmbariManagementControllerImpl controller = createMockBuilder(AmbariManagementControllerImpl.class)
+      .addMockedMethod("createConfiguration")
+      .addMockedMethod("getClusters", new Class[] { })
+      .withConstructor(createNiceMock(ActionManager.class), clusters, injector)
+      .createNiceMock();
+
+    Injector injector2 = easyMockSupport.createNiceMock(Injector.class);
+    Capture<ConfigurationRequest> configurationRequestCapture = EasyMock.newCapture();
+    ConfigurationResponse configurationResponseMock = easyMockSupport.createMock(ConfigurationResponse.class);
+
+    expect(injector2.getInstance(AmbariManagementController.class)).andReturn(controller).anyTimes();
+    expect(controller.getClusters()).andReturn(clusters).anyTimes();
+    expect(controller.createConfiguration(capture(configurationRequestCapture))).andReturn(configurationResponseMock).once();
+
+    replay(controller, injector2, configurationResponseMock);
+    new UpgradeCatalog213(injector2).updateAMSConfigs();
+    easyMockSupport.verifyAll();
+
+    ConfigurationRequest configurationRequest = configurationRequestCapture.getValue();
+    Map<String, String> updatedProperties = configurationRequest.getProperties();
+    assertTrue(Maps.difference(newPropertiesAmsSite, updatedProperties).areEqual());
+
+  }
+
+  @Test
   public void testUpdateAlertDefinitions() {
     EasyMockSupport easyMockSupport = new EasyMockSupport();
     UpgradeCatalog213 upgradeCatalog213 = new UpgradeCatalog213(injector);