You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2015/05/18 22:15:17 UTC

ambari git commit: AMBARI-11184. AMS: Incorrect value obtained for a datapoint in the metric data series queried from AMS. (swagle)

Repository: ambari
Updated Branches:
  refs/heads/trunk 2015be59a -> 8b0c964a8


AMBARI-11184. AMS: Incorrect value obtained for a datapoint in the metric data series queried from AMS. (swagle)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/8b0c964a
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/8b0c964a
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/8b0c964a

Branch: refs/heads/trunk
Commit: 8b0c964a89ffd4d352d0deac0756923242a252e9
Parents: 2015be5
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Mon May 18 13:07:06 2015 -0700
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Mon May 18 13:07:06 2015 -0700

----------------------------------------------------------------------
 .../metrics/timeline/PhoenixHBaseAccessor.java  |   8 +-
 .../timeline/TimelineMetricConfiguration.java   |   3 +
 .../aggregators/AbstractTimelineAggregator.java |   6 +-
 .../TimelineMetricClusterAggregatorMinute.java  | 131 +++++++++++++------
 .../aggregators/TimelineMetricReadHelper.java   |   4 +-
 .../timeline/AbstractMiniHBaseClusterTest.java  |  80 ++++++++++-
 .../metrics/timeline/ITClusterAggregator.java   | 130 ++++++++++++++----
 .../0.1.0/configuration/ams-site.xml            |   2 +-
 8 files changed, 290 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/8b0c964a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index b890171..bf1ae66 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -313,8 +313,7 @@ public class PhoenixHBaseAccessor {
     }
   }
 
