You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2017/02/07 18:05:36 UTC
ambari git commit: AMBARI-19858 : Add nodeCount metric in AMS.
(avijayan, swagle) [Forced Update!]
Repository: ambari
Updated Branches:
refs/heads/branch-2.5 c16fd52ce -> e5a7f2a50 (forced update)
AMBARI-19858 : Add nodeCount metric in AMS. (avijayan,swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/e5a7f2a5
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/e5a7f2a5
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/e5a7f2a5
Branch: refs/heads/branch-2.5
Commit: e5a7f2a50cd3a590fe1c8638b09c2fb34ef6f47c
Parents: 79e676c
Author: Aravindan Vijayan <av...@hortonworks.com>
Authored: Tue Feb 7 09:58:30 2017 -0800
Committer: Aravindan Vijayan <av...@hortonworks.com>
Committed: Tue Feb 7 10:05:12 2017 -0800
----------------------------------------------------------------------
.../metrics/timeline/PhoenixHBaseAccessor.java | 6 +-
.../TimelineMetricAppAggregator.java | 4 +-
.../TimelineMetricClusterAggregatorSecond.java | 90 +++++++++------
.../aggregators/TimelineMetricReadHelper.java | 3 +-
...melineMetricClusterAggregatorSecondTest.java | 114 +++++++++++++++++--
5 files changed, 162 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/e5a7f2a5/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 8d567ce..ad05025 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -153,9 +153,9 @@ public class PhoenixHBaseAccessor {
private static final int POINTS_PER_MINUTE = 6;
public static int RESULTSET_LIMIT = (int)TimeUnit.HOURS.toMinutes(2) * METRICS_PER_MINUTE * POINTS_PER_MINUTE ;
- private static final TimelineMetricReadHelper TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper();
- private static ObjectMapper mapper = new ObjectMapper();
- private static TypeReference<TreeMap<Long, Double>> metricValuesTypeRef = new TypeReference<TreeMap<Long, Double>>() {};
+ static final TimelineMetricReadHelper TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper();
+ static ObjectMapper mapper = new ObjectMapper();
+ static TypeReference<TreeMap<Long, Double>> metricValuesTypeRef = new TypeReference<TreeMap<Long, Double>>() {};
private final Configuration hbaseConf;
private final Configuration metricsConf;
http://git-wip-us.apache.org/repos/asf/ambari/blob/e5a7f2a5/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
index d7b0d55..44aca03 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
@@ -48,7 +48,7 @@ public class TimelineMetricAppAggregator {
// Lookup to check candidacy of an app
private final List<String> appIdsToAggregate;
private final Map<String, Set<String>> hostedAppsMap;
- Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics;
+ Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics = new HashMap<>();
TimelineMetricMetadataManager metadataManagerInstance;
public TimelineMetricAppAggregator(TimelineMetricMetadataManager metadataManager,
@@ -64,7 +64,7 @@ public class TimelineMetricAppAggregator {
*/
public void init() {
LOG.debug("Initializing aggregation cycle.");
- aggregateClusterMetrics = new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+ aggregateClusterMetrics = new HashMap<>();
}
/**
http://git-wip-us.apache.org/repos/asf/ambari/blob/e5a7f2a5/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
index 6f3d8bc..6683c0d 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
@@ -18,8 +18,25 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.SERVER_SIDE_TIMESIFT_ADJUSTMENT;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.sink.timeline.PostProcessingUtil;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
@@ -31,29 +48,13 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
-import java.io.IOException;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_APP_ID;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.SERVER_SIDE_TIMESIFT_ADJUSTMENT;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS;
/**
* Aggregates a metric across all hosts in the cluster. Reads metrics from
* the precision table and saves into the aggregate.
*/
public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggregator {
- public Long timeSliceIntervalMillis;
+ Long timeSliceIntervalMillis;
private TimelineMetricReadHelper timelineMetricReadHelper = new TimelineMetricReadHelper(true);
// Aggregator to perform app-level aggregates for host metrics
private final TimelineMetricAppAggregator appAggregator;
@@ -136,7 +137,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
/**
* Return time slices to normalize the timeseries data.
*/
- protected List<Long[]> getTimeSlices(long startTime, long endTime) {
+ List<Long[]> getTimeSlices(long startTime, long endTime) {
List<Long[]> timeSlices = new ArrayList<Long[]>();
long sliceStartTime = startTime;
while (sliceStartTime < endTime) {
@@ -146,13 +147,13 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
return timeSlices;
}
- private Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
- throws SQLException, IOException {
+ Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
+ throws SQLException, IOException {
Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
- int numLiveHosts = 0;
TimelineMetric metric = null;
+ Map<String, MutableInt> hostedAppCounter = new HashMap<>();
if (rs.next()) {
metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
@@ -167,7 +168,14 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
} else {
// Process the current metric
int numHosts = processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
- numLiveHosts = Math.max(numHosts, numLiveHosts);
+ if (!hostedAppCounter.containsKey(metric.getAppId())) {
+ hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
+ } else {
+ int currentHostCount = hostedAppCounter.get(metric.getAppId()).intValue();
+ if (currentHostCount < numHosts) {
+ hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
+ }
+ }
metric = nextMetric;
}
}
@@ -175,15 +183,22 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
// Process last metric
if (metric != null) {
int numHosts = processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
- numLiveHosts = Math.max(numHosts, numLiveHosts);
+ if (!hostedAppCounter.containsKey(metric.getAppId())) {
+ hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
+ } else {
+ int currentHostCount = hostedAppCounter.get(metric.getAppId()).intValue();
+ if (currentHostCount < numHosts) {
+ hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
+ }
+ }
}
// Add app level aggregates to save
aggregateClusterMetrics.putAll(appAggregator.getAggregateClusterMetrics());
- // Add liveHosts metric.
+ // Add liveHosts per AppId metrics.
long timestamp = timeSlices.get(timeSlices.size() - 1)[1];
- processLiveHostsMetric(aggregateClusterMetrics, numLiveHosts, timestamp);
+ processLiveAppCountMetrics(aggregateClusterMetrics, hostedAppCounter, timestamp);
return aggregateClusterMetrics;
}
@@ -196,7 +211,6 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
protected int processAggregateClusterMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
TimelineMetric metric, List<Long[]> timeSlices) {
// Create time slices
-
TimelineMetricMetadataKey appKey = new TimelineMetricMetadataKey(metric.getMetricName(), metric.getAppId());
TimelineMetricMetadata metricMetadata = metadataManagerInstance.getMetadataCacheValue(appKey);
@@ -209,8 +223,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
int numHosts = 0;
if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
- for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry :
- clusterMetrics.entrySet()) {
+ for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry : clusterMetrics.entrySet()) {
TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
Double avgValue = clusterMetricEntry.getValue();
@@ -415,16 +428,21 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
return -1l;
}
- private void processLiveHostsMetric(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
- int numLiveHosts, long timestamp) {
+ /* Adds one "live_hosts" cluster metric per entry of appHostsCount (key: appId, value: host count), stamped with the given timestamp. */
+ private void processLiveAppCountMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
+ Map<String, MutableInt> appHostsCount, long timestamp) {
- TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(
- "live_hosts", HOST_APP_ID, null, timestamp, null);
+ for (Map.Entry<String, MutableInt> appHostsEntry : appHostsCount.entrySet()) {
+ TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(
+ "live_hosts", appHostsEntry.getKey(), null, timestamp, null);
- MetricClusterAggregate metricClusterAggregate = new MetricClusterAggregate((double) numLiveHosts,
- 1, null, (double) numLiveHosts, (double) numLiveHosts);
+ Integer numOfHosts = appHostsEntry.getValue().intValue();
- aggregateClusterMetrics.put(timelineClusterMetric, metricClusterAggregate);
- }
+ MetricClusterAggregate metricClusterAggregate = new MetricClusterAggregate(
+ (double) numOfHosts, 1, null, (double) numOfHosts, (double) numOfHosts); // NOTE(review): presumably (sum, numberOfHosts, deviation, max, min) - confirm against MetricClusterAggregate
+ aggregateClusterMetrics.put(timelineClusterMetric, metricClusterAggregate);
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e5a7f2a5/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
index 7a74e24..b5f49fb 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
@@ -41,8 +41,7 @@ public class TimelineMetricReadHelper {
public TimelineMetric getTimelineMetricFromResultSet(ResultSet rs)
throws SQLException, IOException {
TimelineMetric metric = getTimelineMetricCommonsFromResultSet(rs);
- TreeMap<Long, Double> sortedByTimeMetrics =
- PhoenixHBaseAccessor.readMetricFromJSON(rs.getString("METRICS"));
+ TreeMap<Long, Double> sortedByTimeMetrics = PhoenixHBaseAccessor.readMetricFromJSON(rs.getString("METRICS"));
metric.setMetricValues(sortedByTimeMetrics);
return metric;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e5a7f2a5/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
index 58d908a..2297036 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
@@ -17,19 +17,26 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-import junit.framework.Assert;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+
+import java.sql.ResultSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
-import org.easymock.EasyMock;
+import org.codehaus.jackson.map.ObjectMapper;
import org.junit.Test;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND;
+
+import junit.framework.Assert;
public class TimelineMetricClusterAggregatorSecondTest {
@@ -41,7 +48,7 @@ public class TimelineMetricClusterAggregatorSecondTest {
long metricInterval = 10000l;
Configuration configuration = new Configuration();
- TimelineMetricMetadataManager metricMetadataManagerMock = EasyMock.createNiceMock(TimelineMetricMetadataManager.class);
+ TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
TimelineMetricClusterAggregatorSecond secondAggregator = new TimelineMetricClusterAggregatorSecond(
METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null,
@@ -113,11 +120,11 @@ public class TimelineMetricClusterAggregatorSecondTest {
long sliceInterval = 30000l;
Configuration configuration = new Configuration();
- TimelineMetricMetadataManager metricMetadataManagerMock = EasyMock.createNiceMock(TimelineMetricMetadataManager.class);
+ TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
- EasyMock.expect(metricMetadataManagerMock.getMetadataCacheValue((TimelineMetricMetadataKey)EasyMock.anyObject()))
+ expect(metricMetadataManagerMock.getMetadataCacheValue((TimelineMetricMetadataKey) anyObject()))
.andReturn(null).anyTimes();
- EasyMock.replay(metricMetadataManagerMock);
+ replay(metricMetadataManagerMock);
TimelineMetricClusterAggregatorSecond secondAggregator = new TimelineMetricClusterAggregatorSecond(
METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null, configuration, null,
@@ -312,4 +319,87 @@ public class TimelineMetricClusterAggregatorSecondTest {
}
+ @Test // aggregateMetricsFromResultSet should emit one "live_hosts" aggregate per appId
+ public void testLiveHostCounterMetrics() throws Exception {
+ long aggregatorInterval = 120000;
+ long sliceInterval = 30000;
+
+ Configuration configuration = new Configuration();
+ TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
+
+ expect(metricMetadataManagerMock.getMetadataCacheValue((TimelineMetricMetadataKey) anyObject())).andReturn(null).anyTimes(); // no metadata is cached for any key
+ replay(metricMetadataManagerMock);
+
+ TimelineMetricClusterAggregatorSecond secondAggregator = new TimelineMetricClusterAggregatorSecond(
+ METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null, configuration, null,
+ aggregatorInterval, 2, "false", "", "", aggregatorInterval,
+ sliceInterval, null);
+
+ long now = System.currentTimeMillis();
+ long startTime = now - 120000; // window start: 120000 ms (2 minutes) before now
+ long seconds = 1000; // milliseconds per second, used to build the timestamps below
+ List<Long[]> slices = secondAggregator.getTimeSlices(startTime, now); // time slices for startTime..now
+ ResultSet rs = createNiceMock(ResultSet.class);
+
+ TreeMap<Long, Double> metricValues = new TreeMap<>(); // one data point every 30 s, offset 15 s from startTime
+ metricValues.put(startTime + 15*seconds, 1.0);
+ metricValues.put(startTime + 45*seconds, 2.0);
+ metricValues.put(startTime + 75*seconds, 3.0);
+ metricValues.put(startTime + 105*seconds, 4.0);
+
+ expect(rs.next()).andReturn(true).times(6); // six rows, listed in the comment below
+ expect(rs.next()).andReturn(false); // then the result set is exhausted
+
+ /* Mocked rows, written as metric-host-app:
+ m1-h1-a1
+ m2-h1-a1
+ m2-h1-a2
+ m2-h2-a1
+ m2-h2-a2
+ m2-h3-a2
+
+ So live_hosts : a1 = 2 (h1, h2)
+ live_hosts : a2 = 3 (h1, h2, h3)
+ */
+ expect(rs.getString("METRIC_NAME")).andReturn("m1").times(1);
+ expect(rs.getString("METRIC_NAME")).andReturn("m2").times(5);
+
+ expect(rs.getString("HOSTNAME")).andReturn("h1").times(3);
+ expect(rs.getString("HOSTNAME")).andReturn("h2").times(2);
+ expect(rs.getString("HOSTNAME")).andReturn("h3").times(1);
+
+ expect(rs.getString("APP_ID")).andReturn("a1").times(2);
+ expect(rs.getString("APP_ID")).andReturn("a2").times(1);
+ expect(rs.getString("APP_ID")).andReturn("a1").times(1);
+ expect(rs.getString("APP_ID")).andReturn("a2").times(2);
+
+ expect(rs.getLong("SERVER_TIME")).andReturn(now - 150000).times(6);
+ expect(rs.getLong("START_TIME")).andReturn(now - 150000).times(6);
+ expect(rs.getString("UNITS")).andReturn(null).times(6);
+
+ ObjectMapper mapper = new ObjectMapper();
+ expect(rs.getString("METRICS")).andReturn(mapper.writeValueAsString(metricValues)).times(6); // every row carries the same JSON-serialized values
+
+ replay(rs);
+
+ Map<TimelineClusterMetric, MetricClusterAggregate> aggregates = secondAggregator.aggregateMetricsFromResultSet(rs, slices);
+
+ Assert.assertNotNull(aggregates);
+
+ MetricClusterAggregate a1 = null, a2 = null;
+
+ for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> m : aggregates.entrySet()) { // pick out the live_hosts aggregates for a1 and a2
+ if (m.getKey().getMetricName().equals("live_hosts") && m.getKey().getAppId().equals("a1")) {
+ a1 = m.getValue();
+ }
+ if (m.getKey().getMetricName().equals("live_hosts") && m.getKey().getAppId().equals("a2")) {
+ a2 = m.getValue();
+ }
+ }
+
+ Assert.assertNotNull(a1);
+ Assert.assertNotNull(a2);
+ Assert.assertEquals(2d, a1.getSum()); // a1 appears on hosts h1 and h2
+ Assert.assertEquals(3d, a2.getSum()); // a2 appears on hosts h1, h2 and h3
+ }
}