Posted to commits@ambari.apache.org by sw...@apache.org on 2017/05/30 23:45:36 UTC

ambari git commit: AMBARI-21079. Add ability to sink Raw metrics to external system via Http. Renamed files to fix build. (swagle)

Repository: ambari
Updated Branches:
  refs/heads/branch-3.0-ams 0e5094fdb -> 7e970233a


AMBARI-21079. Add ability to sink Raw metrics to external system via Http. Renamed files to fix build. (swagle)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/7e970233
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/7e970233
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/7e970233

Branch: refs/heads/branch-3.0-ams
Commit: 7e970233a5cf046903baf87d1e4b4d89264db4f3
Parents: 0e5094f
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Tue May 30 16:45:27 2017 -0700
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Tue May 30 16:45:27 2017 -0700

----------------------------------------------------------------------
 .../timeline/HBaseTimelineMetricStore.java      | 544 -------------------
 .../timeline/HBaseTimelineMetricsService.java   | 544 +++++++++++++++++++
 .../timeline/HBaseTimelineMetricStoreTest.java  | 132 -----
 .../HBaseTimelineMetricsServiceTest.java        | 132 +++++
 4 files changed, 676 insertions(+), 676 deletions(-)
----------------------------------------------------------------------
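Note on the diff below: the insertion and deletion counts match exactly and the blob index (9ebc64c for the service, f035678 for the test) is unchanged on both sides, so this is a pure rename with no content change; the classes were already named HBaseTimelineMetricsService and HBaseTimelineMetricsServiceTest while the file names were not, which is the build breakage the rename fixes. The raw-metric sink named in the subject is visible in putMetrics(), which forwards host-level batches (appId "HOST") to an AmsKafkaProducer before persisting to HBase. Below is a minimal, self-contained sketch of that control flow only; RawSink and the Object batch are hypothetical stand-ins for AmsKafkaProducer and TimelineMetrics, not the Ambari API.

import java.util.List;

// Hypothetical stand-in for AmsKafkaProducer; only the control flow of the
// raw-metric sink in putMetrics() is reproduced here.
interface RawSink {
  void sendMetrics(Object batch) throws Exception;
}

class RawMetricSinkSketch {
  private final RawSink sink;

  RawMetricSinkSketch(RawSink sink) {
    this.sink = sink;
  }

  // Mirrors the guard in putMetrics(): only batches whose first metric has
  // appId "HOST" are forwarded, and a sink failure is logged without
  // blocking the HBase write that the real method performs afterwards.
  void putMetrics(List<String> appIds, Object batch) {
    try {
      if (!appIds.isEmpty() && "HOST".equals(appIds.get(0))) {
        sink.sendMetrics(batch); // best-effort external sink
      }
    } catch (Exception e) {
      System.err.println("Raw metric sink failed, continuing: " + e);
    }
    // The real method then calls
    // hBaseAccessor.insertMetricRecordsWithMetadata(metricMetadataManager, metrics, false);
  }
}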


http://git-wip-us.apache.org/repos/asf/ambari/blob/7e970233/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
deleted file mode 100644
index 9ebc64c..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ /dev/null
@@ -1,544 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
-import org.apache.ambari.metrics.alertservice.spark.AmsKafkaProducer;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
-import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
-import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
-import org.apache.hadoop.metrics2.sink.timeline.Precision;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricWithAggregatedValues;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConditionBuilder;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.TopNCondition;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.SeriesAggregateFunction;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.TimelineMetricsSeriesAggregateFunction;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.TimelineMetricsSeriesAggregateFunctionFactory;
-
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.*;
-
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
-
-public class HBaseTimelineMetricsService extends AbstractService implements TimelineMetricStore {
-
-  static final Log LOG = LogFactory.getLog(HBaseTimelineMetricsService.class);
-  private final TimelineMetricConfiguration configuration;
-  private PhoenixHBaseAccessor hBaseAccessor;
-  private static volatile boolean isInitialized = false;
-  private final ScheduledExecutorService watchdogExecutorService = Executors.newSingleThreadScheduledExecutor();
-  private final Map<AGGREGATOR_NAME, ScheduledExecutorService> scheduledExecutors = new HashMap<>();
-  private TimelineMetricMetadataManager metricMetadataManager;
-  private Integer defaultTopNHostsLimit;
-  private MetricCollectorHAController haController;
-  private AmsKafkaProducer kafkaProducer = new AmsKafkaProducer("104.196.85.21:6667");
-
-  /**
-   * Construct the service.
-   *
-   */
-  public HBaseTimelineMetricsService(TimelineMetricConfiguration configuration) {
-    super(HBaseTimelineMetricsService.class.getName());
-    this.configuration = configuration;
-  }
-
-  @Override
-  protected void serviceInit(Configuration conf) throws Exception {
-    super.serviceInit(conf);
-    initializeSubsystem();
-  }
-
-  private synchronized void initializeSubsystem() {
-    if (!isInitialized) {
-      hBaseAccessor = new PhoenixHBaseAccessor(null);
-      // Initialize schema
-      hBaseAccessor.initMetricSchema();
-      // Initialize metadata from store
-      try {
-        metricMetadataManager = new TimelineMetricMetadataManager(hBaseAccessor);
-      } catch (MalformedURLException | URISyntaxException e) {
-        throw new ExceptionInInitializerError("Unable to initialize metadata manager");
-      }
-      metricMetadataManager.initializeMetadata();
-      // Initialize policies before TTL update
-      hBaseAccessor.initPoliciesAndTTL();
-      // Start HA service
-      // Start the controller
-      if (!configuration.isDistributedCollectorModeDisabled()) {
-        haController = new MetricCollectorHAController(configuration);
-        try {
-          haController.initializeHAController();
-        } catch (Exception e) {
-          LOG.error(e);
-          throw new MetricsSystemInitializationException("Unable to " +
-            "initialize HA controller", e);
-        }
-      } else {
-        LOG.info("Distributed collector mode disabled");
-      }
-
-      //Initialize whitelisting & blacklisting if needed
-      TimelineMetricsFilter.initializeMetricFilter(configuration);
-
-      Configuration metricsConf = null;
-      try {
-        metricsConf = configuration.getMetricsConf();
-      } catch (Exception e) {
-        throw new ExceptionInInitializerError("Cannot initialize configuration.");
-      }
-
-      defaultTopNHostsLimit = Integer.parseInt(metricsConf.get(DEFAULT_TOPN_HOSTS_LIMIT, "20"));
-      if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) {
-        LOG.info("Using group by aggregators for aggregating host and cluster metrics.");
-      }
-
-      // Start the cluster aggregator second
-      TimelineMetricAggregator secondClusterAggregator =
-        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(
-          hBaseAccessor, metricsConf, metricMetadataManager, haController);
-      scheduleAggregatorThread(secondClusterAggregator);
-
-      // Start the minute cluster aggregator
-      TimelineMetricAggregator minuteClusterAggregator =
-        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(
-          hBaseAccessor, metricsConf, haController);
-      scheduleAggregatorThread(minuteClusterAggregator);
-
-      // Start the hourly cluster aggregator
-      TimelineMetricAggregator hourlyClusterAggregator =
-        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(
-          hBaseAccessor, metricsConf, haController);
-      scheduleAggregatorThread(hourlyClusterAggregator);
-
-      // Start the daily cluster aggregator
-      TimelineMetricAggregator dailyClusterAggregator =
-        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(
-          hBaseAccessor, metricsConf, haController);
-      scheduleAggregatorThread(dailyClusterAggregator);
-
-      // Start the minute host aggregator
-      if (Boolean.parseBoolean(metricsConf.get(TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION, "true"))) {
-        LOG.info("timeline.metrics.host.inmemory.aggregation is set to True, disabling host minute aggregation on collector");
-      } else {
-        TimelineMetricAggregator minuteHostAggregator =
-          TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(
-            hBaseAccessor, metricsConf, haController);
-        scheduleAggregatorThread(minuteHostAggregator);
-      }
-
-      // Start the hourly host aggregator
-      TimelineMetricAggregator hourlyHostAggregator =
-        TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(
-          hBaseAccessor, metricsConf, haController);
-      scheduleAggregatorThread(hourlyHostAggregator);
-
-      // Start the daily host aggregator
-      TimelineMetricAggregator dailyHostAggregator =
-        TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(
-          hBaseAccessor, metricsConf, haController);
-      scheduleAggregatorThread(dailyHostAggregator);
-
-      if (!configuration.isTimelineMetricsServiceWatcherDisabled()) {
-        int initDelay = configuration.getTimelineMetricsServiceWatcherInitDelay();
-        int delay = configuration.getTimelineMetricsServiceWatcherDelay();
-        // Start the watchdog
-        watchdogExecutorService.scheduleWithFixedDelay(
-          new TimelineMetricStoreWatcher(this, configuration),
-          initDelay, delay, TimeUnit.SECONDS);
-        LOG.info("Started watchdog for timeline metrics store with initial " +
-          "delay = " + initDelay + ", delay = " + delay);
-      }
-
-      isInitialized = true;
-    }
-
-  }
-
-  @Override
-  protected void serviceStop() throws Exception {
-    super.serviceStop();
-  }
-
-  @Override
-  public TimelineMetrics getTimelineMetrics(List<String> metricNames,
-      List<String> hostnames, String applicationId, String instanceId,
-      Long startTime, Long endTime, Precision precision, Integer limit,
-      boolean groupedByHosts, TopNConfig topNConfig, String seriesAggregateFunction) throws SQLException, IOException {
-
-    if (metricNames == null || metricNames.isEmpty()) {
-      throw new IllegalArgumentException("No metric name filter specified.");
-    }
-    if ((startTime == null && endTime != null)
-        || (startTime != null && endTime == null)) {
-      throw new IllegalArgumentException("Open ended query not supported ");
-    }
-    if (limit != null && limit > PhoenixHBaseAccessor.RESULTSET_LIMIT){
-      throw new IllegalArgumentException("Limit too big");
-    }
-
-    TimelineMetricsSeriesAggregateFunction seriesAggrFunctionInstance = null;
-    if (!StringUtils.isEmpty(seriesAggregateFunction)) {
-      SeriesAggregateFunction func = SeriesAggregateFunction.getFunction(seriesAggregateFunction);
-      seriesAggrFunctionInstance = TimelineMetricsSeriesAggregateFunctionFactory.newInstance(func);
-    }
-
-    Multimap<String, List<Function>> metricFunctions =
-      parseMetricNamesToAggregationFunctions(metricNames);
-
-    ConditionBuilder conditionBuilder = new ConditionBuilder(new ArrayList<String>(metricFunctions.keySet()))
-      .hostnames(hostnames)
-      .appId(applicationId)
-      .instanceId(instanceId)
-      .startTime(startTime)
-      .endTime(endTime)
-      .precision(precision)
-      .limit(limit)
-      .grouped(groupedByHosts);
-
-    if (topNConfig != null) {
-      if (TopNCondition.isTopNHostCondition(metricNames, hostnames) ^ //Only 1 condition should be true.
-        TopNCondition.isTopNMetricCondition(metricNames, hostnames)) {
-        conditionBuilder.topN(topNConfig.getTopN());
-        conditionBuilder.isBottomN(topNConfig.getIsBottomN());
-        Function.ReadFunction readFunction = Function.ReadFunction.getFunction(topNConfig.getTopNFunction());
-        Function function = new Function(readFunction, null);
-        conditionBuilder.topNFunction(function);
-      } else {
-        LOG.info("Invalid Input for TopN query. Ignoring TopN Request.");
-      }
-    } else if (startTime != null && hostnames != null && hostnames.size() > defaultTopNHostsLimit) {
-      // if (timeseries query AND hostnames passed AND size(hostnames) > limit)
-      LOG.info("Requesting data for more than " + defaultTopNHostsLimit +  " Hosts. " +
-        "Defaulting to Top " + defaultTopNHostsLimit);
-      conditionBuilder.topN(defaultTopNHostsLimit);
-      conditionBuilder.isBottomN(false);
-    }
-
-    Condition condition = conditionBuilder.build();
-
-    TimelineMetrics metrics;
-
-    if (hostnames == null || hostnames.isEmpty()) {
-      metrics = hBaseAccessor.getAggregateMetricRecords(condition, metricFunctions);
-    } else {
-      metrics = hBaseAccessor.getMetricRecords(condition, metricFunctions);
-    }
-
-    metrics = postProcessMetrics(metrics);
-
-    if (metrics.getMetrics().size() == 0) {
-      return metrics;
-    }
-
-    return seriesAggregateMetrics(seriesAggrFunctionInstance, metrics);
-  }
-
-  private TimelineMetrics postProcessMetrics(TimelineMetrics metrics) {
-    List<TimelineMetric> metricsList = metrics.getMetrics();
-
-    for (TimelineMetric metric : metricsList){
-      String name = metric.getMetricName();
-      if (name.contains("._rate")){
-        updateValuesAsRate(metric.getMetricValues(), false);
-      } else if (name.contains("._diff")) {
-        updateValuesAsRate(metric.getMetricValues(), true);
-      }
-    }
-
-    return metrics;
-  }
-
-  private TimelineMetrics seriesAggregateMetrics(TimelineMetricsSeriesAggregateFunction seriesAggrFuncInstance,
-      TimelineMetrics metrics) {
-    if (seriesAggrFuncInstance != null) {
-      TimelineMetric appliedMetric = seriesAggrFuncInstance.apply(metrics);
-      metrics.setMetrics(Collections.singletonList(appliedMetric));
-    }
-    return metrics;
-  }
-
-  static Map<Long, Double> updateValuesAsRate(Map<Long, Double> metricValues, boolean isDiff) {
-    Long prevTime = null;
-    Double prevVal = null;
-    long step;
-    Double diff;
-
-    for(Iterator<Map.Entry<Long, Double>> it = metricValues.entrySet().iterator(); it.hasNext(); ) {
-      Map.Entry<Long, Double> timeValueEntry = it.next();
-      Long currTime = timeValueEntry.getKey();
-      Double currVal = timeValueEntry.getValue();
-
-      if (prevTime != null) {
-        step = currTime - prevTime;
-        diff = currVal - prevVal;
-        Double rate = isDiff ? diff : (diff / TimeUnit.MILLISECONDS.toSeconds(step));
-        timeValueEntry.setValue(rate);
-      } else {
-        it.remove();
-      }
-
-      prevTime = currTime;
-      prevVal = currVal;
-    }
-
-    return metricValues;
-  }
-
-  static Multimap<String, List<Function>> parseMetricNamesToAggregationFunctions(List<String> metricNames) {
-    Multimap<String, List<Function>> metricsFunctions = ArrayListMultimap.create();
-
-    for (String metricName : metricNames){
-      Function function = Function.DEFAULT_VALUE_FUNCTION;
-      String cleanMetricName = metricName;
-
-      try {
-        function = Function.fromMetricName(metricName);
-        int functionStartIndex = metricName.indexOf("._");
-        if (functionStartIndex > 0) {
-          cleanMetricName = metricName.substring(0, functionStartIndex);
-        }
-      } catch (Function.FunctionFormatException ffe){
-        // Unknown function suffix:
-        // fall back to VALUE and keep the full metric name.
-      }
-
-      List<Function>  functionsList = new ArrayList<>();
-      functionsList.add(function);
-      metricsFunctions.put(cleanMetricName, functionsList);
-    }
-
-    return metricsFunctions;
-  }
-
-  @Override
-  public TimelinePutResponse putMetrics(TimelineMetrics metrics) throws SQLException, IOException {
-    // Errors are indicated by the SQLException
-    TimelinePutResponse response = new TimelinePutResponse();
-
-    try {
-      if (!metrics.getMetrics().isEmpty() && metrics.getMetrics().get(0).getAppId().equals("HOST")) {
-        kafkaProducer.sendMetrics(fromTimelineMetrics(metrics));
-      }
-    } catch (InterruptedException | ExecutionException e) {
-      LOG.error(e);
-    }
-    hBaseAccessor.insertMetricRecordsWithMetadata(metricMetadataManager, metrics, false);
-
-    return response;
-  }
-
-
-  private org.apache.ambari.metrics.alertservice.common.TimelineMetrics fromTimelineMetrics(TimelineMetrics timelineMetrics) {
-    org.apache.ambari.metrics.alertservice.common.TimelineMetrics otherMetrics = new org.apache.ambari.metrics.alertservice.common.TimelineMetrics();
-
-    List<org.apache.ambari.metrics.alertservice.common.TimelineMetric> timelineMetricList = new ArrayList<>();
-    for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) {
-      timelineMetricList.add(fromTimelineMetric(timelineMetric));
-    }
-    otherMetrics.setMetrics(timelineMetricList);
-    return otherMetrics;
-  }
-
-  private org.apache.ambari.metrics.alertservice.common.TimelineMetric fromTimelineMetric(TimelineMetric timelineMetric) {
-
-    org.apache.ambari.metrics.alertservice.common.TimelineMetric otherMetric = new org.apache.ambari.metrics.alertservice.common.TimelineMetric();
-    otherMetric.setMetricValues(timelineMetric.getMetricValues());
-    otherMetric.setStartTime(timelineMetric.getStartTime());
-    otherMetric.setHostName(timelineMetric.getHostName());
-    otherMetric.setInstanceId(timelineMetric.getInstanceId());
-    otherMetric.setAppId(timelineMetric.getAppId());
-    otherMetric.setMetricName(timelineMetric.getMetricName());
-
-    return otherMetric;
-  }
-
-  @Override
-  public TimelinePutResponse putContainerMetrics(List<ContainerMetric> metrics)
-      throws SQLException, IOException {
-    hBaseAccessor.insertContainerMetrics(metrics);
-    return new TimelinePutResponse();
-  }
-
-  @Override
-  public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(String query) throws SQLException, IOException {
-    Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadata =
-      metricMetadataManager.getMetadataCache();
-
-    boolean includeBlacklistedMetrics = StringUtils.isNotEmpty(query) && "all".equalsIgnoreCase(query);
-
-    // Group Metadata by AppId
-    Map<String, List<TimelineMetricMetadata>> metadataByAppId = new HashMap<>();
-    for (TimelineMetricMetadata metricMetadata : metadata.values()) {
-
-      if (!includeBlacklistedMetrics && !metricMetadata.isWhitelisted()) {
-        continue;
-      }
-      List<TimelineMetricMetadata> metadataList = metadataByAppId.get(metricMetadata.getAppId());
-      if (metadataList == null) {
-        metadataList = new ArrayList<>();
-        metadataByAppId.put(metricMetadata.getAppId(), metadataList);
-      }
-
-      metadataList.add(metricMetadata);
-    }
-
-    return metadataByAppId;
-  }
-
-  @Override
-  public Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException {
-    return metricMetadataManager.getHostedAppsCache();
-  }
-
-  @Override
-  public TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException {
-    Map<TimelineMetric, MetricHostAggregate> aggregateMap = new HashMap<>();
-    for (TimelineMetricWithAggregatedValues entry : aggregationResult.getResult()) {
-      aggregateMap.put(entry.getTimelineMetric(), entry.getMetricAggregate());
-    }
-    hBaseAccessor.saveHostAggregateRecords(aggregateMap, PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME);
-
-
-    return new TimelinePutResponse();
-  }
-
-  @Override
-  public Map<String, Map<String,Set<String>>> getInstanceHostsMetadata(String instanceId, String appId)
-          throws SQLException, IOException {
-
-    Map<String, Set<String>> hostedApps = metricMetadataManager.getHostedAppsCache();
-    Map<String, Set<String>> instanceHosts = metricMetadataManager.getHostedInstanceCache();
-    Map<String, Map<String, Set<String>>> instanceAppHosts = new HashMap<>();
-
-    if (MapUtils.isEmpty(instanceHosts)) {
-      Map<String, Set<String>> appHostMap = new HashMap<String, Set<String>>();
-      for (String host : hostedApps.keySet()) {
-        for (String app : hostedApps.get(host)) {
-          if (!appHostMap.containsKey(app)) {
-            appHostMap.put(app, new HashSet<String>());
-          }
-          appHostMap.get(app).add(host);
-        }
-      }
-      instanceAppHosts.put("", appHostMap);
-    } else {
-      for (String instance : instanceHosts.keySet()) {
-
-        if (StringUtils.isNotEmpty(instanceId) && !instance.equals(instanceId)) {
-          continue;
-        }
-        Map<String, Set<String>> appHostMap = new  HashMap<String, Set<String>>();
-        instanceAppHosts.put(instance, appHostMap);
-
-        Set<String> hostsWithInstance = instanceHosts.get(instance);
-        for (String host : hostsWithInstance) {
-          for (String app : hostedApps.get(host)) {
-            if (StringUtils.isNotEmpty(appId) && !app.equals(appId)) {
-              continue;
-            }
-
-            if (!appHostMap.containsKey(app)) {
-              appHostMap.put(app, new HashSet<String>());
-            }
-            appHostMap.get(app).add(host);
-          }
-        }
-      }
-    }
-
-    return instanceAppHosts;
-  }
-
-  @Override
-  public List<String> getLiveInstances() {
-
-    List<String> instances = null;
-    try {
-        if (haController == null) {
-          // Always return current host as live (embedded operation mode)
-          return Collections.singletonList(configuration.getInstanceHostnameFromEnv());
-        }
-        instances = haController.getLiveInstanceHostNames();
-        if (instances == null || instances.isEmpty()) {
-          // fallback
-          instances = Collections.singletonList(configuration.getInstanceHostnameFromEnv());
-        }
-      } catch (UnknownHostException e) {
-        LOG.debug("Exception on getting hostname from env.", e);
-      }
-    return instances;
-  }
-
-  private void scheduleAggregatorThread(final TimelineMetricAggregator aggregator) {
-    if (!aggregator.isDisabled()) {
-      ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(
-        new ThreadFactory() {
-          @Override
-          public Thread newThread(Runnable r) {
-            return new Thread(r, ACTUAL_AGGREGATOR_NAMES.get(aggregator.getName()));
-          }
-        }
-      );
-      scheduledExecutors.put(aggregator.getName(), executorService);
-      executorService.scheduleAtFixedRate(aggregator,
-        0L,
-        aggregator.getSleepIntervalMillis(),
-        TimeUnit.MILLISECONDS);
-      LOG.info("Scheduled aggregator thread " + aggregator.getName() + " every " +
-        aggregator.getSleepIntervalMillis() + " milliseconds.");
-    } else {
-      LOG.info("Skipped scheduling " + aggregator.getName() + " since it is disabled.");
-    }
-  }
-}
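The service above ends with scheduleAggregatorThread(), which hands each enabled aggregator its own single-threaded scheduled executor whose thread is named after the aggregator, so it is identifiable in thread dumps. A standalone sketch of that pattern, assuming a placeholder task, name, and interval (the real code pulls these from the aggregator):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Standalone sketch of the scheduling pattern in scheduleAggregatorThread():
// one single-threaded executor per aggregator, fixed-rate, named thread.
public class AggregatorScheduleSketch {
  public static void main(String[] args) {
    String aggregatorName = "TimelineClusterAggregatorMinute"; // placeholder
    long sleepIntervalMillis = 60_000L;                        // placeholder

    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
        r -> new Thread(r, aggregatorName)); // named thread, as in the original
    executor.scheduleAtFixedRate(
        () -> System.out.println("aggregating on " + Thread.currentThread().getName()),
        0L, sleepIntervalMillis, TimeUnit.MILLISECONDS);
    // Runs until shut down; the real service keeps the executor in a map
    // keyed by aggregator name.
  }
}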

http://git-wip-us.apache.org/repos/asf/ambari/blob/7e970233/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
new file mode 100644
index 0000000..9ebc64c
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
@@ -0,0 +1,544 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.ambari.metrics.alertservice.spark.AmsKafkaProducer;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
+import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.Precision;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricWithAggregatedValues;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConditionBuilder;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.TopNCondition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.SeriesAggregateFunction;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.TimelineMetricsSeriesAggregateFunction;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.TimelineMetricsSeriesAggregateFunctionFactory;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.*;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
+
+public class HBaseTimelineMetricsService extends AbstractService implements TimelineMetricStore {
+
+  static final Log LOG = LogFactory.getLog(HBaseTimelineMetricsService.class);
+  private final TimelineMetricConfiguration configuration;
+  private PhoenixHBaseAccessor hBaseAccessor;
+  private static volatile boolean isInitialized = false;
+  private final ScheduledExecutorService watchdogExecutorService = Executors.newSingleThreadScheduledExecutor();
+  private final Map<AGGREGATOR_NAME, ScheduledExecutorService> scheduledExecutors = new HashMap<>();
+  private TimelineMetricMetadataManager metricMetadataManager;
+  private Integer defaultTopNHostsLimit;
+  private MetricCollectorHAController haController;
+  private AmsKafkaProducer kafkaProducer = new AmsKafkaProducer("104.196.85.21:6667");
+
+  /**
+   * Construct the service.
+   *
+   */
+  public HBaseTimelineMetricsService(TimelineMetricConfiguration configuration) {
+    super(HBaseTimelineMetricsService.class.getName());
+    this.configuration = configuration;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    initializeSubsystem();
+  }
+
+  private synchronized void initializeSubsystem() {
+    if (!isInitialized) {
+      hBaseAccessor = new PhoenixHBaseAccessor(null);
+      // Initialize schema
+      hBaseAccessor.initMetricSchema();
+      // Initialize metadata from store
+      try {
+        metricMetadataManager = new TimelineMetricMetadataManager(hBaseAccessor);
+      } catch (MalformedURLException | URISyntaxException e) {
+        throw new ExceptionInInitializerError("Unable to initialize metadata manager");
+      }
+      metricMetadataManager.initializeMetadata();
+      // Initialize policies before TTL update
+      hBaseAccessor.initPoliciesAndTTL();
+      // Start HA service
+      // Start the controller
+      if (!configuration.isDistributedCollectorModeDisabled()) {
+        haController = new MetricCollectorHAController(configuration);
+        try {
+          haController.initializeHAController();
+        } catch (Exception e) {
+          LOG.error(e);
+          throw new MetricsSystemInitializationException("Unable to " +
+            "initialize HA controller", e);
+        }
+      } else {
+        LOG.info("Distributed collector mode disabled");
+      }
+
+      //Initialize whitelisting & blacklisting if needed
+      TimelineMetricsFilter.initializeMetricFilter(configuration);
+
+      Configuration metricsConf = null;
+      try {
+        metricsConf = configuration.getMetricsConf();
+      } catch (Exception e) {
+        throw new ExceptionInInitializerError("Cannot initialize configuration.");
+      }
+
+      defaultTopNHostsLimit = Integer.parseInt(metricsConf.get(DEFAULT_TOPN_HOSTS_LIMIT, "20"));
+      if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) {
+        LOG.info("Using group by aggregators for aggregating host and cluster metrics.");
+      }
+
+      // Start the cluster aggregator second
+      TimelineMetricAggregator secondClusterAggregator =
+        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(
+          hBaseAccessor, metricsConf, metricMetadataManager, haController);
+      scheduleAggregatorThread(secondClusterAggregator);
+
+      // Start the minute cluster aggregator
+      TimelineMetricAggregator minuteClusterAggregator =
+        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(
+          hBaseAccessor, metricsConf, haController);
+      scheduleAggregatorThread(minuteClusterAggregator);
+
+      // Start the hourly cluster aggregator
+      TimelineMetricAggregator hourlyClusterAggregator =
+        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(
+          hBaseAccessor, metricsConf, haController);
+      scheduleAggregatorThread(hourlyClusterAggregator);
+
+      // Start the daily cluster aggregator
+      TimelineMetricAggregator dailyClusterAggregator =
+        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(
+          hBaseAccessor, metricsConf, haController);
+      scheduleAggregatorThread(dailyClusterAggregator);
+
+      // Start the minute host aggregator
+      if (Boolean.parseBoolean(metricsConf.get(TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION, "true"))) {
+        LOG.info("timeline.metrics.host.inmemory.aggregation is set to True, disabling host minute aggregation on collector");
+      } else {
+        TimelineMetricAggregator minuteHostAggregator =
+          TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(
+            hBaseAccessor, metricsConf, haController);
+        scheduleAggregatorThread(minuteHostAggregator);
+      }
+
+      // Start the hourly host aggregator
+      TimelineMetricAggregator hourlyHostAggregator =
+        TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(
+          hBaseAccessor, metricsConf, haController);
+      scheduleAggregatorThread(hourlyHostAggregator);
+
+      // Start the daily host aggregator
+      TimelineMetricAggregator dailyHostAggregator =
+        TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(
+          hBaseAccessor, metricsConf, haController);
+      scheduleAggregatorThread(dailyHostAggregator);
+
+      if (!configuration.isTimelineMetricsServiceWatcherDisabled()) {
+        int initDelay = configuration.getTimelineMetricsServiceWatcherInitDelay();
+        int delay = configuration.getTimelineMetricsServiceWatcherDelay();
+        // Start the watchdog
+        watchdogExecutorService.scheduleWithFixedDelay(
+          new TimelineMetricStoreWatcher(this, configuration),
+          initDelay, delay, TimeUnit.SECONDS);
+        LOG.info("Started watchdog for timeline metrics store with initial " +
+          "delay = " + initDelay + ", delay = " + delay);
+      }
+
+      isInitialized = true;
+    }
+
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    super.serviceStop();
+  }
+
+  @Override
+  public TimelineMetrics getTimelineMetrics(List<String> metricNames,
+      List<String> hostnames, String applicationId, String instanceId,
+      Long startTime, Long endTime, Precision precision, Integer limit,
+      boolean groupedByHosts, TopNConfig topNConfig, String seriesAggregateFunction) throws SQLException, IOException {
+
+    if (metricNames == null || metricNames.isEmpty()) {
+      throw new IllegalArgumentException("No metric name filter specified.");
+    }
+    if ((startTime == null && endTime != null)
+        || (startTime != null && endTime == null)) {
+      throw new IllegalArgumentException("Open ended query not supported ");
+    }
+    if (limit != null && limit > PhoenixHBaseAccessor.RESULTSET_LIMIT){
+      throw new IllegalArgumentException("Limit too big");
+    }
+
+    TimelineMetricsSeriesAggregateFunction seriesAggrFunctionInstance = null;
+    if (!StringUtils.isEmpty(seriesAggregateFunction)) {
+      SeriesAggregateFunction func = SeriesAggregateFunction.getFunction(seriesAggregateFunction);
+      seriesAggrFunctionInstance = TimelineMetricsSeriesAggregateFunctionFactory.newInstance(func);
+    }
+
+    Multimap<String, List<Function>> metricFunctions =
+      parseMetricNamesToAggregationFunctions(metricNames);
+
+    ConditionBuilder conditionBuilder = new ConditionBuilder(new ArrayList<String>(metricFunctions.keySet()))
+      .hostnames(hostnames)
+      .appId(applicationId)
+      .instanceId(instanceId)
+      .startTime(startTime)
+      .endTime(endTime)
+      .precision(precision)
+      .limit(limit)
+      .grouped(groupedByHosts);
+
+    if (topNConfig != null) {
+      if (TopNCondition.isTopNHostCondition(metricNames, hostnames) ^ //Only 1 condition should be true.
+        TopNCondition.isTopNMetricCondition(metricNames, hostnames)) {
+        conditionBuilder.topN(topNConfig.getTopN());
+        conditionBuilder.isBottomN(topNConfig.getIsBottomN());
+        Function.ReadFunction readFunction = Function.ReadFunction.getFunction(topNConfig.getTopNFunction());
+        Function function = new Function(readFunction, null);
+        conditionBuilder.topNFunction(function);
+      } else {
+        LOG.info("Invalid Input for TopN query. Ignoring TopN Request.");
+      }
+    } else if (startTime != null && hostnames != null && hostnames.size() > defaultTopNHostsLimit) {
+      // if (timeseries query AND hostnames passed AND size(hostnames) > limit)
+      LOG.info("Requesting data for more than " + defaultTopNHostsLimit +  " Hosts. " +
+        "Defaulting to Top " + defaultTopNHostsLimit);
+      conditionBuilder.topN(defaultTopNHostsLimit);
+      conditionBuilder.isBottomN(false);
+    }
+
+    Condition condition = conditionBuilder.build();
+
+    TimelineMetrics metrics;
+
+    if (hostnames == null || hostnames.isEmpty()) {
+      metrics = hBaseAccessor.getAggregateMetricRecords(condition, metricFunctions);
+    } else {
+      metrics = hBaseAccessor.getMetricRecords(condition, metricFunctions);
+    }
+
+    metrics = postProcessMetrics(metrics);
+
+    if (metrics.getMetrics().size() == 0) {
+      return metrics;
+    }
+
+    return seriesAggregateMetrics(seriesAggrFunctionInstance, metrics);
+  }
+
+  private TimelineMetrics postProcessMetrics(TimelineMetrics metrics) {
+    List<TimelineMetric> metricsList = metrics.getMetrics();
+
+    for (TimelineMetric metric : metricsList){
+      String name = metric.getMetricName();
+      if (name.contains("._rate")){
+        updateValuesAsRate(metric.getMetricValues(), false);
+      } else if (name.contains("._diff")) {
+        updateValuesAsRate(metric.getMetricValues(), true);
+      }
+    }
+
+    return metrics;
+  }
+
+  private TimelineMetrics seriesAggregateMetrics(TimelineMetricsSeriesAggregateFunction seriesAggrFuncInstance,
+      TimelineMetrics metrics) {
+    if (seriesAggrFuncInstance != null) {
+      TimelineMetric appliedMetric = seriesAggrFuncInstance.apply(metrics);
+      metrics.setMetrics(Collections.singletonList(appliedMetric));
+    }
+    return metrics;
+  }
+
+  static Map<Long, Double> updateValuesAsRate(Map<Long, Double> metricValues, boolean isDiff) {
+    Long prevTime = null;
+    Double prevVal = null;
+    long step;
+    Double diff;
+
+    for(Iterator<Map.Entry<Long, Double>> it = metricValues.entrySet().iterator(); it.hasNext(); ) {
+      Map.Entry<Long, Double> timeValueEntry = it.next();
+      Long currTime = timeValueEntry.getKey();
+      Double currVal = timeValueEntry.getValue();
+
+      if (prevTime != null) {
+        step = currTime - prevTime;
+        diff = currVal - prevVal;
+        Double rate = isDiff ? diff : (diff / TimeUnit.MILLISECONDS.toSeconds(step));
+        timeValueEntry.setValue(rate);
+      } else {
+        it.remove();
+      }
+
+      prevTime = currTime;
+      prevVal = currVal;
+    }
+
+    return metricValues;
+  }
+
+  static Multimap<String, List<Function>> parseMetricNamesToAggregationFunctions(List<String> metricNames) {
+    Multimap<String, List<Function>> metricsFunctions = ArrayListMultimap.create();
+
+    for (String metricName : metricNames){
+      Function function = Function.DEFAULT_VALUE_FUNCTION;
+      String cleanMetricName = metricName;
+
+      try {
+        function = Function.fromMetricName(metricName);
+        int functionStartIndex = metricName.indexOf("._");
+        if (functionStartIndex > 0) {
+          cleanMetricName = metricName.substring(0, functionStartIndex);
+        }
+      } catch (Function.FunctionFormatException ffe){
+        // Unknown function suffix:
+        // fall back to VALUE and keep the full metric name.
+      }
+
+      List<Function>  functionsList = new ArrayList<>();
+      functionsList.add(function);
+      metricsFunctions.put(cleanMetricName, functionsList);
+    }
+
+    return metricsFunctions;
+  }
+
+  @Override
+  public TimelinePutResponse putMetrics(TimelineMetrics metrics) throws SQLException, IOException {
+    // Errors are indicated by the SQLException
+    TimelinePutResponse response = new TimelinePutResponse();
+
+    try {
+      if (!metrics.getMetrics().isEmpty() && metrics.getMetrics().get(0).getAppId().equals("HOST")) {
+        kafkaProducer.sendMetrics(fromTimelineMetrics(metrics));
+      }
+    } catch (InterruptedException | ExecutionException e) {
+      LOG.error(e);
+    }
+    hBaseAccessor.insertMetricRecordsWithMetadata(metricMetadataManager, metrics, false);
+
+    return response;
+  }
+
+
+  private org.apache.ambari.metrics.alertservice.common.TimelineMetrics fromTimelineMetrics(TimelineMetrics timelineMetrics) {
+    org.apache.ambari.metrics.alertservice.common.TimelineMetrics otherMetrics = new org.apache.ambari.metrics.alertservice.common.TimelineMetrics();
+
+    List<org.apache.ambari.metrics.alertservice.common.TimelineMetric> timelineMetricList = new ArrayList<>();
+    for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) {
+      timelineMetricList.add(fromTimelineMetric(timelineMetric));
+    }
+    otherMetrics.setMetrics(timelineMetricList);
+    return otherMetrics;
+  }
+
+  private org.apache.ambari.metrics.alertservice.common.TimelineMetric fromTimelineMetric(TimelineMetric timelineMetric) {
+
+    org.apache.ambari.metrics.alertservice.common.TimelineMetric otherMetric = new org.apache.ambari.metrics.alertservice.common.TimelineMetric();
+    otherMetric.setMetricValues(timelineMetric.getMetricValues());
+    otherMetric.setStartTime(timelineMetric.getStartTime());
+    otherMetric.setHostName(timelineMetric.getHostName());
+    otherMetric.setInstanceId(timelineMetric.getInstanceId());
+    otherMetric.setAppId(timelineMetric.getAppId());
+    otherMetric.setMetricName(timelineMetric.getMetricName());
+
+    return otherMetric;
+  }
+
+  @Override
+  public TimelinePutResponse putContainerMetrics(List<ContainerMetric> metrics)
+      throws SQLException, IOException {
+    hBaseAccessor.insertContainerMetrics(metrics);
+    return new TimelinePutResponse();
+  }
+
+  @Override
+  public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(String query) throws SQLException, IOException {
+    Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadata =
+      metricMetadataManager.getMetadataCache();
+
+    boolean includeBlacklistedMetrics = StringUtils.isNotEmpty(query) && "all".equalsIgnoreCase(query);
+
+    // Group Metadata by AppId
+    Map<String, List<TimelineMetricMetadata>> metadataByAppId = new HashMap<>();
+    for (TimelineMetricMetadata metricMetadata : metadata.values()) {
+
+      if (!includeBlacklistedMetrics && !metricMetadata.isWhitelisted()) {
+        continue;
+      }
+      List<TimelineMetricMetadata> metadataList = metadataByAppId.get(metricMetadata.getAppId());
+      if (metadataList == null) {
+        metadataList = new ArrayList<>();
+        metadataByAppId.put(metricMetadata.getAppId(), metadataList);
+      }
+
+      metadataList.add(metricMetadata);
+    }
+
+    return metadataByAppId;
+  }
+
+  @Override
+  public Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException {
+    return metricMetadataManager.getHostedAppsCache();
+  }
+
+  @Override
+  public TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException {
+    Map<TimelineMetric, MetricHostAggregate> aggregateMap = new HashMap<>();
+    for (TimelineMetricWithAggregatedValues entry : aggregationResult.getResult()) {
+      aggregateMap.put(entry.getTimelineMetric(), entry.getMetricAggregate());
+    }
+    hBaseAccessor.saveHostAggregateRecords(aggregateMap, PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME);
+
+
+    return new TimelinePutResponse();
+  }
+
+  @Override
+  public Map<String, Map<String,Set<String>>> getInstanceHostsMetadata(String instanceId, String appId)
+          throws SQLException, IOException {
+
+    Map<String, Set<String>> hostedApps = metricMetadataManager.getHostedAppsCache();
+    Map<String, Set<String>> instanceHosts = metricMetadataManager.getHostedInstanceCache();
+    Map<String, Map<String, Set<String>>> instanceAppHosts = new HashMap<>();
+
+    if (MapUtils.isEmpty(instanceHosts)) {
+      Map<String, Set<String>> appHostMap = new HashMap<String, Set<String>>();
+      for (String host : hostedApps.keySet()) {
+        for (String app : hostedApps.get(host)) {
+          if (!appHostMap.containsKey(app)) {
+            appHostMap.put(app, new HashSet<String>());
+          }
+          appHostMap.get(app).add(host);
+        }
+      }
+      instanceAppHosts.put("", appHostMap);
+    } else {
+      for (String instance : instanceHosts.keySet()) {
+
+        if (StringUtils.isNotEmpty(instanceId) && !instance.equals(instanceId)) {
+          continue;
+        }
+        Map<String, Set<String>> appHostMap = new  HashMap<String, Set<String>>();
+        instanceAppHosts.put(instance, appHostMap);
+
+        Set<String> hostsWithInstance = instanceHosts.get(instance);
+        for (String host : hostsWithInstance) {
+          for (String app : hostedApps.get(host)) {
+            if (StringUtils.isNotEmpty(appId) && !app.equals(appId)) {
+              continue;
+            }
+
+            if (!appHostMap.containsKey(app)) {
+              appHostMap.put(app, new HashSet<String>());
+            }
+            appHostMap.get(app).add(host);
+          }
+        }
+      }
+    }
+
+    return instanceAppHosts;
+  }
+
+  @Override
+  public List<String> getLiveInstances() {
+
+    List<String> instances = null;
+    try {
+        if (haController == null) {
+          // Always return current host as live (embedded operation mode)
+          return Collections.singletonList(configuration.getInstanceHostnameFromEnv());
+        }
+        instances = haController.getLiveInstanceHostNames();
+        if (instances == null || instances.isEmpty()) {
+          // fallback
+          instances = Collections.singletonList(configuration.getInstanceHostnameFromEnv());
+        }
+      } catch (UnknownHostException e) {
+        LOG.debug("Exception on getting hostname from env.", e);
+      }
+    return instances;
+  }
+
+  private void scheduleAggregatorThread(final TimelineMetricAggregator aggregator) {
+    if (!aggregator.isDisabled()) {
+      ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(
+        new ThreadFactory() {
+          @Override
+          public Thread newThread(Runnable r) {
+            return new Thread(r, ACTUAL_AGGREGATOR_NAMES.get(aggregator.getName()));
+          }
+        }
+      );
+      scheduledExecutors.put(aggregator.getName(), executorService);
+      executorService.scheduleAtFixedRate(aggregator,
+        0L,
+        aggregator.getSleepIntervalMillis(),
+        TimeUnit.MILLISECONDS);
+      LOG.info("Scheduled aggregator thread " + aggregator.getName() + " every " +
+        aggregator.getSleepIntervalMillis() + " milliseconds.");
+    } else {
+      LOG.info("Skipped scheduling " + aggregator.getName() + " since it is disabled.");
+    }
+  }
+}
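The test files that follow exercise parseMetricNamesToAggregationFunctions(). A simplified, self-contained sketch of the suffix handling those tests verify: a recognized "._" suffix is split into (cleanName, function), while an unrecognized suffix keeps the full metric name and falls back to the raw VALUE function. The list of known suffixes below is an assumption for illustration, standing in for Function.fromMetricName():

import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Standalone sketch of the suffix handling in
// parseMetricNamesToAggregationFunctions(): names with a recognized "._"
// function suffix are split into (cleanName, function); anything with an
// unrecognized suffix keeps its full name and falls back to VALUE, which is
// what the "bytes._not._afunction" test case checks.
public class MetricNameParseSketch {
  // Assumed suffix set for illustration only.
  private static final List<String> KNOWN =
      Arrays.asList("._avg", "._sum", "._min", "._max", "._rate._avg");

  static Map.Entry<String, String> parse(String metricName) {
    int i = metricName.indexOf("._");
    if (i > 0 && KNOWN.contains(metricName.substring(i))) {
      return new SimpleEntry<>(metricName.substring(0, i), metricName.substring(i));
    }
    return new SimpleEntry<>(metricName, "VALUE"); // unknown suffix: keep full name
  }

  public static void main(String[] args) {
    System.out.println(parse("mem._avg"));              // mem=._avg
    System.out.println(parse("mem._rate._avg"));        // mem=._rate._avg
    System.out.println(parse("bytes_in"));              // bytes_in=VALUE
    System.out.println(parse("bytes._not._afunction")); // bytes._not._afunction=VALUE
  }
}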

http://git-wip-us.apache.org/repos/asf/ambari/blob/7e970233/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java
deleted file mode 100644
index f035678..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-
-import com.google.common.collect.Multimap;
-import junit.framework.Assert;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.ReadFunction.AVG;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.ReadFunction.SUM;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.PostProcessingFunction.RATE;
-import static org.assertj.core.api.Assertions.assertThat;
-
-public class HBaseTimelineMetricsServiceTest {
-
-  public static final String MEM_METRIC = "mem";
-  public static final String BYTES_IN_METRIC = "bytes_in";
-  public static final String BYTES_NOT_AFUNCTION_METRIC = "bytes._not._afunction";
-
-  @Test
-  public void testParseMetricNamesToAggregationFunctions() throws Exception {
-    //given
-    List<String> metricNames = Arrays.asList(
-      MEM_METRIC + "._avg",
-      MEM_METRIC + "._sum",
-      MEM_METRIC + "._rate._avg",
-      BYTES_IN_METRIC,
-      BYTES_NOT_AFUNCTION_METRIC);
-
-    //when
-    Multimap<String, List<Function>> multimap =
-      HBaseTimelineMetricsService.parseMetricNamesToAggregationFunctions(metricNames);
-
-    //then
-    Assert.assertEquals(multimap.keySet().size(), 3);
-    Assert.assertTrue(multimap.containsKey(MEM_METRIC));
-    Assert.assertTrue(multimap.containsKey(BYTES_IN_METRIC));
-    Assert.assertTrue(multimap.containsKey(BYTES_NOT_AFUNCTION_METRIC));
-
-    List<List<Function>> metricEntry = (List<List<Function>>) multimap.get(MEM_METRIC);
-    HashMap<String, List<Function>> mfm = new HashMap<String, List<Function>>();
-    mfm.put(MEM_METRIC, metricEntry.get(0));
-
-    assertThat(mfm.get(MEM_METRIC)).containsOnly(
-      new Function(AVG, null));
-
-    mfm = new HashMap<String, List<Function>>();
-    mfm.put(MEM_METRIC, metricEntry.get(1));
-    assertThat(mfm.get(MEM_METRIC)).containsOnly(
-      new Function(SUM, null));
-
-    mfm = new HashMap<String, List<Function>>();
-    mfm.put(MEM_METRIC, metricEntry.get(2));
-    assertThat(mfm.get(MEM_METRIC)).containsOnly(
-      new Function(AVG, RATE));
-
-    metricEntry = (List<List<Function>>) multimap.get(BYTES_IN_METRIC);
-    mfm = new HashMap<String, List<Function>>();
-    mfm.put(BYTES_IN_METRIC, metricEntry.get(0));
-
-    assertThat(mfm.get(BYTES_IN_METRIC))
-      .contains(Function.DEFAULT_VALUE_FUNCTION);
-
-    metricEntry = (List<List<Function>>) multimap.get(BYTES_NOT_AFUNCTION_METRIC);
-    mfm = new HashMap<String, List<Function>>();
-    mfm.put(BYTES_NOT_AFUNCTION_METRIC, metricEntry.get(0));
-
-    assertThat(mfm.get(BYTES_NOT_AFUNCTION_METRIC))
-      .contains(Function.DEFAULT_VALUE_FUNCTION);
-
-  }
-
-  @Test
-  public void testRateCalculationOnMetricsWithEqualValues() throws Exception {
-    Map<Long, Double> metricValues = new TreeMap<>();
-    metricValues.put(1454016368371L, 1011.25);
-    metricValues.put(1454016428371L, 1011.25);
-    metricValues.put(1454016488371L, 1011.25);
-    metricValues.put(1454016548371L, 1011.25);
-    metricValues.put(1454016608371L, 1011.25);
-    metricValues.put(1454016668371L, 1011.25);
-    metricValues.put(1454016728371L, 1011.25);
-
-    // Calculate rate
-    Map<Long, Double> rates = HBaseTimelineMetricsService.updateValuesAsRate(new TreeMap<>(metricValues), false);
-
-    // Make sure rate is zero
-    for (Map.Entry<Long, Double> rateEntry : rates.entrySet()) {
-      Assert.assertEquals("Rate should be zero, key = " + rateEntry.getKey()
-          + ", value = " + rateEntry.getValue(), 0.0, rateEntry.getValue());
-    }
-  }
-
-  @Test
-  public void testDiffCalculation() throws Exception {
-    Map<Long, Double> metricValues = new TreeMap<>();
-    metricValues.put(1454016368371L, 1011.25);
-    metricValues.put(1454016428371L, 1010.25);
-    metricValues.put(1454016488371L, 1012.25);
-    metricValues.put(1454016548371L, 1010.25);
-    metricValues.put(1454016608371L, 1010.25);
-
-    Map<Long, Double> rates = HBaseTimelineMetricsService.updateValuesAsRate(new TreeMap<>(metricValues), true);
-
-    Assert.assertTrue(rates.size()==4);
-    Assert.assertTrue(rates.containsValue(-1.0));
-    Assert.assertTrue(rates.containsValue(2.0));
-    Assert.assertTrue(rates.containsValue(0.0));
-  }
-}
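The rate and diff tests above pin down updateValuesAsRate(): the first sample is dropped because it has no predecessor, and each surviving point becomes a plain difference ("._diff") or a per-second rate ("._rate"). A self-contained re-implementation of that loop, runnable without the Ambari classpath; like the original, it assumes millisecond timestamps at least one second apart (a sub-second step would divide by zero seconds):

import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;

// Standalone re-implementation of updateValuesAsRate() for experimentation.
public class RateSketch {
  static Map<Long, Double> updateValuesAsRate(Map<Long, Double> values, boolean isDiff) {
    Long prevTime = null;
    Double prevVal = null;
    for (Iterator<Map.Entry<Long, Double>> it = values.entrySet().iterator(); it.hasNext(); ) {
      Map.Entry<Long, Double> entry = it.next();
      long currTime = entry.getKey();
      double currVal = entry.getValue();
      if (prevTime != null) {
        long stepMillis = currTime - prevTime;
        double diff = currVal - prevVal;
        entry.setValue(isDiff ? diff : diff / TimeUnit.MILLISECONDS.toSeconds(stepMillis));
      } else {
        it.remove(); // first sample has no predecessor, so no rate for it
      }
      prevTime = currTime;
      prevVal = currVal;
    }
    return values;
  }

  public static void main(String[] args) {
    Map<Long, Double> v = new TreeMap<>();
    v.put(0L, 100.0);
    v.put(60_000L, 160.0);  // +60.0 over 60s
    v.put(120_000L, 160.0); // +0.0  over 60s
    System.out.println(updateValuesAsRate(new TreeMap<>(v), false)); // {60000=1.0, 120000=0.0}
    System.out.println(updateValuesAsRate(new TreeMap<>(v), true));  // {60000=60.0, 120000=0.0}
  }
}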

http://git-wip-us.apache.org/repos/asf/ambari/blob/7e970233/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsServiceTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsServiceTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsServiceTest.java
new file mode 100644
index 0000000..f035678
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsServiceTest.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import com.google.common.collect.Multimap;
+import junit.framework.Assert;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.ReadFunction.AVG;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.ReadFunction.SUM;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.PostProcessingFunction.RATE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class HBaseTimelineMetricsServiceTest {
+
+  public static final String MEM_METRIC = "mem";
+  public static final String BYTES_IN_METRIC = "bytes_in";
+  public static final String BYTES_NOT_AFUNCTION_METRIC = "bytes._not._afunction";
+
+  @Test
+  public void testParseMetricNamesToAggregationFunctions() throws Exception {
+    //given
+    List<String> metricNames = Arrays.asList(
+      MEM_METRIC + "._avg",
+      MEM_METRIC + "._sum",
+      MEM_METRIC + "._rate._avg",
+      BYTES_IN_METRIC,
+      BYTES_NOT_AFUNCTION_METRIC);
+
+    //when
+    Multimap<String, List<Function>> multimap =
+      HBaseTimelineMetricsService.parseMetricNamesToAggregationFunctions(metricNames);
+
+    //then
+    Assert.assertEquals(multimap.keySet().size(), 3);
+    Assert.assertTrue(multimap.containsKey(MEM_METRIC));
+    Assert.assertTrue(multimap.containsKey(BYTES_IN_METRIC));
+    Assert.assertTrue(multimap.containsKey(BYTES_NOT_AFUNCTION_METRIC));
+
+    List<List<Function>> metricEntry = (List<List<Function>>) multimap.get(MEM_METRIC);
+    HashMap<String, List<Function>> mfm = new HashMap<String, List<Function>>();
+    mfm.put(MEM_METRIC, metricEntry.get(0));
+
+    assertThat(mfm.get(MEM_METRIC)).containsOnly(
+      new Function(AVG, null));
+
+    mfm = new HashMap<String, List<Function>>();
+    mfm.put(MEM_METRIC, metricEntry.get(1));
+    assertThat(mfm.get(MEM_METRIC)).containsOnly(
+      new Function(SUM, null));
+
+    mfm = new HashMap<String, List<Function>>();
+    mfm.put(MEM_METRIC, metricEntry.get(2));
+    assertThat(mfm.get(MEM_METRIC)).containsOnly(
+      new Function(AVG, RATE));
+
+    metricEntry = (List<List<Function>>) multimap.get(BYTES_IN_METRIC);
+    mfm = new HashMap<String, List<Function>>();
+    mfm.put(BYTES_IN_METRIC, metricEntry.get(0));
+
+    assertThat(mfm.get(BYTES_IN_METRIC))
+      .contains(Function.DEFAULT_VALUE_FUNCTION);
+
+    metricEntry = (List<List<Function>>) multimap.get(BYTES_NOT_AFUNCTION_METRIC);
+    mfm = new HashMap<String, List<Function>>();
+    mfm.put(BYTES_NOT_AFUNCTION_METRIC, metricEntry.get(0));
+
+    assertThat(mfm.get(BYTES_NOT_AFUNCTION_METRIC))
+      .contains(Function.DEFAULT_VALUE_FUNCTION);
+
+  }
+
+  @Test
+  public void testRateCalculationOnMetricsWithEqualValues() throws Exception {
+    Map<Long, Double> metricValues = new TreeMap<>();
+    metricValues.put(1454016368371L, 1011.25);
+    metricValues.put(1454016428371L, 1011.25);
+    metricValues.put(1454016488371L, 1011.25);
+    metricValues.put(1454016548371L, 1011.25);
+    metricValues.put(1454016608371L, 1011.25);
+    metricValues.put(1454016668371L, 1011.25);
+    metricValues.put(1454016728371L, 1011.25);
+
+    // Calculate rate
+    Map<Long, Double> rates = HBaseTimelineMetricsService.updateValuesAsRate(new TreeMap<>(metricValues), false);
+
+    // Make sure rate is zero
+    for (Map.Entry<Long, Double> rateEntry : rates.entrySet()) {
+      Assert.assertEquals("Rate should be zero, key = " + rateEntry.getKey()
+          + ", value = " + rateEntry.getValue(), 0.0, rateEntry.getValue());
+    }
+  }
+
+  @Test
+  public void testDiffCalculation() throws Exception {
+    Map<Long, Double> metricValues = new TreeMap<>();
+    metricValues.put(1454016368371L, 1011.25);
+    metricValues.put(1454016428371L, 1010.25);
+    metricValues.put(1454016488371L, 1012.25);
+    metricValues.put(1454016548371L, 1010.25);
+    metricValues.put(1454016608371L, 1010.25);
+
+    Map<Long, Double> rates = HBaseTimelineMetricsService.updateValuesAsRate(new TreeMap<>(metricValues), true);
+
+    Assert.assertTrue(rates.size()==4);
+    Assert.assertTrue(rates.containsValue(-1.0));
+    Assert.assertTrue(rates.containsValue(2.0));
+    Assert.assertTrue(rates.containsValue(0.0));
+  }
+}
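As a closing hand-check of testDiffCalculation above: dropping the first of the five samples leaves four consecutive differences, -1.0, 2.0, -2.0, and 0.0, which matches the asserted size of 4 and the asserted values (the -2.0 entry is implied by the data, though the test does not assert it). A few lines to verify independently:

// Hand-check of the expected diffs in testDiffCalculation.
public class DiffCheck {
  public static void main(String[] args) {
    double[] v = {1011.25, 1010.25, 1012.25, 1010.25, 1010.25};
    for (int i = 1; i < v.length; i++) {
      System.out.println(v[i] - v[i - 1]); // prints -1.0, 2.0, -2.0, 0.0
    }
  }
}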