-  public void insertMetricRecords(TimelineMetrics metrics)
-    throws SQLException, IOException {
+  public void insertMetricRecords(TimelineMetrics metrics) throws SQLException, IOException {
 
     List<TimelineMetric> timelineMetrics = metrics.getMetrics();
     if (timelineMetrics == null || timelineMetrics.isEmpty()) {
@@ -351,9 +350,8 @@ public class PhoenixHBaseAccessor {
         metricRecordStmt.setDouble(8, aggregates[0]);
         metricRecordStmt.setDouble(9, aggregates[1]);
         metricRecordStmt.setDouble(10, aggregates[2]);
-        metricRecordStmt.setLong(11, (long)aggregates[3]);
-        String json =
-          TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
+        metricRecordStmt.setLong(11, (long) aggregates[3]);
+        String json = TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
         metricRecordStmt.setString(12, json);
 
         try {

http://git-wip-us.apache.org/repos/asf/ambari/blob/8b0c964a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index 0595c20..0461261 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -157,6 +157,9 @@ public class TimelineMetricConfiguration {
   public static final String CLUSTER_AGGREGATOR_APP_IDS =
     "timeline.metrics.service.cluster.aggregator.appIds";
 
+  public static final String SERVER_SIDE_TIMESIFT_ADJUSTMENT =
+    "timeline.metrics.service.cluster.aggregator.timeshift.adjustment";
+
   public static final String HOST_APP_ID = "HOST";
 
   private Configuration hbaseConf;

http://git-wip-us.apache.org/repos/asf/ambari/blob/8b0c964a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
index 415471d..37fb088 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
@@ -165,15 +165,15 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
       lastCheckPointTime = readCheckPoint();
       if (isLastCheckPointTooOld(lastCheckPointTime)) {
         LOG.warn("Last Checkpoint is too old, discarding last checkpoint. " +
-          "lastCheckPointTime = " + lastCheckPointTime);
+          "lastCheckPointTime = " + new Date(lastCheckPointTime));
         lastCheckPointTime = -1;
       }
       if (lastCheckPointTime == -1) {
         // Assuming first run, save checkpoint and sleep.
         // Set checkpoint to 2 minutes in the past to allow the
         // agents/collectors to catch up
-        LOG.info("Saving checkpoint time on first run." +
-          (currentTime - checkpointDelayMillis));
+        LOG.info("Saving checkpoint time on first run. " +
+          new Date((currentTime - checkpointDelayMillis)));
         saveCheckPoint(currentTime - checkpointDelayMillis);
       }
     } catch (IOException io) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/8b0c964a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java
index 293608e..9b51f98 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java
@@ -33,9 +33,8 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.SERVER_SIDE_TIMESIFT_ADJUSTMENT;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
 
@@ -49,6 +48,8 @@ public class TimelineMetricClusterAggregatorMinute extends AbstractTimelineAggre
   private TimelineMetricReadHelper timelineMetricReadHelper = new TimelineMetricReadHelper(true);
   // Aggregator to perform app-level aggregates for host metrics
   private final TimelineMetricAppAggregator appAggregator;
+  // Adjustment (in ms) for client side buffering; defaults to 90000 (90 seconds)
+  private final Long serverTimeShiftAdjustment;
 
   public TimelineMetricClusterAggregatorMinute(PhoenixHBaseAccessor hBaseAccessor,
                                                Configuration metricsConf,
@@ -66,11 +67,14 @@ public class TimelineMetricClusterAggregatorMinute extends AbstractTimelineAggre
 
     appAggregator = new TimelineMetricAppAggregator(metricsConf);
     this.timeSliceIntervalMillis = timeSliceInterval;
+    this.serverTimeShiftAdjustment = Long.parseLong(metricsConf.get(SERVER_SIDE_TIMESIFT_ADJUSTMENT, "90000"));
   }
 
   @Override
   protected void aggregate(ResultSet rs, long startTime, long endTime) throws SQLException, IOException {
-    List<Long[]> timeSlices = getTimeSlices(startTime, endTime);
+    // Account for time shift due to client side buffering by starting the
+    // time slices earlier than startTime by the configured server side adjustment
+    List<Long[]> timeSlices = getTimeSlices(startTime - serverTimeShiftAdjustment, endTime);
     // Initialize app aggregates for host metrics
     appAggregator.init();
     Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
@@ -91,17 +95,20 @@ public class TimelineMetricClusterAggregatorMinute extends AbstractTimelineAggre
       PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
       METRICS_RECORD_TABLE_NAME));
     condition.addOrderByColumn("METRIC_NAME");
+    condition.addOrderByColumn("HOSTNAME");
     condition.addOrderByColumn("APP_ID");
-    condition.addOrderByColumn("INSTANCE_ID");
     condition.addOrderByColumn("SERVER_TIME");
     return condition;
   }
 
+  /**
+   * Return time slices to normalize the timeseries data.
+   */
   private List<Long[]> getTimeSlices(long startTime, long endTime) {
     List<Long[]> timeSlices = new ArrayList<Long[]>();
     long sliceStartTime = startTime;
     while (sliceStartTime < endTime) {
-      timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis});
+      timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis });
       sliceStartTime += timeSliceIntervalMillis;
     }
     return timeSlices;
