Posted to commits@ambari.apache.org by av...@apache.org on 2018/04/01 19:14:08 UTC

[ambari] 12/39: AMBARI-21458 Provide ability to shard Cluster second aggregation across appId. (dsen)

This is an automated email from the ASF dual-hosted git repository.

avijayan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git

commit 54e4a978240100cd760b993eb9569a1e9b314ab8
Author: Dmytro Sen <ds...@apache.org>
AuthorDate: Thu Aug 31 13:00:14 2017 +0300

    AMBARI-21458 Provide ability to shard Cluster second aggregation across appId. (dsen)
---
 .../availability/MetricCollectorHAHelper.java      |   8 +-
 .../ambari-metrics-timelineservice/pom.xml         |  10 +
 .../timeline/HBaseTimelineMetricsService.java      |  32 ++-
 .../metrics/timeline/PhoenixHBaseAccessor.java     |   2 +-
 .../timeline/TimelineMetricConfiguration.java      |  76 +++++
 ...ta.java => TimelineMetricDistributedCache.java} |  47 +---
 .../timeline/TimelineMetricsIgniteCache.java       | 305 +++++++++++++++++++++
 .../aggregators/AbstractTimelineAggregator.java    |  11 +-
 .../timeline/aggregators/AggregatorUtils.java      | 192 +++++++++++++
 .../TimelineMetricAggregatorFactory.java           |  30 +-
 .../aggregators/TimelineMetricAppAggregator.java   |  15 +-
 .../TimelineMetricClusterAggregatorSecond.java     | 231 ++--------------
 ...tricClusterAggregatorSecondWithCacheSource.java | 132 +++++++++
 .../availability/MetricCollectorHAController.java  |  19 +-
 .../discovery/TimelineMetricHostMetadata.java      |  19 +-
 .../discovery/TimelineMetricMetadataManager.java   |  20 +-
 .../discovery/TimelineMetricMetadataSync.java      |   2 +-
 .../timeline/uuid/HashBasedUuidGenStrategy.java    |   4 +
 .../metrics/timeline/ITPhoenixHBaseAccessor.java   |   4 +-
 .../timeline/TimelineMetricsIgniteCacheTest.java   | 296 ++++++++++++++++++++
 .../AbstractTimelineAggregatorTest.java            |  12 +-
 .../timeline/aggregators/ITClusterAggregator.java  |  10 +-
 .../TimelineMetricClusterAggregatorSecondTest.java |  32 +--
 ...ClusterAggregatorSecondWithCacheSourceTest.java | 178 ++++++++++++
 .../MetricCollectorHAControllerTest.java           |   1 +
 .../timeline/discovery/TestMetadataManager.java    |   8 +-
 ambari-metrics/pom.xml                             |   4 +-
 27 files changed, 1364 insertions(+), 336 deletions(-)
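
The sharding named in the subject line comes from the cache key: the new
distributed cache is keyed on TimelineClusterMetric, and appId is part of that
key, so Ignite's partitioned mode spreads per-appId aggregates across the
collector nodes. A minimal, illustrative sketch of the idea in plain Java; the
key type and partition count below are stand-ins, not the patch's API:

    import java.util.Objects;

    // Shows how a cache key that includes appId naturally shards
    // aggregation work across the partitions of a distributed cache.
    public class ShardingSketch {
      static final class MetricKey {
        final String metricName, appId;
        final long timestamp;
        MetricKey(String m, String a, long t) { metricName = m; appId = a; timestamp = t; }
        @Override public int hashCode() { return Objects.hash(metricName, appId, timestamp); }
        @Override public boolean equals(Object o) {
          if (!(o instanceof MetricKey)) return false;
          MetricKey k = (MetricKey) o;
          return timestamp == k.timestamp && metricName.equals(k.metricName) && appId.equals(k.appId);
        }
      }

      public static void main(String[] args) {
        int partitions = 4; // e.g. number of collector nodes
        MetricKey host = new MetricKey("cpu_user", "HOST", 1_504_180_800_000L);
        MetricKey dn = new MetricKey("cpu_user", "datanode", 1_504_180_800_000L);
        // Same metric and slice, different appId: typically different partitions.
        System.out.println(Math.floorMod(host.hashCode(), partitions));
        System.out.println(Math.floorMod(dn.hashCode(), partitions));
      }
    }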

diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHAHelper.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHAHelper.java
index c6f6beb..3071cbc 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHAHelper.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHAHelper.java
@@ -38,7 +38,7 @@ import java.util.concurrent.Callable;
  * does not add a watcher on the znode.
  */
 public class MetricCollectorHAHelper {
-  private final String zookeeperQuorum;
+  private final String zookeeperConnectionURL;
   private final int tryCount;
   private final int sleepMsBetweenRetries;
 
@@ -52,8 +52,8 @@ public class MetricCollectorHAHelper {
 
   private static final Log LOG = LogFactory.getLog(MetricCollectorHAHelper.class);
 
-  public MetricCollectorHAHelper(String zookeeperQuorum, int tryCount, int sleepMsBetweenRetries) {
-    this.zookeeperQuorum = zookeeperQuorum;
+  public MetricCollectorHAHelper(String zookeeperConnectionURL, int tryCount, int sleepMsBetweenRetries) {
+    this.zookeeperConnectionURL = zookeeperConnectionURL;
     this.tryCount = tryCount;
     this.sleepMsBetweenRetries = sleepMsBetweenRetries;
   }
@@ -66,7 +66,7 @@ public class MetricCollectorHAHelper {
     Set<String> collectors = new HashSet<>();
 
     RetryPolicy retryPolicy = new BoundedExponentialBackoffRetry(sleepMsBetweenRetries, 10*sleepMsBetweenRetries, tryCount);
-    final CuratorZookeeperClient client = new CuratorZookeeperClient(zookeeperQuorum,
+    final CuratorZookeeperClient client = new CuratorZookeeperClient(zookeeperConnectionURL,
       SESSION_TIMEOUT, CONNECTION_TIMEOUT, null, retryPolicy);
 
     List<String> liveInstances = null;
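
A note on the retry policy visible in this hunk: BoundedExponentialBackoffRetry
is configured with a base sleep of sleepMsBetweenRetries, a cap of ten times
that, and tryCount attempts. The sketch below approximates that schedule
deterministically (Curator actually randomizes within the exponential window),
purely for illustration:

    // Simplified, deterministic model of bounded exponential backoff:
    // base sleep, 10x cap, fixed retry budget.
    public class BackoffSketch {
      public static void main(String[] args) {
        int sleepMs = 200, tryCount = 5;
        long maxSleepMs = 10L * sleepMs;
        for (int attempt = 0; attempt < tryCount; attempt++) {
          long delay = Math.min((long) (sleepMs * Math.pow(2, attempt)), maxSleepMs);
          System.out.println("attempt " + attempt + " -> sleep " + delay + " ms");
        }
      }
    }
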
diff --git a/ambari-metrics/ambari-metrics-timelineservice/pom.xml b/ambari-metrics/ambari-metrics-timelineservice/pom.xml
index f3e0041..d306ad3 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/pom.xml
+++ b/ambari-metrics/ambari-metrics-timelineservice/pom.xml
@@ -703,6 +703,16 @@
       <version>1.0.0.0-SNAPSHOT</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.ignite</groupId>
+      <artifactId>ignite-core</artifactId>
+      <version>2.1.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.ignite</groupId>
+      <artifactId>ignite-log4j</artifactId>
+      <version>2.1.0</version>
+    </dependency>
   </dependencies>
 
   <profiles>
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
index 4318fd3..110b094 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
@@ -64,13 +64,13 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
 
@@ -78,6 +78,7 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time
 
   static final Log LOG = LogFactory.getLog(HBaseTimelineMetricsService.class);
   private final TimelineMetricConfiguration configuration;
+  private TimelineMetricDistributedCache cache;
   private PhoenixHBaseAccessor hBaseAccessor;
   private static volatile boolean isInitialized = false;
   private final ScheduledExecutorService watchdogExecutorService = Executors.newSingleThreadScheduledExecutor();
@@ -103,6 +104,12 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time
     initializeSubsystem();
   }
 
+  private TimelineMetricDistributedCache startCacheNode() throws MalformedURLException, URISyntaxException {
+    //TODO make configurable
+    return new TimelineMetricsIgniteCache();
+  }
+
+
   private synchronized void initializeSubsystem() {
     if (!isInitialized) {
       hBaseAccessor = new PhoenixHBaseAccessor(null);
@@ -142,6 +149,15 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time
         throw new ExceptionInInitializerError("Cannot initialize configuration.");
       }
 
+      if (configuration.isCollectorInMemoryAggregationEnabled()) {
+        try {
+          cache = startCacheNode();
+        } catch (Exception e) {
+          throw new MetricsSystemInitializationException("Unable to " +
+              "start cache node", e);
+        }
+      }
+
       defaultTopNHostsLimit = Integer.parseInt(metricsConf.get(DEFAULT_TOPN_HOSTS_LIMIT, "20"));
       if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) {
         LOG.info("Using group by aggregators for aggregating host and cluster metrics.");
@@ -150,7 +166,7 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time
       // Start the cluster aggregator second
       TimelineMetricAggregator secondClusterAggregator =
         TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(
-          hBaseAccessor, metricsConf, metricMetadataManager, haController);
+          hBaseAccessor, metricsConf, metricMetadataManager, haController, cache);
       scheduleAggregatorThread(secondClusterAggregator);
 
       // Start the minute cluster aggregator
@@ -172,7 +188,7 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time
       scheduleAggregatorThread(dailyClusterAggregator);
 
       // Start the minute host aggregator
-      if (Boolean.parseBoolean(metricsConf.get(TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION, "true"))) {
+      if (configuration.isHostInMemoryAggregationEnabled()) {
         LOG.info("timeline.metrics.host.inmemory.aggregation is set to True, switching to filtering host minute aggregation on collector");
         TimelineMetricAggregator minuteHostAggregator =
           TimelineMetricAggregatorFactory.createFilteringTimelineMetricAggregatorMinute(
@@ -383,6 +399,10 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time
 
     hBaseAccessor.insertMetricRecordsWithMetadata(metricMetadataManager, metrics, false);
 
+    if (configuration.isCollectorInMemoryAggregationEnabled()) {
+      cache.putMetrics(metrics.getMetrics(), metricMetadataManager);
+    }
+
     return response;
   }
 
@@ -460,7 +480,7 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time
     Map<String, TimelineMetricHostMetadata> hostsMetadata = metricMetadataManager.getHostedAppsCache();
     Map<String, Set<String>> hostAppMap = new HashMap<>();
     for (String hostname : hostsMetadata.keySet()) {
-      hostAppMap.put(hostname, hostsMetadata.get(hostname).getHostedApps());
+      hostAppMap.put(hostname, hostsMetadata.get(hostname).getHostedApps().keySet());
     }
     return hostAppMap;
   }
@@ -500,7 +520,7 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time
     if (MapUtils.isEmpty(instanceHosts)) {
       Map<String, Set<String>> appHostMap = new HashMap<String, Set<String>>();
       for (String host : hostedApps.keySet()) {
-        for (String app : hostedApps.get(host).getHostedApps()) {
+        for (String app : hostedApps.get(host).getHostedApps().keySet()) {
           if (!appHostMap.containsKey(app)) {
             appHostMap.put(app, new HashSet<String>());
           }
@@ -519,7 +539,7 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time
 
         Set<String> hostsWithInstance = instanceHosts.get(instance);
         for (String host : hostsWithInstance) {
-          for (String app : hostedApps.get(host).getHostedApps()) {
+          for (String app : hostedApps.get(host).getHostedApps().keySet()) {
             if (StringUtils.isNotEmpty(appId) && !app.equals(appId)) {
               continue;
             }
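
The service-side change above is a dual write: metrics are persisted through
the Phoenix accessor first and then, only when collector in-memory aggregation
is enabled, mirrored into the distributed cache so the second-level aggregator
can drain them without scanning HBase. A stand-alone model of that pattern
(Store and Cache are illustrative stand-ins, not the patch's types):

    import java.util.Arrays;
    import java.util.Collection;

    public class DualWriteSketch {
      interface Store { void insert(Collection<String> metrics); }
      interface Cache { void putMetrics(Collection<String> metrics); }

      private final Store store;
      private final Cache cache;          // unused when aggregation is off
      private final boolean cacheEnabled;

      DualWriteSketch(Store store, Cache cache, boolean cacheEnabled) {
        this.store = store; this.cache = cache; this.cacheEnabled = cacheEnabled;
      }

      void putMetrics(Collection<String> metrics) {
        store.insert(metrics);            // durable write always happens first
        if (cacheEnabled) {
          cache.putMetrics(metrics);      // best-effort mirror for aggregation
        }
      }

      public static void main(String[] args) {
        DualWriteSketch svc = new DualWriteSketch(
            m -> System.out.println("stored " + m.size() + " metrics"),
            m -> System.out.println("cached " + m.size() + " metrics"),
            true);
        svc.putMetrics(Arrays.asList("cpu_user", "mem_free"));
      }
    }
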
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index d207775..da14fd1 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -1570,7 +1570,7 @@ public class PhoenixHBaseAccessor {
         stmt.clearParameters();
         stmt.setString(1, hostedAppsEntry.getKey());
         stmt.setBytes(2, timelineMetricHostMetadata.getUuid());
-        stmt.setString(3, StringUtils.join(timelineMetricHostMetadata.getHostedApps(), ","));
+        stmt.setString(3, StringUtils.join(timelineMetricHostMetadata.getHostedApps().keySet(), ","));
         try {
           stmt.executeUpdate();
           rowCount++;
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index 6083859..258e9c6 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -26,8 +26,10 @@ import java.net.MalformedURLException;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.UnknownHostException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.lang.StringUtils;
@@ -57,6 +59,7 @@ public class TimelineMetricConfiguration {
   public static final String HBASE_SITE_CONFIGURATION_FILE = "hbase-site.xml";
   public static final String METRICS_SITE_CONFIGURATION_FILE = "ams-site.xml";
   public static final String METRICS_ENV_CONFIGURATION_FILE = "ams-env.xml";
+  public static final String METRICS_SSL_SERVER_CONFIGURATION_FILE = "ssl-server.xml";
 
   public static final String TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR =
     "timeline.metrics.aggregator.checkpoint.dir";
@@ -118,6 +121,9 @@ public class TimelineMetricConfiguration {
   public static final String CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL =
     "timeline.metrics.cluster.aggregator.second.timeslice.interval";
 
+  public static final String CLUSTER_CACHE_AGGREGATOR_TIMESLICE_INTERVAL =
+    "timeline.metrics.cluster.cache.aggregator.second.timeslice.interval";
+
   public static final String AGGREGATOR_CHECKPOINT_DELAY =
     "timeline.metrics.service.checkpointDelay";
 
@@ -262,6 +268,9 @@ public class TimelineMetricConfiguration {
   public static final String TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED =
     "timeline.metrics.cluster.aggregator.interpolation.enabled";
 
+  public static final String TIMELINE_METRICS_SINK_COLLECTION_PERIOD =
+    "timeline.metrics.sink.collection.period";
+
   public static final String TIMELINE_METRICS_PRECISION_TABLE_DURABILITY =
     "timeline.metrics.precision.table.durability";
 
@@ -331,6 +340,13 @@ public class TimelineMetricConfiguration {
   public static final String AMSHBASE_METRICS_WHITESLIST_FILE = "amshbase_metrics_whitelist";
 
   public static final String TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION = "timeline.metrics.host.inmemory.aggregation";
+
+  public static final String TIMELINE_METRICS_COLLECTOR_INMEMORY_AGGREGATION = "timeline.metrics.collector.inmemory.aggregation";
+
+  public static final String TIMELINE_METRICS_COLLECTOR_IGNITE_NODES = "timeline.metrics.collector.ignite.nodes.list";
+
+  public static final String TIMELINE_METRICS_COLLECTOR_IGNITE_BACKUPS = "timeline.metrics.collector.ignite.nodes.backups";
+
   public static final String INTERNAL_CACHE_HEAP_PERCENT =
     "timeline.metrics.service.cache.%s.heap.percent";
 
@@ -342,6 +358,7 @@ public class TimelineMetricConfiguration {
 
   private Configuration hbaseConf;
   private Configuration metricsConf;
+  private Configuration metricsSslConf;
   private Configuration amsEnvConf;
   private volatile boolean isInitialized = false;
 
@@ -386,6 +403,17 @@ public class TimelineMetricConfiguration {
       metricsConf = new Configuration(true);
       metricsConf.addResource(amsResUrl.toURI().toURL());
 
+      if (metricsConf.get("timeline.metrics.service.http.policy", "HTTP_ONLY").equalsIgnoreCase("HTTPS_ONLY")) {
+        URL amsSslResUrl = classLoader.getResource(METRICS_SSL_SERVER_CONFIGURATION_FILE);
+        if (amsSslResUrl == null) {
+          throw new IllegalStateException("Unable to initialize the metrics " +
+            "subsystem. No ssl-server.xml present in the classpath.");
+        }
+        LOG.info("Found metrics ssl service configuration: " + amsSslResUrl);
+        metricsSslConf = new Configuration(true);
+        metricsSslConf.addResource(amsSslResUrl.toURI().toURL());
+      }
+
       isInitialized = true;
     }
   }
@@ -404,6 +432,13 @@ public class TimelineMetricConfiguration {
     return metricsConf;
   }
 
+  public Configuration getMetricsSslConf() throws URISyntaxException, MalformedURLException {
+    if (!isInitialized) {
+      initialize();
+    }
+    return metricsSslConf;
+  }
+
   public String getZKClientPort() throws MalformedURLException, URISyntaxException {
     return getHbaseConf().getTrimmed("hbase.zookeeper.property.clientPort", "2181");
   }
@@ -609,4 +644,45 @@ public class TimelineMetricConfiguration {
 
     return dirPath;
   }
+
+  public boolean isHostInMemoryAggregationEnabled() {
+    if (metricsConf != null) {
+      return Boolean.valueOf(metricsConf.get(TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION, "false"));
+    } else {
+      return false;
+    }
+  }
+
+  public boolean isCollectorInMemoryAggregationEnabled() {
+    if (metricsConf != null) {
+      return Boolean.valueOf(metricsConf.get(TIMELINE_METRICS_COLLECTOR_INMEMORY_AGGREGATION, "false"));
+    } else {
+      return false;
+    }
+  }
+
+  public List<String> getAppIdsForHostAggregation() {
+    String appIds = metricsConf.get(CLUSTER_AGGREGATOR_APP_IDS);
+    if (!StringUtils.isEmpty(appIds)) {
+      return Arrays.asList(StringUtils.stripAll(appIds.split(",")));
+    }
+    return Collections.emptyList();
+  }
+
+  public String getZkConnectionUrl(String zkClientPort, String zkQuorum) {
+    StringBuilder sb = new StringBuilder();
+    String[] quorumParts = zkQuorum.split(",");
+    String prefix = "";
+    for (String part : quorumParts) {
+      sb.append(prefix);
+      sb.append(part.trim());
+      if (!part.contains(":")) {
+        sb.append(":");
+        sb.append(zkClientPort);
+      }
+      prefix = ",";
+    }
+
+    return sb.toString();
+  }
 }
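
getZkConnectionUrl above normalizes a quorum string by appending the client
port to every entry that lacks an explicit one. A quick usage sketch that
mirrors the method's logic, assuming it behaves exactly as written:

    public class ZkUrlDemo {
      // Same normalization as TimelineMetricConfiguration.getZkConnectionUrl.
      static String zkConnectionUrl(String clientPort, String quorum) {
        StringBuilder sb = new StringBuilder();
        String prefix = "";
        for (String part : quorum.split(",")) {
          sb.append(prefix).append(part.trim());
          if (!part.contains(":")) {
            sb.append(':').append(clientPort);
          }
          prefix = ",";
        }
        return sb.toString();
      }

      public static void main(String[] args) {
        // One entry already carries a port, one does not.
        System.out.println(zkConnectionUrl("2181", "zk1.example.com:2888,zk2.example.com"));
        // -> zk1.example.com:2888,zk2.example.com:2181
      }
    }
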
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricHostMetadata.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricDistributedCache.java
similarity index 50%
copy from ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricHostMetadata.java
copy to ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricDistributedCache.java
index 06e9279..3480545 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricHostMetadata.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricDistributedCache.java
@@ -6,46 +6,27 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
 
-import java.util.HashSet;
-import java.util.Set;
+import java.util.Collection;
+import java.util.Map;
 
-public class TimelineMetricHostMetadata {
-  private Set<String> hostedApps = new HashSet<>();
-  private byte[] uuid;
-
-  // Default constructor
-  public TimelineMetricHostMetadata() {
-  }
-
-  public TimelineMetricHostMetadata(Set<String> hostedApps) {
-    this.hostedApps = hostedApps;
-  }
-
-  public Set<String> getHostedApps() {
-    return hostedApps;
-  }
-
-  public void setHostedApps(Set<String> hostedApps) {
-    this.hostedApps = hostedApps;
-  }
-
-  public byte[] getUuid() {
-    return uuid;
-  }
-
-  public void setUuid(byte[] uuid) {
-    this.uuid = uuid;
-  }
+public interface TimelineMetricDistributedCache {
+  Map<TimelineClusterMetric, MetricClusterAggregate> evictMetricAggregates(Long startTime, Long endTime);
+  void putMetrics(Collection<TimelineMetric> elements, TimelineMetricMetadataManager metricMetadataManager);
+  Map<String, Double> getPointInTimeCacheMetrics();
 }
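
The interface contract introduced here is accumulate-then-drain: putMetrics
folds incoming values into keyed aggregates, and evictMetricAggregates hands
back (and removes) everything in a (startTime, endTime] window. A simplified
single-JVM model of that contract, with Long timestamps standing in for
TimelineClusterMetric keys:

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    public class WindowedCacheModel {
      private final Map<Long, Double> cache = new HashMap<>();

      void put(long timestamp, double value) {
        cache.merge(timestamp, value, Double::sum); // fold into the aggregate
      }

      Map<Long, Double> evict(long startTime, long endTime) {
        Map<Long, Double> out = new HashMap<>();
        Iterator<Map.Entry<Long, Double>> it = cache.entrySet().iterator();
        while (it.hasNext()) {
          Map.Entry<Long, Double> e = it.next();
          if (e.getKey() > startTime && e.getKey() <= endTime) {
            out.put(e.getKey(), e.getValue());
            it.remove();                            // drain the window
          }
        }
        return out;
      }

      public static void main(String[] args) {
        WindowedCacheModel c = new WindowedCacheModel();
        c.put(30_000L, 1.0); c.put(30_000L, 3.0); c.put(90_000L, 5.0);
        System.out.println(c.evict(0L, 60_000L));   // {30000=4.0}; 90000 stays
      }
    }
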
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCache.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCache.java
new file mode 100644
index 0000000..aeaa4ba
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCache.java
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.availability.MetricCollectorHAHelper;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricHostMetadata;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMetrics;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.ssl.SslContextFactory;
+
+import javax.cache.Cache;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_CACHE_AGGREGATOR_TIMESLICE_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_APP_ID;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_BACKUPS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_NODES;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_SINK_COLLECTION_PERIOD;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_SERVICE_HTTP_POLICY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getTimeSlices;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.sliceFromTimelineMetric;
+
+public class TimelineMetricsIgniteCache implements TimelineMetricDistributedCache {
+  private static final Log LOG =
+      LogFactory.getLog(TimelineMetricsIgniteCache.class);
+  private IgniteCache<TimelineClusterMetric, MetricClusterAggregate> igniteCache;
+  private long cacheSliceIntervalMillis;
+  private int collectionPeriodMillis;
+  private boolean interpolationEnabled;
+  private List<String> skipAggrPatternStrings = new ArrayList<>();
+  private List<String> appIdsToAggregate;
+
+
+  public TimelineMetricsIgniteCache() throws MalformedURLException, URISyntaxException {
+    TimelineMetricConfiguration timelineMetricConfiguration = TimelineMetricConfiguration.getInstance();
+    Configuration metricConf = timelineMetricConfiguration.getMetricsConf();
+    Configuration sslConf = timelineMetricConfiguration.getMetricsSslConf();
+
+    IgniteConfiguration igniteConfiguration = new IgniteConfiguration();
+
+    //TODO add config to disable logging
+
+    //enable ssl for ignite requests
+    if (metricConf.get(TIMELINE_SERVICE_HTTP_POLICY) != null && metricConf.get(TIMELINE_SERVICE_HTTP_POLICY).equalsIgnoreCase("HTTPS_ONLY")) {
+      SslContextFactory sslContextFactory = new SslContextFactory();
+      String keyStorePath = sslConf.get("ssl.server.keystore.location");
+      String keyStorePassword = sslConf.get("ssl.server.keystore.password");
+      String trustStorePath = sslConf.get("ssl.server.truststore.location");
+      String trustStorePassword = sslConf.get("ssl.server.truststore.password");
+
+      sslContextFactory.setKeyStoreFilePath(keyStorePath);
+      sslContextFactory.setKeyStorePassword(keyStorePassword.toCharArray());
+      sslContextFactory.setTrustStoreFilePath(trustStorePath);
+      sslContextFactory.setTrustStorePassword(trustStorePassword.toCharArray());
+      igniteConfiguration.setSslContextFactory(sslContextFactory);
+    }
+
+    //aggregation parameters
+    appIdsToAggregate = timelineMetricConfiguration.getAppIdsForHostAggregation();
+    interpolationEnabled = Boolean.parseBoolean(metricConf.get(TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED, "true"));
+    collectionPeriodMillis = (int) SECONDS.toMillis(metricConf.getInt(TIMELINE_METRICS_SINK_COLLECTION_PERIOD, 10));
+    cacheSliceIntervalMillis = SECONDS.toMillis(metricConf.getInt(CLUSTER_CACHE_AGGREGATOR_TIMESLICE_INTERVAL, 30));
+    Long aggregationInterval = metricConf.getLong(CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL, 120L);
+
+    String filteredMetricPatterns = metricConf.get(TIMELINE_METRIC_AGGREGATION_SQL_FILTERS);
+    if (!StringUtils.isEmpty(filteredMetricPatterns)) {
+      LOG.info("Skipping aggregation for metric patterns : " + filteredMetricPatterns);
+      skipAggrPatternStrings.addAll(Arrays.asList(filteredMetricPatterns.split(",")));
+    }
+
+    if (metricConf.get(TIMELINE_METRICS_COLLECTOR_IGNITE_NODES) != null) {
+      TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi();
+      TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
+      ipFinder.setAddresses(Arrays.asList(metricConf.get(TIMELINE_METRICS_COLLECTOR_IGNITE_NODES).split(",")));
+      LOG.info("Setting ignite nodes to : " + ipFinder.getRegisteredAddresses());
+      discoverySpi.setIpFinder(ipFinder);
+      igniteConfiguration.setDiscoverySpi(discoverySpi);
+    } else {
+      //get live nodes from ZK
+      String zkClientPort = timelineMetricConfiguration.getClusterZKClientPort();
+      String zkQuorum = timelineMetricConfiguration.getClusterZKQuorum();
+      String zkConnectionURL = timelineMetricConfiguration.getZkConnectionUrl(zkClientPort, zkQuorum);
+      MetricCollectorHAHelper metricCollectorHAHelper = new MetricCollectorHAHelper(zkConnectionURL, 5, 200);
+      Collection<String> liveCollectors = metricCollectorHAHelper.findLiveCollectorHostsFromZNode();
+      if (liveCollectors != null && !liveCollectors.isEmpty()) {
+        TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi();
+        TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
+        ipFinder.setAddresses(liveCollectors);
+        LOG.info("Setting ignite nodes to : " + ipFinder.getRegisteredAddresses());
+        discoverySpi.setIpFinder(ipFinder);
+        igniteConfiguration.setDiscoverySpi(discoverySpi);
+      }
+    }
+
+
+    //ignite cache configuration
+    CacheConfiguration<TimelineClusterMetric, MetricClusterAggregate> cacheConfiguration = new CacheConfiguration<>();
+    cacheConfiguration.setName("metrics_cache");
+    //set cache mode to partitioned with # of backups
+    cacheConfiguration.setCacheMode(CacheMode.PARTITIONED);
+    cacheConfiguration.setBackups(metricConf.getInt(TIMELINE_METRICS_COLLECTOR_IGNITE_BACKUPS, 1));
+    //disable throttling due to cpu impact
+    cacheConfiguration.setRebalanceThrottle(0);
+    //enable locks
+    cacheConfiguration.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+    //expiry policy to remove lost keys, if any
+    cacheConfiguration.setEagerTtl(true);
+    cacheConfiguration.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, aggregationInterval * 3)));
+
+    Ignite igniteNode = Ignition.start(igniteConfiguration);
+    igniteCache = igniteNode.getOrCreateCache(cacheConfiguration);
+  }
+
+  /**
+   * Scans the cache and collects all elements whose timestamps fall within
+   * the (startTime, endTime] half-open interval, removing them from the cache.
+   * @param startTime exclusive lower bound of the eviction window
+   * @param endTime inclusive upper bound of the eviction window
+   * @return the evicted aggregates, keyed by cluster metric
+   */
+  @Override
+  public Map<TimelineClusterMetric, MetricClusterAggregate> evictMetricAggregates(Long startTime, Long endTime) {
+    Map<TimelineClusterMetric, MetricClusterAggregate> aggregatedMetricsMap = new HashMap<>();
+
+    //construct filter
+    IgniteBiPredicate<TimelineClusterMetric, MetricClusterAggregate> filter =
+        (IgniteBiPredicate<TimelineClusterMetric, MetricClusterAggregate>) (key, value) -> key.getTimestamp() > startTime && key.getTimestamp() <= endTime;
+
+    //get values from cache
+    try (QueryCursor<Cache.Entry<TimelineClusterMetric, MetricClusterAggregate>> cursor = igniteCache.query(new ScanQuery(filter))) {
+      for (Cache.Entry<TimelineClusterMetric, MetricClusterAggregate> e : cursor) {
+        aggregatedMetricsMap.put(e.getKey(), e.getValue());
+      }
+    }
+
+    //remove values from cache
+    igniteCache.removeAllAsync(aggregatedMetricsMap.keySet());
+
+    return aggregatedMetricsMap;
+  }
+
+  /**
+   * Iterates through the elements, skipping metrics that match white-listed patterns;
+   * calculates the average value for each time slice of each metric (the last slice
+   * may be ignored when values belonging to it could still arrive in a later post);
+   * updates or adds each value in the cache; derives per-application host metrics
+   * from the hosted-apps metadata, updating that metadata when needed.
+   * @param elements metrics to slice and cache
+   * @param metadataManager source of the hosted-apps metadata
+   */
+  @Override
+  public void putMetrics(Collection<TimelineMetric> elements, TimelineMetricMetadataManager metadataManager) {
+    Map<String, TimelineMetricHostMetadata> hostMetadata = metadataManager.getHostedAppsCache();
+    for (TimelineMetric metric : elements) {
+      if (shouldBeSkipped(metric.getMetricName())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("Skipping %s metric from being aggregated", metric.getMetricName()));
+        }
+        continue;
+      }
+      List<Long[]> timeSlices = getTimeSlices(getRoundedCheckPointTimeMillis(metric.getMetricValues().firstKey(), cacheSliceIntervalMillis), metric.getMetricValues().lastKey(), cacheSliceIntervalMillis);
+      Map<TimelineClusterMetric, Double> slicedClusterMetrics = sliceFromTimelineMetric(metric, timeSlices, interpolationEnabled);
+
+      if (slicedClusterMetrics != null) {
+        for (Map.Entry<TimelineClusterMetric, Double> metricDoubleEntry : slicedClusterMetrics.entrySet()) {
+          if (metricDoubleEntry.getKey().getTimestamp() == timeSlices.get(timeSlices.size()-1)[1] && metricDoubleEntry.getKey().getTimestamp() - metric.getMetricValues().lastKey() > collectionPeriodMillis) {
+            if(LOG.isDebugEnabled()) {
+              LOG.debug("Last skipped timestamp @ " + new Date(metric.getMetricValues().lastKey()) + " slice timestamp @ " + new Date(metricDoubleEntry.getKey().getTimestamp()));
+            }
+            continue;
+          }
+          MetricClusterAggregate newMetricClusterAggregate  = new MetricClusterAggregate(
+              metricDoubleEntry.getValue(), 1, null, metricDoubleEntry.getValue(), metricDoubleEntry.getValue());
+          //put app metric into cache
+          putMetricIntoCache(metricDoubleEntry.getKey(), newMetricClusterAggregate);
+          if (hostMetadata != null) {
+            //calculate app host metric
+            if (metric.getAppId().equalsIgnoreCase(HOST_APP_ID)) {
+              // Candidate metric, update app aggregates
+              if (hostMetadata.containsKey(metric.getHostName())) {
+                updateAppAggregatesFromHostMetric(metricDoubleEntry.getKey(), newMetricClusterAggregate, hostMetadata.get(metric.getHostName()));
+              }
+            } else {
+              // Build the hostedapps map if not a host metric
+              // Check app candidacy for host aggregation
+              //TODO better to lock the TimelineMetricHostMetadata instance to avoid data loss, though data could generally be lost only during initial collector start
+              if (appIdsToAggregate.contains(metric.getAppId())) {
+                TimelineMetricHostMetadata timelineMetricHostMetadata = hostMetadata.get(metric.getHostName());
+                ConcurrentHashMap<String, String> appIdsMap;
+                if (timelineMetricHostMetadata == null) {
+                  appIdsMap = new ConcurrentHashMap<>();
+                  hostMetadata.put(metric.getHostName(), new TimelineMetricHostMetadata(appIdsMap));
+                } else {
+                  appIdsMap = timelineMetricHostMetadata.getHostedApps();
+                }
+                if (!appIdsMap.containsKey(metric.getAppId())) {
+                  appIdsMap.put(metric.getAppId(), metric.getAppId());
+                  LOG.info("Adding appId to hosted apps: appId = " +
+                      metric.getAppId() + ", hostname = " + metric.getHostName());
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  private void updateAppAggregatesFromHostMetric(TimelineClusterMetric key, MetricClusterAggregate newMetricClusterAggregate, TimelineMetricHostMetadata timelineMetricHostMetadata) {
+    for (String appId : timelineMetricHostMetadata.getHostedApps().keySet()) {
+      TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(key.getMetricName(), appId, key.getInstanceId(), key.getTimestamp());
+      putMetricIntoCache(timelineClusterMetric, newMetricClusterAggregate);
+    }
+  }
+
+  private void putMetricIntoCache(TimelineClusterMetric metricKey, MetricClusterAggregate metricValue) {
+    Lock lock = igniteCache.lock(metricKey);
+    lock.lock();
+    try {
+      MetricClusterAggregate metricClusterAggregateFromCache = igniteCache.get(metricKey);
+      if (metricClusterAggregateFromCache == null) {
+        igniteCache.put(metricKey, metricValue);
+      } else {
+        metricClusterAggregateFromCache.updateAggregates(metricValue);
+        igniteCache.put(metricKey, metricClusterAggregateFromCache);
+      }
+    } catch (Exception e) {
+      LOG.error("Exception : ", e);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public Map<String, Double> getPointInTimeCacheMetrics() {
+    CacheMetrics clusterIgniteMetrics = igniteCache.metrics();
+    Map<String, Double> metricsMap = new HashMap<>();
+    metricsMap.put("Cluster_AverageGetTime", (double) clusterIgniteMetrics.getAverageGetTime());
+    metricsMap.put("Cluster_AveragePutTime", (double) clusterIgniteMetrics.getAveragePutTime());
+    metricsMap.put("Cluster_KeySize", (double) clusterIgniteMetrics.getKeySize());
+    metricsMap.put("Cluster_OffHeapAllocatedSize", (double) clusterIgniteMetrics.getOffHeapAllocatedSize());
+    return metricsMap;
+  }
+
+  private boolean shouldBeSkipped(String metricName) {
+    for (String pattern : skipAggrPatternStrings) {
+      if (metricName.matches(pattern)) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
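
putMetricIntoCache above is a per-key locked read-modify-write: get-then-put
is not atomic, so the code takes igniteCache.lock(key) before merging the new
aggregate into the cached one. The same pattern modeled with a plain lock map
so it runs anywhere (illustrative; Ignite's key lock is the distributed
equivalent):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class LockedUpsert {
      private final Map<String, Double> cache = new ConcurrentHashMap<>();
      private final Map<String, Lock> locks = new ConcurrentHashMap<>();

      void upsert(String key, double delta) {
        Lock lock = locks.computeIfAbsent(key, k -> new ReentrantLock());
        lock.lock();
        try {
          Double current = cache.get(key);          // read
          cache.put(key, current == null
              ? delta                               // first value for the key
              : current + delta);                   // merge into the aggregate
        } finally {
          lock.unlock();
        }
      }

      public static void main(String[] args) {
        LockedUpsert c = new LockedUpsert();
        c.upsert("cpu_user", 2.5);
        c.upsert("cpu_user", 1.5);
        System.out.println(c.cache);                // {cpu_user=4.0}
      }
    }
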
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
index b2edb73..89428c0 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
@@ -43,6 +43,8 @@ import java.util.List;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.RESULTSET_FETCH_SIZE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getRoundedAggregateTimeMillis;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
 
 /**
@@ -389,15 +391,6 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
     return checkpointLocation;
   }
 
-  public static long getRoundedCheckPointTimeMillis(long referenceTime, long aggregatorPeriod) {
-    return referenceTime - (referenceTime % aggregatorPeriod);
-  }
-
-  public static long getRoundedAggregateTimeMillis(long aggregatorPeriod) {
-    long currentTime = System.currentTimeMillis();
-    return currentTime - (currentTime % aggregatorPeriod);
-  }
-
   /**
    * Run 1 downsampler query.
    * @param conn
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java
index 20f72c6..b12cb86 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java
@@ -18,9 +18,19 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
 
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.PostProcessingUtil;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 
 /**
  *
@@ -59,4 +69,186 @@ public class AggregatorUtils {
 
     return values;
   }
+
+  public static Map<TimelineClusterMetric, Double> sliceFromTimelineMetric(
+      TimelineMetric timelineMetric, List<Long[]> timeSlices, boolean interpolationEnabled) {
+
+    if (timelineMetric.getMetricValues().isEmpty()) {
+      return null;
+    }
+
+    Map<TimelineClusterMetric, Double> timelineClusterMetricMap =
+        new HashMap<>();
+
+    Long prevTimestamp = -1L;
+    TimelineClusterMetric prevMetric = null;
+    int count = 0;
+    double sum = 0.0;
+
+    Map<Long,Double> timeSliceValueMap = new HashMap<>();
+    for (Map.Entry<Long, Double> metric : timelineMetric.getMetricValues().entrySet()) {
+      if (metric.getValue() == null) {
+        continue;
+      }
+
+      Long timestamp = getSliceTimeForMetric(timeSlices, Long.parseLong(metric.getKey().toString()));
+      if (timestamp != -1) {
+        // Metric is within desired time range
+        TimelineClusterMetric clusterMetric = new TimelineClusterMetric(
+            timelineMetric.getMetricName(),
+            timelineMetric.getAppId(),
+            timelineMetric.getInstanceId(),
+            timestamp);
+
+        if (prevTimestamp < 0 || timestamp.equals(prevTimestamp)) {
+          Double newValue = metric.getValue();
+          if (newValue > 0.0) {
+            sum += newValue;
+            count++;
+          }
+        } else {
+          double metricValue = (count > 0) ? (sum / count) : 0.0;
+          timelineClusterMetricMap.put(prevMetric, metricValue);
+          timeSliceValueMap.put(prevMetric.getTimestamp(), metricValue);
+          sum = metric.getValue();
+          count = sum > 0.0 ? 1 : 0;
+        }
+
+        prevTimestamp = timestamp;
+        prevMetric = clusterMetric;
+      }
+    }
+
+    if (prevTimestamp > 0) {
+      double metricValue = (count > 0) ? (sum / count) : 0.0;
+      timelineClusterMetricMap.put(prevMetric, metricValue);
+      timeSliceValueMap.put(prevTimestamp, metricValue);
+    }
+
+    if (interpolationEnabled) {
+      Map<Long, Double> interpolatedValues = interpolateMissingPeriods(timelineMetric.getMetricValues(), timeSlices, timeSliceValueMap, timelineMetric.getType());
+      for (Map.Entry<Long, Double> entry : interpolatedValues.entrySet()) {
+        TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(timelineMetric.getMetricName(), timelineMetric.getAppId(), timelineMetric.getInstanceId(), entry.getKey());
+        timelineClusterMetricMap.putIfAbsent(timelineClusterMetric, entry.getValue());
+      }
+    }
+
+    return timelineClusterMetricMap;
+  }
+
+  private static Map<Long, Double> interpolateMissingPeriods(TreeMap<Long, Double> metricValues,
+                                               List<Long[]> timeSlices,
+                                               Map<Long, Double> timeSliceValueMap, String type) {
+    Map<Long, Double> resultClusterMetricMap = new HashMap<>();
+
+    if (StringUtils.isNotEmpty(type) && "COUNTER".equalsIgnoreCase(type)) {
+      //For Counter Based metrics, ok to do interpolation and extrapolation
+
+      List<Long> requiredTimestamps = new ArrayList<>();
+      for (Long[] timeSlice : timeSlices) {
+        if (!timeSliceValueMap.containsKey(timeSlice[1])) {
+          requiredTimestamps.add(timeSlice[1]);
+        }
+      }
+      Map<Long, Double> interpolatedValuesMap = PostProcessingUtil.interpolate(metricValues, requiredTimestamps);
+
+      if (interpolatedValuesMap != null) {
+        for (Map.Entry<Long, Double> entry : interpolatedValuesMap.entrySet()) {
+          Double interpolatedValue = entry.getValue();
+
+          if (interpolatedValue != null) {
+            resultClusterMetricMap.put( entry.getKey(), interpolatedValue);
+          } else {
+            LOG.debug("Cannot compute interpolated value, hence skipping.");
+          }
+        }
+      }
+    } else {
+      //For other metrics, ok to do only interpolation
+
+      Double defaultNextSeenValue = null;
+      if (MapUtils.isEmpty(timeSliceValueMap) && MapUtils.isNotEmpty(metricValues)) {
+        //If no value was found within the start_time based slices, but the metric has value in the server_time range,
+        // use that.
+
+        Map.Entry<Long,Double> firstEntry  = metricValues.firstEntry();
+        defaultNextSeenValue = firstEntry.getValue();
+        LOG.debug("Found a data point outside timeslice range: " + new Date(firstEntry.getKey()) + ": " + defaultNextSeenValue);
+      }
+
+      for (int sliceNum = 0; sliceNum < timeSlices.size(); sliceNum++) {
+        Long[] timeSlice = timeSlices.get(sliceNum);
+
+        if (!timeSliceValueMap.containsKey(timeSlice[1])) {
+          LOG.debug("Found an empty slice : " + new Date(timeSlice[0]) + ", " + new Date(timeSlice[1]));
+
+          Double lastSeenValue = null;
+          int index = sliceNum - 1;
+          Long[] prevTimeSlice = null;
+          while (lastSeenValue == null && index >= 0) {
+            prevTimeSlice = timeSlices.get(index--);
+            lastSeenValue = timeSliceValueMap.get(prevTimeSlice[1]);
+          }
+
+          Double nextSeenValue = null;
+          index = sliceNum + 1;
+          Long[] nextTimeSlice = null;
+          while (nextSeenValue == null && index < timeSlices.size()) {
+            nextTimeSlice = timeSlices.get(index++);
+            nextSeenValue = timeSliceValueMap.get(nextTimeSlice[1]);
+          }
+
+          if (nextSeenValue == null) {
+            nextSeenValue = defaultNextSeenValue;
+          }
+
+          Double interpolatedValue = PostProcessingUtil.interpolate(timeSlice[1],
+              (prevTimeSlice != null ? prevTimeSlice[1] : null), lastSeenValue,
+              (nextTimeSlice != null ? nextTimeSlice[1] : null), nextSeenValue);
+
+          if (interpolatedValue != null) {
+            LOG.debug("Interpolated value : " + interpolatedValue);
+            resultClusterMetricMap.put(timeSlice[1], interpolatedValue);
+          } else {
+            LOG.debug("Cannot compute interpolated value, hence skipping.");
+          }
+        }
+      }
+    }
+    return resultClusterMetricMap;
+  }
+
+  /**
+   * Return the end of the time slice into which the metric fits, or -1 if it falls outside every slice.
+   */
+  public static Long getSliceTimeForMetric(List<Long[]> timeSlices, Long timestamp) {
+    for (Long[] timeSlice : timeSlices) {
+      if (timestamp > timeSlice[0] && timestamp <= timeSlice[1]) {
+        return timeSlice[1];
+      }
+    }
+    return -1L;
+  }
+
+  /**
+   * Return the fixed-width time slices used to normalize the timeseries data.
+   */
+  public static List<Long[]> getTimeSlices(long startTime, long endTime, long timeSliceIntervalMillis) {
+    List<Long[]> timeSlices = new ArrayList<Long[]>();
+    long sliceStartTime = startTime;
+    while (sliceStartTime < endTime) {
+      timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis });
+      sliceStartTime += timeSliceIntervalMillis;
+    }
+    return timeSlices;
+  }
+
+  public static long getRoundedCheckPointTimeMillis(long referenceTime, long aggregatorPeriod) {
+    return referenceTime - (referenceTime % aggregatorPeriod);
+  }
+
+  public static long getRoundedAggregateTimeMillis(long aggregatorPeriod) {
+    long currentTime = System.currentTimeMillis();
+    return currentTime - (currentTime % aggregatorPeriod);
+  }
 }
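
The slicing helpers consolidated above are plain interval arithmetic:
getRoundedCheckPointTimeMillis floors a timestamp to the aggregation period,
getTimeSlices cuts the range into fixed-width windows, and a point belongs to
the slice whose (start, end] interval contains it. A worked example with
30-second slices:

    import java.util.ArrayList;
    import java.util.List;

    public class SliceDemo {
      public static void main(String[] args) {
        long slice = 30_000L;                    // 30-second slices
        long t = 1_504_180_812_345L;             // arbitrary timestamp
        long rounded = t - (t % slice);          // getRoundedCheckPointTimeMillis
        System.out.println("rounded down by " + (t - rounded) + " ms");

        List<Long[]> slices = new ArrayList<>(); // getTimeSlices over 4 periods
        for (long s = rounded; s < rounded + 4 * slice; s += slice) {
          slices.add(new Long[] { s, s + slice });
        }
        // getSliceTimeForMetric: return end when start < t <= end.
        for (Long[] w : slices) {
          if (t > w[0] && t <= w[1]) {
            System.out.println("t falls in slice ending " + w[1]);
          }
        }
      }
    }
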
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
index e90fa84..c27d712 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 import org.apache.commons.io.FilenameUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricDistributedCache;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
 
@@ -39,6 +41,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_DISABLED;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_CACHE_AGGREGATOR_TIMESLICE_INTERVAL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_DISABLED;
@@ -255,7 +258,8 @@ public class TimelineMetricAggregatorFactory {
   public static TimelineMetricAggregator createTimelineClusterAggregatorSecond(
     PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
     TimelineMetricMetadataManager metadataManager,
-    MetricCollectorHAController haController) {
+    MetricCollectorHAController haController,
+    TimelineMetricDistributedCache distributedCache) {
 
     String checkpointDir = metricsConf.get(
       TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -269,14 +273,36 @@ public class TimelineMetricAggregatorFactory {
     long timeSliceIntervalMillis = SECONDS.toMillis(metricsConf.getInt
       (CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 30));
 
+    long cacheTimeSliceIntervalMillis = SECONDS.toMillis(metricsConf.getInt
+      (CLUSTER_CACHE_AGGREGATOR_TIMESLICE_INTERVAL, 30));
+
     int checkpointCutOffMultiplier =
       metricsConf.getInt(CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
 
-    String inputTableName = METRICS_RECORD_TABLE_NAME;
     String outputTableName = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
     String aggregatorDisabledParam = CLUSTER_AGGREGATOR_SECOND_DISABLED;
 
     // Second based aggregation have added responsibility of time slicing
+    if (TimelineMetricConfiguration.getInstance().isCollectorInMemoryAggregationEnabled()) {
+      return new TimelineMetricClusterAggregatorSecondWithCacheSource(
+        METRIC_AGGREGATE_SECOND,
+        metadataManager,
+        hBaseAccessor, metricsConf,
+        checkpointLocation,
+        sleepIntervalMillis,
+        checkpointCutOffMultiplier,
+        aggregatorDisabledParam,
+        null,
+        outputTableName,
+        120000L,
+        timeSliceIntervalMillis,
+        haController,
+        distributedCache,
+        cacheTimeSliceIntervalMillis
+      );
+    }
+
+    String inputTableName = METRICS_RECORD_TABLE_NAME;
     return new TimelineMetricClusterAggregatorSecond(
       METRIC_AGGREGATE_SECOND,
       metadataManager,
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
index 55104de..09fbe81 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
@@ -31,10 +31,9 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_APP_ID;
@@ -104,15 +103,15 @@ public class TimelineMetricAppAggregator {
       // Check app candidacy for host aggregation
       if (appIdsToAggregate.contains(appId)) {
         TimelineMetricHostMetadata timelineMetricHostMetadata = hostMetadata.get(hostname);
-        Set<String> appIds;
+        ConcurrentHashMap<String, String> appIds;
         if (timelineMetricHostMetadata == null) {
-          appIds = new HashSet<>();
+          appIds = new ConcurrentHashMap<>();
           hostMetadata.put(hostname, new TimelineMetricHostMetadata(appIds));
         } else {
           appIds = timelineMetricHostMetadata.getHostedApps();
         }
-        if (!appIds.contains(appId)) {
-          appIds.add(appId);
+        if (!appIds.containsKey(appId)) {
+          appIds.put(appId, appId);
           LOG.info("Adding appId to hosted apps: appId = " +
             clusterMetric.getAppId() + ", hostname = " + hostname);
         }
@@ -132,8 +131,8 @@ public class TimelineMetricAppAggregator {
     }
 
     TimelineMetricMetadataKey appKey =  new TimelineMetricMetadataKey(clusterMetric.getMetricName(), HOST_APP_ID, clusterMetric.getInstanceId());
-    Set<String> apps = hostMetadata.get(hostname).getHostedApps();
-    for (String appId : apps) {
+    ConcurrentHashMap<String, String> apps = hostMetadata.get(hostname).getHostedApps();
+    for (String appId : apps.keySet()) {
       if (appIdsToAggregate.contains(appId)) {
 
         appKey.setAppId(appId);
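
The hosted-apps bookkeeping above now uses a ConcurrentHashMap purely as a
concurrent set: values mirror the keys, membership checks go through
containsKey(), and iteration goes through keySet(). A self-contained
illustration of that idiom (an equivalent alternative would be
Collections.newSetFromMap(new ConcurrentHashMap<>())):

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    // Keys-as-set idiom: the value simply mirrors the key.
    public class HostedAppsSketch {
      private final ConcurrentHashMap<String, String> hostedApps = new ConcurrentHashMap<>();

      public void addApp(String appId) {
        hostedApps.putIfAbsent(appId, appId); // atomic, safe for concurrent writers
      }

      public boolean hostsApp(String appId) {
        return hostedApps.containsKey(appId);
      }

      public Set<String> appIds() {
        return hostedApps.keySet();
      }

      public static void main(String[] args) {
        HostedAppsSketch sketch = new HostedAppsSketch();
        sketch.addApp("HOST");
        sketch.addApp("hbase");
        System.out.println(sketch.hostsApp("hbase") + " " + sketch.appIds());
      }
    }
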
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
index a2f23de..773e372 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
@@ -22,6 +22,8 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_EVENT_METRIC_PATTERNS;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getTimeSlices;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.sliceFromTimelineMetric;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
 
@@ -30,7 +32,6 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -39,12 +40,10 @@ import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
-import org.apache.hadoop.metrics2.sink.timeline.PostProcessingUtil;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
@@ -65,16 +64,13 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
   // Aggregator to perform app-level aggregates for host metrics
   private final TimelineMetricAppAggregator appAggregator;
   // 1 minute client side buffering adjustment
-  private final Long serverTimeShiftAdjustment;
-  private final boolean interpolationEnabled;
+  protected final Long serverTimeShiftAdjustment;
+  protected final boolean interpolationEnabled;
   private TimelineMetricMetadataManager metadataManagerInstance;
   private String skipAggrPatternStrings;
-<<<<<<< HEAD
   private String skipInterpolationMetricPatternStrings;
   private Set<Pattern> skipInterpolationMetricPatterns = new HashSet<>();
-=======
   private final static String liveHostsMetricName = "live_hosts";
->>>>>>> AMBARI-21214 : Use a uuid vs long row key for metrics in AMS schema. (avijayan)
 
   public TimelineMetricClusterAggregatorSecond(AGGREGATOR_NAME aggregatorName,
                                                TimelineMetricMetadataManager metadataManager,
@@ -99,7 +95,6 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
     this.serverTimeShiftAdjustment = Long.parseLong(metricsConf.get(SERVER_SIDE_TIMESIFT_ADJUSTMENT, "90000"));
     this.interpolationEnabled = Boolean.parseBoolean(metricsConf.get(TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED, "true"));
     this.skipAggrPatternStrings = metricsConf.get(TIMELINE_METRIC_AGGREGATION_SQL_FILTERS);
-<<<<<<< HEAD
     this.skipInterpolationMetricPatternStrings = metricsConf.get(TIMELINE_METRICS_EVENT_METRIC_PATTERNS, "");
 
     if (StringUtils.isNotEmpty(skipInterpolationMetricPatternStrings)) {
@@ -109,9 +104,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
         skipInterpolationMetricPatterns.add(Pattern.compile(javaPatternString));
       }
     }
-=======
     this.timelineMetricReadHelper = new TimelineMetricReadHelper(metadataManager, true);
->>>>>>> AMBARI-21214 : Use a uuid vs long row key for metrics in AMS schema. (avijayan)
   }
 
   @Override
@@ -120,7 +113,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
     // timestamps with the difference between server time and series start time
     // Also, we do not want to look at the shift time period from the end as well since we can interpolate those points
     // that come earlier than the expected, during the next run.
-    List<Long[]> timeSlices = getTimeSlices(startTime - serverTimeShiftAdjustment, endTime - serverTimeShiftAdjustment);
+    List<Long[]> timeSlices = getTimeSlices(startTime - serverTimeShiftAdjustment, endTime - serverTimeShiftAdjustment, timeSliceIntervalMillis);
     // Initialize app aggregates for host metrics
     appAggregator.init();
     Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
@@ -156,19 +149,6 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
     return condition;
   }
 
-  /**
-   * Return time slices to normalize the timeseries data.
-   */
-  List<Long[]> getTimeSlices(long startTime, long endTime) {
-    List<Long[]> timeSlices = new ArrayList<Long[]>();
-    long sliceStartTime = startTime;
-    while (sliceStartTime < endTime) {
-      timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis });
-      sliceStartTime += timeSliceIntervalMillis;
-    }
-    return timeSlices;
-  }
-
   Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
       throws SQLException, IOException {
     Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
@@ -241,9 +221,25 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
       return 0;
     }
 
-    Map<TimelineClusterMetric, Double> clusterMetrics = sliceFromTimelineMetric(metric, timeSlices);
-    int numHosts = 0;
+    boolean skipInterpolationForMetric = false;
+    for (Pattern pattern : skipInterpolationMetricPatterns) {
+      Matcher m = pattern.matcher(metric.getMetricName());
+      if (m.matches()) {
+        skipInterpolationForMetric = true;
+        LOG.debug("Skipping interpolation for " + metric.getMetricName());
+      }
+    }
 
+    Map<TimelineClusterMetric, Double> clusterMetrics = sliceFromTimelineMetric(metric, timeSlices, !skipInterpolationForMetric && interpolationEnabled);
+
+    return aggregateClusterMetricsFromSlices(clusterMetrics, aggregateClusterMetrics, metric.getHostName());
+  }
+
+  protected int aggregateClusterMetricsFromSlices(Map<TimelineClusterMetric, Double> clusterMetrics,
+                                                  Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
+                                                  String hostname) {
+
+    int numHosts = 0;
     if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
       for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry : clusterMetrics.entrySet()) {
 
@@ -264,191 +260,14 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
 
         numHosts = aggregate.getNumberOfHosts();
         // Update app level aggregates
-        appAggregator.processTimelineClusterMetric(clusterMetric, metric.getHostName(), avgValue);
+        appAggregator.processTimelineClusterMetric(clusterMetric, hostname, avgValue);
       }
     }
     return numHosts;
   }
 
-  protected Map<TimelineClusterMetric, Double> sliceFromTimelineMetric(
-    TimelineMetric timelineMetric, List<Long[]> timeSlices) {
-
-    if (timelineMetric.getMetricValues().isEmpty()) {
-      return null;
-    }
-
-    Map<TimelineClusterMetric, Double> timelineClusterMetricMap =
-      new HashMap<TimelineClusterMetric, Double>();
-
-    Long prevTimestamp = -1l;
-    TimelineClusterMetric prevMetric = null;
-    int count = 0;
-    double sum = 0.0;
-
-    Map<Long,Double> timeSliceValueMap = new HashMap<>();
-    for (Map.Entry<Long, Double> metric : timelineMetric.getMetricValues().entrySet()) {
-      // TODO: investigate null values - pre filter
-      if (metric.getValue() == null) {
-        continue;
-      }
-
-      Long timestamp = getSliceTimeForMetric(timeSlices, Long.parseLong(metric.getKey().toString()));
-      if (timestamp != -1) {
-        // Metric is within desired time range
-        TimelineClusterMetric clusterMetric = new TimelineClusterMetric(
-          timelineMetric.getMetricName(),
-          timelineMetric.getAppId(),
-          timelineMetric.getInstanceId(),
-          timestamp);
-
-        if (prevTimestamp < 0 || timestamp.equals(prevTimestamp)) {
-          Double newValue = metric.getValue();
-          if (newValue > 0.0) {
-            sum += newValue;
-            count++;
-          }
-        } else {
-          double metricValue = (count > 0) ? (sum / count) : 0.0;
-            timelineClusterMetricMap.put(prevMetric, metricValue);
-          timeSliceValueMap.put(prevMetric.getTimestamp(), metricValue);
-          sum = metric.getValue();
-          count = sum > 0.0 ? 1 : 0;
-        }
-
-        prevTimestamp = timestamp;
-        prevMetric = clusterMetric;
-      }
-    }
-
-    if (prevTimestamp > 0) {
-      double metricValue = (count > 0) ? (sum / count) : 0.0;
-      timelineClusterMetricMap.put(prevMetric, metricValue);
-      timeSliceValueMap.put(prevTimestamp, metricValue);
-    }
-
-    if (interpolationEnabled) {
-      interpolateMissingPeriods(timelineClusterMetricMap, timelineMetric, timeSlices, timeSliceValueMap);
-    }
-
-    return timelineClusterMetricMap;
-  }
-
-  private void interpolateMissingPeriods(Map<TimelineClusterMetric, Double> timelineClusterMetricMap,
-                                         TimelineMetric timelineMetric,
-                                         List<Long[]> timeSlices,
-                                         Map<Long, Double> timeSliceValueMap) {
-
-    for (Pattern pattern : skipInterpolationMetricPatterns) {
-      Matcher m = pattern.matcher(timelineMetric.getMetricName());
-      if (m.matches()) {
-        LOG.debug("Skipping interpolation for " + timelineMetric.getMetricName());
-        return;
-      }
-    }
-
-    if (StringUtils.isNotEmpty(timelineMetric.getType()) && "COUNTER".equalsIgnoreCase(timelineMetric.getType())) {
-      //For Counter Based metrics, ok to do interpolation and extrapolation
-
-      List<Long> requiredTimestamps = new ArrayList<>();
-      for (Long[] timeSlice : timeSlices) {
-        if (!timeSliceValueMap.containsKey(timeSlice[1])) {
-          requiredTimestamps.add(timeSlice[1]);
-        }
-      }
-      Map<Long, Double> interpolatedValuesMap = PostProcessingUtil.interpolate(timelineMetric.getMetricValues(), requiredTimestamps);
-
-      if (interpolatedValuesMap != null) {
-        for (Map.Entry<Long, Double> entry : interpolatedValuesMap.entrySet()) {
-          Double interpolatedValue = entry.getValue();
-
-          if (interpolatedValue != null) {
-            TimelineClusterMetric clusterMetric = new TimelineClusterMetric(
-              timelineMetric.getMetricName(),
-              timelineMetric.getAppId(),
-              timelineMetric.getInstanceId(),
-              entry.getKey());
-
-            timelineClusterMetricMap.put(clusterMetric, interpolatedValue);
-          } else {
-            LOG.debug("Cannot compute interpolated value, hence skipping.");
-          }
-        }
-      }
-    } else {
-      //For other metrics, ok to do only interpolation
-
-      Double defaultNextSeenValue = null;
-      if (MapUtils.isEmpty(timeSliceValueMap) && MapUtils.isNotEmpty(timelineMetric.getMetricValues())) {
-        //If no value was found within the start_time based slices, but the metric has value in the server_time range,
-        // use that.
-
-        LOG.debug("No value found within range for metric : " + timelineMetric.getMetricName());
-        Map.Entry<Long,Double> firstEntry  = timelineMetric.getMetricValues().firstEntry();
-        defaultNextSeenValue = firstEntry.getValue();
-        LOG.debug("Found a data point outside timeslice range: " + new Date(firstEntry.getKey()) + ": " + defaultNextSeenValue);
-      }
-
-      for (int sliceNum = 0; sliceNum < timeSlices.size(); sliceNum++) {
-        Long[] timeSlice = timeSlices.get(sliceNum);
-
-        if (!timeSliceValueMap.containsKey(timeSlice[1])) {
-          LOG.debug("Found an empty slice : " + new Date(timeSlice[0]) + ", " + new Date(timeSlice[1]));
-
-          Double lastSeenValue = null;
-          int index = sliceNum - 1;
-          Long[] prevTimeSlice = null;
-          while (lastSeenValue == null && index >= 0) {
-            prevTimeSlice = timeSlices.get(index--);
-            lastSeenValue = timeSliceValueMap.get(prevTimeSlice[1]);
-          }
-
-          Double nextSeenValue = null;
-          index = sliceNum + 1;
-          Long[] nextTimeSlice = null;
-          while (nextSeenValue == null && index < timeSlices.size()) {
-            nextTimeSlice = timeSlices.get(index++);
-            nextSeenValue = timeSliceValueMap.get(nextTimeSlice[1]);
-          }
-
-          if (nextSeenValue == null) {
-            nextSeenValue = defaultNextSeenValue;
-          }
-
-          Double interpolatedValue = PostProcessingUtil.interpolate(timeSlice[1],
-            (prevTimeSlice != null ? prevTimeSlice[1] : null), lastSeenValue,
-            (nextTimeSlice != null ? nextTimeSlice[1] : null), nextSeenValue);
-
-          if (interpolatedValue != null) {
-            TimelineClusterMetric clusterMetric = new TimelineClusterMetric(
-              timelineMetric.getMetricName(),
-              timelineMetric.getAppId(),
-              timelineMetric.getInstanceId(),
-              timeSlice[1]);
-
-            LOG.debug("Interpolated value : " + interpolatedValue);
-            timelineClusterMetricMap.put(clusterMetric, interpolatedValue);
-          } else {
-            LOG.debug("Cannot compute interpolated value, hence skipping.");
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Return end of the time slice into which the metric fits.
-   */
-  private Long getSliceTimeForMetric(List<Long[]> timeSlices, Long timestamp) {
-    for (Long[] timeSlice : timeSlices) {
-      if (timestamp > timeSlice[0] && timestamp <= timeSlice[1]) {
-        return timeSlice[1];
-      }
-    }
-    return -1l;
-  }
-
   /* Add cluster metric for number of hosts that are hosting an appId */
-  private void processLiveAppCountMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
+  protected void processLiveAppCountMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
       Map<String, MutableInt> appHostsCount, long timestamp) {
 
     for (Map.Entry<String, MutableInt> appHostsEntry : appHostsCount.entrySet()) {
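
processLiveAppCountMetrics is widened from private to protected so the
cache-sourced subclass introduced below can reuse it. Per the
liveHostsMetricName field above, it emits one "live_hosts" data point per
appId, valued at the host count. A rough sketch of that emission (the metric
construction is not shown in this hunk, so the exact output shape is an
assumption):

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical rendering of the live_hosts emission; the real code builds
    // a TimelineClusterMetric per appId rather than printing.
    public class LiveHostsSketch {
      public static void main(String[] args) {
        Map<String, Integer> appHostsCount = new HashMap<>();
        appHostsCount.put("hbase", 3);
        appHostsCount.put("datanode", 12);

        long timestamp = System.currentTimeMillis();
        for (Map.Entry<String, Integer> e : appHostsCount.entrySet()) {
          System.out.println("live_hosts{appId=" + e.getKey() + ", ts=" + timestamp
              + "} = " + e.getValue());
        }
      }
    }
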
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSource.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSource.java
new file mode 100644
index 0000000..0c030b6
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSource.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+
+import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricDistributedCache;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getSliceTimeForMetric;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getTimeSlices;
+
+public class TimelineMetricClusterAggregatorSecondWithCacheSource extends TimelineMetricClusterAggregatorSecond {
+  private TimelineMetricDistributedCache distributedCache;
+  private Long cacheTimeSliceIntervalMillis;
+  public TimelineMetricClusterAggregatorSecondWithCacheSource(AggregationTaskRunner.AGGREGATOR_NAME metricAggregateSecond,
+      TimelineMetricMetadataManager metricMetadataManager, PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
+      String checkpointLocation, long sleepIntervalMillis, int checkpointCutOffMultiplier, String aggregatorDisabledParam,
+      String inputTableName, String outputTableName, Long nativeTimeRangeDelay, Long timeSliceInterval,
+      MetricCollectorHAController haController, TimelineMetricDistributedCache distributedCache, Long cacheTimeSliceIntervalMillis) {
+    super(metricAggregateSecond, metricMetadataManager, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, checkpointCutOffMultiplier, aggregatorDisabledParam, inputTableName, outputTableName, nativeTimeRangeDelay, timeSliceInterval, haController);
+    this.distributedCache = distributedCache;
+    this.cacheTimeSliceIntervalMillis = cacheTimeSliceIntervalMillis;
+  }
+
+  @Override
+  public boolean doWork(long startTime, long endTime) {
+    LOG.info("Start aggregation cycle @ " + new Date() + ", " +
+          "startTime = " + new Date(startTime) + ", endTime = " + new Date(endTime));
+    try {
+      Map<String, Double> cacheMetrics;
+      if (LOG.isDebugEnabled()) {
+        cacheMetrics = distributedCache.getPointInTimeCacheMetrics();
+        LOG.debug("Ignite metrics before eviction : " + cacheMetrics);
+      }
+
+      LOG.info("Trying to evict elements from cache");
+      Map<TimelineClusterMetric, MetricClusterAggregate> metricsFromCache = distributedCache.evictMetricAggregates(startTime - serverTimeShiftAdjustment, endTime - serverTimeShiftAdjustment);
+      LOG.info(String.format("Evicted %s elements from cache.", metricsFromCache.size()));
+
+      if (LOG.isDebugEnabled()) {
+        cacheMetrics = distributedCache.getPointInTimeCacheMetrics();
+        LOG.debug("Ignite metrics after eviction : " + cacheMetrics);
+      }
+
+      List<Long[]> timeSlices = getTimeSlices(startTime - serverTimeShiftAdjustment, endTime - serverTimeShiftAdjustment, timeSliceIntervalMillis);
+      Map<TimelineClusterMetric, MetricClusterAggregate> result = aggregateMetricsFromMetricClusterAggregates(metricsFromCache, timeSlices);
+
+      LOG.info("Saving " + result.size() + " metric aggregates.");
+      hBaseAccessor.saveClusterAggregateRecords(result);
+      LOG.info("End aggregation cycle @ " + new Date());
+      return true;
+    } catch (Exception e) {
+      LOG.error("Exception during aggregation. ", e);
+      return false;
+    }
+  }
+
+  // Slices in the cache may differ from the aggregation slices, so cache entries are re-bucketed. Also counts hosted apps.
+  Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromMetricClusterAggregates(Map<TimelineClusterMetric, MetricClusterAggregate> metricsFromCache, List<Long[]> timeSlices) {
+    Map<TimelineClusterMetric, MetricClusterAggregate> result = new HashMap<>();
+
+    // Normalize when the slices in the cache differ from the aggregation slices.
+    //TODO add basic interpolation, current implementation assumes that cacheTimeSliceIntervalMillis <= timeSliceIntervalMillis
+    if (cacheTimeSliceIntervalMillis.equals(timeSliceIntervalMillis)) {
+      result = metricsFromCache;
+    } else {
+      for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> clusterMetricAggregateEntry : metricsFromCache.entrySet()) {
+        Long timestamp = getSliceTimeForMetric(timeSlices, clusterMetricAggregateEntry.getKey().getTimestamp());
+        if (timestamp <= 0) {
+          LOG.warn("Entry doesn't match any slice. Slices : " + timeSlices + " metric timestamp : " + clusterMetricAggregateEntry.getKey().getTimestamp());
+          continue;
+        }
+        TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(clusterMetricAggregateEntry.getKey().getMetricName(), clusterMetricAggregateEntry.getKey().getAppId(), clusterMetricAggregateEntry.getKey().getInstanceId(), timestamp);
+        if (result.containsKey(timelineClusterMetric)) {
+          MetricClusterAggregate metricClusterAggregate = result.get(timelineClusterMetric);
+          metricClusterAggregate.updateMax(clusterMetricAggregateEntry.getValue().getMax());
+          metricClusterAggregate.updateMin(clusterMetricAggregateEntry.getValue().getMin());
+          metricClusterAggregate.setSum((metricClusterAggregate.getSum() + clusterMetricAggregateEntry.getValue().getSum()) / 2D);
+          metricClusterAggregate.setNumberOfHosts(Math.max(metricClusterAggregate.getNumberOfHosts(), clusterMetricAggregateEntry.getValue().getNumberOfHosts()));
+        } else {
+          result.put(timelineClusterMetric, clusterMetricAggregateEntry.getValue());
+        }
+      }
+    }
+
+    //TODO investigate if needed, maybe add config to disable/enable
+    //count hosted apps
+    Map<String, MutableInt> hostedAppCounter = new HashMap<>();
+    for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> clusterMetricAggregateEntry : result.entrySet()) {
+      int numHosts = clusterMetricAggregateEntry.getValue().getNumberOfHosts();
+      String appId = clusterMetricAggregateEntry.getKey().getAppId();
+      if (!hostedAppCounter.containsKey(appId)) {
+        hostedAppCounter.put(appId, new MutableInt(numHosts));
+      } else {
+        int currentHostCount = hostedAppCounter.get(appId).intValue();
+        if (currentHostCount < numHosts) {
+          hostedAppCounter.put(appId, new MutableInt(numHosts));
+        }
+      }
+    }
+
+    // Add liveHosts per AppId metrics.
+    processLiveAppCountMetrics(result, hostedAppCounter, timeSlices.get(timeSlices.size() - 1)[1]);
+
+    return result;
+  }
+
+}
\ No newline at end of file
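
Because the cache may slice at a finer interval than the aggregator,
aggregateMetricsFromMetricClusterAggregates re-buckets each cache entry into
the enclosing aggregation slice and averages colliding sums. A self-contained
sketch of that re-bucketing, with a plain double standing in for
MetricClusterAggregate:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Re-bucket 30s cache slices into 60s aggregation slices, mirroring
    // getTimeSlices/getSliceTimeForMetric; colliding sums are averaged,
    // as in the patch above.
    public class SliceRebucketSketch {
      static List<long[]> getTimeSlices(long start, long end, long interval) {
        List<long[]> slices = new ArrayList<>();
        for (long t = start; t < end; t += interval) {
          slices.add(new long[] { t, t + interval });
        }
        return slices;
      }

      // End of the slice the timestamp falls into, or -1 if none matches.
      static long getSliceTimeForMetric(List<long[]> slices, long timestamp) {
        for (long[] slice : slices) {
          if (timestamp > slice[0] && timestamp <= slice[1]) {
            return slice[1];
          }
        }
        return -1L;
      }

      public static void main(String[] args) {
        List<long[]> aggSlices = getTimeSlices(0L, 120_000L, 60_000L);

        // Two 30s cache entries that land in the same 60s aggregation slice.
        Map<Long, Double> cacheSums = new HashMap<>();
        cacheSums.put(30_000L, 4.0);
        cacheSums.put(60_000L, 6.0);

        Map<Long, Double> rebucketed = new HashMap<>();
        for (Map.Entry<Long, Double> e : cacheSums.entrySet()) {
          long sliceEnd = getSliceTimeForMetric(aggSlices, e.getKey());
          if (sliceEnd <= 0) continue; // entries outside every slice are dropped
          rebucketed.merge(sliceEnd, e.getValue(), (a, b) -> (a + b) / 2.0);
        }
        System.out.println(rebucketed); // {60000=5.0}
      }
    }
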
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java
index a06f4e8..d387394 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java
@@ -94,7 +94,7 @@ public class MetricCollectorHAController {
           + zkClientPort +", quorum = " + zkQuorum);
       }
 
-      zkConnectUrl = getZkConnectionUrl(zkClientPort, zkQuorum);
+      zkConnectUrl = configuration.getZkConnectionUrl(zkClientPort, zkQuorum);
 
     } catch (Exception e) {
       LOG.error("Unable to load hbase-site from classpath.", e);
@@ -228,23 +228,6 @@ public class MetricCollectorHAController {
     manager.addLiveInstanceChangeListener(controller);
   }
 
-  private String getZkConnectionUrl(String zkClientPort, String zkQuorum) {
-    StringBuilder sb = new StringBuilder();
-    String[] quorumParts = zkQuorum.split(",");
-    String prefix = "";
-    for (String part : quorumParts) {
-      sb.append(prefix);
-      sb.append(part.trim());
-      if (!part.contains(":")) {
-        sb.append(":");
-        sb.append(zkClientPort);
-      }
-      prefix = ",";
-    }
-
-    return sb.toString();
-  }
-
   public AggregationTaskRunner getAggregationTaskRunner() {
     return aggregationTaskRunner;
   }
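
The deleted helper above is now reached through
configuration.getZkConnectionUrl at the call site, so it presumably moved onto
TimelineMetricConfiguration. Reconstructed from the deleted body, its behavior
is to append the client port to any quorum host that lacks an explicit one:

    // Reconstructed from the removed method; assumed to live on
    // TimelineMetricConfiguration after this change.
    public class ZkConnectionUrlSketch {
      static String getZkConnectionUrl(String zkClientPort, String zkQuorum) {
        StringBuilder sb = new StringBuilder();
        String prefix = "";
        for (String part : zkQuorum.split(",")) {
          sb.append(prefix).append(part.trim());
          if (!part.contains(":")) {
            sb.append(':').append(zkClientPort);
          }
          prefix = ",";
        }
        return sb.toString();
      }

      public static void main(String[] args) {
        // Mixed quorum: one host with an explicit port, one without.
        System.out.println(getZkConnectionUrl("2181", "zk1.example.com:2182, zk2.example.com"));
        // -> zk1.example.com:2182,zk2.example.com:2181
      }
    }
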
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricHostMetadata.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricHostMetadata.java
index 06e9279..37c6394 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricHostMetadata.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricHostMetadata.java
@@ -18,26 +18,35 @@
 
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery;
 
-import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class TimelineMetricHostMetadata {
-  private Set<String> hostedApps = new HashSet<>();
+  // Needs a concurrent data structure; only the keys are used.
+  private ConcurrentHashMap<String, String> hostedApps = new ConcurrentHashMap<>();
   private byte[] uuid;
 
   // Default constructor
   public TimelineMetricHostMetadata() {
   }
 
-  public TimelineMetricHostMetadata(Set<String> hostedApps) {
+  public TimelineMetricHostMetadata(ConcurrentHashMap<String, String> hostedApps) {
     this.hostedApps = hostedApps;
   }
 
-  public Set<String> getHostedApps() {
+  public TimelineMetricHostMetadata(Set<String> hostedApps) {
+    ConcurrentHashMap<String, String> appIdsMap = new ConcurrentHashMap<>();
+    for (String appId : hostedApps) {
+      appIdsMap.put(appId, appId);
+    }
+    this.hostedApps = appIdsMap;
+  }
+
+  public ConcurrentHashMap<String, String> getHostedApps() {
     return hostedApps;
   }
 
-  public void setHostedApps(Set<String> hostedApps) {
+  public void setHostedApps(ConcurrentHashMap<String, String> hostedApps) {
     this.hostedApps = hostedApps;
   }
 
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java
index bd508c4..f9ad773 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java
@@ -213,9 +213,9 @@ public class TimelineMetricMetadataManager {
    */
   public void putIfModifiedHostedAppsMetadata(String hostname, String appId) {
     TimelineMetricHostMetadata timelineMetricHostMetadata = HOSTED_APPS_MAP.get(hostname);
-    Set<String> apps = (timelineMetricHostMetadata != null) ? timelineMetricHostMetadata.getHostedApps() : null;
+    ConcurrentHashMap<String, String> apps = (timelineMetricHostMetadata != null) ? timelineMetricHostMetadata.getHostedApps() : null;
     if (apps == null) {
-      apps = new HashSet<>();
+      apps = new ConcurrentHashMap<>();
       if (timelineMetricHostMetadata == null) {
         HOSTED_APPS_MAP.put(hostname, new TimelineMetricHostMetadata(apps));
       } else {
@@ -223,8 +223,8 @@ public class TimelineMetricMetadataManager {
       }
     }
 
-    if (!apps.contains(appId)) {
-      apps.add(appId);
+    if (!apps.containsKey(appId)) {
+      apps.put(appId, appId);
       SYNC_HOSTED_APPS_METADATA.set(true);
     }
   }
@@ -362,8 +362,9 @@ public class TimelineMetricMetadataManager {
 
     String uuidStr = new String(uuid);
     if (uuidHostMap.containsKey(uuidStr)) {
+      //TODO fix the collisions
       LOG.error("Duplicate key computed for " + hostname +", Collides with  " + uuidHostMap.get(uuidStr));
-      return null;
+      return uuid;
     }
 
     if (timelineMetricHostMetadata == null) {
@@ -398,8 +399,15 @@ public class TimelineMetricMetadataManager {
     String uuidStr = new String(uuid);
     if (uuidKeyMap.containsKey(uuidStr) && !uuidKeyMap.get(uuidStr).equals(key)) {
       TimelineMetricMetadataKey collidingKey = (TimelineMetricMetadataKey)uuidKeyMap.get(uuidStr);
+      //TODO fix the collisions
+      /**
+       * 2017-08-23 14:12:35,922 ERROR org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager:
+       * Duplicate key [52, 50, 51, 53, 50, 53, 53, 53, 49, 54, 57, 50, 50, 54, 0, 0]([B@278a93f9) computed for
+       * TimelineClusterMetric{metricName='sdisk_dm-11_write_count', appId='hbase', instanceId='', timestamp=1503497400000}, Collides with
+       * TimelineMetricMetadataKey{metricName='sdisk_dm-20_write_count', appId='hbase', instanceId=''}
+       */
       LOG.error("Duplicate key " + Arrays.toString(uuid) + "(" + uuid +  ") computed for " + timelineClusterMetric.toString() + ", Collides with  " + collidingKey.toString());
-      return null;
+      return uuid;
     }
 
     if (timelineMetricMetadata == null) {
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java
index f808cd7..96af877 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java
@@ -134,7 +134,7 @@ public class TimelineMetricMetadataSync implements Runnable {
           // No persistence / stale data in store
           if (persistedData == null || persistedData.isEmpty() ||
             !persistedData.containsKey(cacheEntry.getKey()) ||
-            !persistedData.get(cacheEntry.getKey()).getHostedApps().containsAll(cacheEntry.getValue().getHostedApps())) {
+            !persistedData.get(cacheEntry.getKey()).getHostedApps().keySet().containsAll(cacheEntry.getValue().getHostedApps().keySet())) {
             dataToSync.put(cacheEntry.getKey(), cacheEntry.getValue());
           }
         }
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/HashBasedUuidGenStrategy.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/HashBasedUuidGenStrategy.java
index f35c23a..10e9c61 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/HashBasedUuidGenStrategy.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/HashBasedUuidGenStrategy.java
@@ -192,6 +192,10 @@ public class HashBasedUuidGenStrategy implements MetricUuidGenStrategy {
       }
     }
 
+    if (numericValue != 0) {
+      seed += numericValue;
+    }
+
     String seedStr = String.valueOf(seed);
     if (seedStr.length() < maxLength) {
       return null;
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
index c60554c..57f9796 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
@@ -202,7 +202,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
     // GIVEN
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(
-        hdb, new Configuration(), new TimelineMetricMetadataManager(new Configuration(), hdb), null);
+        hdb, new Configuration(), new TimelineMetricMetadataManager(new Configuration(), hdb), null, null);
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime + 1;
@@ -242,7 +242,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
     // GIVEN
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
-        new Configuration(), new TimelineMetricMetadataManager(new Configuration(), hdb), null);
+        new Configuration(), new TimelineMetricMetadataManager(new Configuration(), hdb), null, null);
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime + 1;
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCacheTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCacheTest.java
new file mode 100644
index 0000000..d3c6061
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCacheTest.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_APP_ID;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_NODES;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.powermock.api.easymock.PowerMock.mockStatic;
+import static org.powermock.api.easymock.PowerMock.replayAll;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(TimelineMetricConfiguration.class)
+
+@PowerMockIgnore("javax.management.*")
+public class TimelineMetricsIgniteCacheTest {
+  private static TimelineMetricsIgniteCache timelineMetricsIgniteCache;
+  @BeforeClass
+  public static void setupConf() throws Exception {
+    TimelineMetricConfiguration conf = new TimelineMetricConfiguration(new
+      Configuration(), new Configuration());
+    mockStatic(TimelineMetricConfiguration.class);
+    expect(TimelineMetricConfiguration.getInstance()).andReturn(conf).anyTimes();
+    conf.getMetricsConf().set(CLUSTER_AGGREGATOR_APP_IDS, "appIdForHostsAggr");
+    conf.getMetricsConf().set(TIMELINE_METRICS_COLLECTOR_IGNITE_NODES, "localhost");
+    replayAll();
+
+    timelineMetricsIgniteCache = new TimelineMetricsIgniteCache();
+  }
+
+  @Test
+  public void putEvictMetricsFromCacheSlicesMerging() throws Exception {
+    long cacheSliceIntervalMillis = 30000L;
+
+    TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
+    expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class))).andReturn(new byte[16]).once();
+    replay(metricMetadataManagerMock);
+
+    long startTime = getRoundedCheckPointTimeMillis(System.currentTimeMillis(), cacheSliceIntervalMillis);
+
+    long seconds = 1000;
+    TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
+    /*
+
+    0        +30s      +60s
+    |         |         |
+     (1)(2)(3) (4)(5)(6)  h1
+
+    */
+    // Case 1 : data points are distributed equally, no values are lost, single host.
+    metricValues.put(startTime + 4*seconds, 1.0);
+    metricValues.put(startTime + 14*seconds, 2.0);
+    metricValues.put(startTime + 24*seconds, 3.0);
+    metricValues.put(startTime + 34*seconds, 4.0);
+    metricValues.put(startTime + 44*seconds, 5.0);
+    metricValues.put(startTime + 54*seconds, 6.0);
+
+    TimelineMetric timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1");
+    timelineMetric.setStartTime(metricValues.firstKey());
+    timelineMetric.addMetricValues(metricValues);
+
+    Collection<TimelineMetric> timelineMetrics = new ArrayList<>();
+    timelineMetrics.add(timelineMetric);
+    timelineMetricsIgniteCache.putMetrics(timelineMetrics, metricMetadataManagerMock);
+    Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, startTime + 120*seconds);
+
+    Assert.assertEquals(aggregateMap.size(), 2);
+    TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(timelineMetric.getMetricName(),
+      timelineMetric.getAppId(), timelineMetric.getInstanceId(), startTime + 30*seconds);
+
+    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
+    Assert.assertEquals(2.0, aggregateMap.get(timelineClusterMetric).getSum());
+
+    timelineClusterMetric.setTimestamp(startTime + 2*30*seconds);
+    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
+    Assert.assertEquals(5.0, aggregateMap.get(timelineClusterMetric).getSum());
+
+    metricValues.clear();
+    timelineMetrics.clear();
+
+    /*
+
+    0        +30s      +60s
+    |         |         |
+     (1)(2)(3) (4)(5)(6)   h1, h2
+
+    */
+    // Case 2 : data points are distributed equally, no values are lost, two hosts.
+    metricValues.put(startTime + 4*seconds, 1.0);
+    metricValues.put(startTime + 14*seconds, 2.0);
+    metricValues.put(startTime + 24*seconds, 3.0);
+    metricValues.put(startTime + 34*seconds, 4.0);
+    metricValues.put(startTime + 44*seconds, 5.0);
+    metricValues.put(startTime + 54*seconds, 6.0);
+
+    timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1");
+    timelineMetric.setMetricValues(metricValues);
+
+    metricValues = new TreeMap<>();
+    metricValues.put(startTime + 5*seconds, 2.0);
+    metricValues.put(startTime + 15*seconds, 4.0);
+    metricValues.put(startTime + 25*seconds, 6.0);
+    metricValues.put(startTime + 35*seconds, 8.0);
+    metricValues.put(startTime + 45*seconds, 10.0);
+    metricValues.put(startTime + 55*seconds, 12.0);
+    TimelineMetric timelineMetric2 = new TimelineMetric("metric1", "host2", "app1", "instance1");
+    timelineMetric2.setMetricValues(metricValues);
+
+    timelineMetrics = new ArrayList<>();
+    timelineMetrics.add(timelineMetric);
+    timelineMetrics.add(timelineMetric2);
+    timelineMetricsIgniteCache.putMetrics(timelineMetrics, metricMetadataManagerMock);
+    aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, startTime + 120*seconds);
+
+    Assert.assertEquals(aggregateMap.size(), 2);
+    timelineClusterMetric = new TimelineClusterMetric(timelineMetric.getMetricName(),
+      timelineMetric.getAppId(), timelineMetric.getInstanceId(), startTime + 30*seconds);
+
+    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
+    Assert.assertEquals(6.0, aggregateMap.get(timelineClusterMetric).getSum());
+
+    timelineClusterMetric.setTimestamp(startTime + 2*30*seconds);
+    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
+    Assert.assertEquals(15.0, aggregateMap.get(timelineClusterMetric).getSum());
+
+    metricValues.clear();
+    timelineMetrics.clear();
+
+    /*
+
+    0      +30s    +60s    +90s
+    |       |       |       |
+     (1)      (2)                h1
+                (3)       (4)    h2
+                 (5)      (6)    h1
+
+    */
+    // Case 3 : merging host data points, ignore (2) for h1 as it will conflict with (5), two hosts.
+    metricValues = new TreeMap<>();
+    metricValues.put(startTime + 15*seconds, 1.0);
+    metricValues.put(startTime + 45*seconds, 2.0);
+    timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1");
+    timelineMetric.setMetricValues(metricValues);
+    timelineMetrics.add(timelineMetric);
+
+    metricValues = new TreeMap<>();
+    metricValues.put(startTime + 45*seconds, 3.0);
+    metricValues.put(startTime + 85*seconds, 4.0);
+    timelineMetric = new TimelineMetric("metric1", "host2", "app1", "instance1");
+    timelineMetric.setMetricValues(metricValues);
+    timelineMetrics.add(timelineMetric);
+
+    metricValues = new TreeMap<>();
+    metricValues.put(startTime + 55*seconds, 5.0);
+    metricValues.put(startTime + 85*seconds, 6.0);
+    timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1");
+    timelineMetric.setMetricValues(metricValues);
+    timelineMetrics.add(timelineMetric);
+
+    timelineMetricsIgniteCache.putMetrics(timelineMetrics, metricMetadataManagerMock);
+
+    aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, startTime + 120*seconds);
+
+    Assert.assertEquals(aggregateMap.size(), 3);
+    timelineClusterMetric = new TimelineClusterMetric(timelineMetric.getMetricName(),
+      timelineMetric.getAppId(), timelineMetric.getInstanceId(), startTime + 30*seconds);
+
+    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
+    Assert.assertEquals(1.0, aggregateMap.get(timelineClusterMetric).getSum());
+    Assert.assertEquals(1, aggregateMap.get(timelineClusterMetric).getNumberOfHosts());
+
+    timelineClusterMetric.setTimestamp(startTime + 2*30*seconds);
+    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
+    Assert.assertEquals(8.0, aggregateMap.get(timelineClusterMetric).getSum());
+    Assert.assertEquals(2, aggregateMap.get(timelineClusterMetric).getNumberOfHosts());
+
+    timelineClusterMetric.setTimestamp(startTime + 3*30*seconds);
+    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
+    Assert.assertEquals(10.0, aggregateMap.get(timelineClusterMetric).getSum());
+    Assert.assertEquals(2, aggregateMap.get(timelineClusterMetric).getNumberOfHosts());
+
+    metricValues.clear();
+    timelineMetrics.clear();
+
+    Assert.assertEquals(0d, timelineMetricsIgniteCache.getPointInTimeCacheMetrics().get("Cluster_KeySize"));
+  }
+
+  @Test
+  public void updateAppAggregatesFromHostMetricTest() {
+    // Make sure host metrics are aggregated for appIds from "timeline.metrics.service.cluster.aggregator.appIds".
+
+    long cacheSliceIntervalMillis = 30000L;
+
+    TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
+    expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class))).andReturn(new byte[16]).once();
+    expect(metricMetadataManagerMock.getHostedAppsCache()).andReturn(new HashMap<>()).anyTimes();
+    replay(metricMetadataManagerMock);
+
+    long startTime = getRoundedCheckPointTimeMillis(System.currentTimeMillis(), cacheSliceIntervalMillis);
+
+    long seconds = 1000;
+
+    TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
+    List<TimelineMetric> timelineMetrics = new ArrayList<>();
+    TimelineMetric timelineMetric;
+
+    metricValues = new TreeMap<>();
+    metricValues.put(startTime + 15*seconds, 1.0);
+    metricValues.put(startTime + 55*seconds, 2.0);
+    timelineMetric = new TimelineMetric("host_metric", "host1", HOST_APP_ID, "instance1");
+    timelineMetric.setMetricValues(metricValues);
+    timelineMetrics.add(timelineMetric);
+
+    metricValues = new TreeMap<>();
+    metricValues.put(startTime + 45*seconds, 3.0);
+    metricValues.put(startTime + 85*seconds, 4.0);
+    timelineMetric = new TimelineMetric("app_metric", "host1", "appIdForHostsAggr", "instance1");
+    timelineMetric.setMetricValues(metricValues);
+    timelineMetrics.add(timelineMetric);
+
+    metricValues = new TreeMap<>();
+    metricValues.put(startTime + 85*seconds, 5.0);
+    timelineMetric = new TimelineMetric("host_metric", "host1", HOST_APP_ID, "instance1");
+    timelineMetric.setMetricValues(metricValues);
+    timelineMetrics.add(timelineMetric);
+
+    metricValues = new TreeMap<>();
+    metricValues.put(startTime + 85*seconds, 6.0);
+    timelineMetric = new TimelineMetric("host_metric", "host2", HOST_APP_ID, "instance1");
+    timelineMetric.setMetricValues(metricValues);
+    timelineMetrics.add(timelineMetric);
+
+    timelineMetricsIgniteCache.putMetrics(timelineMetrics, metricMetadataManagerMock);
+
+    Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, startTime + 120*seconds);
+    Assert.assertEquals(aggregateMap.size(), 6);
+    TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(timelineMetric.getMetricName(),
+        timelineMetric.getAppId(), timelineMetric.getInstanceId(), startTime + 90*seconds);
+
+    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
+    Assert.assertEquals(11.0, aggregateMap.get(timelineClusterMetric).getSum());
+
+    timelineClusterMetric = new TimelineClusterMetric("app_metric",
+        "appIdForHostsAggr", "instance1", startTime + 90*seconds);
+    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
+    Assert.assertEquals(4.0, aggregateMap.get(timelineClusterMetric).getSum());
+
+    timelineClusterMetric = new TimelineClusterMetric("host_metric",
+        "appIdForHostsAggr", "instance1", startTime + 90*seconds);
+    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
+    Assert.assertEquals(5.0, aggregateMap.get(timelineClusterMetric).getSum());
+
+    Assert.assertEquals(0d, timelineMetricsIgniteCache.getPointInTimeCacheMetrics().get("Cluster_KeySize"));
+  }
+}
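
The expected sums in Case 1 of the test above follow from per-slice
averaging: each 30-second slice averages its three data points before
eviction. The arithmetic, spelled out:

    // (1+2+3)/3 = 2.0 for the first slice, (4+5+6)/3 = 5.0 for the second,
    // matching the assertions in putEvictMetricsFromCacheSlicesMerging().
    public class SliceAverageSketch {
      public static void main(String[] args) {
        System.out.println((1.0 + 2.0 + 3.0) / 3); // 2.0
        System.out.println((4.0 + 5.0 + 6.0) / 3); // 5.0
      }
    }
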
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
index ea947d0..b4d0f0a 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
@@ -30,6 +30,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import static junit.framework.Assert.assertEquals;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.RESULTSET_FETCH_SIZE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getRoundedAggregateTimeMillis;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis;
 
 public class AbstractTimelineAggregatorTest {
 
@@ -114,7 +116,7 @@ public class AbstractTimelineAggregatorTest {
   public void testDoWorkOnZeroDelay() throws Exception {
 
     long currentTime = System.currentTimeMillis();
-    long roundedOffAggregatorTime = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(currentTime,
+    long roundedOffAggregatorTime = getRoundedCheckPointTimeMillis(currentTime,
       sleepIntervalMillis);
 
     //Test first run of aggregator with no checkpoint
@@ -138,7 +140,7 @@ public class AbstractTimelineAggregatorTest {
     currentTime = System.currentTimeMillis();
     checkPoint.set(currentTime - 16*60*1000); //Old checkpoint
     agg.runOnce(sleepIntervalMillis);
-    long checkPointTime = AbstractTimelineAggregator.getRoundedAggregateTimeMillis(sleepIntervalMillis);
+    long checkPointTime = getRoundedAggregateTimeMillis(sleepIntervalMillis);
     assertEquals("startTime should be zero", checkPointTime - sleepIntervalMillis, startTimeInDoWork.get());
     assertEquals("endTime  should be zero", checkPointTime, endTimeInDoWork.get());
     assertEquals(roundedOffAggregatorTime, checkPoint.get());
@@ -147,10 +149,10 @@ public class AbstractTimelineAggregatorTest {
 
 //    //Test first run with perfect checkpoint (sleepIntervalMillis back)
     currentTime = System.currentTimeMillis();
-    roundedOffAggregatorTime = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(currentTime,
+    roundedOffAggregatorTime = getRoundedCheckPointTimeMillis(currentTime,
       sleepIntervalMillis);
     checkPointTime = roundedOffAggregatorTime - sleepIntervalMillis;
-    long expectedCheckPoint = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(checkPointTime, sleepIntervalMillis);
+    long expectedCheckPoint = getRoundedCheckPointTimeMillis(checkPointTime, sleepIntervalMillis);
     checkPoint.set(checkPointTime);
     agg.runOnce(sleepIntervalMillis);
     assertEquals("startTime should the lower rounded time of the checkpoint time",
@@ -165,7 +167,7 @@ public class AbstractTimelineAggregatorTest {
     currentTime = System.currentTimeMillis();
     checkPoint.set(currentTime - 2*sleepIntervalMillis + 5000);
     agg.runOnce(sleepIntervalMillis);
-    long expectedStartTime = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(currentTime - 2*sleepIntervalMillis + 5000, sleepIntervalMillis);
+    long expectedStartTime = getRoundedCheckPointTimeMillis(currentTime - 2*sleepIntervalMillis + 5000, sleepIntervalMillis);
     assertEquals("startTime should the lower rounded time of the checkpoint time",
       expectedStartTime, startTimeInDoWork.get());
     assertEquals("startTime should the lower rounded time of the checkpoint time + sleepIntervalMillis",
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
index a9f2b4d..1c5f41f 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
@@ -69,7 +69,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
     // GIVEN
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
-        getConfigurationForTest(false), new TimelineMetricMetadataManager(new Configuration(), hdb), null);
+        getConfigurationForTest(false), new TimelineMetricMetadataManager(new Configuration(), hdb), null, null);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();
@@ -121,7 +121,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
     // GIVEN
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
-        getConfigurationForTest(false), new TimelineMetricMetadataManager(new Configuration(), hdb), null);
+        getConfigurationForTest(false), new TimelineMetricMetadataManager(new Configuration(), hdb), null, null);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();
@@ -196,7 +196,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
     // GIVEN
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
-        getConfigurationForTest(false), new TimelineMetricMetadataManager(new Configuration(), hdb), null);
+        getConfigurationForTest(false), new TimelineMetricMetadataManager(new Configuration(), hdb), null, null);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     // here we put some metrics that will be aggregated
@@ -479,7 +479,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
     conf.set(CLUSTER_AGGREGATOR_APP_IDS, "app1");
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
-        conf, new TimelineMetricMetadataManager(new Configuration(), hdb), null);
+        conf, new TimelineMetricMetadataManager(new Configuration(), hdb), null, null);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();
@@ -529,7 +529,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
   public void testClusterAggregateMetricNormalization() throws Exception {
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
-        getConfigurationForTest(false), new TimelineMetricMetadataManager(new Configuration(), hdb), null);
+        getConfigurationForTest(false), new TimelineMetricMetadataManager(new Configuration(), hdb), null, null);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     // Sample data
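
Every call site in this test gains a fourth argument to createTimelineClusterAggregatorSecond. Judging from the diffstat, the new parameter is the distributed cache source; a hedged sketch of the assumed factory signature (parameter names and the selection logic are illustrative, not confirmed by this hunk):

    public static TimelineMetricAggregator createTimelineClusterAggregatorSecond(
        PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
        TimelineMetricMetadataManager metadataManager,
        MetricCollectorHAController haController,
        TimelineMetricDistributedCache distributedCache) {
      // Presumed selection: a non-null cache yields the new
      // TimelineMetricClusterAggregatorSecondWithCacheSource, while the nulls
      // passed by these integration tests keep the classic HBase-scan aggregator.
      return null; // construction details elided in this sketch
    }
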
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
index 937dd80..eb38625 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
@@ -18,12 +18,14 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
 
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getRoundedAggregateTimeMillis;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getTimeSlices;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.sliceFromTimelineMetric;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND;
-import static org.easymock.EasyMock.anyBoolean;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.createNiceMock;
 import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.replay;
 
 import java.sql.ResultSet;
@@ -51,23 +53,16 @@ public class TimelineMetricClusterAggregatorSecondTest {
     long sliceInterval = 30000l;
     long metricInterval = 10000l;
 
-    Configuration configuration = new Configuration();
     TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
     expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class))).andReturn(new byte[16]).once();
     replay(metricMetadataManagerMock);
 
-    TimelineMetricClusterAggregatorSecond secondAggregator = new TimelineMetricClusterAggregatorSecond(
-      METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null,
-      configuration, null, aggregatorInterval, 2, "false", "", "",
-      aggregatorInterval, sliceInterval, null);
-
-    secondAggregator.timeSliceIntervalMillis = sliceInterval;
-    long roundedEndTime = AbstractTimelineAggregator.getRoundedAggregateTimeMillis(aggregatorInterval);
+    long roundedEndTime = getRoundedAggregateTimeMillis(aggregatorInterval);
     long roundedStartTime = roundedEndTime - aggregatorInterval;
-    List<Long[]> timeSlices = secondAggregator.getTimeSlices(roundedStartTime ,
-      roundedEndTime);
+    List<Long[]> timeSlices = getTimeSlices(roundedStartTime ,
+      roundedEndTime, sliceInterval);
 
-    TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
+    TreeMap<Long, Double> metricValues = new TreeMap<>();
 
     long startTime = roundedEndTime - aggregatorInterval;
 
@@ -85,7 +80,7 @@ public class TimelineMetricClusterAggregatorSecondTest {
     counterMetric.setMetricValues(metricValues);
     counterMetric.setType("COUNTER");
 
-    Map<TimelineClusterMetric, Double> timelineClusterMetricMap = secondAggregator.sliceFromTimelineMetric(counterMetric, timeSlices);
+    Map<TimelineClusterMetric, Double> timelineClusterMetricMap = sliceFromTimelineMetric(counterMetric, timeSlices, true);
 
     TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(counterMetric.getMetricName(), counterMetric.getAppId(),
       counterMetric.getInstanceId(), 0l);
@@ -104,7 +99,7 @@ public class TimelineMetricClusterAggregatorSecondTest {
     metric.setAppId("TestAppId");
     metric.setMetricValues(metricValues);
 
-    timelineClusterMetricMap = secondAggregator.sliceFromTimelineMetric(metric, timeSlices);
+    timelineClusterMetricMap = sliceFromTimelineMetric(metric, timeSlices, true);
 
     timelineClusterMetric = new TimelineClusterMetric(metric.getMetricName(), metric.getAppId(),
       metric.getInstanceId(), 0l);
@@ -116,7 +111,6 @@ public class TimelineMetricClusterAggregatorSecondTest {
     timelineClusterMetric.setTimestamp(roundedStartTime + 4*sliceInterval);
     Assert.assertTrue(timelineClusterMetricMap.containsKey(timelineClusterMetric));
     Assert.assertEquals(timelineClusterMetricMap.get(timelineClusterMetric), 7.5);
-
   }
 
   @Test
@@ -137,8 +131,8 @@ public class TimelineMetricClusterAggregatorSecondTest {
       aggregatorInterval, 2, "false", "", "", aggregatorInterval, sliceInterval, null
     );
 
-    long startTime = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(System.currentTimeMillis(),aggregatorInterval);
-    List<Long[]> timeslices = secondAggregator.getTimeSlices(startTime, startTime + aggregatorInterval);
+    long startTime = getRoundedCheckPointTimeMillis(System.currentTimeMillis(), aggregatorInterval);
+    List<Long[]> timeslices = getTimeSlices(startTime, startTime + aggregatorInterval, sliceInterval);
 
     Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics = new HashMap<>();
     long seconds = 1000;
@@ -367,7 +361,7 @@ public class TimelineMetricClusterAggregatorSecondTest {
     long now = System.currentTimeMillis();
     long startTime = now - 120000;
     long seconds = 1000;
-    List<Long[]> slices = secondAggregator.getTimeSlices(startTime, now);
+    List<Long[]> slices = getTimeSlices(startTime, now, sliceInterval);
     ResultSet rs = createNiceMock(ResultSet.class);
 
     TreeMap<Long, Double> metricValues = new TreeMap<>();
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java
new file mode 100644
index 0000000..7cddb00
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricsIgniteCache;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_NODES;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getTimeSlices;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.powermock.api.easymock.PowerMock.mockStatic;
+import static org.powermock.api.easymock.PowerMock.replayAll;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(TimelineMetricConfiguration.class)
+
+@PowerMockIgnore("javax.management.*")
+public class TimelineMetricClusterAggregatorSecondWithCacheSourceTest {
+
+  private static TimelineMetricsIgniteCache timelineMetricsIgniteCache;
+  @BeforeClass
+  public static void setupConf() throws Exception {
+    TimelineMetricConfiguration conf = new TimelineMetricConfiguration(new
+        Configuration(), new Configuration());
+    mockStatic(TimelineMetricConfiguration.class);
+    expect(TimelineMetricConfiguration.getInstance()).andReturn(conf).anyTimes();
+    conf.getMetricsConf().set(TIMELINE_METRICS_COLLECTOR_IGNITE_NODES, "localhost");
+    replayAll();
+
+    timelineMetricsIgniteCache = new TimelineMetricsIgniteCache();
+  }
+
+  @Test
+  public void testLiveHostCounterMetrics() throws Exception {
+    long aggregatorInterval = 120000;
+    long sliceInterval = 30000;
+
+    Configuration configuration = new Configuration();
+
+    TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
+    expect(metricMetadataManagerMock.getMetadataCacheValue((TimelineMetricMetadataKey) anyObject())).andReturn(null).anyTimes();
+    replay(metricMetadataManagerMock);
+
+    TimelineMetricClusterAggregatorSecondWithCacheSource secondAggregator = new TimelineMetricClusterAggregatorSecondWithCacheSource(
+        METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null, configuration, null,
+        aggregatorInterval, 2, "false", "", "", aggregatorInterval,
+        sliceInterval, null, timelineMetricsIgniteCache, 30L);
+
+    long now = System.currentTimeMillis();
+    long startTime = now - 120000;
+    long seconds = 1000;
+
+    Map<TimelineClusterMetric, MetricClusterAggregate> metricsFromCache = new HashMap<>();
+    metricsFromCache.put(new TimelineClusterMetric("m1", "a1", "i1",startTime + 15 * seconds),
+        new MetricClusterAggregate(1.0, 2, 1.0, 1.0, 1.0));
+    metricsFromCache.put(new TimelineClusterMetric("m2", "a2", "i1",startTime + 18 * seconds),
+        new MetricClusterAggregate(1.0, 5, 1.0, 1.0, 1.0));
+
+    List<Long[]> timeslices = getTimeSlices(startTime, startTime + 120*seconds, 30*seconds);
+    Map<TimelineClusterMetric, MetricClusterAggregate> aggregates = secondAggregator.aggregateMetricsFromMetricClusterAggregates(metricsFromCache, timeslices);
+
+    Assert.assertNotNull(aggregates);
+
+    MetricClusterAggregate a1 = null, a2 = null;
+
+    for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> m : aggregates.entrySet()) {
+      if (m.getKey().getMetricName().equals("live_hosts") && m.getKey().getAppId().equals("a1")) {
+        a1 = m.getValue();
+      }
+      if (m.getKey().getMetricName().equals("live_hosts") && m.getKey().getAppId().equals("a2")) {
+        a2 = m.getValue();
+      }
+    }
+
+    Assert.assertNotNull(a1);
+    Assert.assertNotNull(a2);
+    Assert.assertEquals(2d, a1.getSum());
+    Assert.assertEquals(5d, a2.getSum());
+  }
+
+  @Test
+  public void testSlicesRecalculation() throws Exception {
+    long aggregatorInterval = 120000;
+    long sliceInterval = 30000;
+
+    Configuration configuration = new Configuration();
+
+    TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
+    expect(metricMetadataManagerMock.getMetadataCacheValue((TimelineMetricMetadataKey) anyObject())).andReturn(null).anyTimes();
+    replay(metricMetadataManagerMock);
+
+    TimelineMetricClusterAggregatorSecondWithCacheSource secondAggregator = new TimelineMetricClusterAggregatorSecondWithCacheSource(
+        METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null, configuration, null,
+        aggregatorInterval, 2, "false", "", "", aggregatorInterval,
+        sliceInterval, null, timelineMetricsIgniteCache, 30L);
+
+    long seconds = 1000;
+    long now = getRoundedCheckPointTimeMillis(System.currentTimeMillis(), 120*seconds);
+    long startTime = now - 120*seconds;
+
+    Map<TimelineClusterMetric, MetricClusterAggregate> metricsFromCache = new HashMap<>();
+    metricsFromCache.put(new TimelineClusterMetric("m1", "a1", "i1",startTime + 5 * seconds),
+        new MetricClusterAggregate(1.0, 2, 1.0, 1.0, 1.0));
+    metricsFromCache.put(new TimelineClusterMetric("m1", "a1", "i1",startTime + 25 * seconds),
+        new MetricClusterAggregate(2.0, 2, 1.0, 2.0, 2.0));
+    metricsFromCache.put(new TimelineClusterMetric("m1", "a1", "i1",startTime + 45 * seconds),
+        new MetricClusterAggregate(3.0, 2, 1.0, 1.0, 1.0));
+    metricsFromCache.put(new TimelineClusterMetric("m1", "a1", "i1",startTime + 65 * seconds),
+        new MetricClusterAggregate(4.0, 2, 1.0, 4.0, 4.0));
+    metricsFromCache.put(new TimelineClusterMetric("m1", "a1", "i1",startTime + 85 * seconds),
+        new MetricClusterAggregate(5.0, 2, 1.0, 5.0, 5.0));
+
+    List<Long[]> timeslices = getTimeSlices(startTime, startTime + 120*seconds, 30*seconds);
+
+    Map<TimelineClusterMetric, MetricClusterAggregate> aggregates = secondAggregator.aggregateMetricsFromMetricClusterAggregates(metricsFromCache, timeslices);
+
+    Assert.assertNotNull(aggregates);
+    Assert.assertEquals(4, aggregates.size());
+
+    TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric("m1", "a1", "i1", startTime + 30*seconds);
+    MetricClusterAggregate metricClusterAggregate = aggregates.get(timelineClusterMetric);
+    Assert.assertNotNull(metricClusterAggregate);
+    Assert.assertEquals(1.5, metricClusterAggregate.getSum());
+    Assert.assertEquals(1d, metricClusterAggregate.getMin());
+    Assert.assertEquals(2d, metricClusterAggregate.getMax());
+    Assert.assertEquals(2, metricClusterAggregate.getNumberOfHosts());
+
+    timelineClusterMetric.setTimestamp(startTime + 60*seconds);
+    metricClusterAggregate = aggregates.get(timelineClusterMetric);
+    Assert.assertNotNull(metricClusterAggregate);
+    Assert.assertEquals(3d, metricClusterAggregate.getSum());
+
+    timelineClusterMetric.setTimestamp(startTime + 90*seconds);
+    metricClusterAggregate = aggregates.get(timelineClusterMetric);
+    Assert.assertNotNull(metricClusterAggregate);
+    Assert.assertEquals(4.5d, metricClusterAggregate.getSum());
+
+    timelineClusterMetric = new TimelineClusterMetric("live_hosts", "a1", null, startTime + 120*seconds);
+    metricClusterAggregate = aggregates.get(timelineClusterMetric);
+    Assert.assertNotNull(metricClusterAggregate);
+    Assert.assertEquals(2d, metricClusterAggregate.getSum());
+  }
+}
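
The two tests above pin down the cache-source aggregator's observable contract: per-slice sums are averaged into 30-second windows, and a synthetic live_hosts metric is emitted per appId whose sum is the host count seen in the window. A hypothetical sketch of that roll-up (the method name and wiring are assumptions; the tests only fix the output, and the MetricClusterAggregate constructor is the (sum, numberOfHosts, deviation, max, min) form used above):

    // Assumed helper: derive live_hosts per appId from per-metric host counts.
    private void appendLiveHostMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregates,
                                       long endTime) {
      Map<String, Integer> liveHostsPerApp = new HashMap<>();
      for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> e : aggregates.entrySet()) {
        // Keep the largest host count observed for the app in this window.
        liveHostsPerApp.merge(e.getKey().getAppId(), e.getValue().getNumberOfHosts(), Math::max);
      }
      for (Map.Entry<String, Integer> e : liveHostsPerApp.entrySet()) {
        double hosts = e.getValue();
        aggregates.put(new TimelineClusterMetric("live_hosts", e.getKey(), null, endTime),
            new MetricClusterAggregate(hosts, 1, 0.0, hosts, hosts));
      }
    }

Under this reading, testSlicesRecalculation's expected size of 4 is three windowed m1 aggregates (at +30s, +60s, +90s) plus one live_hosts entry.
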
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAControllerTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAControllerTest.java
index a0bc77f..e14d069 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAControllerTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAControllerTest.java
@@ -52,6 +52,7 @@ public class MetricCollectorHAControllerTest extends AbstractMiniHBaseClusterTes
 
     expect(configuration.getClusterZKClientPort()).andReturn(port);
     expect(configuration.getClusterZKQuorum()).andReturn(quorum);
+    expect(configuration.getZkConnectionUrl(port, quorum)).andReturn(quorum + ":" + port);
 
     replay(configuration);
   }
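
The HA controller now obtains its ZooKeeper connection string from a single TimelineMetricConfiguration helper, which the test stubs to quorum + ":" + port. A plausible sketch of getZkConnectionUrl, assuming the shared client port is appended to every quorum host:

    public String getZkConnectionUrl(String zkClientPort, String zkQuorum) {
      StringBuilder connectionUrl = new StringBuilder();
      for (String host : zkQuorum.split(",")) {
        if (connectionUrl.length() > 0) {
          connectionUrl.append(",");
        }
        // Quorum hosts may arrive bare ("host1,host2"); attach the client port.
        connectionUrl.append(host.trim()).append(":").append(zkClientPort);
      }
      return connectionUrl.toString();
    }
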
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java
index f9a1036..3e3b91f 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java
@@ -120,10 +120,10 @@ public class TestMetadataManager extends AbstractMiniHBaseClusterTest {
     Map<String, TimelineMetricHostMetadata> cachedHostData = metadataManager.getHostedAppsCache();
     Map<String, TimelineMetricHostMetadata> savedHostData = metadataManager.getHostedAppsFromStore();
     Assert.assertEquals(cachedData.size(), savedData.size());
-    Assert.assertEquals("dummy_app1", cachedHostData.get("dummy_host1").getHostedApps().iterator().next());
-    Assert.assertEquals("dummy_app2", cachedHostData.get("dummy_host2").getHostedApps().iterator().next());
-    Assert.assertEquals("dummy_app1", savedHostData.get("dummy_host1").getHostedApps().iterator().next());
-    Assert.assertEquals("dummy_app2", savedHostData.get("dummy_host2").getHostedApps().iterator().next());
+    Assert.assertEquals("dummy_app1", cachedHostData.get("dummy_host1").getHostedApps().keySet().iterator().next());
+    Assert.assertEquals("dummy_app2", cachedHostData.get("dummy_host2").getHostedApps().keySet().iterator().next());
+    Assert.assertEquals("dummy_app1", savedHostData.get("dummy_host1").getHostedApps().keySet().iterator().next());
+    Assert.assertEquals("dummy_app2", savedHostData.get("dummy_host2").getHostedApps().keySet().iterator().next());
 
     Map<String, Set<String>> cachedHostInstanceData = metadataManager.getHostedInstanceCache();
     Map<String, Set<String>> savedHostInstanceData = metadataManager.getHostedInstancesFromStore();
diff --git a/ambari-metrics/pom.xml b/ambari-metrics/pom.xml
index b4b070a..0d4767d 100644
--- a/ambari-metrics/pom.xml
+++ b/ambari-metrics/pom.xml
@@ -206,8 +206,8 @@
         <artifactId>maven-compiler-plugin</artifactId>
         <version>3.2</version>
         <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
+          <source>1.8</source>
+          <target>1.8</target>
         </configuration>
       </plugin>
       <plugin>

-- 
To stop receiving notification emails like this one, please contact
avijayan@apache.org.