Posted to commits@ambari.apache.org by sw...@apache.org on 2014/11/25 18:29:36 UTC

[1/2] ambari git commit: AMBARI-7680. Implement the Metric Collector using ATS. Unit tests.

Repository: ambari
Updated Branches:
  refs/heads/branch-metrics-dev 3b877ac82 -> 1d8179543


http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
index 71dbec2..3720852 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
@@ -24,19 +24,19 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.Service.STATE;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.DefaultPhoenixDataSource;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.PhoenixHBaseAccessor;
 import org.apache.zookeeper.ClientCnxn;
 import org.easymock.EasyMock;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
+import org.junit.*;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
+
 import java.io.File;
 import java.io.IOException;
 import java.net.MalformedURLException;
@@ -46,22 +46,21 @@ import java.net.URLClassLoader;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.Statement;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.METRICS_SITE_CONFIGURATION_FILE;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.TimelineMetricConfiguration.METRICS_SITE_CONFIGURATION_FILE;
 import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.*;
 import static org.easymock.EasyMock.expectLastCall;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replayAll;
-import static org.powermock.api.easymock.PowerMock.verifyAll;
+import static org.junit.Assert.*;
+import static org.powermock.api.easymock.PowerMock.*;
 import static org.powermock.api.support.membermodification.MemberMatcher.method;
-import static org.powermock.api.support.membermodification.MemberModifier.suppress;
+import static org.powermock.api.support.membermodification.MemberModifier
+  .suppress;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({ PhoenixHBaseAccessor.class, UserGroupInformation.class,
-  ClientCnxn.class })
+  ClientCnxn.class, DefaultPhoenixDataSource.class})
 @PowerMockIgnore( {"javax.management.*"})
 public class TestApplicationHistoryServer {
 
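A note on the @PrepareForTest change above: PowerMock only instruments classes named in that annotation, so statics and constructors of unlisted classes cannot be intercepted; DefaultPhoenixDataSource has to be listed here, presumably because the test now stubs it. A minimal, self-contained sketch of the pattern (hypothetical test class, mocking the real static UserGroupInformation.getCurrentUser() with the same EasyMock/PowerMock APIs used above):

    import org.apache.hadoop.security.UserGroupInformation;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.powermock.core.classloader.annotations.PrepareForTest;
    import org.powermock.modules.junit4.PowerMockRunner;

    import static org.easymock.EasyMock.createNiceMock;
    import static org.easymock.EasyMock.expect;
    import static org.junit.Assert.assertSame;
    import static org.powermock.api.easymock.PowerMock.mockStatic;
    import static org.powermock.api.easymock.PowerMock.replayAll;
    import static org.powermock.api.easymock.PowerMock.verifyAll;

    @RunWith(PowerMockRunner.class)
    @PrepareForTest({ UserGroupInformation.class })
    public class StaticMockingSketchTest {

      @Test
      public void staticCallReturnsTheMock() throws Exception {
        // Interception works only because UserGroupInformation is prepared above.
        mockStatic(UserGroupInformation.class);
        UserGroupInformation ugi = createNiceMock(UserGroupInformation.class);
        expect(UserGroupInformation.getCurrentUser()).andReturn(ugi).anyTimes();
        replayAll();

        assertSame(ugi, UserGroupInformation.getCurrentUser());
        verifyAll();
      }
    }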

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
index db39250..528f1b1 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
@@ -15,99 +15,96 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline;
 
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
-import org.apache.phoenix.jdbc.PhoenixTestDriver;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
 import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.TestUtil;
 import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
+import java.sql.*;
+import java.util.Map;
+import java.util.Properties;
 
-import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
-public abstract class AbstractMiniHBaseClusterTest extends BaseTest {
+public abstract class AbstractMiniHBaseClusterTest extends BaseTest {
 
-  protected static String getUrl() {
-    return TestUtil.PHOENIX_CONNECTIONLESS_JDBC_URL;
-  }
+  protected static final long BATCH_SIZE = 3;
 
-  protected static String getUrl(String tenantId) {
-    return getUrl() + ';' + TENANT_ID_ATTRIB + '=' + tenantId;
+  @BeforeClass
+  public static void doSetup() throws Exception {
+    Map<String, String> props = getDefaultProps();
+    props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(5000));
+    props.put(IndexWriterUtils.HTABLE_THREAD_KEY, Integer.toString(100));
+    // Make a small batch size to test multiple calls to reserve sequences
+    props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB,
+      Long.toString(BATCH_SIZE));
+    // Must update config before starting server
+    setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
   }
 
-  protected static PhoenixTestDriver driver;
-
-  private static void startServer(String url) throws Exception {
-    assertNull(driver);
-    // only load the test driver if we are testing locally - for integration tests, we want to
-    // test on a wider scale
-    if (PhoenixEmbeddedDriver.isTestUrl(url)) {
-      driver = initDriver(ReadOnlyProps.EMPTY_PROPS);
-      assertTrue(DriverManager.getDriver(url) == driver);
-      driver.connect(url, PropertiesUtil.deepCopy(TEST_PROPERTIES));
-    }
+  @AfterClass
+  public static void doTeardown() throws Exception {
+    dropNonSystemTables();
   }
 
-  protected static synchronized PhoenixTestDriver initDriver(ReadOnlyProps props) throws Exception {
-    if (driver == null) {
-      driver = new PhoenixTestDriver(props);
-      DriverManager.registerDriver(driver);
-    }
-    return driver;
+  @After
+  public void cleanUpAfterTest() throws Exception {
+    deletePriorTables(HConstants.LATEST_TIMESTAMP, getUrl());
   }
 
-  private String connUrl;
+  public static Map<String, String> getDefaultProps() {
+    Map<String, String> props = Maps.newHashMapWithExpectedSize(5);
+    // Must update config before starting server
+    props.put(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
+      Boolean.FALSE.toString());
+    return props;
+  }
 
-  @Before
-  public void setup() throws Exception {
-    connUrl = getUrl();
-    startServer(connUrl);
+  protected Connection getConnection(String url) throws SQLException {
+    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+    return DriverManager.getConnection(url, props);
   }
 
+  /**
+   * A canary test that shows whether the test infrastructure is set up correctly.
+   */
   @Test
-  public void testStorageSystemInitialized() throws Exception {
-    String sampleDDL = "CREATE TABLE TEST_METRICS (TEST_COLUMN VARCHAR " +
-      "CONSTRAINT pk PRIMARY KEY (TEST_COLUMN)) DATA_BLOCK_ENCODING='FAST_DIFF', " +
-      "IMMUTABLE_ROWS=true, TTL=86400, COMPRESSION='SNAPPY'";
-
-    Connection conn = null;
-    PreparedStatement stmt = null;
-    try {
-      conn = DriverManager.getConnection(connUrl);
-      stmt = conn.prepareStatement(sampleDDL);
-      stmt.execute();
-      conn.commit();
-    } finally {
-      if (stmt != null) {
-        stmt.close();
-      }
-      if (conn != null) {
-        conn.close();
-      }
-    }
-  }
+  public void testClusterOK() throws Exception {
+    Connection conn = getConnection(getUrl());
+    conn.setAutoCommit(true);
 
-  @After
-  public void tearDown() throws Exception {
-    if (driver != null) {
-      try {
-        driver.close();
-      } finally {
-        PhoenixTestDriver phoenixTestDriver = driver;
-        driver = null;
-        DriverManager.deregisterDriver(phoenixTestDriver);
-      }
-    }
+    String sampleDDL = "CREATE TABLE TEST_METRICS " +
+      "(TEST_COLUMN VARCHAR " +
+      "CONSTRAINT pk PRIMARY KEY (TEST_COLUMN)) " +
+      "DATA_BLOCK_ENCODING='FAST_DIFF', IMMUTABLE_ROWS=true, " +
+      "TTL=86400, COMPRESSION='NONE' ";
+
+    Statement stmt = conn.createStatement();
+    stmt.executeUpdate(sampleDDL);
+    conn.commit();
+
+    ResultSet rs = stmt.executeQuery(
+      "SELECT COUNT(TEST_COLUMN) FROM TEST_METRICS");
+
+    rs.next();
+    long l = rs.getLong(1);
+    assertThat(l).isGreaterThanOrEqualTo(0);
+
+    stmt.execute("DROP TABLE TEST_METRICS");
+    conn.close();
   }
-}
+
+}
\ No newline at end of file
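
For orientation, here is a minimal sketch of what a subclass of this rewritten base class looks like (hypothetical test; getUrl() is inherited from Phoenix's BaseTest, and the mini cluster is started once in doSetup() above):

    package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;

    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.Statement;

    import org.junit.Test;

    import static org.assertj.core.api.Assertions.assertThat;

    public class ExampleMiniClusterIT extends AbstractMiniHBaseClusterTest {

      @Test
      public void roundTripsASingleRow() throws Exception {
        Connection conn = getConnection(getUrl());
        conn.setAutoCommit(true);
        Statement stmt = conn.createStatement();

        // Phoenix uses UPSERT rather than INSERT.
        stmt.executeUpdate("CREATE TABLE EXAMPLE_T (K VARCHAR PRIMARY KEY)");
        stmt.executeUpdate("UPSERT INTO EXAMPLE_T VALUES ('a')");

        ResultSet rs = stmt.executeQuery("SELECT COUNT(K) FROM EXAMPLE_T");
        rs.next();
        assertThat(rs.getLong(1)).isEqualTo(1L);

        stmt.execute("DROP TABLE EXAMPLE_T");
        conn.close();
      }
    }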

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractPhoenixConnectionlessTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractPhoenixConnectionlessTest.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractPhoenixConnectionlessTest.java
new file mode 100644
index 0000000..e7f2d5c
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractPhoenixConnectionlessTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public abstract class AbstractPhoenixConnectionlessTest extends BaseTest {
+
+  protected static String getUrl() {
+    return TestUtil.PHOENIX_CONNECTIONLESS_JDBC_URL;
+  }
+
+  protected static String getUrl(String tenantId) {
+    return getUrl() + ';' + TENANT_ID_ATTRIB + '=' + tenantId;
+  }
+
+  protected static PhoenixTestDriver driver;
+
+  private static void startServer(String url) throws Exception {
+    assertNull(driver);
+    // only load the test driver if we are testing locally - for integration tests, we want to
+    // test on a wider scale
+    if (PhoenixEmbeddedDriver.isTestUrl(url)) {
+      driver = initDriver(ReadOnlyProps.EMPTY_PROPS);
+      assertTrue(DriverManager.getDriver(url) == driver);
+      driver.connect(url, PropertiesUtil.deepCopy(TEST_PROPERTIES));
+    }
+  }
+
+  protected static synchronized PhoenixTestDriver initDriver(ReadOnlyProps props) throws Exception {
+    if (driver == null) {
+      driver = new PhoenixTestDriver(props);
+      DriverManager.registerDriver(driver);
+    }
+    return driver;
+  }
+
+  private String connUrl;
+
+  @Before
+  public void setup() throws Exception {
+    connUrl = getUrl();
+    startServer(connUrl);
+  }
+
+  @Test
+  public void testStorageSystemInitialized() throws Exception {
+    String sampleDDL = "CREATE TABLE TEST_METRICS (TEST_COLUMN VARCHAR " +
+      "CONSTRAINT pk PRIMARY KEY (TEST_COLUMN)) DATA_BLOCK_ENCODING='FAST_DIFF', " +
+      "IMMUTABLE_ROWS=true, TTL=86400, COMPRESSION='SNAPPY'";
+
+    Connection conn = null;
+    PreparedStatement stmt = null;
+    try {
+      conn = DriverManager.getConnection(connUrl);
+      stmt = conn.prepareStatement(sampleDDL);
+      stmt.execute();
+      conn.commit();
+    } finally {
+      if (stmt != null) {
+        stmt.close();
+      }
+      if (conn != null) {
+        conn.close();
+      }
+    }
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (driver != null) {
+      try {
+        driver.close();
+      } finally {
+        PhoenixTestDriver phoenixTestDriver = driver;
+        driver = null;
+        DriverManager.deregisterDriver(phoenixTestDriver);
+      }
+    }
+  }
+}
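
Unlike the mini-cluster base class above, this one relies on Phoenix's connectionless JDBC URL (TestUtil.PHOENIX_CONNECTIONLESS_JDBC_URL): statements are parsed and compiled against in-memory metadata without contacting HBase at all, which is what lets testStorageSystemInitialized() exercise the DDL cheaply. The essence of the pattern, reduced to a fragment (assumes the PhoenixTestDriver has already been registered, as startServer() does above):

    // No HBase is contacted: the connectionless driver only validates and
    // compiles the statement against in-memory metadata.
    Connection conn =
      DriverManager.getConnection(TestUtil.PHOENIX_CONNECTIONLESS_JDBC_URL);
    conn.createStatement().execute(
      "CREATE TABLE SKETCH_T (ID VARCHAR PRIMARY KEY)");
    conn.close();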

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
new file mode 100644
index 0000000..7a8a7d8
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
@@ -0,0 +1,376 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.*;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.fail;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.PhoenixTransactSQL.*;
+
+public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
+  private Connection conn;
+  private PhoenixHBaseAccessor hdb;
+
+  @Before
+  public void setUp() throws Exception {
+    hdb = createTestableHBaseAccessor();
+    // obtain a connection; the mini cluster itself is started by the base class
+    conn = getConnection(getUrl());
+
+    hdb.initMetricSchema();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    Connection conn = getConnection(getUrl());
+    Statement stmt = conn.createStatement();
+
+    stmt.execute("delete from METRIC_AGGREGATE");
+    stmt.execute("delete from METRIC_AGGREGATE_HOURLY");
+    stmt.execute("delete from METRIC_RECORD");
+    stmt.execute("delete from METRIC_RECORD_HOURLY");
+    stmt.execute("delete from METRIC_RECORD_MINUTE");
+    conn.commit();
+
+    stmt.close();
+    conn.close();
+  }
+
+  @Test
+  public void testShouldAggregateClusterProperly() throws Exception {
+    // GIVEN
+    TimelineMetricClusterAggregator agg =
+      new TimelineMetricClusterAggregator(hdb, new Configuration());
+
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long minute = 60 * 1000;
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+      "disk_free", 1));
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+      "disk_free", 2));
+    ctime += minute;
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+      "disk_free", 2));
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+      "disk_free", 1));
+
+    // WHEN
+    long endTime = ctime + minute;
+    boolean success = agg.doWork(startTime, endTime);
+
+    //THEN
+    Condition condition = new Condition(null, null, null, null, startTime,
+      endTime, null, true);
+    condition.setStatement(GET_CLUSTER_AGGREGATE_SQL);
+
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
+      (conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+
+    int recordCount = 0;
+    while (rs.next()) {
+      TimelineClusterMetric currentMetric =
+        PhoenixHBaseAccessor.getTimelineMetricClusterKeyFromResultSet(rs);
+      MetricClusterAggregate currentHostAggregate =
+        PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet(rs);
+
+      if ("disk_free".equals(currentMetric.getMetricName())) {
+        assertEquals(2, currentHostAggregate.getNumberOfHosts());
+        assertEquals(2.0, currentHostAggregate.getMax());
+        assertEquals(1.0, currentHostAggregate.getMin());
+        assertEquals(3.0, currentHostAggregate.getSum());
+        recordCount++;
+      } else {
+        fail("Unexpected entry");
+      }
+    }
+  }
+
+
+  @Test
+  public void testShouldAggregateDifferentMetricsOnClusterProperly()
+    throws Exception {
+    // GIVEN
+    TimelineMetricClusterAggregator agg =
+      new TimelineMetricClusterAggregator(hdb, new Configuration());
+
+    // here we put some metrics that will be aggregated
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long minute = 60 * 1000;
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+      "disk_free", 1));
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+      "disk_free", 2));
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+      "disk_used", 1));
+
+    ctime += minute;
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+      "disk_free", 2));
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+      "disk_free", 1));
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+      "disk_used", 1));
+
+    // WHEN
+    long endTime = ctime + minute;
+    boolean success = agg.doWork(startTime, endTime);
+
+    //THEN
+    Condition condition = new Condition(null, null, null, null, startTime,
+      endTime, null, true);
+    condition.setStatement(GET_CLUSTER_AGGREGATE_SQL);
+
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
+      (conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+
+    int recordCount = 0;
+    while (rs.next()) {
+      TimelineClusterMetric currentMetric =
+        PhoenixHBaseAccessor.getTimelineMetricClusterKeyFromResultSet(rs);
+      MetricClusterAggregate currentHostAggregate =
+        PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet(rs);
+
+      if ("disk_free".equals(currentMetric.getMetricName())) {
+        assertEquals(2, currentHostAggregate.getNumberOfHosts());
+        assertEquals(2.0, currentHostAggregate.getMax());
+        assertEquals(1.0, currentHostAggregate.getMin());
+        assertEquals(3.0, currentHostAggregate.getSum());
+        recordCount++;
+      } else if ("disk_used".equals(currentMetric.getMetricName())) {
+        assertEquals(1, currentHostAggregate.getNumberOfHosts());
+        assertEquals(1.0, currentHostAggregate.getMax());
+        assertEquals(1.0, currentHostAggregate.getMin());
+        assertEquals(1.0, currentHostAggregate.getSum());
+        recordCount++;
+      } else {
+        fail("Unexpected entry");
+      }
+    }
+  }
+
+
+  @Test
+  public void testShouldAggregateClusterOnHourProperly() throws Exception {
+    // GIVEN
+    TimelineMetricClusterAggregatorHourly agg =
+      new TimelineMetricClusterAggregatorHourly(hdb, new Configuration());
+
+    // this time could be virtualized or made independent of the real clock
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long minute = 60 * 1000;
+
+    Map<TimelineClusterMetric, MetricClusterAggregate> records =
+      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+
+    records.put(createEmptyTimelineMetric(ctime),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineMetric(ctime += minute),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineMetric(ctime += minute),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineMetric(ctime += minute),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+
+    hdb.saveClusterAggregateRecords(records);
+
+    // WHEN
+    agg.doWork(startTime, ctime + minute);
+
+    // THEN
+    ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY");
+    int count = 0;
+    while (rs.next()) {
+      assertEquals("METRIC_NAME", "disk_used", rs.getString("METRIC_NAME"));
+      assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
+      assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
+      assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+      assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
+      assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
+      count++;
+    }
+
+    assertEquals("One hourly aggregated row expected ", 1, count);
+  }
+
+  @Test
+  public void testShouldAggregateDifferentMetricsOnHourProperly() throws
+    Exception {
+    // GIVEN
+    TimelineMetricClusterAggregatorHourly agg =
+      new TimelineMetricClusterAggregatorHourly(hdb, new Configuration());
+
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long minute = 60 * 1000;
+
+    Map<TimelineClusterMetric, MetricClusterAggregate> records =
+      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+
+    records.put(createEmptyTimelineMetric("disk_used", ctime),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineMetric("disk_free", ctime),
+      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+    records.put(createEmptyTimelineMetric("disk_used", ctime += minute),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineMetric("disk_free", ctime),
+      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+    records.put(createEmptyTimelineMetric("disk_used", ctime += minute),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineMetric("disk_free", ctime),
+      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+    records.put(createEmptyTimelineMetric("disk_used", ctime += minute),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineMetric("disk_free", ctime),
+      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+    hdb.saveClusterAggregateRecords(records);
+
+    // WHEN
+    agg.doWork(startTime, ctime + minute);
+
+    // THEN
+    ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY");
+    int count = 0;
+    while (rs.next()) {
+      if ("disk_used".equals(rs.getString("METRIC_NAME"))) {
+        assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
+        assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
+        assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+        assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
+        assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
+      } else if ("disk_free".equals(rs.getString("METRIC_NAME"))) {
+        assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
+        assertEquals("METRIC_SUM", 4.0, rs.getDouble("METRIC_SUM"));
+        assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+        assertEquals("METRIC_MAX", 1.0, rs.getDouble("METRIC_MAX"));
+        assertEquals("METRIC_MIN", 1.0, rs.getDouble("METRIC_MIN"));
+      }
+
+      count++;
+    }
+
+    assertEquals("Two hourly aggregated row expected ", 2, count);
+  }
+
+  private ResultSet executeQuery(String query) throws SQLException {
+    Connection conn = getConnection(getUrl());
+    Statement stmt = conn.createStatement();
+    return stmt.executeQuery(query);
+  }
+
+  private TimelineClusterMetric createEmptyTimelineMetric(String name,
+                                                          long startTime) {
+    TimelineClusterMetric metric = new TimelineClusterMetric(name,
+      "test_app", null, startTime, null);
+
+    return metric;
+  }
+
+  private TimelineClusterMetric createEmptyTimelineMetric(long startTime) {
+    return createEmptyTimelineMetric("disk_used", startTime);
+  }
+
+  private MetricHostAggregate
+  createMetricHostAggregate(double max, double min, int numberOfSamples,
+                            double sum) {
+    MetricHostAggregate expectedAggregate =
+      new MetricHostAggregate();
+    expectedAggregate.setMax(max);
+    expectedAggregate.setMin(min);
+    expectedAggregate.setNumberOfSamples(numberOfSamples);
+    expectedAggregate.setSum(sum);
+
+    return expectedAggregate;
+  }
+
+  private PhoenixHBaseAccessor createTestableHBaseAccessor() {
+    Configuration metricsConf = new Configuration();
+    metricsConf.set(
+      TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME, "NONE");
+
+    return
+      new PhoenixHBaseAccessor(
+        new Configuration(),
+        metricsConf,
+        new ConnectionProvider() {
+          @Override
+          public Connection getConnection() {
+            Connection connection = null;
+            try {
+              connection = DriverManager.getConnection(getUrl());
+            } catch (SQLException e) {
+              LOG.warn("Unable to connect to HBase store using Phoenix.", e);
+            }
+            return connection;
+          }
+        });
+  }
+
+  private TimelineMetrics prepareSingleTimelineMetric(long startTime,
+                                                      String host,
+                                                      String metricName,
+                                                      double val) {
+    TimelineMetrics m = new TimelineMetrics();
+    m.setMetrics(Arrays.asList(
+      createTimelineMetric(startTime, metricName, host, val)));
+
+    return m;
+  }
+
+  private TimelineMetric createTimelineMetric(long startTime,
+                                              String metricName,
+                                              String host,
+                                              double val) {
+    TimelineMetric m = new TimelineMetric();
+    m.setAppId("host");
+    m.setHostName(host);
+    m.setMetricName(metricName);
+    m.setStartTime(startTime);
+    Map<Long, Double> vals = new HashMap<Long, Double>();
+    vals.put(startTime + 15000L, val);
+    vals.put(startTime + 30000L, val);
+    vals.put(startTime + 45000L, val);
+    vals.put(startTime + 60000L, val);
+
+    m.setMetricValues(vals);
+
+    return m;
+  }
+}
\ No newline at end of file
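
To make the expected values above easy to verify: prepareSingleTimelineMetric() writes four samples of the same value per call, and the cluster aggregator groups across hosts per time slice. In testShouldAggregateClusterProperly, local1 and local2 report (1, 2) in the first minute and (2, 1) in the second, so every slice yields

    hosts = 2,  min = 1.0,  max = 2.0,  sum = 1.0 + 2.0 = 3.0

and in testShouldAggregateClusterOnHourProperly the hourly rollup combines four saved aggregates, each with sum 4.0 and a host count of 2:

    METRIC_SUM   = 4 * 4.0 = 16.0
    METRIC_COUNT = 4 * 2   = 8
    METRIC_MAX   = 4.0,  METRIC_MIN = 0.0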

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
new file mode 100644
index 0000000..19e5f21
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.*;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+
+import static junit.framework.Assert.*;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.PhoenixTransactSQL.*;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
+  private Connection conn;
+  private PhoenixHBaseAccessor hdb;
+
+  @Before
+  public void setUp() throws Exception {
+    hdb = createTestableHBaseAccessor();
+    // obtain a connection; the mini cluster itself is started by the base class
+    conn = getConnection(getUrl());
+
+    hdb.initMetricSchema();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    Connection conn = getConnection(getUrl());
+    Statement stmt = conn.createStatement();
+
+    stmt.execute("delete from METRIC_AGGREGATE");
+    stmt.execute("delete from METRIC_AGGREGATE_HOURLY");
+    stmt.execute("delete from METRIC_RECORD");
+    stmt.execute("delete from METRIC_RECORD_HOURLY");
+    stmt.execute("delete from METRIC_RECORD_MINUTE");
+    conn.commit();
+
+    stmt.close();
+    conn.close();
+  }
+
+  @Test
+  public void testShouldInsertMetrics() throws Exception {
+    // GIVEN
+
+    // WHEN
+    long startTime = System.currentTimeMillis();
+    TimelineMetrics metricsSent = prepareTimelineMetrics(startTime, "local");
+    hdb.insertMetricRecords(metricsSent);
+
+    Condition queryCondition = new Condition(null, "local", null, null,
+      startTime, startTime + (15 * 60 * 1000), null, false);
+    TimelineMetrics recordRead = hdb.getMetricRecords(queryCondition);
+
+    // THEN
+    assertThat(recordRead.getMetrics()).hasSize(2)
+      .extracting("metricName")
+      .containsOnly("mem_free", "disk_free");
+
+    assertThat(metricsSent.getMetrics())
+      .usingElementComparator(TIME_IGNORING_COMPARATOR)
+      .containsExactlyElementsOf(recordRead.getMetrics());
+  }
+
+  @Test
+  public void testShouldAggregateMinuteProperly() throws Exception {
+    // GIVEN
+    TimelineMetricAggregator aggregatorMinute = TimelineMetricAggregatorFactory
+      .createTimelineMetricAggregatorMinute(hdb, new Configuration());
+
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long minute = 60 * 1000;
+    hdb.insertMetricRecords(prepareTimelineMetrics(startTime, "local"));
+    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
+    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
+    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
+    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
+
+    // WHEN
+    long endTime = startTime + 1000 * 60 * 4;
+    boolean success = aggregatorMinute.doWork(startTime, endTime);
+
+    //THEN
+    Condition condition = new Condition(null, null, null, null, startTime,
+      endTime, null, true);
+    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
+      METRICS_AGGREGATE_MINUTE_TABLE_NAME));
+
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
+      (conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+    MetricHostAggregate expectedAggregate =
+      createMetricHostAggregate(2.0, 0.0, 20, 15.0);
+
+    int count = 0;
+    while (rs.next()) {
+      TimelineMetric currentMetric =
+        PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
+      MetricHostAggregate currentHostAggregate =
+        PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
+
+      if ("disk_free".equals(currentMetric.getMetricName())) {
+        assertEquals(2.0, currentHostAggregate.getMax());
+        assertEquals(0.0, currentHostAggregate.getMin());
+        assertEquals(20, currentHostAggregate.getNumberOfSamples());
+        assertEquals(15.0, currentHostAggregate.getSum());
+        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+        count++;
+      } else if ("mem_free".equals(currentMetric.getMetricName())) {
+        assertEquals(2.0, currentHostAggregate.getMax());
+        assertEquals(0.0, currentHostAggregate.getMin());
+        assertEquals(20, currentHostAggregate.getNumberOfSamples());
+        assertEquals(15.0, currentHostAggregate.getSum());
+        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+        count++;
+      } else {
+        fail("Unexpected entry");
+      }
+    }
+    assertEquals("Two aggregated entries expected", 2, count);
+  }
+
+  @Test
+  public void testShouldAggregateHourProperly() throws Exception {
+    // GIVEN
+    TimelineMetricAggregator aggregator = TimelineMetricAggregatorFactory
+      .createTimelineMetricAggregatorHourly(hdb, new Configuration());
+    long startTime = System.currentTimeMillis();
+
+    MetricHostAggregate expectedAggregate =
+      createMetricHostAggregate(2.0, 0.0, 20, 15.0);
+    Map<TimelineMetric, MetricHostAggregate>
+      aggMap = new HashMap<TimelineMetric,
+      MetricHostAggregate>();
+
+    int min_5 = 5 * 60 * 1000;
+    long ctime = startTime - min_5;
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+
+    hdb.saveHostAggregateRecords(aggMap, METRICS_AGGREGATE_MINUTE_TABLE_NAME);
+
+    //WHEN
+    long endTime = ctime + min_5;
+    boolean success = aggregator.doWork(startTime, endTime);
+    assertTrue(success);
+
+    //THEN
+    Condition condition = new Condition(null, null, null, null, startTime,
+      endTime, null, true);
+    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
+      METRICS_AGGREGATE_HOURLY_TABLE_NAME));
+
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
+      (conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+
+    while (rs.next()) {
+      TimelineMetric currentMetric =
+        PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
+      MetricHostAggregate currentHostAggregate =
+        PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
+
+      if ("disk_used".equals(currentMetric.getMetricName())) {
+        assertEquals(2.0, currentHostAggregate.getMax());
+        assertEquals(0.0, currentHostAggregate.getMin());
+        assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples());
+        assertEquals(12 * 15.0, currentHostAggregate.getSum());
+        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+      }
+    }
+  }
+
+  private TimelineMetric createEmptyTimelineMetric(long startTime) {
+    TimelineMetric metric = new TimelineMetric();
+    metric.setMetricName("disk_used");
+    metric.setAppId("test_app");
+    metric.setHostName("test_host");
+    metric.setTimestamp(startTime);
+
+    return metric;
+  }
+
+  private MetricHostAggregate
+  createMetricHostAggregate(double max, double min, int numberOfSamples,
+                            double sum) {
+    MetricHostAggregate expectedAggregate =
+      new MetricHostAggregate();
+    expectedAggregate.setMax(max);
+    expectedAggregate.setMin(min);
+    expectedAggregate.setNumberOfSamples(numberOfSamples);
+    expectedAggregate.setSum(sum);
+
+    return expectedAggregate;
+  }
+
+  private PhoenixHBaseAccessor createTestableHBaseAccessor() {
+    Configuration metricsConf = new Configuration();
+    metricsConf.set(
+      TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME, "NONE");
+
+    return
+      new PhoenixHBaseAccessor(
+        new Configuration(),
+        metricsConf,
+        new ConnectionProvider() {
+          @Override
+          public Connection getConnection() {
+            Connection connection = null;
+            try {
+              connection = DriverManager.getConnection(getUrl());
+            } catch (SQLException e) {
+              LOG.warn("Unable to connect to HBase store using Phoenix.", e);
+            }
+            return connection;
+          }
+        });
+  }
+
+  private final static Comparator<TimelineMetric> TIME_IGNORING_COMPARATOR =
+    new Comparator<TimelineMetric>() {
+      @Override
+      public int compare(TimelineMetric o1, TimelineMetric o2) {
+        return o1.equalsExceptTime(o2) ? 0 : 1;
+      }
+    };
+
+  private TimelineMetrics prepareTimelineMetrics(long startTime, String host) {
+    TimelineMetrics metrics = new TimelineMetrics();
+    metrics.setMetrics(Arrays.asList(
+      createMetric(startTime, "disk_free", host),
+      createMetric(startTime, "mem_free", host)));
+
+    return metrics;
+  }
+
+  private TimelineMetric createMetric(long startTime,
+                                      String metricName,
+                                      String host) {
+    TimelineMetric m = new TimelineMetric();
+    m.setAppId("host");
+    m.setHostName(host);
+    m.setMetricName(metricName);
+    m.setStartTime(startTime);
+    Map<Long, Double> vals = new HashMap<Long, Double>();
+    vals.put(startTime + 15000L, 0.0);
+    vals.put(startTime + 30000L, 0.0);
+    vals.put(startTime + 45000L, 1.0);
+    vals.put(startTime + 60000L, 2.0);
+
+    m.setMetricValues(vals);
+
+    return m;
+  }
+
+}
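
For readers checking the numbers in testShouldAggregateMinuteProperly: each prepareTimelineMetrics() call writes four samples per metric (0.0, 0.0, 1.0, 2.0) and the test inserts five times, so createMetricHostAggregate(2.0, 0.0, 20, 15.0) encodes exactly

    samples = 5 inserts * 4 values        = 20
    sum     = 5 * (0.0 + 0.0 + 1.0 + 2.0) = 15.0
    max     = 2.0,  min = 0.0,  avg = 15.0 / 20 = 0.75

The hourly test then rolls twelve such five-minute aggregates together, hence the 12 * 20 samples and 12 * 15.0 sum asserted there.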

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterAggregator.java
deleted file mode 100644
index ac14ca3..0000000
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterAggregator.java
+++ /dev/null
@@ -1,275 +0,0 @@
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline;
-
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.sql.*;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Map;
-
-import static junit.framework.Assert.*;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline.AbstractTimelineAggregator.MetricClusterAggregate;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline.AbstractTimelineAggregator.MetricHostAggregate;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline.PhoenixTransactSQL.*;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline.TimelineMetricClusterAggregator.TimelineClusterMetric;
-import static org.assertj.core.api.Assertions.assertThat;
-
-public class TestClusterAggregator {
-  private static String MY_LOCAL_URL =
-    "jdbc:phoenix:c6503.ambari.apache.org:" + 2181 + ":/hbase";
-  private Connection conn;
-  private PhoenixHBaseAccessor hdb;
-
-
-  @Before
-  public void setUp() throws Exception {
-    hdb = createTestableHBaseAccessor();
-    conn = getConnection(getUrl());
-    Statement stmt = conn.createStatement();
-
-    hdb.initMetricSchema();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    Connection conn = getConnection(getUrl());
-    Statement stmt = conn.createStatement();
-
-    stmt.execute("delete from METRIC_AGGREGATE");
-    stmt.execute("delete from METRIC_AGGREGATE_HOURLY");
-    stmt.execute("delete from METRIC_RECORD");
-    stmt.execute("delete from METRIC_RECORD_HOURLY");
-    stmt.execute("delete from METRIC_RECORD_MINUTE");
-    conn.commit();
-
-    stmt.close();
-    conn.close();
-  }
-
-  /**
-   * A canary test.
-   *
-   * @throws Exception
-   */
-  @Test
-  public void testClusterOK() throws Exception {
-    Connection conn = getConnection(getUrl());
-    Statement stmt = conn.createStatement();
-    String sampleDDL = "CREATE TABLE TEST_METRICS (TEST_COLUMN VARCHAR " +
-      "CONSTRAINT pk PRIMARY KEY (TEST_COLUMN)) " +
-      "DATA_BLOCK_ENCODING='FAST_DIFF', " +
-      "IMMUTABLE_ROWS=true, TTL=86400, COMPRESSION='SNAPPY'";
-
-    stmt.executeUpdate(sampleDDL);
-    ResultSet rs = stmt.executeQuery(
-      "SELECT COUNT(TEST_COLUMN) FROM TEST_METRICS");
-
-    rs.next();
-    long l = rs.getLong(1);
-    assertThat(l).isGreaterThanOrEqualTo(0);
-
-    stmt.execute("DROP TABLE TEST_METRICS");
-  }
-
-  @Test
-  public void testShouldAggregateClusterProperly() throws Exception {
-    // GIVEN
-    TimelineMetricClusterAggregator agg =
-      new TimelineMetricClusterAggregator(hdb, new Configuration());
-
-    long startTime = System.currentTimeMillis();
-    long ctime = startTime;
-    long minute = 60 * 1000;
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", 1));
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", 2));
-    ctime += minute;
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", 2));
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", 1));
-
-    // WHEN
-    long endTime = ctime + minute;
-    boolean success = agg.doWork(startTime, endTime);
-
-    //THEN
-    Condition condition = new Condition(null, null, null, null, startTime,
-      endTime, null, true);
-    condition.setStatement(GET_CLUSTER_AGGREGATE_SQL);
-
-    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
-      (conn, condition);
-    ResultSet rs = pstmt.executeQuery();
-    MetricHostAggregate expectedAggregate =
-      createMetricHostAggregate(2.0, 0.0, 20, 15.0);
-
-    int recordCount = 0;
-    while (rs.next()) {
-      TimelineClusterMetric currentMetric =
-        PhoenixHBaseAccessor.getTimelineMetricClusterKeyFromResultSet(rs);
-      MetricClusterAggregate currentHostAggregate =
-        PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet(rs);
-
-      if ("disk_free".equals(currentMetric.getMetricName())) {
-        assertEquals(2, currentHostAggregate.getNumberOfHosts());
-        assertEquals(2.0, currentHostAggregate.getMax());
-        assertEquals(1.0, currentHostAggregate.getMin());
-        assertEquals(3.0, currentHostAggregate.getSum());
-        recordCount++;
-      } else {
-        fail("Unexpected entry");
-      }
-    }
-  }
-
-  @Test
-  public void testShouldAggregateClusterOnHourProperly() throws Exception {
-    // GIVEN
-    TimelineMetricClusterAggregatorHourly agg =
-      new TimelineMetricClusterAggregatorHourly(hdb, new Configuration());
-
-    // this time can be virtualized! or made independent from real clock
-    long startTime = System.currentTimeMillis();
-    long ctime = startTime;
-    long minute = 60 * 1000;
-
-    Map<TimelineClusterMetric, MetricClusterAggregate> records =
-      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
-
-    records.put(createEmptyTimelineMetric(ctime),
-      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-    records.put(createEmptyTimelineMetric(ctime += minute),
-      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-    records.put(createEmptyTimelineMetric(ctime += minute),
-      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-    records.put(createEmptyTimelineMetric(ctime += minute),
-      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-
-    hdb.saveClusterAggregateRecords(records);
-
-    // WHEN
-    agg.doWork(startTime, ctime + minute);
-
-    // THEN
-    ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY");
-    int count = 0;
-    while (rs.next()) {
-      assertEquals("METRIC_NAME", "disk_used", rs.getString("METRIC_NAME"));
-      assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
-      assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
-      assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
-      assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
-      assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
-      count++;
-    }
-
-    assertEquals("One hourly aggregated row expected ",1, count);
-
-    System.out.println(rs);
-
-
-  }
-
-  private ResultSet executeQuery(String query) throws SQLException {
-    Connection conn = getConnection(getUrl());
-    Statement stmt = conn.createStatement();
-    return stmt.executeQuery(query);
-  }
-
-  private TimelineClusterMetric createEmptyTimelineMetric(long startTime) {
-    TimelineClusterMetric metric = new TimelineClusterMetric("disk_used",
-      "test_app", null, startTime, null);
-
-    return metric;
-  }
-
-  private MetricHostAggregate
-  createMetricHostAggregate(double max, double min, int numberOfSamples,
-                            double sum) {
-    MetricHostAggregate expectedAggregate =
-      new MetricHostAggregate();
-    expectedAggregate.setMax(max);
-    expectedAggregate.setMin(min);
-    expectedAggregate.setNumberOfSamples(numberOfSamples);
-    expectedAggregate.setSum(sum);
-
-    return expectedAggregate;
-  }
-
-  private PhoenixHBaseAccessor createTestableHBaseAccessor() {
-    return
-      new PhoenixHBaseAccessor(
-        new Configuration(),
-        new Configuration(),
-        new ConnectionProvider() {
-          @Override
-          public Connection getConnection() {
-            Connection connection = null;
-            try {
-              connection = DriverManager.getConnection(getUrl());
-            } catch (SQLException e) {
-              LOG.warn("Unable to connect to HBase store using Phoenix.", e);
-            }
-            return connection;
-          }
-        });
-  }
-
-  private final static Comparator<TimelineMetric> TIME_IGNORING_COMPARATOR =
-    new Comparator<TimelineMetric>() {
-      @Override
-      public int compare(TimelineMetric o1, TimelineMetric o2) {
-        return o1.equalsExceptTime(o2) ? 0 : 1;
-      }
-    };
-
-  private TimelineMetrics prepareSingleTimelineMetric(long startTime,
-                                                      String host,
-                                                      double val) {
-    TimelineMetrics m = new TimelineMetrics();
-    m.setMetrics(Arrays.asList(
-      createTimelineMetric(startTime, "disk_free", host, val)));
-
-    return m;
-  }
-
-  private TimelineMetric createTimelineMetric(long startTime,
-                                              String metricName,
-                                              String host,
-                                              double val) {
-    TimelineMetric m = new TimelineMetric();
-    m.setAppId("host");
-    m.setHostName(host);
-    m.setMetricName(metricName);
-    m.setStartTime(startTime);
-    Map<Long, Double> vals = new HashMap<Long, Double>();
-    vals.put(startTime + 15000l, val);
-    vals.put(startTime + 30000l, val);
-    vals.put(startTime + 45000l, val);
-    vals.put(startTime + 60000l, val);
-
-    m.setMetricValues(vals);
-
-    return m;
-  }
-
-
-  protected static String getUrl() {
-    return MY_LOCAL_URL;
-//    return  TestUtil.PHOENIX_CONNECTIONLESS_JDBC_URL;
-  }
-
-  private static Connection getConnection(String url) throws SQLException {
-    return DriverManager.getConnection(url);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterSuite.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterSuite.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterSuite.java
new file mode 100644
index 0000000..6aa227b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterSuite.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline;
+
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+import static org.junit.runners.Suite.SuiteClasses;
+
+@RunWith(Suite.class)
+@SuiteClasses({ITMetricAggregator.class, ITClusterAggregator.class})
+public class TestClusterSuite {
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestHBaseAccessor.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestHBaseAccessor.java
deleted file mode 100644
index a10cfeb..0000000
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestHBaseAccessor.java
+++ /dev/null
@@ -1,332 +0,0 @@
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.junit.*;
-
-import java.sql.*;
-import java.util.*;
-
-import static junit.framework.Assert.*;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline.PhoenixTransactSQL.*;
-import static org.assertj.core.api.Assertions.assertThat;
-
-public class TestHBaseAccessor {
-  private static String MY_LOCAL_URL =
-    "jdbc:phoenix:c6503.ambari.apache.org:" + 2181 + ":/hbase";
-  private Connection conn;
-  private PhoenixHBaseAccessor hdb;
-
-
-  @Before
-  public void setUp() throws Exception {
-    hdb = createTestableHBaseAccessor();
-    conn = getConnection(getUrl());
-    Statement stmt = conn.createStatement();
-
-    hdb.initMetricSchema();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    Connection conn = getConnection(getUrl());
-    Statement stmt = conn.createStatement();
-
-    stmt.execute("delete from METRIC_AGGREGATE");
-    stmt.execute("delete from METRIC_AGGREGATE_HOURLY");
-    stmt.execute("delete from METRIC_RECORD");
-    stmt.execute("delete from METRIC_RECORD_HOURLY");
-    stmt.execute("delete from METRIC_RECORD_MINUTE");
-    conn.commit();
-
-    stmt.close();
-    conn.close();
-  }
-  /**
-   * A canary test.
-   *
-   * @throws Exception
-   */
-  @Test
-  public void testClusterOK() throws Exception {
-    Connection conn = getConnection(getUrl());
-    Statement stmt = conn.createStatement();
-    String sampleDDL = "CREATE TABLE TEST_METRICS (TEST_COLUMN VARCHAR " +
-      "CONSTRAINT pk PRIMARY KEY (TEST_COLUMN)) " +
-      "DATA_BLOCK_ENCODING='FAST_DIFF', " +
-      "IMMUTABLE_ROWS=true, TTL=86400, COMPRESSION='SNAPPY'";
-
-    stmt.executeUpdate(sampleDDL);
-    ResultSet rs = stmt.executeQuery(
-      "SELECT COUNT(TEST_COLUMN) FROM TEST_METRICS");
-
-    rs.next();
-    long l = rs.getLong(1);
-    assertThat(l).isGreaterThanOrEqualTo(0);
-
-    stmt.execute("DROP TABLE TEST_METRICS");
-  }
-
-  //  @Test
-  public void testShouldInsertMetrics() throws Exception {
-    // GIVEN
-
-    // WHEN
-    long startTime = System.currentTimeMillis();
-    TimelineMetrics metricsSent = prepareTimelineMetrics(startTime, "local");
-    hdb.insertMetricRecords(metricsSent);
-
-    Condition queryCondition = new Condition(null, "local", null, null,
-      startTime, startTime + (15 * 60 * 1000), null, false);
-    TimelineMetrics recordRead = hdb.getMetricRecords(queryCondition);
-
-    // THEN
-    assertThat(recordRead.getMetrics()).hasSize(2)
-      .extracting("metricName")
-      .containsOnly("mem_free", "disk_free");
-
-    assertThat(metricsSent.getMetrics())
-      .usingElementComparator(TIME_IGNORING_COMPARATOR)
-      .containsExactlyElementsOf(recordRead.getMetrics());
-  }
-
-  //  @Test
-  public void testShouldAggregateMinuteProperly() throws Exception {
-    // GIVEN
-    TimelineMetricAggregatorMinute aggregatorMinute =
-      new TimelineMetricAggregatorMinute(hdb, new Configuration());
-
-    long startTime = System.currentTimeMillis();
-    long ctime = startTime;
-    long minute = 60 * 1000;
-    hdb.insertMetricRecords(prepareTimelineMetrics(startTime, "local"));
-    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
-    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
-    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
-    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
-
-    // WHEN
-    long endTime = startTime + 1000 * 60 * 4;
-    boolean success = aggregatorMinute.doWork(startTime, endTime);
-
-    //THEN
-    Condition condition = new Condition(null, null, null, null, startTime,
-      endTime, null, true);
-    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
-      METRICS_AGGREGATE_MINUTE_TABLE_NAME));
-
-    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
-      (conn, condition);
-    ResultSet rs = pstmt.executeQuery();
-    AbstractTimelineAggregator.MetricHostAggregate expectedAggregate =
-      createMetricHostAggregate(2.0, 0.0, 20, 15.0);
-
-    int count = 0;
-    while (rs.next()) {
-      TimelineMetric currentMetric =
-        PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
-      AbstractTimelineAggregator.MetricHostAggregate currentHostAggregate =
-        PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
-
-      if ("disk_free".equals(currentMetric.getMetricName())) {
-        assertEquals(2.0, currentHostAggregate.getMax());
-        assertEquals(0.0, currentHostAggregate.getMin());
-        assertEquals(20, currentHostAggregate.getNumberOfSamples());
-        assertEquals(15.0, currentHostAggregate.getSum());
-        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
-        count++;
-      } else if ("mem_free".equals(currentMetric.getMetricName())) {
-        assertEquals(2.0, currentHostAggregate.getMax());
-        assertEquals(0.0, currentHostAggregate.getMin());
-        assertEquals(20, currentHostAggregate.getNumberOfSamples());
-        assertEquals(15.0, currentHostAggregate.getSum());
-        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
-        count++;
-      } else {
-        fail("Unexpected entry");
-      }
-    }
-    assertEquals("Two aggregated entries expected", 2, count);
-  }
-
-  //  @Test
-  public void testShouldAggregateHourProperly() throws Exception {
-    // GIVEN
-    TimelineMetricAggregatorHourly aggregator =
-      new TimelineMetricAggregatorHourly(hdb, new Configuration());
-    long startTime = System.currentTimeMillis();
-
-    AbstractTimelineAggregator.MetricHostAggregate expectedAggregate =
-      createMetricHostAggregate(2.0, 0.0, 20, 15.0);
-    Map<TimelineMetric, AbstractTimelineAggregator.MetricHostAggregate>
-      aggMap = new HashMap<TimelineMetric,
-      AbstractTimelineAggregator.MetricHostAggregate>();
-
-    int min_5 = 5 * 60 * 1000;
-    long ctime = startTime - min_5;
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-
-    hdb.saveHostAggregateRecords(aggMap, METRICS_AGGREGATE_MINUTE_TABLE_NAME);
-
-    //WHEN
-    long endTime = ctime + min_5;
-    boolean success = aggregator.doWork(startTime, endTime);
-    assertTrue(success);
-
-    //THEN
-    Condition condition = new Condition(null, null, null, null, startTime,
-      endTime, null, true);
-    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
-      METRICS_AGGREGATE_HOURLY_TABLE_NAME));
-
-    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
-      (conn, condition);
-    ResultSet rs = pstmt.executeQuery();
-
-    while (rs.next()) {
-      TimelineMetric currentMetric =
-        PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
-      AbstractTimelineAggregator.MetricHostAggregate currentHostAggregate =
-        PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
-
-      if ("disk_used".equals(currentMetric.getMetricName())) {
-        assertEquals(2.0, currentHostAggregate.getMax());
-        assertEquals(0.0, currentHostAggregate.getMin());
-        assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples());
-        assertEquals(12 * 15.0, currentHostAggregate.getSum());
-        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
-      }
-    }
-  }
-
-  private TimelineMetric createEmptyTimelineMetric(long startTime) {
-    TimelineMetric metric = new TimelineMetric();
-    metric.setMetricName("disk_used");
-    metric.setAppId("test_app");
-    metric.setHostName("test_host");
-    metric.setTimestamp(startTime);
-
-    return metric;
-  }
-
-  private AbstractTimelineAggregator.MetricHostAggregate
-  createMetricHostAggregate(double max, double min, int numberOfSamples,
-                            double sum) {
-    AbstractTimelineAggregator.MetricHostAggregate expectedAggregate =
-      new AbstractTimelineAggregator.MetricHostAggregate();
-    expectedAggregate.setMax(max);
-    expectedAggregate.setMin(min);
-    expectedAggregate.setNumberOfSamples(numberOfSamples);
-    expectedAggregate.setSum(sum);
-
-    return expectedAggregate;
-  }
-
-  private PhoenixHBaseAccessor createTestableHBaseAccessor() {
-    return
-      new PhoenixHBaseAccessor(
-        new Configuration(),
-        new Configuration(),
-        new ConnectionProvider() {
-          @Override
-          public Connection getConnection() {
-            Connection connection = null;
-            try {
-              connection = DriverManager.getConnection(getUrl());
-            } catch (SQLException e) {
-              LOG.warn("Unable to connect to HBase store using Phoenix.", e);
-            }
-            return connection;
-          }
-        });
-  }
-
-  private final static Comparator<TimelineMetric> TIME_IGNORING_COMPARATOR =
-    new Comparator<TimelineMetric>() {
-      @Override
-      public int compare(TimelineMetric o1, TimelineMetric o2) {
-        return o1.equalsExceptTime(o2) ? 0 : 1;
-      }
-    };
-
-  private TimelineMetrics prepareSingleTimelineMetric(long startTime,
-                                                      String host,
-                                                      double val) {
-    TimelineMetrics m = new TimelineMetrics();
-    m.setMetrics(Arrays.asList(
-      createTimelineMetric(startTime, "disk_free", host, val)));
-
-    return m;
-  }
-
-  private TimelineMetric createTimelineMetric(long startTime,
-                                              String metricName,
-                                              String host,
-                                              double val) {
-    TimelineMetric m = new TimelineMetric();
-    m.setAppId("host");
-    m.setHostName(host);
-    m.setMetricName(metricName);
-    m.setStartTime(startTime);
-    Map<Long, Double> vals = new HashMap<Long, Double>();
-    vals.put(startTime + 15000l, val);
-    vals.put(startTime + 30000l, val);
-    vals.put(startTime + 45000l, val);
-    vals.put(startTime + 60000l, val);
-
-    m.setMetricValues(vals);
-
-    return m;
-  }
-
-  private TimelineMetrics prepareTimelineMetrics(long startTime, String host) {
-    TimelineMetrics metrics = new TimelineMetrics();
-    metrics.setMetrics(Arrays.asList(
-      createMetric(startTime, "disk_free", host),
-      createMetric(startTime, "mem_free", host)));
-
-    return metrics;
-  }
-
-  private TimelineMetric createMetric(long startTime,
-                                      String metricName,
-                                      String host) {
-    TimelineMetric m = new TimelineMetric();
-    m.setAppId("host");
-    m.setHostName(host);
-    m.setMetricName(metricName);
-    m.setStartTime(startTime);
-    Map<Long, Double> vals = new HashMap<Long, Double>();
-    vals.put(startTime + 15000l, 0.0);
-    vals.put(startTime + 30000l, 0.0);
-    vals.put(startTime + 45000l, 1.0);
-    vals.put(startTime + 60000l, 2.0);
-
-    m.setMetricValues(vals);
-
-    return m;
-  }
-
-  protected static String getUrl() {
-    return MY_LOCAL_URL;
-//    return  TestUtil.PHOENIX_CONNECTIONLESS_JDBC_URL;
-  }
-
-  private static Connection getConnection(String url) throws SQLException {
-    return DriverManager.getConnection(url);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
index aebbdb3..5d8ba96 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
@@ -1,10 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
   .timeline;
 
 import org.junit.Test;
 
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline.AbstractTimelineAggregator.MetricHostAggregate;
 import static org.assertj.core.api.Assertions.assertThat;
 
 public class TestMetricHostAggregate {

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java
index b0f0153..758f5a9 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java
@@ -24,7 +24,7 @@ import java.util.Arrays;
 
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
 
-public class TestPhoenixTransactSQL extends AbstractMiniHBaseClusterTest {
+public class TestPhoenixTransactSQL {
   @Test
   public void testConditionClause() throws Exception {
     Condition condition = new Condition(

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/resources/hbase-default.xml b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/resources/hbase-default.xml
new file mode 100644
index 0000000..7502346
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/resources/hbase-default.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+  <property>
+    <name>hbase.defaults.for.version.skip</name>
+    <value>true</value>
+    <description>
+      Set to true to skip the 'hbase.defaults.for.version' check.
+      Setting this to true can be useful in contexts other than a
+      maven build; i.e. running in an IDE.  You'll want to set this
+      boolean to true to avoid seeing the RuntimeException complaint:
+      "hbase-default.xml file seems to be for an old version of HBase
+      (@@@VERSION@@@), this version is X.X.X-SNAPSHOT"
+    </description>
+  </property>
+</configuration>
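
The property above is what lets the test classpath ship its own
hbase-default.xml without tripping HBase's version check. A sketch of
the programmatic equivalent, for tests that build their configuration
in code (the class name is hypothetical):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;

  public class VersionSkipExample {
    // Same effect as the resource file above, applied to a
    // Configuration constructed at runtime.
    public static Configuration testConfiguration() {
      Configuration conf = HBaseConfiguration.create();
      conf.setBoolean("hbase.defaults.for.version.skip", true);
      return conf;
    }
  }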

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/resources/log4j.properties b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/resources/log4j.properties
new file mode 100644
index 0000000..8520d48
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/resources/log4j.properties
@@ -0,0 +1 @@
+log4j.rootLogger=OFF

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/resources/logging.properties
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/resources/logging.properties b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/resources/logging.properties
new file mode 100644
index 0000000..f5edede
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/resources/logging.properties
@@ -0,0 +1,3 @@
+.level = OFF
+java.util.logging.ConsoleHandler.level=OFF
+java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter


[2/2] ambari git commit: AMBARI-7680. Implement the Metric Collector using ATS. Unit tests.

Posted by sw...@apache.org.
AMBARI-7680. Implement the Metric Collector using ATS. Unit tests.


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/1d817954
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/1d817954
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/1d817954

Branch: refs/heads/branch-metrics-dev
Commit: 1d8179543a8b35ce1d27962f44494efa75acf9bc
Parents: 3b877ac
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Tue Nov 25 09:29:17 2014 -0800
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Tue Nov 25 09:29:17 2014 -0800

----------------------------------------------------------------------
 .../pom.xml                                     |  26 +-
 .../timeline/AbstractTimelineAggregator.java    | 255 +++----------
 .../timeline/DefaultPhoenixDataSource.java      |   1 +
 .../timeline/HBaseTimelineMetricStore.java      |  24 +-
 .../metrics/timeline/MetricAggregate.java       | 110 ++++++
 .../timeline/MetricClusterAggregate.java        |  74 ++++
 .../metrics/timeline/MetricHostAggregate.java   |  81 ++++
 .../metrics/timeline/PhoenixHBaseAccessor.java  |  34 +-
 .../metrics/timeline/TimelineClusterMetric.java |  97 +++++
 .../timeline/TimelineMetricAggregator.java      | 147 ++++++++
 .../TimelineMetricAggregatorFactory.java        |  89 +++++
 .../TimelineMetricAggregatorHourly.java         | 198 ----------
 .../TimelineMetricAggregatorMinute.java         | 181 ---------
 .../TimelineMetricClusterAggregator.java        | 235 ++++--------
 .../TimelineMetricClusterAggregatorHourly.java  |  98 ++---
 .../timeline/TimelineMetricConfiguration.java   |   3 +
 .../TestApplicationHistoryServer.java           |  31 +-
 .../timeline/AbstractMiniHBaseClusterTest.java  | 141 ++++---
 .../AbstractPhoenixConnectionlessTest.java      | 113 ++++++
 .../metrics/timeline/ITClusterAggregator.java   | 376 +++++++++++++++++++
 .../metrics/timeline/ITMetricAggregator.java    | 298 +++++++++++++++
 .../metrics/timeline/TestClusterAggregator.java | 275 --------------
 .../metrics/timeline/TestClusterSuite.java      |  32 ++
 .../metrics/timeline/TestHBaseAccessor.java     | 332 ----------------
 .../timeline/TestMetricHostAggregate.java       |  19 +-
 .../timeline/TestPhoenixTransactSQL.java        |   2 +-
 .../src/test/resources/hbase-default.xml        |  36 ++
 .../src/test/resources/log4j.properties         |   1 +
 .../src/test/resources/logging.properties       |   3 +
 29 files changed, 1756 insertions(+), 1556 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml b/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml
index 7efdb6b..ae2872d 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml
@@ -214,7 +214,18 @@
           </mappings>
         </configuration>
       </plugin>
-
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <forkMode>always</forkMode>
+          <systemProperties>
+            <property>
+              <name>java.util.logging.config.file</name>
+              <value>src/test/resources/logging.properties</value>
+            </property>
+          </systemProperties>
+        </configuration>
+      </plugin>
     </plugins>
   </build>
 
@@ -470,6 +481,19 @@
       <scope>test</scope>
       <classifier>tests</classifier>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-testing-util</artifactId>
+      <version>0.98.4-hadoop2</version>
+      <scope>test</scope>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.jruby</groupId>
+          <artifactId>jruby-complete</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <dependency>
       <groupId>org.powermock</groupId>
       <artifactId>powermock-module-junit4</artifactId>
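
The surefire settings above fork a fresh JVM per test class and point
java.util.logging at the silent config added in src/test/resources.
A sketch of what the forked JVM does implicitly at startup (manual
equivalent for a standalone run; class name hypothetical):

  import java.io.FileInputStream;
  import java.io.IOException;
  import java.io.InputStream;
  import java.util.logging.LogManager;

  public class LoggingConfigExample {
    public static void main(String[] args) throws IOException {
      // LogManager reads the file named by this system property;
      // surefire sets it via the <systemProperties> block above.
      String path = System.getProperty("java.util.logging.config.file",
          "src/test/resources/logging.properties");
      InputStream in = new FileInputStream(path);
      try {
        LogManager.getLogManager().readConfiguration(in);
      } finally {
        in.close();
      }
    }
  }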

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
index 43ec648..b3c1af9 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
@@ -21,16 +21,14 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonProperty;
-import org.codehaus.jackson.annotate.JsonSubTypes;
-import org.codehaus.jackson.map.ObjectMapper;
 
 import java.io.File;
 import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.util.Date;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -42,15 +40,10 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
 public abstract class AbstractTimelineAggregator implements Runnable {
   protected final PhoenixHBaseAccessor hBaseAccessor;
   private final Log LOG;
-  private static final ObjectMapper mapper;
   protected final long checkpointDelayMillis;
   protected final Integer resultsetFetchSize;
   protected Configuration metricsConf;
 
-  static {
-    mapper = new ObjectMapper();
-  }
-
   public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor,
                                     Configuration metricsConf) {
     this.hBaseAccessor = hBaseAccessor;
@@ -162,204 +155,78 @@ public abstract class AbstractTimelineAggregator implements Runnable {
     FileUtils.writeStringToFile(checkpoint, String.valueOf(checkpointTime));
   }
 
-  // TODO: Abstract out doWork implementation for cluster and host levels
-  protected abstract boolean doWork(long startTime, long endTime);
-
-  protected abstract Long getSleepIntervalMillis();
-
-  protected abstract Integer getCheckpointCutOffMultiplier();
-
-  protected Long getCheckpointCutOffIntervalMillis() {
-    return getCheckpointCutOffMultiplier() * getSleepIntervalMillis();
-  }
-
-  protected abstract boolean isDisabled();
-
-  protected abstract String getCheckpointLocation();
-
-  @JsonSubTypes({@JsonSubTypes.Type(value = MetricClusterAggregate.class),
-    @JsonSubTypes.Type(value = MetricHostAggregate.class)})
-  @InterfaceAudience.Public
-  @InterfaceStability.Unstable
-  public static class MetricAggregate {
-    protected Double sum = 0.0;
-    protected Double deviation;
-    protected Double max = Double.MIN_VALUE;
-    protected Double min = Double.MAX_VALUE;
-
-    public MetricAggregate() {
-    }
+  /**
+   * Read metrics written during the time interval and save the
+   * aggregates (sum, max, min, sample count) to the aggregate table.
+   *
+   * @param startTime Sample start time
+   * @param endTime   Sample end time
+   * @return true if the aggregation cycle completed without errors
+   */
+  protected boolean doWork(long startTime, long endTime) {
+    LOG.info("Start aggregation cycle @ " + new Date() + ", " +
+      "startTime = " + new Date(startTime) + ", endTime = " + new Date(endTime));
 
-    protected MetricAggregate(Double sum, Double deviation, Double max,
-                              Double min) {
-      this.sum = sum;
-      this.deviation = deviation;
-      this.max = max;
-      this.min = min;
-    }
+    boolean success = true;
+    PhoenixTransactSQL.Condition condition =
+      prepareMetricQueryCondition(startTime, endTime);
 
-    void updateSum(Double sum) {
-      this.sum += sum;
-    }
+    Connection conn = null;
+    PreparedStatement stmt = null;
 
-    void updateMax(Double max) {
-      if (max > this.max) {
-        this.max = max;
+    try {
+      conn = hBaseAccessor.getConnection();
+      stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+
+      LOG.debug("Query issued @: " + new Date());
+      ResultSet rs = stmt.executeQuery();
+      LOG.debug("Query returned @: " + new Date());
+
+      aggregate(rs, startTime, endTime);
+
+    } catch (SQLException e) {
+      LOG.error("Exception during aggregating metrics.", e);
+      success = false;
+    } catch (IOException e) {
+      LOG.error("Exception during aggregating metrics.", e);
+      success = false;
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
       }
-    }
-
-    void updateMin(Double min) {
-      if (min < this.min) {
-        this.min = min;
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
       }
     }
 
-    @JsonProperty("sum")
-    Double getSum() {
-      return sum;
-    }
-
-    @JsonProperty("deviation")
-    Double getDeviation() {
-      return deviation;
-    }
-
-    @JsonProperty("max")
-    Double getMax() {
-      return max;
-    }
-
-    @JsonProperty("min")
-    Double getMin() {
-      return min;
-    }
-
-    public void setSum(Double sum) {
-      this.sum = sum;
-    }
-
-    public void setDeviation(Double deviation) {
-      this.deviation = deviation;
-    }
-
-    public void setMax(Double max) {
-      this.max = max;
-    }
-
-    public void setMin(Double min) {
-      this.min = min;
-    }
-
-    public String toJSON() throws IOException {
-      return mapper.writeValueAsString(this);
-    }
+    LOG.info("End aggregation cycle @ " + new Date());
+    return success;
   }
 
-  public static class MetricClusterAggregate extends MetricAggregate {
-    private int numberOfHosts;
-
-    @JsonCreator
-    public MetricClusterAggregate() {
-    }
+  protected abstract PhoenixTransactSQL.Condition
+  prepareMetricQueryCondition(long startTime, long endTime);
 
-    MetricClusterAggregate(Double sum, int numberOfHosts, Double deviation,
-                           Double max, Double min) {
-      super(sum, deviation, max, min);
-      this.numberOfHosts = numberOfHosts;
-    }
+  protected abstract void aggregate(ResultSet rs, long startTime, long endTime)
+    throws IOException, SQLException;
 
-    @JsonProperty("numberOfHosts")
-    int getNumberOfHosts() {
-      return numberOfHosts;
-    }
-
-    void updateNumberOfHosts(int count) {
-      this.numberOfHosts += count;
-    }
-
-    public void setNumberOfHosts(int numberOfHosts) {
-      this.numberOfHosts = numberOfHosts;
-    }
+  protected abstract Long getSleepIntervalMillis();
 
-    /**
-     * Find and update min, max and avg for a minute
-     */
-    void updateAggregates(MetricClusterAggregate hostAggregate) {
-      updateMax(hostAggregate.getMax());
-      updateMin(hostAggregate.getMin());
-      updateSum(hostAggregate.getSum());
-      updateNumberOfHosts(hostAggregate.getNumberOfHosts());
-    }
+  protected abstract Integer getCheckpointCutOffMultiplier();
 
-    @Override
-    public String toString() {
-//    MetricClusterAggregate
-      return "MetricAggregate{" +
-        "sum=" + sum +
-        ", numberOfHosts=" + numberOfHosts +
-        ", deviation=" + deviation +
-        ", max=" + max +
-        ", min=" + min +
-        '}';
-    }
+  protected Long getCheckpointCutOffIntervalMillis() {
+    return getCheckpointCutOffMultiplier() * getSleepIntervalMillis();
   }
 
-  /**
-   * Represents a collection of minute based aggregation of values for
-   * resolution greater than a minute.
-   */
-  public static class MetricHostAggregate extends MetricAggregate {
-
-    private long numberOfSamples = 0;
-
-    @JsonCreator
-    public MetricHostAggregate() {
-      super(0.0, 0.0, Double.MIN_VALUE, Double.MAX_VALUE);
-    }
-
-    public MetricHostAggregate(Double sum, int numberOfSamples,
-                               Double deviation,
-                               Double max, Double min) {
-      super(sum, deviation, max, min);
-      this.numberOfSamples = numberOfSamples;
-    }
-
-    @JsonProperty("numberOfSamples")
-    long getNumberOfSamples() {
-      return numberOfSamples == 0 ? 1 : numberOfSamples;
-    }
-
-    void updateNumberOfSamples(long count) {
-      this.numberOfSamples += count;
-    }
-
-    public void setNumberOfSamples(long numberOfSamples) {
-      this.numberOfSamples = numberOfSamples;
-    }
-
-    public double getAvg() {
-      return sum / numberOfSamples;
-    }
+  protected abstract boolean isDisabled();
 
-    /**
-     * Find and update min, max and avg for a minute
-     */
-    void updateAggregates(MetricHostAggregate hostAggregate) {
-      updateMax(hostAggregate.getMax());
-      updateMin(hostAggregate.getMin());
-      updateSum(hostAggregate.getSum());
-      updateNumberOfSamples(hostAggregate.getNumberOfSamples());
-    }
+  protected abstract String getCheckpointLocation();
 
-    @Override
-    public String toString() {
-      return "MetricHostAggregate{" +
-        "sum=" + sum +
-        ", numberOfSamples=" + numberOfSamples +
-        ", deviation=" + deviation +
-        ", max=" + max +
-        ", min=" + min +
-        '}';
-    }
-  }
 }
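
With doWork pulled up as a template method, a subclass now only supplies
the query condition, the aggregation step, and the scheduling knobs. A
minimal sketch of the contract (hypothetical values; assumes the same
package as the aggregator classes):

  import org.apache.hadoop.conf.Configuration;

  import java.io.IOException;
  import java.sql.ResultSet;
  import java.sql.SQLException;

  public class ExampleAggregator extends AbstractTimelineAggregator {

    public ExampleAggregator(PhoenixHBaseAccessor hBaseAccessor,
                             Configuration metricsConf) {
      super(hBaseAccessor, metricsConf);
    }

    @Override
    protected PhoenixTransactSQL.Condition prepareMetricQueryCondition(
        long startTime, long endTime) {
      // All metrics in [startTime, endTime), grouped/ordered by key.
      return new PhoenixTransactSQL.Condition(null, null, null, null,
          startTime, endTime, null, true);
    }

    @Override
    protected void aggregate(ResultSet rs, long startTime, long endTime)
        throws IOException, SQLException {
      // Fold the rows into aggregates and write them back through
      // hBaseAccessor; see TimelineMetricAggregator for the real logic.
    }

    @Override
    protected Long getSleepIntervalMillis() {
      return 5 * 60 * 1000L; // hypothetical: run every five minutes
    }

    @Override
    protected Integer getCheckpointCutOffMultiplier() {
      return 2;
    }

    @Override
    protected boolean isDisabled() {
      return false;
    }

    @Override
    protected String getCheckpointLocation() {
      return "/tmp/example-aggregator-checkpoint"; // hypothetical
    }
  }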

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java
index c20dd14..679ee36 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java
@@ -69,6 +69,7 @@ public class DefaultPhoenixDataSource implements ConnectionProvider {
     LOG.debug("Metric store connection url: " + url);
     try {
       // TODO: Exception is swallowed, it should be thrown - discuss it
+
       connection = DriverManager.getConnection(url);
     } catch (SQLException e) {
       LOG.warn("Unable to connect to HBase store using Phoenix.", e);
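
The TODO above flags that the SQLException is swallowed and a null
connection is returned. A sketch of the alternative under discussion,
i.e. surfacing the failure to the caller (method shape hypothetical):

  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.SQLException;

  public class FailFastConnectionExample {
    // Rethrows with context instead of logging and returning null,
    // so callers cannot silently proceed with a dead connection.
    public static Connection getConnectionOrFail(String url)
        throws SQLException {
      try {
        return DriverManager.getConnection(url);
      } catch (SQLException e) {
        throw new SQLException(
            "Unable to connect to HBase store using Phoenix: " + url, e);
      }
    }
  }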

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index d2b96ec..a3eb731 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -24,6 +25,7 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+
 import java.io.IOException;
 import java.net.URL;
 import java.sql.SQLException;
@@ -31,9 +33,13 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_SITE_CONFIGURATION_FILE;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.METRICS_SITE_CONFIGURATION_FILE;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.TimelineMetricConfiguration.HBASE_SITE_CONFIGURATION_FILE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.TimelineMetricConfiguration.METRICS_SITE_CONFIGURATION_FILE;
 
 public class HBaseTimelineMetricStore extends AbstractService
     implements TimelineMetricStore {
@@ -82,7 +88,7 @@ public class HBaseTimelineMetricStore extends AbstractService
                                    Configuration metricsConf) {
     hBaseAccessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf);
     hBaseAccessor.initMetricSchema();
-
+
     // Start the cluster aggregator
     TimelineMetricClusterAggregator minuteClusterAggregator =
       new TimelineMetricClusterAggregator(hBaseAccessor, metricsConf);
@@ -100,16 +106,18 @@ public class HBaseTimelineMetricStore extends AbstractService
     }
 
     // Start the 5 minute aggregator
-    TimelineMetricAggregatorMinute minuteHostAggregator =
-      new TimelineMetricAggregatorMinute(hBaseAccessor, metricsConf);
+    TimelineMetricAggregator minuteHostAggregator =
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute
+        (hBaseAccessor, metricsConf);
     if (!minuteHostAggregator.isDisabled()) {
       Thread minuteAggregatorThread = new Thread(minuteHostAggregator);
       minuteAggregatorThread.start();
     }
 
     // Start hourly host aggregator
-    TimelineMetricAggregatorHourly hourlyHostAggregator =
-      new TimelineMetricAggregatorHourly(hBaseAccessor, metricsConf);
+    TimelineMetricAggregator hourlyHostAggregator =
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly
+        (hBaseAccessor, metricsConf);
     if (!hourlyHostAggregator.isDisabled()) {
       Thread aggregatorHourlyThread = new Thread(hourlyHostAggregator);
       aggregatorHourlyThread.start();
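
Both host-level aggregators now come from a single factory instead of
two concrete classes. A condensed sketch of the startup pattern used
above (assumes the same package; error handling omitted):

  import org.apache.hadoop.conf.Configuration;

  public class AggregatorStartupExample {
    public static void start(PhoenixHBaseAccessor hBaseAccessor,
                             Configuration metricsConf) {
      TimelineMetricAggregator minute = TimelineMetricAggregatorFactory
          .createTimelineMetricAggregatorMinute(hBaseAccessor, metricsConf);
      if (!minute.isDisabled()) {
        new Thread(minute).start(); // aggregators are Runnable
      }

      TimelineMetricAggregator hourly = TimelineMetricAggregatorFactory
          .createTimelineMetricAggregatorHourly(hBaseAccessor, metricsConf);
      if (!hourly.isDisabled()) {
        new Thread(hourly).start();
      }
    }
  }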

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java
new file mode 100644
index 0000000..61e15d7
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.annotate.JsonSubTypes;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.IOException;
+
+/**
+ * Base class for aggregated metric values; tracks sum, deviation,
+ * max and min across the values folded into it.
+ */
+@JsonSubTypes({@JsonSubTypes.Type(value = MetricClusterAggregate.class),
+  @JsonSubTypes.Type(value = MetricHostAggregate.class)})
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class MetricAggregate {
+  private static final ObjectMapper mapper = new ObjectMapper();
+
+  protected Double sum = 0.0;
+  protected Double deviation;
+  protected Double max = Double.MIN_VALUE;
+  protected Double min = Double.MAX_VALUE;
+
+  public MetricAggregate() {
+  }
+
+  MetricAggregate(Double sum, Double deviation, Double max,
+                  Double min) {
+    this.sum = sum;
+    this.deviation = deviation;
+    this.max = max;
+    this.min = min;
+  }
+
+  void updateSum(Double sum) {
+    this.sum += sum;
+  }
+
+  void updateMax(Double max) {
+    if (max > this.max) {
+      this.max = max;
+    }
+  }
+
+  void updateMin(Double min) {
+    if (min < this.min) {
+      this.min = min;
+    }
+  }
+
+  @JsonProperty("sum")
+  Double getSum() {
+    return sum;
+  }
+
+  @JsonProperty("deviation")
+  Double getDeviation() {
+    return deviation;
+  }
+
+  @JsonProperty("max")
+  Double getMax() {
+    return max;
+  }
+
+  @JsonProperty("min")
+  Double getMin() {
+    return min;
+  }
+
+  public void setSum(Double sum) {
+    this.sum = sum;
+  }
+
+  public void setDeviation(Double deviation) {
+    this.deviation = deviation;
+  }
+
+  public void setMax(Double max) {
+    this.max = max;
+  }
+
+  public void setMin(Double min) {
+    this.min = min;
+  }
+
+  public String toJSON() throws IOException {
+    return mapper.writeValueAsString(this);
+  }
+}
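
The aggregate classes serialize themselves through a shared
ObjectMapper; toJSON() is what the accessor stores and reads back. A
round-trip sketch (assumes the same package, since some getters are
package-private; the field values are hypothetical):

  import org.codehaus.jackson.map.ObjectMapper;

  import java.io.IOException;

  public class MetricAggregateJsonExample {
    public static void main(String[] args) throws IOException {
      MetricHostAggregate agg = new MetricHostAggregate();
      agg.setSum(15.0);
      agg.setMax(2.0);
      agg.setMin(0.0);
      agg.setNumberOfSamples(20);

      String json = agg.toJSON();
      // e.g. {"sum":15.0,"deviation":0.0,"max":2.0,"min":0.0,
      //       "numberOfSamples":20}

      ObjectMapper mapper = new ObjectMapper();
      MetricHostAggregate back =
          mapper.readValue(json, MetricHostAggregate.class);
      System.out.println(back);
    }
  }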

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java
new file mode 100644
index 0000000..c13c85f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * Aggregate of a single metric across all hosts reporting it in a
+ * time slice.
+ */
+public class MetricClusterAggregate extends MetricAggregate {
+  private int numberOfHosts;
+
+  @JsonCreator
+  public MetricClusterAggregate() {
+  }
+
+  MetricClusterAggregate(Double sum, int numberOfHosts, Double deviation,
+                         Double max, Double min) {
+    super(sum, deviation, max, min);
+    this.numberOfHosts = numberOfHosts;
+  }
+
+  @JsonProperty("numberOfHosts")
+  int getNumberOfHosts() {
+    return numberOfHosts;
+  }
+
+  void updateNumberOfHosts(int count) {
+    this.numberOfHosts += count;
+  }
+
+  public void setNumberOfHosts(int numberOfHosts) {
+    this.numberOfHosts = numberOfHosts;
+  }
+
+  /**
+   * Find and update min, max and avg for a minute
+   */
+  void updateAggregates(MetricClusterAggregate hostAggregate) {
+    updateMax(hostAggregate.getMax());
+    updateMin(hostAggregate.getMin());
+    updateSum(hostAggregate.getSum());
+    updateNumberOfHosts(hostAggregate.getNumberOfHosts());
+  }
+
+  @Override
+  public String toString() {
+    return "MetricClusterAggregate{" +
+      "sum=" + sum +
+      ", numberOfHosts=" + numberOfHosts +
+      ", deviation=" + deviation +
+      ", max=" + max +
+      ", min=" + min +
+      '}';
+  }
+}
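
updateAggregates is how the cluster aggregator folds per-slice results
together: sums and host counts accumulate, max and min widen. A small
sketch (assumes the same package, since the constructor and merge
method are package-private; values hypothetical):

  public class ClusterAggregateMergeExample {
    public static void main(String[] args) {
      MetricClusterAggregate first =
          new MetricClusterAggregate(10.0, 2, 0.0, 7.0, 3.0);
      MetricClusterAggregate second =
          new MetricClusterAggregate(6.0, 1, 0.0, 6.0, 6.0);

      // Fold the second aggregate into the first.
      first.updateAggregates(second);
      // Now: sum=16.0, numberOfHosts=3, max=7.0, min=3.0
      System.out.println(first);
    }
  }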

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java
new file mode 100644
index 0000000..02cc207
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * Represents a collection of minute based aggregation of values for
+ * resolution greater than a minute.
+ */
+public class MetricHostAggregate extends MetricAggregate {
+
+  private long numberOfSamples = 0;
+
+  @JsonCreator
+  public MetricHostAggregate() {
+    super(0.0, 0.0, Double.MIN_VALUE, Double.MAX_VALUE);
+  }
+
+  public MetricHostAggregate(Double sum, int numberOfSamples,
+                             Double deviation,
+                             Double max, Double min) {
+    super(sum, deviation, max, min);
+    this.numberOfSamples = numberOfSamples;
+  }
+
+  @JsonProperty("numberOfSamples")
+  long getNumberOfSamples() {
+    return numberOfSamples == 0 ? 1 : numberOfSamples;
+  }
+
+  void updateNumberOfSamples(long count) {
+    this.numberOfSamples += count;
+  }
+
+  public void setNumberOfSamples(long numberOfSamples) {
+    this.numberOfSamples = numberOfSamples;
+  }
+
+  public double getAvg() {
+    return sum / numberOfSamples;
+  }
+
+  /**
+   * Find and update min, max and avg for a minute
+   */
+  void updateAggregates(MetricHostAggregate hostAggregate) {
+    updateMax(hostAggregate.getMax());
+    updateMin(hostAggregate.getMin());
+    updateSum(hostAggregate.getSum());
+    updateNumberOfSamples(hostAggregate.getNumberOfSamples());
+  }
+
+  @Override
+  public String toString() {
+    return "MetricHostAggregate{" +
+      "sum=" + sum +
+      ", numberOfSamples=" + numberOfSamples +
+      ", deviation=" + deviation +
+      ", max=" + max +
+      ", min=" + min +
+      '}';
+  }
+}
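
Note that getAvg() divides by the raw sample count while the getter
substitutes 1 when no samples have been recorded, so the average is
only meaningful after at least one merge. A sketch of the averaging
path the hourly host aggregator relies on (same-package assumption;
values hypothetical):

  public class HostAggregateAvgExample {
    public static void main(String[] args) {
      MetricHostAggregate hourly = new MetricHostAggregate();

      // Merge three minute-level aggregates of 20 samples each.
      for (int i = 0; i < 3; i++) {
        hourly.updateAggregates(
            new MetricHostAggregate(15.0, 20, 0.0, 2.0, 0.0));
      }

      System.out.println(hourly.getAvg()); // 45.0 / 60 = 0.75
    }
  }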

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 0851d07..41eb30e 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -27,41 +28,16 @@ import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.type.TypeReference;
 
 import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
+import java.sql.*;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline.AbstractTimelineAggregator.MetricClusterAggregate;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractTimelineAggregator.MetricHostAggregate;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_ENCODING;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_METRICS_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricClusterAggregator.TimelineClusterMetric;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_MINUTE_TABLE_TTL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_ENCODING_SCHEME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_HOUR_TABLE_TTL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_MINUTE_TABLE_TTL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RESULT_LIMIT;
+  .timeline.PhoenixTransactSQL.*;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.TimelineMetricConfiguration.*;
 
 /**
  * Provides a facade over the Phoenix API to access HBase schema

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java
new file mode 100644
index 0000000..d227993
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+public class TimelineClusterMetric {
+  private String metricName;
+  private String appId;
+  private String instanceId;
+  private long timestamp;
+  private String type;
+
+  TimelineClusterMetric(String metricName, String appId, String instanceId,
+                        long timestamp, String type) {
+    this.metricName = metricName;
+    this.appId = appId;
+    this.instanceId = instanceId;
+    this.timestamp = timestamp;
+    this.type = type;
+  }
+
+  String getMetricName() {
+    return metricName;
+  }
+
+  String getAppId() {
+    return appId;
+  }
+
+  String getInstanceId() {
+    return instanceId;
+  }
+
+  long getTimestamp() {
+    return timestamp;
+  }
+
+  String getType() { return type; }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    TimelineClusterMetric that = (TimelineClusterMetric) o;
+
+    if (timestamp != that.timestamp) return false;
+    if (appId != null ? !appId.equals(that.appId) : that.appId != null)
+      return false;
+    if (instanceId != null ? !instanceId.equals(that.instanceId) : that.instanceId != null)
+      return false;
+    if (!metricName.equals(that.metricName)) return false;
+
+    return true;
+  }
+
+  public boolean equalsExceptTime(TimelineClusterMetric metric) {
+    if (!metricName.equals(metric.metricName)) return false;
+    if (!appId.equals(metric.appId)) return false;
+    if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null)
+      return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = metricName.hashCode();
+    result = 31 * result + (appId != null ? appId.hashCode() : 0);
+    result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
+    result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "TimelineClusterMetric{" +
+      "metricName='" + metricName + '\'' +
+      ", appId='" + appId + '\'' +
+      ", instanceId='" + instanceId + '\'' +
+      ", timestamp=" + timestamp +
+      '}';
+  }
+}
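
The class above is a verbatim extraction of the former inner class of
TimelineMetricClusterAggregator (removed further down in this commit). Note
the two equality contracts: equals()/hashCode() include the timestamp, so an
aggregate map keeps one entry per time slice, while equalsExceptTime()
ignores the timestamp so a scan over time-ordered rows can detect where one
metric series ends and the next begins. A minimal sketch of the difference
(hypothetical values; assumes same-package access, since the constructor is
package-private):

public class TimelineClusterMetricEqualityDemo {
  public static void main(String[] args) {
    TimelineClusterMetric m1 = new TimelineClusterMetric(
      "cpu_user", "HOST", null, 1416900000000L, "Int64");
    TimelineClusterMetric m2 = new TimelineClusterMetric(
      "cpu_user", "HOST", null, 1416900015000L, "Int64");

    // Different timestamps: distinct keys in an aggregate map.
    System.out.println(m1.equals(m2));           // false
    // Same series regardless of timestamp: marks a group boundary when false.
    System.out.println(m1.equalsExceptTime(m2)); // true
  }
}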

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
new file mode 100644
index 0000000..eaa1ab9
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
+
+public class TimelineMetricAggregator extends AbstractTimelineAggregator {
+  private static final Log LOG = LogFactory.getLog
+    (TimelineMetricAggregator.class);
+
+  private final String checkpointLocation;
+  private final Long sleepIntervalMillis;
+  private final Integer checkpointCutOffMultiplier;
+  private final String hostAggregatorDisabledParam;
+  private final String inputTableName;
+  private final String outputTableName;
+
+  public TimelineMetricAggregator(PhoenixHBaseAccessor hBaseAccessor,
+                                  Configuration metricsConf,
+                                  String checkpointLocation,
+                                  Long sleepIntervalMillis,
+                                  Integer checkpointCutOffMultiplier,
+                                  String hostAggregatorDisabledParam,
+                                  String inputTableName,
+                                  String outputTableName) {
+    super(hBaseAccessor, metricsConf);
+    this.checkpointLocation = checkpointLocation;
+    this.sleepIntervalMillis = sleepIntervalMillis;
+    this.checkpointCutOffMultiplier = checkpointCutOffMultiplier;
+    this.hostAggregatorDisabledParam = hostAggregatorDisabledParam;
+    this.inputTableName = inputTableName;
+    this.outputTableName = outputTableName;
+  }
+
+  @Override
+  protected String getCheckpointLocation() {
+    return checkpointLocation;
+  }
+
+  @Override
+  protected void aggregate(ResultSet rs, long startTime, long endTime)
+    throws IOException, SQLException {
+    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
+      aggregateMetricsFromResultSet(rs);
+
+    LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
+    hBaseAccessor.saveHostAggregateRecords(hostAggregateMap,
+      outputTableName);
+  }
+
+  @Override
+  protected Condition prepareMetricQueryCondition(long startTime,
+                                                  long endTime) {
+    Condition condition = new Condition(null, null, null, null, startTime,
+      endTime, null, true);
+    condition.setNoLimit();
+    condition.setFetchSize(resultsetFetchSize);
+    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
+      inputTableName));
+    condition.addOrderByColumn("METRIC_NAME");
+    condition.addOrderByColumn("HOSTNAME");
+    condition.addOrderByColumn("APP_ID");
+    condition.addOrderByColumn("INSTANCE_ID");
+    condition.addOrderByColumn("SERVER_TIME");
+    return condition;
+  }
+
+  private Map<TimelineMetric, MetricHostAggregate> aggregateMetricsFromResultSet
+      (ResultSet rs) throws IOException, SQLException {
+    TimelineMetric existingMetric = null;
+    MetricHostAggregate hostAggregate = null;
+    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
+      new HashMap<TimelineMetric, MetricHostAggregate>();
+
+    while (rs.next()) {
+      TimelineMetric currentMetric =
+        PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
+      MetricHostAggregate currentHostAggregate =
+        PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
+
+      if (existingMetric == null) {
+        // First row
+        existingMetric = currentMetric;
+        hostAggregate = new MetricHostAggregate();
+        hostAggregateMap.put(currentMetric, hostAggregate);
+      }
+
+      if (existingMetric.equalsExceptTime(currentMetric)) {
+        // Recalculate totals with current metric
+        hostAggregate.updateAggregates(currentHostAggregate);
+
+      } else {
+        // Switched over to a new metric - save existing - create new aggregate
+        hostAggregate = new MetricHostAggregate();
+        hostAggregate.updateAggregates(currentHostAggregate);
+        hostAggregateMap.put(currentMetric, hostAggregate);
+        existingMetric = currentMetric;
+      }
+    }
+    return hostAggregateMap;
+  }
+
+  @Override
+  protected Long getSleepIntervalMillis() {
+    return sleepIntervalMillis;
+  }
+
+  @Override
+  protected Integer getCheckpointCutOffMultiplier() {
+    return checkpointCutOffMultiplier;
+  }
+
+  @Override
+  protected boolean isDisabled() {
+    return metricsConf.getBoolean(hostAggregatorDisabledParam, false);
+  }
+}
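
TimelineMetricAggregator consolidates the minute and hourly host aggregators
that are deleted below; the input/output tables and timing parameters become
constructor arguments. aggregateMetricsFromResultSet() depends on the ORDER BY
columns set in prepareMetricQueryCondition(): rows for one metric series
arrive contiguously, so a single pass with one running aggregate per group
suffices. A self-contained sketch of that pattern (plain JDBC against an H2
in-memory database purely for the demo; the table and columns are
illustrative, not the Phoenix schema):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class SortedScanAggregateDemo {
  public static void main(String[] args) throws SQLException {
    try (Connection conn = DriverManager.getConnection("jdbc:h2:mem:demo")) {
      try (Statement s = conn.createStatement()) {
        s.execute("CREATE TABLE METRICS(NAME VARCHAR, SERVER_TIME BIGINT, VAL DOUBLE)");
        s.execute("INSERT INTO METRICS VALUES " +
          "('cpu_user', 1, 1.0), ('cpu_user', 2, 3.0), ('mem_free', 1, 7.0)");
      }
      try (Statement s = conn.createStatement();
           ResultSet rs = s.executeQuery(
             "SELECT NAME, VAL FROM METRICS ORDER BY NAME, SERVER_TIME")) {
        String current = null;
        double sum = 0;
        while (rs.next()) {
          String name = rs.getString("NAME");
          if (current != null && !current.equals(name)) {
            // Group boundary: the previous series is complete, emit it.
            System.out.println(current + " -> " + sum);
            sum = 0;
          }
          current = name;
          sum += rs.getDouble("VAL"); // fold this row into the running aggregate
        }
        if (current != null) {
          System.out.println(current + " -> " + sum); // emit the final group
        }
      }
    }
  }
}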

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java
new file mode 100644
index 0000000..d0dafeb
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.PhoenixTransactSQL.*;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.TimelineMetricConfiguration.*;
+
+/**
+ * Factory for creating the host-level metric aggregators: a minute
+ * aggregator that rolls up precision records and an hourly aggregator that
+ * rolls up the minute-level aggregates.
+ */
+public class TimelineMetricAggregatorFactory {
+  private static final String MINUTE_AGGREGATE_CHECKPOINT_FILE =
+    "timeline-metrics-host-aggregator-checkpoint";
+  private static final String MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE =
+    "timeline-metrics-host-aggregator-hourly-checkpoint";
+
+  public static TimelineMetricAggregator createTimelineMetricAggregatorMinute
+    (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+
+    String checkpointDir = metricsConf.get(
+      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+    String checkpointLocation = FilenameUtils.concat(checkpointDir,
+      MINUTE_AGGREGATE_CHECKPOINT_FILE);
+    long sleepInterval = metricsConf.getLong
+      (HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300000l);  // 5 mins
+    int checkpointCutOffMultiplier = metricsConf.getInt
+      (HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 3);
+    String hostAggregatorDisabledParam = HOST_AGGREGATOR_MINUTE_DISABLED;
+
+    String inputTableName = METRICS_RECORD_TABLE_NAME;
+    String outputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+
+    return new TimelineMetricAggregator(hBaseAccessor, metricsConf,
+      checkpointLocation,
+      sleepInterval,
+      checkpointCutOffMultiplier,
+      hostAggregatorDisabledParam,
+      inputTableName,
+      outputTableName);
+  }
+
+  public static TimelineMetricAggregator createTimelineMetricAggregatorHourly
+    (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+
+    String checkpointDir = metricsConf.get(
+      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+    String checkpointLocation = FilenameUtils.concat(checkpointDir,
+      MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE);
+    long sleepInterval = metricsConf.getLong
+      (HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600000l);  // 1 hour
+    int checkpointCutOffMultiplier = metricsConf.getInt
+      (HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+    String hostAggregatorDisabledParam = HOST_AGGREGATOR_HOUR_DISABLED;
+
+    String inputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+    String outputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+
+    return new TimelineMetricAggregator(hBaseAccessor, metricsConf,
+      checkpointLocation,
+      sleepInterval,
+      checkpointCutOffMultiplier,
+      hostAggregatorDisabledParam,
+      inputTableName,
+      outputTableName);
+  }
+
+}
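
The factory above replaces the per-class constructors of the two aggregators
deleted below. One behavioral detail worth noting: the deleted classes read
their sleep intervals in seconds and converted with SECONDS.toMillis(),
whereas the factory reads the configured value directly in milliseconds
(defaults 300000 and 3600000). A hedged usage sketch, assuming the classes
from this module plus EasyMock (already used by the tests in this commit) on
the classpath and same-package visibility:

import org.apache.hadoop.conf.Configuration;
import static org.easymock.EasyMock.createNiceMock;

public class AggregatorFactoryDemo {
  public static void main(String[] args) {
    // Stand-in accessor for illustration; real wiring builds a
    // PhoenixHBaseAccessor against the cluster.
    PhoenixHBaseAccessor accessor = createNiceMock(PhoenixHBaseAccessor.class);
    Configuration metricsConf = new Configuration();

    // Minute: precision table in, minute aggregate table out,
    // 5-minute sleep, checkpoint cutoff multiplier 3.
    TimelineMetricAggregator minute = TimelineMetricAggregatorFactory
      .createTimelineMetricAggregatorMinute(accessor, metricsConf);

    // Hourly: minute aggregate table in, hourly aggregate table out,
    // 1-hour sleep, checkpoint cutoff multiplier 2.
    TimelineMetricAggregator hourly = TimelineMetricAggregatorFactory
      .createTimelineMetricAggregatorHourly(accessor, metricsConf);

    System.out.println(minute + " / " + hourly);
  }
}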

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorHourly.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorHourly.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorHourly.java
deleted file mode 100644
index 16f5ab9..0000000
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorHourly.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline;
-
-import org.apache.commons.io.FilenameUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline.PhoenixTransactSQL.*;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline.TimelineMetricConfiguration.*;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline.TimelineMetricConfiguration
-  .HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline.TimelineMetricConfiguration
-  .TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
-
-public class TimelineMetricAggregatorHourly extends AbstractTimelineAggregator {
-  private static final Log LOG = LogFactory.getLog
-    (TimelineMetricAggregatorHourly.class);
-  private static final String MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE =
-    "timeline-metrics-host-aggregator-hourly-checkpoint";
-  private final String checkpointLocation;
-  private final Long sleepIntervalMillis;
-  private final Integer checkpointCutOffMultiplier;
-
-  public TimelineMetricAggregatorHourly(PhoenixHBaseAccessor hBaseAccessor,
-                                        Configuration metricsConf) {
-
-    super(hBaseAccessor, metricsConf);
-
-    String checkpointDir = metricsConf.get(
-      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
-
-    checkpointLocation = FilenameUtils.concat(checkpointDir,
-      MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE);
-
-    sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
-      (HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l));
-    checkpointCutOffMultiplier =
-      metricsConf.getInt(HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
-  }
-
-  @Override
-  protected String getCheckpointLocation() {
-    return checkpointLocation;
-  }
-
-  @Override
-  protected boolean doWork(long startTime, long endTime) {
-    LOG.info("Start aggregation cycle @ " + new Date());
-
-    boolean success = true;
-    Condition condition = prepareMetricQueryCondition(startTime, endTime);
-
-    Connection conn = null;
-    PreparedStatement stmt = null;
-
-    try {
-      conn = hBaseAccessor.getConnection();
-      stmt = prepareGetMetricsSqlStmt(conn, condition);
-
-      ResultSet rs = stmt.executeQuery();
-      Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
-        aggregateMetricsFromResultSet(rs);
-
-      LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
-
-      hBaseAccessor.saveHostAggregateRecords(hostAggregateMap,
-        METRICS_AGGREGATE_HOURLY_TABLE_NAME);
-
-    } catch (SQLException e) {
-      LOG.error("Exception during aggregating metrics.", e);
-      success = false;
-    } catch (IOException e) {
-      LOG.error("Exception during aggregating metrics.", e);
-      success = false;
-    } finally {
-      if (stmt != null) {
-        try {
-          stmt.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (conn != null) {
-        try {
-          conn.close();
-        } catch (SQLException sql) {
-          // Ignore
-        }
-      }
-    }
-
-    LOG.info("End aggregation cycle @ " + new Date());
-    return success;
-  }
-
-  private Condition prepareMetricQueryCondition(long startTime, long endTime) {
-    Condition condition = new Condition(null, null, null, null, startTime,
-      endTime, null, true);
-    condition.setNoLimit();
-    condition.setFetchSize(resultsetFetchSize);
-    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
-      METRICS_AGGREGATE_MINUTE_TABLE_NAME));
-    condition.addOrderByColumn("METRIC_NAME");
-    condition.addOrderByColumn("HOSTNAME");
-    condition.addOrderByColumn("APP_ID");
-    condition.addOrderByColumn("INSTANCE_ID");
-    condition.addOrderByColumn("SERVER_TIME");
-    return condition;
-  }
-
-  private Map<TimelineMetric, MetricHostAggregate>
-  aggregateMetricsFromResultSet(ResultSet rs) throws SQLException, IOException {
-    TimelineMetric existingMetric = null;
-    MetricHostAggregate hostAggregate = null;
-    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
-      new HashMap<TimelineMetric, MetricHostAggregate>();
-
-    while (rs.next()) {
-      TimelineMetric currentMetric =
-        PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
-      MetricHostAggregate currentHostAggregate =
-        PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
-
-      if (existingMetric == null) {
-        // First row
-        existingMetric = currentMetric;
-        hostAggregate = new MetricHostAggregate();
-        hostAggregateMap.put(currentMetric, hostAggregate);
-      }
-
-      if (existingMetric.equalsExceptTime(currentMetric)) {
-        // Recalculate totals with current metric
-        hostAggregate.updateAggregates(currentHostAggregate);
-
-      } else {
-        // Switched over to a new metric - save existing
-        hostAggregate = new MetricHostAggregate();
-        hostAggregate.updateAggregates(currentHostAggregate);
-        hostAggregateMap.put(currentMetric, hostAggregate);
-        existingMetric = currentMetric;
-      }
-    }
-    return hostAggregateMap;
-  }
-
-  @Override
-  protected Long getSleepIntervalMillis() {
-    return sleepIntervalMillis;
-  }
-
-  @Override
-  protected Integer getCheckpointCutOffMultiplier() {
-    return checkpointCutOffMultiplier;
-  }
-
-  @Override
-  protected boolean isDisabled() {
-    return metricsConf.getBoolean(HOST_AGGREGATOR_HOUR_DISABLED, false);
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorMinute.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorMinute.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorMinute.java
deleted file mode 100644
index ac9d12e..0000000
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorMinute.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-
-import org.apache.commons.io.FilenameUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.prepareGetMetricsSqlStmt;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.*;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
-
-public class TimelineMetricAggregatorMinute extends AbstractTimelineAggregator {
-  private static final Log LOG = LogFactory.getLog(TimelineMetricAggregatorMinute.class);
-  private static final String MINUTE_AGGREGATE_CHECKPOINT_FILE =
-    "timeline-metrics-host-aggregator-checkpoint";
-  private final String checkpointLocation;
-  private final Long sleepIntervalMillis;
-  private final Integer checkpointCutOffMultiplier;
-
-  public TimelineMetricAggregatorMinute(PhoenixHBaseAccessor hBaseAccessor,
-                                        Configuration metricsConf) {
-    super(hBaseAccessor, metricsConf);
-
-    String checkpointDir = metricsConf.get(
-      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
-
-    checkpointLocation = FilenameUtils.concat(checkpointDir,
-      MINUTE_AGGREGATE_CHECKPOINT_FILE);
-
-    sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
-      (HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300l));  // 5 mins
-    checkpointCutOffMultiplier =
-      metricsConf.getInt(HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 3);
-  }
-
-  @Override
-  protected String getCheckpointLocation() {
-    return checkpointLocation;
-  }
-
-  @Override
-  protected boolean doWork(long startTime, long endTime) {
-    LOG.info("Start aggregation cycle @ " + new Date() + ", " +
-      "startTime = " + new Date(startTime) + ", endTime = " + new Date(endTime));
-
-    boolean success = true;
-    Condition condition = new Condition(null, null, null, null, startTime,
-                                        endTime, null, true);
-    condition.setNoLimit();
-    condition.setFetchSize(resultsetFetchSize);
-    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
-      METRICS_RECORD_TABLE_NAME));
-    condition.addOrderByColumn("METRIC_NAME");
-    condition.addOrderByColumn("HOSTNAME");
-    condition.addOrderByColumn("APP_ID");
-    condition.addOrderByColumn("INSTANCE_ID");
-    condition.addOrderByColumn("SERVER_TIME");
-
-    Connection conn = null;
-    PreparedStatement stmt = null;
-
-    try {
-      conn = hBaseAccessor.getConnection();
-      stmt = prepareGetMetricsSqlStmt(conn, condition);
-      LOG.debug("Query issued @: " + new Date());
-      ResultSet rs = stmt.executeQuery();
-      LOG.debug("Query returned @: " + new Date());
-      TimelineMetric existingMetric = null;
-      MetricHostAggregate hostAggregate = null;
-
-      Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
-        new HashMap<TimelineMetric, MetricHostAggregate>();
-
-      while (rs.next()) {
-        TimelineMetric currentMetric =
-          PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
-        MetricHostAggregate currentHostAggregate =
-          PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
-
-        if (existingMetric == null) {
-          // First row
-          existingMetric = currentMetric;
-          hostAggregate = new MetricHostAggregate();
-          hostAggregateMap.put(currentMetric, hostAggregate);
-        }
-
-        if (existingMetric.equalsExceptTime(currentMetric)) {
-          // Recalculate totals with current metric
-          hostAggregate.updateAggregates(currentHostAggregate);
-
-        } else {
-          // Switched over to a new metric - create new aggregate
-          hostAggregate = new MetricHostAggregate();
-          hostAggregate.updateAggregates(currentHostAggregate);
-          hostAggregateMap.put(currentMetric, hostAggregate);
-          existingMetric = currentMetric;
-        }
-      }
-
-      LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
-
-      hBaseAccessor.saveHostAggregateRecords(hostAggregateMap,
-        METRICS_AGGREGATE_MINUTE_TABLE_NAME);
-
-    } catch (SQLException e) {
-      LOG.error("Exception during aggregating metrics.", e);
-      success = false;
-    } catch (IOException e) {
-      LOG.error("Exception during aggregating metrics.", e);
-      success = false;
-    } finally {
-      if (stmt != null) {
-        try {
-          stmt.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (conn != null) {
-        try {
-          conn.close();
-        } catch (SQLException sql) {
-          // Ignore
-        }
-      }
-    }
-
-    LOG.info("End aggregation cycle @ " + new Date());
-    return success;
-  }
-
-  @Override
-  protected Long getSleepIntervalMillis() {
-    return sleepIntervalMillis;
-  }
-
-  @Override
-  protected Integer getCheckpointCutOffMultiplier() {
-    return checkpointCutOffMultiplier;
-  }
-
-  @Override
-  protected boolean isDisabled() {
-    return metricsConf.getBoolean(HOST_AGGREGATOR_MINUTE_DISABLED, false);
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
index c52451e..96de1a9 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
+
 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -24,27 +25,18 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 
 import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.prepareGetMetricsSqlStmt;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.*;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.PhoenixTransactSQL.*;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.TimelineMetricConfiguration.*;
 
 /**
  * Aggregates a metric across all hosts in the cluster. Reads metrics from
@@ -82,98 +74,79 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
     return checkpointLocation;
   }
 
-  /**
-   * Read metrics written during the time interval and save the sum and total
-   * in the aggregate table.
-   *
-   * @param startTime Sample start time
-   * @param endTime Sample end time
-   */
-  protected boolean doWork(long startTime, long endTime) {
-    LOG.info("Start aggregation cycle @ " + new Date() + ", " +
-      "startTime = " + new Date(startTime) + ", endTime = " + new Date(endTime));
-
-    boolean success = true;
-    Condition condition = prepareMetricQueryCondition(startTime, endTime);
-
-    Connection conn;
-    PreparedStatement stmt;
-
-    try {
-      conn = hBaseAccessor.getConnection();
-      stmt = prepareGetMetricsSqlStmt(conn, condition);
-      LOG.debug("Query issued @: " + new Date());
-      ResultSet rs = stmt.executeQuery();
-      LOG.debug("Query returned @: " + new Date());
-      Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
-        new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
-      List<Long[]> timeSlices = new ArrayList<Long[]>();
-      // Create time slices
-      long sliceStartTime = startTime;
-      while (sliceStartTime < endTime) {
-        timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis});
-        sliceStartTime += timeSliceIntervalMillis;
-      }
-
-      while (rs.next()) {
-        TimelineMetric metric =
-          PhoenixHBaseAccessor.getTimelineMetricFromResultSet(rs);
-
-        Map<TimelineClusterMetric, Double> clusterMetrics =
-          sliceFromTimelineMetric(metric, timeSlices);
-
-        if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
-          for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry :
-              clusterMetrics.entrySet()) {
-            TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
-            MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric);
-            Double avgValue = clusterMetricEntry.getValue();
-
-            if (aggregate == null) {
-              aggregate = new MetricClusterAggregate(avgValue, 1, null,
-                avgValue, avgValue);
-              aggregateClusterMetrics.put(clusterMetric, aggregate);
-            } else {
-              aggregate.updateSum(avgValue);
-              aggregate.updateNumberOfHosts(1);
-              aggregate.updateMax(avgValue);
-              aggregate.updateMin(avgValue);
-            }
-          }
-        }
-      }
-      LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates.");
-
-      hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics);
-
-      LOG.info("End aggregation cycle @ " + new Date());
-
-    } catch (SQLException e) {
-      LOG.error("Exception during aggregating metrics.", e);
-      success = false;
-    } catch (IOException e) {
-      LOG.error("Exception during aggregating metrics.", e);
-      success = false;
-    }
-
-    return success;
+  @Override
+  protected void aggregate(ResultSet rs, long startTime, long endTime)
+    throws SQLException, IOException {
+    List<Long[]> timeSlices = getTimeSlices(startTime, endTime);
+    Map<TimelineClusterMetric, MetricClusterAggregate>
+      aggregateClusterMetrics = aggregateMetricsFromResultSet(rs, timeSlices);
+
+    LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates.");
+    hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics);
   }
 
-  private Condition prepareMetricQueryCondition(long startTime, long endTime) {
+  @Override
+  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
     Condition condition = new Condition(null, null, null, null, startTime,
-                                        endTime, null, true);
-    condition.setFetchSize(resultsetFetchSize);
+      endTime, null, true);
     condition.setNoLimit();
+    condition.setFetchSize(resultsetFetchSize);
     condition.setStatement(String.format(GET_METRIC_SQL,
       METRICS_RECORD_TABLE_NAME));
     condition.addOrderByColumn("METRIC_NAME");
     condition.addOrderByColumn("APP_ID");
     condition.addOrderByColumn("INSTANCE_ID");
     condition.addOrderByColumn("SERVER_TIME");
-
     return condition;
   }
 
+  private List<Long[]> getTimeSlices(long startTime, long endTime) {
+    List<Long[]> timeSlices = new ArrayList<Long[]>();
+    long sliceStartTime = startTime;
+    while (sliceStartTime < endTime) {
+      timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis});
+      sliceStartTime += timeSliceIntervalMillis;
+    }
+    return timeSlices;
+  }
+
+  private Map<TimelineClusterMetric, MetricClusterAggregate>
+  aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
+    throws SQLException, IOException {
+    Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
+      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+
+    while (rs.next()) {
+      TimelineMetric metric =
+        PhoenixHBaseAccessor.getTimelineMetricFromResultSet(rs);
+
+      Map<TimelineClusterMetric, Double> clusterMetrics =
+        sliceFromTimelineMetric(metric, timeSlices);
+
+      if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
+        for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry :
+            clusterMetrics.entrySet()) {
+          TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
+          MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric);
+          Double avgValue = clusterMetricEntry.getValue();
+
+          if (aggregate == null) {
+            aggregate = new MetricClusterAggregate(avgValue, 1, null,
+              avgValue, avgValue);
+            aggregateClusterMetrics.put(clusterMetric, aggregate);
+          } else {
+            aggregate.updateSum(avgValue);
+            aggregate.updateNumberOfHosts(1);
+            aggregate.updateMax(avgValue);
+            aggregate.updateMin(avgValue);
+          }
+        }
+      }
+    }
+    return aggregateClusterMetrics;
+  }
+
   @Override
   protected Long getSleepIntervalMillis() {
     return sleepIntervalMillis;
@@ -239,82 +212,4 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
     return -1l;
   }
 
-  public static class TimelineClusterMetric {
-    private String metricName;
-    private String appId;
-    private String instanceId;
-    private long timestamp;
-    private String type;
-
-    TimelineClusterMetric(String metricName, String appId, String instanceId,
-                          long timestamp, String type) {
-      this.metricName = metricName;
-      this.appId = appId;
-      this.instanceId = instanceId;
-      this.timestamp = timestamp;
-      this.type = type;
-    }
-
-    String getMetricName() {
-      return metricName;
-    }
-
-    String getAppId() {
-      return appId;
-    }
-
-    String getInstanceId() {
-      return instanceId;
-    }
-
-    long getTimestamp() {
-      return timestamp;
-    }
-
-    String getType() { return type; }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
-
-      TimelineClusterMetric that = (TimelineClusterMetric) o;
-
-      if (timestamp != that.timestamp) return false;
-      if (appId != null ? !appId.equals(that.appId) : that.appId != null)
-        return false;
-      if (instanceId != null ? !instanceId.equals(that.instanceId) : that.instanceId != null)
-        return false;
-      if (!metricName.equals(that.metricName)) return false;
-
-      return true;
-    }
-
-    public boolean equalsExceptTime(TimelineClusterMetric metric) {
-      if (!metricName.equals(metric.metricName)) return false;
-      if (!appId.equals(metric.appId)) return false;
-      if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null)
-        return false;
-
-      return true;
-    }
-    @Override
-    public int hashCode() {
-      int result = metricName.hashCode();
-      result = 31 * result + (appId != null ? appId.hashCode() : 0);
-      result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
-      result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
-      return result;
-    }
-
-    @Override
-    public String toString() {
-      return "TimelineClusterMetric{" +
-        "metricName='" + metricName + '\'' +
-        ", appId='" + appId + '\'' +
-        ", instanceId='" + instanceId + '\'' +
-        ", timestamp=" + timestamp +
-        '}';
-    }
-  }
 }
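
The cluster aggregator refactor moves the JDBC boilerplate from doWork() into
the AbstractTimelineAggregator base class, leaving only query construction
(prepareMetricQueryCondition) and result folding (aggregate) here. The
slicing step cuts the aggregation window into fixed-width, half-open
intervals; each host-level datapoint is then attributed to the slice
containing its timestamp and folded into a per-slice MetricClusterAggregate.
A self-contained sketch of the slicing arithmetic (window and slice-width
values are hypothetical):

import java.util.ArrayList;
import java.util.List;

public class TimeSliceDemo {
  // Mirrors getTimeSlices() above: [start, start + width), stepping by width.
  static List<Long[]> getTimeSlices(long startTime, long endTime, long widthMillis) {
    List<Long[]> timeSlices = new ArrayList<Long[]>();
    long sliceStartTime = startTime;
    while (sliceStartTime < endTime) {
      timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + widthMillis });
      sliceStartTime += widthMillis;
    }
    return timeSlices;
  }

  public static void main(String[] args) {
    // A 2-minute window with 15-second slices yields 8 half-open intervals.
    for (Long[] slice : getTimeSlices(0L, 120000L, 15000L)) {
      System.out.println("[" + slice[0] + ", " + slice[1] + ")");
    }
  }
}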

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
index e886b71..54d3fdd 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
@@ -24,11 +24,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 
 import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -40,19 +37,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
   .timeline.PhoenixTransactSQL.*;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline.TimelineMetricClusterAggregator.TimelineClusterMetric;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
   .timeline.TimelineMetricConfiguration.*;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline.TimelineMetricConfiguration
-  .CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline.TimelineMetricConfiguration
-  .TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
 
 public class TimelineMetricClusterAggregatorHourly extends
   AbstractTimelineAggregator {
@@ -77,7 +62,8 @@ public class TimelineMetricClusterAggregatorHourly extends
 
     sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
       (CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l));
-    checkpointCutOffIntervalMillis = 7200000l;
+    checkpointCutOffIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+      (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL, 7200l));
     checkpointCutOffMultiplier = metricsConf.getInt
       (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
   }
@@ -88,60 +74,31 @@ public class TimelineMetricClusterAggregatorHourly extends
   }
 
   @Override
-  protected boolean doWork(long startTime, long endTime) {
-    LOG.info("Start aggregation cycle @ " + new Date() + ", " +
-      "startTime = " + new Date(startTime) + ", endTime = " + new Date
-      (endTime));
-
-    boolean success = true;
-    Condition condition = prepareMetricQueryCondition(startTime, endTime);
-
-    Connection conn = null;
-    PreparedStatement stmt = null;
-
-    try {
-      conn = hBaseAccessor.getConnection();
-      stmt = prepareGetMetricsSqlStmt(conn, condition);
-
-      ResultSet rs = stmt.executeQuery();
+  protected void aggregate(ResultSet rs, long startTime, long endTime)
+    throws SQLException, IOException {
       Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap =
         aggregateMetricsFromResultSet(rs);
 
-      LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
-
-      hBaseAccessor.saveClusterAggregateHourlyRecords(
-        hostAggregateMap,
-        METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
-
-    } catch (SQLException e) {
-      LOG.error("Exception during aggregating metrics.", e);
-      success = false;
-    } catch (IOException e) {
-      LOG.error("Exception during aggregating metrics.", e);
-      success = false;
-    } finally {
-      if (stmt != null) {
-        try {
-          stmt.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (conn != null) {
-        try {
-          conn.close();
-        } catch (SQLException sql) {
-          // Ignore
-        }
-      }
-    }
+    LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
+    hBaseAccessor.saveClusterAggregateHourlyRecords(hostAggregateMap,
+      METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
+  }
 
-    LOG.info("End aggregation cycle @ " + new Date());
-    return success;
+  @Override
+  protected Condition prepareMetricQueryCondition(long startTime,
+                                                  long endTime) {
+    Condition condition = new Condition(null, null, null, null, startTime,
+      endTime, null, true);
+    condition.setNoLimit();
+    condition.setFetchSize(resultsetFetchSize);
+    condition.setStatement(GET_CLUSTER_AGGREGATE_SQL);
+    condition.addOrderByColumn("METRIC_NAME");
+    condition.addOrderByColumn("APP_ID");
+    condition.addOrderByColumn("INSTANCE_ID");
+    condition.addOrderByColumn("SERVER_TIME");
+    return condition;
   }
 
-  // should rewrite from host agg to cluster agg
-  //
   private Map<TimelineClusterMetric, MetricHostAggregate>
   aggregateMetricsFromResultSet(ResultSet rs) throws IOException, SQLException {
 
@@ -189,19 +146,6 @@ public class TimelineMetricClusterAggregatorHourly extends
     agg.updateNumberOfSamples(currentClusterAggregate.getNumberOfHosts());
   }
 
-  private Condition prepareMetricQueryCondition(long startTime, long endTime) {
-    Condition condition = new Condition
-      (null, null, null, null, startTime, endTime, null, true);
-    condition.setNoLimit();
-    condition.setFetchSize(resultsetFetchSize);
-    condition.setStatement(GET_CLUSTER_AGGREGATE_SQL);
-    condition.addOrderByColumn("METRIC_NAME");
-    condition.addOrderByColumn("APP_ID");
-    condition.addOrderByColumn("INSTANCE_ID");
-    condition.addOrderByColumn("SERVER_TIME");
-    return condition;
-  }
-
   @Override
   protected Long getSleepIntervalMillis() {
     return sleepIntervalMillis;
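
The previously hard-coded 7200000 ms checkpoint cutoff becomes configurable
through the CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL key added in
the TimelineMetricConfiguration diff below; the value is read in seconds
(default 7200) and converted with SECONDS.toMillis(). A small sketch of
overriding it (the 3-hour value is hypothetical):

import org.apache.hadoop.conf.Configuration;

public class CutoffOverrideDemo {
  public static void main(String[] args) {
    Configuration metricsConf = new Configuration();
    // Override the 2-hour (7200 s) default with 3 hours; the aggregator's
    // constructor converts this second-denominated value to milliseconds.
    metricsConf.setLong(
      "timeline.metrics.cluster.aggregator.hourly.checkpointCutOffInterval",
      10800L);
    System.out.println(metricsConf.getLong(
      "timeline.metrics.cluster.aggregator.hourly.checkpointCutOffInterval",
      7200L)); // prints 10800
  }
}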

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index 6b19847..e022e5e 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -86,6 +86,9 @@ public interface TimelineMetricConfiguration {
   public static final String CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER =
     "timeline.metrics.cluster.aggregator.hourly.checkpointCutOffMultiplier";
 
+  public static final String CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL =
+    "timeline.metrics.cluster.aggregator.hourly.checkpointCutOffInterval";
+
   public static final String GLOBAL_RESULT_LIMIT =
     "timeline.metrics.service.default.result.limit";