@@ -111,45 +118,72 @@ public class TimelineMetricClusterAggregatorMinute extends AbstractTimelineAggre
       throws SQLException, IOException {
     Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
       new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
-    // Create time slices
 
-    while (rs.next()) {
-      TimelineMetric metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
-
-      Map<TimelineClusterMetric, Double> clusterMetrics =
-        sliceFromTimelineMetric(metric, timeSlices);
-
-      if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
-        for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry :
-            clusterMetrics.entrySet()) {
-
-          TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
-          Double avgValue = clusterMetricEntry.getValue();
-
-          MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric);
-
-          if (aggregate == null) {
-            aggregate = new MetricClusterAggregate(avgValue, 1, null, avgValue, avgValue);
-            aggregateClusterMetrics.put(clusterMetric, aggregate);
-          } else {
-            aggregate.updateSum(avgValue);
-            aggregate.updateNumberOfHosts(1);
-            aggregate.updateMax(avgValue);
-            aggregate.updateMin(avgValue);
-          }
-          // Update app level aggregates
-          appAggregator.processTimelineClusterMetric(clusterMetric,
-            metric.getHostName(), avgValue);
+    TimelineMetric metric = null;
+    if (rs.next()) {
+      metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
+
+      // Call slice after all rows for a host are read
+      while (rs.next()) {
+        TimelineMetric nextMetric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
+        // If rows belong to same host combine them before slicing. This
+        // avoids issues across rows that belong to same hosts but get
+        // counted as coming from different ones.
+        if (metric.equalsExceptTime(nextMetric)) {
+          metric.addMetricValues(nextMetric.getMetricValues());
+        } else {
+          // Process the current metric
+          processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
+          metric = nextMetric;
         }
       }
     }
+    // Process last metric
+    if (metric != null) {
+      processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
+    }
+
     // Add app level aggregates to save
     aggregateClusterMetrics.putAll(appAggregator.getAggregateClusterMetrics());
     return aggregateClusterMetrics;
   }
 
