Posted to commits@ambari.apache.org by jo...@apache.org on 2017/07/07 14:49:38 UTC

[24/29] ambari git commit: AMBARI-10145 : Add support for tee to another Storage service in Ambari Metrics System. (Jameel Naina Mohamed via avijayan)

AMBARI-10145 : Add support for tee to another Storage service in Ambari Metrics System. (Jameel Naina Mohamed via avijayan)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b3ec9aab
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b3ec9aab
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b3ec9aab

Branch: refs/heads/branch-feature-AMBARI-21348
Commit: b3ec9aab4a3cc8ef2899b5f6fb8347cda7438344
Parents: 0ac6a1e
Author: Aravindan Vijayan <av...@hortonworks.com>
Authored: Wed Jul 5 11:50:01 2017 -0700
Committer: Aravindan Vijayan <av...@hortonworks.com>
Committed: Wed Jul 5 11:50:01 2017 -0700

----------------------------------------------------------------------
 .../metrics/timeline/PhoenixHBaseAccessor.java  |  69 ++++++++-
 .../timeline/TimelineMetricConfiguration.java   |   3 +
 .../timeline/TimelineMetricsAggregatorSink.java |  60 ++++++++
 .../timeline/PhoenixHBaseAccessorTest.java      |  73 ++++++++++
 .../TimelineMetricsAggregatorMemorySink.java    | 141 +++++++++++++++++++
 5 files changed, 345 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
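
In short, this patch lets the collector tee aggregated metrics to a second storage service: PhoenixHBaseAccessor reads the new timeline.metrics.service.aggregator.sink.class property, instantiates the configured class through ReflectionUtils, and forwards host, cluster, and cluster-time aggregate records to it after each Phoenix/HBase write. A minimal sketch of enabling the tee programmatically, in the style of the unit test further down (AggregatorSinkConfigSketch and com.example.MyAggregatorSink are hypothetical names used only for illustration):

import org.apache.hadoop.conf.Configuration;

public class AggregatorSinkConfigSketch {
  public static void main(String[] args) {
    // Metrics service configuration handed to PhoenixHBaseAccessor.
    Configuration metricsConf = new Configuration();
    // Point the collector at a custom TimelineMetricsAggregatorSink implementation.
    // The class name below is a placeholder used only for illustration.
    metricsConf.set("timeline.metrics.service.aggregator.sink.class",
        "com.example.MyAggregatorSink");
    // When PhoenixHBaseAccessor is constructed with this configuration, it creates
    // the sink via ReflectionUtils and tees every saved aggregate record to it.
  }
}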


http://git-wip-us.apache.org/repos/asf/ambari/blob/b3ec9aab/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 3ea6431..de41328 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
@@ -142,7 +143,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METADATA_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CONTAINER_METRICS_SQL;
-
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATOR_SINK_CLASS;
 
 /**
  * Provides a facade over the Phoenix API to access HBase schema
@@ -177,6 +178,7 @@ public class PhoenixHBaseAccessor {
   private final BlockingQueue<TimelineMetrics> insertCache;
   private ScheduledExecutorService scheduledExecutorService;
   private MetricsCacheCommitterThread metricsCommiterThread;
+  private TimelineMetricsAggregatorSink aggregatorSink;
   private final int cacheCommitInterval;
   private final boolean skipBlockCacheForAggregatorsEnabled;
   private final String timelineMetricsTablesDurability;
@@ -241,6 +243,15 @@ public class PhoenixHBaseAccessor {
       scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
       scheduledExecutorService.scheduleWithFixedDelay(metricsCommiterThread, 0, cacheCommitInterval, TimeUnit.SECONDS);
     }
+
+    Class<? extends TimelineMetricsAggregatorSink> metricSinkClass =
+        metricsConf.getClass(TIMELINE_METRIC_AGGREGATOR_SINK_CLASS, null,
+            TimelineMetricsAggregatorSink.class);
+    if (metricSinkClass != null) {
+      aggregatorSink =
+          ReflectionUtils.newInstance(metricSinkClass, metricsConf);
+      LOG.info("Initialized aggregator sink class " + metricSinkClass);
+    }
   }
 
   public boolean isInsertCacheEmpty() {
@@ -1278,6 +1289,16 @@ public class PhoenixHBaseAccessor {
       LOG.info("Time to save map: " + (end - start) + ", " +
         "thread = " + Thread.currentThread().getClass());
     }
+    if (aggregatorSink != null) {
+      try {
+        aggregatorSink.saveHostAggregateRecords(hostAggregateMap,
+            getTablePrecision(phoenixTableName));
+      } catch (Exception e) {
+        LOG.warn(
+            "Error writing host aggregate records metrics to external sink. "
+                + e);
+      }
+    }
   }
 
   /**
@@ -1359,6 +1380,15 @@ public class PhoenixHBaseAccessor {
       LOG.info("Time to save: " + (end - start) + ", " +
         "thread = " + Thread.currentThread().getName());
     }
+    if (aggregatorSink != null) {
+      try {
+        aggregatorSink.saveClusterAggregateRecords(records);
+      } catch (Exception e) {
+        LOG.warn(
+            "Error writing cluster aggregate records metrics to external sink. "
+                + e);
+      }
+    }
   }
 
 
@@ -1439,6 +1469,43 @@ public class PhoenixHBaseAccessor {
       LOG.info("Time to save: " + (end - start) + ", " +
         "thread = " + Thread.currentThread().getName());
     }
+    if (aggregatorSink != null) {
+      try {
+        aggregatorSink.saveClusterTimeAggregateRecords(records,
+            getTablePrecision(tableName));
+      } catch (Exception e) {
+        LOG.warn(
+            "Error writing cluster time aggregate records metrics to external sink. "
+                + e);
+      }
+    }
+  }
+
+  /**
+   * Get the precision for a metrics table.
+   * @param tableName name of the Phoenix metrics table
+   * @return precision for the table, or null if the table is not recognized
+   */
+  private Precision getTablePrecision(String tableName) {
+    Precision tablePrecision = null;
+    switch (tableName) {
+    case METRICS_RECORD_TABLE_NAME:
+      tablePrecision = Precision.SECONDS;
+      break;
+    case METRICS_AGGREGATE_MINUTE_TABLE_NAME:
+    case METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME:
+      tablePrecision = Precision.MINUTES;
+      break;
+    case METRICS_AGGREGATE_HOURLY_TABLE_NAME:
+    case METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME:
+      tablePrecision = Precision.HOURS;
+      break;
+    case METRICS_AGGREGATE_DAILY_TABLE_NAME:
+    case METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME:
+      tablePrecision = Precision.DAYS;
+      break;
+    }
+    return tablePrecision;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/b3ec9aab/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index 474b11a..8f18aa9 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -53,6 +53,9 @@ public class TimelineMetricConfiguration {
   public static final String TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR =
     "timeline.metrics.aggregator.checkpoint.dir";
 
+  public static final String TIMELINE_METRIC_AGGREGATOR_SINK_CLASS =
+    "timeline.metrics.service.aggregator.sink.class";
+
   public static final String TIMELINE_METRICS_CACHE_SIZE =
     "timeline.metrics.cache.size";
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/b3ec9aab/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java
new file mode 100644
index 0000000..65d54c0
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import java.util.Map;
+
+import org.apache.hadoop.metrics2.sink.timeline.Precision;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
+
+/**
+ * Interface for storing aggregated metrics in an external storage service.
+ */
+public interface TimelineMetricsAggregatorSink {
+
+  /**
+   * Save host aggregated metrics
+   *
+   * @param hostAggregateMap Map of host aggregated metrics
+   * @param precision SECONDS, MINUTES, HOURS or DAYS
+   */
+  void saveHostAggregateRecords(
+      Map<TimelineMetric, MetricHostAggregate> hostAggregateMap,
+      Precision precision);
+
+  /**
+   * Save cluster time aggregated metrics
+   *
+   * @param clusterTimeAggregateMap Map of time-aggregated cluster metrics
+   * @param precision SECONDS, MINUTES, HOURS or DAYS
+   */
+  void saveClusterTimeAggregateRecords(
+      Map<TimelineClusterMetric, MetricHostAggregate> clusterTimeAggregateMap,
+      Precision precision);
+
+  /**
+   * Save cluster aggregated metrics
+   *
+   * @param clusterAggregateMap Map of cluster aggregated metrics
+   */
+  void saveClusterAggregateRecords(
+      Map<TimelineClusterMetric, MetricClusterAggregate> clusterAggregateMap);
+}
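
For reference, a minimal sketch of a custom sink against the interface above (LoggingAggregatorSink is a hypothetical example, not part of this patch; it only logs how many records were teed). The in-memory sink added under src/test below shows a fuller implementation:

package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;

import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;

public class LoggingAggregatorSink implements TimelineMetricsAggregatorSink {

  private static final Log LOG = LogFactory.getLog(LoggingAggregatorSink.class);

  @Override
  public void saveHostAggregateRecords(
      Map<TimelineMetric, MetricHostAggregate> hostAggregateMap,
      Precision precision) {
    if (hostAggregateMap != null) {
      // Stand-in for a real write to the secondary storage service.
      LOG.info("Teed " + hostAggregateMap.size() + " host aggregates at " + precision);
    }
  }

  @Override
  public void saveClusterTimeAggregateRecords(
      Map<TimelineClusterMetric, MetricHostAggregate> clusterTimeAggregateMap,
      Precision precision) {
    if (clusterTimeAggregateMap != null) {
      LOG.info("Teed " + clusterTimeAggregateMap.size() + " cluster time aggregates at " + precision);
    }
  }

  @Override
  public void saveClusterAggregateRecords(
      Map<TimelineClusterMetric, MetricClusterAggregate> clusterAggregateMap) {
    if (clusterAggregateMap != null) {
      LOG.info("Teed " + clusterAggregateMap.size() + " cluster aggregates");
    }
  }
}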

http://git-wip-us.apache.org/repos/asf/ambari/blob/b3ec9aab/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
index 0ea668a..a910cc2 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
@@ -25,6 +25,9 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.metrics2.sink.timeline.Precision;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
@@ -245,4 +248,74 @@ public class PhoenixHBaseAccessorTest {
     EasyMock.verify(timelineMetrics, connection);
   }
 
+  @Test
+  public void testMetricsAggregatorSink() throws IOException, SQLException {
+    Configuration hbaseConf = new Configuration();
+    hbaseConf.setStrings(ZOOKEEPER_QUORUM, "quorum");
+    Configuration metricsConf = new Configuration();
+    Map<TimelineClusterMetric, MetricClusterAggregate> clusterAggregateMap =
+        new HashMap<>();
+    Map<TimelineClusterMetric, MetricHostAggregate> clusterTimeAggregateMap =
+        new HashMap<>();
+    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = new HashMap<>();
+
+    metricsConf.setStrings(
+        TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_SIZE, "1");
+    metricsConf.setStrings(
+        TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL,
+        "100");
+    metricsConf.setStrings(
+        TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATOR_SINK_CLASS,
+        "org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricsAggregatorMemorySink");
+
+    final Connection connection = EasyMock.createNiceMock(Connection.class);
+    final PreparedStatement statement =
+        EasyMock.createNiceMock(PreparedStatement.class);
+    EasyMock.expect(connection.prepareStatement(EasyMock.anyString()))
+        .andReturn(statement).anyTimes();
+    EasyMock.replay(statement);
+    EasyMock.replay(connection);
+
+    PhoenixConnectionProvider connectionProvider =
+        new PhoenixConnectionProvider() {
+          @Override
+          public HBaseAdmin getHBaseAdmin() throws IOException {
+            return null;
+          }
+
+          @Override
+          public Connection getConnection() throws SQLException {
+            return connection;
+          }
+        };
+
+    TimelineClusterMetric clusterMetric =
+        new TimelineClusterMetric("metricName", "appId", "instanceId",
+            System.currentTimeMillis(), "type");
+    TimelineMetric timelineMetric = new TimelineMetric();
+    timelineMetric.setMetricName("Metric1");
+    timelineMetric.setType("type1");
+    timelineMetric.setAppId("App1");
+    timelineMetric.setInstanceId("instance1");
+    timelineMetric.setHostName("host1");
+
+    clusterAggregateMap.put(clusterMetric, new MetricClusterAggregate());
+    clusterTimeAggregateMap.put(clusterMetric, new MetricHostAggregate());
+    hostAggregateMap.put(timelineMetric, new MetricHostAggregate());
+
+    PhoenixHBaseAccessor accessor =
+        new PhoenixHBaseAccessor(hbaseConf, metricsConf, connectionProvider);
+    accessor.saveClusterAggregateRecords(clusterAggregateMap);
+    accessor.saveHostAggregateRecords(hostAggregateMap,
+        PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME);
+    accessor.saveClusterTimeAggregateRecords(clusterTimeAggregateMap,
+        PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
+
+    TimelineMetricsAggregatorMemorySink memorySink =
+        new TimelineMetricsAggregatorMemorySink();
+    assertEquals(1, memorySink.getClusterAggregateRecords().size());
+    assertEquals(1, memorySink.getClusterTimeAggregateRecords().size());
+    assertEquals(1, memorySink.getHostAggregateRecords().size());
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/b3ec9aab/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java
new file mode 100644
index 0000000..fa0cfe9
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.hadoop.metrics2.sink.timeline.Precision;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * In-memory aggregator sink implementation used for testing.
+ */
+public class TimelineMetricsAggregatorMemorySink
+    implements TimelineMetricsAggregatorSink {
+
+  private static Map<Precision, Map<TimelineMetric, MetricHostAggregate>> hostAggregateRecords =
+      new HashMap<>();
+  private static Map<Precision, Map<TimelineClusterMetric, MetricHostAggregate>> clusterTimeAggregateRecords =
+      new HashMap<>();
+  private static Map<TimelineClusterMetric, MetricClusterAggregate> clusterAggregateRecords =
+      new HashMap<>();
+
+  @Override
+  public void saveHostAggregateRecords(
+      Map<TimelineMetric, MetricHostAggregate> hostAggregateMap,
+      Precision precision) {
+    if (hostAggregateMap == null || hostAggregateMap.size() == 0) {
+      return;
+    }
+
+    Map<TimelineMetric, MetricHostAggregate> aggregatedValue = null;
+    if (hostAggregateRecords.containsKey(precision)) {
+      aggregatedValue = hostAggregateRecords.get(precision);
+    } else {
+      aggregatedValue = new HashMap<>();
+      hostAggregateRecords.put(precision, aggregatedValue);
+    }
+
+    for (Entry<TimelineMetric, MetricHostAggregate> entry : hostAggregateMap
+        .entrySet()) {
+      TimelineMetric timelineMetricClone = new TimelineMetric(entry.getKey());
+      MetricHostAggregate hostAggregate = entry.getValue();
+      MetricHostAggregate hostAggregateClone = new MetricHostAggregate(
+          hostAggregate.getSum(), (int) hostAggregate.getNumberOfSamples(),
+          hostAggregate.getDeviation(), hostAggregate.getMax(),
+          hostAggregate.getMin());
+      aggregatedValue.put(timelineMetricClone, hostAggregateClone);
+    }
+  }
+
+  @Override
+  public void saveClusterTimeAggregateRecords(
+      Map<TimelineClusterMetric, MetricHostAggregate> clusterTimeAggregateMap,
+      Precision precision) {
+    if (clusterTimeAggregateMap == null
+        || clusterTimeAggregateMap.size() == 0) {
+      return;
+    }
+
+    Map<TimelineClusterMetric, MetricHostAggregate> aggregatedValue = null;
+    if (clusterTimeAggregateRecords.containsKey(precision)) {
+      aggregatedValue = clusterTimeAggregateRecords.get(precision);
+    } else {
+      aggregatedValue = new HashMap<>();
+      clusterTimeAggregateRecords.put(precision, aggregatedValue);
+    }
+
+    for (Entry<TimelineClusterMetric, MetricHostAggregate> entry : clusterTimeAggregateMap
+        .entrySet()) {
+      TimelineClusterMetric clusterMetric = entry.getKey();
+      TimelineClusterMetric clusterMetricClone =
+          new TimelineClusterMetric(clusterMetric.getMetricName(),
+              clusterMetric.getAppId(), clusterMetric.getInstanceId(),
+              clusterMetric.getTimestamp(), clusterMetric.getType());
+      MetricHostAggregate hostAggregate = entry.getValue();
+      MetricHostAggregate hostAggregateClone = new MetricHostAggregate(
+          hostAggregate.getSum(), (int) hostAggregate.getNumberOfSamples(),
+          hostAggregate.getDeviation(), hostAggregate.getMax(),
+          hostAggregate.getMin());
+      aggregatedValue.put(clusterMetricClone, hostAggregateClone);
+    }
+  }
+
+  @Override
+  public void saveClusterAggregateRecords(
+      Map<TimelineClusterMetric, MetricClusterAggregate> clusterAggregateMaps) {
+
+    if (clusterAggregateMaps == null || clusterAggregateMaps.size() == 0) {
+      return;
+    }
+
+    for (Entry<TimelineClusterMetric, MetricClusterAggregate> entry : clusterAggregateMaps
+        .entrySet()) {
+      TimelineClusterMetric clusterMetric = entry.getKey();
+      TimelineClusterMetric clusterMetricClone =
+          new TimelineClusterMetric(clusterMetric.getMetricName(),
+              clusterMetric.getAppId(), clusterMetric.getInstanceId(),
+              clusterMetric.getTimestamp(), clusterMetric.getType());
+      MetricClusterAggregate clusterAggregate = entry.getValue();
+      MetricClusterAggregate clusterAggregateClone = new MetricClusterAggregate(
+          clusterAggregate.getSum(), (int) clusterAggregate.getNumberOfHosts(),
+          clusterAggregate.getDeviation(), clusterAggregate.getMax(),
+          clusterAggregate.getMin());
+      clusterAggregateRecords.put(clusterMetricClone, clusterAggregateClone);
+    }
+  }
+
+  public Map<Precision, Map<TimelineMetric, MetricHostAggregate>> getHostAggregateRecords() {
+    return Collections.unmodifiableMap(hostAggregateRecords);
+  }
+
+  public Map<Precision, Map<TimelineClusterMetric, MetricHostAggregate>> getClusterTimeAggregateRecords() {
+    return Collections.unmodifiableMap(clusterTimeAggregateRecords);
+  }
+
+  public Map<TimelineClusterMetric, MetricClusterAggregate> getClusterAggregateRecords() {
+    return Collections.unmodifiableMap(clusterAggregateRecords);
+  }
+
+}