You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2017/02/07 18:04:37 UTC

ambari git commit: AMBARI-19858 : Add nodeCount metric in AMS. (avijayan, swagle)

Repository: ambari
Updated Branches:
  refs/heads/trunk a6a7ba6d3 -> 7a072aafc


AMBARI-19858 : Add nodeCount metric in AMS. (avijayan, swagle)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/7a072aaf
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/7a072aaf
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/7a072aaf

Branch: refs/heads/trunk
Commit: 7a072aafc8b21f0c516213532de60ba64b5a1142
Parents: a6a7ba6
Author: Aravindan Vijayan <av...@hortonworks.com>
Authored: Tue Feb 7 10:04:25 2017 -0800
Committer: Aravindan Vijayan <av...@hortonworks.com>
Committed: Tue Feb 7 10:04:25 2017 -0800

----------------------------------------------------------------------
 .../metrics/timeline/PhoenixHBaseAccessor.java  |   6 +-
 .../TimelineMetricAppAggregator.java            |   4 +-
 .../TimelineMetricClusterAggregatorSecond.java  |  92 ++++++++-------
 .../aggregators/TimelineMetricReadHelper.java   |   3 +-
 ...melineMetricClusterAggregatorSecondTest.java | 114 +++++++++++++++++--
 5 files changed, 161 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/7a072aaf/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 3add411..04405f7 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -155,9 +155,9 @@ public class PhoenixHBaseAccessor {
   private static final int POINTS_PER_MINUTE = 6;
   public static int RESULTSET_LIMIT = (int)TimeUnit.HOURS.toMinutes(2) * METRICS_PER_MINUTE * POINTS_PER_MINUTE ;
 
-  private static final TimelineMetricReadHelper TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper();
-  private static ObjectMapper mapper = new ObjectMapper();
-  private static TypeReference<TreeMap<Long, Double>> metricValuesTypeRef = new TypeReference<TreeMap<Long, Double>>() {};
+  static final TimelineMetricReadHelper TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper();
+  static ObjectMapper mapper = new ObjectMapper();
+  static TypeReference<TreeMap<Long, Double>> metricValuesTypeRef = new TypeReference<TreeMap<Long, Double>>() {};
 
   private final Configuration hbaseConf;
   private final Configuration metricsConf;

http://git-wip-us.apache.org/repos/asf/ambari/blob/7a072aaf/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
index d7b0d55..44aca03 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
@@ -48,7 +48,7 @@ public class TimelineMetricAppAggregator {
   // Lookup to check candidacy of an app
   private final List<String> appIdsToAggregate;
   private final Map<String, Set<String>> hostedAppsMap;
-  Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics;
+  Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics = new HashMap<>();
   TimelineMetricMetadataManager metadataManagerInstance;
 
   public TimelineMetricAppAggregator(TimelineMetricMetadataManager metadataManager,
@@ -64,7 +64,7 @@ public class TimelineMetricAppAggregator {
    */
   public void init() {
     LOG.debug("Initializing aggregation cycle.");
-    aggregateClusterMetrics = new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+    aggregateClusterMetrics = new HashMap<>();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/7a072aaf/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
index 6bc0f18..5310906 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
@@ -18,8 +18,25 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
 
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.SERVER_SIDE_TIMESIFT_ADJUSTMENT;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.sink.timeline.PostProcessingUtil;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
@@ -32,26 +49,6 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
 
-import java.io.IOException;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_APP_ID;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.SERVER_SIDE_TIMESIFT_ADJUSTMENT;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS;
-
 /**
  * Aggregates a metric across all hosts in the cluster. Reads metrics from
  * the precision table and saves into the aggregate.
@@ -139,7 +136,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
   /**
    * Return time slices to normalize the timeseries data.
    */
-  protected List<Long[]> getTimeSlices(long startTime, long endTime) {
+  List<Long[]> getTimeSlices(long startTime, long endTime) {
     List<Long[]> timeSlices = new ArrayList<Long[]>();
     long sliceStartTime = startTime;
     while (sliceStartTime < endTime) {
@@ -149,13 +146,13 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
     return timeSlices;
   }
 
-  private Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
-    throws SQLException, IOException {
+  Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
+      throws SQLException, IOException {
     Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
       new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
-    int numLiveHosts = 0;
 
     TimelineMetric metric = null;
+    Map<String, MutableInt> hostedAppCounter = new HashMap<>();
     if (rs.next()) {
       metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
 
@@ -170,7 +167,14 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
         } else {
           // Process the current metric
           int numHosts = processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
-          numLiveHosts = Math.max(numHosts, numLiveHosts);
+          if (!hostedAppCounter.containsKey(metric.getAppId())) {
+            hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
+          } else {
+            int currentHostCount = hostedAppCounter.get(metric.getAppId()).intValue();
+            if (currentHostCount < numHosts) {
+              hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
+            }
+          }
           metric = nextMetric;
         }
       }
@@ -178,15 +182,22 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
     // Process last metric
     if (metric != null) {
       int numHosts = processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
-      numLiveHosts = Math.max(numHosts, numLiveHosts);
+      if (!hostedAppCounter.containsKey(metric.getAppId())) {
+        hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
+      } else {
+        int currentHostCount = hostedAppCounter.get(metric.getAppId()).intValue();
+        if (currentHostCount < numHosts) {
+          hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
+        }
+      }
     }
 
     // Add app level aggregates to save
     aggregateClusterMetrics.putAll(appAggregator.getAggregateClusterMetrics());
 
-    // Add liveHosts metric.
+    // Add liveHosts per AppId metrics.
     long timestamp = timeSlices.get(timeSlices.size() - 1)[1];
-    processLiveHostsMetric(aggregateClusterMetrics, numLiveHosts, timestamp);
+    processLiveAppCountMetrics(aggregateClusterMetrics, hostedAppCounter, timestamp);
 
     return aggregateClusterMetrics;
   }
@@ -199,7 +210,6 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
   protected int processAggregateClusterMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
                                               TimelineMetric metric, List<Long[]> timeSlices) {
     // Create time slices
-
     TimelineMetricMetadataKey appKey =  new TimelineMetricMetadataKey(metric.getMetricName(), metric.getAppId());
     TimelineMetricMetadata metricMetadata = metadataManagerInstance.getMetadataCacheValue(appKey);
 
@@ -212,8 +222,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
     int numHosts = 0;
 
     if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
-      for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry :
-        clusterMetrics.entrySet()) {
+      for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry : clusterMetrics.entrySet()) {
 
         TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
         Double avgValue = clusterMetricEntry.getValue();
@@ -418,16 +427,21 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
     return -1l;
   }
 
-  private void processLiveHostsMetric(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
-                                     int numLiveHosts, long timestamp) {
+  /* Add cluster metric for number of hosts that are hosting an appId */
+  private void processLiveAppCountMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
+      Map<String, MutableInt> appHostsCount, long timestamp) {
 
-    TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(
-      "live_hosts", HOST_APP_ID, null, timestamp, null);
+    for (Map.Entry<String, MutableInt> appHostsEntry : appHostsCount.entrySet()) {
+      TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(
+        "live_hosts", appHostsEntry.getKey(), null, timestamp, null);
 
-    MetricClusterAggregate metricClusterAggregate = new MetricClusterAggregate((double) numLiveHosts,
-      1, null, (double) numLiveHosts, (double) numLiveHosts);
+      Integer numOfHosts = appHostsEntry.getValue().intValue();
 
-    aggregateClusterMetrics.put(timelineClusterMetric, metricClusterAggregate);
-  }
+      MetricClusterAggregate metricClusterAggregate = new MetricClusterAggregate(
+        (double) numOfHosts, 1, null, (double) numOfHosts, (double) numOfHosts);
 
+      aggregateClusterMetrics.put(timelineClusterMetric, metricClusterAggregate);
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/7a072aaf/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
index 7a74e24..b5f49fb 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
@@ -41,8 +41,7 @@ public class TimelineMetricReadHelper {
   public TimelineMetric getTimelineMetricFromResultSet(ResultSet rs)
       throws SQLException, IOException {
     TimelineMetric metric = getTimelineMetricCommonsFromResultSet(rs);
-    TreeMap<Long, Double> sortedByTimeMetrics =
-      PhoenixHBaseAccessor.readMetricFromJSON(rs.getString("METRICS"));
+    TreeMap<Long, Double> sortedByTimeMetrics = PhoenixHBaseAccessor.readMetricFromJSON(rs.getString("METRICS"));
     metric.setMetricValues(sortedByTimeMetrics);
     return metric;
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/7a072aaf/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
index 97b6258..78db11d 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
@@ -17,20 +17,27 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
-import junit.framework.Assert;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
-import org.easymock.EasyMock;
-import org.junit.Test;
 
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+
+import java.sql.ResultSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Test;
+
+import junit.framework.Assert;
 
 public class TimelineMetricClusterAggregatorSecondTest {
 
@@ -42,7 +49,7 @@ public class TimelineMetricClusterAggregatorSecondTest {
     long metricInterval = 10000l;
 
     Configuration configuration = new Configuration();
-    TimelineMetricMetadataManager metricMetadataManagerMock = EasyMock.createNiceMock(TimelineMetricMetadataManager.class);
+    TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
 
     TimelineMetricClusterAggregatorSecond secondAggregator = new TimelineMetricClusterAggregatorSecond(
       METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null,
@@ -114,11 +121,11 @@ public class TimelineMetricClusterAggregatorSecondTest {
     long sliceInterval = 30000l;
 
     Configuration configuration = new Configuration();
-    TimelineMetricMetadataManager metricMetadataManagerMock = EasyMock.createNiceMock(TimelineMetricMetadataManager.class);
+    TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
 
-    EasyMock.expect(metricMetadataManagerMock.getMetadataCacheValue((TimelineMetricMetadataKey)EasyMock.anyObject()))
+    expect(metricMetadataManagerMock.getMetadataCacheValue((TimelineMetricMetadataKey) anyObject()))
       .andReturn(null).anyTimes();
-    EasyMock.replay(metricMetadataManagerMock);
+    replay(metricMetadataManagerMock);
 
     TimelineMetricClusterAggregatorSecond secondAggregator = new TimelineMetricClusterAggregatorSecond(
       METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null, configuration, null,
@@ -313,4 +320,87 @@ public class TimelineMetricClusterAggregatorSecondTest {
 
   }
 
+  @Test
+  public void testLiveHostCounterMetrics() throws Exception {
+    long aggregatorInterval = 120000;
+    long sliceInterval = 30000;
+
+    Configuration configuration = new Configuration();
+    TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
+
+    expect(metricMetadataManagerMock.getMetadataCacheValue((TimelineMetricMetadataKey) anyObject())).andReturn(null).anyTimes();
+    replay(metricMetadataManagerMock);
+
+    TimelineMetricClusterAggregatorSecond secondAggregator = new TimelineMetricClusterAggregatorSecond(
+      METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null, configuration, null,
+      aggregatorInterval, 2, "false", "", "", aggregatorInterval,
+      sliceInterval, null);
+
+    long now = System.currentTimeMillis();
+    long startTime = now - 120000;
+    long seconds = 1000;
+    List<Long[]> slices = secondAggregator.getTimeSlices(startTime, now);
+    ResultSet rs = createNiceMock(ResultSet.class);
+
+    TreeMap<Long, Double> metricValues = new TreeMap<>();
+    metricValues.put(startTime + 15*seconds, 1.0);
+    metricValues.put(startTime + 45*seconds, 2.0);
+    metricValues.put(startTime + 75*seconds, 3.0);
+    metricValues.put(startTime + 105*seconds, 4.0);
+
+    expect(rs.next()).andReturn(true).times(6);
+    expect(rs.next()).andReturn(false);
+
+    /*
+    m1-h1-a1
+    m2-h1-a1
+    m2-h1-a2
+    m2-h2-a1
+    m2-h2-a2
+    m2-h3-a2
+
+    So live_hosts : a1 = 2
+       live_hosts : a2 = 3
+     */
+    expect(rs.getString("METRIC_NAME")).andReturn("m1").times(1);
+    expect(rs.getString("METRIC_NAME")).andReturn("m2").times(5);
+
+    expect(rs.getString("HOSTNAME")).andReturn("h1").times(3);
+    expect(rs.getString("HOSTNAME")).andReturn("h2").times(2);
+    expect(rs.getString("HOSTNAME")).andReturn("h3").times(1);
+
+    expect(rs.getString("APP_ID")).andReturn("a1").times(2);
+    expect(rs.getString("APP_ID")).andReturn("a2").times(1);
+    expect(rs.getString("APP_ID")).andReturn("a1").times(1);
+    expect(rs.getString("APP_ID")).andReturn("a2").times(2);
+
+    expect(rs.getLong("SERVER_TIME")).andReturn(now - 150000).times(6);
+    expect(rs.getLong("START_TIME")).andReturn(now - 150000).times(6);
+    expect(rs.getString("UNITS")).andReturn(null).times(6);
+
+    ObjectMapper mapper = new ObjectMapper();
+    expect(rs.getString("METRICS")).andReturn(mapper.writeValueAsString(metricValues)).times(6);
+
+    replay(rs);
+
+    Map<TimelineClusterMetric, MetricClusterAggregate> aggregates = secondAggregator.aggregateMetricsFromResultSet(rs, slices);
+
+    Assert.assertNotNull(aggregates);
+
+    MetricClusterAggregate a1 = null, a2 = null;
+
+    for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> m : aggregates.entrySet()) {
+      if (m.getKey().getMetricName().equals("live_hosts") && m.getKey().getAppId().equals("a1")) {
+        a1 = m.getValue();
+      }
+      if (m.getKey().getMetricName().equals("live_hosts") && m.getKey().getAppId().equals("a2")) {
+        a2 = m.getValue();
+      }
+    }
+
+    Assert.assertNotNull(a1);
+    Assert.assertNotNull(a2);
+    Assert.assertEquals(2d, a1.getSum());
+    Assert.assertEquals(3d, a2.getSum());
+  }
 }