You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by GitBox <gi...@apache.org> on 2019/01/02 21:04:35 UTC

[GitHub] avijayanhwx closed pull request #13: [AMBARI-25078] Add metering metrics to AMS Metric Monitor.

avijayanhwx closed pull request #13: [AMBARI-25078] Add metering metrics to AMS Metric Monitor.
URL: https://github.com/apache/ambari-metrics/pull/13
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/ambari-metrics-assembly/pom.xml b/ambari-metrics-assembly/pom.xml
index b76b928..0b17ea0 100644
--- a/ambari-metrics-assembly/pom.xml
+++ b/ambari-metrics-assembly/pom.xml
@@ -784,7 +784,33 @@
                     <filemode>755</filemode>
                   </mapper>
                 </data>
-
+                <data>
+                  <src>${monitor.dir}/conf/unix/instance_type_provider_azure</src>
+                  <type>file</type>
+                  <mapper>
+                    <type>perm</type>
+                    <prefix>/etc/ambari-metrics-monitor/conf</prefix>
+                    <filemode>755</filemode>
+                  </mapper>
+                </data>
+                <data>
+                  <src>${monitor.dir}/conf/unix/instance_type_provider_ec2</src>
+                  <type>file</type>
+                  <mapper>
+                    <type>perm</type>
+                    <prefix>/etc/ambari-metrics-monitor/conf</prefix>
+                    <filemode>755</filemode>
+                  </mapper>
+                </data>
+                <data>
+                  <src>${monitor.dir}/conf/unix/instance_type_provider_gce</src>
+                  <type>file</type>
+                  <mapper>
+                    <type>perm</type>
+                    <prefix>/etc/ambari-metrics-monitor/conf</prefix>
+                    <filemode>755</filemode>
+                  </mapper>
+                </data>
                 <!-- Metric collector -->
 
                 <data>
diff --git a/ambari-metrics-assembly/src/main/assembly/monitor.xml b/ambari-metrics-assembly/src/main/assembly/monitor.xml
index 7d098e9..aa12bef 100644
--- a/ambari-metrics-assembly/src/main/assembly/monitor.xml
+++ b/ambari-metrics-assembly/src/main/assembly/monitor.xml
@@ -36,6 +36,9 @@
       <includes>
         <include>metric_groups.conf</include>
         <include>metric_monitor.ini</include>
+        <include>instance_type_provider_azure</include>
+        <include>instance_type_provider_ec2</include>
+        <include>instance_type_provider_gce</include>
       </includes>
     </fileSet>
     <fileSet>
diff --git a/ambari-metrics-grafana/ambari-metrics/datasource.js b/ambari-metrics-grafana/ambari-metrics/datasource.js
index 18e2709..6dc0446 100644
--- a/ambari-metrics-grafana/ambari-metrics/datasource.js
+++ b/ambari-metrics-grafana/ambari-metrics/datasource.js
@@ -112,21 +112,33 @@ define([
               return $q.when(emptyData(target));
             }
             var series = [];
-            var metricData = res.metrics[0].metrics;
-            // Added hostname to legend for templated dashboards.
-            var hostLegend = res.metrics[0].hostname ? ' on ' + res.metrics[0].hostname : '';
-            var timeSeries = {};
-            timeSeries = {
-              target: alias + hostLegend,
-              datapoints: []
-            };
-            for (var k in metricData) {
-              if (metricData.hasOwnProperty(k)) {
-                timeSeries.datapoints.push([metricData[k], (k - k % 1000)]);
-              }
-            }
-            series.push(timeSeries);
-            return $q.when({data: series});
+            var metricData = res.metrics;
+            _.map(metricData, function (data) {
+              // Added hostname to legend for templated dashboards.
+              var hostLegend = data.hostname ? ' on ' + data.hostname : '';
+              var alias = target.alias ? target.alias : target.metric;
+              if(!_.isEmpty(templateSrv.variables) && templateSrv.variables[0].query === "yarnqueues") {
+                alias = alias + ' on ' + target.qmetric; }
+              if(!_.isEmpty(templateSrv.variables) && templateSrv.variables[0].query === "kafka-topics") {
+                alias = alias + ' on ' + target.kbTopic; }
+              if (!alias.includes("%") || !data.metricname.includes('live_hosts')) {
+                if (!alias || alias.includes("%")) {
+                  alias = data.metricname;
+                }
+              var timeSeries = {};
+              timeSeries = {
+                target: alias + hostLegend,
+                datapoints: []
+                };
+                for (var k in data.metrics) {
+                  if (data.metrics.hasOwnProperty(k)) {
+                    timeSeries.datapoints.push([data.metrics[k], (k - k % 1000)]);
+                  }
+                }
+                series.push(timeSeries);
+                }
+               });
+               return $q.when({data: series});
           };
         };
         // To speed up querying on templatized dashboards.
