You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2017/09/26 22:26:47 UTC
[48/50] [abbrv] ambari git commit: Merge branch 'trunk' into
branch-3.0-ams
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
----------------------------------------------------------------------
diff --cc ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
index a4b0bcc,0000000..bb26439
mode 100644,000000..100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
@@@ -1,577 -1,0 +1,590 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
- import com.google.common.collect.ArrayListMultimap;
- import com.google.common.collect.Multimap;
++import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT;
++import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
++import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
++
++import java.io.IOException;
++import java.net.MalformedURLException;
++import java.net.URISyntaxException;
++import java.net.UnknownHostException;
++import java.sql.SQLException;
++import java.util.ArrayList;
++import java.util.Collections;
++import java.util.HashMap;
++import java.util.HashSet;
++import java.util.Iterator;
++import java.util.List;
++import java.util.Map;
++import java.util.Set;
++import java.util.concurrent.ConcurrentHashMap;
++import java.util.concurrent.Executors;
++import java.util.concurrent.ScheduledExecutorService;
++import java.util.concurrent.ThreadFactory;
++import java.util.concurrent.TimeUnit;
++
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
+import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.Precision;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricWithAggregatedValues;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricHostMetadata;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
++import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.SeriesAggregateFunction;
++import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.TimelineMetricsSeriesAggregateFunction;
++import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.TimelineMetricsSeriesAggregateFunctionFactory;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConditionBuilder;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.TopNCondition;
- import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.SeriesAggregateFunction;
- import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.TimelineMetricsSeriesAggregateFunction;
- import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.TimelineMetricsSeriesAggregateFunctionFactory;
+
- import java.io.IOException;
- import java.net.MalformedURLException;
- import java.net.URISyntaxException;
- import java.net.UnknownHostException;
- import java.sql.SQLException;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.HashMap;
- import java.util.HashSet;
- import java.util.Iterator;
- import java.util.List;
- import java.util.Map;
- import java.util.Set;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.Executors;
- import java.util.concurrent.ScheduledExecutorService;
- import java.util.concurrent.ThreadFactory;
- import java.util.concurrent.TimeUnit;
-
- import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.*;
- import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
++import com.google.common.collect.ArrayListMultimap;
++import com.google.common.collect.Multimap;
+
+public class HBaseTimelineMetricsService extends AbstractService implements TimelineMetricStore {
+
+ static final Log LOG = LogFactory.getLog(HBaseTimelineMetricsService.class);
+ private final TimelineMetricConfiguration configuration;
+ private TimelineMetricDistributedCache cache;
+ private PhoenixHBaseAccessor hBaseAccessor;
+ private static volatile boolean isInitialized = false;
+ private final ScheduledExecutorService watchdogExecutorService = Executors.newSingleThreadScheduledExecutor();
+ private final Map<AGGREGATOR_NAME, ScheduledExecutorService> scheduledExecutors = new HashMap<>();
+ private final ConcurrentHashMap<String, Long> postedAggregatedMap = new ConcurrentHashMap<>();
+ private TimelineMetricMetadataManager metricMetadataManager;
+ private Integer defaultTopNHostsLimit;
+ private MetricCollectorHAController haController;
- // private MetricKafkaProducer metricKafkaProducer;
++ private boolean containerMetricsDisabled = false;
+
+ /**
+ * Construct the service.
+ *
+ */
+ public HBaseTimelineMetricsService(TimelineMetricConfiguration configuration) {
+ super(HBaseTimelineMetricsService.class.getName());
+ this.configuration = configuration;
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ initializeSubsystem();
+ }
+
+ private TimelineMetricDistributedCache startCacheNode() throws MalformedURLException, URISyntaxException {
+ //TODO make configurable
+ return new TimelineMetricsIgniteCache();
+ }
+
+
+ private synchronized void initializeSubsystem() {
+ if (!isInitialized) {
+ hBaseAccessor = new PhoenixHBaseAccessor(null);
+ // Initialize schema
+ hBaseAccessor.initMetricSchema();
+ // Initialize metadata from store
+ try {
+ metricMetadataManager = new TimelineMetricMetadataManager(hBaseAccessor);
+ } catch (MalformedURLException | URISyntaxException e) {
+ throw new ExceptionInInitializerError("Unable to initialize metadata manager");
+ }
+ metricMetadataManager.initializeMetadata();
+ // Initialize policies before TTL update
+ hBaseAccessor.initPoliciesAndTTL();
+ // Start HA service
+ // Start the controller
+ if (!configuration.isDistributedCollectorModeDisabled()) {
+ haController = new MetricCollectorHAController(configuration);
+ try {
+ haController.initializeHAController();
+ } catch (Exception e) {
+ LOG.error(e);
+ throw new MetricsSystemInitializationException("Unable to " +
+ "initialize HA controller", e);
+ }
+ } else {
+ LOG.info("Distributed collector mode disabled");
+ }
+
+ //Initialize whitelisting & blacklisting if needed
+ TimelineMetricsFilter.initializeMetricFilter(configuration);
+
+ Configuration metricsConf = null;
+ try {
+ metricsConf = configuration.getMetricsConf();
+ } catch (Exception e) {
+ throw new ExceptionInInitializerError("Cannot initialize configuration.");
+ }
+
+ if (configuration.isCollectorInMemoryAggregationEnabled()) {
+ try {
+ cache = startCacheNode();
+ } catch (Exception e) {
+ throw new MetricsSystemInitializationException("Unable to " +
+ "start cache node", e);
+ }
+ }
+// String kafkaServers = configuration.getKafkaServers();
+// if (kafkaServers != null) {
+// metricKafkaProducer = new MetricKafkaProducer(kafkaServers);
+// }
+
+ defaultTopNHostsLimit = Integer.parseInt(metricsConf.get(DEFAULT_TOPN_HOSTS_LIMIT, "20"));
+ if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) {
+ LOG.info("Using group by aggregators for aggregating host and cluster metrics.");
+ }
+
+ // Start the cluster aggregator second
+ TimelineMetricAggregator secondClusterAggregator =
+ TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(
+ hBaseAccessor, metricsConf, metricMetadataManager, haController, cache);
+ scheduleAggregatorThread(secondClusterAggregator);
+
+ // Start the minute cluster aggregator
+ TimelineMetricAggregator minuteClusterAggregator =
+ TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(
+ hBaseAccessor, metricsConf, metricMetadataManager, haController);
+ scheduleAggregatorThread(minuteClusterAggregator);
+
+ // Start the hourly cluster aggregator
+ TimelineMetricAggregator hourlyClusterAggregator =
+ TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(
+ hBaseAccessor, metricsConf, metricMetadataManager, haController);
+ scheduleAggregatorThread(hourlyClusterAggregator);
+
+ // Start the daily cluster aggregator
+ TimelineMetricAggregator dailyClusterAggregator =
+ TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(
+ hBaseAccessor, metricsConf, metricMetadataManager, haController);
+ scheduleAggregatorThread(dailyClusterAggregator);
+
+ // Start the minute host aggregator
+ if (configuration.isHostInMemoryAggregationEnabled()) {
+ LOG.info("timeline.metrics.host.inmemory.aggregation is set to True, switching to filtering host minute aggregation on collector");
+ TimelineMetricAggregator minuteHostAggregator =
+ TimelineMetricAggregatorFactory.createFilteringTimelineMetricAggregatorMinute(
+ hBaseAccessor, metricsConf, metricMetadataManager, haController, postedAggregatedMap);
+ scheduleAggregatorThread(minuteHostAggregator);
+ } else {
+ TimelineMetricAggregator minuteHostAggregator =
+ TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(
+ hBaseAccessor, metricsConf, metricMetadataManager, haController);
+ scheduleAggregatorThread(minuteHostAggregator);
+ }
+
+ // Start the hourly host aggregator
+ TimelineMetricAggregator hourlyHostAggregator =
+ TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(
+ hBaseAccessor, metricsConf, metricMetadataManager, haController);
+ scheduleAggregatorThread(hourlyHostAggregator);
+
+ // Start the daily host aggregator
+ TimelineMetricAggregator dailyHostAggregator =
+ TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(
+ hBaseAccessor, metricsConf, metricMetadataManager, haController);
+ scheduleAggregatorThread(dailyHostAggregator);
+
+ if (!configuration.isTimelineMetricsServiceWatcherDisabled()) {
+ int initDelay = configuration.getTimelineMetricsServiceWatcherInitDelay();
+ int delay = configuration.getTimelineMetricsServiceWatcherDelay();
+ // Start the watchdog
+ watchdogExecutorService.scheduleWithFixedDelay(
+ new TimelineMetricStoreWatcher(this, configuration),
+ initDelay, delay, TimeUnit.SECONDS);
+ LOG.info("Started watchdog for timeline metrics store with initial " +
+ "delay = " + initDelay + ", delay = " + delay);
+ }
-
++ containerMetricsDisabled = configuration.isContainerMetricsDisabled();
+ isInitialized = true;
+ }
+
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ super.serviceStop();
+ }
+
+ @Override
+ public TimelineMetrics getAnomalyMetrics(String method, long startTime, long endTime, Integer limit) throws SQLException {
+ return hBaseAccessor.getAnomalyMetricRecords(method, startTime, endTime, limit);
+ }
+
+ @Override
+ public TimelineMetrics getTimelineMetrics(List<String> metricNames,
+ List<String> hostnames, String applicationId, String instanceId,
+ Long startTime, Long endTime, Precision precision, Integer limit,
+ boolean groupedByHosts, TopNConfig topNConfig, String seriesAggregateFunction) throws SQLException, IOException {
+
+ if (metricNames == null || metricNames.isEmpty()) {
+ throw new IllegalArgumentException("No metric name filter specified.");
+ }
+ if ((startTime == null && endTime != null)
+ || (startTime != null && endTime == null)) {
+ throw new IllegalArgumentException("Open ended query not supported ");
+ }
+ if (limit != null && limit > PhoenixHBaseAccessor.RESULTSET_LIMIT){
+ throw new IllegalArgumentException("Limit too big");
+ }
+
+ TimelineMetricsSeriesAggregateFunction seriesAggrFunctionInstance = null;
+ if (!StringUtils.isEmpty(seriesAggregateFunction)) {
+ SeriesAggregateFunction func = SeriesAggregateFunction.getFunction(seriesAggregateFunction);
+ seriesAggrFunctionInstance = TimelineMetricsSeriesAggregateFunctionFactory.newInstance(func);
+ }
+
+ Multimap<String, List<Function>> metricFunctions =
+ parseMetricNamesToAggregationFunctions(metricNames);
+
+ List<byte[]> uuids = metricMetadataManager.getUuids(metricFunctions.keySet(), hostnames, applicationId, instanceId);
+
+ ConditionBuilder conditionBuilder = new ConditionBuilder(new ArrayList<String>(metricFunctions.keySet()))
+ .hostnames(hostnames)
+ .appId(applicationId)
+ .instanceId(instanceId)
+ .startTime(startTime)
+ .endTime(endTime)
+ .precision(precision)
+ .limit(limit)
+ .grouped(groupedByHosts)
+ .uuid(uuids);
+
+ if (topNConfig != null) {
+ if (TopNCondition.isTopNHostCondition(metricNames, hostnames) ^ //Only 1 condition should be true.
+ TopNCondition.isTopNMetricCondition(metricNames, hostnames)) {
+ conditionBuilder.topN(topNConfig.getTopN());
+ conditionBuilder.isBottomN(topNConfig.getIsBottomN());
+ Function.ReadFunction readFunction = Function.ReadFunction.getFunction(topNConfig.getTopNFunction());
+ Function function = new Function(readFunction, null);
+ conditionBuilder.topNFunction(function);
+ } else {
+ LOG.info("Invalid Input for TopN query. Ignoring TopN Request.");
+ }
+ } else if (startTime != null && hostnames != null && hostnames.size() > defaultTopNHostsLimit) {
+ // if (timeseries query AND hostnames passed AND size(hostnames) > limit)
+ LOG.info("Requesting data for more than " + defaultTopNHostsLimit + " Hosts. " +
+ "Defaulting to Top " + defaultTopNHostsLimit);
+ conditionBuilder.topN(defaultTopNHostsLimit);
+ conditionBuilder.isBottomN(false);
+ }
+
+ Condition condition = conditionBuilder.build();
+
+ TimelineMetrics metrics;
+
+ if (hostnames == null || hostnames.isEmpty()) {
+ metrics = hBaseAccessor.getAggregateMetricRecords(condition, metricFunctions);
+ } else {
+ metrics = hBaseAccessor.getMetricRecords(condition, metricFunctions);
+ }
+
+ metrics = postProcessMetrics(metrics);
+
+ if (metrics.getMetrics().size() == 0) {
+ return metrics;
+ }
+
+ return seriesAggregateMetrics(seriesAggrFunctionInstance, metrics);
+ }
+
+ private TimelineMetrics postProcessMetrics(TimelineMetrics metrics) {
+ List<TimelineMetric> metricsList = metrics.getMetrics();
+
+ for (TimelineMetric metric : metricsList){
+ String name = metric.getMetricName();
+ if (name.contains("._rate")){
+ updateValuesAsRate(metric.getMetricValues(), false);
+ } else if (name.contains("._diff")) {
+ updateValuesAsRate(metric.getMetricValues(), true);
+ }
+ }
+
+ return metrics;
+ }
+
+ private TimelineMetrics seriesAggregateMetrics(TimelineMetricsSeriesAggregateFunction seriesAggrFuncInstance,
+ TimelineMetrics metrics) {
+ if (seriesAggrFuncInstance != null) {
+ TimelineMetric appliedMetric = seriesAggrFuncInstance.apply(metrics);
+ metrics.setMetrics(Collections.singletonList(appliedMetric));
+ }
+ return metrics;
+ }
+
+ static Map<Long, Double> updateValuesAsRate(Map<Long, Double> metricValues, boolean isDiff) {
+ Long prevTime = null;
+ Double prevVal = null;
+ long step;
+ Double diff;
+
+ for(Iterator<Map.Entry<Long, Double>> it = metricValues.entrySet().iterator(); it.hasNext(); ) {
+ Map.Entry<Long, Double> timeValueEntry = it.next();
+ Long currTime = timeValueEntry.getKey();
+ Double currVal = timeValueEntry.getValue();
+
+ if (prevTime != null) {
+ step = currTime - prevTime;
+ diff = currVal - prevVal;
- Double rate = isDiff ? diff : (diff / TimeUnit.MILLISECONDS.toSeconds(step));
- timeValueEntry.setValue(rate);
++ if (diff < 0) {
++ it.remove(); //Discard calculating rate when the metric counter has been reset.
++ } else {
++ Double rate = isDiff ? diff : (diff / TimeUnit.MILLISECONDS.toSeconds(step));
++ timeValueEntry.setValue(rate);
++ }
+ } else {
+ it.remove();
+ }
+
+ prevTime = currTime;
+ prevVal = currVal;
+ }
+
+ return metricValues;
+ }
+
+ static Multimap<String, List<Function>> parseMetricNamesToAggregationFunctions(List<String> metricNames) {
+ Multimap<String, List<Function>> metricsFunctions = ArrayListMultimap.create();
+
+ for (String metricName : metricNames){
+ Function function = Function.DEFAULT_VALUE_FUNCTION;
+ String cleanMetricName = metricName;
+
+ try {
+ function = Function.fromMetricName(metricName);
+ int functionStartIndex = metricName.indexOf("._");
+ if (functionStartIndex > 0) {
+ cleanMetricName = metricName.substring(0, functionStartIndex);
+ }
+ } catch (Function.FunctionFormatException ffe){
+ // unknown function so
+ // fallback to VALUE, and fullMetricName
+ }
+
+ List<Function> functionsList = new ArrayList<>();
+ functionsList.add(function);
+ metricsFunctions.put(cleanMetricName, functionsList);
+ }
+
+ return metricsFunctions;
+ }
+
+ @Override
+ public TimelinePutResponse putMetrics(TimelineMetrics metrics) throws SQLException, IOException {
+ // Error indicated by the Sql exception
+ TimelinePutResponse response = new TimelinePutResponse();
+
+ hBaseAccessor.insertMetricRecordsWithMetadata(metricMetadataManager, metrics, false);
+
+ if (configuration.isCollectorInMemoryAggregationEnabled()) {
+ cache.putMetrics(metrics.getMetrics(), metricMetadataManager);
+ }
+
+// try {
+// metricKafkaProducer.sendMetrics(metrics);
+//// if (metrics.getMetrics().size() != 0 && metrics.getMetrics().get(0).getAppId().equals("anomaly-engine-test-metric")) {
+//// }
+// } catch (Exception e) {
+// LOG.error(e);
+// }
+
+ return response;
+ }
+
+ @Override
+ public TimelinePutResponse putContainerMetrics(List<ContainerMetric> metrics)
+ throws SQLException, IOException {
++
++ if (containerMetricsDisabled) {
++ LOG.debug("Ignoring submitted container metrics according to configuration. Values will not be stored.");
++ return new TimelinePutResponse();
++ }
++
+ hBaseAccessor.insertContainerMetrics(metrics);
+ return new TimelinePutResponse();
+ }
+
+ @Override
+ public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(String query) throws SQLException, IOException {
+ Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadata =
+ metricMetadataManager.getMetadataCache();
+
+ boolean includeBlacklistedMetrics = StringUtils.isNotEmpty(query) && "all".equalsIgnoreCase(query);
+
+ // Group Metadata by AppId
+ Map<String, List<TimelineMetricMetadata>> metadataByAppId = new HashMap<>();
+ for (TimelineMetricMetadata metricMetadata : metadata.values()) {
+
+ if (!includeBlacklistedMetrics && !metricMetadata.isWhitelisted()) {
+ continue;
+ }
+ List<TimelineMetricMetadata> metadataList = metadataByAppId.get(metricMetadata.getAppId());
+ if (metadataList == null) {
+ metadataList = new ArrayList<>();
+ metadataByAppId.put(metricMetadata.getAppId(), metadataList);
+ }
+
+ metadataList.add(metricMetadata);
+ }
+
+ return metadataByAppId;
+ }
+
+ @Override
+ public Map<String, TimelineMetricMetadataKey> getUuids() throws SQLException, IOException {
+ return metricMetadataManager.getUuidKeyMap();
+ }
+
+ @Override
+ public Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException {
+ Map<String, TimelineMetricHostMetadata> hostsMetadata = metricMetadataManager.getHostedAppsCache();
+ Map<String, Set<String>> hostAppMap = new HashMap<>();
+ for (String hostname : hostsMetadata.keySet()) {
+ hostAppMap.put(hostname, hostsMetadata.get(hostname).getHostedApps().keySet());
+ }
+ return hostAppMap;
+ }
+
+ @Override
+ public TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException {
+ Map<TimelineMetric, MetricHostAggregate> aggregateMap = new HashMap<>();
+ String hostname = null;
+ for (TimelineMetricWithAggregatedValues entry : aggregationResult.getResult()) {
+ aggregateMap.put(entry.getTimelineMetric(), entry.getMetricAggregate());
+ hostname = hostname == null ? entry.getTimelineMetric().getHostName() : hostname;
+ break;
+ }
+ long timestamp = aggregationResult.getTimeInMilis();
+ postedAggregatedMap.put(hostname, timestamp);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Adding host %s to aggregated by in-memory aggregator. Timestamp : %s", hostname, timestamp));
+ }
+ hBaseAccessor.saveHostAggregateRecords(aggregateMap, PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME);
+
+
+ return new TimelinePutResponse();
+ }
+
+ @Override
- public Map<String, Map<String,Set<String>>> getInstanceHostsMetadata(String instanceId, String appId)
- throws SQLException, IOException {
-
++ public Map<String, Map<String,Set<String>>> getInstanceHostsMetadata(String instanceId, String appId) throws SQLException, IOException {
+ Map<String, Set<String>> hostedApps = getHostAppsMetadata();
+ Map<String, Set<String>> instanceHosts = metricMetadataManager.getHostedInstanceCache();
++ if (configuration.getTimelineMetricsMultipleClusterSupport()) {
++ instanceHosts = metricMetadataManager.getHostedInstanceCache();
++ }
+ Map<String, Map<String, Set<String>>> instanceAppHosts = new HashMap<>();
+
+ if (MapUtils.isEmpty(instanceHosts)) {
+ Map<String, Set<String>> appHostMap = new HashMap<String, Set<String>>();
+ for (String host : hostedApps.keySet()) {
+ for (String app : hostedApps.get(host)) {
+ if (!appHostMap.containsKey(app)) {
+ appHostMap.put(app, new HashSet<String>());
+ }
+ appHostMap.get(app).add(host);
+ }
+ }
+ instanceAppHosts.put("", appHostMap);
+ } else {
+ for (String instance : instanceHosts.keySet()) {
+
+ if (StringUtils.isNotEmpty(instanceId) && !instance.equals(instanceId)) {
+ continue;
+ }
+ Map<String, Set<String>> appHostMap = new HashMap<String, Set<String>>();
+ instanceAppHosts.put(instance, appHostMap);
+
+ Set<String> hostsWithInstance = instanceHosts.get(instance);
+ for (String host : hostsWithInstance) {
+ for (String app : hostedApps.get(host)) {
+ if (StringUtils.isNotEmpty(appId) && !app.equals(appId)) {
+ continue;
+ }
+
+ if (!appHostMap.containsKey(app)) {
+ appHostMap.put(app, new HashSet<String>());
+ }
+ appHostMap.get(app).add(host);
+ }
+ }
+ }
+ }
+
+ return instanceAppHosts;
+ }
+
+ @Override
+ public List<String> getLiveInstances() {
+
+ List<String> instances = null;
+ try {
+ if (haController == null) {
+ // Always return current host as live (embedded operation mode)
+ return Collections.singletonList(configuration.getInstanceHostnameFromEnv());
+ }
+ instances = haController.getLiveInstanceHostNames();
+ if (instances == null || instances.isEmpty()) {
+ // fallback
+ instances = Collections.singletonList(configuration.getInstanceHostnameFromEnv());
+ }
+ } catch (UnknownHostException e) {
+ LOG.debug("Exception on getting hostname from env.", e);
+ }
+ return instances;
+ }
+
+ private void scheduleAggregatorThread(final TimelineMetricAggregator aggregator) {
+ if (!aggregator.isDisabled()) {
+ ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, ACTUAL_AGGREGATOR_NAMES.get(aggregator.getName()));
+ }
+ }
+ );
+ scheduledExecutors.put(aggregator.getName(), executorService);
+ executorService.scheduleAtFixedRate(aggregator,
+ 0l,
+ aggregator.getSleepIntervalMillis(),
+ TimeUnit.MILLISECONDS);
+ LOG.info("Scheduled aggregator thread " + aggregator.getName() + " every " +
+ + aggregator.getSleepIntervalMillis() + " milliseconds.");
+ } else {
+ LOG.info("Skipped scheduling " + aggregator.getName() + " since it is disabled.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --cc ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index c86137b,899928a..d0e385b
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@@ -313,9 -299,9 +322,12 @@@ public class TimelineMetricConfiguratio
public static final String TIMELINE_METRICS_PRECISION_TABLE_HBASE_BLOCKING_STORE_FILES =
"timeline.metrics.precision.table.hbase.hstore.blockingStoreFiles";
+ public static final String TIMELINE_METRICS_UUID_GEN_STRATEGY =
+ "timeline.metrics.uuid.gen.strategy";
+
+ public static final String TIMELINE_METRICS_SUPPORT_MULTIPLE_CLUSTERS =
+ "timeline.metrics.support.multiple.clusters";
+
public static final String HOST_APP_ID = "HOST";
public static final String DEFAULT_INSTANCE_PORT = "12001";
@@@ -567,90 -517,18 +586,107 @@@
return whitelist;
}
+ public int getExternalSinkInterval(SOURCE_NAME sourceName) {
+ return Integer.parseInt(metricsConf.get(String.format(EXTERNAL_SINK_INTERVAL, sourceName), "-1"));
+ }
+
+ public InternalSourceProvider getInternalSourceProvider() {
+ Class<? extends InternalSourceProvider> providerClass =
+ metricsConf.getClass(TIMELINE_METRICS_SOURCE_PROVIDER_CLASS,
+ DefaultInternalMetricsSourceProvider.class, InternalSourceProvider.class);
+ return ReflectionUtils.newInstance(providerClass, metricsConf);
+ }
+
+ public ExternalSinkProvider getExternalSinkProvider() {
+ Class<?> providerClass = metricsConf.getClassByNameOrNull(TIMELINE_METRICS_SINK_PROVIDER_CLASS);
+ if (providerClass != null) {
+ return (ExternalSinkProvider) ReflectionUtils.newInstance(providerClass, metricsConf);
+ }
+ return null;
+ }
+
+ public String getInternalCacheHeapPercent(String instanceName) {
+ String heapPercent = metricsConf.get(String.format(INTERNAL_CACHE_HEAP_PERCENT, instanceName));
+ if (StringUtils.isEmpty(heapPercent)) {
+ return "5%";
+ } else {
+ return heapPercent.endsWith("%") ? heapPercent : heapPercent + "%";
+ }
+ }
+
+ public String getDefaultMetricsSinkDir() {
+ String dirPath = metricsConf.get(DEFAULT_EXTERNAL_SINK_DIR);
+ if (dirPath == null) {
+ // Only one logger at the time of writing
+ Appender appender = (Appender) Logger.getRootLogger().getAllAppenders().nextElement();
+ if (appender instanceof FileAppender) {
+ File f = new File(((FileAppender) appender).getFile());
+ if (f.exists()) {
+ dirPath = f.getParent();
+ } else {
+ dirPath = "/tmp";
+ }
+ }
+ }
+
+ return dirPath;
+ }
+
+ public boolean isHostInMemoryAggregationEnabled() {
+ if (metricsConf != null) {
+ return Boolean.valueOf(metricsConf.get(TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION, "false"));
+ } else {
+ return false;
+ }
+ }
+
+ public boolean isContainerMetricsDisabled() {
+ try {
+ return metricsConf != null && Boolean.parseBoolean(metricsConf.get(TIMELINE_SERVICE_DISABLE_CONTAINER_METRICS, "false"));
+ } catch (Exception e) {
++
++ return false;
++ }
++ }
++
+ public boolean isCollectorInMemoryAggregationEnabled() {
+ if (metricsConf != null) {
+ return Boolean.valueOf(metricsConf.get(TIMELINE_METRICS_COLLECTOR_INMEMORY_AGGREGATION, "false"));
+ } else {
return false;
}
}
+ public List<String> getAppIdsForHostAggregation() {
+ String appIds = metricsConf.get(CLUSTER_AGGREGATOR_APP_IDS);
+ if (!StringUtils.isEmpty(appIds)) {
+ return Arrays.asList(StringUtils.stripAll(appIds.split(",")));
+ }
+ return Collections.emptyList();
+ }
+
- public String getZkConnectionUrl(String zkClientPort, String zkQuorum) {
- StringBuilder sb = new StringBuilder();
- String[] quorumParts = zkQuorum.split(",");
- String prefix = "";
- for (String part : quorumParts) {
- sb.append(prefix);
- sb.append(part.trim());
- if (!part.contains(":")) {
- sb.append(":");
- sb.append(zkClientPort);
++ public String getZkConnectionUrl(String zkClientPort, String zkQuorum){
++ StringBuilder sb = new StringBuilder();
++ String[] quorumParts = zkQuorum.split(",");
++ String prefix = "";
++ for (String part : quorumParts) {
++ sb.append(prefix);
++ sb.append(part.trim());
++ if (!part.contains(":")) {
++ sb.append(":");
++ sb.append(zkClientPort);
++ }
++ prefix = ",";
+ }
- prefix = ",";
++
++ return sb.toString();
++
+ }
+
- return sb.toString();
+ public boolean isWhitelistingEnabled() {
+ if (metricsConf != null) {
+ return Boolean.parseBoolean(metricsConf.get(TIMELINE_METRICS_WHITELIST_ENABLED, "false"));
+ }
+ return false;
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsFilter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsServiceTest.java
----------------------------------------------------------------------
diff --cc ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsServiceTest.java
index f035678,0000000..e06033d
mode 100644,000000..100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsServiceTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsServiceTest.java
@@@ -1,132 -1,0 +1,136 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import com.google.common.collect.Multimap;
+import junit.framework.Assert;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.ReadFunction.AVG;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.ReadFunction.SUM;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.PostProcessingFunction.RATE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class HBaseTimelineMetricsServiceTest {
+
+ public static final String MEM_METRIC = "mem";
+ public static final String BYTES_IN_METRIC = "bytes_in";
+ public static final String BYTES_NOT_AFUNCTION_METRIC = "bytes._not._afunction";
+
+ @Test
+ public void testParseMetricNamesToAggregationFunctions() throws Exception {
+ //giwen
+ List<String> metricNames = Arrays.asList(
+ MEM_METRIC + "._avg",
+ MEM_METRIC + "._sum",
+ MEM_METRIC + "._rate._avg",
+ BYTES_IN_METRIC,
+ BYTES_NOT_AFUNCTION_METRIC);
+
+ //when
+ Multimap<String, List<Function>> multimap =
+ HBaseTimelineMetricsService.parseMetricNamesToAggregationFunctions(metricNames);
+
+ //then
+ Assert.assertEquals(multimap.keySet().size(), 3);
+ Assert.assertTrue(multimap.containsKey(MEM_METRIC));
+ Assert.assertTrue(multimap.containsKey(BYTES_IN_METRIC));
+ Assert.assertTrue(multimap.containsKey(BYTES_NOT_AFUNCTION_METRIC));
+
+ List<List<Function>> metricEntry = (List<List<Function>>) multimap.get(MEM_METRIC);
+ HashMap<String, List<Function>> mfm = new HashMap<String, List<Function>>();
+ mfm.put(MEM_METRIC, metricEntry.get(0));
+
+ assertThat(mfm.get(MEM_METRIC)).containsOnly(
+ new Function(AVG, null));
+
+ mfm = new HashMap<String, List<Function>>();
+ mfm.put(MEM_METRIC, metricEntry.get(1));
+ assertThat(mfm.get(MEM_METRIC)).containsOnly(
+ new Function(SUM, null));
+
+ mfm = new HashMap<String, List<Function>>();
+ mfm.put(MEM_METRIC, metricEntry.get(2));
+ assertThat(mfm.get(MEM_METRIC)).containsOnly(
+ new Function(AVG, RATE));
+
+ metricEntry = (List<List<Function>>) multimap.get(BYTES_IN_METRIC);
+ mfm = new HashMap<String, List<Function>>();
+ mfm.put(BYTES_IN_METRIC, metricEntry.get(0));
+
+ assertThat(mfm.get(BYTES_IN_METRIC))
+ .contains(Function.DEFAULT_VALUE_FUNCTION);
+
+ metricEntry = (List<List<Function>>) multimap.get(BYTES_NOT_AFUNCTION_METRIC);
+ mfm = new HashMap<String, List<Function>>();
+ mfm.put(BYTES_NOT_AFUNCTION_METRIC, metricEntry.get(0));
+
+ assertThat(mfm.get(BYTES_NOT_AFUNCTION_METRIC))
+ .contains(Function.DEFAULT_VALUE_FUNCTION);
+
+ }
+
+ @Test
+ public void testRateCalculationOnMetricsWithEqualValues() throws Exception {
+ Map<Long, Double> metricValues = new TreeMap<>();
- metricValues.put(1454016368371L, 1011.25);
- metricValues.put(1454016428371L, 1011.25);
- metricValues.put(1454016488371L, 1011.25);
- metricValues.put(1454016548371L, 1011.25);
- metricValues.put(1454016608371L, 1011.25);
- metricValues.put(1454016668371L, 1011.25);
- metricValues.put(1454016728371L, 1011.25);
++ metricValues.put(1454000000000L, 1.0);
++ metricValues.put(1454000001000L, 6.0);
++ metricValues.put(1454000002000L, 0.0);
++ metricValues.put(1454000003000L, 3.0);
++ metricValues.put(1454000004000L, 4.0);
++ metricValues.put(1454000005000L, 7.0);
+
+ // Calculate rate
+ Map<Long, Double> rates = HBaseTimelineMetricsService.updateValuesAsRate(new TreeMap<>(metricValues), false);
+
+ // Make sure rate is zero
- for (Map.Entry<Long, Double> rateEntry : rates.entrySet()) {
- Assert.assertEquals("Rate should be zero, key = " + rateEntry.getKey()
- + ", value = " + rateEntry.getValue(), 0.0, rateEntry.getValue());
- }
++ Assert.assertTrue(rates.size() == 4);
++
++ Assert.assertFalse(rates.containsKey(1454000000000L));
++ Assert.assertFalse(rates.containsKey(1454000002000L));
++
++ Assert.assertEquals(rates.get(1454000001000L), 5.0);
++ Assert.assertEquals(rates.get(1454000003000L), 3.0);
++ Assert.assertEquals(rates.get(1454000004000L), 1.0);
++ Assert.assertEquals(rates.get(1454000005000L), 3.0);
+ }
+
+ @Test
+ public void testDiffCalculation() throws Exception {
+ Map<Long, Double> metricValues = new TreeMap<>();
+ metricValues.put(1454016368371L, 1011.25);
+ metricValues.put(1454016428371L, 1010.25);
+ metricValues.put(1454016488371L, 1012.25);
- metricValues.put(1454016548371L, 1010.25);
- metricValues.put(1454016608371L, 1010.25);
++ metricValues.put(1454016548371L, 1015.25);
++ metricValues.put(1454016608371L, 1020.25);
+
+ Map<Long, Double> rates = HBaseTimelineMetricsService.updateValuesAsRate(new TreeMap<>(metricValues), true);
+
- Assert.assertTrue(rates.size()==4);
- Assert.assertTrue(rates.containsValue(-1.0));
++ Assert.assertTrue(rates.size() == 3);
+ Assert.assertTrue(rates.containsValue(2.0));
- Assert.assertTrue(rates.containsValue(0.0));
++ Assert.assertTrue(rates.containsValue(3.0));
++ Assert.assertTrue(rates.containsValue(5.0));
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
----------------------------------------------------------------------
diff --cc ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
index eb772bc,8abcd83..12bd463
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
@@@ -17,26 -17,24 +17,25 @@@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
-import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
-import org.apache.hadoop.metrics2.sink.timeline.Precision;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
-import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+ import java.io.IOException;
+ import java.sql.SQLException;
+ import java.util.ArrayList;
+ import java.util.Collections;
-import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.TreeMap;
+
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
+import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
+import org.apache.hadoop.metrics2.sink.timeline.Precision;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
+
- import java.io.IOException;
- import java.sql.SQLException;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.Set;
- import java.util.TreeMap;
-
public class TestTimelineMetricStore implements TimelineMetricStore {
@Override
public TimelineMetrics getTimelineMetrics(List<String> metricNames,
@@@ -113,17 -111,5 +112,16 @@@
public List<String> getLiveInstances() {
return Collections.emptyList();
}
-
+
+ @Override
+ public Map<String, TimelineMetricMetadataKey> getUuids() throws SQLException, IOException {
+ return null;
+ }
+
+ @Override
+ public TimelineMetrics getAnomalyMetrics(String method, long startTime, long endTime, Integer limit) {
+ return null;
+ }
-
}
+
+
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java
----------------------------------------------------------------------
diff --cc ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java
index b570d19,71f40e8..3a1405e
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java
@@@ -87,7 -87,7 +87,7 @@@ public class MetricsRequestHelper
uriBuilder.setParameter("precision", higherPrecision);
String newSpec = uriBuilder.toString();
connection = streamProvider.processURL(newSpec, HttpMethod.GET, (String) null,
- Collections.<String, List<String>>emptyMap());
- Collections.emptyMap());
++ Collections.<String, List<String>emptyMap());
if (!checkConnectionForPrecisionException(connection)) {
throw new IOException("Encountered Precision exception : Higher precision request also failed.");
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricsCacheSizeOfEngine.java
----------------------------------------------------------------------
diff --cc ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricsCacheSizeOfEngine.java
index 45c8967,2401d75..ea231f3
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricsCacheSizeOfEngine.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricsCacheSizeOfEngine.java
@@@ -79,5 -142,12 +142,12 @@@ public class TimelineMetricsCacheSizeOf
return size;
}
+ @Override
+ public SizeOfEngine copyWith(int maxDepth, boolean abortWhenMaxDepthExceeded) {
+ LOG.debug("Copying tracing sizeof engine, maxdepth: {}, abort: {}",
+ maxDepth, abortWhenMaxDepthExceeded);
- }
+ return new TimelineMetricsCacheSizeOfEngine(
+ underlying.copyWith(maxDepth, abortWhenMaxDepthExceeded));
+ }
-}
++}
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/scripts/params.py
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
----------------------------------------------------------------------
diff --cc ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
index d54db37,b7ae456..cd4ddfd
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
@@@ -799,8 -827,25 +827,30 @@@
<on-ambari-upgrade add="true"/>
</property>
<property>
+ <name>timeline.metrics.host.inmemory.aggregation.http.policy</name>
+ <value>HTTP_ONLY</value>
+ <on-ambari-upgrade add="true"/>
+ </property>
++ <property>
+ <name>timeline.metrics.whitelisting.enabled</name>
+ <value>false</value>
+ <description>Enable/Disable metric whitelisting</description>
+ <display-name>Enable only whitelisted metrics</display-name>
+ <on-ambari-upgrade add="true"/>
+ <value-attributes>
+ <overridable>false</overridable>
+ <type>value-list</type>
+ <entries>
+ <entry>
+ <value>true</value>
+ <label>True</label>
+ </entry>
+ <entry>
+ <value>false</value>
+ <label>False</label>
+ </entry>
+ </entries>
+ <selection-cardinality>1</selection-cardinality>
+ </value-attributes>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/scripts/params_linux.py
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/params_linux.py
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/scripts/params_linux.py
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/params.py
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/params_linux.py
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/main/resources/common-services/STORM/1.0.1.3.0/package/scripts/params_linux.py
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/main/resources/stacks/HDP/3.0/hooks/before-START/scripts/params.py
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/MetricsPaddingMethodTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheSizingTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-server/src/test/python/stacks/2.0.6/configs/default_ams_embedded.json
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/pom.xml
----------------------------------------------------------------------