Posted to commits@ambari.apache.org by sw...@apache.org on 2017/09/26 22:26:47 UTC

[48/50] [abbrv] ambari git commit: Merge branch 'trunk' into branch-3.0-ams

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
----------------------------------------------------------------------
diff --cc ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
index a4b0bcc,0000000..bb26439
mode 100644,000000..100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
@@@ -1,577 -1,0 +1,590 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 +
- import com.google.common.collect.ArrayListMultimap;
- import com.google.common.collect.Multimap;
++import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT;
++import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
++import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
++
++import java.io.IOException;
++import java.net.MalformedURLException;
++import java.net.URISyntaxException;
++import java.net.UnknownHostException;
++import java.sql.SQLException;
++import java.util.ArrayList;
++import java.util.Collections;
++import java.util.HashMap;
++import java.util.HashSet;
++import java.util.Iterator;
++import java.util.List;
++import java.util.Map;
++import java.util.Set;
++import java.util.concurrent.ConcurrentHashMap;
++import java.util.concurrent.Executors;
++import java.util.concurrent.ScheduledExecutorService;
++import java.util.concurrent.ThreadFactory;
++import java.util.concurrent.TimeUnit;
++
 +import org.apache.commons.collections.MapUtils;
 +import org.apache.commons.lang.StringUtils;
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
 +import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
 +import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 +import org.apache.hadoop.metrics2.sink.timeline.Precision;
 +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
 +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricWithAggregatedValues;
 +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 +import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
 +import org.apache.hadoop.service.AbstractService;
 +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
 +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
 +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
 +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
 +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
 +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricHostMetadata;
 +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
 +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
++import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.SeriesAggregateFunction;
++import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.TimelineMetricsSeriesAggregateFunction;
++import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.TimelineMetricsSeriesAggregateFunctionFactory;
 +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConditionBuilder;
 +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
 +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.TopNCondition;
- import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.SeriesAggregateFunction;
- import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.TimelineMetricsSeriesAggregateFunction;
- import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.TimelineMetricsSeriesAggregateFunctionFactory;
 +
- import java.io.IOException;
- import java.net.MalformedURLException;
- import java.net.URISyntaxException;
- import java.net.UnknownHostException;
- import java.sql.SQLException;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.HashMap;
- import java.util.HashSet;
- import java.util.Iterator;
- import java.util.List;
- import java.util.Map;
- import java.util.Set;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.Executors;
- import java.util.concurrent.ScheduledExecutorService;
- import java.util.concurrent.ThreadFactory;
- import java.util.concurrent.TimeUnit;
- 
- import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.*;
- import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
++import com.google.common.collect.ArrayListMultimap;
++import com.google.common.collect.Multimap;
 +
 +public class HBaseTimelineMetricsService extends AbstractService implements TimelineMetricStore {
 +
 +  static final Log LOG = LogFactory.getLog(HBaseTimelineMetricsService.class);
 +  private final TimelineMetricConfiguration configuration;
 +  private TimelineMetricDistributedCache cache;
 +  private PhoenixHBaseAccessor hBaseAccessor;
 +  private static volatile boolean isInitialized = false;
 +  private final ScheduledExecutorService watchdogExecutorService = Executors.newSingleThreadScheduledExecutor();
 +  private final Map<AGGREGATOR_NAME, ScheduledExecutorService> scheduledExecutors = new HashMap<>();
 +  private final ConcurrentHashMap<String, Long> postedAggregatedMap = new ConcurrentHashMap<>();
 +  private TimelineMetricMetadataManager metricMetadataManager;
 +  private Integer defaultTopNHostsLimit;
 +  private MetricCollectorHAController haController;
- //  private MetricKafkaProducer metricKafkaProducer;
++  private boolean containerMetricsDisabled = false;
 +
 +  /**
 +   * Construct the service.
 +   *
 +   */
 +  public HBaseTimelineMetricsService(TimelineMetricConfiguration configuration) {
 +    super(HBaseTimelineMetricsService.class.getName());
 +    this.configuration = configuration;
 +  }
 +
 +  @Override
 +  protected void serviceInit(Configuration conf) throws Exception {
 +    super.serviceInit(conf);
 +    initializeSubsystem();
 +  }
 +
 +  private TimelineMetricDistributedCache startCacheNode() throws MalformedURLException, URISyntaxException {
 +    //TODO make configurable
 +    return new TimelineMetricsIgniteCache();
 +  }
 +
 +
 +  private synchronized void initializeSubsystem() {
 +    if (!isInitialized) {
 +      hBaseAccessor = new PhoenixHBaseAccessor(null);
 +      // Initialize schema
 +      hBaseAccessor.initMetricSchema();
 +      // Initialize metadata from store
 +      try {
 +        metricMetadataManager = new TimelineMetricMetadataManager(hBaseAccessor);
 +      } catch (MalformedURLException | URISyntaxException e) {
 +        throw new ExceptionInInitializerError("Unable to initialize metadata manager");
 +      }
 +      metricMetadataManager.initializeMetadata();
 +      // Initialize policies before TTL update
 +      hBaseAccessor.initPoliciesAndTTL();
 +      // Start HA service
 +      // Start the controller
 +      if (!configuration.isDistributedCollectorModeDisabled()) {
 +        haController = new MetricCollectorHAController(configuration);
 +        try {
 +          haController.initializeHAController();
 +        } catch (Exception e) {
 +          LOG.error(e);
 +          throw new MetricsSystemInitializationException("Unable to " +
 +            "initialize HA controller", e);
 +        }
 +      } else {
 +        LOG.info("Distributed collector mode disabled");
 +      }
 +
 +      //Initialize whitelisting & blacklisting if needed
 +      TimelineMetricsFilter.initializeMetricFilter(configuration);
 +
 +      Configuration metricsConf = null;
 +      try {
 +        metricsConf = configuration.getMetricsConf();
 +      } catch (Exception e) {
 +        throw new ExceptionInInitializerError("Cannot initialize configuration.");
 +      }
 +
 +      if (configuration.isCollectorInMemoryAggregationEnabled()) {
 +        try {
 +          cache = startCacheNode();
 +        } catch (Exception e) {
 +          throw new MetricsSystemInitializationException("Unable to " +
 +              "start cache node", e);
 +        }
 +      }
 +//      String kafkaServers = configuration.getKafkaServers();
 +//      if (kafkaServers != null) {
 +//        metricKafkaProducer = new MetricKafkaProducer(kafkaServers);
 +//      }
 +
 +      defaultTopNHostsLimit = Integer.parseInt(metricsConf.get(DEFAULT_TOPN_HOSTS_LIMIT, "20"));
 +      if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) {
 +        LOG.info("Using group by aggregators for aggregating host and cluster metrics.");
 +      }
 +
 +      // Start the cluster aggregator second
 +      TimelineMetricAggregator secondClusterAggregator =
 +        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(
 +          hBaseAccessor, metricsConf, metricMetadataManager, haController, cache);
 +      scheduleAggregatorThread(secondClusterAggregator);
 +
 +      // Start the minute cluster aggregator
 +      TimelineMetricAggregator minuteClusterAggregator =
 +        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(
 +          hBaseAccessor, metricsConf, metricMetadataManager, haController);
 +      scheduleAggregatorThread(minuteClusterAggregator);
 +
 +      // Start the hourly cluster aggregator
 +      TimelineMetricAggregator hourlyClusterAggregator =
 +        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(
 +          hBaseAccessor, metricsConf, metricMetadataManager, haController);
 +      scheduleAggregatorThread(hourlyClusterAggregator);
 +
 +      // Start the daily cluster aggregator
 +      TimelineMetricAggregator dailyClusterAggregator =
 +        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(
 +          hBaseAccessor, metricsConf, metricMetadataManager, haController);
 +      scheduleAggregatorThread(dailyClusterAggregator);
 +
 +      // Start the minute host aggregator
 +      if (configuration.isHostInMemoryAggregationEnabled()) {
 +        LOG.info("timeline.metrics.host.inmemory.aggregation is set to True, switching to filtering host minute aggregation on collector");
 +        TimelineMetricAggregator minuteHostAggregator =
 +          TimelineMetricAggregatorFactory.createFilteringTimelineMetricAggregatorMinute(
 +            hBaseAccessor, metricsConf, metricMetadataManager, haController, postedAggregatedMap);
 +        scheduleAggregatorThread(minuteHostAggregator);
 +      } else {
 +        TimelineMetricAggregator minuteHostAggregator =
 +          TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(
 +            hBaseAccessor, metricsConf, metricMetadataManager, haController);
 +        scheduleAggregatorThread(minuteHostAggregator);
 +      }
 +
 +      // Start the hourly host aggregator
 +      TimelineMetricAggregator hourlyHostAggregator =
 +        TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(
 +          hBaseAccessor, metricsConf, metricMetadataManager, haController);
 +      scheduleAggregatorThread(hourlyHostAggregator);
 +
 +      // Start the daily host aggregator
 +      TimelineMetricAggregator dailyHostAggregator =
 +        TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(
 +          hBaseAccessor, metricsConf, metricMetadataManager, haController);
 +      scheduleAggregatorThread(dailyHostAggregator);
 +
 +      if (!configuration.isTimelineMetricsServiceWatcherDisabled()) {
 +        int initDelay = configuration.getTimelineMetricsServiceWatcherInitDelay();
 +        int delay = configuration.getTimelineMetricsServiceWatcherDelay();
 +        // Start the watchdog
 +        watchdogExecutorService.scheduleWithFixedDelay(
 +          new TimelineMetricStoreWatcher(this, configuration),
 +          initDelay, delay, TimeUnit.SECONDS);
 +        LOG.info("Started watchdog for timeline metrics store with initial " +
 +          "delay = " + initDelay + ", delay = " + delay);
 +      }
- 
++      containerMetricsDisabled = configuration.isContainerMetricsDisabled();
 +      isInitialized = true;
 +    }
 +
 +  }
 +
 +  @Override
 +  protected void serviceStop() throws Exception {
 +    super.serviceStop();
 +  }
 +
 +  @Override
 +  public TimelineMetrics getAnomalyMetrics(String method, long startTime, long endTime, Integer limit) throws SQLException {
 +    return hBaseAccessor.getAnomalyMetricRecords(method, startTime, endTime, limit);
 +  }
 +
 +  @Override
 +  public TimelineMetrics getTimelineMetrics(List<String> metricNames,
 +      List<String> hostnames, String applicationId, String instanceId,
 +      Long startTime, Long endTime, Precision precision, Integer limit,
 +      boolean groupedByHosts, TopNConfig topNConfig, String seriesAggregateFunction) throws SQLException, IOException {
 +
 +    if (metricNames == null || metricNames.isEmpty()) {
 +      throw new IllegalArgumentException("No metric name filter specified.");
 +    }
 +    if ((startTime == null && endTime != null)
 +        || (startTime != null && endTime == null)) {
 +      throw new IllegalArgumentException("Open ended query not supported ");
 +    }
 +    if (limit != null && limit > PhoenixHBaseAccessor.RESULTSET_LIMIT){
 +      throw new IllegalArgumentException("Limit too big");
 +    }
 +
 +    TimelineMetricsSeriesAggregateFunction seriesAggrFunctionInstance = null;
 +    if (!StringUtils.isEmpty(seriesAggregateFunction)) {
 +      SeriesAggregateFunction func = SeriesAggregateFunction.getFunction(seriesAggregateFunction);
 +      seriesAggrFunctionInstance = TimelineMetricsSeriesAggregateFunctionFactory.newInstance(func);
 +    }
 +
 +    Multimap<String, List<Function>> metricFunctions =
 +      parseMetricNamesToAggregationFunctions(metricNames);
 +
 +    List<byte[]> uuids = metricMetadataManager.getUuids(metricFunctions.keySet(), hostnames, applicationId, instanceId);
 +
 +    ConditionBuilder conditionBuilder = new ConditionBuilder(new ArrayList<String>(metricFunctions.keySet()))
 +      .hostnames(hostnames)
 +      .appId(applicationId)
 +      .instanceId(instanceId)
 +      .startTime(startTime)
 +      .endTime(endTime)
 +      .precision(precision)
 +      .limit(limit)
 +      .grouped(groupedByHosts)
 +      .uuid(uuids);
 +
 +    if (topNConfig != null) {
 +      if (TopNCondition.isTopNHostCondition(metricNames, hostnames) ^ //Only 1 condition should be true.
 +        TopNCondition.isTopNMetricCondition(metricNames, hostnames)) {
 +        conditionBuilder.topN(topNConfig.getTopN());
 +        conditionBuilder.isBottomN(topNConfig.getIsBottomN());
 +        Function.ReadFunction readFunction = Function.ReadFunction.getFunction(topNConfig.getTopNFunction());
 +        Function function = new Function(readFunction, null);
 +        conditionBuilder.topNFunction(function);
 +      } else {
 +        LOG.info("Invalid Input for TopN query. Ignoring TopN Request.");
 +      }
 +    } else if (startTime != null && hostnames != null && hostnames.size() > defaultTopNHostsLimit) {
 +      // if (timeseries query AND hostnames passed AND size(hostnames) > limit)
 +      LOG.info("Requesting data for more than " + defaultTopNHostsLimit +  " Hosts. " +
 +        "Defaulting to Top " + defaultTopNHostsLimit);
 +      conditionBuilder.topN(defaultTopNHostsLimit);
 +      conditionBuilder.isBottomN(false);
 +    }
 +
 +    Condition condition = conditionBuilder.build();
 +
 +    TimelineMetrics metrics;
 +
 +    if (hostnames == null || hostnames.isEmpty()) {
 +      metrics = hBaseAccessor.getAggregateMetricRecords(condition, metricFunctions);
 +    } else {
 +      metrics = hBaseAccessor.getMetricRecords(condition, metricFunctions);
 +    }
 +
 +    metrics = postProcessMetrics(metrics);
 +
 +    if (metrics.getMetrics().size() == 0) {
 +      return metrics;
 +    }
 +
 +    return seriesAggregateMetrics(seriesAggrFunctionInstance, metrics);
 +  }
 +
 +  private TimelineMetrics postProcessMetrics(TimelineMetrics metrics) {
 +    List<TimelineMetric> metricsList = metrics.getMetrics();
 +
 +    for (TimelineMetric metric : metricsList){
 +      String name = metric.getMetricName();
 +      if (name.contains("._rate")){
 +        updateValuesAsRate(metric.getMetricValues(), false);
 +      } else if (name.contains("._diff")) {
 +        updateValuesAsRate(metric.getMetricValues(), true);
 +      }
 +    }
 +
 +    return metrics;
 +  }
 +
 +  private TimelineMetrics seriesAggregateMetrics(TimelineMetricsSeriesAggregateFunction seriesAggrFuncInstance,
 +      TimelineMetrics metrics) {
 +    if (seriesAggrFuncInstance != null) {
 +      TimelineMetric appliedMetric = seriesAggrFuncInstance.apply(metrics);
 +      metrics.setMetrics(Collections.singletonList(appliedMetric));
 +    }
 +    return metrics;
 +  }
 +
 +  static Map<Long, Double> updateValuesAsRate(Map<Long, Double> metricValues, boolean isDiff) {
 +    Long prevTime = null;
 +    Double prevVal = null;
 +    long step;
 +    Double diff;
 +
 +    for(Iterator<Map.Entry<Long, Double>> it = metricValues.entrySet().iterator(); it.hasNext(); ) {
 +      Map.Entry<Long, Double> timeValueEntry = it.next();
 +      Long currTime = timeValueEntry.getKey();
 +      Double currVal = timeValueEntry.getValue();
 +
 +      if (prevTime != null) {
 +        step = currTime - prevTime;
 +        diff = currVal - prevVal;
-         Double rate = isDiff ? diff : (diff / TimeUnit.MILLISECONDS.toSeconds(step));
-         timeValueEntry.setValue(rate);
++        if (diff < 0) {
++          it.remove(); //Discard calculating rate when the metric counter has been reset.
++        } else {
++          Double rate = isDiff ? diff : (diff / TimeUnit.MILLISECONDS.toSeconds(step));
++          timeValueEntry.setValue(rate);
++        }
 +      } else {
 +        it.remove();
 +      }
 +
 +      prevTime = currTime;
 +      prevVal = currVal;
 +    }
 +
 +    return metricValues;
 +  }
 +
 +  static Multimap<String, List<Function>> parseMetricNamesToAggregationFunctions(List<String> metricNames) {
 +    Multimap<String, List<Function>> metricsFunctions = ArrayListMultimap.create();
 +
 +    for (String metricName : metricNames){
 +      Function function = Function.DEFAULT_VALUE_FUNCTION;
 +      String cleanMetricName = metricName;
 +
 +      try {
 +        function = Function.fromMetricName(metricName);
 +        int functionStartIndex = metricName.indexOf("._");
 +        if (functionStartIndex > 0) {
 +          cleanMetricName = metricName.substring(0, functionStartIndex);
 +        }
 +      } catch (Function.FunctionFormatException ffe){
 +        // unknown function so
 +        // fallback to VALUE, and fullMetricName
 +      }
 +
 +      List<Function>  functionsList = new ArrayList<>();
 +      functionsList.add(function);
 +      metricsFunctions.put(cleanMetricName, functionsList);
 +    }
 +
 +    return metricsFunctions;
 +  }
 +
 +  @Override
 +  public TimelinePutResponse putMetrics(TimelineMetrics metrics) throws SQLException, IOException {
 +    // Error indicated by the Sql exception
 +    TimelinePutResponse response = new TimelinePutResponse();
 +
 +    hBaseAccessor.insertMetricRecordsWithMetadata(metricMetadataManager, metrics, false);
 +
 +    if (configuration.isCollectorInMemoryAggregationEnabled()) {
 +      cache.putMetrics(metrics.getMetrics(), metricMetadataManager);
 +    }
 +
 +//    try {
 +//      metricKafkaProducer.sendMetrics(metrics);
 +////      if (metrics.getMetrics().size() != 0 && metrics.getMetrics().get(0).getAppId().equals("anomaly-engine-test-metric")) {
 +////      }
 +//    } catch (Exception e) {
 +//      LOG.error(e);
 +//    }
 +
 +    return response;
 +  }
 +
 +  @Override
 +  public TimelinePutResponse putContainerMetrics(List<ContainerMetric> metrics)
 +      throws SQLException, IOException {
++
++    if (containerMetricsDisabled) {
++      LOG.debug("Ignoring submitted container metrics according to configuration. Values will not be stored.");
++      return new TimelinePutResponse();
++    }
++
 +    hBaseAccessor.insertContainerMetrics(metrics);
 +    return new TimelinePutResponse();
 +  }
 +
 +  @Override
 +  public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(String query) throws SQLException, IOException {
 +    Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadata =
 +      metricMetadataManager.getMetadataCache();
 +
 +    boolean includeBlacklistedMetrics = StringUtils.isNotEmpty(query) && "all".equalsIgnoreCase(query);
 +
 +    // Group Metadata by AppId
 +    Map<String, List<TimelineMetricMetadata>> metadataByAppId = new HashMap<>();
 +    for (TimelineMetricMetadata metricMetadata : metadata.values()) {
 +
 +      if (!includeBlacklistedMetrics && !metricMetadata.isWhitelisted()) {
 +        continue;
 +      }
 +      List<TimelineMetricMetadata> metadataList = metadataByAppId.get(metricMetadata.getAppId());
 +      if (metadataList == null) {
 +        metadataList = new ArrayList<>();
 +        metadataByAppId.put(metricMetadata.getAppId(), metadataList);
 +      }
 +
 +      metadataList.add(metricMetadata);
 +    }
 +
 +    return metadataByAppId;
 +  }
 +
 +  @Override
 +  public Map<String, TimelineMetricMetadataKey> getUuids() throws SQLException, IOException {
 +    return metricMetadataManager.getUuidKeyMap();
 +  }
 +
 +  @Override
 +  public Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException {
 +    Map<String, TimelineMetricHostMetadata> hostsMetadata = metricMetadataManager.getHostedAppsCache();
 +    Map<String, Set<String>> hostAppMap = new HashMap<>();
 +    for (String hostname : hostsMetadata.keySet()) {
 +      hostAppMap.put(hostname, hostsMetadata.get(hostname).getHostedApps().keySet());
 +    }
 +    return hostAppMap;
 +  }
 +
 +  @Override
 +  public TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException {
 +    Map<TimelineMetric, MetricHostAggregate> aggregateMap = new HashMap<>();
 +    String hostname = null;
 +    for (TimelineMetricWithAggregatedValues entry : aggregationResult.getResult()) {
 +      aggregateMap.put(entry.getTimelineMetric(), entry.getMetricAggregate());
 +      hostname = hostname == null ? entry.getTimelineMetric().getHostName() : hostname;
 +    }
 +    long timestamp = aggregationResult.getTimeInMilis();
 +    postedAggregatedMap.put(hostname, timestamp);
 +    if (LOG.isDebugEnabled()) {
 +      LOG.debug(String.format("Adding host %s to aggregated by in-memory aggregator. Timestamp : %s", hostname, timestamp));
 +    }
 +    hBaseAccessor.saveHostAggregateRecords(aggregateMap, PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME);
 +
 +
 +    return new TimelinePutResponse();
 +  }
 +
 +  @Override
-   public Map<String, Map<String,Set<String>>> getInstanceHostsMetadata(String instanceId, String appId)
-           throws SQLException, IOException {
- 
++  public Map<String, Map<String,Set<String>>> getInstanceHostsMetadata(String instanceId, String appId) throws SQLException, IOException {
 +    Map<String, Set<String>> hostedApps = getHostAppsMetadata();
 +    Map<String, Set<String>> instanceHosts = metricMetadataManager.getHostedInstanceCache();
++    if (configuration.getTimelineMetricsMultipleClusterSupport()) {
++      instanceHosts = metricMetadataManager.getHostedInstanceCache();
++    }
 +    Map<String, Map<String, Set<String>>> instanceAppHosts = new HashMap<>();
 +
 +    if (MapUtils.isEmpty(instanceHosts)) {
 +      Map<String, Set<String>> appHostMap = new HashMap<String, Set<String>>();
 +      for (String host : hostedApps.keySet()) {
 +        for (String app : hostedApps.get(host)) {
 +          if (!appHostMap.containsKey(app)) {
 +            appHostMap.put(app, new HashSet<String>());
 +          }
 +          appHostMap.get(app).add(host);
 +        }
 +      }
 +      instanceAppHosts.put("", appHostMap);
 +    } else {
 +      for (String instance : instanceHosts.keySet()) {
 +
 +        if (StringUtils.isNotEmpty(instanceId) && !instance.equals(instanceId)) {
 +          continue;
 +        }
 +        Map<String, Set<String>> appHostMap = new  HashMap<String, Set<String>>();
 +        instanceAppHosts.put(instance, appHostMap);
 +
 +        Set<String> hostsWithInstance = instanceHosts.get(instance);
 +        for (String host : hostsWithInstance) {
 +          for (String app : hostedApps.get(host)) {
 +            if (StringUtils.isNotEmpty(appId) && !app.equals(appId)) {
 +              continue;
 +            }
 +
 +            if (!appHostMap.containsKey(app)) {
 +              appHostMap.put(app, new HashSet<String>());
 +            }
 +            appHostMap.get(app).add(host);
 +          }
 +        }
 +      }
 +    }
 +
 +    return instanceAppHosts;
 +  }
 +
 +  @Override
 +  public List<String> getLiveInstances() {
 +
 +    List<String> instances = null;
 +    try {
 +      if (haController == null) {
 +        // Always return current host as live (embedded operation mode)
 +        return Collections.singletonList(configuration.getInstanceHostnameFromEnv());
 +      }
 +      instances = haController.getLiveInstanceHostNames();
 +      if (instances == null || instances.isEmpty()) {
 +        // fallback
 +        instances = Collections.singletonList(configuration.getInstanceHostnameFromEnv());
 +      }
 +    } catch (UnknownHostException e) {
 +      LOG.debug("Exception on getting hostname from env.", e);
 +    }
 +    return instances;
 +  }
 +
 +  private void scheduleAggregatorThread(final TimelineMetricAggregator aggregator) {
 +    if (!aggregator.isDisabled()) {
 +      ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(
 +        new ThreadFactory() {
 +          @Override
 +          public Thread newThread(Runnable r) {
 +            return new Thread(r, ACTUAL_AGGREGATOR_NAMES.get(aggregator.getName()));
 +          }
 +        }
 +      );
 +      scheduledExecutors.put(aggregator.getName(), executorService);
 +      executorService.scheduleAtFixedRate(aggregator,
 +        0L,
 +        aggregator.getSleepIntervalMillis(),
 +        TimeUnit.MILLISECONDS);
 +      LOG.info("Scheduled aggregator thread " + aggregator.getName() + " every " +
 +        aggregator.getSleepIntervalMillis() + " milliseconds.");
 +    } else {
 +      LOG.info("Skipped scheduling " + aggregator.getName() + " since it is disabled.");
 +    }
 +  }
 +}
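
A note on the behavioral change merged into updateValuesAsRate() above: a negative
point-to-point diff is now treated as a counter reset and the sample is dropped,
instead of emitting a negative rate. A minimal standalone sketch of that logic
(illustrative only; the RateSketch/toRate names are not part of the commit):

import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;

public class RateSketch {
  // Mirrors updateValuesAsRate(): the first sample has no predecessor and is
  // dropped; a negative diff (counter reset) is dropped; otherwise the entry
  // becomes the raw diff (isDiff == true) or the per-second rate.
  static Map<Long, Double> toRate(Map<Long, Double> values, boolean isDiff) {
    Long prevTime = null;
    Double prevVal = null;
    for (Iterator<Map.Entry<Long, Double>> it = values.entrySet().iterator(); it.hasNext(); ) {
      Map.Entry<Long, Double> entry = it.next();
      long time = entry.getKey();
      double val = entry.getValue();
      if (prevTime == null) {
        it.remove();
      } else {
        double diff = val - prevVal;
        if (diff < 0) {
          it.remove(); // counter reset
        } else {
          entry.setValue(isDiff ? diff : diff / TimeUnit.MILLISECONDS.toSeconds(time - prevTime));
        }
      }
      prevTime = time;
      prevVal = val;
    }
    return values;
  }

  public static void main(String[] args) {
    Map<Long, Double> values = new TreeMap<>();
    values.put(1000L, 10.0);
    values.put(2000L, 15.0); // +5 over 1s -> rate 5.0
    values.put(3000L, 2.0);  // negative diff (reset) -> dropped
    values.put(4000L, 4.0);  // +2 over 1s -> rate 2.0
    System.out.println(toRate(values, false)); // {2000=5.0, 4000=2.0}
  }
}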

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --cc ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index c86137b,899928a..d0e385b
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@@ -313,9 -299,9 +322,12 @@@ public class TimelineMetricConfiguratio
    public static final String TIMELINE_METRICS_PRECISION_TABLE_HBASE_BLOCKING_STORE_FILES =
      "timeline.metrics.precision.table.hbase.hstore.blockingStoreFiles";
  
 +  public static final String TIMELINE_METRICS_UUID_GEN_STRATEGY =
 +    "timeline.metrics.uuid.gen.strategy";
 +
+   public static final String TIMELINE_METRICS_SUPPORT_MULTIPLE_CLUSTERS =
+     "timeline.metrics.support.multiple.clusters";
+ 
    public static final String HOST_APP_ID = "HOST";
  
    public static final String DEFAULT_INSTANCE_PORT = "12001";
@@@ -567,90 -517,18 +586,107 @@@
      return whitelist;
    }
  
 +  public int getExternalSinkInterval(SOURCE_NAME sourceName) {
 +    return Integer.parseInt(metricsConf.get(String.format(EXTERNAL_SINK_INTERVAL, sourceName), "-1"));
 +  }
 +
 +  public InternalSourceProvider getInternalSourceProvider() {
 +    Class<? extends InternalSourceProvider> providerClass =
 +      metricsConf.getClass(TIMELINE_METRICS_SOURCE_PROVIDER_CLASS,
 +        DefaultInternalMetricsSourceProvider.class, InternalSourceProvider.class);
 +    return ReflectionUtils.newInstance(providerClass, metricsConf);
 +  }
 +
 +  public ExternalSinkProvider getExternalSinkProvider() {
 +    Class<?> providerClass = metricsConf.getClassByNameOrNull(TIMELINE_METRICS_SINK_PROVIDER_CLASS);
 +    if (providerClass != null) {
 +      return (ExternalSinkProvider) ReflectionUtils.newInstance(providerClass, metricsConf);
 +    }
 +    return null;
 +  }
 +
 +  public String getInternalCacheHeapPercent(String instanceName) {
 +    String heapPercent = metricsConf.get(String.format(INTERNAL_CACHE_HEAP_PERCENT, instanceName));
 +    if (StringUtils.isEmpty(heapPercent)) {
 +      return "5%";
 +    } else {
 +      return heapPercent.endsWith("%") ? heapPercent : heapPercent + "%";
 +    }
 +  }
 +
 +  public String getDefaultMetricsSinkDir() {
 +    String dirPath = metricsConf.get(DEFAULT_EXTERNAL_SINK_DIR);
 +    if (dirPath == null) {
 +      // Only one logger at the time of writing
 +      Appender appender = (Appender) Logger.getRootLogger().getAllAppenders().nextElement();
 +      if (appender instanceof FileAppender) {
 +        File f = new File(((FileAppender) appender).getFile());
 +        if (f.exists()) {
 +          dirPath = f.getParent();
 +        } else {
 +          dirPath = "/tmp";
 +        }
 +      }
 +    }
 +
 +    return dirPath;
 +  }
 +
 +  public boolean isHostInMemoryAggregationEnabled() {
 +    if (metricsConf != null) {
 +      return Boolean.valueOf(metricsConf.get(TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION, "false"));
 +    } else {
 +      return false;
 +    }
 +  }
 +
+   public boolean isContainerMetricsDisabled() {
+     try {
+       return metricsConf != null && Boolean.parseBoolean(metricsConf.get(TIMELINE_SERVICE_DISABLE_CONTAINER_METRICS, "false"));
+     } catch (Exception e) {
++      return false;
++    }
++  }
++
 +  public boolean isCollectorInMemoryAggregationEnabled() {
 +    if (metricsConf != null) {
 +      return Boolean.valueOf(metricsConf.get(TIMELINE_METRICS_COLLECTOR_INMEMORY_AGGREGATION, "false"));
 +    } else {
        return false;
      }
    }
  
 +  public List<String> getAppIdsForHostAggregation() {
 +    String appIds = metricsConf.get(CLUSTER_AGGREGATOR_APP_IDS);
 +    if (!StringUtils.isEmpty(appIds)) {
 +      return Arrays.asList(StringUtils.stripAll(appIds.split(",")));
 +    }
 +    return Collections.emptyList();
 +  }
 +
-   public String getZkConnectionUrl(String zkClientPort, String zkQuorum) {
-     StringBuilder sb = new StringBuilder();
-     String[] quorumParts = zkQuorum.split(",");
-     String prefix = "";
-     for (String part : quorumParts) {
-       sb.append(prefix);
-       sb.append(part.trim());
-       if (!part.contains(":")) {
-         sb.append(":");
-         sb.append(zkClientPort);
-       }
-       prefix = ",";
-     }
- 
-     return sb.toString();
++  public String getZkConnectionUrl(String zkClientPort, String zkQuorum) {
++    StringBuilder sb = new StringBuilder();
++    String[] quorumParts = zkQuorum.split(",");
++    String prefix = "";
++    for (String part : quorumParts) {
++      sb.append(prefix);
++      sb.append(part.trim());
++      if (!part.contains(":")) {
++        sb.append(":");
++        sb.append(zkClientPort);
++      }
++      prefix = ",";
++    }
++
++    return sb.toString();
++  }
 +
+   public boolean isWhitelistingEnabled() {
+     if (metricsConf != null) {
+       return Boolean.parseBoolean(metricsConf.get(TIMELINE_METRICS_WHITELIST_ENABLED, "false"));
+     }
+     return false;
    }
  }
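
For reference, the reworked getZkConnectionUrl() appends the configured client
port only to quorum entries that do not already carry one. A small standalone
rendition of the same normalization (hostnames illustrative only):

public class ZkUrlSketch {
  static String zkConnectionUrl(String zkClientPort, String zkQuorum) {
    StringBuilder sb = new StringBuilder();
    String prefix = "";
    for (String part : zkQuorum.split(",")) {
      sb.append(prefix).append(part.trim());
      if (!part.contains(":")) {
        // No explicit port on this quorum entry; fall back to the client port
        sb.append(":").append(zkClientPort);
      }
      prefix = ",";
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    System.out.println(zkConnectionUrl("2181", "zk1, zk2:2182, zk3"));
    // -> zk1:2181,zk2:2182,zk3:2181
  }
}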

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsFilter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsServiceTest.java
----------------------------------------------------------------------
diff --cc ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsServiceTest.java
index f035678,0000000..e06033d
mode 100644,000000..100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsServiceTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsServiceTest.java
@@@ -1,132 -1,0 +1,136 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 +
 +import com.google.common.collect.Multimap;
 +import junit.framework.Assert;
 +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
 +import org.junit.Test;
 +
 +import java.util.Arrays;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.TreeMap;
 +
 +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.ReadFunction.AVG;
 +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.ReadFunction.SUM;
 +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.PostProcessingFunction.RATE;
 +import static org.assertj.core.api.Assertions.assertThat;
 +
 +public class HBaseTimelineMetricsServiceTest {
 +
 +  public static final String MEM_METRIC = "mem";
 +  public static final String BYTES_IN_METRIC = "bytes_in";
 +  public static final String BYTES_NOT_AFUNCTION_METRIC = "bytes._not._afunction";
 +
 +  @Test
 +  public void testParseMetricNamesToAggregationFunctions() throws Exception {
 +    //given
 +    List<String> metricNames = Arrays.asList(
 +      MEM_METRIC + "._avg",
 +      MEM_METRIC + "._sum",
 +      MEM_METRIC + "._rate._avg",
 +      BYTES_IN_METRIC,
 +      BYTES_NOT_AFUNCTION_METRIC);
 +
 +    //when
 +    Multimap<String, List<Function>> multimap =
 +      HBaseTimelineMetricsService.parseMetricNamesToAggregationFunctions(metricNames);
 +
 +    //then
 +    Assert.assertEquals(3, multimap.keySet().size());
 +    Assert.assertTrue(multimap.containsKey(MEM_METRIC));
 +    Assert.assertTrue(multimap.containsKey(BYTES_IN_METRIC));
 +    Assert.assertTrue(multimap.containsKey(BYTES_NOT_AFUNCTION_METRIC));
 +
 +    List<List<Function>> metricEntry = (List<List<Function>>) multimap.get(MEM_METRIC);
 +    HashMap<String, List<Function>> mfm = new HashMap<String, List<Function>>();
 +    mfm.put(MEM_METRIC, metricEntry.get(0));
 +
 +    assertThat(mfm.get(MEM_METRIC)).containsOnly(
 +      new Function(AVG, null));
 +
 +    mfm = new HashMap<String, List<Function>>();
 +    mfm.put(MEM_METRIC, metricEntry.get(1));
 +    assertThat(mfm.get(MEM_METRIC)).containsOnly(
 +      new Function(SUM, null));
 +
 +    mfm = new HashMap<String, List<Function>>();
 +    mfm.put(MEM_METRIC, metricEntry.get(2));
 +    assertThat(mfm.get(MEM_METRIC)).containsOnly(
 +      new Function(AVG, RATE));
 +
 +    metricEntry = (List<List<Function>>) multimap.get(BYTES_IN_METRIC);
 +    mfm = new HashMap<String, List<Function>>();
 +    mfm.put(BYTES_IN_METRIC, metricEntry.get(0));
 +
 +    assertThat(mfm.get(BYTES_IN_METRIC))
 +      .contains(Function.DEFAULT_VALUE_FUNCTION);
 +
 +    metricEntry = (List<List<Function>>) multimap.get(BYTES_NOT_AFUNCTION_METRIC);
 +    mfm = new HashMap<String, List<Function>>();
 +    mfm.put(BYTES_NOT_AFUNCTION_METRIC, metricEntry.get(0));
 +
 +    assertThat(mfm.get(BYTES_NOT_AFUNCTION_METRIC))
 +      .contains(Function.DEFAULT_VALUE_FUNCTION);
 +
 +  }
 +
 +  @Test
 +  public void testRateCalculationWithCounterReset() throws Exception {
 +    Map<Long, Double> metricValues = new TreeMap<>();
-     metricValues.put(1454016368371L, 1011.25);
-     metricValues.put(1454016428371L, 1011.25);
-     metricValues.put(1454016488371L, 1011.25);
-     metricValues.put(1454016548371L, 1011.25);
-     metricValues.put(1454016608371L, 1011.25);
-     metricValues.put(1454016668371L, 1011.25);
-     metricValues.put(1454016728371L, 1011.25);
++    metricValues.put(1454000000000L, 1.0);
++    metricValues.put(1454000001000L, 6.0);
++    metricValues.put(1454000002000L, 0.0);
++    metricValues.put(1454000003000L, 3.0);
++    metricValues.put(1454000004000L, 4.0);
++    metricValues.put(1454000005000L, 7.0);
 +
 +    // Calculate rate
 +    Map<Long, Double> rates = HBaseTimelineMetricsService.updateValuesAsRate(new TreeMap<>(metricValues), false);
 +
 +    // First sample and the counter-reset sample are dropped; verify the remaining rates
-     for (Map.Entry<Long, Double> rateEntry : rates.entrySet()) {
-       Assert.assertEquals("Rate should be zero, key = " + rateEntry.getKey()
-           + ", value = " + rateEntry.getValue(), 0.0, rateEntry.getValue());
-     }
++    Assert.assertEquals(4, rates.size());
++
++    Assert.assertFalse(rates.containsKey(1454000000000L));
++    Assert.assertFalse(rates.containsKey(1454000002000L));
++
++    Assert.assertEquals(5.0, rates.get(1454000001000L));
++    Assert.assertEquals(3.0, rates.get(1454000003000L));
++    Assert.assertEquals(1.0, rates.get(1454000004000L));
++    Assert.assertEquals(3.0, rates.get(1454000005000L));
 +  }
 +
 +  @Test
 +  public void testDiffCalculation() throws Exception {
 +    Map<Long, Double> metricValues = new TreeMap<>();
 +    metricValues.put(1454016368371L, 1011.25);
 +    metricValues.put(1454016428371L, 1010.25);
 +    metricValues.put(1454016488371L, 1012.25);
-     metricValues.put(1454016548371L, 1010.25);
-     metricValues.put(1454016608371L, 1010.25);
++    metricValues.put(1454016548371L, 1015.25);
++    metricValues.put(1454016608371L, 1020.25);
 +
 +    Map<Long, Double> rates = HBaseTimelineMetricsService.updateValuesAsRate(new TreeMap<>(metricValues), true);
 +
-     Assert.assertTrue(rates.size()==4);
-     Assert.assertTrue(rates.containsValue(-1.0));
++    Assert.assertEquals(3, rates.size());
 +    Assert.assertTrue(rates.containsValue(2.0));
-     Assert.assertTrue(rates.containsValue(0.0));
++    Assert.assertTrue(rates.containsValue(3.0));
++    Assert.assertTrue(rates.containsValue(5.0));
 +  }
 +}
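
For reference, the arithmetic behind the updated assertions: in the rate test the
samples are one second apart, so the successive diffs are +5.0, -6.0, +3.0, +1.0
and +3.0; the first sample (no predecessor) and the -6.0 step (counter reset) are
dropped, leaving the four rates 5.0, 3.0, 1.0 and 3.0. In the diff test the steps
are -1.0, +2.0, +3.0 and +5.0; dropping the first sample and the negative step
leaves the three diffs 2.0, 3.0 and 5.0.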

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
----------------------------------------------------------------------
diff --cc ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
index eb772bc,8abcd83..12bd463
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
@@@ -17,26 -17,24 +17,25 @@@
   */
  package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
  
 -import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
 -import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
 -import org.apache.hadoop.metrics2.sink.timeline.Precision;
 -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
 -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 -import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
 -import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+ import java.io.IOException;
+ import java.sql.SQLException;
+ import java.util.ArrayList;
+ import java.util.Collections;
 -import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.TreeMap;
+ 
 +import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
 +import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
 +import org.apache.hadoop.metrics2.sink.timeline.Precision;
 +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
 +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 +import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
 +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
 +
- import java.io.IOException;
- import java.sql.SQLException;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.Set;
- import java.util.TreeMap;
- 
  public class TestTimelineMetricStore implements TimelineMetricStore {
    @Override
    public TimelineMetrics getTimelineMetrics(List<String> metricNames,
@@@ -113,17 -111,5 +112,16 @@@
    public List<String> getLiveInstances() {
      return Collections.emptyList();
    }
 -  
 +
 +  @Override
 +  public Map<String, TimelineMetricMetadataKey> getUuids() throws SQLException, IOException {
 +    return null;
 +  }
 +
 +  @Override
 +  public TimelineMetrics getAnomalyMetrics(String method, long startTime, long endTime, Integer limit) {
 +    return null;
 +  }
- 
  }

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java
----------------------------------------------------------------------
diff --cc ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java
index b570d19,71f40e8..3a1405e
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java
@@@ -87,7 -87,7 +87,7 @@@ public class MetricsRequestHelper 
            uriBuilder.setParameter("precision", higherPrecision);
            String newSpec = uriBuilder.toString();
            connection = streamProvider.processURL(newSpec, HttpMethod.GET, (String) null,
-             Collections.<String, List<String>>emptyMap());
 -            Collections.emptyMap());
++            Collections.<String, List<String>emptyMap());
            if (!checkConnectionForPrecisionException(connection)) {
              throw new IOException("Encountered Precision exception : Higher precision request also failed.");
            }
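
A note on why the merge keeps the explicit type witness here: if this module is
still compiled as Java 7, Collections.emptyMap() in an argument position infers
Map<Object,Object> and will not match a Map<String, List<String>> parameter;
Java 8's target typing removed that limitation. A minimal illustration (the
takesHeaders method is hypothetical):

import java.util.Collections;
import java.util.List;
import java.util.Map;

public class WitnessSketch {
  // Hypothetical stand-in for an API taking a header map.
  static void takesHeaders(Map<String, List<String>> headers) { }

  public static void main(String[] args) {
    // Compiles on Java 8+ via target typing, but not on Java 7:
    // takesHeaders(Collections.emptyMap());

    // Compiles on both: explicit type witness.
    takesHeaders(Collections.<String, List<String>>emptyMap());
  }
}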

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricsCacheSizeOfEngine.java
----------------------------------------------------------------------
diff --cc ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricsCacheSizeOfEngine.java
index 45c8967,2401d75..ea231f3
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricsCacheSizeOfEngine.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricsCacheSizeOfEngine.java
@@@ -79,5 -142,12 +142,12 @@@ public class TimelineMetricsCacheSizeOf
      return size;
    }
  
+   @Override
+   public SizeOfEngine copyWith(int maxDepth, boolean abortWhenMaxDepthExceeded) {
+     LOG.debug("Copying tracing sizeof engine, maxdepth: {}, abort: {}",
+       maxDepth, abortWhenMaxDepthExceeded);
  
- }
+     return new TimelineMetricsCacheSizeOfEngine(
+       underlying.copyWith(maxDepth, abortWhenMaxDepthExceeded));
+   }
 -}
++}
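
The copyWith() override added above matters because Ehcache re-creates sizeof
engines with new depth settings; a delegating engine must wrap the copied
underlying engine or the custom sizing behavior is silently lost. A minimal
sketch, assuming the Ehcache 2.x net.sf.ehcache.pool API used by this class:

import net.sf.ehcache.pool.Size;
import net.sf.ehcache.pool.SizeOfEngine;

public class DelegatingSizeOfEngine implements SizeOfEngine {
  private final SizeOfEngine underlying;

  public DelegatingSizeOfEngine(SizeOfEngine underlying) {
    this.underlying = underlying;
  }

  @Override
  public Size sizeOf(Object key, Object value, Object container) {
    // Forward sizing to the real engine (add tracing/metrics here as needed)
    return underlying.sizeOf(key, value, container);
  }

  @Override
  public SizeOfEngine copyWith(int maxDepth, boolean abortWhenMaxDepthExceeded) {
    // Re-wrap the copy so the delegating behavior survives reconfiguration
    return new DelegatingSizeOfEngine(underlying.copyWith(maxDepth, abortWhenMaxDepthExceeded));
  }
}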

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/scripts/params.py
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
----------------------------------------------------------------------
diff --cc ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
index d54db37,b7ae456..cd4ddfd
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
@@@ -799,8 -827,25 +827,30 @@@
      <on-ambari-upgrade add="true"/>
    </property>
    <property>
 +    <name>timeline.metrics.host.inmemory.aggregation.http.policy</name>
 +    <value>HTTP_ONLY</value>
 +    <on-ambari-upgrade add="true"/>
 +  </property>
++  <property>
+     <name>timeline.metrics.whitelisting.enabled</name>
+     <value>false</value>
+     <description>Enable/Disable metric whitelisting</description>
+     <display-name>Enable only whitelisted metrics</display-name>
+     <on-ambari-upgrade add="true"/>
+     <value-attributes>
+       <overridable>false</overridable>
+       <type>value-list</type>
+       <entries>
+         <entry>
+           <value>true</value>
+           <label>True</label>
+         </entry>
+         <entry>
+           <value>false</value>
+           <label>False</label>
+         </entry>
+       </entries>
+       <selection-cardinality>1</selection-cardinality>
+     </value-attributes>
+   </property>
  </configuration>

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/scripts/params_linux.py
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/params_linux.py
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/scripts/params_linux.py
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/params.py
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/params_linux.py
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/params_linux.py
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/main/resources/stacks/HDP/3.0/hooks/before-START/scripts/params.py
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/MetricsPaddingMethodTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheSizingTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/test/python/stacks/2.0.6/configs/default_ams_embedded.json
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/pom.xml
----------------------------------------------------------------------