You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2014/11/25 18:29:37 UTC

[2/2] ambari git commit: AMBARI-7680. Implement the Metric Collector using ATS. Unit tests.

AMBARI-7680. Implement the Metric Collector using ATS. Unit tests.


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/1d817954
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/1d817954
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/1d817954

Branch: refs/heads/branch-metrics-dev
Commit: 1d8179543a8b35ce1d27962f44494efa75acf9bc
Parents: 3b877ac
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Tue Nov 25 09:29:17 2014 -0800
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Tue Nov 25 09:29:17 2014 -0800

----------------------------------------------------------------------
 .../pom.xml                                     |  26 +-
 .../timeline/AbstractTimelineAggregator.java    | 255 +++----------
 .../timeline/DefaultPhoenixDataSource.java      |   1 +
 .../timeline/HBaseTimelineMetricStore.java      |  24 +-
 .../metrics/timeline/MetricAggregate.java       | 110 ++++++
 .../timeline/MetricClusterAggregate.java        |  74 ++++
 .../metrics/timeline/MetricHostAggregate.java   |  81 ++++
 .../metrics/timeline/PhoenixHBaseAccessor.java  |  34 +-
 .../metrics/timeline/TimelineClusterMetric.java |  97 +++++
 .../timeline/TimelineMetricAggregator.java      | 147 ++++++++
 .../TimelineMetricAggregatorFactory.java        |  89 +++++
 .../TimelineMetricAggregatorHourly.java         | 198 ----------
 .../TimelineMetricAggregatorMinute.java         | 181 ---------
 .../TimelineMetricClusterAggregator.java        | 235 ++++--------
 .../TimelineMetricClusterAggregatorHourly.java  |  98 ++---
 .../timeline/TimelineMetricConfiguration.java   |   3 +
 .../TestApplicationHistoryServer.java           |  31 +-
 .../timeline/AbstractMiniHBaseClusterTest.java  | 141 ++++---
 .../AbstractPhoenixConnectionlessTest.java      | 113 ++++++
 .../metrics/timeline/ITClusterAggregator.java   | 376 +++++++++++++++++++
 .../metrics/timeline/ITMetricAggregator.java    | 298 +++++++++++++++
 .../metrics/timeline/TestClusterAggregator.java | 275 --------------
 .../metrics/timeline/TestClusterSuite.java      |  32 ++
 .../metrics/timeline/TestHBaseAccessor.java     | 332 ----------------
 .../timeline/TestMetricHostAggregate.java       |  19 +-
 .../timeline/TestPhoenixTransactSQL.java        |   2 +-
 .../src/test/resources/hbase-default.xml        |  36 ++
 .../src/test/resources/log4j.properties         |   1 +
 .../src/test/resources/logging.properties       |   3 +
 29 files changed, 1756 insertions(+), 1556 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml b/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml
index 7efdb6b..ae2872d 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml
@@ -214,7 +214,18 @@
           </mappings>
         </configuration>
       </plugin>
-
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <forkMode>always</forkMode>
+          <systemProperties>
+            <property>
+              <name>java.util.logging.config.file</name>
+              <value>src/test/resources/logging.properties</value>
+            </property>
+          </systemProperties>
+        </configuration>
+      </plugin>
     </plugins>
   </build>
 
@@ -470,6 +481,19 @@
       <scope>test</scope>
       <classifier>tests</classifier>
     </dependency>
