You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jo...@apache.org on 2017/07/07 14:49:38 UTC
[24/29] ambari git commit: AMBARI-10145 : Add support for tee to
another Storage service in Ambari Metrics System. (Jameel Naina Mohamed via
avijayan)
AMBARI-10145 : Add support for tee to another Storage service in Ambari Metrics System. (Jameel Naina Mohamed via avijayan)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b3ec9aab
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b3ec9aab
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b3ec9aab
Branch: refs/heads/branch-feature-AMBARI-21348
Commit: b3ec9aab4a3cc8ef2899b5f6fb8347cda7438344
Parents: 0ac6a1e
Author: Aravindan Vijayan <av...@hortonworks.com>
Authored: Wed Jul 5 11:50:01 2017 -0700
Committer: Aravindan Vijayan <av...@hortonworks.com>
Committed: Wed Jul 5 11:50:01 2017 -0700
----------------------------------------------------------------------
.../metrics/timeline/PhoenixHBaseAccessor.java | 69 ++++++++-
.../timeline/TimelineMetricConfiguration.java | 3 +
.../timeline/TimelineMetricsAggregatorSink.java | 60 ++++++++
.../timeline/PhoenixHBaseAccessorTest.java | 73 ++++++++++
.../TimelineMetricsAggregatorMemorySink.java | 141 +++++++++++++++++++
5 files changed, 345 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/b3ec9aab/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 3ea6431..de41328 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
@@ -142,7 +143,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METADATA_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CONTAINER_METRICS_SQL;
-
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATOR_SINK_CLASS;
/**
* Provides a facade over the Phoenix API to access HBase schema
@@ -177,6 +178,7 @@ public class PhoenixHBaseAccessor {
private final BlockingQueue<TimelineMetrics> insertCache;
private ScheduledExecutorService scheduledExecutorService;
private MetricsCacheCommitterThread metricsCommiterThread;
+ private TimelineMetricsAggregatorSink aggregatorSink;
private final int cacheCommitInterval;
private final boolean skipBlockCacheForAggregatorsEnabled;
private final String timelineMetricsTablesDurability;
@@ -241,6 +243,15 @@ public class PhoenixHBaseAccessor {
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
scheduledExecutorService.scheduleWithFixedDelay(metricsCommiterThread, 0, cacheCommitInterval, TimeUnit.SECONDS);
}
+
+ Class<? extends TimelineMetricsAggregatorSink> metricSinkClass =
+ metricsConf.getClass(TIMELINE_METRIC_AGGREGATOR_SINK_CLASS, null,
+ TimelineMetricsAggregatorSink.class);
+ if (metricSinkClass != null) {
+ aggregatorSink =
+ ReflectionUtils.newInstance(metricSinkClass, metricsConf);
+ LOG.info("Initialized aggregator sink class " + metricSinkClass);
+ }
}
public boolean isInsertCacheEmpty() {
@@ -1278,6 +1289,16 @@ public class PhoenixHBaseAccessor {
LOG.info("Time to save map: " + (end - start) + ", " +
"thread = " + Thread.currentThread().getClass());
}
+ if (aggregatorSink != null) {
+ try {
+ aggregatorSink.saveHostAggregateRecords(hostAggregateMap,
+ getTablePrecision(phoenixTableName));
+ } catch (Exception e) {
+ LOG.warn(
+ "Error writing host aggregate records metrics to external sink. "
+ + e);
+ }
+ }
}
/**
@@ -1359,6 +1380,15 @@ public class PhoenixHBaseAccessor {
LOG.info("Time to save: " + (end - start) + ", " +
"thread = " + Thread.currentThread().getName());
}
+ if (aggregatorSink != null) {
+ try {
+ aggregatorSink.saveClusterAggregateRecords(records);
+ } catch (Exception e) {
+ LOG.warn(
+ "Error writing cluster aggregate records metrics to external sink. "
+ + e);
+ }
+ }
}
@@ -1439,6 +1469,43 @@ public class PhoenixHBaseAccessor {
LOG.info("Time to save: " + (end - start) + ", " +
"thread = " + Thread.currentThread().getName());
}
+ if (aggregatorSink != null) {
+ try {
+ aggregatorSink.saveClusterTimeAggregateRecords(records,
+ getTablePrecision(tableName));
+ } catch (Exception e) {
+ LOG.warn(
+ "Error writing cluster time aggregate records metrics to external sink. "
+ + e);
+ }
+ }
+ }
+
+ /**
+  * Resolves the precision of the data stored in a metrics table.
+  *
+  * @param tableName name of the metrics table; must not be null, because a
+  *                  switch on a null String throws NullPointerException
+  * @return SECONDS for the record table, MINUTES / HOURS / DAYS for the
+  *         host and cluster aggregate tables of that granularity, or null
+  *         when the table name is not one of the names handled here
+  */
+ private Precision getTablePrecision(String tableName) {
+   switch (tableName) {
+     case METRICS_RECORD_TABLE_NAME:
+       return Precision.SECONDS;
+     case METRICS_AGGREGATE_MINUTE_TABLE_NAME:
+     case METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME:
+       return Precision.MINUTES;
+     case METRICS_AGGREGATE_HOURLY_TABLE_NAME:
+     case METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME:
+       return Precision.HOURS;
+     case METRICS_AGGREGATE_DAILY_TABLE_NAME:
+     case METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME:
+       return Precision.DAYS;
+     default:
+       // Unknown table: callers receive null, same as before.
+       return null;
+   }
+ }
/**
http://git-wip-us.apache.org/repos/asf/ambari/blob/b3ec9aab/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index 474b11a..8f18aa9 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -53,6 +53,9 @@ public class TimelineMetricConfiguration {
public static final String TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR =
"timeline.metrics.aggregator.checkpoint.dir";
+ public static final String TIMELINE_METRIC_AGGREGATOR_SINK_CLASS =
+ "timeline.metrics.service.aggregator.sink.class";
+
public static final String TIMELINE_METRICS_CACHE_SIZE =
"timeline.metrics.cache.size";
http://git-wip-us.apache.org/repos/asf/ambari/blob/b3ec9aab/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java
new file mode 100644
index 0000000..65d54c0
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import java.util.Map;
+
+import org.apache.hadoop.metrics2.sink.timeline.Precision;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
+
+/**
+ * Sink interface for storing aggregated metrics in an external storage.
+ */
+public interface TimelineMetricsAggregatorSink {
+
+ /**
+ * Save host aggregated metrics
+ *
+ * @param hostAggregateMap Map of host aggregated metrics
+ * @param precision SECONDS, MINUTES, HOURS or DAYS
+ */
+ void saveHostAggregateRecords(
+ Map<TimelineMetric, MetricHostAggregate> hostAggregateMap,
+ Precision precision);
+
+ /**
+ * Save cluster time aggregated metrics
+ *
+ * @param clusterTimeAggregateMap Map of cluster time aggregated metrics
+ * @param precision SECONDS, MINUTES, HOURS or DAYS
+ */
+ void saveClusterTimeAggregateRecords(
+ Map<TimelineClusterMetric, MetricHostAggregate> clusterTimeAggregateMap,
+ Precision precision);
+
+ /**
+ * Save cluster aggregated metrics
+ *
+ * @param clusterAggregateMap Map of cluster aggregated metrics
+ */
+ void saveClusterAggregateRecords(
+ Map<TimelineClusterMetric, MetricClusterAggregate> clusterAggregateMap);
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b3ec9aab/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
index 0ea668a..a910cc2 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
@@ -25,6 +25,9 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
@@ -245,4 +248,74 @@ public class PhoenixHBaseAccessorTest {
EasyMock.verify(timelineMetrics, connection);
}
+ /**
+ * Checks that a sink configured through TIMELINE_METRIC_AGGREGATOR_SINK_CLASS
+ * receives the maps handed to the accessor's saveClusterAggregateRecords,
+ * saveHostAggregateRecords and saveClusterTimeAggregateRecords methods.
+ */
+ @Test
+ public void testMetricsAggregatorSink() throws IOException, SQLException {
+ Configuration hbaseConf = new Configuration();
+ hbaseConf.setStrings(ZOOKEEPER_QUORUM, "quorum");
+ Configuration metricsConf = new Configuration();
+ Map<TimelineClusterMetric, MetricClusterAggregate> clusterAggregateMap =
+ new HashMap<>();
+ Map<TimelineClusterMetric, MetricHostAggregate> clusterTimeAggregateMap =
+ new HashMap<>();
+ Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = new HashMap<>();
+
+ metricsConf.setStrings(
+ TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_SIZE, "1");
+ metricsConf.setStrings(
+ TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL,
+ "100");
+ // Register the in-memory test sink by class name; the accessor is expected
+ // to instantiate it from this setting.
+ metricsConf.setStrings(
+ TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATOR_SINK_CLASS,
+ "org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricsAggregatorMemorySink");
+
+ // Nice mocks stand in for the Phoenix connection: every prepareStatement
+ // call hands back the same mocked statement.
+ final Connection connection = EasyMock.createNiceMock(Connection.class);
+ final PreparedStatement statement =
+ EasyMock.createNiceMock(PreparedStatement.class);
+ EasyMock.expect(connection.prepareStatement(EasyMock.anyString()))
+ .andReturn(statement).anyTimes();
+ EasyMock.replay(statement);
+ EasyMock.replay(connection);
+
+ PhoenixConnectionProvider connectionProvider =
+ new PhoenixConnectionProvider() {
+ @Override
+ public HBaseAdmin getHBaseAdmin() throws IOException {
+ return null;
+ }
+
+ @Override
+ public Connection getConnection() throws SQLException {
+ return connection;
+ }
+ };
+
+ // One record per aggregate map is enough to prove the tee happened.
+ TimelineClusterMetric clusterMetric =
+ new TimelineClusterMetric("metricName", "appId", "instanceId",
+ System.currentTimeMillis(), "type");
+ TimelineMetric timelineMetric = new TimelineMetric();
+ timelineMetric.setMetricName("Metric1");
+ timelineMetric.setType("type1");
+ timelineMetric.setAppId("App1");
+ timelineMetric.setInstanceId("instance1");
+ timelineMetric.setHostName("host1");
+
+ clusterAggregateMap.put(clusterMetric, new MetricClusterAggregate());
+ clusterTimeAggregateMap.put(clusterMetric, new MetricHostAggregate());
+ hostAggregateMap.put(timelineMetric, new MetricHostAggregate());
+
+ PhoenixHBaseAccessor accessor =
+ new PhoenixHBaseAccessor(hbaseConf, metricsConf, connectionProvider);
+ accessor.saveClusterAggregateRecords(clusterAggregateMap);
+ accessor.saveHostAggregateRecords(hostAggregateMap,
+ PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME);
+ accessor.saveClusterTimeAggregateRecords(clusterTimeAggregateMap,
+ PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
+
+ // The memory sink keeps its records in static maps, so a freshly created
+ // instance exposes what the accessor's own sink instance stored.
+ // NOTE(review): that static state is never reset here; the expected sizes
+ // assume no other test in the same JVM has written to the sink - confirm.
+ TimelineMetricsAggregatorMemorySink memorySink =
+ new TimelineMetricsAggregatorMemorySink();
+ // Host and cluster-time records are keyed by precision (one precision was
+ // used per call); cluster records are keyed by metric.
+ assertEquals(1, memorySink.getClusterAggregateRecords().size());
+ assertEquals(1, memorySink.getClusterTimeAggregateRecords().size());
+ assertEquals(1, memorySink.getHostAggregateRecords().size());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b3ec9aab/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java
new file mode 100644
index 0000000..fa0cfe9
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.hadoop.metrics2.sink.timeline.Precision;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Aggregator Memory sink implementation to perform test
+ */
+public class TimelineMetricsAggregatorMemorySink
+ implements TimelineMetricsAggregatorSink {
+
+ private static Map<Precision, Map<TimelineMetric, MetricHostAggregate>> hostAggregateRecords =
+ new HashMap<>();
+ private static Map<Precision, Map<TimelineClusterMetric, MetricHostAggregate>> clusterTimeAggregateRecords =
+ new HashMap<>();
+ private static Map<TimelineClusterMetric, MetricClusterAggregate> clusterAggregateRecords =
+ new HashMap<>();
+
+ @Override
+ public void saveHostAggregateRecords(
+ Map<TimelineMetric, MetricHostAggregate> hostAggregateMap,
+ Precision precision) {
+ if (hostAggregateMap == null || hostAggregateMap.size() == 0) {
+ return;
+ }
+
+ Map<TimelineMetric, MetricHostAggregate> aggregatedValue = null;
+ if (hostAggregateRecords.containsKey(precision)) {
+ aggregatedValue = hostAggregateRecords.get(precision);
+ } else {
+ aggregatedValue = new HashMap<>();
+ hostAggregateRecords.put(precision, aggregatedValue);
+ }
+
+ for (Entry<TimelineMetric, MetricHostAggregate> entry : hostAggregateMap
+ .entrySet()) {
+ TimelineMetric timelineMetricClone = new TimelineMetric(entry.getKey());
+ MetricHostAggregate hostAggregate = entry.getValue();
+ MetricHostAggregate hostAggregateClone = new MetricHostAggregate(
+ hostAggregate.getSum(), (int) hostAggregate.getNumberOfSamples(),
+ hostAggregate.getDeviation(), hostAggregate.getMax(),
+ hostAggregate.getMin());
+ aggregatedValue.put(timelineMetricClone, hostAggregateClone);
+ }
+ }
+
+ @Override
+ public void saveClusterTimeAggregateRecords(
+ Map<TimelineClusterMetric, MetricHostAggregate> clusterTimeAggregateMap,
+ Precision precision) {
+ if (clusterTimeAggregateMap == null
+ || clusterTimeAggregateMap.size() == 0) {
+ return;
+ }
+
+ Map<TimelineClusterMetric, MetricHostAggregate> aggregatedValue = null;
+ if (clusterTimeAggregateRecords.containsKey(precision)) {
+ aggregatedValue = clusterTimeAggregateRecords.get(precision);
+ } else {
+ aggregatedValue = new HashMap<>();
+ clusterTimeAggregateRecords.put(precision, aggregatedValue);
+ }
+
+ for (Entry<TimelineClusterMetric, MetricHostAggregate> entry : clusterTimeAggregateMap
+ .entrySet()) {
+ TimelineClusterMetric clusterMetric = entry.getKey();
+ TimelineClusterMetric clusterMetricClone =
+ new TimelineClusterMetric(clusterMetric.getMetricName(),
+ clusterMetric.getAppId(), clusterMetric.getInstanceId(),
+ clusterMetric.getTimestamp(), clusterMetric.getType());
+ MetricHostAggregate hostAggregate = entry.getValue();
+ MetricHostAggregate hostAggregateClone = new MetricHostAggregate(
+ hostAggregate.getSum(), (int) hostAggregate.getNumberOfSamples(),
+ hostAggregate.getDeviation(), hostAggregate.getMax(),
+ hostAggregate.getMin());
+ aggregatedValue.put(clusterMetricClone, hostAggregateClone);
+ }
+ }
+
+ @Override
+ public void saveClusterAggregateRecords(
+ Map<TimelineClusterMetric, MetricClusterAggregate> clusterAggregateMaps) {
+
+ if (clusterAggregateMaps == null || clusterAggregateMaps.size() == 0) {
+ return;
+ }
+
+ for (Entry<TimelineClusterMetric, MetricClusterAggregate> entry : clusterAggregateMaps
+ .entrySet()) {
+ TimelineClusterMetric clusterMetric = entry.getKey();
+ TimelineClusterMetric clusterMetricClone =
+ new TimelineClusterMetric(clusterMetric.getMetricName(),
+ clusterMetric.getAppId(), clusterMetric.getInstanceId(),
+ clusterMetric.getTimestamp(), clusterMetric.getType());
+ MetricClusterAggregate clusterAggregate = entry.getValue();
+ MetricClusterAggregate clusterAggregateClone = new MetricClusterAggregate(
+ clusterAggregate.getSum(), (int) clusterAggregate.getNumberOfHosts(),
+ clusterAggregate.getDeviation(), clusterAggregate.getMax(),
+ clusterAggregate.getMin());
+ clusterAggregateRecords.put(clusterMetricClone, clusterAggregateClone);
+ }
+ }
+
+ public Map<Precision, Map<TimelineMetric, MetricHostAggregate>> getHostAggregateRecords() {
+ return Collections.unmodifiableMap(hostAggregateRecords);
+ }
+
+ public Map<Precision, Map<TimelineClusterMetric, MetricHostAggregate>> getClusterTimeAggregateRecords() {
+ return Collections.unmodifiableMap(clusterTimeAggregateRecords);
+ }
+
+ public Map<TimelineClusterMetric, MetricClusterAggregate> getClusterAggregateRecords() {
+ return Collections.unmodifiableMap(clusterAggregateRecords);
+ }
+
+}