+  /**
+   * Slice metric values into intervals specified by:
+   * timeline.metrics.cluster.aggregator.minute.timeslice.interval
+   * and normalize the values by averaging them within each interval.
+   */
+  private void processAggregateClusterMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
+                                              TimelineMetric metric, List<Long[]> timeSlices) {
+    // Slice the metric values into the given time slices
+    Map<TimelineClusterMetric, Double> clusterMetrics = sliceFromTimelineMetric(metric, timeSlices);
+
+    if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
+      for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry :
+        clusterMetrics.entrySet()) {
+
+        TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
+        Double avgValue = clusterMetricEntry.getValue();
+
+        MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric);
+
+        if (aggregate == null) {
+          aggregate = new MetricClusterAggregate(avgValue, 1, null, avgValue, avgValue);
+          aggregateClusterMetrics.put(clusterMetric, aggregate);
+        } else {
+          aggregate.updateSum(avgValue);
+          aggregate.updateNumberOfHosts(1);
+          aggregate.updateMax(avgValue);
+          aggregate.updateMin(avgValue);
+        }
+        // Update app level aggregates
+        appAggregator.processTimelineClusterMetric(clusterMetric, metric.getHostName(), avgValue);
+      }
+    }
+  }
+
   private Map<TimelineClusterMetric, Double> sliceFromTimelineMetric(
-        TimelineMetric timelineMetric, List<Long[]> timeSlices) {
+      TimelineMetric timelineMetric, List<Long[]> timeSlices) {
 
     if (timelineMetric.getMetricValues().isEmpty()) {
       return null;
@@ -158,13 +192,20 @@ public class TimelineMetricClusterAggregatorMinute extends AbstractTimelineAggre
     Map<TimelineClusterMetric, Double> timelineClusterMetricMap =
       new HashMap<TimelineClusterMetric, Double>();
 
+    Long timeShift = timelineMetric.getTimestamp() - timelineMetric.getStartTime();
+    if (timeShift < 0) {
+      LOG.debug("Invalid time shift found, possible discrepancy in clocks. " +
+        "timeShift = " + timeShift);
+      timeShift = 0l;
+    }
+
     for (Map.Entry<Long, Double> metric : timelineMetric.getMetricValues().entrySet()) {
       // TODO: investigate null values - pre filter
       if (metric.getValue() == null) {
         continue;
       }
-      Long timestamp = getSliceTimeForMetric(timeSlices,
-                       Long.parseLong(metric.getKey().toString()));
+
+      Long timestamp = getSliceTimeForMetric(timeSlices, Long.parseLong(metric.getKey().toString()));
       if (timestamp != -1) {
         // Metric is within desired time range
         TimelineClusterMetric clusterMetric = new TimelineClusterMetric(
@@ -173,12 +214,24 @@ public class TimelineMetricClusterAggregatorMinute extends AbstractTimelineAggre
           timelineMetric.getInstanceId(),
           timestamp,
           timelineMetric.getType());
+
+        // Running average per slice: the first point is stored as-is, each later point is averaged with the stored value ((old + new) / 2)
+        int count = 1;
+        Double sum;
         if (!timelineClusterMetricMap.containsKey(clusterMetric)) {
-          timelineClusterMetricMap.put(clusterMetric, metric.getValue());
+          sum = metric.getValue();
         } else {
+          count++;
           Double oldValue = timelineClusterMetricMap.get(clusterMetric);
-          Double newValue = (oldValue + metric.getValue()) / 2;
-          timelineClusterMetricMap.put(clusterMetric, newValue);
+          sum = oldValue + metric.getValue();
+        }
+        timelineClusterMetricMap.put(clusterMetric, (sum / count));
+      } else {
+        if (timelineMetric.getMetricName().equals("tserver.general.entries")) {
+          LOG.info("--- Fallen off: serverTs = " + timelineMetric.getTimestamp() +
+            ", timeShift: " + timeShift +
+            ", timestamp: " + Long.parseLong(metric.getKey().toString()) +
+            ", host = " + timelineMetric.getHostName());
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/ambari/blob/8b0c964a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
index 398f4c3..573e09d 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
@@ -54,7 +54,9 @@ public class TimelineMetricReadHelper {
     TimelineMetric metric = new TimelineMetric();
     metric.setMetricName(rs.getString("METRIC_NAME"));
     metric.setAppId(rs.getString("APP_ID"));
-    if (!ignoreInstance) metric.setInstanceId(rs.getString("INSTANCE_ID"));
+    if (!ignoreInstance) {
+      metric.setInstanceId(rs.getString("INSTANCE_ID"));
+    }
     metric.setHostName(rs.getString("HOSTNAME"));
     metric.setTimestamp(rs.getLong("SERVER_TIME"));
     metric.setStartTime(rs.getLong("START_TIME"));

http://git-wip-us.apache.org/repos/asf/ambari/blob/8b0c964a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
index 6cfaa2e..643e5cc 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
@@ -20,7 +20,11 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConnectionProvider;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryServices;
@@ -30,16 +34,22 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.LOG;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -116,8 +126,7 @@ public abstract class AbstractMiniHBaseClusterTest extends BaseTest {
 
   protected PhoenixHBaseAccessor createTestableHBaseAccessor() {
     Configuration metricsConf = new Configuration();
-    metricsConf.set(
-        TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME, "NONE");
+    metricsConf.set(TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME, "NONE");
 
     return
         new PhoenixHBaseAccessor(
@@ -136,4 +145,71 @@ public abstract class AbstractMiniHBaseClusterTest extends BaseTest {
               }
             });
   }
+
+  protected void insertMetricRecords(Connection conn, TimelineMetrics metrics, long currentTime)
+                                    throws SQLException, IOException {
+
+    List<TimelineMetric> timelineMetrics = metrics.getMetrics();
+    if (timelineMetrics == null || timelineMetrics.isEmpty()) {
+      LOG.debug("Empty metrics insert request.");
+      return;
+    }
+
+    PreparedStatement metricRecordStmt = null;
+
+    try {
+      metricRecordStmt = conn.prepareStatement(String.format(
+        UPSERT_METRICS_SQL, METRICS_RECORD_TABLE_NAME));
+
+      for (TimelineMetric metric : timelineMetrics) {
+        metricRecordStmt.clearParameters();
+
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("host: " + metric.getHostName() + ", " +
+            "metricName = " + metric.getMetricName() + ", " +
+            "values: " + metric.getMetricValues());
+        }
+        double[] aggregates =  AggregatorUtils.calculateAggregates(
+          metric.getMetricValues());
+
+        metricRecordStmt.setString(1, metric.getMetricName());
+        metricRecordStmt.setString(2, metric.getHostName());
+        metricRecordStmt.setString(3, metric.getAppId());
+        metricRecordStmt.setString(4, metric.getInstanceId());
+        metricRecordStmt.setLong(5, currentTime);
+        metricRecordStmt.setLong(6, metric.getStartTime());
+        metricRecordStmt.setString(7, metric.getType());
+        metricRecordStmt.setDouble(8, aggregates[0]);
+        metricRecordStmt.setDouble(9, aggregates[1]);
+        metricRecordStmt.setDouble(10, aggregates[2]);
+        metricRecordStmt.setLong(11, (long) aggregates[3]);
+        String json = TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
+        metricRecordStmt.setString(12, json);
+
+        try {
+          metricRecordStmt.executeUpdate();
+        } catch (SQLException sql) {
+          LOG.error(sql);
+        }
+      }
+
+      conn.commit();
+
+    } finally {
+      if (metricRecordStmt != null) {
+        try {
+          metricRecordStmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/8b0c964a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
index fb3bc30..fb3bd31 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
@@ -18,7 +18,10 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
 
+import junit.framework.Assert;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
@@ -37,8 +40,10 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static junit.framework.Assert.assertEquals;
@@ -48,8 +53,10 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
 
 public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
@@ -113,8 +120,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
       PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
       METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
 
-    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
-      (conn, condition);
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
     ResultSet rs = pstmt.executeQuery();
 
     int recordCount = 0;
@@ -144,7 +150,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime;
-    long minute = 60 * 1000;
+    long minute = 60 * 1000 * 2;
 
     /**
      * Here we have two nodes with two instances each:
@@ -153,27 +159,33 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
      *  instance i2 |   3    |   4    |
      *
      */
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+    // Four 1's at ctime - 100
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local1",
       "i1", "disk_free", 1));
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+    // Four 2's at ctime - 100: different host
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local2",
       "i1", "disk_free", 2));
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+    // Avoid overwrite
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local1",
       "i2", "disk_free", 3));
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local2",
       "i2", "disk_free", 4));
+
     ctime += minute;
 
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+    // Four 1's at ctime + 2 min
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local1",
       "i1", "disk_free", 1));
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+    // Four 3's at ctime + 2 min - different host
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local2",
       "i1", "disk_free", 3));
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local1",
       "i2", "disk_free", 2));
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local2",
       "i2", "disk_free", 4));
     // WHEN
     long endTime = ctime + minute;