@@ -200,11 +212,12 @@ define([
                 aliasSuffix = '';
               }
               if (data.appid.indexOf('ambari_server') === 0) {
-                alias = data.metricname;
                 aliasSuffix = '';
               }
+              if (!alias || alias.includes("%")) {
+                alias = data.metricname;
+              }
               timeSeries = {
-                target: alias + aliasSuffix,
                 datapoints: []
               };
               for (var k in data.metrics) {
diff --git a/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java
index fa0c8fb..2b20379 100644
--- a/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java
+++ b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java
@@ -73,6 +73,7 @@ protected String processMetrics(Map<String, TimelineMetrics> metricForAggregatio
             double max = Integer.MIN_VALUE;
             double min = Integer.MAX_VALUE;
             int count = 0;
+            TimelineMetric tmpMetric = new TimelineMetric(metrics.getMetrics().iterator().next());
             for (TimelineMetric metric : metrics.getMetrics()) {
                 for (Double value : metric.getMetricValues().values()) {
                     sum+=value;
@@ -80,8 +81,10 @@ protected String processMetrics(Map<String, TimelineMetrics> metricForAggregatio
                     min = Math.min(min, value);
                     count++;
                 }
+                if (metric.getStartTime() > tmpMetric.getStartTime()) {
+                    tmpMetric.setStartTime(metric.getStartTime());
+                }
             }
-            TimelineMetric tmpMetric = new TimelineMetric(metrics.getMetrics().get(0));
             tmpMetric.setMetricValues(new TreeMap<Long, Double>());
             metricAggregateMap.add(new TimelineMetricWithAggregatedValues(tmpMetric, new MetricHostAggregate(sum, count, 0d, max, min)));
         }
diff --git a/ambari-metrics-host-monitoring/conf/unix/instance_type_provider_azure b/ambari-metrics-host-monitoring/conf/unix/instance_type_provider_azure
new file mode 100644
index 0000000..3c8ec20
--- /dev/null
+++ b/ambari-metrics-host-monitoring/conf/unix/instance_type_provider_azure
@@ -0,0 +1,17 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific
+
+curl --silent -H Metadata:true "http://169.254.169.254/metadata/instance?api-version=2017-12-01" | grep -Po '"vmSize":.*?[^\\]",' | cut -d':' -f2 | sed 's/,/ /g' | sed 's/"//g'
\ No newline at end of file
diff --git a/ambari-metrics-host-monitoring/conf/unix/instance_type_provider_ec2 b/ambari-metrics-host-monitoring/conf/unix/instance_type_provider_ec2
new file mode 100644
index 0000000..7e89f9e
--- /dev/null
+++ b/ambari-metrics-host-monitoring/conf/unix/instance_type_provider_ec2
@@ -0,0 +1,17 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific
+
+curl --silent http://169.254.169.254/latest/dynamic/instance-identity/document | grep 'instanceType' | awk '{ print $3 }' | sed 's/,/ /g' | sed 's/"//g'
\ No newline at end of file
diff --git a/ambari-metrics-host-monitoring/conf/unix/instance_type_provider_gce b/ambari-metrics-host-monitoring/conf/unix/instance_type_provider_gce
new file mode 100644
index 0000000..6c237b9
--- /dev/null
+++ b/ambari-metrics-host-monitoring/conf/unix/instance_type_provider_gce
@@ -0,0 +1,17 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific
+
+curl --silent -H "Metadata-Flavor: Google" "http://metadata.google.internal/computeMetadata/v1/instance/machine-type" | awk -F'/' '{print $4}'
\ No newline at end of file
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py b/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py
index bd957a0..59d4af0 100644
--- a/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py
+++ b/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py
@@ -88,6 +88,7 @@ def flatten(self, application_id = None, clear_once_flattened = False, set_insta
       pass
   
       for appId, metrics in local_metric_map.iteritems():
+        current_app_id = "HOST" if "HOST" in appId else appId
         for metricId, metricData in dict(metrics).iteritems():
           # Create a timeline metric object
           result_instanceid = ""
@@ -96,7 +97,7 @@ def flatten(self, application_id = None, clear_once_flattened = False, set_insta
           timeline_metric = {
             "hostname" : self.hostname,
             "metricname" : metricId,
-            "appid" : "HOST",
+            "appid" : current_app_id,
             "instanceid" : result_instanceid,
             "starttime" : self.get_start_time(appId, metricId),
             "metrics" : self.align_values_by_minute_mark(appId, metricId, metricData) if clear_once_flattened else metricData
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py b/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
index 721a544..18ade5b 100644
--- a/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
+++ b/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
@@ -255,6 +255,21 @@ def ams_monitor_log_dir(self):
     hosts = self.get("aggregation", "ams_monitor_log_dir", "/var/log/ambari-metrics-monitor")
     return hosts
 
+  def is_metering_enabled(self):
+    return "true" == str(self.get("metering", "metering_enabled", "false")).lower()
+
+  def get_metering_appId(self):
+    return self.get("metering", "metering_appId", "metering")
+
+  def get_metering_metrics(self):
+    return self.get("metering", "metering_metrics", "").split(',')
+
+  def get_instance_type_script(self):
+    return self.get("metering", "instance_type_script", "").split(',')
+
+  def get_provider_type(self):
+    return self.get("metering", "host_provider_type", None)
+
   def ams_monitor_log_file(self):
     """
     :returns the log file
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/controller.py b/ambari-metrics-host-monitoring/src/main/python/core/controller.py
index d161269..080eaca 100644
--- a/ambari-metrics-host-monitoring/src/main/python/core/controller.py
+++ b/ambari-metrics-host-monitoring/src/main/python/core/controller.py
@@ -47,7 +47,7 @@ def __init__(self, config, stop_handler):
     self.application_metric_map = ApplicationMetricMap(hostinfo.get_hostname(),
                                                        hostinfo.get_ip_address())
     self.event_queue = Queue(config.get_max_queue_size())
-    self.metric_collector = MetricsCollector(self.event_queue, self.application_metric_map, hostinfo)
+    self.metric_collector = MetricsCollector(self.event_queue, self.application_metric_map, hostinfo, config)
     self.sleep_interval = config.get_collector_sleep_interval()
     self._stop_handler = stop_handler
     self.initialize_events_cache()
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/instance_type_provider.py b/ambari-metrics-host-monitoring/src/main/python/core/instance_type_provider.py
new file mode 100644
index 0000000..2e0a321
--- /dev/null
+++ b/ambari-metrics-host-monitoring/src/main/python/core/instance_type_provider.py
@@ -0,0 +1,76 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+import subprocess
+import sys
+
+logger = logging.getLogger()
+
+class HostInstanceTypeProvider:
+
+  DEFAULT_INSTANCE_TYPE = "custom"
+  KNOWN_PROVIDER_SCRIPT_PREFIX = "instance_type_provider_"
+  KNOWN_PROVIDER_SCRIPTS = dict()
+
+  def __init__(self, config):
+
+    self.KNOWN_PROVIDER_SCRIPTS['google'] = config.get_config_dir() + self.KNOWN_PROVIDER_SCRIPT_PREFIX + "gce"
+    self.KNOWN_PROVIDER_SCRIPTS['microsoft'] = config.get_config_dir() + self.KNOWN_PROVIDER_SCRIPT_PREFIX + "azure"
+    self.KNOWN_PROVIDER_SCRIPTS['xen'] = config.get_config_dir() + self.KNOWN_PROVIDER_SCRIPT_PREFIX + "ec2"
+
+    self.provider_type = config.get_provider_type()
+    logger.info("Provider type {0}".format(self.provider_type))
+
+    script = self.get_script_for_provider(self.provider_type)
+    logger.info("Script for provider {0}".format(script))
+
+    self.instance_type_script = config.get_instance_type_script()
+    logger.info("Custom Instance Type Script {0}".format(self.instance_type_script))
+
+    if script:
+      self.instance_type = self.get_instance_type_from_script(script)
+    elif self.instance_type_script:
+      self.instance_type = self.get_instance_type_from_script(self.instance_type_script)
+    else:
+      self.instance_type = self.DEFAULT_INSTANCE_TYPE
+    logger.info("Instance type {0}".format(self.instance_type))
+
+  def get_instance_type(self):
+    return self.instance_type
+
+  def get_script_for_provider(self, provider_type):
+    p_type = str(provider_type).lower()
+    if provider_type and p_type in self.KNOWN_PROVIDER_SCRIPTS:
+      return self.KNOWN_PROVIDER_SCRIPTS[p_type]
+    return None
+
+  def get_instance_type_from_script(self, script):
+    instance_type = self.DEFAULT_INSTANCE_TYPE
+    if script:
+      try:
+        osStat = subprocess.Popen([script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        out, err = osStat.communicate()
+        if 0 == osStat.returncode and 0 != len(out.strip()):
+          instance_type = out.strip()
+          logger.info("Read instance_type '{0}' using script '{1}'".format(instance_type, script))
+      except:
+        logger.warn("Unexpected error while retrieving instance_type: '{0}'".format(sys.exc_info()))
+    return instance_type
\ No newline at end of file
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/metering.py b/ambari-metrics-host-monitoring/src/main/python/core/metering.py
new file mode 100644
index 0000000..bac71e6
--- /dev/null
+++ b/ambari-metrics-host-monitoring/src/main/python/core/metering.py
@@ -0,0 +1,63 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+import time
+import json
+from instance_type_provider import HostInstanceTypeProvider
+
+logger = logging.getLogger()
+
+class MeteringMetricHandler:
+
+  METERING_ALIVE_TIME_METRIC_SUFFIX = "lastKnownAliveTime"
+
+  ## At startup,
+  def __init__(self, config):
+    self.appId = config.get_metering_appId()
+    self.instance_type_metric_appId = config.get_metering_appId() + "_instance_type"
+    self.metering_enabled = config.is_metering_enabled()
+    self.hostname = config.get_hostname_config()
+    self.instance_id = config.get_instanceid()
+    self.metering_metric_list = config.get_metering_metrics()
+    self.start_ts = int(round(time.time() * 1000))
+    if self.metering_enabled:
+      logger.info("Metering started with: appId = {0}, metering_metric_list = {1}, start time key = {2}"
+                   .format(self.appId, self.metering_metric_list, self.start_ts))
+      self.instance_type_provider = HostInstanceTypeProvider(config)
+      self.instance_type = self.instance_type_provider.instance_type
+      self.metering_metric_key_prefix = self.hostname + "~" + self.instance_type + "~" + str(self.start_ts)
+    pass
+
+  # Metering Metrics
+  def get_metering_metrics(self, metrics):
+    metering_metrics = {}
+    curr_time = int(round(time.time() * 1000))
+    for metric_name, value in metrics.iteritems():
+      if metric_name in self.metering_metric_list:
+        end_time_metric_key = self.metering_metric_key_prefix + "~" + metric_name + "~" + str(value) + "~" + self.METERING_ALIVE_TIME_METRIC_SUFFIX
+        metering_metrics[end_time_metric_key] = curr_time
+
+    return metering_metrics
+
+    # Instance Type Metrics
+  def get_instance_type_metrics(self):
+    metering_metrics = {self.instance_type: 1}
+    return metering_metrics
\ No newline at end of file
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/metric_collector.py b/ambari-metrics-host-monitoring/src/main/python/core/metric_collector.py
index 84a4d76..f728c2d 100644
--- a/ambari-metrics-host-monitoring/src/main/python/core/metric_collector.py
+++ b/ambari-metrics-host-monitoring/src/main/python/core/metric_collector.py
@@ -20,8 +20,8 @@
 
 import logging
 from time import time
-from host_info import HostInfo
 from event_definition import HostMetricCollectEvent, ProcessMetricCollectEvent
+from metering import MeteringMetricHandler
 
 logger = logging.getLogger()
 
@@ -34,10 +34,12 @@ class MetricsCollector():
   not required if Timer class is used for metric groups.
   """
 
-  def __init__(self, emit_queue, application_metric_map, host_info):
+  def __init__(self, emit_queue, application_metric_map, host_info, config):
     self.emit_queue = emit_queue
     self.application_metric_map = application_metric_map
     self.host_info = host_info
+    self.metering_enabled = config.is_metering_enabled()
+    self.metering_handler = MeteringMetricHandler(config)
   pass
 
   def process_event(self, event):
@@ -86,6 +88,12 @@ def process_host_collection_event(self, event):
 
     if metrics:
       self.application_metric_map.put_metric(DEFAULT_HOST_APP_ID, metrics, startTime)
+      if self.metering_enabled:
+        metering_metrics = self.metering_handler.get_metering_metrics(metrics)
+        self.application_metric_map.put_metric(self.metering_handler.appId, metering_metrics, startTime)
+
+        instance_type_metrics = self.metering_handler.get_instance_type_metrics()
+        self.application_metric_map.put_metric(self.metering_handler.instance_type_metric_appId, instance_type_metrics, startTime)
     pass
 
   def process_process_collection_event(self, event):
diff --git a/ambari-metrics-host-monitoring/src/test/python/core/TestMetricCollector.py b/ambari-metrics-host-monitoring/src/test/python/core/TestMetricCollector.py
index 11a85eb..64df3e6 100644
--- a/ambari-metrics-host-monitoring/src/test/python/core/TestMetricCollector.py
+++ b/ambari-metrics-host-monitoring/src/test/python/core/TestMetricCollector.py
@@ -25,6 +25,7 @@
 from core.metric_collector import MetricsCollector
 from core.event_definition import HostMetricCollectEvent
 from core.host_info import HostInfo
+from core.config_reader import Configuration
 
 logger = logging.getLogger()
 
@@ -36,7 +37,7 @@ def testCollectEvent(self, amm_mock, host_info_mock):
     amm_mock.return_value = None
     host_info_mock.return_value = {'metric_name' : 'metric_value'}
 
-    metric_collector = MetricsCollector(None, amm_mock, host_info_mock)
+    metric_collector = MetricsCollector(None, amm_mock, host_info_mock, Configuration())
 
     group_config = {'collect_every' : 1, 'metrics' : 'cpu'}
     
diff --git a/ambari-metrics-timelineservice/conf/unix/metrics_whitelist b/ambari-metrics-timelineservice/conf/unix/metrics_whitelist
index fd03d6e..31dc255 100644
--- a/ambari-metrics-timelineservice/conf/unix/metrics_whitelist
+++ b/ambari-metrics-timelineservice/conf/unix/metrics_whitelist
@@ -651,4 +651,5 @@ yarn.QueueMetrics.Queue=root.running_60
 yarn.TimelineDataManagerMetrics.GetEntitiesTimeAvgTime
 yarn.TimelineDataManagerMetrics.GetEntitiesTotal
 yarn.TimelineDataManagerMetrics.PostEntitiesTimeAvgTime
-yarn.TimelineDataManagerMetrics.PostEntitiesTotal
\ No newline at end of file
+yarn.TimelineDataManagerMetrics.PostEntitiesTotal
+._p_*lastKnownAliveTime
\ No newline at end of file
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregator.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregator.java
index 9753f89..44e714b 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregator.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregator.java
@@ -81,7 +81,7 @@ protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
 
   @Override
   protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException {
-    Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap = aggregateMetricsFromResultSet(rs, endTime);
+    Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap = aggregateMetricsFromResultSet(rs, endTime - 1000l);
 
     LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
     hBaseAccessor.saveClusterAggregateRecordsSecond(hostAggregateMap, outputTableName);
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricHostAggregator.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricHostAggregator.java
index a9ee385..974dbb9 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricHostAggregator.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricHostAggregator.java
@@ -63,7 +63,7 @@ public TimelineMetricHostAggregator(AGGREGATOR_NAME aggregatorName,
   @Override
   protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException {
 
-    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = aggregateMetricsFromResultSet(rs, endTime);
+    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = aggregateMetricsFromResultSet(rs, endTime - 1000l);
 
     LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
     hBaseAccessor.saveHostAggregateRecords(hostAggregateMap, outputTableName);
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricClusterAggregator.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
index 9c255e7..4a9a6be 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
@@ -74,7 +74,7 @@ UPSERT INTO METRIC_AGGREGATE_HOURLY (METRIC_NAME, APP_ID, INSTANCE_ID,
      */
 
     condition.setStatement(String.format(GET_AGGREGATED_APP_METRIC_GROUPBY_SQL,
-      outputTableName, endTime, aggregateColumnName, tableName,
+      outputTableName, endTime - 1000l, aggregateColumnName, tableName,
       getDownsampledMetricSkipClause(), startTime, endTime));
 
     if (LOG.isDebugEnabled()) {
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricFilteringHostAggregator.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricFilteringHostAggregator.java
index 72d7424..1077ff8 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricFilteringHostAggregator.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricFilteringHostAggregator.java
@@ -83,7 +83,7 @@ protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
     condition.setDoUpdate(true);
 
     condition.setStatement(String.format(GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL,
-      outputTableName, endTime, tableName,
+      outputTableName, endTime - 1000l, tableName,
       getDownsampledMetricSkipClause() + getIncludedUuidsClause(uuids), startTime, endTime));
 
     if (LOG.isDebugEnabled()) {
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricHostAggregator.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricHostAggregator.java
index f8757a4..5779602 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricHostAggregator.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricHostAggregator.java
@@ -65,7 +65,7 @@ protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
     condition.setDoUpdate(true);
 
     condition.setStatement(String.format(GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL,
-      outputTableName, endTime, tableName,
+      outputTableName, endTime - 1000l, tableName,
       getDownsampledMetricSkipClause(), startTime, endTime));
 
     if (LOG.isDebugEnabled()) {
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java
index 092da51..3a98804 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java
@@ -389,7 +389,7 @@
     "INTO %s (UUID, SERVER_TIME, METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) " +
     "SELECT UUID, %s AS SERVER_TIME, " +
     "SUM(METRIC_SUM), SUM(METRIC_COUNT), MAX(METRIC_MAX), MIN(METRIC_MIN) " +
-    "FROM %s WHERE%s SERVER_TIME > %s AND SERVER_TIME <= %s GROUP BY UUID";
+    "FROM %s WHERE%s SERVER_TIME >= %s AND SERVER_TIME < %s GROUP BY UUID";
 
   /**
    * Downsample host metrics.
@@ -407,8 +407,8 @@
    */
   public static final String GET_AGGREGATED_APP_METRIC_GROUPBY_SQL = "UPSERT " +
          "INTO %s (UUID, SERVER_TIME, METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) SELECT UUID, %s AS SERVER_TIME, " +
-         "ROUND(AVG(METRIC_SUM),2), ROUND(AVG(%s)), MAX(METRIC_MAX), MIN(METRIC_MIN) FROM %s WHERE%s SERVER_TIME > %s AND " +
-         "SERVER_TIME <= %s GROUP BY UUID";
+         "ROUND(AVG(METRIC_SUM),2), ROUND(AVG(%s)), MAX(METRIC_MAX), MIN(METRIC_MIN) FROM %s WHERE%s SERVER_TIME >= %s AND " +
+         "SERVER_TIME < %s GROUP BY UUID";
 
   /**
    * Downsample cluster metrics.
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/ITMetricAggregator.java b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/ITMetricAggregator.java
index 08c06a9..6d0749f 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/ITMetricAggregator.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/ITMetricAggregator.java
@@ -304,7 +304,7 @@ public void testAggregationUsingGroupByQuery() throws Exception {
       "host", null);
 
     long endTime = startTime + 1000 * 60 * 4;
-    boolean success = aggregatorMinute.doWork(startTime - 1, endTime);
+    boolean success = aggregatorMinute.doWork(startTime - 1, endTime + 1);
     assertTrue(success);
 
     Condition condition = new DefaultCondition(uuids, metricNames, Collections.singletonList("local"), "host", null, startTime,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services