Posted to commits@ambari.apache.org by sw...@apache.org on 2014/11/14 20:06:20 UTC

ambari git commit: AMBARI-5707. Fix Average calculations in the timeline service.

Repository: ambari
Updated Branches:
  refs/heads/branch-metrics-dev c1b0d25af -> e5542aa5f


AMBARI-5707. Fix Average calculations in the timeline service.


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/e5542aa5
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/e5542aa5
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/e5542aa5

Branch: refs/heads/branch-metrics-dev
Commit: e5542aa5fa2fa9363a068fe4f18c86a23291162f
Parents: c1b0d25
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Fri Nov 14 11:01:25 2014 -0800
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Fri Nov 14 11:01:25 2014 -0800

----------------------------------------------------------------------
 .../pom.xml                                     |   6 +-
 .../timeline/AbstractTimelineAggregator.java    |  30 +-
 .../metrics/timeline/Aggregator.java            |  60 ++++
 .../metrics/timeline/ConnectionProvider.java    |  29 ++
 .../timeline/DefaultPhoenixDataSource.java      |  80 +++++
 .../metrics/timeline/PhoenixHBaseAccessor.java  | 114 +++----
 .../metrics/timeline/PhoenixTransactSQL.java    | 161 ++++++----
 .../metrics/loadsimulator/data/TestMetric.java  |   6 +-
 .../util/TestTimeStampProvider.java             |   2 +-
 .../metrics/timeline/TestHBaseAccessor.java     | 294 +++++++++++++++++++
 .../timeline/TestMetricHostAggregate.java       |  50 ++++
 11 files changed, 691 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
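
The heart of the fix: MetricHostAggregate previously reused its sum field as a
running average, folding every merged aggregate in with
"this.sum = (this.sum + sum) / 2". That halves the accumulated value on each
merge, so the result is order-dependent and weights the latest aggregate as
heavily as everything before it. The patch instead stores an exact sum plus a
sample count and derives the average on read. A minimal standalone sketch of
the difference, using the same numbers as TestMetricHostAggregate below (an
illustration, not the patched class itself):

    // Merge aggregates with sums 3.0, 8.0, 1.0 over 2, 2, 1 samples.
    public class AverageMergeSketch {
      public static void main(String[] args) {
        double[] sums = {3.0, 8.0, 1.0};
        long[] counts = {2, 2, 1};

        // Old behavior: the removed updateSum override halved on every merge.
        double runningSum = 0.0;                // constructor started at 0.0
        for (double s : sums) {
          runningSum = (runningSum + s) / 2;
        }
        System.out.println(runningSum);         // 2.875, order-dependent

        // New behavior: exact sum and count, average derived on read.
        double sum = 0.0;
        long samples = 0;
        for (int i = 0; i < sums.length; i++) {
          sum += sums[i];
          samples += counts[i];
        }
        System.out.println(sum / samples);      // 12.0 / 5 = 2.4, exact
      }
    }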


http://git-wip-us.apache.org/repos/asf/ambari/blob/e5542aa5/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml b/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml
index 346b89d..51a7de7 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml
@@ -252,9 +252,9 @@
     </dependency>
 
     <dependency>
-      <groupId>org.easytesting</groupId>
-      <artifactId>fest-assert</artifactId>
-      <version>1.4</version>
+      <groupId>org.assertj</groupId>
+      <artifactId>assertj-core</artifactId>
+      <version>1.7.0</version>
       <scope>test</scope>
     </dependency>
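
(The FEST-Assert test dependency is replaced by AssertJ; call sites migrate
with it - see TestMetric.java further down, where FEST's includes(...) map
assertion becomes AssertJ's contains(...).)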
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/e5542aa5/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
index 5b11b51..a13b591 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
@@ -273,6 +273,7 @@ public abstract class AbstractTimelineAggregator implements Runnable {
 
     @Override
     public String toString() {
+//    MetricClusterAggregate
       return "MetricAggregate{" +
         "sum=" + sum +
         ", numberOfHosts=" + numberOfHosts +
@@ -289,11 +290,30 @@ public abstract class AbstractTimelineAggregator implements Runnable {
    */
   public static class MetricHostAggregate extends MetricAggregate {
 
+    private long numberOfSamples = 0;
+
     @JsonCreator
     public MetricHostAggregate() {
       super(0.0, 0.0, Double.MIN_VALUE, Double.MAX_VALUE);
     }
 
+    @JsonProperty("numberOfSamples")
+    long getNumberOfSamples() {
+      return numberOfSamples == 0 ? 1 : numberOfSamples;
+    }
+
+    void updateNumberOfSamples(long count) {
+      this.numberOfSamples += count;
+    }
+
+    public void setNumberOfSamples(long numberOfSamples) {
+      this.numberOfSamples = numberOfSamples;
+    }
+
+    public double getAvg() {
+      return sum / numberOfSamples;
+    }
+
     /**
      * Find and update min, max and avg for a minute
      */
@@ -301,20 +321,14 @@ public abstract class AbstractTimelineAggregator implements Runnable {
       updateMax(hostAggregate.getMax());
       updateMin(hostAggregate.getMin());
       updateSum(hostAggregate.getSum());
-    }
-
-    /**
-     * Reuse sum to indicate average for a host for the hour
-     */
-    @Override
-    void updateSum(Double sum) {
-      this.sum = (this.sum + sum) / 2;
+      updateNumberOfSamples(hostAggregate.getNumberOfSamples());
     }
 
     @Override
     public String toString() {
       return "MetricHostAggregate{" +
         "sum=" + sum +
+        ", numberOfSamples=" + numberOfSamples +
         ", deviation=" + deviation +
         ", max=" + max +
         ", min=" + min +

http://git-wip-us.apache.org/repos/asf/ambari/blob/e5542aa5/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Aggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Aggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Aggregator.java
new file mode 100644
index 0000000..f514298
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Aggregator.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline;
+
+
+import java.util.Map;
+
+/**
+ * Computes sum, max, min and sample count over a single metric's values.
+ */
+public class Aggregator {
+
+  public double[] calculateAggregates(Map<Long, Double> metricValues) {
+    double[] values = new double[4];
+    double max = Double.MIN_VALUE;
+    double min = Double.MAX_VALUE;
+    double sum = 0.0;
+    int metricCount = 0;
+
+    if (metricValues != null && !metricValues.isEmpty()) {
+      for (Double value : metricValues.values()) {
+        // TODO: Some nulls in data - need to investigate null values from host
+        if (value != null) {
+          if (value > max) {
+            max = value;
+          }
+          if (value < min) {
+            min = value;
+          }
+          sum += value;
+        }
+      }
+      metricCount = metricValues.values().size();
+    }
+    // BR: Why is defaulting max/min to zero a good idea?
+    values[0] = sum;
+    values[1] = max != Double.MIN_VALUE ? max : 0.0;
+    values[2] = min != Double.MAX_VALUE ? min : 0.0;
+    values[3] = metricCount;
+
+    return values;
+  }
+
+}
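
The returned array is positional: values[0] is the sum, values[1] the max,
values[2] the min, and values[3] the sample count (carried as a double and
cast back to long by the caller). A short usage sketch, assuming it lives in
the same package as Aggregator:

    import java.util.HashMap;
    import java.util.Map;

    public class AggregatorSketch {
      public static void main(String[] args) {
        Map<Long, Double> metricValues = new HashMap<Long, Double>();
        metricValues.put(0L, 1.0);     // timestamp -> value
        metricValues.put(5000L, 2.0);

        double[] a = new Aggregator().calculateAggregates(metricValues);
        System.out.println("sum   = " + a[0]);           // 3.0
        System.out.println("max   = " + a[1]);           // 2.0
        System.out.println("min   = " + a[2]);           // 1.0
        System.out.println("count = " + (long) a[3]);    // 2, cast as insertMetricRecords does
      }
    }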

http://git-wip-us.apache.org/repos/asf/ambari/blob/e5542aa5/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ConnectionProvider.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ConnectionProvider.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ConnectionProvider.java
new file mode 100644
index 0000000..87650af
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ConnectionProvider.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline;
+
+
+import java.sql.Connection;
+
+/**
+ * Provides JDBC connections to the Phoenix-backed HBase metric store.
+ */
+public interface ConnectionProvider {
+  public Connection getConnection();
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e5542aa5/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java
new file mode 100644
index 0000000..c20dd14
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+public class DefaultPhoenixDataSource implements ConnectionProvider {
+
+  static final Log LOG = LogFactory.getLog(DefaultPhoenixDataSource.class);
+  private static final String ZOOKEEPER_CLIENT_PORT =
+    "hbase.zookeeper.property.clientPort";
+  private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
+  private static final String ZNODE_PARENT = "zookeeper.znode.parent";
+
+  private static final String connectionUrl = "jdbc:phoenix:%s:%s:%s";
+  private final String url;
+
+  public DefaultPhoenixDataSource(Configuration hbaseConf) {
+    String zookeeperClientPort = hbaseConf.getTrimmed(ZOOKEEPER_CLIENT_PORT,
+      "2181");
+    String zookeeperQuorum = hbaseConf.getTrimmed(ZOOKEEPER_QUORUM);
+    String znodeParent = hbaseConf.getTrimmed(ZNODE_PARENT, "/hbase");
+    if (zookeeperQuorum == null || zookeeperQuorum.isEmpty()) {
+      throw new IllegalStateException("Unable to find Zookeeper quorum to " +
+        "access HBase store using Phoenix.");
+    }
+
+    url = String.format(connectionUrl,
+      zookeeperQuorum,
+      zookeeperClientPort,
+      znodeParent);
+  }
+
+  /**
+   * Get a JDBC connection to the HBase store. Assumes the HBase
+   * configuration is present on the classpath and has been loaded by the
+   * caller into the Configuration object.
+   * Phoenix already caches the HConnection between the client and the
+   * HBase cluster.
+   *
+   * @return a {@link java.sql.Connection}, or null if the connection fails
+   */
+  public Connection getConnection() {
+    Connection connection = null;
+
+    LOG.debug("Metric store connection url: " + url);
+    try {
+      // TODO: The exception is swallowed; it should be rethrown - discuss
+      connection = DriverManager.getConnection(url);
+    } catch (SQLException e) {
+      LOG.warn("Unable to connect to HBase store using Phoenix.", e);
+    }
+
+    return connection;
+  }
+
+}
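
Wiring the provider is a one-liner given an HBase configuration. A minimal
sketch, assuming hbase-site.xml (with hbase.zookeeper.quorum set) is on the
classpath and the sketch sits in the same package as ConnectionProvider:

    import java.sql.Connection;
    import org.apache.hadoop.conf.Configuration;

    public class DataSourceSketch {
      public static void main(String[] args) {
        Configuration hbaseConf = new Configuration();
        hbaseConf.addResource("hbase-site.xml"); // must supply hbase.zookeeper.quorum,
                                                 // else the constructor throws IllegalStateException

        ConnectionProvider provider = new DefaultPhoenixDataSource(hbaseConf);
        Connection conn = provider.getConnection(); // null on failure (see TODO above)
      }
    }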

http://git-wip-us.apache.org/repos/asf/ambari/blob/e5542aa5/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 1298973..2d53a0b 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -36,6 +36,7 @@ import java.sql.Statement;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractTimelineAggregator.MetricClusterAggregate;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractTimelineAggregator.MetricHostAggregate;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL;
@@ -68,26 +69,31 @@ public class PhoenixHBaseAccessor {
   private final Configuration hbaseConf;
   private final Configuration metricsConf;
   static final Log LOG = LogFactory.getLog(PhoenixHBaseAccessor.class);
-  private static final String connectionUrl = "jdbc:phoenix:%s:%s:%s";
-  private static final String ZOOKEEPER_CLIENT_PORT =
-    "hbase.zookeeper.property.clientPort";
-  private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
-  private static final String ZNODE_PARENT = "zookeeper.znode.parent";
+
   static final int PHOENIX_MAX_MUTATION_STATE_SIZE = 50000;
   /**
    * 4 metrics/min * 60 * 24: Retrieve data for 1 day.
    */
-  public static int RESULTSET_LIMIT = 5760;
+  private static final int METRICS_PER_MINUTE = 4;
+  public static int RESULTSET_LIMIT = (int)TimeUnit.DAYS.toMinutes(1) *
+    METRICS_PER_MINUTE;
   private static ObjectMapper mapper;
 
   static {
     mapper = new ObjectMapper();
   }
-
   private static TypeReference<Map<Long, Double>> metricValuesTypeRef =
     new TypeReference<Map<Long, Double>>() {};
+  private final ConnectionProvider dataSource;
 
-  public PhoenixHBaseAccessor(Configuration hbaseConf, Configuration metricsConf) {
+  public PhoenixHBaseAccessor(Configuration hbaseConf,
+                              Configuration metricsConf){
+    this(hbaseConf, metricsConf, new DefaultPhoenixDataSource(hbaseConf));
+  }
+
+  public PhoenixHBaseAccessor(Configuration hbaseConf,
+                              Configuration metricsConf,
+                              ConnectionProvider dataSource) {
     this.hbaseConf = hbaseConf;
     this.metricsConf = metricsConf;
     RESULTSET_LIMIT = metricsConf.getInt(GLOBAL_RESULT_LIMIT, 5760);
@@ -97,6 +103,7 @@ public class PhoenixHBaseAccessor {
       LOG.error("Phoenix client jar not found in the classpath.", e);
       throw new IllegalStateException(e);
     }
+    this.dataSource = dataSource;
   }
 
   /**
@@ -105,31 +112,11 @@ public class PhoenixHBaseAccessor {
    * the Configuration object.
    * Phoenix already caches the HConnection between the client and HBase
    * cluster.
+   *
    * @return @java.sql.Connection
    */
   public Connection getConnection() {
-    Connection connection = null;
-    String zookeeperClientPort = hbaseConf.getTrimmed(ZOOKEEPER_CLIENT_PORT, "2181");
-    String zookeeperQuorum = hbaseConf.getTrimmed(ZOOKEEPER_QUORUM);
-    String znodeParent = hbaseConf.getTrimmed(ZNODE_PARENT, "/hbase");
-
-    if (zookeeperQuorum == null || zookeeperQuorum.isEmpty()) {
-      throw new IllegalStateException("Unable to find Zookeeper quorum to " +
-        "access HBase store using Phoenix.");
-    }
-
-    String url = String.format(connectionUrl, zookeeperQuorum,
-      zookeeperClientPort, znodeParent);
-
-    LOG.debug("Metric store connection url: " + url);
-
-    try {
-      connection = DriverManager.getConnection(url);
-    } catch (SQLException e) {
-      LOG.warn("Unable to connect to HBase store using Phoenix.", e);
-    }
-
-    return connection;
+    return dataSource.getConnection();
   }
 
   public static Map readMetricFromJSON(String json) throws IOException {
@@ -138,7 +125,7 @@ public class PhoenixHBaseAccessor {
 
   @SuppressWarnings("unchecked")
   static TimelineMetric getTimelineMetricFromResultSet(ResultSet rs)
-      throws SQLException, IOException {
+    throws SQLException, IOException {
     TimelineMetric metric = new TimelineMetric();
     metric.setMetricName(rs.getString("METRIC_NAME"));
     metric.setAppId(rs.getString("APP_ID"));
@@ -153,7 +140,7 @@ public class PhoenixHBaseAccessor {
   }
 
   static TimelineMetric getTimelineMetricKeyFromResultSet(ResultSet rs)
-      throws SQLException, IOException {
+    throws SQLException, IOException {
     TimelineMetric metric = new TimelineMetric();
     metric.setMetricName(rs.getString("METRIC_NAME"));
     metric.setAppId(rs.getString("APP_ID"));
@@ -165,11 +152,13 @@ public class PhoenixHBaseAccessor {
   }
 
   static MetricHostAggregate getMetricHostAggregateFromResultSet(ResultSet rs)
-      throws SQLException {
+    throws SQLException {
     MetricHostAggregate metricHostAggregate = new MetricHostAggregate();
-    metricHostAggregate.setSum(rs.getDouble("METRIC_AVG"));
+    metricHostAggregate.setSum(rs.getDouble("METRIC_SUM"));
     metricHostAggregate.setMax(rs.getDouble("METRIC_MAX"));
     metricHostAggregate.setMin(rs.getDouble("METRIC_MIN"));
+    metricHostAggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT"));
+
     metricHostAggregate.setDeviation(0.0);
     return metricHostAggregate;
   }
@@ -199,7 +188,7 @@ public class PhoenixHBaseAccessor {
       stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL,
         encoding, clusterMinTtl, compression));
       stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL,
-          encoding, clusterHourTtl, compression));
+        encoding, clusterHourTtl, compression));
       conn.commit();
     } catch (SQLException sql) {
       LOG.warn("Error creating Metrics Schema in HBase using Phoenix.", sql);
@@ -222,7 +211,7 @@ public class PhoenixHBaseAccessor {
   }
 
   public void insertMetricRecords(TimelineMetrics metrics)
-      throws SQLException, IOException {
+    throws SQLException, IOException {
 
     List<TimelineMetric> timelineMetrics = metrics.getMetrics();
     if (timelineMetrics == null || timelineMetrics.isEmpty()) {
@@ -247,7 +236,9 @@ public class PhoenixHBaseAccessor {
         LOG.trace("host: " + metric.getHostName() + ", " +
           "metricName = " + metric.getMetricName() + ", " +
           "values: " + metric.getMetricValues());
-        Double[] aggregates = calculateAggregates(metric.getMetricValues());
+        Aggregator agg = new Aggregator();
+        double[] aggregates =  agg.calculateAggregates(
+          metric.getMetricValues());
 
         metricRecordStmt.setString(1, metric.getMetricName());
         //metricRecordTmpStmt.setString(1, metric.getMetricName());
@@ -268,10 +259,11 @@ public class PhoenixHBaseAccessor {
         metricRecordStmt.setDouble(9, aggregates[1]);
         //metricRecordTmpStmt.setDouble(9, aggregates[1]);
         metricRecordStmt.setDouble(10, aggregates[2]);
+        metricRecordStmt.setLong(11, (long)aggregates[3]);
         //metricRecordTmpStmt.setDouble(10, aggregates[2]);
         String json =
           TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
-        metricRecordStmt.setString(11, json);
+        metricRecordStmt.setString(12, json);
         //metricRecordTmpStmt.setString(11, json);
 
         try {
@@ -309,35 +301,10 @@ public class PhoenixHBaseAccessor {
     }
   }
 
-  private Double[] calculateAggregates(Map<Long, Double> metricValues) {
-    Double[] values = new Double[3];
-    Double max = Double.MIN_VALUE;
-    Double min = Double.MAX_VALUE;
-    Double avg = 0.0;
-    if (metricValues != null && !metricValues.isEmpty()) {
-      for (Double value : metricValues.values()) {
-        // TODO: Some nulls in data - need to investigate null values from host
-        if (value != null) {
-          if (value > max) {
-            max  = value;
-          }
-          if (value < min) {
-            min = value;
-          }
-          avg += value;
-        }
-      }
-      avg /= metricValues.values().size();
-    }
-    values[0] = max != Double.MIN_VALUE ? max : 0.0;
-    values[1] = min != Double.MAX_VALUE ? min : 0.0;
-    values[2] = avg;
-    return values;
-  }
 
   @SuppressWarnings("unchecked")
   public TimelineMetrics getMetricRecords(final Condition condition)
-      throws SQLException, IOException {
+    throws SQLException, IOException {
 
     if (condition.isEmpty()) {
       throw new SQLException("No filter criteria specified.");
@@ -382,8 +349,8 @@ public class PhoenixHBaseAccessor {
   }
 
   public void saveHostAggregateRecords(Map<TimelineMetric,
-      MetricHostAggregate> hostAggregateMap, String phoenixTableName)
-      throws SQLException {
+    MetricHostAggregate> hostAggregateMap, String phoenixTableName)
+    throws SQLException {
 
     if (hostAggregateMap != null && !hostAggregateMap.isEmpty()) {
       Connection conn = getConnection();
@@ -397,7 +364,7 @@ public class PhoenixHBaseAccessor {
           String.format(UPSERT_AGGREGATE_RECORD_SQL, phoenixTableName));
 
         for (Map.Entry<TimelineMetric, MetricHostAggregate> metricAggregate :
-            hostAggregateMap.entrySet()) {
+          hostAggregateMap.entrySet()) {
 
           TimelineMetric metric = metricAggregate.getKey();
           MetricHostAggregate hostAggregate = metricAggregate.getValue();
@@ -413,8 +380,10 @@ public class PhoenixHBaseAccessor {
           stmt.setDouble(7, hostAggregate.getSum());
           stmt.setDouble(8, hostAggregate.getMax());
           stmt.setDouble(9, hostAggregate.getMin());
+          stmt.setDouble(10, hostAggregate.getNumberOfSamples());
 
           try {
+            // TODO: Why is this exception swallowed?
             stmt.executeUpdate();
           } catch (SQLException sql) {
             LOG.error(sql);
@@ -457,10 +426,11 @@ public class PhoenixHBaseAccessor {
 
   /**
    * Save Metric aggregate records.
+   *
    * @throws SQLException
    */
   public void saveClusterAggregateRecords(Map<TimelineClusterMetric,
-      MetricClusterAggregate> records) throws SQLException {
+    MetricClusterAggregate> records) throws SQLException {
     if (records == null || records.isEmpty()) {
       LOG.debug("Empty aggregate records.");
       return;
@@ -475,7 +445,7 @@ public class PhoenixHBaseAccessor {
       int rowCount = 0;
 
       for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate>
-          aggregateEntry : records.entrySet()) {
+        aggregateEntry : records.entrySet()) {
         TimelineClusterMetric clusterMetric = aggregateEntry.getKey();
         MetricClusterAggregate aggregate = aggregateEntry.getValue();
 
@@ -533,7 +503,7 @@ public class PhoenixHBaseAccessor {
 
 
   public TimelineMetrics getAggregateMetricRecords(final Condition condition)
-      throws SQLException {
+    throws SQLException {
 
     if (condition.isEmpty()) {
       throw new SQLException("No filter criteria specified.");
@@ -556,8 +526,8 @@ public class PhoenixHBaseAccessor {
         metric.setTimestamp(rs.getLong("SERVER_TIME"));
         metric.setStartTime(rs.getLong("SERVER_TIME"));
         Map<Long, Double> valueMap = new HashMap<Long, Double>();
-        valueMap.put(rs.getLong("SERVER_TIME"), rs.getDouble("METRIC_SUM") /
-                                              rs.getInt("HOSTS_COUNT"));
+        valueMap.put(rs.getLong("SERVER_TIME"),
+          rs.getDouble("METRIC_SUM") / rs.getInt("HOSTS_COUNT"));
         metric.setMetricValues(valueMap);
 
         if (condition.isGrouped()) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/e5542aa5/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
index db70913..8264c41 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
@@ -15,7 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,14 +36,20 @@ public class PhoenixTransactSQL {
   static final Log LOG = LogFactory.getLog(PhoenixTransactSQL.class);
   // TODO: Configurable TTL values
   /**
-  * Create table to store individual metric records.
-  */
+   * Create table to store individual metric records.
+   */
   public static final String CREATE_METRICS_TABLE_SQL = "CREATE TABLE IF NOT " +
     "EXISTS METRIC_RECORD (METRIC_NAME VARCHAR, " +
-    "HOSTNAME VARCHAR, SERVER_TIME UNSIGNED_LONG NOT NULL, " +
-    "APP_ID VARCHAR, INSTANCE_ID VARCHAR, " +
-    "START_TIME UNSIGNED_LONG, UNITS CHAR(20), " +
-    "METRIC_AVG DOUBLE, METRIC_MAX DOUBLE, METRIC_MIN DOUBLE, " +
+    "HOSTNAME VARCHAR, " +
+    "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+    "APP_ID VARCHAR, " +
+    "INSTANCE_ID VARCHAR, " +
+    "START_TIME UNSIGNED_LONG, " +
+    "UNITS CHAR(20), " +
+    "METRIC_SUM DOUBLE, " +
+    "METRIC_COUNT UNSIGNED_INT, " +
+    "METRIC_MAX DOUBLE, " +
+    "METRIC_MIN DOUBLE, " +
     "METRICS VARCHAR CONSTRAINT pk " +
     "PRIMARY KEY (METRIC_NAME, HOSTNAME, SERVER_TIME, APP_ID, " +
     "INSTANCE_ID)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
@@ -50,82 +57,128 @@ public class PhoenixTransactSQL {
 
   public static final String CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL =
     "CREATE TABLE IF NOT EXISTS METRIC_RECORD_HOURLY " +
-    "(METRIC_NAME VARCHAR, HOSTNAME VARCHAR, " +
-    "APP_ID VARCHAR, INSTANCE_ID VARCHAR, " +
-    "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
-    "UNITS CHAR(20), METRIC_AVG DOUBLE, METRIC_MAX DOUBLE," +
-    "METRIC_MIN DOUBLE CONSTRAINT pk " +
-    "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
-    "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
-    "TTL=%s, COMPRESSION='%s'";
+      "(METRIC_NAME VARCHAR, " +
+      "HOSTNAME VARCHAR, " +
+      "APP_ID VARCHAR, " +
+      "INSTANCE_ID VARCHAR, " +
+      "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+      "UNITS CHAR(20), " +
+      "METRIC_SUM DOUBLE," +
+      "METRIC_COUNT UNSIGNED_INT, " +
+      "METRIC_MAX DOUBLE," +
+      "METRIC_MIN DOUBLE CONSTRAINT pk " +
+      "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
+      "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
+      "TTL=%s, COMPRESSION='%s'";
 
   public static final String CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL =
     "CREATE TABLE IF NOT EXISTS METRIC_RECORD_MINUTE " +
-    "(METRIC_NAME VARCHAR, HOSTNAME VARCHAR, " +
-    "APP_ID VARCHAR, INSTANCE_ID VARCHAR, " +
-    "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
-    "UNITS CHAR(20), METRIC_AVG DOUBLE, METRIC_MAX DOUBLE," +
-    "METRIC_MIN DOUBLE CONSTRAINT pk " +
-    "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
-    "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s," +
-    " COMPRESSION='%s'";
+      "(METRIC_NAME VARCHAR, " +
+      "HOSTNAME VARCHAR, " +
+      "APP_ID VARCHAR, " +
+      "INSTANCE_ID VARCHAR, " +
+      "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+      "UNITS CHAR(20), " +
+      "METRIC_SUM DOUBLE," +
+      "METRIC_COUNT UNSIGNED_INT, " +
+      "METRIC_MAX DOUBLE," +
+      "METRIC_MIN DOUBLE CONSTRAINT pk " +
+      "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
+      "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s," +
+      " COMPRESSION='%s'";
 
   public static final String CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL =
     "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE " +
-    "(METRIC_NAME VARCHAR, APP_ID VARCHAR, INSTANCE_ID VARCHAR, " +
-    "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
-    "UNITS CHAR(20), METRIC_SUM DOUBLE, " +
-    "HOSTS_COUNT UNSIGNED_INT, METRIC_MAX DOUBLE, METRIC_MIN DOUBLE " +
-    "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
-    "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
-    "TTL=%s, COMPRESSION='%s'";
+      "(METRIC_NAME VARCHAR, " +
+      "APP_ID VARCHAR, " +
+      "INSTANCE_ID VARCHAR, " +
+      "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+      "UNITS CHAR(20), " +
+      "METRIC_SUM DOUBLE, " +
+      "HOSTS_COUNT UNSIGNED_INT, " +
+      "METRIC_MAX DOUBLE, " +
+      "METRIC_MIN DOUBLE " +
+      "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
+      "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
+      "TTL=%s, COMPRESSION='%s'";
 
   public static final String CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL =
     "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE_HOURLY " +
-    "(METRIC_NAME VARCHAR, APP_ID VARCHAR, INSTANCE_ID VARCHAR, " +
-    "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
-    "UNITS CHAR(20), METRIC_AVG DOUBLE, " +
-    "METRIC_MAX DOUBLE, METRIC_MIN DOUBLE " +
-    "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
-    "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
-    "TTL=%s, COMPRESSION='%s'";
+      "(METRIC_NAME VARCHAR, " +
+      "APP_ID VARCHAR, " +
+      "INSTANCE_ID VARCHAR, " +
+      "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+      "UNITS CHAR(20), " +
+      "METRIC_AVG DOUBLE, " +
+      "METRIC_MAX DOUBLE, " +
+      "METRIC_MIN DOUBLE " +
+      "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
+      "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
+      "TTL=%s, COMPRESSION='%s'";
 
   /**
    * Insert into metric records table.
    */
   public static final String UPSERT_METRICS_SQL = "UPSERT INTO %s " +
     "(METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, START_TIME, " +
-    "UNITS, METRIC_AVG, METRIC_MAX, METRIC_MIN, METRICS) VALUES " +
-    "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+    "UNITS, " +
+    "METRIC_SUM, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN, " +
+    "METRIC_COUNT, " +
+    "METRICS) VALUES " +
+    "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
 
   public static final String UPSERT_CLUSTER_AGGREGATE_SQL = "UPSERT INTO " +
     "METRIC_AGGREGATE (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " +
-    "UNITS, METRIC_SUM, HOSTS_COUNT, METRIC_MAX, METRIC_MIN) " +
+    "UNITS, " +
+    "METRIC_SUM, " +
+    "HOSTS_COUNT, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN) " +
     "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
 
   public static final String UPSERT_AGGREGATE_RECORD_SQL = "UPSERT INTO " +
     "%s (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
-    "SERVER_TIME, UNITS, METRIC_AVG, METRIC_MAX, METRIC_MIN) " +
-    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
+    "SERVER_TIME, " +
+    "UNITS, " +
+    "METRIC_SUM, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN," +
+    "METRIC_COUNT) " +
+    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
 
   /**
    * Retrieve a set of rows from metrics records table.
    */
   public static final String GET_METRIC_SQL = "SELECT METRIC_NAME, " +
-    "HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, START_TIME, UNITS, METRIC_AVG, " +
-    "METRIC_MAX, METRIC_MIN, METRICS FROM %s";
+    "HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, START_TIME, UNITS, " +
+    "METRIC_SUM, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN, " +
+    "METRIC_COUNT, " +
+    "METRICS " +
+    "FROM %s";
 
   public static final String GET_METRIC_AGGREGATE_ONLY_SQL = "SELECT " +
     "METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, " +
-    "UNITS, METRIC_AVG, METRIC_MAX, METRIC_MIN FROM %s";
+    "UNITS, " +
+    "METRIC_SUM, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN, " +
+    "METRIC_COUNT " +
+    "FROM %s";
 
   public static final String GET_CLUSTER_AGGREGATE_SQL =
     "SELECT METRIC_NAME, APP_ID, " +
-    "INSTANCE_ID, SERVER_TIME, METRIC_SUM, HOSTS_COUNT, METRIC_MAX, " +
-    "METRIC_MIN FROM METRIC_AGGREGATE";
-
-  public static final String METRICS_RECORD_TABLE_NAME =
-    "METRIC_RECORD";
+      "INSTANCE_ID, SERVER_TIME, " +
+      "METRIC_SUM, " +
+      "HOSTS_COUNT, " +
+      "METRIC_MAX, " +
+      "METRIC_MIN " +
+      "FROM METRIC_AGGREGATE";
+
+  public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD";
   public static final String METRICS_AGGREGATE_MINUTE_TABLE_NAME =
     "METRIC_RECORD_MINUTE";
   public static final String METRICS_AGGREGATE_HOURLY_TABLE_NAME =
@@ -134,7 +187,7 @@ public class PhoenixTransactSQL {
   public static final String DEFAULT_ENCODING = "FAST_DIFF";
 
   public static PreparedStatement prepareGetMetricsSqlStmt(
-      Connection connection, Condition condition) throws SQLException {
+    Connection connection, Condition condition) throws SQLException {
 
     if (condition.isEmpty()) {
       throw new IllegalArgumentException("Condition is empty.");
@@ -162,7 +215,7 @@ public class PhoenixTransactSQL {
     PreparedStatement stmt = connection.prepareStatement(sb.toString());
     int pos = 1;
     if (condition.getMetricNames() != null) {
-      for ( ; pos <= condition.getMetricNames().size(); pos++) {
+      for (; pos <= condition.getMetricNames().size(); pos++) {
         stmt.setString(pos, condition.getMetricNames().get(pos - 1));
       }
     }
@@ -190,7 +243,7 @@ public class PhoenixTransactSQL {
   }
 
   public static PreparedStatement prepareGetAggregateSqlStmt(
-      Connection connection, Condition condition) throws SQLException {
+    Connection connection, Condition condition) throws SQLException {
 
     if (condition.isEmpty()) {
       throw new IllegalArgumentException("Condition is empty.");
@@ -208,7 +261,7 @@ public class PhoenixTransactSQL {
     PreparedStatement stmt = connection.prepareStatement(sb.toString());
     int pos = 1;
     if (condition.getMetricNames() != null) {
-      for ( ; pos <= condition.getMetricNames().size(); pos++) {
+      for (; pos <= condition.getMetricNames().size(); pos++) {
         stmt.setString(pos, condition.getMetricNames().get(pos - 1));
       }
     }
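
Net schema change in this file: the host-level tables (METRIC_RECORD,
METRIC_RECORD_MINUTE, METRIC_RECORD_HOURLY) drop METRIC_AVG in favor of
METRIC_SUM plus METRIC_COUNT, so the average can be derived exactly at read
time as METRIC_SUM / METRIC_COUNT; the cluster-level tables keep METRIC_SUM
with HOSTS_COUNT, and METRIC_AGGREGATE_HOURLY still carries METRIC_AVG.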

http://git-wip-us.apache.org/repos/asf/ambari/blob/e5542aa5/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/TestMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/TestMetric.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/TestMetric.java
index 2e07419..a0572a2 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/TestMetric.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/TestMetric.java
@@ -22,8 +22,8 @@ import org.junit.Test;
 
 import java.io.IOException;
 
-import static org.fest.assertions.Assertions.assertThat;
-import static org.fest.assertions.MapAssert.entry;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.entry;
 import static org.junit.Assert.assertEquals;
 
 public class TestMetric {
@@ -71,7 +71,7 @@ public class TestMetric {
     assertEquals("disk_free", m.getMetricname());
     assertEquals("0", m.getStarttime());
 
-    assertThat(m.getMetrics()).isNotEmpty().hasSize(4).includes(
+    assertThat(m.getMetrics()).isNotEmpty().hasSize(4).contains(
       entry("0", "5.35"),
       entry("5000", "5.35"),
       entry("10000", "5.35"),

http://git-wip-us.apache.org/repos/asf/ambari/blob/e5542aa5/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/TestTimeStampProvider.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/TestTimeStampProvider.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/TestTimeStampProvider.java
index 49272d8..dd513aa 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/TestTimeStampProvider.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/TestTimeStampProvider.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.loadsimu
 
 import org.junit.Test;
 
-import static org.fest.assertions.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 
 public class TestTimeStampProvider {

http://git-wip-us.apache.org/repos/asf/ambari/blob/e5542aa5/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestHBaseAccessor.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestHBaseAccessor.java
new file mode 100644
index 0000000..7958055
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestHBaseAccessor.java
@@ -0,0 +1,294 @@
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.junit.After;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.sql.*;
+import java.util.*;
+
+import static junit.framework.Assert.*;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.PhoenixTransactSQL.*;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@Ignore
+public class TestHBaseAccessor {
+  private static String MY_LOCAL_URL =
+    "jdbc:phoenix:c6503.ambari.apache.org:" + 2181 + ":/hbase";
+
+  @After
+  public void tearDown() throws Exception {
+    Connection conn = getConnection(getUrl());
+    Statement stmt = conn.createStatement();
+
+    stmt.execute("DROP TABLE METRIC_AGGREGATE");
+    stmt.execute("DROP TABLE METRIC_AGGREGATE_HOURLY");
+    stmt.execute("DROP TABLE METRIC_RECORD");
+    stmt.execute("DROP TABLE METRIC_RECORD_HOURLY");
+    stmt.execute("DROP TABLE METRIC_RECORD_MINUTE");
+  }
+
+  /**
+   * A canary test.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testClusterOK() throws Exception {
+    Connection conn = getConnection(getUrl());
+    Statement stmt = conn.createStatement();
+    String sampleDDL = "CREATE TABLE TEST_METRICS (TEST_COLUMN VARCHAR " +
+      "CONSTRAINT pk PRIMARY KEY (TEST_COLUMN)) " +
+      "DATA_BLOCK_ENCODING='FAST_DIFF', " +
+      "IMMUTABLE_ROWS=true, TTL=86400, COMPRESSION='SNAPPY'";
+
+    stmt.executeUpdate(sampleDDL);
+    ResultSet rs = stmt.executeQuery(
+      "SELECT COUNT(TEST_COLUMN) FROM TEST_METRICS");
+
+    rs.next();
+    long l = rs.getLong(1);
+    assertThat(l).isGreaterThanOrEqualTo(0);
+
+    stmt.execute("DROP TABLE TEST_METRICS");
+  }
+
+  @Test
+  public void testShouldInsertMetrics() throws Exception {
+    // GIVEN
+    PhoenixHBaseAccessor sut = createTestableHBaseAccessor();
+
+    // WHEN
+    long startTime = System.currentTimeMillis();
+    TimelineMetrics metricsSent = prepareTimelineMetrics(startTime);
+    sut.insertMetricRecords(metricsSent);
+
+    Condition queryCondition = new Condition(null, "local", null, null,
+      startTime, startTime + (15 * 60 * 1000), null, false);
+    TimelineMetrics recordRead = sut.getMetricRecords(queryCondition);
+
+    // THEN
+    assertThat(recordRead.getMetrics()).hasSize(2)
+      .extracting("metricName")
+      .containsOnly("mem_free", "disk_free");
+
+    assertThat(metricsSent.getMetrics())
+      .usingElementComparator(TIME_IGNORING_COMPARATOR)
+      .containsExactlyElementsOf(recordRead.getMetrics());
+  }
+
+  @Test
+  public void testShouldAggregateMinuteProperly() throws Exception {
+    // GIVEN
+    PhoenixHBaseAccessor hdb = createTestableHBaseAccessor();
+    Connection conn = getConnection(getUrl());
+    hdb.initMetricSchema();
+    TimelineMetricAggregatorMinute aggregatorMinute =
+      new TimelineMetricAggregatorMinute(hdb, new Configuration());
+
+    long startTime = System.currentTimeMillis();
+    TimelineMetrics metricsSent = prepareTimelineMetrics(startTime);
+    hdb.insertMetricRecords(metricsSent);
+    hdb.insertMetricRecords(prepareTimelineMetrics(startTime + 1 * 60 * 1000));
+    hdb.insertMetricRecords(prepareTimelineMetrics(startTime + 2 * 60 * 1000));
+    hdb.insertMetricRecords(prepareTimelineMetrics(startTime + 3 * 60 * 1000));
+    hdb.insertMetricRecords(prepareTimelineMetrics(startTime + 4 * 60 * 1000));
+
+    // WHEN
+    long endTime = startTime + 1000 * 60 * 4;
+    boolean success = aggregatorMinute.doWork(startTime,
+      endTime);
+    assertTrue(success);
+
+    //THEN
+    Condition condition = new Condition(null, null, null, null, startTime,
+      endTime, null, true);
+    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
+      METRICS_AGGREGATE_MINUTE_TABLE_NAME));
+
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
+      (conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+    AbstractTimelineAggregator.MetricHostAggregate expectedAggregate =
+      createMetricHostAggregate(2.0, 0.0, 20, 15.0);
+
+    int count = 0;
+    while (rs.next()) {
+      TimelineMetric currentMetric =
+        PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
+      AbstractTimelineAggregator.MetricHostAggregate currentHostAggregate =
+        PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
+
+      if ("disk_free".equals(currentMetric.getMetricName())) {
+        assertEquals(2.0, currentHostAggregate.getMax());
+        assertEquals(0.0, currentHostAggregate.getMin());
+        assertEquals(20, currentHostAggregate.getNumberOfSamples());
+        assertEquals(15.0, currentHostAggregate.getSum());
+        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+      } else if ("mem_free".equals(currentMetric.getMetricName())) {
+        assertEquals(2.0, currentHostAggregate.getMax());
+        assertEquals(0.0, currentHostAggregate.getMin());
+        assertEquals(20, currentHostAggregate.getNumberOfSamples());
+        assertEquals(15.0, currentHostAggregate.getSum());
+        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+      } else {
+        fail("Unexpected entry");
+      }
+      count++;
+    }
+    assertEquals("Two aggregated entries expected", 2, count);
+  }
+
+  @Test
+  public void testShouldAggregateHourProperly() throws Exception {
+    // GIVEN
+    PhoenixHBaseAccessor hdb = createTestableHBaseAccessor();
+    Connection conn = getConnection(getUrl());
+    hdb.initMetricSchema();
+    TimelineMetricAggregatorHourly aggregator =
+      new TimelineMetricAggregatorHourly(hdb, new Configuration());
+    long startTime = System.currentTimeMillis();
+
+    AbstractTimelineAggregator.MetricHostAggregate expectedAggregate =
+      createMetricHostAggregate(2.0, 0.0, 20, 15.0);
+    Map<TimelineMetric, AbstractTimelineAggregator.MetricHostAggregate>
+      aggMap = new HashMap<TimelineMetric,
+      AbstractTimelineAggregator.MetricHostAggregate>();
+
+    int min_5 = 5 * 60 * 1000;
+    long ctime = startTime - min_5;
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+
+    hdb.saveHostAggregateRecords(aggMap, METRICS_AGGREGATE_MINUTE_TABLE_NAME);
+
+    //WHEN
+    long endTime = ctime + min_5;
+    boolean success = aggregator.doWork(startTime, endTime);
+    assertTrue(success);
+
+    //THEN
+    Condition condition = new Condition(null, null, null, null, startTime,
+      endTime, null, true);
+    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
+      METRICS_AGGREGATE_HOURLY_TABLE_NAME));
+
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
+      (conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+
+    while (rs.next()) {
+      TimelineMetric currentMetric =
+        PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
+      AbstractTimelineAggregator.MetricHostAggregate currentHostAggregate =
+        PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
+
+      if ("disk_used".equals(currentMetric.getMetricName())) {
+        assertEquals(2.0, currentHostAggregate.getMax());
+        assertEquals(0.0, currentHostAggregate.getMin());
+        assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples());
+        assertEquals(12 * 15.0, currentHostAggregate.getSum());
+        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+      }
+    }
+  }
+
+  private TimelineMetric createEmptyTimelineMetric(long startTime) {
+    TimelineMetric metric = new TimelineMetric();
+    metric.setMetricName("disk_used");
+    metric.setAppId("test_app");
+    metric.setHostName("test_host");
+    metric.setTimestamp(startTime);
+
+    return metric;
+  }
+
+  private AbstractTimelineAggregator.MetricHostAggregate
+  createMetricHostAggregate(double max, double min, int numberOfSamples,
+                            double sum) {
+    AbstractTimelineAggregator.MetricHostAggregate expectedAggregate =
+      new AbstractTimelineAggregator.MetricHostAggregate();
+    expectedAggregate.setMax(max);
+    expectedAggregate.setMin(min);
+    expectedAggregate.setNumberOfSamples(numberOfSamples);
+    expectedAggregate.setSum(sum);
+
+    return expectedAggregate;
+  }
+
+  private PhoenixHBaseAccessor createTestableHBaseAccessor() {
+    return
+      new PhoenixHBaseAccessor(
+        new Configuration(),
+        new Configuration(),
+        new ConnectionProvider() {
+          @Override
+          public Connection getConnection() {
+            Connection connection = null;
+            try {
+              connection = DriverManager.getConnection(getUrl());
+            } catch (SQLException e) {
+              // LOG resolves here via the PhoenixTransactSQL static wildcard import
+              LOG.warn("Unable to connect to HBase store using Phoenix.", e);
+            }
+            return connection;
+          }
+        });
+  }
+
+  private final static Comparator<TimelineMetric> TIME_IGNORING_COMPARATOR =
+    new Comparator<TimelineMetric>() {
+      @Override
+      public int compare(TimelineMetric o1, TimelineMetric o2) {
+        return o1.equalsExceptTime(o2) ? 0 : 1;
+      }
+    };
+
+  private TimelineMetrics prepareTimelineMetrics(long startTime) {
+    TimelineMetrics metrics = new TimelineMetrics();
+    List<TimelineMetric> allMetrics = new ArrayList<TimelineMetric>();
+    allMetrics.add(createMetric(startTime, "disk_free"));
+    allMetrics.add(createMetric(startTime, "mem_free"));
+    metrics.setMetrics(allMetrics);
+
+    return metrics;
+  }
+
+  private TimelineMetric createMetric(long startTime, String metricName) {
+    TimelineMetric m = new TimelineMetric();
+    m.setAppId("host");
+    m.setHostName("local");
+    m.setMetricName(metricName);
+    m.setStartTime(startTime);
+    Map<Long, Double> vals = new HashMap<Long, Double>();
+    vals.put(startTime + 15000l, 0.0);
+    vals.put(startTime + 30000l, 0.0);
+    vals.put(startTime + 45000l, 1.0);
+    vals.put(startTime + 60000l, 2.0);
+
+    m.setMetricValues(vals);
+
+    return m;
+  }
+
+  protected static String getUrl() {
+    return MY_LOCAL_URL;
+//    return  TestUtil.PHOENIX_CONNECTIONLESS_JDBC_URL;
+  }
+
+  private static Connection getConnection(String url) throws SQLException {
+    return DriverManager.getConnection(url);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/e5542aa5/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
new file mode 100644
index 0000000..aebbdb3
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
@@ -0,0 +1,50 @@
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline;
+
+import org.junit.Test;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+  .timeline.AbstractTimelineAggregator.MetricHostAggregate;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TestMetricHostAggregate {
+
+  @Test
+  public void testCreateAggregate() throws Exception {
+    // given
+    MetricHostAggregate aggregate = createAggregate(3.0, 1.0, 2.0, 2);
+
+    //then
+    assertThat(aggregate.getSum()).isEqualTo(3.0);
+    assertThat(aggregate.getMin()).isEqualTo(1.0);
+    assertThat(aggregate.getMax()).isEqualTo(2.0);
+    assertThat(aggregate.getAvg()).isEqualTo(3.0 / 2);
+  }
+
+  @Test
+  public void testUpdateAggregates() throws Exception {
+    // given
+    MetricHostAggregate aggregate = createAggregate(3.0, 1.0, 2.0, 2);
+
+    //when
+    aggregate.updateAggregates(createAggregate(8.0, 0.5, 7.5, 2));
+    aggregate.updateAggregates(createAggregate(1.0, 1.0, 1.0, 1));
+
+    //then
+    assertThat(aggregate.getSum()).isEqualTo(12.0);
+    assertThat(aggregate.getMin()).isEqualTo(0.5);
+    assertThat(aggregate.getMax()).isEqualTo(7.5);
+    assertThat(aggregate.getAvg()).isEqualTo((3.0 + 8.0 + 1.0) / 5);
+  }
+
+  private MetricHostAggregate createAggregate
+    (double sum, double min, double max, int samplesCount) {
+    MetricHostAggregate aggregate = new MetricHostAggregate();
+    aggregate.setSum(sum);
+    aggregate.setMax(max);
+    aggregate.setMin(min);
+    aggregate.setDeviation(0.0);
+    aggregate.setNumberOfSamples(samplesCount);
+    return aggregate;
+  }
+}
\ No newline at end of file