-    boolean success = agg.doWork(startTime, endTime);
+    boolean success = agg.doWork(startTime - 1000, endTime + 1000);
 
     //THEN
     Condition condition = new DefaultCondition(null, null, null, null, startTime,
@@ -182,29 +194,26 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
       PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
       METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
 
-    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
-      (conn, condition);
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
     ResultSet rs = pstmt.executeQuery();
 
     int recordCount = 0;
     while (rs.next()) {
       TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs);
-//        PhoenixHBaseAccessor.getTimelineMetricClusterKeyFromResultSet(rs);
       MetricClusterAggregate currentHostAggregate =
         readHelper.getMetricClusterAggregateFromResultSet(rs);
 
       if ("disk_free".equals(currentMetric.getMetricName())) {
-        System.out.println("OUTPUT: " + currentMetric+" - " +
-          ""+currentHostAggregate);
-        assertEquals(4, currentHostAggregate.getNumberOfHosts());
-        assertEquals(4.0, currentHostAggregate.getMax());
-        assertEquals(1.0, currentHostAggregate.getMin());
-        assertEquals(10.0, currentHostAggregate.getSum());
+        System.out.println("OUTPUT: " + currentMetric + " - " + currentHostAggregate);
+        assertEquals(2, currentHostAggregate.getNumberOfHosts());
+        assertEquals(5.0, currentHostAggregate.getSum());
         recordCount++;
       } else {
         fail("Unexpected entry");
       }
     }
