You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2017/05/30 23:45:36 UTC
ambari git commit: AMBARI-21079. Add ability to sink Raw metrics to
external system via Http. Renamed files to fix build. (swagle)
Repository: ambari
Updated Branches:
refs/heads/branch-3.0-ams 0e5094fdb -> 7e970233a
AMBARI-21079. Add ability to sink Raw metrics to external system via Http. Renamed files to fix build. (swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/7e970233
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/7e970233
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/7e970233
Branch: refs/heads/branch-3.0-ams
Commit: 7e970233a5cf046903baf87d1e4b4d89264db4f3
Parents: 0e5094f
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Tue May 30 16:45:27 2017 -0700
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Tue May 30 16:45:27 2017 -0700
----------------------------------------------------------------------
.../timeline/HBaseTimelineMetricStore.java | 544 -------------------
.../timeline/HBaseTimelineMetricsService.java | 544 +++++++++++++++++++
.../timeline/HBaseTimelineMetricStoreTest.java | 132 -----
.../HBaseTimelineMetricsServiceTest.java | 132 +++++
4 files changed, 676 insertions(+), 676 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/7e970233/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
deleted file mode 100644
index 9ebc64c..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ /dev/null
@@ -1,544 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
-import org.apache.ambari.metrics.alertservice.spark.AmsKafkaProducer;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
-import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
-import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
-import org.apache.hadoop.metrics2.sink.timeline.Precision;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricWithAggregatedValues;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConditionBuilder;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.TopNCondition;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.SeriesAggregateFunction;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.TimelineMetricsSeriesAggregateFunction;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.TimelineMetricsSeriesAggregateFunctionFactory;
-
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.*;
-
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
-
-public class HBaseTimelineMetricsService extends AbstractService implements TimelineMetricStore {
-
- static final Log LOG = LogFactory.getLog(HBaseTimelineMetricsService.class);
- private final TimelineMetricConfiguration configuration; // supplied through the constructor
- private PhoenixHBaseAccessor hBaseAccessor; // created in initializeSubsystem()
- private static volatile boolean isInitialized = false; // static, so shared by every instance; set to true at the end of initializeSubsystem()
- private final ScheduledExecutorService watchdogExecutorService = Executors.newSingleThreadScheduledExecutor(); // runs TimelineMetricStoreWatcher when the watcher is not disabled
- private final Map<AGGREGATOR_NAME, ScheduledExecutorService> scheduledExecutors = new HashMap<>(); // one executor per scheduled aggregator, keyed by aggregator name
- private TimelineMetricMetadataManager metricMetadataManager; // created in initializeSubsystem()
- private Integer defaultTopNHostsLimit; // parsed from DEFAULT_TOPN_HOSTS_LIMIT (fallback "20") in initializeSubsystem()
- private MetricCollectorHAController haController; // remains null when distributed collector mode is disabled
- private AmsKafkaProducer kafkaProducer = new AmsKafkaProducer("104.196.85.21:6667"); // NOTE(review): broker address is hard-coded; presumably it should come from configuration - confirm
-
- /**
- * Construct the service, passing this class's name to the AbstractService constructor.
- * @param configuration stored on the instance and read later by initializeSubsystem() and getLiveInstances()
- */
- public HBaseTimelineMetricsService(TimelineMetricConfiguration configuration) {
- super(HBaseTimelineMetricsService.class.getName());
- this.configuration = configuration;
- }
-
- @Override
- protected void serviceInit(Configuration conf) throws Exception {
- super.serviceInit(conf);
- initializeSubsystem(); // does nothing once the static isInitialized flag has been set
- }
-
- /**
-  * Performs the one-time start-up sequence: creates the Phoenix/HBase accessor and schema,
-  * loads metric metadata, applies policies and TTLs, optionally starts the HA controller,
-  * initializes metric filtering, schedules the cluster and host aggregators and, unless
-  * disabled, starts the store watchdog.
-  *
-  * The whole sequence is skipped when the static {@code isInitialized} flag is already set.
-  * Root causes of initialization failures are logged before an error is thrown, because
-  * {@link ExceptionInInitializerError} cannot carry both a message and a cause.
-  *
-  * @throws ExceptionInInitializerError if the metadata manager or the metrics configuration
-  *         cannot be created
-  * @throws MetricsSystemInitializationException if the HA controller fails to initialize
-  */
- private synchronized void initializeSubsystem() {
-   if (!isInitialized) {
-     hBaseAccessor = new PhoenixHBaseAccessor(null);
-     // Initialize schema
-     hBaseAccessor.initMetricSchema();
-     // Initialize metadata from store
-     try {
-       metricMetadataManager = new TimelineMetricMetadataManager(hBaseAccessor);
-     } catch (MalformedURLException | URISyntaxException e) {
-       // Log the cause here: the error thrown below only carries a message.
-       LOG.error("Unable to initialize metadata manager", e);
-       throw new ExceptionInInitializerError("Unable to initialize metadata manager");
-     }
-     metricMetadataManager.initializeMetadata();
-     // Initialize policies before TTL update
-     hBaseAccessor.initPoliciesAndTTL();
-     // Start HA service
-     // Start the controller
-     if (!configuration.isDistributedCollectorModeDisabled()) {
-       haController = new MetricCollectorHAController(configuration);
-       try {
-         haController.initializeHAController();
-       } catch (Exception e) {
-         // Pass the throwable separately so the stack trace is logged, not just toString().
-         LOG.error("Unable to initialize HA controller", e);
-         throw new MetricsSystemInitializationException("Unable to " +
-             "initialize HA controller", e);
-       }
-     } else {
-       LOG.info("Distributed collector mode disabled");
-     }
-
-     //Initialize whitelisting & blacklisting if needed
-     TimelineMetricsFilter.initializeMetricFilter(configuration);
-
-     Configuration metricsConf = null;
-     try {
-       metricsConf = configuration.getMetricsConf();
-     } catch (Exception e) {
-       // Log the cause here: the error thrown below only carries a message.
-       LOG.error("Cannot initialize configuration.", e);
-       throw new ExceptionInInitializerError("Cannot initialize configuration.");
-     }
-
-     defaultTopNHostsLimit = Integer.parseInt(metricsConf.get(DEFAULT_TOPN_HOSTS_LIMIT, "20"));
-     if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) {
-       LOG.info("Using group by aggregators for aggregating host and cluster metrics.");
-     }
-
-     // Start the cluster aggregator second
-     TimelineMetricAggregator secondClusterAggregator =
-         TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(
-             hBaseAccessor, metricsConf, metricMetadataManager, haController);
-     scheduleAggregatorThread(secondClusterAggregator);
-
-     // Start the minute cluster aggregator
-     TimelineMetricAggregator minuteClusterAggregator =
-         TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(
-             hBaseAccessor, metricsConf, haController);
-     scheduleAggregatorThread(minuteClusterAggregator);
-
-     // Start the hourly cluster aggregator
-     TimelineMetricAggregator hourlyClusterAggregator =
-         TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(
-             hBaseAccessor, metricsConf, haController);
-     scheduleAggregatorThread(hourlyClusterAggregator);
-
-     // Start the daily cluster aggregator
-     TimelineMetricAggregator dailyClusterAggregator =
-         TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(
-             hBaseAccessor, metricsConf, haController);
-     scheduleAggregatorThread(dailyClusterAggregator);
-
-     // Start the minute host aggregator
-     if (Boolean.parseBoolean(metricsConf.get(TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION, "true"))) {
-       LOG.info("timeline.metrics.host.inmemory.aggregation is set to True, disabling host minute aggregation on collector");
-     } else {
-       TimelineMetricAggregator minuteHostAggregator =
-           TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(
-               hBaseAccessor, metricsConf, haController);
-       scheduleAggregatorThread(minuteHostAggregator);
-     }
-
-     // Start the hourly host aggregator
-     TimelineMetricAggregator hourlyHostAggregator =
-         TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(
-             hBaseAccessor, metricsConf, haController);
-     scheduleAggregatorThread(hourlyHostAggregator);
-
-     // Start the daily host aggregator
-     TimelineMetricAggregator dailyHostAggregator =
-         TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(
-             hBaseAccessor, metricsConf, haController);
-     scheduleAggregatorThread(dailyHostAggregator);
-
-     if (!configuration.isTimelineMetricsServiceWatcherDisabled()) {
-       int initDelay = configuration.getTimelineMetricsServiceWatcherInitDelay();
-       int delay = configuration.getTimelineMetricsServiceWatcherDelay();
-       // Start the watchdog
-       watchdogExecutorService.scheduleWithFixedDelay(
-           new TimelineMetricStoreWatcher(this, configuration),
-           initDelay, delay, TimeUnit.SECONDS);
-       LOG.info("Started watchdog for timeline metrics store with initial " +
-           "delay = " + initDelay + ", delay = " + delay);
-     }
-
-     isInitialized = true;
-   }
-
- }
-
- /**
-  * Stops the service. The watchdog executor and every aggregator executor registered by
-  * scheduleAggregatorThread() are shut down first, since nothing else in this class
-  * releases them. An orderly shutdown() is used so a run already in progress can finish;
-  * no further periodic runs are started afterwards.
-  */
- @Override
- protected void serviceStop() throws Exception {
-   watchdogExecutorService.shutdown();
-   for (ScheduledExecutorService aggregatorExecutor : scheduledExecutors.values()) {
-     aggregatorExecutor.shutdown();
-   }
-   super.serviceStop();
- }
-
- /**
-  * Reads metrics matching the given filters from the store.
-  *
-  * Metric names may carry a function suffix (parsed by
-  * parseMetricNamesToAggregationFunctions). When no hostnames are given the aggregate
-  * records are queried, otherwise the per-host records. Results are post-processed for
-  * "._rate" / "._diff" names and, if a series aggregate function is named, collapsed
-  * into a single series.
-  *
-  * @throws IllegalArgumentException if no metric names are given, if exactly one of
-  *         startTime / endTime is null, or if limit exceeds PhoenixHBaseAccessor.RESULTSET_LIMIT
-  */
- @Override
- public TimelineMetrics getTimelineMetrics(List<String> metricNames,
-     List<String> hostnames, String applicationId, String instanceId,
-     Long startTime, Long endTime, Precision precision, Integer limit,
-     boolean groupedByHosts, TopNConfig topNConfig, String seriesAggregateFunction) throws SQLException, IOException {
-
-   // Validate the request before touching the store.
-   if (metricNames == null || metricNames.isEmpty()) {
-     throw new IllegalArgumentException("No metric name filter specified.");
-   }
-   boolean hasStart = startTime != null;
-   boolean hasEnd = endTime != null;
-   if (hasStart != hasEnd) {
-     throw new IllegalArgumentException("Open ended query not supported ");
-   }
-   if (limit != null && limit > PhoenixHBaseAccessor.RESULTSET_LIMIT) {
-     throw new IllegalArgumentException("Limit too big");
-   }
-
-   TimelineMetricsSeriesAggregateFunction seriesFunction = null;
-   if (StringUtils.isNotEmpty(seriesAggregateFunction)) {
-     seriesFunction = TimelineMetricsSeriesAggregateFunctionFactory.newInstance(
-         SeriesAggregateFunction.getFunction(seriesAggregateFunction));
-   }
-
-   Multimap<String, List<Function>> functionsByMetric =
-       parseMetricNamesToAggregationFunctions(metricNames);
-
-   ConditionBuilder builder = new ConditionBuilder(new ArrayList<String>(functionsByMetric.keySet()))
-       .hostnames(hostnames)
-       .appId(applicationId)
-       .instanceId(instanceId)
-       .startTime(startTime)
-       .endTime(endTime)
-       .precision(precision)
-       .limit(limit)
-       .grouped(groupedByHosts);
-
-   if (topNConfig == null) {
-     // if (timeseries query AND hostnames passed AND size(hostnames) > limit)
-     if (startTime != null && hostnames != null && hostnames.size() > defaultTopNHostsLimit) {
-       LOG.info("Requesting data for more than " + defaultTopNHostsLimit + " Hosts. " +
-           "Defaulting to Top " + defaultTopNHostsLimit);
-       builder.topN(defaultTopNHostsLimit);
-       builder.isBottomN(false);
-     }
-   } else {
-     boolean topNByHost = TopNCondition.isTopNHostCondition(metricNames, hostnames);
-     boolean topNByMetric = TopNCondition.isTopNMetricCondition(metricNames, hostnames);
-     // Exactly one of the two conditions must hold for a valid TopN request.
-     if (topNByHost != topNByMetric) {
-       builder.topN(topNConfig.getTopN());
-       builder.isBottomN(topNConfig.getIsBottomN());
-       Function.ReadFunction topNReadFunction = Function.ReadFunction.getFunction(topNConfig.getTopNFunction());
-       builder.topNFunction(new Function(topNReadFunction, null));
-     } else {
-       LOG.info("Invalid Input for TopN query. Ignoring TopN Request.");
-     }
-   }
-
-   Condition condition = builder.build();
-
-   boolean noHostFilter = hostnames == null || hostnames.isEmpty();
-   TimelineMetrics result = noHostFilter
-       ? hBaseAccessor.getAggregateMetricRecords(condition, functionsByMetric)
-       : hBaseAccessor.getMetricRecords(condition, functionsByMetric);
-
-   result = postProcessMetrics(result);
-
-   if (result.getMetrics().isEmpty()) {
-     return result;
-   }
-   return seriesAggregateMetrics(seriesFunction, result);
- }
-
- /**
-  * Rewrites, in place, the values of every metric whose name contains "._rate"
-  * (per-second rate) or "._diff" (plain difference); "._rate" takes precedence when
-  * both appear. Returns the same TimelineMetrics instance that was passed in.
-  */
- private TimelineMetrics postProcessMetrics(TimelineMetrics metrics) {
-   for (TimelineMetric timelineMetric : metrics.getMetrics()) {
-     String metricName = timelineMetric.getMetricName();
-     boolean asRate = metricName.contains("._rate");
-     if (asRate || metricName.contains("._diff")) {
-       updateValuesAsRate(timelineMetric.getMetricValues(), !asRate);
-     }
-   }
-
-   return metrics;
- }
-
- /**
-  * Collapses all series in {@code metrics} into the single series produced by the
-  * given aggregate function. When no function is supplied the input is returned untouched.
-  */
- private TimelineMetrics seriesAggregateMetrics(TimelineMetricsSeriesAggregateFunction seriesAggrFuncInstance,
-     TimelineMetrics metrics) {
-   if (seriesAggrFuncInstance == null) {
-     return metrics;
-   }
-   TimelineMetric aggregated = seriesAggrFuncInstance.apply(metrics);
-   metrics.setMetrics(Collections.singletonList(aggregated));
-   return metrics;
- }
-
- /**
-  * Converts a series of absolute values into differences or per-second rates, in place.
-  *
-  * Entries are visited in the map's iteration order. The first entry has no predecessor
-  * and is removed; every later entry's value is replaced by the difference to the
-  * previous original value, divided by the elapsed time in seconds unless {@code isDiff}
-  * is set. Keys are treated as millisecond timestamps.
-  *
-  * The elapsed time is computed with fractional seconds. Truncating to whole seconds
-  * would divide by zero for samples less than one second apart and would overstate the
-  * rate for steps that are not a multiple of 1000 ms.
-  *
-  * @param metricValues timestamp-to-value map; modified and returned
-  * @param isDiff true to store plain differences, false to store per-second rates
-  * @return the same map instance that was passed in
-  */
- static Map<Long, Double> updateValuesAsRate(Map<Long, Double> metricValues, boolean isDiff) {
-   Long prevTime = null;
-   Double prevVal = null;
-
-   for (Iterator<Map.Entry<Long, Double>> it = metricValues.entrySet().iterator(); it.hasNext(); ) {
-     Map.Entry<Long, Double> timeValueEntry = it.next();
-     Long currTime = timeValueEntry.getKey();
-     Double currVal = timeValueEntry.getValue();
-
-     if (prevTime != null) {
-       double diff = currVal - prevVal;
-       if (isDiff) {
-         timeValueEntry.setValue(diff);
-       } else {
-         double stepSeconds = (currTime - prevTime) / 1000.0;
-         timeValueEntry.setValue(diff / stepSeconds);
-       }
-     } else {
-       // Nothing to compare the first sample against.
-       it.remove();
-     }
-
-     prevTime = currTime;
-     prevVal = currVal;
-   }
-
-   return metricValues;
- }
-
- /**
-  * Splits each requested metric name into its base name and aggregation function.
-  *
-  * When Function.fromMetricName accepts the name, everything from the first "._"
-  * (if it is not at index 0) is stripped to obtain the base name. When it rejects the
-  * name, the full name is kept and Function.DEFAULT_VALUE_FUNCTION is used.
-  *
-  * @return a multimap from base metric name to single-element function lists, one
-  *         entry per requested name
-  */
- static Multimap<String, List<Function>> parseMetricNamesToAggregationFunctions(List<String> metricNames) {
-   Multimap<String, List<Function>> functionsByMetric = ArrayListMultimap.create();
-
-   for (String requestedName : metricNames) {
-     String baseName = requestedName;
-     Function parsedFunction;
-
-     try {
-       parsedFunction = Function.fromMetricName(requestedName);
-       int suffixStart = requestedName.indexOf("._");
-       if (suffixStart > 0) {
-         baseName = requestedName.substring(0, suffixStart);
-       }
-     } catch (Function.FunctionFormatException ffe) {
-       // unknown function so
-       // fallback to VALUE, and fullMetricName
-       parsedFunction = Function.DEFAULT_VALUE_FUNCTION;
-     }
-
-     List<Function> singleFunction = new ArrayList<>();
-     singleFunction.add(parsedFunction);
-     functionsByMetric.put(baseName, singleFunction);
-   }
-
-   return functionsByMetric;
- }
-
- @Override
- public TimelinePutResponse putMetrics(TimelineMetrics metrics) throws SQLException, IOException {
- // Error indicated by the Sql exception
- TimelinePutResponse response = new TimelinePutResponse();
-
- try {
- if (!metrics.getMetrics().isEmpty() && metrics.getMetrics().get(0).getAppId().equals("HOST")) {
- kafkaProducer.sendMetrics(fromTimelineMetrics(metrics));
- }
- } catch (InterruptedException | ExecutionException e) {
- LOG.error(e);
- }
- hBaseAccessor.insertMetricRecordsWithMetadata(metricMetadataManager, metrics, false);
-
- return response;
- }
-
-
- /**
-  * Builds the alert-service representation of a metrics batch by converting each
-  * contained metric with fromTimelineMetric, preserving order.
-  */
- private org.apache.ambari.metrics.alertservice.common.TimelineMetrics fromTimelineMetrics(TimelineMetrics timelineMetrics) {
-   List<org.apache.ambari.metrics.alertservice.common.TimelineMetric> converted = new ArrayList<>();
-   for (TimelineMetric sourceMetric : timelineMetrics.getMetrics()) {
-     converted.add(fromTimelineMetric(sourceMetric));
-   }
-
-   org.apache.ambari.metrics.alertservice.common.TimelineMetrics alertMetrics =
-       new org.apache.ambari.metrics.alertservice.common.TimelineMetrics();
-   alertMetrics.setMetrics(converted);
-   return alertMetrics;
- }
-
- /**
-  * Copies metric values, start time, host name, instance id, app id and metric name
-  * from a sink TimelineMetric into a new alert-service TimelineMetric. The metric
-  * values object is handed over as-is, not copied here.
-  */
- private org.apache.ambari.metrics.alertservice.common.TimelineMetric fromTimelineMetric(TimelineMetric timelineMetric) {
-   org.apache.ambari.metrics.alertservice.common.TimelineMetric alertMetric =
-       new org.apache.ambari.metrics.alertservice.common.TimelineMetric();
-
-   alertMetric.setMetricValues(timelineMetric.getMetricValues());
-   alertMetric.setStartTime(timelineMetric.getStartTime());
-   alertMetric.setHostName(timelineMetric.getHostName());
-   alertMetric.setInstanceId(timelineMetric.getInstanceId());
-   alertMetric.setAppId(timelineMetric.getAppId());
-   alertMetric.setMetricName(timelineMetric.getMetricName());
-
-   return alertMetric;
- }
-
- @Override
- public TimelinePutResponse putContainerMetrics(List<ContainerMetric> metrics)
- throws SQLException, IOException {
- hBaseAccessor.insertContainerMetrics(metrics); // the write is delegated entirely to the accessor
- return new TimelinePutResponse(); // a new response is returned without anything being set on it
- }
-
- /**
-  * Returns the cached metric metadata grouped by application id.
-  *
-  * Metadata that is not whitelisted is left out unless {@code query} equals "all"
-  * (case-insensitive).
-  */
- @Override
- public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(String query) throws SQLException, IOException {
-   Map<TimelineMetricMetadataKey, TimelineMetricMetadata> cachedMetadata =
-       metricMetadataManager.getMetadataCache();
-
-   boolean includeBlacklistedMetrics = StringUtils.isNotEmpty(query) && "all".equalsIgnoreCase(query);
-
-   // Group Metadata by AppId
-   Map<String, List<TimelineMetricMetadata>> groupedByAppId = new HashMap<>();
-   for (TimelineMetricMetadata candidate : cachedMetadata.values()) {
-     if (includeBlacklistedMetrics || candidate.isWhitelisted()) {
-       List<TimelineMetricMetadata> bucket = groupedByAppId.get(candidate.getAppId());
-       if (bucket == null) {
-         bucket = new ArrayList<>();
-         groupedByAppId.put(candidate.getAppId(), bucket);
-       }
-       bucket.add(candidate);
-     }
-   }
-
-   return groupedByAppId;
- }
-
- @Override
- public Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException {
- return metricMetadataManager.getHostedAppsCache(); // returned as-is, no copy is made here; getInstanceHostsMetadata() treats the keys as hosts and the values as app names
- }
-
- /**
-  * Saves host-level aggregates to the minute aggregate table
-  * (PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME).
-  *
-  * @return a new, unpopulated response
-  */
- @Override
- public TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException {
-   Map<TimelineMetric, MetricHostAggregate> aggregatesByMetric = new HashMap<>();
-   for (TimelineMetricWithAggregatedValues aggregated : aggregationResult.getResult()) {
-     TimelineMetric metric = aggregated.getTimelineMetric();
-     aggregatesByMetric.put(metric, aggregated.getMetricAggregate());
-   }
-
-   hBaseAccessor.saveHostAggregateRecords(aggregatesByMetric, PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME);
-
-   return new TimelinePutResponse();
- }
-
- @Override
- public Map<String, Map<String,Set<String>>> getInstanceHostsMetadata(String instanceId, String appId)
- throws SQLException, IOException {
-
- Map<String, Set<String>> hostedApps = metricMetadataManager.getHostedAppsCache();
- Map<String, Set<String>> instanceHosts = metricMetadataManager.getHostedInstanceCache();
- Map<String, Map<String, Set<String>>> instanceAppHosts = new HashMap<>();
-
- if (MapUtils.isEmpty(instanceHosts)) {
- Map<String, Set<String>> appHostMap = new HashMap<String, Set<String>>();
- for (String host : hostedApps.keySet()) {
- for (String app : hostedApps.get(host)) {
- if (!appHostMap.containsKey(app)) {
- appHostMap.put(app, new HashSet<String>());
- }
- appHostMap.get(app).add(host);
- }
- }
- instanceAppHosts.put("", appHostMap);
- } else {
- for (String instance : instanceHosts.keySet()) {
-
- if (StringUtils.isNotEmpty(instanceId) && !instance.equals(instanceId)) {
- continue;
- }
- Map<String, Set<String>> appHostMap = new HashMap<String, Set<String>>();
- instanceAppHosts.put(instance, appHostMap);
-
- Set<String> hostsWithInstance = instanceHosts.get(instance);
- for (String host : hostsWithInstance) {
- for (String app : hostedApps.get(host)) {
- if (StringUtils.isNotEmpty(appId) && !app.equals(appId)) {
- continue;
- }
-
- if (!appHostMap.containsKey(app)) {
- appHostMap.put(app, new HashSet<String>());
- }
- appHostMap.get(app).add(host);
- }
- }
- }
- }
-
- return instanceAppHosts;
- }
-
- /**
-  * Returns the host names of the live collector instances.
-  *
-  * Without an HA controller the current host (from the environment) is the only live
-  * instance. With one, its live-instance list is used, falling back to the current host
-  * when that list is null or empty.
-  *
-  * @return the live instances; an empty list, never null, when the local host name
-  *         could not be resolved
-  */
- @Override
- public List<String> getLiveInstances() {
-
-   List<String> instances = null;
-   try {
-     if (haController == null) {
-       // Always return current host as live (embedded operation mode)
-       return Collections.singletonList(configuration.getInstanceHostnameFromEnv());
-     }
-     instances = haController.getLiveInstanceHostNames();
-     if (instances == null || instances.isEmpty()) {
-       // fallback
-       instances = Collections.singletonList(configuration.getInstanceHostnameFromEnv());
-     }
-   } catch (UnknownHostException e) {
-     LOG.debug("Exception on getting hostname from env.", e);
-   }
-   // The lookup above can fail before anything was assigned; do not hand null to callers.
-   if (instances == null) {
-     return Collections.emptyList();
-   }
-   return instances;
- }
-
- /**
-  * Schedules the aggregator at a fixed rate on its own single-threaded executor and
-  * records that executor in scheduledExecutors under the aggregator's name. The first
-  * run starts immediately; the period is the aggregator's sleep interval. Disabled
-  * aggregators are only logged.
-  */
- private void scheduleAggregatorThread(final TimelineMetricAggregator aggregator) {
-   if (aggregator.isDisabled()) {
-     LOG.info("Skipped scheduling " + aggregator.getName() + " since it is disabled.");
-     return;
-   }
-
-   // Name the worker thread after the aggregator; the name is looked up when the thread is created.
-   ThreadFactory namedThreadFactory = new ThreadFactory() {
-     @Override
-     public Thread newThread(Runnable r) {
-       return new Thread(r, ACTUAL_AGGREGATOR_NAMES.get(aggregator.getName()));
-     }
-   };
-   ScheduledExecutorService aggregatorExecutor =
-       Executors.newSingleThreadScheduledExecutor(namedThreadFactory);
-   scheduledExecutors.put(aggregator.getName(), aggregatorExecutor);
-
-   aggregatorExecutor.scheduleAtFixedRate(aggregator,
-       0L,
-       aggregator.getSleepIntervalMillis(),
-       TimeUnit.MILLISECONDS);
-   LOG.info("Scheduled aggregator thread " + aggregator.getName() + " every " +
-       aggregator.getSleepIntervalMillis() + " milliseconds.");
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/7e970233/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
new file mode 100644
index 0000000..9ebc64c
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
@@ -0,0 +1,544 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.ambari.metrics.alertservice.spark.AmsKafkaProducer;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
+import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.Precision;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricWithAggregatedValues;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConditionBuilder;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.TopNCondition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.SeriesAggregateFunction;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.TimelineMetricsSeriesAggregateFunction;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.TimelineMetricsSeriesAggregateFunctionFactory;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.*;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
+
+public class HBaseTimelineMetricsService extends AbstractService implements TimelineMetricStore {
+
+ static final Log LOG = LogFactory.getLog(HBaseTimelineMetricsService.class);
+ private final TimelineMetricConfiguration configuration;
+ private PhoenixHBaseAccessor hBaseAccessor;
+ private static volatile boolean isInitialized = false;
+ private final ScheduledExecutorService watchdogExecutorService = Executors.newSingleThreadScheduledExecutor();
+ private final Map<AGGREGATOR_NAME, ScheduledExecutorService> scheduledExecutors = new HashMap<>();
+ private TimelineMetricMetadataManager metricMetadataManager;
+ private Integer defaultTopNHostsLimit;
+ private MetricCollectorHAController haController;
+ private AmsKafkaProducer kafkaProducer = new AmsKafkaProducer("104.196.85.21:6667");
+
+  /**
+   * Creates the service and registers it under its fully-qualified class name.
+   * Only the configuration handle is stored here; the HBase accessor, metadata
+   * manager and aggregators are created later from {@code serviceInit}.
+   *
+   * @param configuration timeline metrics configuration used by this service
+   */
+  public HBaseTimelineMetricsService(TimelineMetricConfiguration configuration) {
+    super(HBaseTimelineMetricsService.class.getName());
+    this.configuration = configuration;
+  }
+
+  /**
+   * Runs the parent service initialization and then boots the metrics
+   * subsystem (schema, metadata, aggregators, watchdog).
+   *
+   * @param conf service configuration handed to the parent class
+   * @throws Exception if the parent initialization fails
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    initializeSubsystem();
+  }
+
+  /**
+   * Bootstraps the storage schema, metadata cache, optional HA controller,
+   * metric filters, aggregator threads and the store watchdog.
+   *
+   * Guarded by the static {@code isInitialized} flag: once a call has run to
+   * completion, later calls return immediately.
+   *
+   * @throws ExceptionInInitializerError if the metadata manager or the metrics
+   *         configuration cannot be created
+   * @throws MetricsSystemInitializationException if the HA controller fails to start
+   */
+  private synchronized void initializeSubsystem() {
+    if (isInitialized) {
+      return;
+    }
+
+    hBaseAccessor = new PhoenixHBaseAccessor(null);
+    // Initialize schema
+    hBaseAccessor.initMetricSchema();
+    // Initialize metadata from store
+    try {
+      metricMetadataManager = new TimelineMetricMetadataManager(hBaseAccessor);
+    } catch (MalformedURLException | URISyntaxException e) {
+      // ExceptionInInitializerError(String) cannot carry a cause, so log the
+      // underlying failure before it is lost.
+      LOG.error("Unable to initialize metadata manager", e);
+      throw new ExceptionInInitializerError("Unable to initialize metadata manager");
+    }
+    metricMetadataManager.initializeMetadata();
+    // Initialize policies before TTL update
+    hBaseAccessor.initPoliciesAndTTL();
+
+    // Start the HA controller unless distributed collector mode is disabled
+    if (!configuration.isDistributedCollectorModeDisabled()) {
+      haController = new MetricCollectorHAController(configuration);
+      try {
+        haController.initializeHAController();
+      } catch (Exception e) {
+        LOG.error(e);
+        throw new MetricsSystemInitializationException("Unable to " +
+          "initialize HA controller", e);
+      }
+    } else {
+      LOG.info("Distributed collector mode disabled");
+    }
+
+    //Initialize whitelisting & blacklisting if needed
+    TimelineMetricsFilter.initializeMetricFilter(configuration);
+
+    Configuration metricsConf;
+    try {
+      metricsConf = configuration.getMetricsConf();
+    } catch (Exception e) {
+      // Same as above: keep the root cause visible in the log.
+      LOG.error("Cannot initialize configuration.", e);
+      throw new ExceptionInInitializerError("Cannot initialize configuration.");
+    }
+
+    defaultTopNHostsLimit = Integer.parseInt(metricsConf.get(DEFAULT_TOPN_HOSTS_LIMIT, "20"));
+    if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) {
+      LOG.info("Using group by aggregators for aggregating host and cluster metrics.");
+    }
+
+    // Start the cluster aggregator second
+    TimelineMetricAggregator secondClusterAggregator =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(
+        hBaseAccessor, metricsConf, metricMetadataManager, haController);
+    scheduleAggregatorThread(secondClusterAggregator);
+
+    // Start the minute cluster aggregator
+    TimelineMetricAggregator minuteClusterAggregator =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(
+        hBaseAccessor, metricsConf, haController);
+    scheduleAggregatorThread(minuteClusterAggregator);
+
+    // Start the hourly cluster aggregator
+    TimelineMetricAggregator hourlyClusterAggregator =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(
+        hBaseAccessor, metricsConf, haController);
+    scheduleAggregatorThread(hourlyClusterAggregator);
+
+    // Start the daily cluster aggregator
+    TimelineMetricAggregator dailyClusterAggregator =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(
+        hBaseAccessor, metricsConf, haController);
+    scheduleAggregatorThread(dailyClusterAggregator);
+
+    // Start the minute host aggregator
+    if (Boolean.parseBoolean(metricsConf.get(TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION, "true"))) {
+      LOG.info("timeline.metrics.host.inmemory.aggregation is set to True, disabling host minute aggregation on collector");
+    } else {
+      TimelineMetricAggregator minuteHostAggregator =
+        TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(
+          hBaseAccessor, metricsConf, haController);
+      scheduleAggregatorThread(minuteHostAggregator);
+    }
+
+    // Start the hourly host aggregator
+    TimelineMetricAggregator hourlyHostAggregator =
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(
+        hBaseAccessor, metricsConf, haController);
+    scheduleAggregatorThread(hourlyHostAggregator);
+
+    // Start the daily host aggregator
+    TimelineMetricAggregator dailyHostAggregator =
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(
+        hBaseAccessor, metricsConf, haController);
+    scheduleAggregatorThread(dailyHostAggregator);
+
+    if (!configuration.isTimelineMetricsServiceWatcherDisabled()) {
+      int initDelay = configuration.getTimelineMetricsServiceWatcherInitDelay();
+      int delay = configuration.getTimelineMetricsServiceWatcherDelay();
+      // Start the watchdog
+      watchdogExecutorService.scheduleWithFixedDelay(
+        new TimelineMetricStoreWatcher(this, configuration),
+        initDelay, delay, TimeUnit.SECONDS);
+      LOG.info("Started watchdog for timeline metrics store with initial " +
+        "delay = " + initDelay + ", delay = " + delay);
+    }
+
+    isInitialized = true;
+  }
+
+  /**
+   * Stops the watchdog and every aggregator executor created by
+   * {@code scheduleAggregatorThread}, then stops the parent service.
+   * {@code shutdown()} cancels future periodic runs but lets a run that is
+   * already in progress finish.
+   *
+   * @throws Exception if the parent service fails to stop
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    watchdogExecutorService.shutdown();
+    for (ScheduledExecutorService aggregatorExecutor : scheduledExecutors.values()) {
+      aggregatorExecutor.shutdown();
+    }
+    super.serviceStop();
+  }
+
+  /**
+   * Reads metrics matching the given filters.
+   *
+   * Metric names may carry a "._function" suffix; the suffix is parsed into
+   * aggregation functions and stripped before the query condition is built.
+   * When no hostnames are given, aggregate records are read; otherwise
+   * per-host records are read. "._rate" / "._diff" series are post-processed,
+   * and a series aggregate function, if requested, collapses a non-empty
+   * result into a single metric.
+   *
+   * @throws IllegalArgumentException if no metric name is given, if exactly one
+   *         of startTime/endTime is set, or if limit exceeds the result set limit
+   */
+  @Override
+  public TimelineMetrics getTimelineMetrics(List<String> metricNames,
+      List<String> hostnames, String applicationId, String instanceId,
+      Long startTime, Long endTime, Precision precision, Integer limit,
+      boolean groupedByHosts, TopNConfig topNConfig, String seriesAggregateFunction) throws SQLException, IOException {
+
+    if (metricNames == null || metricNames.isEmpty()) {
+      throw new IllegalArgumentException("No metric name filter specified.");
+    }
+    // Either both time bounds are present or neither is.
+    if ((startTime == null) != (endTime == null)) {
+      throw new IllegalArgumentException("Open ended query not supported ");
+    }
+    if (limit != null && limit > PhoenixHBaseAccessor.RESULTSET_LIMIT) {
+      throw new IllegalArgumentException("Limit too big");
+    }
+
+    TimelineMetricsSeriesAggregateFunction seriesFunction = null;
+    if (!StringUtils.isEmpty(seriesAggregateFunction)) {
+      seriesFunction = TimelineMetricsSeriesAggregateFunctionFactory.newInstance(
+        SeriesAggregateFunction.getFunction(seriesAggregateFunction));
+    }
+
+    Multimap<String, List<Function>> metricFunctions =
+      parseMetricNamesToAggregationFunctions(metricNames);
+
+    ConditionBuilder conditionBuilder = new ConditionBuilder(new ArrayList<String>(metricFunctions.keySet()))
+      .hostnames(hostnames)
+      .appId(applicationId)
+      .instanceId(instanceId)
+      .startTime(startTime)
+      .endTime(endTime)
+      .precision(precision)
+      .limit(limit)
+      .grouped(groupedByHosts);
+
+    if (topNConfig != null) {
+      boolean topNHosts = TopNCondition.isTopNHostCondition(metricNames, hostnames);
+      boolean topNMetrics = TopNCondition.isTopNMetricCondition(metricNames, hostnames);
+      // Exactly one of the two conditions must hold for a valid TopN query.
+      if (topNHosts != topNMetrics) {
+        conditionBuilder.topN(topNConfig.getTopN());
+        conditionBuilder.isBottomN(topNConfig.getIsBottomN());
+        Function.ReadFunction readFunction = Function.ReadFunction.getFunction(topNConfig.getTopNFunction());
+        conditionBuilder.topNFunction(new Function(readFunction, null));
+      } else {
+        LOG.info("Invalid Input for TopN query. Ignoring TopN Request.");
+      }
+    } else if (startTime != null && hostnames != null && hostnames.size() > defaultTopNHostsLimit) {
+      // Time series query over more hosts than the default limit: cap it.
+      LOG.info("Requesting data for more than " + defaultTopNHostsLimit + " Hosts. " +
+        "Defaulting to Top " + defaultTopNHostsLimit);
+      conditionBuilder.topN(defaultTopNHostsLimit);
+      conditionBuilder.isBottomN(false);
+    }
+
+    Condition condition = conditionBuilder.build();
+
+    boolean hostsRequested = hostnames != null && !hostnames.isEmpty();
+    TimelineMetrics fetched = hostsRequested
+      ? hBaseAccessor.getMetricRecords(condition, metricFunctions)
+      : hBaseAccessor.getAggregateMetricRecords(condition, metricFunctions);
+
+    TimelineMetrics processed = postProcessMetrics(fetched);
+    if (processed.getMetrics().isEmpty()) {
+      return processed;
+    }
+
+    return seriesAggregateMetrics(seriesFunction, processed);
+  }
+
+  /**
+   * Rewrites, in place, the values of every metric whose name contains
+   * "._rate" (per-second rate) or "._diff" (plain difference). A name that
+   * contains both is treated as a rate.
+   *
+   * @param metrics metrics to post-process
+   * @return the same {@code metrics} instance
+   */
+  private TimelineMetrics postProcessMetrics(TimelineMetrics metrics) {
+    for (TimelineMetric timelineMetric : metrics.getMetrics()) {
+      String metricName = timelineMetric.getMetricName();
+      boolean asRate = metricName.contains("._rate");
+      if (asRate || metricName.contains("._diff")) {
+        updateValuesAsRate(timelineMetric.getMetricValues(), !asRate);
+      }
+    }
+
+    return metrics;
+  }
+
+  /**
+   * Applies the series aggregate function, if one was requested, and replaces
+   * the metric list with the single resulting metric.
+   *
+   * @param seriesAggrFuncInstance function to apply, or null to leave metrics untouched
+   * @param metrics metrics to aggregate
+   * @return the same {@code metrics} instance
+   */
+  private TimelineMetrics seriesAggregateMetrics(TimelineMetricsSeriesAggregateFunction seriesAggrFuncInstance,
+      TimelineMetrics metrics) {
+    if (seriesAggrFuncInstance == null) {
+      return metrics;
+    }
+    TimelineMetric aggregated = seriesAggrFuncInstance.apply(metrics);
+    metrics.setMetrics(Collections.singletonList(aggregated));
+    return metrics;
+  }
+
+  /**
+   * Converts a series of absolute values, in place, into differences or
+   * per-second rates between consecutive entries (in the map's iteration order).
+   *
+   * The first entry has no predecessor and is removed. Every other entry is
+   * replaced by (current - previous) when {@code isDiff} is true, otherwise by
+   * (current - previous) divided by the elapsed time in seconds.
+   *
+   * The elapsed time is kept fractional: truncating to whole seconds made
+   * samples less than one second apart divide by zero (Infinity/NaN) and
+   * rounded every other interval down.
+   *
+   * @param metricValues timestamp (milliseconds) to value; modified in place
+   * @param isDiff true for plain differences, false for per-second rates
+   * @return the same {@code metricValues} map
+   */
+  static Map<Long, Double> updateValuesAsRate(Map<Long, Double> metricValues, boolean isDiff) {
+    Long prevTime = null;
+    Double prevVal = null;
+
+    for (Iterator<Map.Entry<Long, Double>> it = metricValues.entrySet().iterator(); it.hasNext(); ) {
+      Map.Entry<Long, Double> timeValueEntry = it.next();
+      Long currTime = timeValueEntry.getKey();
+      Double currVal = timeValueEntry.getValue();
+
+      if (prevTime == null) {
+        // Nothing to compare the first sample against.
+        it.remove();
+      } else {
+        double diff = currVal - prevVal;
+        double elapsedSeconds = (currTime - prevTime) / 1000.0;
+        timeValueEntry.setValue(isDiff ? diff : diff / elapsedSeconds);
+      }
+
+      prevTime = currTime;
+      prevVal = currVal;
+    }
+
+    return metricValues;
+  }
+
+  /**
+   * Splits each requested metric name into its base name and aggregation
+   * function. For a name such as "mem._avg" the function is parsed from the
+   * name and the key is the part before the first "._". If the name does not
+   * parse as a function, the full name is kept as the key with the default
+   * value function.
+   *
+   * @param metricNames requested metric names, possibly with function suffixes
+   * @return multimap from base metric name to one single-element function list
+   *         per requested name
+   */
+  static Multimap<String, List<Function>> parseMetricNamesToAggregationFunctions(List<String> metricNames) {
+    Multimap<String, List<Function>> functionsByMetric = ArrayListMultimap.create();
+
+    for (String requestedName : metricNames) {
+      Function parsedFunction;
+      String baseName;
+
+      try {
+        parsedFunction = Function.fromMetricName(requestedName);
+        int suffixStart = requestedName.indexOf("._");
+        baseName = suffixStart > 0 ? requestedName.substring(0, suffixStart) : requestedName;
+      } catch (Function.FunctionFormatException ffe) {
+        // Unknown function: fall back to VALUE and the full metric name.
+        parsedFunction = Function.DEFAULT_VALUE_FUNCTION;
+        baseName = requestedName;
+      }
+
+      List<Function> singleFunction = new ArrayList<>();
+      singleFunction.add(parsedFunction);
+      functionsByMetric.put(baseName, singleFunction);
+    }
+
+    return functionsByMetric;
+  }
+
+  /**
+   * Persists the metrics to HBase. Before that, a batch whose first metric has
+   * app id "HOST" is also forwarded to Kafka; a Kafka failure is logged and
+   * does not prevent the HBase write.
+   *
+   * If the Kafka send is interrupted, the HBase write still runs and the
+   * thread's interrupt flag is restored afterwards so callers can see it.
+   *
+   * @param metrics metrics to store
+   * @return an empty put response; errors are reported through exceptions
+   * @throws SQLException if the HBase/Phoenix write fails
+   * @throws IOException if the HBase/Phoenix write fails
+   */
+  @Override
+  public TimelinePutResponse putMetrics(TimelineMetrics metrics) throws SQLException, IOException {
+    // Error indicated by the Sql exception
+    TimelinePutResponse response = new TimelinePutResponse();
+
+    boolean interrupted = false;
+    try {
+      // Constant-first equals: a metric without an app id must not cause an NPE.
+      if (!metrics.getMetrics().isEmpty() && "HOST".equals(metrics.getMetrics().get(0).getAppId())) {
+        kafkaProducer.sendMetrics(fromTimelineMetrics(metrics));
+      }
+    } catch (InterruptedException e) {
+      interrupted = true;
+      LOG.error(e);
+    } catch (ExecutionException e) {
+      LOG.error(e);
+    }
+
+    try {
+      hBaseAccessor.insertMetricRecordsWithMetadata(metricMetadataManager, metrics, false);
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    return response;
+  }
+
+
+  /**
+   * Converts a sink-side metrics batch into the alert-service metrics type,
+   * converting each contained metric in order.
+   *
+   * @param timelineMetrics batch to convert
+   * @return a new alert-service batch holding the converted metrics
+   */
+  private org.apache.ambari.metrics.alertservice.common.TimelineMetrics fromTimelineMetrics(TimelineMetrics timelineMetrics) {
+    List<org.apache.ambari.metrics.alertservice.common.TimelineMetric> converted = new ArrayList<>();
+    for (TimelineMetric sourceMetric : timelineMetrics.getMetrics()) {
+      converted.add(fromTimelineMetric(sourceMetric));
+    }
+
+    org.apache.ambari.metrics.alertservice.common.TimelineMetrics result =
+      new org.apache.ambari.metrics.alertservice.common.TimelineMetrics();
+    result.setMetrics(converted);
+    return result;
+  }
+
+  /**
+   * Copies name, app id, instance id, host name, start time and values of a
+   * sink-side metric into a new alert-service metric.
+   *
+   * @param timelineMetric metric to convert
+   * @return the new alert-service metric
+   */
+  private org.apache.ambari.metrics.alertservice.common.TimelineMetric fromTimelineMetric(TimelineMetric timelineMetric) {
+    org.apache.ambari.metrics.alertservice.common.TimelineMetric copy =
+      new org.apache.ambari.metrics.alertservice.common.TimelineMetric();
+
+    copy.setMetricValues(timelineMetric.getMetricValues());
+    copy.setStartTime(timelineMetric.getStartTime());
+    copy.setHostName(timelineMetric.getHostName());
+    copy.setInstanceId(timelineMetric.getInstanceId());
+    copy.setAppId(timelineMetric.getAppId());
+    copy.setMetricName(timelineMetric.getMetricName());
+    return copy;
+  }
+
+  /**
+   * Stores container metrics through the HBase accessor.
+   *
+   * @param metrics container metrics to insert
+   * @return an empty put response; errors are reported through exceptions
+   */
+  @Override
+  public TimelinePutResponse putContainerMetrics(List<ContainerMetric> metrics)
+      throws SQLException, IOException {
+    hBaseAccessor.insertContainerMetrics(metrics);
+    TimelinePutResponse emptyResponse = new TimelinePutResponse();
+    return emptyResponse;
+  }
+
+  /**
+   * Returns the cached metric metadata grouped by app id.
+   *
+   * @param query "all" (case-insensitive) to include non-whitelisted metrics;
+   *        any other value, including null, returns whitelisted metrics only
+   * @return map from app id to the metadata entries of that app
+   */
+  @Override
+  public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(String query) throws SQLException, IOException {
+    Map<TimelineMetricMetadataKey, TimelineMetricMetadata> cachedMetadata =
+      metricMetadataManager.getMetadataCache();
+
+    // equalsIgnoreCase is false for null and for the empty string.
+    boolean includeAll = "all".equalsIgnoreCase(query);
+
+    Map<String, List<TimelineMetricMetadata>> byAppId = new HashMap<>();
+    for (TimelineMetricMetadata entry : cachedMetadata.values()) {
+      if (includeAll || entry.isWhitelisted()) {
+        List<TimelineMetricMetadata> appEntries = byAppId.get(entry.getAppId());
+        if (appEntries == null) {
+          appEntries = new ArrayList<>();
+          byAppId.put(entry.getAppId(), appEntries);
+        }
+        appEntries.add(entry);
+      }
+    }
+
+    return byAppId;
+  }
+
+  /**
+   * Returns the metadata manager's hosted-apps cache as is (no copy is made).
+   *
+   * @return map from host name to the apps reported on that host
+   */
+  @Override
+  public Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException {
+    Map<String, Set<String>> hostedApps = metricMetadataManager.getHostedAppsCache();
+    return hostedApps;
+  }
+
+  /**
+   * Saves pre-aggregated host metrics into the minute aggregate table.
+   *
+   * @param aggregationResult metrics paired with their aggregated values
+   * @return an empty put response; errors are reported through exceptions
+   */
+  @Override
+  public TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException {
+    Map<TimelineMetric, MetricHostAggregate> aggregatesByMetric = new HashMap<>();
+    for (TimelineMetricWithAggregatedValues aggregated : aggregationResult.getResult()) {
+      TimelineMetric metric = aggregated.getTimelineMetric();
+      MetricHostAggregate aggregate = aggregated.getMetricAggregate();
+      aggregatesByMetric.put(metric, aggregate);
+    }
+
+    hBaseAccessor.saveHostAggregateRecords(aggregatesByMetric, PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME);
+    return new TimelinePutResponse();
+  }
+
+  /**
+   * Builds an instance -> app -> hosts view from the metadata caches.
+   *
+   * When no instance information is cached, every host/app pair is reported
+   * under the empty instance key "" and neither filter is applied. Otherwise
+   * only instances matching {@code instanceId} and apps matching {@code appId}
+   * are reported; an empty or null filter matches everything.
+   *
+   * A host that appears in the instance cache but has no entry in the
+   * hosted-apps cache is skipped instead of causing a NullPointerException.
+   *
+   * @param instanceId instance filter, or null/empty for all instances
+   * @param appId app filter, or null/empty for all apps
+   * @return map from instance id to (app id to host names)
+   */
+  @Override
+  public Map<String, Map<String,Set<String>>> getInstanceHostsMetadata(String instanceId, String appId)
+      throws SQLException, IOException {
+
+    Map<String, Set<String>> hostedApps = metricMetadataManager.getHostedAppsCache();
+    Map<String, Set<String>> instanceHosts = metricMetadataManager.getHostedInstanceCache();
+    Map<String, Map<String, Set<String>>> instanceAppHosts = new HashMap<>();
+
+    if (MapUtils.isEmpty(instanceHosts)) {
+      Map<String, Set<String>> appHostMap = new HashMap<String, Set<String>>();
+      for (Map.Entry<String, Set<String>> hostEntry : hostedApps.entrySet()) {
+        addHostToApps(appHostMap, hostEntry.getKey(), hostEntry.getValue(), null);
+      }
+      instanceAppHosts.put("", appHostMap);
+      return instanceAppHosts;
+    }
+
+    for (Map.Entry<String, Set<String>> instanceEntry : instanceHosts.entrySet()) {
+      String instance = instanceEntry.getKey();
+      if (StringUtils.isNotEmpty(instanceId) && !instance.equals(instanceId)) {
+        continue;
+      }
+
+      Map<String, Set<String>> appHostMap = new HashMap<String, Set<String>>();
+      instanceAppHosts.put(instance, appHostMap);
+
+      Set<String> hostsWithInstance = instanceEntry.getValue();
+      if (hostsWithInstance == null) {
+        continue;
+      }
+      for (String host : hostsWithInstance) {
+        addHostToApps(appHostMap, host, hostedApps.get(host), appId);
+      }
+    }
+
+    return instanceAppHosts;
+  }
+
+  /**
+   * Adds {@code host} to the host set of each of its apps, skipping apps that
+   * do not match a non-empty {@code appIdFilter}. A null app set is ignored.
+   */
+  private static void addHostToApps(Map<String, Set<String>> appHostMap, String host,
+      Set<String> apps, String appIdFilter) {
+    if (apps == null) {
+      return;
+    }
+    for (String app : apps) {
+      if (StringUtils.isNotEmpty(appIdFilter) && !app.equals(appIdFilter)) {
+        continue;
+      }
+      Set<String> hosts = appHostMap.get(app);
+      if (hosts == null) {
+        hosts = new HashSet<String>();
+        appHostMap.put(app, hosts);
+      }
+      hosts.add(host);
+    }
+  }
+
+  /**
+   * Returns the host names of the live collector instances.
+   *
+   * Without an HA controller (embedded mode), or when the controller reports
+   * no live instances, the current host name from the environment is returned
+   * as the only instance. If that host name cannot be resolved, the failure is
+   * logged at debug level and whatever the controller returned (possibly null
+   * or an empty list) is passed back.
+   *
+   * @return live instance host names; may be null when the host name lookup fails
+   */
+  @Override
+  public List<String> getLiveInstances() {
+
+    List<String> liveInstances = null;
+    try {
+      if (haController != null) {
+        liveInstances = haController.getLiveInstanceHostNames();
+        if (liveInstances != null && !liveInstances.isEmpty()) {
+          return liveInstances;
+        }
+      }
+      // Embedded operation mode, or nothing reported live: fall back to this host.
+      liveInstances = Collections.singletonList(configuration.getInstanceHostnameFromEnv());
+    } catch (UnknownHostException e) {
+      LOG.debug("Exception on getting hostname from env.", e);
+    }
+    return liveInstances;
+  }
+
+  /**
+   * Schedules an aggregator on its own single-thread executor, starting
+   * immediately and repeating at the aggregator's sleep interval. The executor
+   * is recorded in {@code scheduledExecutors} under the aggregator's name.
+   * Disabled aggregators are only logged.
+   *
+   * @param aggregator aggregator to schedule
+   */
+  private void scheduleAggregatorThread(final TimelineMetricAggregator aggregator) {
+    if (aggregator.isDisabled()) {
+      LOG.info("Skipped scheduling " + aggregator.getName() + " since it is disabled.");
+      return;
+    }
+
+    // Name the worker thread after the aggregator.
+    ThreadFactory namedThreadFactory = new ThreadFactory() {
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, ACTUAL_AGGREGATOR_NAMES.get(aggregator.getName()));
+      }
+    };
+    ScheduledExecutorService aggregatorExecutor =
+      Executors.newSingleThreadScheduledExecutor(namedThreadFactory);
+    scheduledExecutors.put(aggregator.getName(), aggregatorExecutor);
+
+    aggregatorExecutor.scheduleAtFixedRate(aggregator,
+      0L,
+      aggregator.getSleepIntervalMillis(),
+      TimeUnit.MILLISECONDS);
+    LOG.info("Scheduled aggregator thread " + aggregator.getName() + " every " +
+      aggregator.getSleepIntervalMillis() + " milliseconds.");
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/7e970233/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java
deleted file mode 100644
index f035678..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-
-import com.google.common.collect.Multimap;
-import junit.framework.Assert;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.ReadFunction.AVG;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.ReadFunction.SUM;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.PostProcessingFunction.RATE;
-import static org.assertj.core.api.Assertions.assertThat;
-
-public class HBaseTimelineMetricsServiceTest {
-
- public static final String MEM_METRIC = "mem";
- public static final String BYTES_IN_METRIC = "bytes_in";
- public static final String BYTES_NOT_AFUNCTION_METRIC = "bytes._not._afunction";
-
- @Test
- public void testParseMetricNamesToAggregationFunctions() throws Exception {
- //giwen
- List<String> metricNames = Arrays.asList(
- MEM_METRIC + "._avg",
- MEM_METRIC + "._sum",
- MEM_METRIC + "._rate._avg",
- BYTES_IN_METRIC,
- BYTES_NOT_AFUNCTION_METRIC);
-
- //when
- Multimap<String, List<Function>> multimap =
- HBaseTimelineMetricsService.parseMetricNamesToAggregationFunctions(metricNames);
-
- //then
- Assert.assertEquals(multimap.keySet().size(), 3);
- Assert.assertTrue(multimap.containsKey(MEM_METRIC));
- Assert.assertTrue(multimap.containsKey(BYTES_IN_METRIC));
- Assert.assertTrue(multimap.containsKey(BYTES_NOT_AFUNCTION_METRIC));
-
- List<List<Function>> metricEntry = (List<List<Function>>) multimap.get(MEM_METRIC);
- HashMap<String, List<Function>> mfm = new HashMap<String, List<Function>>();
- mfm.put(MEM_METRIC, metricEntry.get(0));
-
- assertThat(mfm.get(MEM_METRIC)).containsOnly(
- new Function(AVG, null));
-
- mfm = new HashMap<String, List<Function>>();
- mfm.put(MEM_METRIC, metricEntry.get(1));
- assertThat(mfm.get(MEM_METRIC)).containsOnly(
- new Function(SUM, null));
-
- mfm = new HashMap<String, List<Function>>();
- mfm.put(MEM_METRIC, metricEntry.get(2));
- assertThat(mfm.get(MEM_METRIC)).containsOnly(
- new Function(AVG, RATE));
-
- metricEntry = (List<List<Function>>) multimap.get(BYTES_IN_METRIC);
- mfm = new HashMap<String, List<Function>>();
- mfm.put(BYTES_IN_METRIC, metricEntry.get(0));
-
- assertThat(mfm.get(BYTES_IN_METRIC))
- .contains(Function.DEFAULT_VALUE_FUNCTION);
-
- metricEntry = (List<List<Function>>) multimap.get(BYTES_NOT_AFUNCTION_METRIC);
- mfm = new HashMap<String, List<Function>>();
- mfm.put(BYTES_NOT_AFUNCTION_METRIC, metricEntry.get(0));
-
- assertThat(mfm.get(BYTES_NOT_AFUNCTION_METRIC))
- .contains(Function.DEFAULT_VALUE_FUNCTION);
-
- }
-
- @Test
- public void testRateCalculationOnMetricsWithEqualValues() throws Exception {
- Map<Long, Double> metricValues = new TreeMap<>();
- metricValues.put(1454016368371L, 1011.25);
- metricValues.put(1454016428371L, 1011.25);
- metricValues.put(1454016488371L, 1011.25);
- metricValues.put(1454016548371L, 1011.25);
- metricValues.put(1454016608371L, 1011.25);
- metricValues.put(1454016668371L, 1011.25);
- metricValues.put(1454016728371L, 1011.25);
-
- // Calculate rate
- Map<Long, Double> rates = HBaseTimelineMetricsService.updateValuesAsRate(new TreeMap<>(metricValues), false);
-
- // Make sure rate is zero
- for (Map.Entry<Long, Double> rateEntry : rates.entrySet()) {
- Assert.assertEquals("Rate should be zero, key = " + rateEntry.getKey()
- + ", value = " + rateEntry.getValue(), 0.0, rateEntry.getValue());
- }
- }
-
- @Test
- public void testDiffCalculation() throws Exception {
- Map<Long, Double> metricValues = new TreeMap<>();
- metricValues.put(1454016368371L, 1011.25);
- metricValues.put(1454016428371L, 1010.25);
- metricValues.put(1454016488371L, 1012.25);
- metricValues.put(1454016548371L, 1010.25);
- metricValues.put(1454016608371L, 1010.25);
-
- Map<Long, Double> rates = HBaseTimelineMetricsService.updateValuesAsRate(new TreeMap<>(metricValues), true);
-
- Assert.assertTrue(rates.size()==4);
- Assert.assertTrue(rates.containsValue(-1.0));
- Assert.assertTrue(rates.containsValue(2.0));
- Assert.assertTrue(rates.containsValue(0.0));
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/7e970233/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsServiceTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsServiceTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsServiceTest.java
new file mode 100644
index 0000000..f035678
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsServiceTest.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import com.google.common.collect.Multimap;
+import junit.framework.Assert;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.ReadFunction.AVG;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.ReadFunction.SUM;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.PostProcessingFunction.RATE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class HBaseTimelineMetricsServiceTest {
+
+ public static final String MEM_METRIC = "mem"; // base name; the parse test appends "._avg", "._sum" and "._rate._avg" to it
+ public static final String BYTES_IN_METRIC = "bytes_in"; // plain name handed to the parser without any suffix
+ public static final String BYTES_NOT_AFUNCTION_METRIC = "bytes._not._afunction"; // dotted name the parse test expects to find unchanged as a key
+
+  /**
+   * Verifies that metric names carrying aggregation suffixes (for example
+   * {@code "mem._avg"}) are grouped under the bare metric name together with
+   * the parsed {@link Function}, and that names without a recognised suffix
+   * are mapped to {@link Function#DEFAULT_VALUE_FUNCTION}.
+   */
+  @Test
+  public void testParseMetricNamesToAggregationFunctions() throws Exception {
+    // given
+    List<String> metricNames = Arrays.asList(
+        MEM_METRIC + "._avg",
+        MEM_METRIC + "._sum",
+        MEM_METRIC + "._rate._avg",
+        BYTES_IN_METRIC,
+        BYTES_NOT_AFUNCTION_METRIC);
+
+    // when
+    Multimap<String, List<Function>> multimap =
+        HBaseTimelineMetricsService.parseMetricNamesToAggregationFunctions(metricNames);
+
+    // then: the three "mem" variants share one key, so five names give three keys
+    Assert.assertEquals(3, multimap.keySet().size());
+    Assert.assertTrue(multimap.containsKey(MEM_METRIC));
+    Assert.assertTrue(multimap.containsKey(BYTES_IN_METRIC));
+    Assert.assertTrue(multimap.containsKey(BYTES_NOT_AFUNCTION_METRIC));
+
+    // The cast relies on the returned multimap handing out List-backed value
+    // collections, exactly as this test has always assumed.
+    List<List<Function>> memFunctions =
+        (List<List<Function>>) multimap.get(MEM_METRIC);
+    // Guard the positional lookups below so a missing entry fails with a clear count.
+    Assert.assertEquals(3, memFunctions.size());
+
+    assertThat(memFunctions.get(0)).containsOnly(
+        new Function(AVG, null));
+    assertThat(memFunctions.get(1)).containsOnly(
+        new Function(SUM, null));
+    assertThat(memFunctions.get(2)).containsOnly(
+        new Function(AVG, RATE));
+
+    // A plain metric name falls back to the default function.
+    List<List<Function>> bytesInFunctions =
+        (List<List<Function>>) multimap.get(BYTES_IN_METRIC);
+    assertThat(bytesInFunctions.get(0))
+        .contains(Function.DEFAULT_VALUE_FUNCTION);
+
+    // So does a name whose "._" segments are not recognised as functions.
+    List<List<Function>> notAFunction =
+        (List<List<Function>>) multimap.get(BYTES_NOT_AFUNCTION_METRIC);
+    assertThat(notAFunction.get(0))
+        .contains(Function.DEFAULT_VALUE_FUNCTION);
+  }
+
+  /**
+   * Feeds seven samples with the same value, spaced 60000 ms apart, through
+   * {@code updateValuesAsRate} with the flag set to {@code false} and checks
+   * that every entry in the returned map is exactly zero.
+   */
+  @Test
+  public void testRateCalculationOnMetricsWithEqualValues() throws Exception {
+    final long firstTimestamp = 1454016368371L;
+    final long lastTimestamp = 1454016728371L;
+    Map<Long, Double> samples = new TreeMap<>();
+    for (long ts = firstTimestamp; ts <= lastTimestamp; ts += 60000L) {
+      samples.put(ts, 1011.25);
+    }
+
+    Map<Long, Double> computed = HBaseTimelineMetricsService.updateValuesAsRate(new TreeMap<>(samples), false);
+    for (Map.Entry<Long, Double> entry : computed.entrySet()) {
+      String message = "Rate should be zero, key = " + entry.getKey() + ", value = " + entry.getValue();
+      Assert.assertEquals(message, 0.0, entry.getValue());
+    }
+  }
+
+  /** With the flag set to true, five samples must produce four values, among them -1.0, 2.0 and 0.0. */
+  @Test
+  public void testDiffCalculation() throws Exception {
+    Map<Long, Double> metricValues = new TreeMap<>();
+    metricValues.put(1454016368371L, 1011.25);
+    metricValues.put(1454016428371L, 1010.25);
+    metricValues.put(1454016488371L, 1012.25);
+    metricValues.put(1454016548371L, 1010.25);
+    metricValues.put(1454016608371L, 1010.25);
+    Map<Long, Double> rates = HBaseTimelineMetricsService.updateValuesAsRate(new TreeMap<>(metricValues), true);
+    // assertEquals reports the actual size on failure; assertTrue(size == 4) carries no detail.
+    Assert.assertEquals(4, rates.size());
+    for (double expected : new double[] {-1.0, 2.0, 0.0}) {
+      Assert.assertTrue("Missing value " + expected, rates.containsValue(expected));
+    }
+  }
+}