+      <dependency>
+        <groupId>org.apache.hbase</groupId>
+        <artifactId>hbase-testing-util</artifactId>
+        <version>0.98.4-hadoop2</version>
+        <scope>test</scope>
+        <optional>true</optional>
+        <exclusions>
+          <exclusion>
+            <groupId>org.jruby</groupId>
+            <artifactId>jruby-complete</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
     <dependency>
       <groupId>org.powermock</groupId>
       <artifactId>powermock-module-junit4</artifactId>

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
index 43ec648..b3c1af9 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
@@ -21,16 +21,14 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonProperty;
-import org.codehaus.jackson.annotate.JsonSubTypes;
-import org.codehaus.jackson.map.ObjectMapper;
 
 import java.io.File;
 import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.util.Date;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -42,15 +40,10 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
 public abstract class AbstractTimelineAggregator implements Runnable {
   protected final PhoenixHBaseAccessor hBaseAccessor;
   private final Log LOG;
-  private static final ObjectMapper mapper;
   protected final long checkpointDelayMillis;
   protected final Integer resultsetFetchSize;
   protected Configuration metricsConf;
 
-  static {
-    mapper = new ObjectMapper();
-  }
-
   public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor,
                                     Configuration metricsConf) {
     this.hBaseAccessor = hBaseAccessor;
@@ -162,204 +155,78 @@ public abstract class AbstractTimelineAggregator implements Runnable {
     FileUtils.writeStringToFile(checkpoint, String.valueOf(checkpointTime));
   }
 
-  // TODO: Abstract out doWork implementation for cluster and host levels
-  protected abstract boolean doWork(long startTime, long endTime);
-
-  protected abstract Long getSleepIntervalMillis();
-
-  protected abstract Integer getCheckpointCutOffMultiplier();
-
-  protected Long getCheckpointCutOffIntervalMillis() {
-    return getCheckpointCutOffMultiplier() * getSleepIntervalMillis();
-  }
-
-  protected abstract boolean isDisabled();
-
-  protected abstract String getCheckpointLocation();
-
-  @JsonSubTypes({@JsonSubTypes.Type(value = MetricClusterAggregate.class),
-    @JsonSubTypes.Type(value = MetricHostAggregate.class)})
-  @InterfaceAudience.Public
-  @InterfaceStability.Unstable
-  public static class MetricAggregate {
-    protected Double sum = 0.0;
-    protected Double deviation;
-    protected Double max = Double.MIN_VALUE;
-    protected Double min = Double.MAX_VALUE;
-
-    public MetricAggregate() {
-    }
+  /**
+   * Read metrics written during the time interval and save the sum and total
+   * in the aggregate table.
+   *
+   * @param startTime Sample start time
+   * @param endTime Sample end time
+   */
+  protected boolean doWork(long startTime, long endTime) {
+    LOG.info("Start aggregation cycle @ " + new Date() + ", " +
+      "startTime = " + new Date(startTime) + ", endTime = " + new Date(endTime));
 
-    protected MetricAggregate(Double sum, Double deviation, Double max,
-                              Double min) {
-      this.sum = sum;
-      this.deviation = deviation;
-      this.max = max;
-      this.min = min;
-    }
+    boolean success = true;
+    PhoenixTransactSQL.Condition condition =
+      prepareMetricQueryCondition(startTime, endTime);
 
-    void updateSum(Double sum) {
-      this.sum += sum;
-    }
+    Connection conn = null;
+    PreparedStatement stmt = null;
 
-    void updateMax(Double max) {
-      if (max > this.max) {
-        this.max = max;
+    try {
+      conn = hBaseAccessor.getConnection();
+      stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+
+      LOG.debug("Query issued @: " + new Date());
+      ResultSet rs = stmt.executeQuery();
+      LOG.debug("Query returned @: " + new Date());
+
+      aggregate(rs, startTime, endTime);
+      LOG.info("End aggregation cycle @ " + new Date());
+
+    } catch (SQLException e) {
+      LOG.error("Exception during aggregating metrics.", e);
+      success = false;
+    } catch (IOException e) {
+      LOG.error("Exception during aggregating metrics.", e);
+      success = false;
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
       }
-    }
-
-    void updateMin(Double min) {
-      if (min < this.min) {
-        this.min = min;
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
       }
     }
 
-    @JsonProperty("sum")
-    Double getSum() {
-      return sum;
-    }
-
-    @JsonProperty("deviation")
-    Double getDeviation() {
-      return deviation;
-    }
-
-    @JsonProperty("max")
-    Double getMax() {
-      return max;
-    }
-
-    @JsonProperty("min")
-    Double getMin() {
-      return min;
-    }
-
-    public void setSum(Double sum) {
-      this.sum = sum;
-    }
-
-    public void setDeviation(Double deviation) {
-      this.deviation = deviation;
-    }
-
-    public void setMax(Double max) {
-      this.max = max;
-    }
-
-    public void setMin(Double min) {
-      this.min = min;
-    }
-
-    public String toJSON() throws IOException {
-      return mapper.writeValueAsString(this);
-    }
+    LOG.info("End aggregation cycle @ " + new Date());
+    return success;
   }
 
-  public static class MetricClusterAggregate extends MetricAggregate {
-    private int numberOfHosts;
-
-    @JsonCreator
-    public MetricClusterAggregate() {
-    }
+  protected abstract PhoenixTransactSQL.Condition
+  prepareMetricQueryCondition(long startTime, long endTime);
 
-    MetricClusterAggregate(Double sum, int numberOfHosts, Double deviation,
-                           Double max, Double min) {
-      super(sum, deviation, max, min);
-      this.numberOfHosts = numberOfHosts;
-    }
+  protected abstract void aggregate(ResultSet rs, long startTime, long endTime)
+    throws IOException, SQLException;
 
-    @JsonProperty("numberOfHosts")
-    int getNumberOfHosts() {
-      return numberOfHosts;
-    }
-
-    void updateNumberOfHosts(int count) {
-      this.numberOfHosts += count;
-    }
-
-    public void setNumberOfHosts(int numberOfHosts) {
-      this.numberOfHosts = numberOfHosts;
-    }
+  protected abstract Long getSleepIntervalMillis();
 
-    /**
-     * Find and update min, max and avg for a minute
-     */
-    void updateAggregates(MetricClusterAggregate hostAggregate) {
-      updateMax(hostAggregate.getMax());
-      updateMin(hostAggregate.getMin());
-      updateSum(hostAggregate.getSum());
-      updateNumberOfHosts(hostAggregate.getNumberOfHosts());
-    }
+  protected abstract Integer getCheckpointCutOffMultiplier();
 
-    @Override
-    public String toString() {
-//    MetricClusterAggregate
-      return "MetricAggregate{" +
-        "sum=" + sum +
-        ", numberOfHosts=" + numberOfHosts +
-        ", deviation=" + deviation +
-        ", max=" + max +
-        ", min=" + min +
-        '}';
-    }
+  protected Long getCheckpointCutOffIntervalMillis() {
+    return getCheckpointCutOffMultiplier() * getSleepIntervalMillis();
   }
 
-  /**
-   * Represents a collection of minute based aggregation of values for
-   * resolution greater than a minute.
-   */
-  public static class MetricHostAggregate extends MetricAggregate {
-
-    private long numberOfSamples = 0;
-
-    @JsonCreator
-    public MetricHostAggregate() {
-      super(0.0, 0.0, Double.MIN_VALUE, Double.MAX_VALUE);
-    }
-
-    public MetricHostAggregate(Double sum, int numberOfSamples,
-                               Double deviation,
-                               Double max, Double min) {
-      super(sum, deviation, max, min);
-      this.numberOfSamples = numberOfSamples;
-    }
-
-    @JsonProperty("numberOfSamples")
-    long getNumberOfSamples() {
-      return numberOfSamples == 0 ? 1 : numberOfSamples;
-    }
-
-    void updateNumberOfSamples(long count) {
-      this.numberOfSamples += count;
-    }
-
-    public void setNumberOfSamples(long numberOfSamples) {
-      this.numberOfSamples = numberOfSamples;
-    }
-
-    public double getAvg() {
-      return sum / numberOfSamples;
-    }
+  protected abstract boolean isDisabled();
 
-    /**
-     * Find and update min, max and avg for a minute
-     */
-    void updateAggregates(MetricHostAggregate hostAggregate) {
-      updateMax(hostAggregate.getMax());
-      updateMin(hostAggregate.getMin());
-      updateSum(hostAggregate.getSum());
-      updateNumberOfSamples(hostAggregate.getNumberOfSamples());
-    }
+  protected abstract String getCheckpointLocation();
 
-    @Override
-    public String toString() {
-      return "MetricHostAggregate{" +
-        "sum=" + sum +
-        ", numberOfSamples=" + numberOfSamples +
-        ", deviation=" + deviation +
-        ", max=" + max +
-        ", min=" + min +
-        '}';
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java
index c20dd14..679ee36 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java
@@ -69,6 +69,7 @@ public class DefaultPhoenixDataSource implements ConnectionProvider {
     LOG.debug("Metric store connection url: " + url);
     try {
       // TODO: Exception is swallowed, it should be thrown - discuss it
+
       connection = DriverManager.getConnection(url);
     } catch (SQLException e) {
       LOG.warn("Unable to connect to HBase store using Phoenix.", e);

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index d2b96ec..a3eb731 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -24,6 +25,7 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+
 import java.io.IOException;
 import java.net.URL;
 import java.sql.SQLException;
@@ -31,9 +33,13 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_SITE_CONFIGURATION_FILE;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.METRICS_SITE_CONFIGURATION_FILE;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.TimelineMetricConfiguration.HBASE_SITE_CONFIGURATION_FILE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.TimelineMetricConfiguration.METRICS_SITE_CONFIGURATION_FILE;
 
 public class HBaseTimelineMetricStore extends AbstractService
     implements TimelineMetricStore {
@@ -82,7 +88,7 @@ public class HBaseTimelineMetricStore extends AbstractService
                                    Configuration metricsConf) {
     hBaseAccessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf);
     hBaseAccessor.initMetricSchema();
-
+//...BUG...
     // Start the cluster aggregator
     TimelineMetricClusterAggregator minuteClusterAggregator =
       new TimelineMetricClusterAggregator(hBaseAccessor, metricsConf);
@@ -100,16 +106,18 @@ public class HBaseTimelineMetricStore extends AbstractService
     }
 
     // Start the 5 minute aggregator
-    TimelineMetricAggregatorMinute minuteHostAggregator =
-      new TimelineMetricAggregatorMinute(hBaseAccessor, metricsConf);
+    TimelineMetricAggregator minuteHostAggregator =
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute
+        (hBaseAccessor, metricsConf);
     if (!minuteHostAggregator.isDisabled()) {
       Thread minuteAggregatorThread = new Thread(minuteHostAggregator);
       minuteAggregatorThread.start();
     }
 
     // Start hourly host aggregator
-    TimelineMetricAggregatorHourly hourlyHostAggregator =
-      new TimelineMetricAggregatorHourly(hBaseAccessor, metricsConf);
+    TimelineMetricAggregator hourlyHostAggregator =
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly
+        (hBaseAccessor, metricsConf);
     if (!hourlyHostAggregator.isDisabled()) {
       Thread aggregatorHourlyThread = new Thread(hourlyHostAggregator);
       aggregatorHourlyThread.start();

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java
new file mode 100644
index 0000000..61e15d7
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.annotate.JsonSubTypes;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.IOException;
+
+/**
+ * Base metric aggregate holding sum, deviation, max and min; serializable to JSON via toJSON().
+ */
+@JsonSubTypes({@JsonSubTypes.Type(value = MetricClusterAggregate.class),
+  @JsonSubTypes.Type(value = MetricHostAggregate.class)})
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class MetricAggregate {
+  private static final ObjectMapper mapper = new ObjectMapper();
+
+  protected Double sum = 0.0;
+  protected Double deviation;
+  protected Double max = Double.MIN_VALUE;
+  protected Double min = Double.MAX_VALUE;
+
+  public MetricAggregate() {
+  }
+
+  MetricAggregate(Double sum, Double deviation, Double max,
+                  Double min) {
+    this.sum = sum;
+    this.deviation = deviation;
+    this.max = max;
+    this.min = min;
+  }
+
+  void updateSum(Double sum) {
+    this.sum += sum;
+  }
+
+  void updateMax(Double max) {
+    if (max > this.max) {
+      this.max = max;
+    }
+  }
+
+  void updateMin(Double min) {
+    if (min < this.min) {
+      this.min = min;
+    }
+  }
+
+  @JsonProperty("sum")
+  Double getSum() {
+    return sum;
+  }
+
+  @JsonProperty("deviation")
+  Double getDeviation() {
+    return deviation;
+  }
+
+  @JsonProperty("max")
+  Double getMax() {
+    return max;
+  }
+
+  @JsonProperty("min")
+  Double getMin() {
+    return min;
+  }
+
+  public void setSum(Double sum) {
+    this.sum = sum;
+  }
+
+  public void setDeviation(Double deviation) {
+    this.deviation = deviation;
+  }
+
+  public void setMax(Double max) {
+    this.max = max;
+  }
+
+  public void setMin(Double min) {
+    this.min = min;
+  }
+
+  public String toJSON() throws IOException {
+    return mapper.writeValueAsString(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java
new file mode 100644
index 0000000..c13c85f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * Metric aggregate that, in addition to the base values, tracks a host count (numberOfHosts).
+ */
+public class MetricClusterAggregate extends MetricAggregate {
+  private int numberOfHosts;
+
+  @JsonCreator
+  public MetricClusterAggregate() {
+  }
+
+  MetricClusterAggregate(Double sum, int numberOfHosts, Double deviation,
+                         Double max, Double min) {
+    super(sum, deviation, max, min);
+    this.numberOfHosts = numberOfHosts;
+  }
+
+  @JsonProperty("numberOfHosts")
+  int getNumberOfHosts() {
+    return numberOfHosts;
+  }
+
+  void updateNumberOfHosts(int count) {
+    this.numberOfHosts += count;
+  }
+
+  public void setNumberOfHosts(int numberOfHosts) {
+    this.numberOfHosts = numberOfHosts;
+  }
+
+  /**
+   * Merge the given aggregate into this one: update max and min, and add its sum and number of hosts.
+   */
+  void updateAggregates(MetricClusterAggregate hostAggregate) {
+    updateMax(hostAggregate.getMax());
+    updateMin(hostAggregate.getMin());
+    updateSum(hostAggregate.getSum());
+    updateNumberOfHosts(hostAggregate.getNumberOfHosts());
+  }
+
+  @Override
+  public String toString() {
+//    MetricClusterAggregate
+    return "MetricAggregate{" +
+      "sum=" + sum +
+      ", numberOfHosts=" + numberOfHosts +
+      ", deviation=" + deviation +
+      ", max=" + max +
+      ", min=" + min +
+      '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java
new file mode 100644
index 0000000..02cc207
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * Represents a collection of minute based aggregation of values for
+ * resolution greater than a minute.
+ */
+public class MetricHostAggregate extends MetricAggregate {
+
+  private long numberOfSamples = 0;
+
+  @JsonCreator
+  public MetricHostAggregate() {
+    super(0.0, 0.0, Double.MIN_VALUE, Double.MAX_VALUE);
+  }
+
+  public MetricHostAggregate(Double sum, int numberOfSamples,
+                             Double deviation,
+                             Double max, Double min) {
+    super(sum, deviation, max, min);
+    this.numberOfSamples = numberOfSamples;
+  }
+
+  @JsonProperty("numberOfSamples")
+  long getNumberOfSamples() {
+    return numberOfSamples == 0 ? 1 : numberOfSamples;
+  }
+
+  void updateNumberOfSamples(long count) {
+    this.numberOfSamples += count;
+  }
+
+  public void setNumberOfSamples(long numberOfSamples) {
+    this.numberOfSamples = numberOfSamples;
+  }
+
+  public double getAvg() {
+    return sum / numberOfSamples;
+  }
+
+  /**
+   * Merge the given aggregate into this one: update max and min, and add its sum and number of samples.
+   */
+  void updateAggregates(MetricHostAggregate hostAggregate) {
+    updateMax(hostAggregate.getMax());
+    updateMin(hostAggregate.getMin());
+    updateSum(hostAggregate.getSum());
+    updateNumberOfSamples(hostAggregate.getNumberOfSamples());
+  }
+
+  @Override
+  public String toString() {
+    return "MetricHostAggregate{" +
+      "sum=" + sum +
+      ", numberOfSamples=" + numberOfSamples +
+      ", deviation=" + deviation +
+      ", max=" + max +
+      ", min=" + min +
+      '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 0851d07..41eb30e 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -27,41 +28,16 @@ import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.type.TypeReference;
 
 import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
+import java.sql.*;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline.AbstractTimelineAggregator.MetricClusterAggregate;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractTimelineAggregator.MetricHostAggregate;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_ENCODING;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_METRICS_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricClusterAggregator.TimelineClusterMetric;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_MINUTE_TABLE_TTL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_ENCODING_SCHEME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_HOUR_TABLE_TTL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_MINUTE_TABLE_TTL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RESULT_LIMIT;
+  .timeline.PhoenixTransactSQL.*;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.TimelineMetricConfiguration.*;
 
 /**
  * Provides a facade over the Phoenix API to access HBase schema

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java
new file mode 100644
index 0000000..d227993
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+/**
+ * Key describing a cluster-level metric data point: metric name, application
+ * id, instance id and timestamp, plus a type tag.
+ * <p>
+ * {@link #equals(Object)} and {@link #hashCode()} are based on metricName,
+ * appId, instanceId and timestamp; {@code type} does not take part in the
+ * identity. appId and instanceId may be null; metricName is dereferenced
+ * without a null check and therefore has to be non-null.
+ */
+public class TimelineClusterMetric {
+  private final String metricName;
+  private final String appId;
+  private final String instanceId;
+  private final long timestamp;
+  private final String type;
+
+  /**
+   * @param metricName name of the metric, must not be null
+   * @param appId application id, may be null
+   * @param instanceId instance id, may be null
+   * @param timestamp timestamp of the data point
+   * @param type type tag carried along with the key
+   */
+  TimelineClusterMetric(String metricName, String appId, String instanceId,
+                        long timestamp, String type) {
+    this.metricName = metricName;
+    this.appId = appId;
+    this.instanceId = instanceId;
+    this.timestamp = timestamp;
+    this.type = type;
+  }
+
+  String getMetricName() {
+    return metricName;
+  }
+
+  String getAppId() {
+    return appId;
+  }
+
+  String getInstanceId() {
+    return instanceId;
+  }
+
+  long getTimestamp() {
+    return timestamp;
+  }
+
+  String getType() {
+    return type;
+  }
+
+  /**
+   * Two keys are equal when timestamp, appId, instanceId and metricName all
+   * match; {@code type} is not compared.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    TimelineClusterMetric that = (TimelineClusterMetric) o;
+
+    if (timestamp != that.timestamp) return false;
+    if (!nullSafeEquals(appId, that.appId)) return false;
+    if (!nullSafeEquals(instanceId, that.instanceId)) return false;
+    if (!metricName.equals(that.metricName)) return false;
+
+    return true;
+  }
+
+  /**
+   * Compares metricName, appId and instanceId while ignoring the timestamp.
+   * A null appId or instanceId only matches another null, the same way
+   * {@link #equals(Object)} treats them.
+   *
+   * @param metric the key to compare with, must not be null
+   * @return true when both keys describe the same metric series
+   */
+  public boolean equalsExceptTime(TimelineClusterMetric metric) {
+    if (!metricName.equals(metric.metricName)) return false;
+    // appId is nullable for equals()/hashCode(), so it must not be
+    // dereferenced unconditionally here either.
+    if (!nullSafeEquals(appId, metric.appId)) return false;
+    if (!nullSafeEquals(instanceId, metric.instanceId)) return false;
+
+    return true;
+  }
+
+  /** Null-tolerant string comparison: two nulls are considered equal. */
+  private static boolean nullSafeEquals(String left, String right) {
+    return left != null ? left.equals(right) : right == null;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = metricName.hashCode();
+    result = 31 * result + (appId != null ? appId.hashCode() : 0);
+    result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
+    result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "TimelineClusterMetric{" +
+      "metricName='" + metricName + '\'' +
+      ", appId='" + appId + '\'' +
+      ", instanceId='" + instanceId + '\'' +
+      ", timestamp=" + timestamp +
+      '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
new file mode 100644
index 0000000..eaa1ab9
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
+
+/**
+ * Host metric aggregator driven entirely by constructor arguments: it queries
+ * the input table for a time window, folds consecutive rows that belong to
+ * the same metric into one {@link MetricHostAggregate} and saves the result
+ * into the output table.
+ */
+public class TimelineMetricAggregator extends AbstractTimelineAggregator {
+  private static final Log LOG =
+    LogFactory.getLog(TimelineMetricAggregator.class);
+
+  /** Columns handed to Condition.addOrderByColumn, in this order. */
+  private static final String[] ORDER_BY_COLUMNS = {
+    "METRIC_NAME", "HOSTNAME", "APP_ID", "INSTANCE_ID", "SERVER_TIME"
+  };
+
+  private final String checkpointLocation;
+  private final Long sleepIntervalMillis;
+  private final Integer checkpointCutOffMultiplier;
+  private final String hostAggregatorDisabledParam;
+  private final String inputTableName;
+  private final String outputTableName;
+
+  /**
+   * @param hBaseAccessor accessor handed to the base class and used for saving
+   * @param metricsConf configuration handed to the base class
+   * @param checkpointLocation value returned by getCheckpointLocation()
+   * @param sleepIntervalMillis value returned by getSleepIntervalMillis()
+   * @param checkpointCutOffMultiplier value returned by
+   *                                   getCheckpointCutOffMultiplier()
+   * @param hostAggregatorDisabledParam configuration key read by isDisabled()
+   * @param inputTableName table name formatted into the read query
+   * @param outputTableName table name the aggregates are saved to
+   */
+  public TimelineMetricAggregator(PhoenixHBaseAccessor hBaseAccessor,
+                                  Configuration metricsConf,
+                                  String checkpointLocation,
+                                  Long sleepIntervalMillis,
+                                  Integer checkpointCutOffMultiplier,
+                                  String hostAggregatorDisabledParam,
+                                  String inputTableName,
+                                  String outputTableName) {
+    super(hBaseAccessor, metricsConf);
+    this.checkpointLocation = checkpointLocation;
+    this.sleepIntervalMillis = sleepIntervalMillis;
+    this.checkpointCutOffMultiplier = checkpointCutOffMultiplier;
+    this.hostAggregatorDisabledParam = hostAggregatorDisabledParam;
+    this.inputTableName = inputTableName;
+    this.outputTableName = outputTableName;
+  }
+
+  @Override
+  protected String getCheckpointLocation() {
+    return checkpointLocation;
+  }
+
+  @Override
+  protected Long getSleepIntervalMillis() {
+    return sleepIntervalMillis;
+  }
+
+  @Override
+  protected Integer getCheckpointCutOffMultiplier() {
+    return checkpointCutOffMultiplier;
+  }
+
+  /** Disabled when the configured boolean flag is set; defaults to enabled. */
+  @Override
+  protected boolean isDisabled() {
+    return metricsConf.getBoolean(hostAggregatorDisabledParam, false);
+  }
+
+  /**
+   * Builds the unlimited query over the input table for the given window,
+   * ordered by the columns in ORDER_BY_COLUMNS.
+   */
+  @Override
+  protected Condition prepareMetricQueryCondition(long startTime,
+                                                  long endTime) {
+    Condition query = new Condition(null, null, null, null, startTime,
+      endTime, null, true);
+    query.setNoLimit();
+    query.setFetchSize(resultsetFetchSize);
+
+    String statement =
+      String.format(GET_METRIC_AGGREGATE_ONLY_SQL, inputTableName);
+    query.setStatement(statement);
+
+    for (String column : ORDER_BY_COLUMNS) {
+      query.addOrderByColumn(column);
+    }
+    return query;
+  }
+
+  /**
+   * Aggregates the rows of the result set and saves them to the output table.
+   */
+  @Override
+  protected void aggregate(ResultSet rs, long startTime, long endTime)
+    throws IOException, SQLException {
+    Map<TimelineMetric, MetricHostAggregate> aggregates =
+      aggregateMetricsFromResultSet(rs);
+
+    LOG.info("Saving " + aggregates.size() + " metric aggregates.");
+    hBaseAccessor.saveHostAggregateRecords(aggregates, outputTableName);
+  }
+
+  /**
+   * Walks the result set once. Rows are grouped by run: as long as the row's
+   * metric matches the current group key (ignoring time) it is folded into
+   * the current aggregate, otherwise a new aggregate keyed by that row's
+   * metric is started.
+   */
+  private Map<TimelineMetric, MetricHostAggregate> aggregateMetricsFromResultSet
+      (ResultSet rs) throws IOException, SQLException {
+    Map<TimelineMetric, MetricHostAggregate> aggregates =
+      new HashMap<TimelineMetric, MetricHostAggregate>();
+    TimelineMetric groupKey = null;
+    MetricHostAggregate groupAggregate = null;
+
+    while (rs.next()) {
+      TimelineMetric rowMetric =
+        PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
+      MetricHostAggregate rowAggregate =
+        PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
+
+      if (groupKey == null) {
+        // Very first row opens the first group
+        groupKey = rowMetric;
+        groupAggregate = new MetricHostAggregate();
+        aggregates.put(rowMetric, groupAggregate);
+      }
+
+      if (!groupKey.equalsExceptTime(rowMetric)) {
+        // A different metric starts here - open a new group for it
+        groupKey = rowMetric;
+        groupAggregate = new MetricHostAggregate();
+        aggregates.put(rowMetric, groupAggregate);
+      }
+
+      // Fold the row into whichever group is current
+      groupAggregate.updateAggregates(rowAggregate);
+    }
+    return aggregates;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java
new file mode 100644
index 0000000..d0dafeb
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.PhoenixTransactSQL.*;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.TimelineMetricConfiguration.*;
+
+/**
+ * Creates the host-level {@link TimelineMetricAggregator} instances: one that
+ * aggregates the metrics record table into the minute aggregate table and one
+ * that aggregates the minute aggregate table into the hourly aggregate table.
+ */
+public class TimelineMetricAggregatorFactory {
+  private static final String MINUTE_AGGREGATE_CHECKPOINT_FILE =
+    "timeline-metrics-host-aggregator-checkpoint";
+  private static final String MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE =
+    "timeline-metrics-host-aggregator-hourly-checkpoint";
+
+  /**
+   * Creates the aggregator that reads METRICS_RECORD_TABLE_NAME and writes
+   * METRICS_AGGREGATE_MINUTE_TABLE_NAME.
+   *
+   * @param hBaseAccessor accessor passed through to the aggregator
+   * @param metricsConf configuration supplying the checkpoint directory, the
+   *                    sleep interval in seconds (default 300, i.e. 5 mins)
+   *                    and the checkpoint cut-off multiplier (default 3)
+   * @return the configured minute aggregator
+   */
+  public static TimelineMetricAggregator createTimelineMetricAggregatorMinute
+    (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+
+    String checkpointLocation = resolveCheckpointLocation(metricsConf,
+      MINUTE_AGGREGATE_CHECKPOINT_FILE);
+    long sleepIntervalMillis = readSleepIntervalMillis(metricsConf,
+      HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300L);
+    int checkpointCutOffMultiplier = metricsConf.getInt
+      (HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 3);
+
+    return new TimelineMetricAggregator(hBaseAccessor, metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      HOST_AGGREGATOR_MINUTE_DISABLED,
+      METRICS_RECORD_TABLE_NAME,
+      METRICS_AGGREGATE_MINUTE_TABLE_NAME);
+  }
+
+  /**
+   * Creates the aggregator that reads METRICS_AGGREGATE_MINUTE_TABLE_NAME and
+   * writes METRICS_AGGREGATE_HOURLY_TABLE_NAME.
+   *
+   * @param hBaseAccessor accessor passed through to the aggregator
+   * @param metricsConf configuration supplying the checkpoint directory, the
+   *                    sleep interval in seconds (default 3600, i.e. 1 hour)
+   *                    and the checkpoint cut-off multiplier (default 2)
+   * @return the configured hourly aggregator
+   */
+  public static TimelineMetricAggregator createTimelineMetricAggregatorHourly
+    (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+
+    String checkpointLocation = resolveCheckpointLocation(metricsConf,
+      MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE);
+    long sleepIntervalMillis = readSleepIntervalMillis(metricsConf,
+      HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600L);
+    int checkpointCutOffMultiplier = metricsConf.getInt
+      (HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+
+    return new TimelineMetricAggregator(hBaseAccessor, metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      HOST_AGGREGATOR_HOUR_DISABLED,
+      METRICS_AGGREGATE_MINUTE_TABLE_NAME,
+      METRICS_AGGREGATE_HOURLY_TABLE_NAME);
+  }
+
+  /** Joins the configured (or default) checkpoint directory and file name. */
+  private static String resolveCheckpointLocation(Configuration metricsConf,
+                                                  String checkpointFile) {
+    String checkpointDir = metricsConf.get(
+      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+    return FilenameUtils.concat(checkpointDir, checkpointFile);
+  }
+
+  /**
+   * Reads a sleep interval and converts it to milliseconds. The value is
+   * treated as seconds, which is how the aggregator classes replaced by this
+   * factory read the same keys; passing it on unconverted would shorten a
+   * configured interval by a factor of 1000.
+   */
+  private static long readSleepIntervalMillis(Configuration metricsConf,
+                                              String intervalKey,
+                                              long defaultSeconds) {
+    long intervalSeconds = metricsConf.getLong(intervalKey, defaultSeconds);
+    return java.util.concurrent.TimeUnit.SECONDS.toMillis(intervalSeconds);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorHourly.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorHourly.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorHourly.java
deleted file mode 100644
index 16f5ab9..0000000
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorHourly.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline;
-
-import org.apache.commons.io.FilenameUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline.PhoenixTransactSQL.*;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline.TimelineMetricConfiguration.*;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline.TimelineMetricConfiguration
-  .HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline.TimelineMetricConfiguration
-  .TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
-
-public class TimelineMetricAggregatorHourly extends AbstractTimelineAggregator {
-  private static final Log LOG = LogFactory.getLog
-    (TimelineMetricAggregatorHourly.class);
-  private static final String MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE =
-    "timeline-metrics-host-aggregator-hourly-checkpoint";
-  private final String checkpointLocation;
-  private final Long sleepIntervalMillis;
-  private final Integer checkpointCutOffMultiplier;
-
-  public TimelineMetricAggregatorHourly(PhoenixHBaseAccessor hBaseAccessor,
-                                        Configuration metricsConf) {
-
-    super(hBaseAccessor, metricsConf);
-
-    String checkpointDir = metricsConf.get(
-      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
-
-    checkpointLocation = FilenameUtils.concat(checkpointDir,
-      MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE);
-
-    sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
-      (HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l));
-    checkpointCutOffMultiplier =
-      metricsConf.getInt(HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
-  }
-
-  @Override
-  protected String getCheckpointLocation() {
-    return checkpointLocation;
-  }
-
-  @Override
-  protected boolean doWork(long startTime, long endTime) {
-    LOG.info("Start aggregation cycle @ " + new Date());
-
-    boolean success = true;
-    Condition condition = prepareMetricQueryCondition(startTime, endTime);
-
-    Connection conn = null;
-    PreparedStatement stmt = null;
-
-    try {
-      conn = hBaseAccessor.getConnection();
-      stmt = prepareGetMetricsSqlStmt(conn, condition);
-
-      ResultSet rs = stmt.executeQuery();
-      Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
-        aggregateMetricsFromResultSet(rs);
-
-      LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
-
-      hBaseAccessor.saveHostAggregateRecords(hostAggregateMap,
-        METRICS_AGGREGATE_HOURLY_TABLE_NAME);
-
-    } catch (SQLException e) {
-      LOG.error("Exception during aggregating metrics.", e);
-      success = false;
-    } catch (IOException e) {
-      LOG.error("Exception during aggregating metrics.", e);
-      success = false;
-    } finally {
-      if (stmt != null) {
-        try {
-          stmt.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (conn != null) {
-        try {
-          conn.close();
-        } catch (SQLException sql) {
-          // Ignore
-        }
-      }
-    }
-
-    LOG.info("End aggregation cycle @ " + new Date());
-    return success;
-  }
-
-  private Condition prepareMetricQueryCondition(long startTime, long endTime) {
-    Condition condition = new Condition(null, null, null, null, startTime,
-      endTime, null, true);
-    condition.setNoLimit();
-    condition.setFetchSize(resultsetFetchSize);
-    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
-      METRICS_AGGREGATE_MINUTE_TABLE_NAME));
-    condition.addOrderByColumn("METRIC_NAME");
-    condition.addOrderByColumn("HOSTNAME");
-    condition.addOrderByColumn("APP_ID");
-    condition.addOrderByColumn("INSTANCE_ID");
-    condition.addOrderByColumn("SERVER_TIME");
-    return condition;
-  }
-
-  private Map<TimelineMetric, MetricHostAggregate>
-  aggregateMetricsFromResultSet(ResultSet rs) throws SQLException, IOException {
-    TimelineMetric existingMetric = null;
-    MetricHostAggregate hostAggregate = null;
-    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
-      new HashMap<TimelineMetric, MetricHostAggregate>();
-
-    while (rs.next()) {
-      TimelineMetric currentMetric =
-        PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
-      MetricHostAggregate currentHostAggregate =
-        PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
-
-      if (existingMetric == null) {
-        // First row
-        existingMetric = currentMetric;
-        hostAggregate = new MetricHostAggregate();
-        hostAggregateMap.put(currentMetric, hostAggregate);
-      }
-
-      if (existingMetric.equalsExceptTime(currentMetric)) {
-        // Recalculate totals with current metric
-        hostAggregate.updateAggregates(currentHostAggregate);
-
-      } else {
-        // Switched over to a new metric - save existing
-        hostAggregate = new MetricHostAggregate();
-        hostAggregate.updateAggregates(currentHostAggregate);
-        hostAggregateMap.put(currentMetric, hostAggregate);
-        existingMetric = currentMetric;
-      }
-    }
-    return hostAggregateMap;
-  }
-
-  @Override
-  protected Long getSleepIntervalMillis() {
-    return sleepIntervalMillis;
-  }
-
-  @Override
-  protected Integer getCheckpointCutOffMultiplier() {
-    return checkpointCutOffMultiplier;
-  }
-
-  @Override
-  protected boolean isDisabled() {
-    return metricsConf.getBoolean(HOST_AGGREGATOR_HOUR_DISABLED, false);
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorMinute.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorMinute.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorMinute.java
deleted file mode 100644
index ac9d12e..0000000
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorMinute.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-
-import org.apache.commons.io.FilenameUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.prepareGetMetricsSqlStmt;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.*;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
-
-public class TimelineMetricAggregatorMinute extends AbstractTimelineAggregator {
-  private static final Log LOG = LogFactory.getLog(TimelineMetricAggregatorMinute.class);
-  private static final String MINUTE_AGGREGATE_CHECKPOINT_FILE =
-    "timeline-metrics-host-aggregator-checkpoint";
-  private final String checkpointLocation;
-  private final Long sleepIntervalMillis;
-  private final Integer checkpointCutOffMultiplier;
-
-  public TimelineMetricAggregatorMinute(PhoenixHBaseAccessor hBaseAccessor,
-                                        Configuration metricsConf) {
-    super(hBaseAccessor, metricsConf);
-
-    String checkpointDir = metricsConf.get(
-      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
-
-    checkpointLocation = FilenameUtils.concat(checkpointDir,
-      MINUTE_AGGREGATE_CHECKPOINT_FILE);
-
-    sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
-      (HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300l));  // 5 mins
-    checkpointCutOffMultiplier =
-      metricsConf.getInt(HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 3);
-  }
-
-  @Override
-  protected String getCheckpointLocation() {
-    return checkpointLocation;
-  }
-
-  @Override
-  protected boolean doWork(long startTime, long endTime) {
-    LOG.info("Start aggregation cycle @ " + new Date() + ", " +
-      "startTime = " + new Date(startTime) + ", endTime = " + new Date(endTime));
-
-    boolean success = true;
-    Condition condition = new Condition(null, null, null, null, startTime,
-                                        endTime, null, true);
-    condition.setNoLimit();
-    condition.setFetchSize(resultsetFetchSize);
-    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
-      METRICS_RECORD_TABLE_NAME));
-    condition.addOrderByColumn("METRIC_NAME");
-    condition.addOrderByColumn("HOSTNAME");
-    condition.addOrderByColumn("APP_ID");
-    condition.addOrderByColumn("INSTANCE_ID");
-    condition.addOrderByColumn("SERVER_TIME");
-
-    Connection conn = null;
-    PreparedStatement stmt = null;
-
-    try {
-      conn = hBaseAccessor.getConnection();
-      stmt = prepareGetMetricsSqlStmt(conn, condition);
-      LOG.debug("Query issued @: " + new Date());
-      ResultSet rs = stmt.executeQuery();
-      LOG.debug("Query returned @: " + new Date());
-      TimelineMetric existingMetric = null;
-      MetricHostAggregate hostAggregate = null;
-
-      Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
-        new HashMap<TimelineMetric, MetricHostAggregate>();
-
-      while (rs.next()) {
-        TimelineMetric currentMetric =
-          PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
-        MetricHostAggregate currentHostAggregate =
-          PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
-
-        if (existingMetric == null) {
-          // First row
-          existingMetric = currentMetric;
-          hostAggregate = new MetricHostAggregate();
-          hostAggregateMap.put(currentMetric, hostAggregate);
-        }
-
-        if (existingMetric.equalsExceptTime(currentMetric)) {
-          // Recalculate totals with current metric
-          hostAggregate.updateAggregates(currentHostAggregate);
-
-        } else {
-          // Switched over to a new metric - create new aggregate
-          hostAggregate = new MetricHostAggregate();
-          hostAggregate.updateAggregates(currentHostAggregate);
-          hostAggregateMap.put(currentMetric, hostAggregate);
-          existingMetric = currentMetric;
-        }
-      }
-
-      LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
-
-      hBaseAccessor.saveHostAggregateRecords(hostAggregateMap,
-        METRICS_AGGREGATE_MINUTE_TABLE_NAME);
-
-    } catch (SQLException e) {
-      LOG.error("Exception during aggregating metrics.", e);
-      success = false;
-    } catch (IOException e) {
-      LOG.error("Exception during aggregating metrics.", e);
-      success = false;
-    } finally {
-      if (stmt != null) {
-        try {
-          stmt.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (conn != null) {
-        try {
-          conn.close();
-        } catch (SQLException sql) {
-          // Ignore
-        }
-      }
-    }
-
-    LOG.info("End aggregation cycle @ " + new Date());
-    return success;
-  }
-
-  @Override
-  protected Long getSleepIntervalMillis() {
-    return sleepIntervalMillis;
-  }
-
-  @Override
-  protected Integer getCheckpointCutOffMultiplier() {
-    return checkpointCutOffMultiplier;
-  }
-
-  @Override
-  protected boolean isDisabled() {
-    return metricsConf.getBoolean(HOST_AGGREGATOR_MINUTE_DISABLED, false);
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
index c52451e..96de1a9 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
+
 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -24,27 +25,18 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 
 import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.prepareGetMetricsSqlStmt;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.*;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.PhoenixTransactSQL.*;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.TimelineMetricConfiguration.*;
 
 /**
  * Aggregates a metric across all hosts in the cluster. Reads metrics from
@@ -82,98 +74,79 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
     return checkpointLocation;
   }
 
-  /**
-   * Read metrics written during the time interval and save the sum and total
-   * in the aggregate table.
-   *
-   * @param startTime Sample start time
-   * @param endTime Sample end time
-   */
-  protected boolean doWork(long startTime, long endTime) {
-    LOG.info("Start aggregation cycle @ " + new Date() + ", " +
-      "startTime = " + new Date(startTime) + ", endTime = " + new Date(endTime));
-
-    boolean success = true;
-    Condition condition = prepareMetricQueryCondition(startTime, endTime);
-
-    Connection conn;
-    PreparedStatement stmt;
-
-    try {
-      conn = hBaseAccessor.getConnection();
-      stmt = prepareGetMetricsSqlStmt(conn, condition);
-      LOG.debug("Query issued @: " + new Date());
-      ResultSet rs = stmt.executeQuery();
-      LOG.debug("Query returned @: " + new Date());
-      Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
-        new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
-      List<Long[]> timeSlices = new ArrayList<Long[]>();
-      // Create time slices
-      long sliceStartTime = startTime;
-      while (sliceStartTime < endTime) {
-        timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis});
-        sliceStartTime += timeSliceIntervalMillis;
-      }
-
-      while (rs.next()) {
-        TimelineMetric metric =
-          PhoenixHBaseAccessor.getTimelineMetricFromResultSet(rs);
-
-        Map<TimelineClusterMetric, Double> clusterMetrics =
-          sliceFromTimelineMetric(metric, timeSlices);
-
-        if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
-          for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry :
-              clusterMetrics.entrySet()) {
-            TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
-            MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric);
-            Double avgValue = clusterMetricEntry.getValue();
-
-            if (aggregate == null) {
-              aggregate = new MetricClusterAggregate(avgValue, 1, null,
-                avgValue, avgValue);
-              aggregateClusterMetrics.put(clusterMetric, aggregate);
-            } else {
-              aggregate.updateSum(avgValue);
-              aggregate.updateNumberOfHosts(1);
-              aggregate.updateMax(avgValue);
-              aggregate.updateMin(avgValue);
-            }
-          }
-        }
-      }
-      LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates.");
-
-      hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics);
-
-      LOG.info("End aggregation cycle @ " + new Date());
-
-    } catch (SQLException e) {
-      LOG.error("Exception during aggregating metrics.", e);
-      success = false;
-    } catch (IOException e) {
-      LOG.error("Exception during aggregating metrics.", e);
-      success = false;
-    }
-
-    return success;
+  @Override
+  protected void aggregate(ResultSet rs, long startTime, long endTime)
+    throws SQLException, IOException {
+    List<Long[]> timeSlices = getTimeSlices(startTime, endTime);
+    Map<TimelineClusterMetric, MetricClusterAggregate>
+      aggregateClusterMetrics = aggregateMetricsFromResultSet(rs, timeSlices);
+
+    LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates.");
+    hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics);
   }
 
-  private Condition prepareMetricQueryCondition(long startTime, long endTime) {
+  @Override
+  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
     Condition condition = new Condition(null, null, null, null, startTime,
-                                        endTime, null, true);
-    condition.setFetchSize(resultsetFetchSize);
+      endTime, null, true);
     condition.setNoLimit();
+    condition.setFetchSize(resultsetFetchSize);
     condition.setStatement(String.format(GET_METRIC_SQL,
       METRICS_RECORD_TABLE_NAME));
     condition.addOrderByColumn("METRIC_NAME");
     condition.addOrderByColumn("APP_ID");
     condition.addOrderByColumn("INSTANCE_ID");
     condition.addOrderByColumn("SERVER_TIME");
-
     return condition;
   }
 
+  private List<Long[]> getTimeSlices(long startTime, long endTime) {
+    List<Long[]> timeSlices = new ArrayList<Long[]>();
+    long sliceStartTime = startTime;
+    while (sliceStartTime < endTime) {
+      timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis});
+      sliceStartTime += timeSliceIntervalMillis;
+    }
+    return timeSlices;
+  }
+
+  private Map<TimelineClusterMetric, MetricClusterAggregate>
+  aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
+    throws SQLException, IOException {
+    Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
+      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+    // Time slices are built by the caller (see getTimeSlices) and passed in
+
+    while (rs.next()) {
+      TimelineMetric metric =
+        PhoenixHBaseAccessor.getTimelineMetricFromResultSet(rs);
+
+      Map<TimelineClusterMetric, Double> clusterMetrics =
+        sliceFromTimelineMetric(metric, timeSlices);
+
+      if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
+        for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry :
+            clusterMetrics.entrySet()) {
+          TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
+          MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric);
+          Double avgValue = clusterMetricEntry.getValue();
+
+          if (aggregate == null) {
+            aggregate = new MetricClusterAggregate(avgValue, 1, null,
+              avgValue, avgValue);
+            aggregateClusterMetrics.put(clusterMetric, aggregate);
+          } else {
+            aggregate.updateSum(avgValue);
+            aggregate.updateNumberOfHosts(1);
+            aggregate.updateMax(avgValue);
+            aggregate.updateMin(avgValue);
+          }
+        }
+      }
+    }
+    return aggregateClusterMetrics;
+  }
+
   @Override
   protected Long getSleepIntervalMillis() {
     return sleepIntervalMillis;
@@ -239,82 +212,4 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
     return -1l;
   }
 
-  public static class TimelineClusterMetric {
-    private String metricName;
-    private String appId;
-    private String instanceId;
-    private long timestamp;
-    private String type;
-
-    TimelineClusterMetric(String metricName, String appId, String instanceId,
-                          long timestamp, String type) {
-      this.metricName = metricName;
-      this.appId = appId;
-      this.instanceId = instanceId;
-      this.timestamp = timestamp;
-      this.type = type;
-    }
-
-    String getMetricName() {
-      return metricName;
-    }
-
-    String getAppId() {
-      return appId;
-    }
-
-    String getInstanceId() {
-      return instanceId;
-    }
-
-    long getTimestamp() {
-      return timestamp;
-    }
-
-    String getType() { return type; }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
-
-      TimelineClusterMetric that = (TimelineClusterMetric) o;
-
-      if (timestamp != that.timestamp) return false;
-      if (appId != null ? !appId.equals(that.appId) : that.appId != null)
-        return false;
-      if (instanceId != null ? !instanceId.equals(that.instanceId) : that.instanceId != null)
-        return false;
-      if (!metricName.equals(that.metricName)) return false;
-
-      return true;
-    }
-
-    public boolean equalsExceptTime(TimelineClusterMetric metric) {
-      if (!metricName.equals(metric.metricName)) return false;
-      if (!appId.equals(metric.appId)) return false;
-      if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null)
-        return false;
-
-      return true;
-    }
-    @Override
-    public int hashCode() {
-      int result = metricName.hashCode();
-      result = 31 * result + (appId != null ? appId.hashCode() : 0);
-      result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
-      result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
-      return result;
-    }
-
-    @Override
-    public String toString() {
-      return "TimelineClusterMetric{" +
-        "metricName='" + metricName + '\'' +
-        ", appId='" + appId + '\'' +
-        ", instanceId='" + instanceId + '\'' +
-        ", timestamp=" + timestamp +
-        '}';
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
index e886b71..54d3fdd 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
@@ -24,11 +24,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 
 import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -40,19 +37,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
   .timeline.PhoenixTransactSQL.*;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline.TimelineMetricClusterAggregator.TimelineClusterMetric;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
   .timeline.TimelineMetricConfiguration.*;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline.TimelineMetricConfiguration
-  .CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline.TimelineMetricConfiguration
-  .TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
 
 public class TimelineMetricClusterAggregatorHourly extends
   AbstractTimelineAggregator {
@@ -77,7 +62,8 @@ public class TimelineMetricClusterAggregatorHourly extends
 
     sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
       (CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l));
-    checkpointCutOffIntervalMillis = 7200000l;
+    checkpointCutOffIntervalMillis =  SECONDS.toMillis(metricsConf.getLong
+      (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL, 7200l));
     checkpointCutOffMultiplier = metricsConf.getInt
       (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
   }
@@ -88,60 +74,31 @@ public class TimelineMetricClusterAggregatorHourly extends
   }
 
   @Override
-  protected boolean doWork(long startTime, long endTime) {
-    LOG.info("Start aggregation cycle @ " + new Date() + ", " +
-      "startTime = " + new Date(startTime) + ", endTime = " + new Date
-      (endTime));
-
-    boolean success = true;
-    Condition condition = prepareMetricQueryCondition(startTime, endTime);
-
-    Connection conn = null;
-    PreparedStatement stmt = null;
-
-    try {
-      conn = hBaseAccessor.getConnection();
-      stmt = prepareGetMetricsSqlStmt(conn, condition);
-
-      ResultSet rs = stmt.executeQuery();
+  protected void aggregate(ResultSet rs, long startTime, long endTime)
+    throws SQLException, IOException {
       Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap =
         aggregateMetricsFromResultSet(rs);
 
-      LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
-
-      hBaseAccessor.saveClusterAggregateHourlyRecords(
-        hostAggregateMap,
-        METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
-
-    } catch (SQLException e) {
-      LOG.error("Exception during aggregating metrics.", e);
-      success = false;
-    } catch (IOException e) {
-      LOG.error("Exception during aggregating metrics.", e);
-      success = false;
-    } finally {
-      if (stmt != null) {
-        try {
-          stmt.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (conn != null) {
-        try {
-          conn.close();
-        } catch (SQLException sql) {
-          // Ignore
-        }
-      }
-    }
+    LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
+    hBaseAccessor.saveClusterAggregateHourlyRecords(hostAggregateMap,
+      METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
+  }
 
-    LOG.info("End aggregation cycle @ " + new Date());
-    return success;
+  @Override
+  protected Condition prepareMetricQueryCondition(long startTime,
+                                                  long endTime) {
+    Condition condition = new Condition(null, null, null, null, startTime,
+      endTime, null, true);
+    condition.setNoLimit();
+    condition.setFetchSize(resultsetFetchSize);
+    condition.setStatement(GET_CLUSTER_AGGREGATE_SQL);
+    condition.addOrderByColumn("METRIC_NAME");
+    condition.addOrderByColumn("APP_ID");
+    condition.addOrderByColumn("INSTANCE_ID");
+    condition.addOrderByColumn("SERVER_TIME");
+    return condition;
   }
 
-  // should rewrite from host agg to cluster agg
-  //
   private Map<TimelineClusterMetric, MetricHostAggregate>
   aggregateMetricsFromResultSet(ResultSet rs) throws IOException, SQLException {
 
@@ -189,19 +146,6 @@ public class TimelineMetricClusterAggregatorHourly extends
     agg.updateNumberOfSamples(currentClusterAggregate.getNumberOfHosts());
   }
 
-  private Condition prepareMetricQueryCondition(long startTime, long endTime) {
-    Condition condition = new Condition
-      (null, null, null, null, startTime, endTime, null, true);
-    condition.setNoLimit();
-    condition.setFetchSize(resultsetFetchSize);
-    condition.setStatement(GET_CLUSTER_AGGREGATE_SQL);
-    condition.addOrderByColumn("METRIC_NAME");
-    condition.addOrderByColumn("APP_ID");
-    condition.addOrderByColumn("INSTANCE_ID");
-    condition.addOrderByColumn("SERVER_TIME");
-    return condition;
-  }
-
   @Override
   protected Long getSleepIntervalMillis() {
     return sleepIntervalMillis;

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index 6b19847..e022e5e 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -86,6 +86,9 @@ public interface TimelineMetricConfiguration {
   public static final String CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER =
     "timeline.metrics.cluster.aggregator.hourly.checkpointCutOffMultiplier";
 
+  public static final String CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL =
+    "timeline.metrics.cluster.aggregator.hourly.checkpointCutOffInterval";
+
   public static final String GLOBAL_RESULT_LIMIT =
     "timeline.metrics.service.default.result.limit";