+
+    Assert.assertEquals(8, recordCount);
   }
 
   @Test
@@ -244,8 +253,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
       PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
       METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
 
-    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
-      (conn, condition);
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
     ResultSet rs = pstmt.executeQuery();
 
     int recordCount = 0;
@@ -476,6 +484,82 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
     assertEquals(1.0d, currentHostAggregate.getSum());
   }
 
+  @Test
+  public void testClusterAggregateMetricNormalization() throws Exception {
+    TimelineMetricAggregator agg =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, new Configuration());
+    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
+
+    // Sample data
+    TimelineMetric metric1 = new TimelineMetric();
+    metric1.setMetricName("yarn.ClusterMetrics.NumActiveNMs");
+    metric1.setAppId("resourcemanager");
+    metric1.setHostName("h1");
+    metric1.setStartTime(1431372311811l);
+    metric1.setMetricValues(new HashMap<Long, Double>() {{
+      put(1431372311811l, 1.0);
+      put(1431372321811l, 1.0);
+      put(1431372331811l, 1.0);
+      put(1431372341811l, 1.0);
+      put(1431372351811l, 1.0);
+      put(1431372361811l, 1.0);
+      put(1431372371810l, 1.0);
+    }});
+
+    TimelineMetric metric2 = new TimelineMetric();
+    metric2.setMetricName("yarn.ClusterMetrics.NumActiveNMs");
+    metric2.setAppId("resourcemanager");
+    metric2.setHostName("h1");
+    metric2.setStartTime(1431372381810l);
+    metric2.setMetricValues(new HashMap<Long, Double>() {{
+      put(1431372381810l, 1.0);
+      put(1431372391811l, 1.0);
+      put(1431372401811l, 1.0);
+      put(1431372411811l, 1.0);
+      put(1431372421811l, 1.0);
+      put(1431372431811l, 1.0);
+      put(1431372441810l, 1.0);
+    }});
+
+    TimelineMetrics metrics = new TimelineMetrics();
+    metrics.setMetrics(Collections.singletonList(metric1));
+    insertMetricRecords(conn, metrics, 1431372371810l);
+
+    metrics.setMetrics(Collections.singletonList(metric2));
+    insertMetricRecords(conn, metrics, 1431372441810l);
+
+    long startTime = 1431372055000l;
+    long endTime = 1431372655000l;
+
+    agg.doWork(startTime, endTime);
+
+    Condition condition = new DefaultCondition(null, null, null, null, startTime,
+      endTime, null, null, true);
+    condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
+      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
+      METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
+
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+
+    int recordCount = 0;
+    while (rs.next()) {
+      TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs);
+      MetricClusterAggregate currentHostAggregate = readHelper.getMetricClusterAggregateFromResultSet(rs);
+
+      if ("yarn.ClusterMetrics.NumActiveNMs".equals(currentMetric.getMetricName())) {
+        assertEquals(1, currentHostAggregate.getNumberOfHosts());
+        assertEquals(1.0, currentHostAggregate.getMax());
+        assertEquals(1.0, currentHostAggregate.getMin());
+        assertEquals(1.0, currentHostAggregate.getSum());
+        recordCount++;
+      } else {
+        fail("Unexpected entry");
+      }
+    }
+    Assert.assertEquals(9, recordCount);
+  }
+
   private ResultSet executeQuery(String query) throws SQLException {
     Connection conn = getConnection(getUrl());
     Statement stmt = conn.createStatement();

http://git-wip-us.apache.org/repos/asf/ambari/blob/8b0c964a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
index fc52e5a..c716bea 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
@@ -198,7 +198,7 @@
   </property>
   <property>
     <name>timeline.metrics.cluster.aggregator.minute.timeslice.interval</name>
-    <value>15</value>
+    <value>30</value>
     <description>
       Lowest resolution of desired data for cluster level minute aggregates.
     </description>