You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2019/01/02 21:04:37 UTC
[ambari-metrics] branch master updated: [AMBARI-25078] Add metering
metrics to AMS Metric Monitor. (#13)
This is an automated email from the ASF dual-hosted git repository.
avijayan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ambari-metrics.git
The following commit(s) were added to refs/heads/master by this push:
new 011e39c [AMBARI-25078] Add metering metrics to AMS Metric Monitor. (#13)
011e39c is described below
commit 011e39c5b4f022b7a005fd8c8413f2e39f0c584f
Author: avijayanhwx <av...@hortonworks.com>
AuthorDate: Wed Jan 2 13:04:33 2019 -0800
[AMBARI-25078] Add metering metrics to AMS Metric Monitor. (#13)
---
ambari-metrics-assembly/pom.xml | 28 +++++++-
.../src/main/assembly/monitor.xml | 3 +
.../ambari-metrics/datasource.js | 47 ++++++++-----
.../sink/timeline/AggregatedMetricsPublisher.java | 5 +-
.../conf/unix/instance_type_provider_azure | 17 +++++
.../conf/unix/instance_type_provider_ec2 | 17 +++++
.../conf/unix/instance_type_provider_gce | 17 +++++
.../src/main/python/core/application_metric_map.py | 3 +-
.../src/main/python/core/config_reader.py | 15 +++++
.../src/main/python/core/controller.py | 2 +-
.../src/main/python/core/instance_type_provider.py | 76 ++++++++++++++++++++++
.../src/main/python/core/metering.py | 63 ++++++++++++++++++
.../src/main/python/core/metric_collector.py | 12 +++-
.../src/test/python/core/TestMetricCollector.py | 3 +-
.../conf/unix/metrics_whitelist | 3 +-
.../TimelineMetricClusterAggregator.java | 2 +-
.../aggregators/TimelineMetricHostAggregator.java | 2 +-
.../v2/TimelineMetricClusterAggregator.java | 2 +-
.../v2/TimelineMetricFilteringHostAggregator.java | 2 +-
.../v2/TimelineMetricHostAggregator.java | 2 +-
.../core/timeline/query/PhoenixTransactSQL.java | 6 +-
.../timeline/aggregators/ITMetricAggregator.java | 2 +-
22 files changed, 295 insertions(+), 34 deletions(-)
diff --git a/ambari-metrics-assembly/pom.xml b/ambari-metrics-assembly/pom.xml
index b76b928..0b17ea0 100644
--- a/ambari-metrics-assembly/pom.xml
+++ b/ambari-metrics-assembly/pom.xml
@@ -784,7 +784,33 @@
<filemode>755</filemode>
</mapper>
</data>
-
+ <data>
+ <src>${monitor.dir}/conf/unix/instance_type_provider_azure</src>
+ <type>file</type>
+ <mapper>
+ <type>perm</type>
+ <prefix>/etc/ambari-metrics-monitor/conf</prefix>
+ <filemode>755</filemode>
+ </mapper>
+ </data>
+ <data>
+ <src>${monitor.dir}/conf/unix/instance_type_provider_ec2</src>
+ <type>file</type>
+ <mapper>
+ <type>perm</type>
+ <prefix>/etc/ambari-metrics-monitor/conf</prefix>
+ <filemode>755</filemode>
+ </mapper>
+ </data>
+ <data>
+ <src>${monitor.dir}/conf/unix/instance_type_provider_gce</src>
+ <type>file</type>
+ <mapper>
+ <type>perm</type>
+ <prefix>/etc/ambari-metrics-monitor/conf</prefix>
+ <filemode>755</filemode>
+ </mapper>
+ </data>
<!-- Metric collector -->
<data>
diff --git a/ambari-metrics-assembly/src/main/assembly/monitor.xml b/ambari-metrics-assembly/src/main/assembly/monitor.xml
index 7d098e9..aa12bef 100644
--- a/ambari-metrics-assembly/src/main/assembly/monitor.xml
+++ b/ambari-metrics-assembly/src/main/assembly/monitor.xml
@@ -36,6 +36,9 @@
<includes>
<include>metric_groups.conf</include>
<include>metric_monitor.ini</include>
+ <include>instance_type_provider_azure</include>
+ <include>instance_type_provider_ec2</include>
+ <include>instance_type_provider_gce</include>
</includes>
</fileSet>
<fileSet>
diff --git a/ambari-metrics-grafana/ambari-metrics/datasource.js b/ambari-metrics-grafana/ambari-metrics/datasource.js
index 18e2709..6dc0446 100644
--- a/ambari-metrics-grafana/ambari-metrics/datasource.js
+++ b/ambari-metrics-grafana/ambari-metrics/datasource.js
@@ -112,21 +112,33 @@ define([
return $q.when(emptyData(target));
}
var series = [];
- var metricData = res.metrics[0].metrics;
- // Added hostname to legend for templated dashboards.
- var hostLegend = res.metrics[0].hostname ? ' on ' + res.metrics[0].hostname : '';
- var timeSeries = {};
- timeSeries = {
- target: alias + hostLegend,
- datapoints: []
- };
- for (var k in metricData) {
- if (metricData.hasOwnProperty(k)) {
- timeSeries.datapoints.push([metricData[k], (k - k % 1000)]);
- }
- }
- series.push(timeSeries);
- return $q.when({data: series});
+ var metricData = res.metrics;
+ _.map(metricData, function (data) {
+ // Added hostname to legend for templated dashboards.
+ var hostLegend = data.hostname ? ' on ' + data.hostname : '';
+ var alias = target.alias ? target.alias : target.metric;
+ if(!_.isEmpty(templateSrv.variables) && templateSrv.variables[0].query === "yarnqueues") {
+ alias = alias + ' on ' + target.qmetric; }
+ if(!_.isEmpty(templateSrv.variables) && templateSrv.variables[0].query === "kafka-topics") {
+ alias = alias + ' on ' + target.kbTopic; }
+ if (!alias.includes("%") || !data.metricname.includes('live_hosts')) {
+ if (!alias || alias.includes("%")) {
+ alias = data.metricname;
+ }
+ var timeSeries = {};
+ timeSeries = {
+ target: alias + hostLegend,
+ datapoints: []
+ };
+ for (var k in data.metrics) {
+ if (data.metrics.hasOwnProperty(k)) {
+ timeSeries.datapoints.push([data.metrics[k], (k - k % 1000)]);
+ }
+ }
+ series.push(timeSeries);
+ }
+ });
+ return $q.when({data: series});
};
};
// To speed up querying on templatized dashboards.
@@ -200,11 +212,12 @@ define([
aliasSuffix = '';
}
if (data.appid.indexOf('ambari_server') === 0) {
- alias = data.metricname;
aliasSuffix = '';
}
+ if (!alias || alias.includes("%")) {
+ alias = data.metricname;
+ }
timeSeries = {
- target: alias + aliasSuffix,
datapoints: []
};
for (var k in data.metrics) {
diff --git a/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java
index fa0c8fb..2b20379 100644
--- a/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java
+++ b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java
@@ -73,6 +73,7 @@ public class AggregatedMetricsPublisher extends AbstractMetricPublisher {
double max = Integer.MIN_VALUE;
double min = Integer.MAX_VALUE;
int count = 0;
+ TimelineMetric tmpMetric = new TimelineMetric(metrics.getMetrics().iterator().next());
for (TimelineMetric metric : metrics.getMetrics()) {
for (Double value : metric.getMetricValues().values()) {
sum+=value;
@@ -80,8 +81,10 @@ public class AggregatedMetricsPublisher extends AbstractMetricPublisher {
min = Math.min(min, value);
count++;
}
+ if (metric.getStartTime() > tmpMetric.getStartTime()) {
+ tmpMetric.setStartTime(metric.getStartTime());
+ }
}
- TimelineMetric tmpMetric = new TimelineMetric(metrics.getMetrics().get(0));
tmpMetric.setMetricValues(new TreeMap<Long, Double>());
metricAggregateMap.add(new TimelineMetricWithAggregatedValues(tmpMetric, new MetricHostAggregate(sum, count, 0d, max, min)));
}
diff --git a/ambari-metrics-host-monitoring/conf/unix/instance_type_provider_azure b/ambari-metrics-host-monitoring/conf/unix/instance_type_provider_azure
new file mode 100644
index 0000000..3c8ec20
--- /dev/null
+++ b/ambari-metrics-host-monitoring/conf/unix/instance_type_provider_azure
@@ -0,0 +1,17 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific
+
+curl --silent -H Metadata:true "http://169.254.169.254/metadata/instance?api-version=2017-12-01" | grep -Po '"vmSize":.*?[^\\]",' | cut -d':' -f2 | sed 's/,/ /g' | sed 's/"//g'
\ No newline at end of file
diff --git a/ambari-metrics-host-monitoring/conf/unix/instance_type_provider_ec2 b/ambari-metrics-host-monitoring/conf/unix/instance_type_provider_ec2
new file mode 100644
index 0000000..7e89f9e
--- /dev/null
+++ b/ambari-metrics-host-monitoring/conf/unix/instance_type_provider_ec2
@@ -0,0 +1,17 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific
+
+curl --silent http://169.254.169.254/latest/dynamic/instance-identity/document | grep 'instanceType' | awk '{ print $3 }' | sed 's/,/ /g' | sed 's/"//g'
\ No newline at end of file
diff --git a/ambari-metrics-host-monitoring/conf/unix/instance_type_provider_gce b/ambari-metrics-host-monitoring/conf/unix/instance_type_provider_gce
new file mode 100644
index 0000000..6c237b9
--- /dev/null
+++ b/ambari-metrics-host-monitoring/conf/unix/instance_type_provider_gce
@@ -0,0 +1,17 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific
+
+curl --silent -H "Metadata-Flavor: Google" "http://metadata.google.internal/computeMetadata/v1/instance/machine-type" | awk -F'/' '{print $4}'
\ No newline at end of file
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py b/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py
index bd957a0..59d4af0 100644
--- a/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py
+++ b/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py
@@ -88,6 +88,7 @@ class ApplicationMetricMap:
pass
for appId, metrics in local_metric_map.iteritems():
+ current_app_id = "HOST" if "HOST" in appId else appId
for metricId, metricData in dict(metrics).iteritems():
# Create a timeline metric object
result_instanceid = ""
@@ -96,7 +97,7 @@ class ApplicationMetricMap:
timeline_metric = {
"hostname" : self.hostname,
"metricname" : metricId,
- "appid" : "HOST",
+ "appid" : current_app_id,
"instanceid" : result_instanceid,
"starttime" : self.get_start_time(appId, metricId),
"metrics" : self.align_values_by_minute_mark(appId, metricId, metricData) if clear_once_flattened else metricData
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py b/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
index 721a544..18ade5b 100644
--- a/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
+++ b/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
@@ -255,6 +255,21 @@ class Configuration:
hosts = self.get("aggregation", "ams_monitor_log_dir", "/var/log/ambari-metrics-monitor")
return hosts
+ def is_metering_enabled(self):
+ return "true" == str(self.get("metering", "metering_enabled", "false")).lower()
+
+ def get_metering_appId(self):
+ return self.get("metering", "metering_appId", "metering")
+
+ def get_metering_metrics(self):
+ return self.get("metering", "metering_metrics", "").split(',')
+
+ def get_instance_type_script(self):
+ return self.get("metering", "instance_type_script", "").split(',')
+
+ def get_provider_type(self):
+ return self.get("metering", "host_provider_type", None)
+
def ams_monitor_log_file(self):
"""
:returns the log file
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/controller.py b/ambari-metrics-host-monitoring/src/main/python/core/controller.py
index d161269..080eaca 100644
--- a/ambari-metrics-host-monitoring/src/main/python/core/controller.py
+++ b/ambari-metrics-host-monitoring/src/main/python/core/controller.py
@@ -47,7 +47,7 @@ class Controller(threading.Thread):
self.application_metric_map = ApplicationMetricMap(hostinfo.get_hostname(),
hostinfo.get_ip_address())
self.event_queue = Queue(config.get_max_queue_size())
- self.metric_collector = MetricsCollector(self.event_queue, self.application_metric_map, hostinfo)
+ self.metric_collector = MetricsCollector(self.event_queue, self.application_metric_map, hostinfo, config)
self.sleep_interval = config.get_collector_sleep_interval()
self._stop_handler = stop_handler
self.initialize_events_cache()
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/instance_type_provider.py b/ambari-metrics-host-monitoring/src/main/python/core/instance_type_provider.py
new file mode 100644
index 0000000..2e0a321
--- /dev/null
+++ b/ambari-metrics-host-monitoring/src/main/python/core/instance_type_provider.py
@@ -0,0 +1,76 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+import subprocess
+import sys
+
+logger = logging.getLogger()
+
+class HostInstanceTypeProvider:
+
+ DEFAULT_INSTANCE_TYPE = "custom"
+ KNOWN_PROVIDER_SCRIPT_PREFIX = "instance_type_provider_"
+ KNOWN_PROVIDER_SCRIPTS = dict()
+
+ def __init__(self, config):
+
+ self.KNOWN_PROVIDER_SCRIPTS['google'] = config.get_config_dir() + self.KNOWN_PROVIDER_SCRIPT_PREFIX + "gce"
+ self.KNOWN_PROVIDER_SCRIPTS['microsoft'] = config.get_config_dir() + self.KNOWN_PROVIDER_SCRIPT_PREFIX + "azure"
+ self.KNOWN_PROVIDER_SCRIPTS['xen'] = config.get_config_dir() + self.KNOWN_PROVIDER_SCRIPT_PREFIX + "ec2"
+
+ self.provider_type = config.get_provider_type()
+ logger.info("Provider type {0}".format(self.provider_type))
+
+ script = self.get_script_for_provider(self.provider_type)
+ logger.info("Script for provider {0}".format(script))
+
+ self.instance_type_script = config.get_instance_type_script()
+ logger.info("Custom Instance Type Script {0}".format(self.instance_type_script))
+
+ if script:
+ self.instance_type = self.get_instance_type_from_script(script)
+ elif self.instance_type_script:
+ self.instance_type = self.get_instance_type_from_script(self.instance_type_script)
+ else:
+ self.instance_type = self.DEFAULT_INSTANCE_TYPE
+ logger.info("Instance type {0}".format(self.instance_type))
+
+ def get_instance_type(self):
+ return self.instance_type
+
+ def get_script_for_provider(self, provider_type):
+ p_type = str(provider_type).lower()
+ if provider_type and p_type in self.KNOWN_PROVIDER_SCRIPTS:
+ return self.KNOWN_PROVIDER_SCRIPTS[p_type]
+ return None
+
+ def get_instance_type_from_script(self, script):
+ instance_type = self.DEFAULT_INSTANCE_TYPE
+ if script:
+ try:
+ osStat = subprocess.Popen([script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ out, err = osStat.communicate()
+ if 0 == osStat.returncode and 0 != len(out.strip()):
+ instance_type = out.strip()
+ logger.info("Read instance_type '{0}' using script '{1}'".format(instance_type, script))
+ except:
+ logger.warn("Unexpected error while retrieving instance_type: '{0}'".format(sys.exc_info()))
+ return instance_type
\ No newline at end of file
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/metering.py b/ambari-metrics-host-monitoring/src/main/python/core/metering.py
new file mode 100644
index 0000000..bac71e6
--- /dev/null
+++ b/ambari-metrics-host-monitoring/src/main/python/core/metering.py
@@ -0,0 +1,63 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+import time
+import json
+from instance_type_provider import HostInstanceTypeProvider
+
+logger = logging.getLogger()
+
+class MeteringMetricHandler:
+
+ METERING_ALIVE_TIME_METRIC_SUFFIX = "lastKnownAliveTime"
+
+ ## At startup,
+ def __init__(self, config):
+ self.appId = config.get_metering_appId()
+ self.instance_type_metric_appId = config.get_metering_appId() + "_instance_type"
+ self.metering_enabled = config.is_metering_enabled()
+ self.hostname = config.get_hostname_config()
+ self.instance_id = config.get_instanceid()
+ self.metering_metric_list = config.get_metering_metrics()
+ self.start_ts = int(round(time.time() * 1000))
+ if self.metering_enabled:
+ logger.info("Metering started with: appId = {0}, metering_metric_list = {1}, start time key = {2}"
+ .format(self.appId, self.metering_metric_list, self.start_ts))
+ self.instance_type_provider = HostInstanceTypeProvider(config)
+ self.instance_type = self.instance_type_provider.instance_type
+ self.metering_metric_key_prefix = self.hostname + "~" + self.instance_type + "~" + str(self.start_ts)
+ pass
+
+ # Metering Metrics
+ def get_metering_metrics(self, metrics):
+ metering_metrics = {}
+ curr_time = int(round(time.time() * 1000))
+ for metric_name, value in metrics.iteritems():
+ if metric_name in self.metering_metric_list:
+ end_time_metric_key = self.metering_metric_key_prefix + "~" + metric_name + "~" + str(value) + "~" + self.METERING_ALIVE_TIME_METRIC_SUFFIX
+ metering_metrics[end_time_metric_key] = curr_time
+
+ return metering_metrics
+
+ # Instance Type Metrics
+ def get_instance_type_metrics(self):
+ metering_metrics = {self.instance_type: 1}
+ return metering_metrics
\ No newline at end of file
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/metric_collector.py b/ambari-metrics-host-monitoring/src/main/python/core/metric_collector.py
index 84a4d76..f728c2d 100644
--- a/ambari-metrics-host-monitoring/src/main/python/core/metric_collector.py
+++ b/ambari-metrics-host-monitoring/src/main/python/core/metric_collector.py
@@ -20,8 +20,8 @@ limitations under the License.
import logging
from time import time
-from host_info import HostInfo
from event_definition import HostMetricCollectEvent, ProcessMetricCollectEvent
+from metering import MeteringMetricHandler
logger = logging.getLogger()
@@ -34,10 +34,12 @@ class MetricsCollector():
not required if Timer class is used for metric groups.
"""
- def __init__(self, emit_queue, application_metric_map, host_info):
+ def __init__(self, emit_queue, application_metric_map, host_info, config):
self.emit_queue = emit_queue
self.application_metric_map = application_metric_map
self.host_info = host_info
+ self.metering_enabled = config.is_metering_enabled()
+ self.metering_handler = MeteringMetricHandler(config)
pass
def process_event(self, event):
@@ -86,6 +88,12 @@ class MetricsCollector():
if metrics:
self.application_metric_map.put_metric(DEFAULT_HOST_APP_ID, metrics, startTime)
+ if self.metering_enabled:
+ metering_metrics = self.metering_handler.get_metering_metrics(metrics)
+ self.application_metric_map.put_metric(self.metering_handler.appId, metering_metrics, startTime)
+
+ instance_type_metrics = self.metering_handler.get_instance_type_metrics()
+ self.application_metric_map.put_metric(self.metering_handler.instance_type_metric_appId, instance_type_metrics, startTime)
pass
def process_process_collection_event(self, event):
diff --git a/ambari-metrics-host-monitoring/src/test/python/core/TestMetricCollector.py b/ambari-metrics-host-monitoring/src/test/python/core/TestMetricCollector.py
index 11a85eb..64df3e6 100644
--- a/ambari-metrics-host-monitoring/src/test/python/core/TestMetricCollector.py
+++ b/ambari-metrics-host-monitoring/src/test/python/core/TestMetricCollector.py
@@ -25,6 +25,7 @@ from core.application_metric_map import ApplicationMetricMap
from core.metric_collector import MetricsCollector
from core.event_definition import HostMetricCollectEvent
from core.host_info import HostInfo
+from core.config_reader import Configuration
logger = logging.getLogger()
@@ -36,7 +37,7 @@ class TestMetricCollector(TestCase):
amm_mock.return_value = None
host_info_mock.return_value = {'metric_name' : 'metric_value'}
- metric_collector = MetricsCollector(None, amm_mock, host_info_mock)
+ metric_collector = MetricsCollector(None, amm_mock, host_info_mock, Configuration())
group_config = {'collect_every' : 1, 'metrics' : 'cpu'}
diff --git a/ambari-metrics-timelineservice/conf/unix/metrics_whitelist b/ambari-metrics-timelineservice/conf/unix/metrics_whitelist
index fd03d6e..31dc255 100644
--- a/ambari-metrics-timelineservice/conf/unix/metrics_whitelist
+++ b/ambari-metrics-timelineservice/conf/unix/metrics_whitelist
@@ -651,4 +651,5 @@ yarn.QueueMetrics.Queue=root.running_60
yarn.TimelineDataManagerMetrics.GetEntitiesTimeAvgTime
yarn.TimelineDataManagerMetrics.GetEntitiesTotal
yarn.TimelineDataManagerMetrics.PostEntitiesTimeAvgTime
-yarn.TimelineDataManagerMetrics.PostEntitiesTotal
\ No newline at end of file
+yarn.TimelineDataManagerMetrics.PostEntitiesTotal
+._p_*lastKnownAliveTime
\ No newline at end of file
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregator.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregator.java
index 9753f89..44e714b 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregator.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregator.java
@@ -81,7 +81,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
@Override
protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException {
- Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap = aggregateMetricsFromResultSet(rs, endTime);
+ Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap = aggregateMetricsFromResultSet(rs, endTime - 1000l);
LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
hBaseAccessor.saveClusterAggregateRecordsSecond(hostAggregateMap, outputTableName);
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricHostAggregator.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricHostAggregator.java
index a9ee385..974dbb9 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricHostAggregator.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricHostAggregator.java
@@ -63,7 +63,7 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
@Override
protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException {
- Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = aggregateMetricsFromResultSet(rs, endTime);
+ Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = aggregateMetricsFromResultSet(rs, endTime - 1000l);
LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
hBaseAccessor.saveHostAggregateRecords(hostAggregateMap, outputTableName);
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricClusterAggregator.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
index 9c255e7..4a9a6be 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
@@ -74,7 +74,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
*/
condition.setStatement(String.format(GET_AGGREGATED_APP_METRIC_GROUPBY_SQL,
- outputTableName, endTime, aggregateColumnName, tableName,
+ outputTableName, endTime - 1000l, aggregateColumnName, tableName,
getDownsampledMetricSkipClause(), startTime, endTime));
if (LOG.isDebugEnabled()) {
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricFilteringHostAggregator.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricFilteringHostAggregator.java
index 72d7424..1077ff8 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricFilteringHostAggregator.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricFilteringHostAggregator.java
@@ -83,7 +83,7 @@ public class TimelineMetricFilteringHostAggregator extends TimelineMetricHostAgg
condition.setDoUpdate(true);
condition.setStatement(String.format(GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL,
- outputTableName, endTime, tableName,
+ outputTableName, endTime - 1000l, tableName,
getDownsampledMetricSkipClause() + getIncludedUuidsClause(uuids), startTime, endTime));
if (LOG.isDebugEnabled()) {
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricHostAggregator.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricHostAggregator.java
index f8757a4..5779602 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricHostAggregator.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricHostAggregator.java
@@ -65,7 +65,7 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
condition.setDoUpdate(true);
condition.setStatement(String.format(GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL,
- outputTableName, endTime, tableName,
+ outputTableName, endTime - 1000l, tableName,
getDownsampledMetricSkipClause(), startTime, endTime));
if (LOG.isDebugEnabled()) {
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java
index 092da51..3a98804 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java
@@ -389,7 +389,7 @@ public class PhoenixTransactSQL {
"INTO %s (UUID, SERVER_TIME, METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) " +
"SELECT UUID, %s AS SERVER_TIME, " +
"SUM(METRIC_SUM), SUM(METRIC_COUNT), MAX(METRIC_MAX), MIN(METRIC_MIN) " +
- "FROM %s WHERE%s SERVER_TIME > %s AND SERVER_TIME <= %s GROUP BY UUID";
+ "FROM %s WHERE%s SERVER_TIME >= %s AND SERVER_TIME < %s GROUP BY UUID";
/**
* Downsample host metrics.
@@ -407,8 +407,8 @@ public class PhoenixTransactSQL {
*/
public static final String GET_AGGREGATED_APP_METRIC_GROUPBY_SQL = "UPSERT " +
"INTO %s (UUID, SERVER_TIME, METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) SELECT UUID, %s AS SERVER_TIME, " +
- "ROUND(AVG(METRIC_SUM),2), ROUND(AVG(%s)), MAX(METRIC_MAX), MIN(METRIC_MIN) FROM %s WHERE%s SERVER_TIME > %s AND " +
- "SERVER_TIME <= %s GROUP BY UUID";
+ "ROUND(AVG(METRIC_SUM),2), ROUND(AVG(%s)), MAX(METRIC_MAX), MIN(METRIC_MIN) FROM %s WHERE%s SERVER_TIME >= %s AND " +
+ "SERVER_TIME < %s GROUP BY UUID";
/**
* Downsample cluster metrics.
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/ITMetricAggregator.java b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/ITMetricAggregator.java
index 08c06a9..6d0749f 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/ITMetricAggregator.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/ITMetricAggregator.java
@@ -304,7 +304,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
"host", null);
long endTime = startTime + 1000 * 60 * 4;
- boolean success = aggregatorMinute.doWork(startTime - 1, endTime);
+ boolean success = aggregatorMinute.doWork(startTime - 1, endTime + 1);
assertTrue(success);
Condition condition = new DefaultCondition(uuids, metricNames, Collections.singletonList("local"), "host", null, startTime,