You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2017/05/24 22:43:09 UTC

[2/3] ambari git commit: AMBARI-21079. Add ability to sink Raw metrics to external system via Http. (swagle)

http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSource.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSource.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSource.java
new file mode 100644
index 0000000..967d819
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSource.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source;
+
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.ExternalMetricsSink;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.cache.InternalMetricsCache;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.cache.InternalMetricsCacheProvider;
+
+public class RawMetricsSource implements InternalMetricsSource {
+  private static final Log LOG = LogFactory.getLog(RawMetricsSource.class);
+  private final int internalCacheInterval;
+  private final ExternalMetricsSink rawMetricsSink;
+  private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+  private final InternalMetricsCache cache;
+  static final String RAW_METRICS_CACHE = "RAW_METRICS_CACHE_INSTANCE";
+
+  public RawMetricsSource(int internalCacheInterval, ExternalMetricsSink rawMetricsSink) {
+    this.internalCacheInterval = internalCacheInterval;
+    this.rawMetricsSink = rawMetricsSink;
+    this.cache = InternalMetricsCacheProvider.getInstance().getCacheInstance(RAW_METRICS_CACHE);
+    if (rawMetricsSink.getFlushSeconds() > internalCacheInterval) {
+      initializeFixedRateScheduler();
+    }
+  }
+
+  @Override
+  public void publishTimelineMetrics(Collection<TimelineMetrics> metrics) {
+    // TODO: Adjust default flush to reasonable defaults > 3 seconds
+    if (rawMetricsSink.getFlushSeconds() > internalCacheInterval) {
+      // Need to cache only if external sink cannot keep up and thereby has
+      // different flush interval as compared to HBase flush
+      cache.putAll(metrics); // Scheduler initialized already for flush
+    } else {
+      submitDataWithTimeout(metrics);
+    }
+  }
+
+  private void initializeFixedRateScheduler() {
+    executorService.scheduleAtFixedRate(new Runnable() {
+      @Override
+      public void run() {
+        rawMetricsSink.sinkMetricData(cache.evictAll());
+      }
+    }, rawMetricsSink.getFlushSeconds(), rawMetricsSink.getFlushSeconds(), TimeUnit.SECONDS);
+  }
+
+  private void submitDataWithTimeout(final Collection<TimelineMetrics> metrics) {
+    Future f = executorService.submit(new Callable<Object>() {
+      @Override
+      public Object call() throws Exception {
+        rawMetricsSink.sinkMetricData(metrics);
+        return null;
+      }
+    });
+    try {
+      f.get(rawMetricsSink.getSinkTimeOutSeconds(), TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOG.warn("Raw metrics sink interrupted.");
+    } catch (ExecutionException e) {
+      LOG.warn("Exception on sinking metrics", e);
+    } catch (TimeoutException e) {
+      LOG.warn("Timeout exception on sinking metrics", e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricCacheKey.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricCacheKey.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricCacheKey.java
new file mode 100644
index 0000000..28d457d
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricCacheKey.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.cache;
+
+public class InternalMetricCacheKey {
+  private String metricName;
+  private String appId;
+  private String instanceId;
+  private String hostname;
+  private long startTime; // Useful for debugging
+
+  public InternalMetricCacheKey(String metricName, String appId, String instanceId, String hostname, long startTime) {
+    this.metricName = metricName;
+    this.appId = appId;
+    this.instanceId = instanceId;
+    this.hostname = hostname;
+    this.startTime = startTime;
+  }
+
+  public String getMetricName() {
+    return metricName;
+  }
+
+  public void setMetricName(String metricName) {
+    this.metricName = metricName;
+  }
+
+  public String getAppId() {
+    return appId;
+  }
+
+  public void setAppId(String appId) {
+    this.appId = appId;
+  }
+
+  public String getInstanceId() {
+    return instanceId;
+  }
+
+  public void setInstanceId(String instanceId) {
+    this.instanceId = instanceId;
+  }
+
+  public String getHostname() {
+    return hostname;
+  }
+
+  public void setHostname(String hostname) {
+    this.hostname = hostname;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    InternalMetricCacheKey that = (InternalMetricCacheKey) o;
+
+    if (!getMetricName().equals(that.getMetricName())) return false;
+    if (!getAppId().equals(that.getAppId())) return false;
+    if (getInstanceId() != null ? !getInstanceId().equals(that.getInstanceId()) : that.getInstanceId() != null)
+      return false;
+    return getHostname() != null ? getHostname().equals(that.getHostname()) : that.getHostname() == null;
+
+  }
+
+  @Override
+  public int hashCode() {
+    int result = getMetricName().hashCode();
+    result = 31 * result + getAppId().hashCode();
+    result = 31 * result + (getInstanceId() != null ? getInstanceId().hashCode() : 0);
+    result = 31 * result + (getHostname() != null ? getHostname().hashCode() : 0);
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "InternalMetricCacheKey{" +
+      "metricName='" + metricName + '\'' +
+      ", appId='" + appId + '\'' +
+      ", instanceId='" + instanceId + '\'' +
+      ", hostname='" + hostname + '\'' +
+      ", startTime=" + startTime +
+      '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricCacheValue.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricCacheValue.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricCacheValue.java
new file mode 100644
index 0000000..a4dabe7
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricCacheValue.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.cache;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+public class InternalMetricCacheValue {
+  private TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
+
+  public TreeMap<Long, Double> getMetricValues() {
+    return metricValues;
+  }
+
+  public void setMetricValues(TreeMap<Long, Double> metricValues) {
+    this.metricValues = metricValues;
+  }
+
+  public void addMetricValues(Map<Long, Double> metricValues) {
+    this.metricValues.putAll(metricValues);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCache.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCache.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCache.java
new file mode 100644
index 0000000..a4ed9bc
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCache.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.cache;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import net.sf.ehcache.Cache;
+import net.sf.ehcache.CacheException;
+import net.sf.ehcache.CacheManager;
+import net.sf.ehcache.Ehcache;
+import net.sf.ehcache.Element;
+import net.sf.ehcache.config.CacheConfiguration;
+import net.sf.ehcache.config.PersistenceConfiguration;
+import net.sf.ehcache.config.SizeOfPolicyConfiguration;
+import net.sf.ehcache.event.CacheEventListener;
+import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
+
+public class InternalMetricsCache {
+  private static final Log LOG = LogFactory.getLog(InternalMetricsCache.class);
+  private final String instanceName;
+  private final String maxHeapPercent;
+  private volatile boolean isCacheInitialized = false;
+  private Cache cache;
+  static final String TIMELINE_METRIC_CACHE_MANAGER_NAME = "internalMetricsCacheManager";
+  private final Lock lock = new ReentrantLock();
+  private static final int LOCK_TIMEOUT_SECONDS = 2;
+
+  public InternalMetricsCache(String instanceName, String maxHeapPercent) {
+    this.instanceName = instanceName;
+    this.maxHeapPercent = maxHeapPercent;
+    initialize();
+  }
+
+  private void initialize() {
+    // Check in case of contention to avoid ObjectExistsException
+    if (isCacheInitialized) {
+      throw new RuntimeException("Cannot initialize internal cache twice");
+    }
+
+    System.setProperty("net.sf.ehcache.skipUpdateCheck", "true");
+    System.setProperty("net.sf.ehcache.sizeofengine." + TIMELINE_METRIC_CACHE_MANAGER_NAME,
+      "org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.cache.InternalMetricsCacheSizeOfEngine");
+
+    net.sf.ehcache.config.Configuration managerConfig =
+      new net.sf.ehcache.config.Configuration();
+    managerConfig.setName(TIMELINE_METRIC_CACHE_MANAGER_NAME);
+
+    // Set max heap available to the cache manager
+    managerConfig.setMaxBytesLocalHeap(maxHeapPercent);
+
+    //Create a singleton CacheManager using defaults
+    CacheManager manager = CacheManager.create(managerConfig);
+
+    LOG.info("Creating Metrics Cache with maxHeapPercent => " + maxHeapPercent);
+
+    // Create a Cache specifying its configuration.
+    CacheConfiguration cacheConfiguration = new CacheConfiguration()
+      .name(instanceName)
+      .memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU)
+      .sizeOfPolicy(new SizeOfPolicyConfiguration() // Set sizeOf policy to continue on max depth reached - avoid OOM
+        .maxDepth(10000)
+        .maxDepthExceededBehavior(SizeOfPolicyConfiguration.MaxDepthExceededBehavior.CONTINUE))
+      .eternal(true) // infinite time until eviction
+      .persistence(new PersistenceConfiguration()
+        .strategy(PersistenceConfiguration.Strategy.NONE.name()));
+
+    cache = new Cache(cacheConfiguration);
+    cache.getCacheEventNotificationService().registerListener(new InternalCacheEvictionListener());
+
+    LOG.info("Registering internal metrics cache with provider: name = " +
+      cache.getName() + ", guid: " + cache.getGuid());
+
+    manager.addCache(cache);
+
+    isCacheInitialized = true;
+  }
+
+  public InternalMetricCacheValue getInternalMetricCacheValue(InternalMetricCacheKey key) {
+    Element ele = cache.get(key);
+    if (ele != null) {
+      return (InternalMetricCacheValue) ele.getObjectValue();
+    }
+    return null;
+  }
+
+  public Collection<TimelineMetrics> evictAll() {
+    TimelineMetrics metrics = new TimelineMetrics();
+    try {
+      if (lock.tryLock(LOCK_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+        try{
+          List keys = cache.getKeys();
+          for (Object obj : keys) {
+            TimelineMetric metric = new TimelineMetric();
+            InternalMetricCacheKey key = (InternalMetricCacheKey) obj;
+            metric.setMetricName(key.getMetricName());
+            metric.setAppId(key.getAppId());
+            metric.setInstanceId(key.getInstanceId());
+            metric.setHostName(key.getHostname());
+            metric.setStartTime(key.getStartTime());
+            metric.setTimestamp(key.getStartTime());
+            Element ele = cache.get(key);
+            metric.setMetricValues(((InternalMetricCacheValue) ele.getObjectValue()).getMetricValues());
+            metrics.getMetrics().add(metric);
+          }
+          cache.removeAll();
+        } finally {
+          lock.unlock();
+        }
+      } else {
+        LOG.warn("evictAll: Unable to acquire lock on the cache instance. " +
+          "Giving up after " + LOCK_TIMEOUT_SECONDS + " seconds.");
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while waiting to acquire lock");
+    }
+
+    return Collections.singletonList(metrics);
+  }
+
+  public void putAll(Collection<TimelineMetrics> metrics) {
+    try {
+      if (lock.tryLock(LOCK_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+        try {
+          if (metrics != null) {
+            for (TimelineMetrics timelineMetrics : metrics) {
+              for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) {
+                InternalMetricCacheKey key = new InternalMetricCacheKey(
+                  timelineMetric.getMetricName(),
+                  timelineMetric.getAppId(),
+                  timelineMetric.getInstanceId(),
+                  timelineMetric.getHostName(),
+                  timelineMetric.getStartTime()
+                );
+
+                Element ele = cache.get(key);
+                if (ele != null) {
+                  InternalMetricCacheValue value = (InternalMetricCacheValue) ele.getObjectValue();
+                  value.addMetricValues(timelineMetric.getMetricValues());
+                } else {
+                  InternalMetricCacheValue value = new InternalMetricCacheValue();
+                  value.setMetricValues(timelineMetric.getMetricValues());
+                  cache.put(new Element(key, value));
+                }
+              }
+            }
+          }
+        } finally {
+          lock.unlock();
+        }
+      } else {
+        LOG.warn("putAll: Unable to acquire lock on the cache instance. " +
+          "Giving up after " + LOCK_TIMEOUT_SECONDS + " seconds.");
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while waiting to acquire lock");
+    }
+  }
+
+  class InternalCacheEvictionListener implements CacheEventListener {
+
+    @Override
+    public void notifyElementRemoved(Ehcache cache, Element element) throws CacheException {
+      // expected
+    }
+
+    @Override
+    public void notifyElementPut(Ehcache cache, Element element) throws CacheException {
+      // do nothing
+    }
+
+    @Override
+    public void notifyElementUpdated(Ehcache cache, Element element) throws CacheException {
+      // do nothing
+    }
+
+    @Override
+    public void notifyElementExpired(Ehcache cache, Element element) {
+      // do nothing
+    }
+
+    @Override
+    public void notifyElementEvicted(Ehcache cache, Element element) {
+      // Bad - Remote endpoint cannot keep up resulting in flooding
+      InternalMetricCacheKey key = (InternalMetricCacheKey) element.getObjectKey();
+      LOG.warn("Evicting element from internal metrics cache, metric => " + key
+        .getMetricName() + ", startTime = " + new Date(key.getStartTime()));
+    }
+
+    @Override
+    public void notifyRemoveAll(Ehcache cache) {
+      // expected
+    }
+
+    @Override
+    public Object clone() throws CloneNotSupportedException {
+      return null;
+    }
+
+    @Override
+    public void dispose() {
+      // do nothing
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCacheProvider.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCacheProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCacheProvider.java
new file mode 100644
index 0000000..3e0dc1b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCacheProvider.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.cache;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
+
+public class InternalMetricsCacheProvider {
+  private Map<String, InternalMetricsCache> metricsCacheMap = new ConcurrentHashMap<>();
+  private static final InternalMetricsCacheProvider instance = new InternalMetricsCacheProvider();
+
+  private InternalMetricsCacheProvider() {
+  }
+
+  public static InternalMetricsCacheProvider getInstance() {
+    return instance;
+  }
+
+  public InternalMetricsCache getCacheInstance(String instanceName) {
+    if (metricsCacheMap.containsKey(instanceName)) {
+      return metricsCacheMap.get(instanceName);
+    } else {
+      TimelineMetricConfiguration conf = TimelineMetricConfiguration.getInstance();
+      InternalMetricsCache cache = new InternalMetricsCache(instanceName,
+        conf.getInternalCacheHeapPercent(instanceName));
+
+      metricsCacheMap.put(instanceName, cache);
+      return cache;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCacheSizeOfEngine.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCacheSizeOfEngine.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCacheSizeOfEngine.java
new file mode 100644
index 0000000..d1a1a89
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCacheSizeOfEngine.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.cache;
+
+import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsEhCacheSizeOfEngine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import net.sf.ehcache.pool.Size;
+import net.sf.ehcache.pool.SizeOfEngine;
+
+public class InternalMetricsCacheSizeOfEngine extends TimelineMetricsEhCacheSizeOfEngine {
+  private final static Logger LOG = LoggerFactory.getLogger(InternalMetricsCacheSizeOfEngine.class);
+
+  private InternalMetricsCacheSizeOfEngine(SizeOfEngine underlying) {
+    super(underlying);
+  }
+
+  public InternalMetricsCacheSizeOfEngine() {
+    // Invoke default constructor in base class
+  }
+
+  @Override
+  public Size sizeOf(Object key, Object value, Object container) {
+    try {
+      LOG.debug("BEGIN - Sizeof, key: {}, value: {}", key, value);
+      long size = 0;
+      if (key instanceof InternalMetricCacheKey) {
+        InternalMetricCacheKey metricCacheKey = (InternalMetricCacheKey) key;
+        size += reflectionSizeOf.sizeOf(metricCacheKey.getMetricName());
+        size += reflectionSizeOf.sizeOf(metricCacheKey.getAppId());
+        size += reflectionSizeOf.sizeOf(metricCacheKey.getInstanceId()); // null safe
+        size += reflectionSizeOf.sizeOf(metricCacheKey.getHostname());
+      }
+      if (value instanceof InternalMetricCacheValue) {
+        size += getValueMapSize(((InternalMetricCacheValue) value).getMetricValues());
+      }
+      // Mark size as not being exact
+      return new Size(size, false);
+    } finally {
+      LOG.debug("END - Sizeof, key: {}", key);
+    }
+  }
+
+  @Override
+  public SizeOfEngine copyWith(int maxDepth, boolean abortWhenMaxDepthExceeded) {
+    LOG.debug("Copying tracing sizeof engine, maxdepth: {}, abort: {}",
+      maxDepth, abortWhenMaxDepthExceeded);
+
+    return new InternalMetricsCacheSizeOfEngine(underlying.copyWith(maxDepth, abortWhenMaxDepthExceeded));
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
index 3688630..41ddef5 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.Service.STATE;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.HBaseTimelineMetricStore;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.HBaseTimelineMetricsService;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
@@ -73,7 +73,7 @@ import static org.powermock.api.support.membermodification.MemberMatcher.method;
 import static org.powermock.api.support.membermodification.MemberModifier.suppress;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ PhoenixHBaseAccessor.class, HBaseTimelineMetricStore.class, UserGroupInformation.class,
+@PrepareForTest({ PhoenixHBaseAccessor.class, HBaseTimelineMetricsService.class, UserGroupInformation.class,
   ClientCnxn.class, DefaultPhoenixDataSource.class, ConnectionFactory.class,
   TimelineMetricConfiguration.class, ApplicationHistoryServer.class })
 @PowerMockIgnore( {"javax.management.*"})

http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
index 611d82e..fbf7b09 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
@@ -17,10 +17,32 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.OUT_OFF_BAND_DATA_TIME_ALLOWANCE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.powermock.api.easymock.PowerMock.mockStatic;
+import static org.powermock.api.easymock.PowerMock.replayAll;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.IntegrationTestingUtility;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
@@ -41,24 +63,6 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.OUT_OFF_BAND_DATA_TIME_ALLOWANCE;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL;
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.assertj.core.api.Assertions.assertThat;
-
 public abstract class AbstractMiniHBaseClusterTest extends BaseTest {
 
   protected static final long BATCH_SIZE = 3;
@@ -200,11 +204,8 @@ public abstract class AbstractMiniHBaseClusterTest extends BaseTest {
     metricsConf.setLong(OUT_OFF_BAND_DATA_TIME_ALLOWANCE, 600000);
 
     return
-      new PhoenixHBaseAccessor(
-        new Configuration(),
-        metricsConf,
+      new PhoenixHBaseAccessor(new TimelineMetricConfiguration(new Configuration(), metricsConf),
         new PhoenixConnectionProvider() {
-
           @Override
           public HBaseAdmin getHBaseAdmin() throws IOException {
             try {
@@ -229,7 +230,7 @@ public abstract class AbstractMiniHBaseClusterTest extends BaseTest {
   }
 
   protected void insertMetricRecords(Connection conn, TimelineMetrics metrics, long currentTime)
-                                    throws SQLException, IOException {
+    throws SQLException, IOException {
 
     List<TimelineMetric> timelineMetrics = metrics.getMetrics();
     if (timelineMetrics == null || timelineMetrics.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java
index aae1d4b..f035678 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java
@@ -33,7 +33,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.PostProcessingFunction.RATE;
 import static org.assertj.core.api.Assertions.assertThat;
 
-public class HBaseTimelineMetricStoreTest {
+public class HBaseTimelineMetricsServiceTest {
 
   public static final String MEM_METRIC = "mem";
   public static final String BYTES_IN_METRIC = "bytes_in";
@@ -51,7 +51,7 @@ public class HBaseTimelineMetricStoreTest {
 
     //when
     Multimap<String, List<Function>> multimap =
-      HBaseTimelineMetricStore.parseMetricNamesToAggregationFunctions(metricNames);
+      HBaseTimelineMetricsService.parseMetricNamesToAggregationFunctions(metricNames);
 
     //then
     Assert.assertEquals(multimap.keySet().size(), 3);
@@ -104,7 +104,7 @@ public class HBaseTimelineMetricStoreTest {
     metricValues.put(1454016728371L, 1011.25);
 
     // Calculate rate
-    Map<Long, Double> rates = HBaseTimelineMetricStore.updateValuesAsRate(new TreeMap<>(metricValues), false);
+    Map<Long, Double> rates = HBaseTimelineMetricsService.updateValuesAsRate(new TreeMap<>(metricValues), false);
 
     // Make sure rate is zero
     for (Map.Entry<Long, Double> rateEntry : rates.entrySet()) {
@@ -122,7 +122,7 @@ public class HBaseTimelineMetricStoreTest {
     metricValues.put(1454016548371L, 1010.25);
     metricValues.put(1454016608371L, 1010.25);
 
-    Map<Long, Double> rates = HBaseTimelineMetricStore.updateValuesAsRate(new TreeMap<>(metricValues), true);
+    Map<Long, Double> rates = HBaseTimelineMetricsService.updateValuesAsRate(new TreeMap<>(metricValues), true);
 
     Assert.assertTrue(rates.size()==4);
     Assert.assertTrue(rates.containsValue(-1.0));

http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
index d5baaef..f6d69f6 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
@@ -17,9 +17,33 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
-import junit.framework.Assert;
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineClusterMetric;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineMetric;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createMetricHostAggregate;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.DATE_TIERED_COMPACTION_POLICY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.FIFO_COMPACTION_POLICY_CLASS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.HSTORE_COMPACTION_CLASS_KEY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.HSTORE_ENGINE_CLASS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.PHOENIX_TABLES;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -40,35 +64,10 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.lang.reflect.Array;
-import java.lang.reflect.Field;
-import java.sql.SQLException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
 
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertTrue;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineClusterMetric;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineMetric;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createMetricHostAggregate;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.DATE_TIERED_COMPACTION_POLICY;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.FIFO_COMPACTION_POLICY_CLASS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.HSTORE_COMPACTION_CLASS_KEY;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.HSTORE_ENGINE_CLASS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.PHOENIX_TABLES;
+import junit.framework.Assert;
 
 
 
@@ -93,7 +92,8 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
     // WHEN
     long endTime = ctime + minute;
     Condition condition = new DefaultCondition(
-      Collections.singletonList("disk_free"), Collections.singletonList("local1"),
+      new ArrayList<String>() {{ add("disk_free"); }},
+      Collections.singletonList("local1"),
       null, null, startTime, endTime, Precision.SECONDS, null, true);
     TimelineMetrics timelineMetrics = hdb.getMetricRecords(condition,
       singletonValueFunctionMap("disk_free"));
@@ -117,18 +117,19 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
     long ctime = startTime;
     long minute = 60 * 1000;
     hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
-        "disk_free", 1));
+      "disk_free", 1));
     hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + minute, "local1",
-        "disk_free", 2));
+      "disk_free", 2));
     hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
-        "disk_free", 2));
+      "disk_free", 2));
     long endTime = ctime + minute;
     boolean success = aggregatorMinute.doWork(startTime, endTime);
     assertTrue(success);
 
     // WHEN
     Condition condition = new DefaultCondition(
-      Collections.singletonList("disk_free"), Collections.singletonList("local1"),
+      new ArrayList<String>() {{ add("disk_free"); }},
+      Collections.singletonList("local1"),
       null, null, startTime, endTime, Precision.MINUTES, null, false);
     TimelineMetrics timelineMetrics = hdb.getMetricRecords(condition,
       singletonValueFunctionMap("disk_free"));
@@ -151,10 +152,10 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
       TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hdb, new Configuration(), null);
 
     MetricHostAggregate expectedAggregate =
-        createMetricHostAggregate(2.0, 0.0, 20, 15.0);
+      createMetricHostAggregate(2.0, 0.0, 20, 15.0);
     Map<TimelineMetric, MetricHostAggregate>
-        aggMap = new HashMap<TimelineMetric,
-        MetricHostAggregate>();
+      aggMap = new HashMap<TimelineMetric,
+      MetricHostAggregate>();
 
     long startTime = System.currentTimeMillis();
     int min_5 = 5 * 60 * 1000;
@@ -179,7 +180,8 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
 
     // WHEN
     Condition condition = new DefaultCondition(
-      Collections.singletonList("disk_used"), Collections.singletonList("test_host"),
+      new ArrayList<String>() {{ add("disk_free"); }},
+      Collections.singletonList("test_host"),
       "test_app", null, startTime, endTime, Precision.HOURS, null, true);
     TimelineMetrics timelineMetrics = hdb.getMetricRecords(condition,
       singletonValueFunctionMap("disk_used"));
@@ -200,20 +202,20 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
     // GIVEN
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(
-        hdb, new Configuration(), new TimelineMetricMetadataManager(hdb, new Configuration()), null);
+        hdb, new Configuration(), new TimelineMetricMetadataManager(new Configuration(), hdb), null);
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime + 1;
     long minute = 60 * 1000;
     hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
-        "disk_free", 1));
+      "disk_free", 1));
     hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
-        "disk_free", 2));
+      "disk_free", 2));
     ctime += minute;
     hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
-        "disk_free", 2));
+      "disk_free", 2));
     hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
-        "disk_free", 1));
+      "disk_free", 1));
 
     long endTime = ctime + minute + 1;
     boolean success = agg.doWork(startTime, endTime);
@@ -221,8 +223,8 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
 
     // WHEN
     Condition condition = new DefaultCondition(
-        Collections.singletonList("disk_free"), null, null, null,
-        startTime, endTime, Precision.SECONDS, null, true);
+      new ArrayList<String>() {{ add("disk_free"); }},
+      null, null, null, startTime, endTime, Precision.SECONDS, null, true);
     TimelineMetrics timelineMetrics = hdb.getAggregateMetricRecords(condition,
       singletonValueFunctionMap("disk_free"));
 
@@ -240,7 +242,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
     // GIVEN
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
-        new Configuration(), new TimelineMetricMetadataManager(hdb, new Configuration()), null);
+        new Configuration(), new TimelineMetricMetadataManager(new Configuration(), hdb), null);
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime + 1;
@@ -261,8 +263,8 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
 
     // WHEN
     Condition condition = new DefaultCondition(
-      Collections.singletonList("disk_free"), null, null, null,
-      null, null, Precision.SECONDS, null, true);
+      new ArrayList<String>() {{ add("disk_free"); }},
+      null, null, null, null, null, Precision.SECONDS, null, true);
 
     Multimap<String, List<Function>> mmap = ArrayListMultimap.create();
     mmap.put("disk_free", Collections.singletonList(new Function(Function.ReadFunction.SUM, null)));
@@ -288,14 +290,14 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
     long minute = 60 * 1000;
 
     Map<TimelineClusterMetric, MetricClusterAggregate> records =
-        new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
 
     records.put(createEmptyTimelineClusterMetric(ctime),
       new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
     records.put(createEmptyTimelineClusterMetric(ctime += minute),
       new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
     records.put(createEmptyTimelineClusterMetric(ctime += minute),
-        new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
     records.put(createEmptyTimelineClusterMetric(ctime += minute),
       new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
 
@@ -305,8 +307,8 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
 
     // WHEN
     Condition condition = new DefaultCondition(
-        Collections.singletonList("disk_used"), null, null, null,
-        startTime, ctime + minute, Precision.HOURS, null, true);
+      new ArrayList<String>() {{ add("disk_free"); }},
+      null, null, null, startTime, ctime + minute, Precision.HOURS, null, true);
     TimelineMetrics timelineMetrics = hdb.getAggregateMetricRecords(condition,
       singletonValueFunctionMap("disk_used"));
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
index d668178..bf9246d 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
 import org.apache.phoenix.exception.PhoenixIOException;
 import org.easymock.EasyMock;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.easymock.PowerMock;
@@ -42,6 +43,7 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -53,22 +55,36 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
+import static org.powermock.api.easymock.PowerMock.*;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest(PhoenixTransactSQL.class)
+@PrepareForTest({PhoenixTransactSQL.class, TimelineMetricConfiguration.class})
 public class PhoenixHBaseAccessorTest {
   private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
 
-  @Test
-  public void testGetMetricRecords() throws SQLException, IOException {
+  PhoenixConnectionProvider connectionProvider;
+  PhoenixHBaseAccessor accessor;
 
+  @Before
+  public void setupConf() throws Exception {
     Configuration hbaseConf = new Configuration();
     hbaseConf.setStrings(ZOOKEEPER_QUORUM, "quorum");
     Configuration metricsConf = new Configuration();
+    metricsConf.setStrings(TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_SIZE, "1");
+    metricsConf.setStrings(TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, "100");
+    metricsConf.setStrings(
+      TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATOR_SINK_CLASS,
+      "org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricsAggregatorMemorySink");
+
+    TimelineMetricConfiguration conf = new TimelineMetricConfiguration(hbaseConf, metricsConf);
+    mockStatic(TimelineMetricConfiguration.class);
+    expect(TimelineMetricConfiguration.getInstance()).andReturn(conf).anyTimes();
+    replayAll();
 
-    PhoenixConnectionProvider connectionProvider = new PhoenixConnectionProvider() {
+    connectionProvider = new PhoenixConnectionProvider() {
       @Override
       public HBaseAdmin getHBaseAdmin() throws IOException {
         return null;
@@ -80,21 +96,24 @@ public class PhoenixHBaseAccessorTest {
       }
     };
 
-    PhoenixHBaseAccessor accessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf, connectionProvider);
+    accessor = new PhoenixHBaseAccessor(connectionProvider);
+  }
 
+  @Test
+  public void testGetMetricRecords() throws SQLException, IOException {
     List<String> metricNames = new LinkedList<>();
     List<String> hostnames = new LinkedList<>();
     Multimap<String, List<Function>> metricFunctions = ArrayListMultimap.create();
 
-    PowerMock.mockStatic(PhoenixTransactSQL.class);
+    mockStatic(PhoenixTransactSQL.class);
     PreparedStatement preparedStatementMock = EasyMock.createNiceMock(PreparedStatement.class);
     Condition condition = new DefaultCondition(metricNames, hostnames, "appid", "instanceid", 123L, 234L, Precision.SECONDS, 10, true);
-    EasyMock.expect(PhoenixTransactSQL.prepareGetMetricsSqlStmt(null, condition)).andReturn(preparedStatementMock).once();
+    expect(PhoenixTransactSQL.prepareGetMetricsSqlStmt(null, condition)).andReturn(preparedStatementMock).once();
     ResultSet rsMock = EasyMock.createNiceMock(ResultSet.class);
-    EasyMock.expect(preparedStatementMock.executeQuery()).andReturn(rsMock);
+    expect(preparedStatementMock.executeQuery()).andReturn(rsMock);
 
 
-    PowerMock.replayAll();
+    replayAll();
     EasyMock.replay(preparedStatementMock, rsMock);
 
     // Check when startTime < endTime
@@ -105,104 +124,64 @@ public class PhoenixHBaseAccessorTest {
     TimelineMetrics tml2 = accessor.getMetricRecords(condition2, metricFunctions);
     assertEquals(0, tml2.getMetrics().size());
 
-    PowerMock.verifyAll();
+    verifyAll();
     EasyMock.verify(preparedStatementMock, rsMock);
   }
 
   @Test
-  public void testGetMetricRecordsIOException()
-    throws SQLException, IOException {
-
-    Configuration hbaseConf = new Configuration();
-    hbaseConf.setStrings(ZOOKEEPER_QUORUM, "quorum");
-    Configuration metricsConf = new Configuration();
-
-    PhoenixConnectionProvider connectionProvider = new PhoenixConnectionProvider() {
-      @Override
-      public HBaseAdmin getHBaseAdmin() throws IOException {
-        return null;
-      }
-
-      @Override
-      public Connection getConnection() throws SQLException {
-        return null;
-      }
-    };
-
-    PhoenixHBaseAccessor accessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf, connectionProvider);
-
+  public void testGetMetricRecordsIOException() throws SQLException, IOException {
     List<String> metricNames = new LinkedList<>();
     List<String> hostnames = new LinkedList<>();
     Multimap<String, List<Function>> metricFunctions = ArrayListMultimap.create();
 
-    PowerMock.mockStatic(PhoenixTransactSQL.class);
+    mockStatic(PhoenixTransactSQL.class);
     PreparedStatement preparedStatementMock = EasyMock.createNiceMock(PreparedStatement.class);
     Condition condition = new DefaultCondition(metricNames, hostnames, "appid", "instanceid", 123L, 234L, Precision.SECONDS, 10, true);
-    EasyMock.expect(PhoenixTransactSQL.prepareGetMetricsSqlStmt(null, condition)).andReturn(preparedStatementMock).once();
+    expect(PhoenixTransactSQL.prepareGetMetricsSqlStmt(null, condition)).andReturn(preparedStatementMock).once();
     ResultSet rsMock = EasyMock.createNiceMock(ResultSet.class);
     RuntimeException runtimeException = EasyMock.createNiceMock(RuntimeException.class);
     IOException io = EasyMock.createNiceMock(IOException.class);
-    EasyMock.expect(preparedStatementMock.executeQuery()).andThrow(runtimeException);
-    EasyMock.expect(runtimeException.getCause()).andReturn(io).atLeastOnce();
+    expect(preparedStatementMock.executeQuery()).andThrow(runtimeException);
+    expect(runtimeException.getCause()).andReturn(io).atLeastOnce();
     StackTraceElement stackTrace[] = new StackTraceElement[]{new StackTraceElement("TimeRange","method","file",1)};
-    EasyMock.expect(io.getStackTrace()).andReturn(stackTrace).atLeastOnce();
+    expect(io.getStackTrace()).andReturn(stackTrace).atLeastOnce();
 
 
-    PowerMock.replayAll();
+    replayAll();
     EasyMock.replay(preparedStatementMock, rsMock, io, runtimeException);
 
     TimelineMetrics tml = accessor.getMetricRecords(condition, metricFunctions);
 
     assertEquals(0, tml.getMetrics().size());
 
-    PowerMock.verifyAll();
+    verifyAll();
     EasyMock.verify(preparedStatementMock, rsMock, io, runtimeException);
   }
 
   @Test
-  public void testGetMetricRecordsPhoenixIOExceptionDoNotRetryException()
-    throws SQLException, IOException {
-
-    Configuration hbaseConf = new Configuration();
-    hbaseConf.setStrings(ZOOKEEPER_QUORUM, "quorum");
-    Configuration metricsConf = new Configuration();
-
-    PhoenixConnectionProvider connectionProvider = new PhoenixConnectionProvider() {
-      @Override
-      public HBaseAdmin getHBaseAdmin() throws IOException {
-        return null;
-      }
-
-      @Override
-      public Connection getConnection() throws SQLException {
-        return null;
-      }
-    };
-
-    PhoenixHBaseAccessor accessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf, connectionProvider);
-
+  public void testGetMetricRecordsPhoenixIOExceptionDoNotRetryException() throws SQLException, IOException {
     List<String> metricNames = new LinkedList<>();
     List<String> hostnames = new LinkedList<>();
     Multimap<String, List<Function>> metricFunctions = ArrayListMultimap.create();
 
-    PowerMock.mockStatic(PhoenixTransactSQL.class);
+    mockStatic(PhoenixTransactSQL.class);
     PreparedStatement preparedStatementMock = EasyMock.createNiceMock(PreparedStatement.class);
     Condition condition = new DefaultCondition(metricNames, hostnames, "appid", "instanceid", null, null, Precision.SECONDS, 10, true);
-    EasyMock.expect(PhoenixTransactSQL.prepareGetLatestMetricSqlStmt(null, condition)).andReturn(preparedStatementMock).once();
+    expect(PhoenixTransactSQL.prepareGetLatestMetricSqlStmt(null, condition)).andReturn(preparedStatementMock).once();
     PhoenixTransactSQL.setSortMergeJoinEnabled(true);
     EasyMock.expectLastCall();
     ResultSet rsMock = EasyMock.createNiceMock(ResultSet.class);
     PhoenixIOException pioe1 = EasyMock.createNiceMock(PhoenixIOException.class);
     PhoenixIOException pioe2 = EasyMock.createNiceMock(PhoenixIOException.class);
     DoNotRetryIOException dnrioe = EasyMock.createNiceMock(DoNotRetryIOException.class);
-    EasyMock.expect(preparedStatementMock.executeQuery()).andThrow(pioe1);
-    EasyMock.expect(pioe1.getCause()).andReturn(pioe2).atLeastOnce();
-    EasyMock.expect(pioe2.getCause()).andReturn(dnrioe).atLeastOnce();
+    expect(preparedStatementMock.executeQuery()).andThrow(pioe1);
+    expect(pioe1.getCause()).andReturn(pioe2).atLeastOnce();
+    expect(pioe2.getCause()).andReturn(dnrioe).atLeastOnce();
     StackTraceElement stackTrace[] = new StackTraceElement[]{new StackTraceElement("HashJoinRegionScanner","method","file",1)};
-    EasyMock.expect(dnrioe.getStackTrace()).andReturn(stackTrace).atLeastOnce();
+    expect(dnrioe.getStackTrace()).andReturn(stackTrace).atLeastOnce();
 
 
-    PowerMock.replayAll();
+    replayAll();
     EasyMock.replay(preparedStatementMock, rsMock, pioe1, pioe2, dnrioe);
     try {
       accessor.getMetricRecords(condition, metricFunctions);
@@ -210,20 +189,17 @@ public class PhoenixHBaseAccessorTest {
     } catch (Exception e) {
       //NOP
     }
-    PowerMock.verifyAll();
+    verifyAll();
   }
 
   @Test
   public void testMetricsCacheCommittingWhenFull() throws IOException, SQLException {
     Configuration hbaseConf = new Configuration();
     hbaseConf.setStrings(ZOOKEEPER_QUORUM, "quorum");
-    Configuration metricsConf = new Configuration();
-    metricsConf.setStrings(TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_SIZE, "1");
-    metricsConf.setStrings(TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, "100");
-    final Connection connection = EasyMock.createNiceMock(Connection.class);
 
+    final Connection connection = EasyMock.createNiceMock(Connection.class);
 
-    PhoenixHBaseAccessor accessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf) {
+    accessor = new PhoenixHBaseAccessor(connectionProvider) {
       @Override
       public void commitMetrics(Collection<TimelineMetrics> timelineMetricsCollection) {
         try {
@@ -235,7 +211,7 @@ public class PhoenixHBaseAccessorTest {
     };
 
     TimelineMetrics timelineMetrics = EasyMock.createNiceMock(TimelineMetrics.class);
-    EasyMock.expect(timelineMetrics.getMetrics()).andReturn(Collections.singletonList(new TimelineMetric())).anyTimes();
+    expect(timelineMetrics.getMetrics()).andReturn(Collections.singletonList(new TimelineMetric())).anyTimes();
     connection.commit();
     EasyMock.expectLastCall().once();
 
@@ -250,44 +226,33 @@ public class PhoenixHBaseAccessorTest {
 
   @Test
   public void testMetricsAggregatorSink() throws IOException, SQLException {
-    Configuration hbaseConf = new Configuration();
-    hbaseConf.setStrings(ZOOKEEPER_QUORUM, "quorum");
-    Configuration metricsConf = new Configuration();
     Map<TimelineClusterMetric, MetricClusterAggregate> clusterAggregateMap =
         new HashMap<>();
     Map<TimelineClusterMetric, MetricHostAggregate> clusterTimeAggregateMap =
         new HashMap<>();
     Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = new HashMap<>();
 
-    metricsConf.setStrings(
-        TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_SIZE, "1");
-    metricsConf.setStrings(
-        TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL,
-        "100");
-    metricsConf.setStrings(
-        TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATOR_SINK_CLASS,
-        "org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricsAggregatorMemorySink");
 
     final Connection connection = EasyMock.createNiceMock(Connection.class);
-    final PreparedStatement statement =
-        EasyMock.createNiceMock(PreparedStatement.class);
-    EasyMock.expect(connection.prepareStatement(EasyMock.anyString()))
-        .andReturn(statement).anyTimes();
+    final PreparedStatement statement = EasyMock.createNiceMock(PreparedStatement.class);
+    expect(connection.prepareStatement(EasyMock.anyString())).andReturn(statement).anyTimes();
     EasyMock.replay(statement);
     EasyMock.replay(connection);
 
-    PhoenixConnectionProvider connectionProvider =
-        new PhoenixConnectionProvider() {
-          @Override
-          public HBaseAdmin getHBaseAdmin() throws IOException {
-            return null;
-          }
+    connectionProvider = new PhoenixConnectionProvider() {
+
+      @Override
+      public HBaseAdmin getHBaseAdmin() throws IOException {
+        return null;
+      }
+
+      @Override
+      public Connection getConnection() throws SQLException {
+        return connection;
+      }
+    };
 
-          @Override
-          public Connection getConnection() throws SQLException {
-            return connection;
-          }
-        };
+    accessor = new PhoenixHBaseAccessor(connectionProvider);
 
     TimelineClusterMetric clusterMetric =
         new TimelineClusterMetric("metricName", "appId", "instanceId",
@@ -303,12 +268,10 @@ public class PhoenixHBaseAccessorTest {
     clusterTimeAggregateMap.put(clusterMetric, new MetricHostAggregate());
     hostAggregateMap.put(timelineMetric, new MetricHostAggregate());
 
-    PhoenixHBaseAccessor accessor =
-        new PhoenixHBaseAccessor(hbaseConf, metricsConf, connectionProvider);
     accessor.saveClusterAggregateRecords(clusterAggregateMap);
     accessor.saveHostAggregateRecords(hostAggregateMap,
         PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME);
-    accessor.saveClusterTimeAggregateRecords(clusterTimeAggregateMap,
+    accessor.saveClusterAggregateRecordsSecond(clusterTimeAggregateMap,
         PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
 
     TimelineMetricsAggregatorMemorySink memorySink =

http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcherTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcherTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcherTest.java
index 54b8442..dd0378d 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcherTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcherTest.java
@@ -66,7 +66,7 @@ public class TimelineMetricStoreWatcherTest {
     replay(metricStore);
 
     TimelineMetricStoreWatcher timelineMetricStoreWatcher =
-      new TimelineMetricStoreWatcher(metricStore, new TimelineMetricConfiguration());
+      new TimelineMetricStoreWatcher(metricStore, TimelineMetricConfiguration.getInstance());
     timelineMetricStoreWatcher.run();
     timelineMetricStoreWatcher.run();
     timelineMetricStoreWatcher.run();
@@ -97,7 +97,7 @@ public class TimelineMetricStoreWatcherTest {
     replayAll();
 
     TimelineMetricStoreWatcher timelineMetricStoreWatcher =
-      new TimelineMetricStoreWatcher(metricStore, new TimelineMetricConfiguration());
+      new TimelineMetricStoreWatcher(metricStore, TimelineMetricConfiguration.getInstance());
     timelineMetricStoreWatcher.run();
     timelineMetricStoreWatcher.run();
     timelineMetricStoreWatcher.run();

http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
index 07fd85d..86c9b40 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
@@ -18,7 +18,29 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
 
-import junit.framework.Assert;
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.fail;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineClusterMetric;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
@@ -26,42 +48,13 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractMiniHBaseClusterTest;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertNotNull;
-import static junit.framework.Assert.assertTrue;
-import static junit.framework.Assert.fail;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineClusterMetric;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
+import junit.framework.Assert;
 
 public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
   private final TimelineMetricReadHelper metricReader = new TimelineMetricReadHelper(false);
@@ -77,7 +70,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
     // GIVEN
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
-        getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()), null);
+        getConfigurationForTest(false), new TimelineMetricMetadataManager(new Configuration(), hdb), null);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();
@@ -130,7 +123,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
     // GIVEN
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
-        getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()), null);
+        getConfigurationForTest(false), new TimelineMetricMetadataManager(new Configuration(), hdb), null);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();
@@ -206,7 +199,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
     // GIVEN
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
-        getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()), null);
+        getConfigurationForTest(false), new TimelineMetricMetadataManager(new Configuration(), hdb), null);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     // here we put some metrics tha will be aggregated
@@ -290,7 +283,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
       MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0));
 
 
-    hdb.saveClusterTimeAggregateRecords(records, METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
+    hdb.saveClusterAggregateRecordsSecond(records, METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
 
     // WHEN
     agg.doWork(startTime, ctime + hour + 1000);
@@ -490,7 +483,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
     conf.set(CLUSTER_AGGREGATOR_APP_IDS, "app1");
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
-        conf, new TimelineMetricMetadataManager(hdb, new Configuration()), null);
+        conf, new TimelineMetricMetadataManager(new Configuration(), hdb), null);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();
@@ -511,14 +504,13 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
 
     //THEN
     Condition condition = new DefaultCondition(
-      Collections.singletonList("cpu_user"), null, "app1", null,
+      new ArrayList<String>() {{ add("cpu_user"); }}, null, "app1", null,
       startTime, endTime, null, null, true);
     condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
       PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
       METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
 
-    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
-      (conn, condition);
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
     ResultSet rs = pstmt.executeQuery();
 
     int recordCount = 0;
@@ -542,7 +534,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
   public void testClusterAggregateMetricNormalization() throws Exception {
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
-        getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()), null);
+        getConfigurationForTest(false), new TimelineMetricMetadataManager(new Configuration(), hdb), null);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     // Sample data

http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java
index c62fd34..3adf770 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java
@@ -45,7 +45,7 @@ public class TestMetadataManager extends AbstractMiniHBaseClusterTest {
   @Before
   public void insertDummyRecords() throws IOException, SQLException, URISyntaxException {
     // Initialize new manager
-    metadataManager = new TimelineMetricMetadataManager(hdb, new Configuration());
+    metadataManager = new TimelineMetricMetadataManager(new Configuration(), hdb);
     final long now = System.currentTimeMillis();
 
     TimelineMetrics timelineMetrics = new TimelineMetrics();

http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataSync.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataSync.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataSync.java
index 181abca..a524b13 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataSync.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataSync.java
@@ -68,8 +68,7 @@ public class TestMetadataSync {
 
     replay(configuration, hBaseAccessor);
 
-    TimelineMetricMetadataManager metadataManager = new
-      TimelineMetricMetadataManager(hBaseAccessor, configuration);
+    TimelineMetricMetadataManager metadataManager = new TimelineMetricMetadataManager(new Configuration(), hBaseAccessor);
 
     metadataManager.metricMetadataSync = new TimelineMetricMetadataSync(metadataManager);
 
@@ -110,8 +109,7 @@ public class TestMetadataSync {
 
     replay(configuration, hBaseAccessor);
 
-    TimelineMetricMetadataManager metadataManager = new
-      TimelineMetricMetadataManager(hBaseAccessor, configuration);
+    TimelineMetricMetadataManager metadataManager = new TimelineMetricMetadataManager(configuration, hBaseAccessor);
 
     metadataManager.putIfModifiedTimelineMetricMetadata(metadata1);
     metadataManager.putIfModifiedTimelineMetricMetadata(metadata2);

http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSourceTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSourceTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSourceTest.java
new file mode 100644
index 0000000..5d3aacb
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSourceTest.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider.SOURCE_NAME.RAW_METRICS;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.powermock.api.easymock.PowerMock.mockStatic;
+import static org.powermock.api.easymock.PowerMock.replayAll;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.ExternalMetricsSink;
+import org.easymock.Capture;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import junit.framework.Assert;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(TimelineMetricConfiguration.class)
+public class RawMetricsSourceTest {
+
+  @Before
+  public void setupConf() throws Exception {
+    TimelineMetricConfiguration conf = new TimelineMetricConfiguration(new
+      Configuration(), new Configuration());
+    mockStatic(TimelineMetricConfiguration.class);
+    expect(TimelineMetricConfiguration.getInstance()).andReturn(conf).anyTimes();
+    replayAll();
+  }
+
+  @Test
+  public void testRawMetricsSourcedAtFlushInterval() throws Exception {
+    InternalSourceProvider internalSourceProvider = new DefaultInternalMetricsSourceProvider();
+    ExternalMetricsSink rawMetricsSink = createNiceMock(ExternalMetricsSink.class);
+    expect(rawMetricsSink.getFlushSeconds()).andReturn(1);
+    expect(rawMetricsSink.getSinkTimeOutSeconds()).andReturn(1);
+    Capture<Collection<TimelineMetrics>> metricsCapture = new Capture<>();
+    rawMetricsSink.sinkMetricData(capture(metricsCapture));
+    expectLastCall();
+    replay(rawMetricsSink);
+
+    InternalMetricsSource rawMetricsSource = internalSourceProvider.getInternalMetricsSource(RAW_METRICS, 1, rawMetricsSink);
+    TimelineMetrics timelineMetrics = new TimelineMetrics();
+
+    final long now = System.currentTimeMillis();
+    TimelineMetric metric1 = new TimelineMetric();
+    metric1.setMetricName("m1");
+    metric1.setAppId("a1");
+    metric1.setInstanceId("i1");
+    metric1.setHostName("h1");
+    metric1.setStartTime(now - 200);
+    metric1.setMetricValues(new TreeMap<Long, Double>() {{
+      put(now - 100, 1.0);
+      put(now - 200, 2.0);
+    }});
+    timelineMetrics.getMetrics().add(metric1);
+
+    rawMetricsSource.publishTimelineMetrics(Collections.singletonList(timelineMetrics));
+
+    verify(rawMetricsSink);
+  }
+
+  @Test(timeout = 10000)
+  public void testRawMetricsCachedAndSourced() throws Exception {
+    ExternalMetricsSink rawMetricsSink = createNiceMock(ExternalMetricsSink.class);
+    expect(rawMetricsSink.getFlushSeconds()).andReturn(2).anyTimes();
+    expect(rawMetricsSink.getSinkTimeOutSeconds()).andReturn(1).anyTimes();
+
+    class CaptureOnce<T> extends Capture<T> {
+      @Override
+      public void setValue(T value) {
+        if (!hasCaptured()) {
+          super.setValue(value);
+        }
+      }
+    }
+    Capture<Collection<TimelineMetrics>> metricsCapture = new CaptureOnce<>();
+
+    rawMetricsSink.sinkMetricData(capture(metricsCapture));
+    expectLastCall();
+    replay(rawMetricsSink);
+
+    InternalSourceProvider internalSourceProvider = new DefaultInternalMetricsSourceProvider();
+    InternalMetricsSource rawMetricsSource = internalSourceProvider.getInternalMetricsSource(RAW_METRICS, 1, rawMetricsSink);
+    TimelineMetrics timelineMetrics = new TimelineMetrics();
+
+    final long now = System.currentTimeMillis();
+    TimelineMetric metric1 = new TimelineMetric();
+    metric1.setMetricName("m1");
+    metric1.setAppId("a1");
+    metric1.setInstanceId("i1");
+    metric1.setHostName("h1");
+    metric1.setStartTime(now - 200);
+    metric1.setTimestamp(now - 200);
+    metric1.setMetricValues(new TreeMap<Long, Double>() {{
+      put(now - 100, 1.0);
+      put(now - 200, 2.0);
+    }});
+    timelineMetrics.getMetrics().add(metric1);
+
+    rawMetricsSource.publishTimelineMetrics(Collections.singletonList(timelineMetrics));
+
+    // Wait on eviction
+    Thread.sleep(5000);
+
+    verify(rawMetricsSink);
+
+    Assert.assertTrue(metricsCapture.hasCaptured());
+    Assert.assertTrue(metricsCapture.getValue().iterator().next().getMetrics().iterator().next().equals(metric1));
+  }
+
+}