You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ds...@apache.org on 2017/05/17 16:38:37 UTC
[1/3] ambari git commit: AMBARI-20758 Aggregate local metrics for
minute aggregation time window (dsen)
Repository: ambari
Updated Branches:
refs/heads/trunk 772be786d -> 041d353b0
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/templates/flume-metrics2.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/templates/flume-metrics2.properties.j2 b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/templates/flume-metrics2.properties.j2
index b876a3d..28944ca 100644
--- a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/templates/flume-metrics2.properties.j2
+++ b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/templates/flume-metrics2.properties.j2
@@ -23,6 +23,8 @@ port={{metric_collector_port}}
collectionFrequency={{metrics_collection_period}}000
maxRowCacheSize=10000
sendInterval={{metrics_report_interval}}000
+host_in_memory_aggregation = {{host_in_memory_aggregation}}
+host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
instanceId={{cluster_name}}
set.instanceId={{set_instanceId}}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/scripts/params_linux.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/scripts/params_linux.py b/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/scripts/params_linux.py
index efea167..d45aea6 100644
--- a/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/scripts/params_linux.py
+++ b/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/scripts/params_linux.py
@@ -184,6 +184,9 @@ if has_metric_collector:
metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
+
# if hbase is selected the hbase_rs_hosts, should not be empty, but still default just in case
if 'slave_hosts' in config['clusterHostInfo']:
rs_hosts = default('/clusterHostInfo/hbase_rs_hosts', '/clusterHostInfo/slave_hosts') #if hbase_rs_hosts not given it is assumed that region servers on same nodes as slaves
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-MASTER.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-MASTER.j2 b/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-MASTER.j2
index 24535c5..c8f2f13 100644
--- a/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-MASTER.j2
+++ b/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-MASTER.j2
@@ -78,6 +78,8 @@ hbase.sink.timeline.protocol={{metric_collector_protocol}}
hbase.sink.timeline.port={{metric_collector_port}}
hbase.sink.timeline.instanceId={{cluster_name}}
hbase.sink.timeline.set.instanceId={{set_instanceId}}
+hbase.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+hbase.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
# HTTPS properties
hbase.sink.timeline.truststore.path = {{metric_truststore_path}}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-RS.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-RS.j2 b/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-RS.j2
index 9076269..f4e25e1 100644
--- a/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-RS.j2
+++ b/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-RS.j2
@@ -76,6 +76,8 @@ hbase.sink.timeline.protocol={{metric_collector_protocol}}
hbase.sink.timeline.port={{metric_collector_port}}
hbase.sink.timeline.instanceId={{cluster_name}}
hbase.sink.timeline.set.instanceId={{set_instanceId}}
+hbase.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+hbase.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
# HTTPS properties
hbase.sink.timeline.truststore.path = {{metric_truststore_path}}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/configuration/hadoop-metrics2.properties.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/configuration/hadoop-metrics2.properties.xml b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/configuration/hadoop-metrics2.properties.xml
index fae61d3..4b03880 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/configuration/hadoop-metrics2.properties.xml
+++ b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/configuration/hadoop-metrics2.properties.xml
@@ -88,6 +88,8 @@ resourcemanager.sink.ganglia.tagsForPrefix.yarn=Queue
*.sink.timeline.port={{metric_collector_port}}
*.sink.timeline.instanceId={{cluster_name}}
*.sink.timeline.set.instanceId={{set_instanceId}}
+*.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+*.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
# HTTPS properties
*.sink.timeline.truststore.path = {{metric_truststore_path}}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/params_linux.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/params_linux.py b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/params_linux.py
index d854451..c1128a5 100644
--- a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/params_linux.py
+++ b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/params_linux.py
@@ -566,6 +566,8 @@ if has_metric_collector:
metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
########################################################
############# Atlas related params #####################
########################################################
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-hivemetastore.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-hivemetastore.properties.j2 b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-hivemetastore.properties.j2
index 9328f9f..d78a342 100644
--- a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-hivemetastore.properties.j2
+++ b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-hivemetastore.properties.j2
@@ -53,6 +53,8 @@
hivemetastore.sink.timeline.collector.hosts={{ams_collector_hosts}}
hivemetastore.sink.timeline.port={{metric_collector_port}}
hivemetastore.sink.timeline.protocol={{metric_collector_protocol}}
+ hivemetastore.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+ hivemetastore.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
{% endif %}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-hiveserver2.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-hiveserver2.properties.j2 b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-hiveserver2.properties.j2
index 9a7f9dc..1f496ef 100644
--- a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-hiveserver2.properties.j2
+++ b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-hiveserver2.properties.j2
@@ -53,5 +53,7 @@
hiveserver2.sink.timeline.collector.hosts={{ams_collector_hosts}}
hiveserver2.sink.timeline.port={{metric_collector_port}}
hiveserver2.sink.timeline.protocol={{metric_collector_protocol}}
+ hiveserver2.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+ hiveserver2.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
{% endif %}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-llapdaemon.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-llapdaemon.j2 b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-llapdaemon.j2
index e9fe024..01869c0 100644
--- a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-llapdaemon.j2
+++ b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-llapdaemon.j2
@@ -52,5 +52,7 @@
llapdaemon.sink.timeline.collector.hosts={{ams_collector_hosts}}
llapdaemon.sink.timeline.port={{metric_collector_port}}
llapdaemon.sink.timeline.protocol={{metric_collector_protocol}}
+ llapdaemon.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+ llapdaemon.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
{% endif %}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-llaptaskscheduler.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-llaptaskscheduler.j2 b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-llaptaskscheduler.j2
index bd7eb0c..2e25c4a 100644
--- a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-llaptaskscheduler.j2
+++ b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-llaptaskscheduler.j2
@@ -52,5 +52,7 @@
llaptaskscheduler.sink.timeline.collector.hosts={{ams_collector_hosts}}
llaptaskscheduler.sink.timeline.port={{metric_collector_port}}
llaptaskscheduler.sink.timeline.protocol={{metric_collector_protocol}}
+ llaptaskscheduler.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+ llaptaskscheduler.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
{% endif %}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/scripts/params_linux.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/scripts/params_linux.py b/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/scripts/params_linux.py
index c0ac535..a12d388 100644
--- a/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/scripts/params_linux.py
+++ b/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/scripts/params_linux.py
@@ -565,6 +565,9 @@ if has_metric_collector:
metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
+
########################################################
############# Atlas related params #####################
########################################################
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-hivemetastore.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-hivemetastore.properties.j2 b/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-hivemetastore.properties.j2
index 9328f9f..d78a342 100644
--- a/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-hivemetastore.properties.j2
+++ b/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-hivemetastore.properties.j2
@@ -53,6 +53,8 @@
hivemetastore.sink.timeline.collector.hosts={{ams_collector_hosts}}
hivemetastore.sink.timeline.port={{metric_collector_port}}
hivemetastore.sink.timeline.protocol={{metric_collector_protocol}}
+ hivemetastore.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+ hivemetastore.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
{% endif %}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-hiveserver2.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-hiveserver2.properties.j2 b/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-hiveserver2.properties.j2
index 9a7f9dc..1f496ef 100644
--- a/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-hiveserver2.properties.j2
+++ b/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-hiveserver2.properties.j2
@@ -53,5 +53,7 @@
hiveserver2.sink.timeline.collector.hosts={{ams_collector_hosts}}
hiveserver2.sink.timeline.port={{metric_collector_port}}
hiveserver2.sink.timeline.protocol={{metric_collector_protocol}}
+ hiveserver2.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+ hiveserver2.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
{% endif %}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-llapdaemon.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-llapdaemon.j2 b/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-llapdaemon.j2
index e9fe024..01869c0 100644
--- a/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-llapdaemon.j2
+++ b/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-llapdaemon.j2
@@ -52,5 +52,7 @@
llapdaemon.sink.timeline.collector.hosts={{ams_collector_hosts}}
llapdaemon.sink.timeline.port={{metric_collector_port}}
llapdaemon.sink.timeline.protocol={{metric_collector_protocol}}
+ llapdaemon.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+ llapdaemon.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
{% endif %}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-llaptaskscheduler.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-llaptaskscheduler.j2 b/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-llaptaskscheduler.j2
index bd7eb0c..2e25c4a 100644
--- a/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-llaptaskscheduler.j2
+++ b/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-llaptaskscheduler.j2
@@ -52,5 +52,7 @@
llaptaskscheduler.sink.timeline.collector.hosts={{ams_collector_hosts}}
llaptaskscheduler.sink.timeline.port={{metric_collector_port}}
llaptaskscheduler.sink.timeline.protocol={{metric_collector_protocol}}
+ llaptaskscheduler.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+ llaptaskscheduler.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
{% endif %}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/configuration/kafka-broker.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/configuration/kafka-broker.xml b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/configuration/kafka-broker.xml
index e01dacd..26e7a77 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/configuration/kafka-broker.xml
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/configuration/kafka-broker.xml
@@ -422,4 +422,15 @@
<description>Timeline metrics reporter send interval</description>
<on-ambari-upgrade add="true"/>
</property>
+ <property>
+ <name>kafka.timeline.metrics.host_in_memory_aggregation</name>
+ <value>{{host_in_memory_aggregation}}</value>
+ <description>if set to "true" host metrics will be aggregated in memory on each host</description>
+ <on-ambari-upgrade add="true"/>
+ </property>
+ <property>
+ <name>kafka.timeline.metrics.host_in_memory_aggregation_port</name>
+ <value>{{host_in_memory_aggregation_port}}</value>
+ <on-ambari-upgrade add="true"/>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/params.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/params.py
index 5b0be54..9acc1ef 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/params.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/params.py
@@ -156,6 +156,9 @@ if has_metric_collector:
metric_collector_protocol = 'https'
else:
metric_collector_protocol = 'http'
+
+ host_in_memory_aggregation = str(default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)).lower()
+ host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
pass
# Security-related params
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/params_linux.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/params_linux.py b/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/params_linux.py
index d9fae76..78ec165 100644
--- a/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/params_linux.py
+++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/params_linux.py
@@ -208,6 +208,8 @@ metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sin
metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
metric_collector_sink_jar = "/usr/lib/storm/lib/ambari-metrics-storm-sink-with-common-*.jar"
metric_collector_legacy_sink_jar = "/usr/lib/storm/lib/ambari-metrics-storm-sink-legacy-with-common-*.jar"
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
# Cluster Zookeeper quorum
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/config.yaml.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/config.yaml.j2 b/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/config.yaml.j2
index 51162e8..67b89c4 100644
--- a/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/config.yaml.j2
+++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/config.yaml.j2
@@ -61,6 +61,8 @@ metrics_collector:
protocol: "{{metric_collector_protocol}}"
port: "{{metric_collector_port}}"
appId: "{{metric_collector_app_id}}"
+ host_in_memory_aggregation: {{host_in_memory_aggregation}}
+ host_in_memory_aggregation_port: {{host_in_memory_aggregation_port}}
# HTTPS settings
truststore.path : "{{metric_truststore_path}}"
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/storm-metrics2.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/storm-metrics2.properties.j2 b/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/storm-metrics2.properties.j2
index 0501039..1dedffc 100644
--- a/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/storm-metrics2.properties.j2
+++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/storm-metrics2.properties.j2
@@ -25,6 +25,8 @@ sendInterval={{metrics_report_interval}}000
clusterReporterAppId=nimbus
instanceId={{cluster_name}}
set.instanceId={{set_instanceId}}
+host_in_memory_aggregation = {{host_in_memory_aggregation}}
+host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
# HTTPS properties
truststore.path = {{metric_truststore_path}}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py
index 7282bb5..3488e75 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py
@@ -164,6 +164,9 @@ if has_metric_collector:
metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
+
# Cluster Zookeeper quorum
zookeeper_quorum = None
if has_zk_host:
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2 b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2
index 1b02a97..1f8499f 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2
@@ -77,6 +77,8 @@ resourcemanager.sink.ganglia.tagsForPrefix.yarn=Queue
*.sink.timeline.port={{metric_collector_port}}
*.sink.timeline.instanceId={{cluster_name}}
*.sink.timeline.set.instanceId={{set_instanceId}}
+*.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+*.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
# HTTPS properties
*.sink.timeline.truststore.path = {{metric_truststore_path}}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/stacks/HDP/2.6/services/HDFS/configuration/hadoop-metrics2.properties.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.6/services/HDFS/configuration/hadoop-metrics2.properties.xml b/ambari-server/src/main/resources/stacks/HDP/2.6/services/HDFS/configuration/hadoop-metrics2.properties.xml
index fae61d3..4b03880 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.6/services/HDFS/configuration/hadoop-metrics2.properties.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.6/services/HDFS/configuration/hadoop-metrics2.properties.xml
@@ -88,6 +88,8 @@ resourcemanager.sink.ganglia.tagsForPrefix.yarn=Queue
*.sink.timeline.port={{metric_collector_port}}
*.sink.timeline.instanceId={{cluster_name}}
*.sink.timeline.set.instanceId={{set_instanceId}}
+*.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+*.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
# HTTPS properties
*.sink.timeline.truststore.path = {{metric_truststore_path}}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/stacks/HDP/3.0/hooks/before-START/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/3.0/hooks/before-START/scripts/params.py b/ambari-server/src/main/resources/stacks/HDP/3.0/hooks/before-START/scripts/params.py
index 678bbde..a3830f7 100644
--- a/ambari-server/src/main/resources/stacks/HDP/3.0/hooks/before-START/scripts/params.py
+++ b/ambari-server/src/main/resources/stacks/HDP/3.0/hooks/before-START/scripts/params.py
@@ -158,6 +158,8 @@ if has_metric_collector:
pass
metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
# Cluster Zookeeper quorum
zookeeper_quorum = None
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/stacks/HDP/3.0/hooks/before-START/templates/hadoop-metrics2.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/3.0/hooks/before-START/templates/hadoop-metrics2.properties.j2 b/ambari-server/src/main/resources/stacks/HDP/3.0/hooks/before-START/templates/hadoop-metrics2.properties.j2
index 1b02a97..1f8499f 100644
--- a/ambari-server/src/main/resources/stacks/HDP/3.0/hooks/before-START/templates/hadoop-metrics2.properties.j2
+++ b/ambari-server/src/main/resources/stacks/HDP/3.0/hooks/before-START/templates/hadoop-metrics2.properties.j2
@@ -77,6 +77,8 @@ resourcemanager.sink.ganglia.tagsForPrefix.yarn=Queue
*.sink.timeline.port={{metric_collector_port}}
*.sink.timeline.instanceId={{cluster_name}}
*.sink.timeline.set.instanceId={{set_instanceId}}
+*.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+*.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
# HTTPS properties
*.sink.timeline.truststore.path = {{metric_truststore_path}}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java
index 4a60892..3ee8ebc 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java
@@ -74,6 +74,16 @@ public class TestAmbariMetricsSinkImpl extends AbstractTimelineMetricsSink imple
}
@Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return true;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return 61888;
+ }
+
+ @Override
public void init(MetricsConfiguration configuration) {
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/contrib/management-packs/hdf-ambari-mpack/src/main/resources/stacks/HDF/2.0/hooks/before-START/scripts/params.py
----------------------------------------------------------------------
diff --git a/contrib/management-packs/hdf-ambari-mpack/src/main/resources/stacks/HDF/2.0/hooks/before-START/scripts/params.py b/contrib/management-packs/hdf-ambari-mpack/src/main/resources/stacks/HDF/2.0/hooks/before-START/scripts/params.py
index 6a70675..8cc876f 100644
--- a/contrib/management-packs/hdf-ambari-mpack/src/main/resources/stacks/HDF/2.0/hooks/before-START/scripts/params.py
+++ b/contrib/management-packs/hdf-ambari-mpack/src/main/resources/stacks/HDF/2.0/hooks/before-START/scripts/params.py
@@ -135,6 +135,8 @@ if has_metric_collector:
metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
#hadoop params
if has_namenode or dfs_type == 'HCFS':
[2/3] ambari git commit: AMBARI-20758 Aggregate local metrics for
minute aggregation time window (dsen)
Posted by ds...@apache.org.
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py
index c0feed5..e5da9ba 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py
@@ -27,6 +27,9 @@ from event_definition import HostMetricCollectEvent, ProcessMetricCollectEvent
from metric_collector import MetricsCollector
from emitter import Emitter
from host_info import HostInfo
+from aggregator import Aggregator
+from aggregator import AggregatorWatchdog
+
logger = logging.getLogger()
@@ -50,11 +53,15 @@ class Controller(threading.Thread):
self.initialize_events_cache()
self.emitter = Emitter(self.config, self.application_metric_map, stop_handler)
self._t = None
+ self.aggregator = None
+ self.aggregator_watchdog = None
def run(self):
logger.info('Running Controller thread: %s' % threading.currentThread().getName())
self.start_emitter()
+ if self.config.is_inmemory_aggregation_enabled():
+ self.start_aggregator_with_watchdog()
# Wake every 5 seconds to push events to the queue
while True:
@@ -62,6 +69,10 @@ class Controller(threading.Thread):
logger.warn('Event Queue full!! Suspending further collections.')
else:
self.enqueque_events()
+ # restart aggregator if needed
+ if self.config.is_inmemory_aggregation_enabled() and not self.aggregator_watchdog.is_ok():
+ logger.warning("Aggregator is not available. Restarting aggregator.")
+ self.start_aggregator_with_watchdog()
pass
# Wait for the service stop event instead of sleeping blindly
if 0 == self._stop_handler.wait(self.sleep_interval):
@@ -75,6 +86,12 @@ class Controller(threading.Thread):
# The emitter thread should have stopped by now, just ensure it has shut
# down properly
self.emitter.join(5)
+
+ if self.config.is_inmemory_aggregation_enabled():
+ self.aggregator.stop()
+ self.aggregator_watchdog.stop()
+ self.aggregator.join(5)
+ self.aggregator_watchdog.join(5)
pass
# TODO: Optimize to not use Timer class and use the Queue instead
@@ -115,3 +132,14 @@ class Controller(threading.Thread):
def start_emitter(self):
self.emitter.start()
+
+ # Start aggregator and watcher threads
+  def start_aggregator_with_watchdog(self):
+    """Start (or restart) the local aggregator and the watchdog that monitors it.
+
+    Any previously created aggregator and watchdog are asked to stop first, so
+    that a restart triggered from run() does not leave the old watchdog thread
+    running next to the new one.
+    """
+    if self.aggregator:
+      self.aggregator.stop()
+    if self.aggregator_watchdog:
+      # Stop the old watchdog itself (the aggregator was already stopped above).
+      self.aggregator_watchdog.stop()
+    self.aggregator = Aggregator(self.config, self._stop_handler)
+    self.aggregator_watchdog = AggregatorWatchdog(self.config, self._stop_handler)
+    self.aggregator.start()
+    self.aggregator_watchdog.start()
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
index e2a7f0d..77b8c23 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
@@ -44,10 +44,16 @@ class Emitter(threading.Thread):
self._stop_handler = stop_handler
self.application_metric_map = application_metric_map
self.collector_port = config.get_server_port()
- self.all_metrics_collector_hosts = config.get_metrics_collector_hosts()
+ self.all_metrics_collector_hosts = config.get_metrics_collector_hosts_as_list()
self.is_server_https_enabled = config.is_server_https_enabled()
self.set_instanceid = config.is_set_instanceid()
self.instanceid = config.get_instanceid()
+ self.is_inmemory_aggregation_enabled = config.is_inmemory_aggregation_enabled()
+
+ if self.is_inmemory_aggregation_enabled:
+ self.collector_port = config.get_inmemory_aggregation_port()
+ self.all_metrics_collector_hosts = ['localhost']
+ self.is_server_https_enabled = False
if self.is_server_https_enabled:
self.ca_certs = config.get_ca_certs()
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py
index bfb6957..7a9fbec 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py
@@ -117,7 +117,8 @@ class StopHandlerLinux(StopHandler):
def wait(self, timeout=None):
# Stop process when stop event received
- if self.stop_event.wait(timeout):
+ self.stop_event.wait(timeout)
+ if self.stop_event.isSet():
logger.info("Stop event received")
return 0
# Timeout
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/main.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/main.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/main.py
index d218015..53d27f8 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/main.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/main.py
@@ -21,7 +21,7 @@ limitations under the License.
import logging
import os
import sys
-
+import signal
from ambari_commons.os_utils import remove_file
from core.controller import Controller
@@ -73,6 +73,10 @@ def server_process_main(stop_handler, scmStatus=None):
if scmStatus is not None:
scmStatus.reportStarted()
+  # Block the main thread until a signal arrives so that handlers for system
+  # signals like SIGTERM get a chance to run; presumably the join() below is
+  # not interrupted by signals on this interpreter.
+  # TODO fix if possible
+  # signal.pause() is only available on Unix, so skip it where it is missing
+  # instead of failing with AttributeError.
+  if hasattr(signal, 'pause'):
+    signal.pause()
+
#The controller thread finishes when the stop event is signaled
controller.join()
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
index 211e9cd..76b1c15 100644
--- a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
+++ b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
@@ -72,6 +72,8 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
private static final String TIMELINE_METRICS_SSL_KEYSTORE_PASSWORD_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SSL_KEYSTORE_PASSWORD_PROPERTY;
private static final String TIMELINE_METRICS_KAFKA_INSTANCE_ID_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + INSTANCE_ID_PROPERTY;
private static final String TIMELINE_METRICS_KAFKA_SET_INSTANCE_ID_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SET_INSTANCE_ID_PROPERTY;
+ private static final String TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY;
+ private static final String TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY;
private static final String TIMELINE_DEFAULT_HOST = "localhost";
private static final String TIMELINE_DEFAULT_PORT = "6188";
private static final String TIMELINE_DEFAULT_PROTOCOL = "http";
@@ -96,6 +98,8 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
private String[] includedMetricsPrefixes;
// Local cache to avoid prefix matching everytime
private Set<String> excludedMetrics = new HashSet<>();
+ private boolean hostInMemoryAggregationEnabled;
+ private int hostInMemoryAggregationPort;
@Override
protected String getCollectorUri(String host) {
@@ -132,6 +136,17 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
return hostname;
}
+
+ @Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return hostInMemoryAggregationEnabled;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return hostInMemoryAggregationPort;
+ }
+
public void setMetricsCache(TimelineMetricsCache metricsCache) {
this.metricsCache = metricsCache;
}
@@ -169,6 +184,8 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
instanceId = props.getString(TIMELINE_METRICS_KAFKA_INSTANCE_ID_PROPERTY);
setInstanceId = props.getBoolean(TIMELINE_METRICS_KAFKA_SET_INSTANCE_ID_PROPERTY);
+ hostInMemoryAggregationEnabled = props.getBoolean(TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, false);
+ hostInMemoryAggregationPort = props.getInt(TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, 61888);
setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval));
if (metricCollectorProtocol.contains("https")) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
index 08f0598..24b2c8b 100644
--- a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
+++ b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
@@ -55,6 +55,8 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
private NimbusClient nimbusClient;
private String applicationId;
private int timeoutSeconds;
+ private boolean hostInMemoryAggregationEnabled;
+ private int hostInMemoryAggregationPort;
public StormTimelineMetricsReporter() {
@@ -96,6 +98,16 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
}
@Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return hostInMemoryAggregationEnabled;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return hostInMemoryAggregationPort;
+ }
+
+ @Override
public void prepare(Map conf) {
LOG.info("Preparing Storm Metrics Reporter");
try {
@@ -130,6 +142,8 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
applicationId = cf.get(APP_ID).toString();
setInstanceId = Boolean.getBoolean(cf.get(SET_INSTANCE_ID_PROPERTY).toString());
instanceId = cf.get(INSTANCE_ID_PROPERTY).toString();
+ hostInMemoryAggregationEnabled = Boolean.valueOf(cf.get(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY).toString());
+ hostInMemoryAggregationPort = Integer.valueOf(cf.get(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY).toString());
collectorUri = constructTimelineMetricUri(protocol, findPreferredCollectHost(), port);
if (protocol.contains("https")) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index 20f60e1..c9c0538 100644
--- a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -61,6 +61,8 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
private String applicationId;
private boolean setInstanceId;
private String instanceId;
+ private boolean hostInMemoryAggregationEnabled;
+ private int hostInMemoryAggregationPort;
@Override
protected String getCollectorUri(String host) {
@@ -98,6 +100,16 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
}
@Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return hostInMemoryAggregationEnabled;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return hostInMemoryAggregationPort;
+ }
+
+ @Override
public void prepare(Map map, Object o, TopologyContext topologyContext, IErrorReporter iErrorReporter) {
LOG.info("Preparing Storm Metrics Sink");
try {
@@ -126,6 +138,8 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY);
setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false"));
+ // Explicit defaults: Integer.valueOf(null) would throw NumberFormatException when the port key is absent.
+ hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, "false"));
+ hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, "61888"));
// Initialize the collector write strategy
super.init();
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
index 14f160b..5b75065 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
@@ -50,6 +50,8 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
private String instanceId;
private String applicationId;
private int timeoutSeconds;
+ private boolean hostInMemoryAggregationEnabled;
+ private int hostInMemoryAggregationPort;
public StormTimelineMetricsReporter() {
@@ -91,6 +93,16 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
}
@Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return hostInMemoryAggregationEnabled;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return hostInMemoryAggregationPort;
+ }
+
+ @Override
public void prepare(Object registrationArgument) {
LOG.info("Preparing Storm Metrics Reporter");
try {
@@ -119,6 +131,10 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
applicationId = configuration.getProperty(CLUSTER_REPORTER_APP_ID, DEFAULT_CLUSTER_REPORTER_APP_ID);
setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY));
instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY);
+
+ // Explicit defaults: Integer.valueOf(null) would throw NumberFormatException when the port key is absent.
+ hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, "false"));
+ hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, "61888"));
+
if (protocol.contains("https")) {
String trustStorePath = configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).trim();
String trustStoreType = configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).trim();
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index 425201c..320e177 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -70,6 +70,8 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
private String applicationId;
private String instanceId;
private boolean setInstanceId;
+ private boolean hostInMemoryAggregationEnabled;
+ private int hostInMemoryAggregationPort;
@Override
protected String getCollectorUri(String host) {
@@ -107,6 +109,16 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
}
@Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return hostInMemoryAggregationEnabled;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return hostInMemoryAggregationPort;
+ }
+
+ @Override
public void prepare(Map map, Object o, TopologyContext topologyContext, IErrorReporter iErrorReporter) {
LOG.info("Preparing Storm Metrics Sink");
try {
@@ -137,6 +149,10 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
port = configuration.getProperty(COLLECTOR_PORT, "6188");
instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY);
setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false"));
+
+ // Explicit defaults: Integer.valueOf(null) would throw NumberFormatException when the port key is absent.
+ hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, "false"));
+ hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, "61888"));
+
// Initialize the collector write strategy
super.init();
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index c242a2f..f984253 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -24,10 +24,13 @@ import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricWithAggregatedValues;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
import org.apache.hadoop.service.AbstractService;
@@ -41,6 +44,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConditionBuilder;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.TopNCondition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.SeriesAggregateFunction;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.TimelineMetricsSeriesAggregateFunction;
@@ -62,6 +66,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
@@ -152,10 +157,14 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
scheduleAggregatorThread(dailyClusterAggregator);
// Start the minute host aggregator
- TimelineMetricAggregator minuteHostAggregator =
- TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(
- hBaseAccessor, metricsConf, haController);
- scheduleAggregatorThread(minuteHostAggregator);
+ if (Boolean.parseBoolean(metricsConf.get(TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION, "true"))) {
+ LOG.info("timeline.metrics.host.inmemory.aggregation is set to True, disabling host minute aggregation on collector");
+ } else {
+ TimelineMetricAggregator minuteHostAggregator =
+ TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(
+ hBaseAccessor, metricsConf, haController);
+ scheduleAggregatorThread(minuteHostAggregator);
+ }
// Start the hourly host aggregator
TimelineMetricAggregator hourlyHostAggregator =
@@ -390,6 +399,18 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
}
@Override
+ public TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException {
+ Map<TimelineMetric, MetricHostAggregate> aggregateMap = new HashMap<>();
+ for (TimelineMetricWithAggregatedValues entry : aggregationResult.getResult()) {
+ aggregateMap.put(entry.getTimelineMetric(), entry.getMetricAggregate());
+ }
+ hBaseAccessor.saveHostAggregateRecords(aggregateMap, PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME);
+
+
+ return new TimelinePutResponse();
+ }
+
+ @Override
public Map<String, Map<String,Set<String>>> getInstanceHostsMetadata(String instanceId, String appId)
throws SQLException, IOException {
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index fb369e8..3b2a119 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
@@ -40,8 +42,6 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index 0d5042f..023465b 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -296,6 +296,8 @@ public class TimelineMetricConfiguration {
public static final String AMSHBASE_METRICS_WHITESLIST_FILE = "amshbase_metrics_whitelist";
+ public static final String TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION = "timeline.metrics.host.inmemory.aggregation";
+
private Configuration hbaseConf;
private Configuration metricsConf;
private Configuration amsEnvConf;
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
index bde09cb..d052d54 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
@@ -80,6 +81,7 @@ public interface TimelineMetricStore {
*/
Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(String query) throws SQLException, IOException;
+ TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException;
/**
* Returns all hosts that have written metrics with the apps on the host
* @return { hostname : [ appIds ] }
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java
index 65d54c0..7b03b30 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java
@@ -19,10 +19,10 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
import java.util.Map;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
/**
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricAggregate.java
deleted file mode 100644
index 825ac25..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricAggregate.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.codehaus.jackson.annotate.JsonProperty;
-import org.codehaus.jackson.annotate.JsonSubTypes;
-import org.codehaus.jackson.map.ObjectMapper;
-
-import java.io.IOException;
-
-/**
-*
-*/
-@JsonSubTypes({@JsonSubTypes.Type(value = MetricClusterAggregate.class),
- @JsonSubTypes.Type(value = MetricHostAggregate.class)})
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-public class MetricAggregate {
- private static final ObjectMapper mapper = new ObjectMapper();
-
- protected Double sum = 0.0;
- protected Double deviation;
- protected Double max = Double.MIN_VALUE;
- protected Double min = Double.MAX_VALUE;
-
- public MetricAggregate() {
- }
-
- MetricAggregate(Double sum, Double deviation, Double max,
- Double min) {
- this.sum = sum;
- this.deviation = deviation;
- this.max = max;
- this.min = min;
- }
-
- public void updateSum(Double sum) {
- this.sum += sum;
- }
-
- public void updateMax(Double max) {
- if (max > this.max) {
- this.max = max;
- }
- }
-
- public void updateMin(Double min) {
- if (min < this.min) {
- this.min = min;
- }
- }
-
- @JsonProperty("sum")
- public Double getSum() {
- return sum;
- }
-
- @JsonProperty("deviation")
- public Double getDeviation() {
- return deviation;
- }
-
- @JsonProperty("max")
- public Double getMax() {
- return max;
- }
-
- @JsonProperty("min")
- public Double getMin() {
- return min;
- }
-
- public void setSum(Double sum) {
- this.sum = sum;
- }
-
- public void setDeviation(Double deviation) {
- this.deviation = deviation;
- }
-
- public void setMax(Double max) {
- this.max = max;
- }
-
- public void setMin(Double min) {
- this.min = min;
- }
-
- public String toJSON() throws IOException {
- return mapper.writeValueAsString(this);
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricClusterAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricClusterAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricClusterAggregate.java
deleted file mode 100644
index 9c837b6..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricClusterAggregate.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-
-
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-/**
-*
-*/
-public class MetricClusterAggregate extends MetricAggregate {
- private int numberOfHosts;
-
- @JsonCreator
- public MetricClusterAggregate() {
- }
-
- public MetricClusterAggregate(Double sum, int numberOfHosts, Double deviation,
- Double max, Double min) {
- super(sum, deviation, max, min);
- this.numberOfHosts = numberOfHosts;
- }
-
- @JsonProperty("numberOfHosts")
- public int getNumberOfHosts() {
- return numberOfHosts;
- }
-
- public void updateNumberOfHosts(int count) {
- this.numberOfHosts += count;
- }
-
- public void setNumberOfHosts(int numberOfHosts) {
- this.numberOfHosts = numberOfHosts;
- }
-
- /**
- * Find and update min, max and avg for a minute
- */
- public void updateAggregates(MetricClusterAggregate hostAggregate) {
- updateMax(hostAggregate.getMax());
- updateMin(hostAggregate.getMin());
- updateSum(hostAggregate.getSum());
- updateNumberOfHosts(hostAggregate.getNumberOfHosts());
- }
-
- @Override
- public String toString() {
- return "MetricAggregate{" +
- "sum=" + sum +
- ", numberOfHosts=" + numberOfHosts +
- ", deviation=" + deviation +
- ", max=" + max +
- ", min=" + min +
- '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricHostAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricHostAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricHostAggregate.java
deleted file mode 100644
index 340ec75..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricHostAggregate.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-
-
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-/**
- * Represents a collection of minute based aggregation of values for
- * resolution greater than a minute.
- */
-public class MetricHostAggregate extends MetricAggregate {
-
- private long numberOfSamples = 0;
-
- @JsonCreator
- public MetricHostAggregate() {
- super(0.0, 0.0, Double.MIN_VALUE, Double.MAX_VALUE);
- }
-
- public MetricHostAggregate(Double sum, int numberOfSamples,
- Double deviation,
- Double max, Double min) {
- super(sum, deviation, max, min);
- this.numberOfSamples = numberOfSamples;
- }
-
- @JsonProperty("numberOfSamples")
- public long getNumberOfSamples() {
- return numberOfSamples == 0 ? 1 : numberOfSamples;
- }
-
- public void updateNumberOfSamples(long count) {
- this.numberOfSamples += count;
- }
-
- public void setNumberOfSamples(long numberOfSamples) {
- this.numberOfSamples = numberOfSamples;
- }
-
- public double getAvg() {
- return sum / numberOfSamples;
- }
-
- /**
- * Find and update min, max and avg for a minute
- */
- public void updateAggregates(MetricHostAggregate hostAggregate) {
- updateMax(hostAggregate.getMax());
- updateMin(hostAggregate.getMin());
- updateSum(hostAggregate.getSum());
- updateNumberOfSamples(hostAggregate.getNumberOfSamples());
- }
-
- @Override
- public String toString() {
- return "MetricHostAggregate{" +
- "sum=" + sum +
- ", numberOfSamples=" + numberOfSamples +
- ", deviation=" + deviation +
- ", max=" + max +
- ", min=" + min +
- '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
index 44aca03..9eaf456 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
@@ -21,6 +21,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricsFilter;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
index 0934356..ba16b43 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
index a5a3499..34b1f9b 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
@@ -38,6 +38,7 @@ import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
import org.apache.hadoop.metrics2.sink.timeline.PostProcessingUtil;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
index 0ea9c08..a17433b 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
index b5f49fb..672f85f 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
index 9da921a..50cfb08 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
@@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
import org.apache.hadoop.metrics2.sink.timeline.PrecisionLimitExceededException;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
@@ -285,6 +286,36 @@ public class TimelineWebServices {
}
}
+ /**
+ * Store the given metrics into the timeline store, and return errors that
+ * happened during storing.
+ */
+ @Path("/metrics/aggregated")
+ @POST
+ @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+ public TimelinePutResponse postAggregatedMetrics(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res,
+ AggregationResult metrics) {
+
+ init(res);
+ if (metrics == null) {
+ return new TimelinePutResponse();
+ }
+
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing aggregated metrics: " +
+ TimelineUtils.dumpTimelineRecordtoJSON(metrics, true));
+ }
+
+ return timelineMetricStore.putHostAggregatedMetrics(metrics);
+ } catch (Exception e) {
+ LOG.error("Error saving metrics.", e);
+ throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ }
+
@Path("/containermetrics")
@POST
@Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
index 0087fd9..d5baaef 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
@@ -26,12 +26,12 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java
index 37ec134..7eeb9c4 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java
@@ -17,9 +17,9 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
import java.util.Arrays;
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
index a910cc2..d668178 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
@@ -22,11 +22,11 @@ import com.google.common.collect.Multimap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
index 44f48e8..3009163 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
.timeline;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
@@ -34,7 +34,7 @@ public class TestMetricHostAggregate {
assertThat(aggregate.getSum()).isEqualTo(3.0);
assertThat(aggregate.getMin()).isEqualTo(1.0);
assertThat(aggregate.getMax()).isEqualTo(2.0);
- assertThat(aggregate.getAvg()).isEqualTo(3.0 / 2);
+ assertThat(aggregate.calculateAverage()).isEqualTo(3.0 / 2);
}
@Test
@@ -50,7 +50,7 @@ public class TestMetricHostAggregate {
assertThat(aggregate.getSum()).isEqualTo(12.0);
assertThat(aggregate.getMin()).isEqualTo(0.5);
assertThat(aggregate.getMax()).isEqualTo(7.5);
- assertThat(aggregate.getAvg()).isEqualTo((3.0 + 8.0 + 1.0) / 5);
+ assertThat(aggregate.calculateAverage()).isEqualTo((3.0 + 8.0 + 1.0) / 5);
}
static MetricHostAggregate createAggregate (Double sum, Double min,
@@ -63,4 +63,4 @@ public class TestMetricHostAggregate {
aggregate.setNumberOfSamples(samplesCount);
return aggregate;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
index f00906e..ac2f9d7 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
@@ -92,6 +93,11 @@ public class TestTimelineMetricStore implements TimelineMetricStore {
}
@Override
+ public TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException {
+ return null;
+ }
+
+ @Override
public Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException {
return Collections.emptyMap();
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java
index fa0cfe9..53f6f6c 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java
@@ -17,10 +17,10 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
import java.util.Collections;
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
index f083731..07fd85d 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
@@ -20,13 +20,13 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractMiniHBaseClusterTest;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java
index 9873643..75b3f91 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractMiniHBaseClusterTest;
@@ -124,14 +125,14 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
assertEquals(0.0, currentHostAggregate.getMin());
assertEquals(20, currentHostAggregate.getNumberOfSamples());
assertEquals(15.0, currentHostAggregate.getSum());
- assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+ assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
count++;
} else if ("mem_free".equals(currentMetric.getMetricName())) {
assertEquals(2.0, currentHostAggregate.getMax());
assertEquals(0.0, currentHostAggregate.getMin());
assertEquals(20, currentHostAggregate.getNumberOfSamples());
assertEquals(15.0, currentHostAggregate.getSum());
- assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+ assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
count++;
} else {
fail("Unexpected entry");
@@ -198,7 +199,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
assertEquals(0.0, currentHostAggregate.getMin());
assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples());
assertEquals(12 * 15.0, currentHostAggregate.getSum());
- assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+ assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
}
}
}
@@ -260,7 +261,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
assertEquals(0.0, currentHostAggregate.getMin());
assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples());
assertEquals(12 * 15.0, currentHostAggregate.getSum());
- assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+ assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
}
}
}
@@ -309,14 +310,14 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
assertEquals(0.0, currentHostAggregate.getMin());
assertEquals(20, currentHostAggregate.getNumberOfSamples());
assertEquals(15.0, currentHostAggregate.getSum());
- assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+ assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
count++;
} else if ("mem_free".equals(currentMetric.getMetricName())) {
assertEquals(2.0, currentHostAggregate.getMax());
assertEquals(0.0, currentHostAggregate.getMin());
assertEquals(20, currentHostAggregate.getNumberOfSamples());
assertEquals(15.0, currentHostAggregate.getSum());
- assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+ assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
count++;
} else {
fail("Unexpected entry");
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
index 78db11d..6541b2c 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
@@ -31,6 +31,7 @@ import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/pom.xml b/ambari-metrics/pom.xml
index 2d88912..02f9574 100644
--- a/ambari-metrics/pom.xml
+++ b/ambari-metrics/pom.xml
@@ -33,6 +33,7 @@
<module>ambari-metrics-host-monitoring</module>
<module>ambari-metrics-grafana</module>
<module>ambari-metrics-assembly</module>
+ <module>ambari-metrics-host-aggregator</module>
</modules>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java
index 8d1f63f..a0765bf 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java
@@ -300,6 +300,16 @@ public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements
return hostName;
}
+ @Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return false;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return 0;
+ }
+
private List<TimelineMetric> getFilteredMetricList(List<SingleMetric> metrics) {
final List<TimelineMetric> metricList = new ArrayList<>();
for (SingleMetric metric : metrics) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/scripts/params.py
index 150b0a8..5d21514 100644
--- a/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/scripts/params.py
+++ b/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/scripts/params.py
@@ -153,6 +153,8 @@ if has_metric_collector:
pass
metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
# if accumulo is selected accumulo_tserver_hosts should not be empty, but still default just in case
if 'slave_hosts' in config['clusterHostInfo']:
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/templates/hadoop-metrics2-accumulo.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/templates/hadoop-metrics2-accumulo.properties.j2 b/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/templates/hadoop-metrics2-accumulo.properties.j2
index 6873c85..742ea3c 100644
--- a/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/templates/hadoop-metrics2-accumulo.properties.j2
+++ b/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/templates/hadoop-metrics2-accumulo.properties.j2
@@ -16,6 +16,9 @@
# Poll collectors every {{metrics_report_interval}} seconds
*.period={{metrics_collection_period}}
+*.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+*.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
+
{% if has_metric_collector %}
*.timeline.plugin.urls=file:///usr/lib/ambari-metrics-hadoop-sink/ambari-metrics-hadoop-sink.jar
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml
index cb66537..4d33661 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml
@@ -101,6 +101,14 @@
<on-ambari-upgrade add="true"/>
</property>
<property>
+ <name>timeline.metrics.host.inmemory.aggregation.jvm.arguments</name>
+ <value>-Xmx256m -Xms128m -XX:PermSize=68m</value>
+ <description>
+ Extra JVM arguments for the local aggregator, separated by spaces
+ </description>
+ <on-ambari-upgrade add="true"/>
+ </property>
+ <property>
<name>timeline.metrics.skip.network.interfaces.patterns</name>
<value>None</value>
<description>
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
index 8e1671e..1b085f6 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
@@ -787,4 +787,15 @@
<value>{{cluster_zookeeper_clientPort}}</value>
<on-ambari-upgrade add="true"/>
</property>
+ <property>
+ <name>timeline.metrics.host.inmemory.aggregation</name>
+ <value>false</value>
+ <description>If set to "true", host metrics will be aggregated in memory on each host.</description>
+ <on-ambari-upgrade add="true"/>
+ </property>
+ <property>
+ <name>timeline.metrics.host.inmemory.aggregation.port</name>
+ <value>61888</value>
+ <on-ambari-upgrade add="true"/>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
index 740a91a..9031b46 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
@@ -93,6 +93,9 @@
<primary>true</primary>
</log>
</logs>
+ <configuration-dependencies>
+ <config-type>ams-site</config-type>
+ </configuration-dependencies>
</component>
<component>
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py
index a929847..f49d47d 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py
@@ -163,6 +163,20 @@ def ams(name=None):
create_parents = True
)
+ if params.host_in_memory_aggregation and params.log4j_props is not None:
+ File(os.path.join(params.ams_monitor_conf_dir, "log4j.properties"),
+ owner=params.ams_user,
+ content=params.log4j_props
+ )
+
+ XmlConfig("ams-site.xml",
+ conf_dir=params.ams_monitor_conf_dir,
+ configurations=params.config['configurations']['ams-site'],
+ configuration_attributes=params.config['configuration_attributes']['ams-site'],
+ owner=params.ams_user,
+ group=params.user_group
+ )
+
TemplateConfig(
os.path.join(params.ams_monitor_conf_dir, "metric_monitor.ini"),
owner=params.ams_user,
@@ -366,6 +380,22 @@ def ams(name=None, action=None):
create_parents = True
)
+ if params.host_in_memory_aggregation and params.log4j_props is not None:
+ File(format("{params.ams_monitor_conf_dir}/log4j.properties"),
+ mode=0644,
+ group=params.user_group,
+ owner=params.ams_user,
+ content=InlineTemplate(params.log4j_props)
+ )
+
+ XmlConfig("ams-site.xml",
+ conf_dir=params.ams_monitor_conf_dir,
+ configurations=params.config['configurations']['ams-site'],
+ configuration_attributes=params.config['configuration_attributes']['ams-site'],
+ owner=params.ams_user,
+ group=params.user_group
+ )
+
Execute(format("{sudo} chown -R {ams_user}:{user_group} {ams_monitor_log_dir}")
)
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
index 50dde1c..b8c14f4 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
@@ -224,6 +224,11 @@ metrics_collector_heapsize = check_append_heap_property(str(metrics_collector_he
master_heapsize = check_append_heap_property(str(master_heapsize), "m")
regionserver_heapsize = check_append_heap_property(str(regionserver_heapsize), "m")
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
+host_in_memory_aggregation_jvm_arguments = default("/configurations/ams-env/timeline.metrics.host.inmemory.aggregation.jvm.arguments",
+ "-Xmx256m -Xms128m -XX:PermSize=68m")
+
regionserver_xmn_max = default('/configurations/ams-hbase-env/hbase_regionserver_xmn_max', None)
if regionserver_xmn_max:
regionserver_xmn_max = int(trim_heap_property(str(regionserver_xmn_max), "m"))
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/hadoop-metrics2-hbase.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/hadoop-metrics2-hbase.properties.j2 b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/hadoop-metrics2-hbase.properties.j2
index 9729bbe..bb0db4f 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/hadoop-metrics2-hbase.properties.j2
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/hadoop-metrics2-hbase.properties.j2
@@ -58,6 +58,9 @@ rpc.protocol={{metric_collector_protocol}}
*.timeline.plugin.urls=file:///usr/lib/ambari-metrics-hadoop-sink/ambari-metrics-hadoop-sink.jar
*.sink.timeline.slave.host.name={{hostname}}
+*.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+*.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
+
hbase.sink.timeline.class=org.apache.hadoop.metrics2.sink.timeline.HadoopTimelineMetricsSink
hbase.sink.timeline.period={{metrics_collection_period}}
hbase.sink.timeline.sendInterval={{metrics_report_interval}}000
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2 b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2
index 769ad67..b7dee50 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2
@@ -38,3 +38,10 @@ failover_strategy = {{failover_strategy}}
failover_strategy_blacklisted_interval_seconds = {{failover_strategy_blacklisted_interval_seconds}}
port = {{metric_collector_port}}
https_enabled = {{metric_collector_https_enabled}}
+
+[aggregation]
+host_in_memory_aggregation = {{host_in_memory_aggregation}}
+host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
+java_home = {{java64_home}}
+jvm_arguments = {{host_in_memory_aggregation_jvm_arguments}}
+ams_monitor_log_dir = {{ams_monitor_log_dir}}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py
index 86a290f..0e0c9aa 100644
--- a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py
+++ b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py
@@ -124,6 +124,9 @@ if has_metric_collector:
metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
+
# Cluster Zookeeper quorum
zookeeper_quorum = None
if not len(default("/clusterHostInfo/zookeeper_hosts", [])) == 0:
[3/3] ambari git commit: AMBARI-20758 Aggregate local metrics for
minute aggregation time window (dsen)
Posted by ds...@apache.org.
AMBARI-20758 Aggregate local metrics for minute aggregation time window (dsen)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/041d353b
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/041d353b
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/041d353b
Branch: refs/heads/trunk
Commit: 041d353b0d75b20b0322097e13a1701226e6fc97
Parents: 772be78
Author: Dmytro Sen <ds...@apache.org>
Authored: Wed May 17 19:38:29 2017 +0300
Committer: Dmytro Sen <ds...@apache.org>
Committed: Wed May 17 19:38:29 2017 +0300
----------------------------------------------------------------------
.../logfeeder/metrics/LogFeederAMSClient.java | 12 +-
ambari-metrics/ambari-metrics-assembly/pom.xml | 20 +++
.../src/main/assembly/monitor-windows.xml | 7 +
.../src/main/assembly/monitor.xml | 9 +-
.../timeline/AbstractTimelineMetricsSink.java | 24 ++-
.../sink/timeline/AggregationResult.java | 60 +++++++
.../metrics2/sink/timeline/MetricAggregate.java | 110 ++++++++++++
.../sink/timeline/MetricClusterAggregate.java | 73 ++++++++
.../sink/timeline/MetricHostAggregate.java | 81 +++++++++
.../metrics2/sink/timeline/TimelineMetric.java | 6 +-
.../TimelineMetricWithAggregatedValues.java | 65 +++++++
.../AbstractTimelineMetricSinkTest.java | 10 ++
.../availability/MetricCollectorHATest.java | 10 ++
.../cache/HandleConnectExceptionTest.java | 10 ++
.../sink/flume/FlumeTimelineMetricsSink.java | 16 ++
.../timeline/HadoopTimelineMetricsSink.java | 20 ++-
.../conf/unix/log4j.properties | 31 ++++
.../conf/windows/log4j.properties | 29 +++
.../ambari-metrics-host-aggregator/pom.xml | 120 +++++++++++++
.../AbstractMetricPublisherThread.java | 134 ++++++++++++++
.../aggregator/AggregatedMetricsPublisher.java | 101 +++++++++++
.../host/aggregator/AggregatorApplication.java | 180 +++++++++++++++++++
.../host/aggregator/AggregatorWebService.java | 56 ++++++
.../host/aggregator/RawMetricsPublisher.java | 60 +++++++
.../host/aggregator/TimelineMetricsHolder.java | 98 ++++++++++
.../conf/unix/ambari-metrics-monitor | 2 +-
.../src/main/python/core/aggregator.py | 110 ++++++++++++
.../src/main/python/core/config_reader.py | 35 +++-
.../src/main/python/core/controller.py | 28 +++
.../src/main/python/core/emitter.py | 8 +-
.../src/main/python/core/stop_handler.py | 3 +-
.../src/main/python/main.py | 6 +-
.../kafka/KafkaTimelineMetricsReporter.java | 17 ++
.../storm/StormTimelineMetricsReporter.java | 14 ++
.../sink/storm/StormTimelineMetricsSink.java | 14 ++
.../storm/StormTimelineMetricsReporter.java | 16 ++
.../sink/storm/StormTimelineMetricsSink.java | 16 ++
.../timeline/HBaseTimelineMetricStore.java | 29 ++-
.../metrics/timeline/PhoenixHBaseAccessor.java | 4 +-
.../timeline/TimelineMetricConfiguration.java | 2 +
.../metrics/timeline/TimelineMetricStore.java | 2 +
.../timeline/TimelineMetricsAggregatorSink.java | 4 +-
.../timeline/aggregators/MetricAggregate.java | 110 ------------
.../aggregators/MetricClusterAggregate.java | 73 --------
.../aggregators/MetricHostAggregate.java | 81 ---------
.../TimelineMetricAppAggregator.java | 1 +
.../TimelineMetricClusterAggregator.java | 2 +
.../TimelineMetricClusterAggregatorSecond.java | 1 +
.../TimelineMetricHostAggregator.java | 1 +
.../aggregators/TimelineMetricReadHelper.java | 2 +
.../webapp/TimelineWebServices.java | 31 ++++
.../timeline/ITPhoenixHBaseAccessor.java | 4 +-
.../metrics/timeline/MetricTestHelper.java | 2 +-
.../timeline/PhoenixHBaseAccessorTest.java | 4 +-
.../timeline/TestMetricHostAggregate.java | 8 +-
.../timeline/TestTimelineMetricStore.java | 6 +
.../TimelineMetricsAggregatorMemorySink.java | 4 +-
.../aggregators/ITClusterAggregator.java | 4 +-
.../aggregators/ITMetricAggregator.java | 13 +-
...melineMetricClusterAggregatorSecondTest.java | 1 +
ambari-metrics/pom.xml | 1 +
.../system/impl/AmbariMetricSinkImpl.java | 10 ++
.../1.6.1.2.2.0/package/scripts/params.py | 2 +
.../hadoop-metrics2-accumulo.properties.j2 | 3 +
.../0.1.0/configuration/ams-env.xml | 8 +
.../0.1.0/configuration/ams-site.xml | 11 ++
.../AMBARI_METRICS/0.1.0/metainfo.xml | 3 +
.../AMBARI_METRICS/0.1.0/package/scripts/ams.py | 30 ++++
.../0.1.0/package/scripts/params.py | 5 +
.../hadoop-metrics2-hbase.properties.j2 | 3 +
.../package/templates/metric_monitor.ini.j2 | 7 +
.../FLUME/1.4.0.2.0/package/scripts/params.py | 3 +
.../templates/flume-metrics2.properties.j2 | 2 +
.../0.96.0.2.0/package/scripts/params_linux.py | 3 +
...-metrics2-hbase.properties-GANGLIA-MASTER.j2 | 2 +
...doop-metrics2-hbase.properties-GANGLIA-RS.j2 | 2 +
.../hadoop-metrics2.properties.xml | 2 +
.../0.12.0.2.0/package/scripts/params_linux.py | 2 +
.../hadoop-metrics2-hivemetastore.properties.j2 | 2 +
.../hadoop-metrics2-hiveserver2.properties.j2 | 2 +
.../templates/hadoop-metrics2-llapdaemon.j2 | 2 +
.../hadoop-metrics2-llaptaskscheduler.j2 | 2 +
.../2.1.0.3.0/package/scripts/params_linux.py | 3 +
.../hadoop-metrics2-hivemetastore.properties.j2 | 2 +
.../hadoop-metrics2-hiveserver2.properties.j2 | 2 +
.../templates/hadoop-metrics2-llapdaemon.j2 | 2 +
.../hadoop-metrics2-llaptaskscheduler.j2 | 2 +
.../KAFKA/0.8.1/configuration/kafka-broker.xml | 11 ++
.../KAFKA/0.8.1/package/scripts/params.py | 3 +
.../STORM/0.9.1/package/scripts/params_linux.py | 2 +
.../0.9.1/package/templates/config.yaml.j2 | 2 +
.../templates/storm-metrics2.properties.j2 | 2 +
.../2.0.6/hooks/before-START/scripts/params.py | 3 +
.../templates/hadoop-metrics2.properties.j2 | 2 +
.../hadoop-metrics2.properties.xml | 2 +
.../3.0/hooks/before-START/scripts/params.py | 2 +
.../templates/hadoop-metrics2.properties.j2 | 2 +
.../system/impl/TestAmbariMetricsSinkImpl.java | 10 ++
.../2.0/hooks/before-START/scripts/params.py | 2 +
99 files changed, 1854 insertions(+), 307 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
index 2d1bf40..39526a5 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
@@ -89,6 +89,16 @@ public class LogFeederAMSClient extends AbstractTimelineMetricsSink {
}
@Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return false;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return 0;
+ }
+
+ @Override
protected boolean emitMetrics(TimelineMetrics metrics) {
return super.emitMetrics(metrics);
}
@@ -103,4 +113,4 @@ public class LogFeederAMSClient extends AbstractTimelineMetricsSink {
return collectorPort;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-assembly/pom.xml b/ambari-metrics/ambari-metrics-assembly/pom.xml
index a4b87de..6b81de5 100644
--- a/ambari-metrics/ambari-metrics-assembly/pom.xml
+++ b/ambari-metrics/ambari-metrics-assembly/pom.xml
@@ -35,6 +35,7 @@
<properties>
<collector.dir>${project.basedir}/../ambari-metrics-timelineservice</collector.dir>
<monitor.dir>${project.basedir}/../ambari-metrics-host-monitoring</monitor.dir>
+ <aggregator.dir>${project.basedir}/../ambari-metrics-host-aggregator</aggregator.dir>
<grafana.dir>${project.basedir}/../ambari-metrics-grafana</grafana.dir>
<hadoop-sink.dir>${project.basedir}/../ambari-metrics-hadoop-sink</hadoop-sink.dir>
<storm-sink.dir>${project.basedir}/../ambari-metrics-storm-sink</storm-sink.dir>
@@ -599,6 +600,19 @@
</sources>
</mapping>
<mapping>
+ <directory>/var/lib/ambari-metrics-monitor/lib</directory>
+ <sources>
+ <source>
+ <location>
+ ${aggregator.dir}/target/
+ </location>
+ <includes>
+ <include>ambari-metrics-host-aggregator-${project.version}.jar</include>
+ </includes>
+ </source>
+ </sources>
+ </mapping>
+ <mapping>
<directory>/etc/ambari-metrics-monitor/conf</directory>
<configuration>true</configuration>
</mapping>
@@ -744,6 +758,7 @@
<path>/var/run/ambari-metrics-grafana</path>
<path>/var/log/ambari-metrics-grafana</path>
<path>/var/lib/ambari-metrics-collector</path>
+ <path>/var/lib/ambari-metrics-monitor/lib</path>
<path>/var/lib/ambari-metrics-grafana</path>
<path>/usr/lib/ambari-metrics-hadoop-sink</path>
<path>/usr/lib/ambari-metrics-kafka-sink</path>
@@ -1331,6 +1346,11 @@
<type>pom</type>
<optional>true</optional>
</dependency>
+ <dependency>
+ <groupId>org.apache.ambari</groupId>
+ <artifactId>ambari-metrics-host-aggregator</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml
index ab309a1..d015d31 100644
--- a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml
+++ b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml
@@ -64,6 +64,13 @@
</includes>
</fileSet>
<fileSet>
+ <directory>${aggregator.dir}/conf/windows</directory>
+ <outputDirectory>conf</outputDirectory>
+ <includes>
+ <include>log4j.properties</include>
+ </includes>
+ </fileSet>
+ <fileSet>
<directory>${monitor.dir}/conf/windows</directory>
<outputDirectory>/</outputDirectory>
<includes>
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor.xml b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor.xml
index 99a41c3..448fe62 100644
--- a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor.xml
+++ b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor.xml
@@ -46,6 +46,13 @@
</includes>
</fileSet>
<fileSet>
+ <directory>${aggregator.dir}/conf/unix</directory>
+ <outputDirectory>conf</outputDirectory>
+ <includes>
+ <include>log4j.properties</include>
+ </includes>
+ </fileSet>
+ <fileSet>
<directory>${monitor.dir}/conf/unix</directory>
<outputDirectory>bin</outputDirectory>
<includes>
@@ -68,4 +75,4 @@
-</assembly>
\ No newline at end of file
+</assembly>
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
index 2c6fae2..a8dc571 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
@@ -78,6 +78,8 @@ public abstract class AbstractTimelineMetricsSink {
public static final String SSL_KEYSTORE_PATH_PROPERTY = "truststore.path";
public static final String SSL_KEYSTORE_TYPE_PROPERTY = "truststore.type";
public static final String SSL_KEYSTORE_PASSWORD_PROPERTY = "truststore.password";
+ public static final String HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY = "host_in_memory_aggregation";
+ public static final String HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY = "host_in_memory_aggregation_port";
public static final String COLLECTOR_LIVE_NODES_PATH = "/ws/v1/timeline/metrics/livenodes";
public static final String INSTANCE_ID_PROPERTY = "instanceId";
public static final String SET_INSTANCE_ID_PROPERTY = "set.instanceId";
@@ -241,8 +243,14 @@ public abstract class AbstractTimelineMetricsSink {
}
protected boolean emitMetrics(TimelineMetrics metrics) {
- String collectorHost = getCurrentCollectorHost();
- String connectUrl = getCollectorUri(collectorHost);
+ String connectUrl;
+ if (isHostInMemoryAggregationEnabled()) {
+ connectUrl = constructTimelineMetricUri("http", "localhost", String.valueOf(getHostInMemoryAggregationPort()));
+ } else {
+ String collectorHost = getCurrentCollectorHost();
+ connectUrl = getCollectorUri(collectorHost);
+ }
+
String jsonData = null;
LOG.debug("EmitMetrics connectUrl = " + connectUrl);
try {
@@ -562,4 +570,16 @@ public abstract class AbstractTimelineMetricsSink {
* @return String "host1"
*/
abstract protected String getHostname();
+
+ /**
+ * Check if host in-memory aggregation is enabled
+ * @return true if emitMetrics should post to the aggregator on localhost instead of a collector host
+ */
+ abstract protected boolean isHostInMemoryAggregationEnabled();
+
+ /**
+ * In memory aggregation port
+ * @return port used to build the localhost metrics URL when host in-memory aggregation is enabled
+ */
+ abstract protected int getHostInMemoryAggregationPort();
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregationResult.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregationResult.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregationResult.java
new file mode 100644
index 0000000..c903e3d
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregationResult.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Set;
+
+@XmlRootElement(name="AggregationResult")
+public class AggregationResult { // JAXB-bound holder: a set of aggregated metrics plus one time value
+ protected Set<TimelineMetricWithAggregatedValues> result; // metrics paired with their host aggregates
+ protected Long timeInMilis; // presumably the aggregation timestamp in epoch milliseconds -- TODO confirm
+
+ @Override
+ public String toString() {
+ return "AggregationResult{" +
+ "result=" + result +
+ ", timeInMilis=" + timeInMilis +
+ '}';
+ }
+
+ public AggregationResult() { // no-arg constructor; presumably needed for JAXB unmarshalling -- confirm
+ }
+
+ public AggregationResult(Set<TimelineMetricWithAggregatedValues> result, Long timeInMilis) {
+ this.result = result;
+ this.timeInMilis = timeInMilis;
+ }
+ @XmlElement
+ public Set<TimelineMetricWithAggregatedValues> getResult() {
+ return result;
+ }
+
+ public void setResult(Set<TimelineMetricWithAggregatedValues> result) {
+ this.result = result;
+ }
+ @XmlElement
+ public Long getTimeInMilis() { // spelling "Milis" is part of the serialized property name; do not rename
+ return timeInMilis;
+ }
+
+ public void setTimeInMilis(Long timeInMilis) {
+ this.timeInMilis = timeInMilis;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricAggregate.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricAggregate.java
new file mode 100644
index 0000000..84cba0e
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricAggregate.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.annotate.JsonSubTypes;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.IOException;
+
+/**
+ * Running sum, deviation, max and min of a metric; base class of the cluster and host aggregates.
+ */
+@JsonSubTypes({@JsonSubTypes.Type(value = MetricClusterAggregate.class),
+ @JsonSubTypes.Type(value = MetricHostAggregate.class)})
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class MetricAggregate {
+ private static final ObjectMapper mapper = new ObjectMapper();
+
+ protected Double sum = 0.0;
+ protected Double deviation;
+ protected Double max = -Double.MAX_VALUE; // lowest finite double; Double.MIN_VALUE is the smallest POSITIVE value and would hide negative maxima
+ protected Double min = Double.MAX_VALUE;
+
+ public MetricAggregate() {
+ }
+
+ MetricAggregate(Double sum, Double deviation, Double max,
+ Double min) {
+ this.sum = sum;
+ this.deviation = deviation;
+ this.max = max;
+ this.min = min;
+ }
+
+ public void updateSum(Double sum) {
+ this.sum += sum;
+ }
+
+ public void updateMax(Double max) {
+ if (max > this.max) {
+ this.max = max;
+ }
+ }
+
+ public void updateMin(Double min) {
+ if (min < this.min) {
+ this.min = min;
+ }
+ }
+
+ @JsonProperty("sum")
+ public Double getSum() {
+ return sum;
+ }
+
+ @JsonProperty("deviation")
+ public Double getDeviation() {
+ return deviation;
+ }
+
+ @JsonProperty("max")
+ public Double getMax() {
+ return max;
+ }
+
+ @JsonProperty("min")
+ public Double getMin() {
+ return min;
+ }
+
+ public void setSum(Double sum) {
+ this.sum = sum;
+ }
+
+ public void setDeviation(Double deviation) {
+ this.deviation = deviation;
+ }
+
+ public void setMax(Double max) {
+ this.max = max;
+ }
+
+ public void setMin(Double min) {
+ this.min = min;
+ }
+
+ public String toJSON() throws IOException {
+ return mapper.writeValueAsString(this); // serializes this aggregate with the shared Jackson mapper
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricClusterAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricClusterAggregate.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricClusterAggregate.java
new file mode 100644
index 0000000..7ef2c1d
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricClusterAggregate.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * MetricAggregate extended with a numberOfHosts counter.
+ */
+public class MetricClusterAggregate extends MetricAggregate {
+ private int numberOfHosts;
+
+ @JsonCreator
+ public MetricClusterAggregate() {
+ }
+
+ public MetricClusterAggregate(Double sum, int numberOfHosts, Double deviation,
+ Double max, Double min) {
+ super(sum, deviation, max, min);
+ this.numberOfHosts = numberOfHosts;
+ }
+
+ @JsonProperty("numberOfHosts")
+ public int getNumberOfHosts() {
+ return numberOfHosts;
+ }
+
+ public void updateNumberOfHosts(int count) {
+ this.numberOfHosts += count;
+ }
+
+ public void setNumberOfHosts(int numberOfHosts) {
+ this.numberOfHosts = numberOfHosts;
+ }
+
+ /**
+ * Merge another cluster aggregate into this one: max, min, sum and host count
+ */
+ public void updateAggregates(MetricClusterAggregate hostAggregate) {
+ updateMax(hostAggregate.getMax());
+ updateMin(hostAggregate.getMin());
+ updateSum(hostAggregate.getSum());
+ updateNumberOfHosts(hostAggregate.getNumberOfHosts());
+ }
+
+ @Override
+ public String toString() {
+ return "MetricClusterAggregate{" +
+ "sum=" + sum +
+ ", numberOfHosts=" + numberOfHosts +
+ ", deviation=" + deviation +
+ ", max=" + max +
+ ", min=" + min +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricHostAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricHostAggregate.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricHostAggregate.java
new file mode 100644
index 0000000..e190913
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricHostAggregate.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * Represents a collection of minute based aggregation of values for
+ * resolution greater than a minute.
+ */
+public class MetricHostAggregate extends MetricAggregate {
+
+ private long numberOfSamples = 0;
+
+ @JsonCreator
+ public MetricHostAggregate() {
+ super(0.0, 0.0, -Double.MAX_VALUE, Double.MAX_VALUE); // max seed is the lowest finite double; Double.MIN_VALUE is the smallest POSITIVE value
+ }
+
+ public MetricHostAggregate(Double sum, int numberOfSamples,
+ Double deviation,
+ Double max, Double min) {
+ super(sum, deviation, max, min);
+ this.numberOfSamples = numberOfSamples;
+ }
+
+ @JsonProperty("numberOfSamples")
+ public long getNumberOfSamples() {
+ return numberOfSamples == 0 ? 1 : numberOfSamples;
+ }
+
+ public void updateNumberOfSamples(long count) {
+ this.numberOfSamples += count;
+ }
+
+ public void setNumberOfSamples(long numberOfSamples) {
+ this.numberOfSamples = numberOfSamples;
+ }
+
+ public double calculateAverage() {
+ return sum / getNumberOfSamples(); // getter maps 0 samples to 1, so an empty aggregate no longer yields NaN/Infinity
+ }
+
+ /**
+ * Merge another host aggregate into this one: max, min, sum and sample count
+ */
+ public void updateAggregates(MetricHostAggregate hostAggregate) {
+ updateMax(hostAggregate.getMax());
+ updateMin(hostAggregate.getMin());
+ updateSum(hostAggregate.getSum());
+ updateNumberOfSamples(hostAggregate.getNumberOfSamples());
+ }
+
+ @Override
+ public String toString() {
+ return "MetricHostAggregate{" +
+ "sum=" + sum +
+ ", numberOfSamples=" + numberOfSamples +
+ ", deviation=" + deviation +
+ ", max=" + max +
+ ", min=" + min +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
index 44c9d4a..edace52 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
@@ -45,7 +45,7 @@ public class TimelineMetric implements Comparable<TimelineMetric> {
private String type;
private String units;
private TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
- private Map<String, String> metadata = new HashMap<>();
+ private HashMap<String, String> metadata = new HashMap<>();
// default
public TimelineMetric() {
@@ -151,11 +151,11 @@ public class TimelineMetric implements Comparable<TimelineMetric> {
}
@XmlElement(name = "metadata")
- public Map<String,String> getMetadata () {
+ public HashMap<String,String> getMetadata () {
return metadata;
}
- public void setMetadata (Map<String,String> metadata) {
+ public void setMetadata (HashMap<String,String> metadata) {
this.metadata = metadata;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricWithAggregatedValues.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricWithAggregatedValues.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricWithAggregatedValues.java
new file mode 100644
index 0000000..626ac5f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricWithAggregatedValues.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+
+@XmlRootElement(name = "TimelineMetricWithAggregatedValues")
+@XmlAccessorType(XmlAccessType.NONE) // NONE: only members explicitly annotated (the @XmlElement getters below) are bound
+public class TimelineMetricWithAggregatedValues {
+ private TimelineMetric timelineMetric; // the metric being described
+ private MetricHostAggregate metricAggregate; // host-level aggregate paired with that metric
+
+ public TimelineMetricWithAggregatedValues() { // no-arg constructor; presumably needed for JAXB unmarshalling -- confirm
+ }
+
+ public TimelineMetricWithAggregatedValues(TimelineMetric metric, MetricHostAggregate metricAggregate) {
+ timelineMetric = metric;
+ this.metricAggregate = metricAggregate;
+ }
+
+ @XmlElement
+ public MetricHostAggregate getMetricAggregate() {
+ return metricAggregate;
+ }
+ @XmlElement
+ public TimelineMetric getTimelineMetric() {
+ return timelineMetric;
+ }
+
+ public void setTimelineMetric(TimelineMetric timelineMetric) {
+ this.timelineMetric = timelineMetric;
+ }
+
+ public void setMetricAggregate(MetricHostAggregate metricAggregate) {
+ this.metricAggregate = metricAggregate;
+ }
+
+ @Override
+ public String toString() {
+ return "TimelineMetricWithAggregatedValues{" +
+ "timelineMetric=" + timelineMetric +
+ ", metricAggregate=" + metricAggregate +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
index 9b0cdbe..ce2cf79 100644
--- a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
+++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
@@ -90,6 +90,16 @@ public class AbstractTimelineMetricSinkTest {
}
@Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return true;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return 61888;
+ }
+
+ @Override
public boolean emitMetrics(TimelineMetrics metrics) {
super.init();
return super.emitMetrics(metrics);
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
index a393a96..f0174d5 100644
--- a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
+++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
@@ -192,5 +192,15 @@ public class MetricCollectorHATest {
protected String getHostname() {
return "h1";
}
+
+ @Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return true;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return 61888;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
index 32fe32e..4eb75eb 100644
--- a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
+++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
@@ -125,6 +125,16 @@ public class HandleConnectExceptionTest {
}
@Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return false;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return 61888;
+ }
+
+ @Override
public boolean emitMetrics(TimelineMetrics metrics) {
super.init();
return super.emitMetrics(metrics);
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
index 904c916..6277907 100644
--- a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
@@ -63,6 +63,9 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
private int timeoutSeconds = 10;
private boolean setInstanceId;
private String instanceId;
+ private boolean hostInMemoryAggregationEnabled;
+ private int hostInMemoryAggregationPort;
+
@Override
public void start() {
@@ -110,6 +113,9 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
port = configuration.getProperty(COLLECTOR_PORT, "6188");
setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false"));
instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY, "");
+
+ hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, "false")); // valueOf parses the value; Boolean.getBoolean would look up a JVM system property of that name
+ hostInMemoryAggregationPort = Integer.parseInt(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, "61888")); // default avoids NumberFormatException when the property is absent
// Initialize the collector write strategy
super.init();
@@ -162,6 +168,16 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
return hostname;
}
+ @Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return hostInMemoryAggregationEnabled;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return hostInMemoryAggregationPort;
+ }
+
public void setPollFrequency(long pollFrequency) {
this.pollFrequency = pollFrequency;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
index 11e16c2..c235c7c 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
@@ -75,6 +75,8 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
return t;
}
});
+ private int hostInMemoryAggregationPort;
+ private boolean hostInMemoryAggregationEnabled;
@Override
public void init(SubsetConfiguration conf) {
@@ -107,7 +109,8 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
protocol = conf.getString(COLLECTOR_PROTOCOL, "http");
collectorHosts = parseHostsStringArrayIntoCollection(conf.getStringArray(COLLECTOR_HOSTS_PROPERTY));
port = conf.getString(COLLECTOR_PORT, "6188");
-
+ hostInMemoryAggregationEnabled = conf.getBoolean(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, false); // defaults keep older configs working: the no-default getters throw NoSuchElementException on a missing key
+ hostInMemoryAggregationPort = conf.getInt(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, 61888);
if (collectorHosts.isEmpty()) {
LOG.error("No Metric collector configured.");
} else {
@@ -249,6 +252,16 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
}
@Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return hostInMemoryAggregationEnabled;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return hostInMemoryAggregationPort;
+ }
+
+ @Override
public void putMetrics(MetricsRecord record) {
try {
String recordName = record.name();
@@ -308,9 +321,10 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
int sbBaseLen = sb.length();
List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
- Map<String, String> metadata = null;
+ HashMap<String, String> metadata = null;
if (skipAggregation) {
- metadata = Collections.singletonMap("skipAggregation", "true");
+ metadata = new HashMap<>();
+ metadata.put("skipAggregation", "true");
}
long startTime = record.timestamp();
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/conf/unix/log4j.properties
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/conf/unix/log4j.properties b/ambari-metrics/ambari-metrics-host-aggregator/conf/unix/log4j.properties
new file mode 100644
index 0000000..d7ceedd
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/conf/unix/log4j.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Define some default values that can be overridden by system properties
+# Root logger option
+log4j.rootLogger=INFO,file
+
+# Direct log messages to a log file
+log4j.appender.file=org.apache.log4j.RollingFileAppender
+log4j.appender.file.File=/var/log/ambari-metrics-monitor/ambari-metrics-aggregator.log
+log4j.appender.file.MaxFileSize=80MB
+log4j.appender.file.MaxBackupIndex=60
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{ABSOLUTE} %5p [%t] %c{1}:%L - %m%n
+
+
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/conf/windows/log4j.properties
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/conf/windows/log4j.properties b/ambari-metrics/ambari-metrics-host-aggregator/conf/windows/log4j.properties
new file mode 100644
index 0000000..d9aabab
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/conf/windows/log4j.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Define some default values that can be overridden by system properties
+# Root logger option
+log4j.rootLogger=INFO,file
+
+# Direct log messages to a log file
+log4j.appender.file=org.apache.log4j.RollingFileAppender
+log4j.appender.file.File=\\var\\log\\ambari-metrics-monitor\\ambari-metrics-aggregator.log
+log4j.appender.file.MaxFileSize=80MB
+log4j.appender.file.MaxBackupIndex=60
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{ABSOLUTE} %5p [%t] %c{1}:%L - %m%n
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/pom.xml b/ambari-metrics/ambari-metrics-host-aggregator/pom.xml
new file mode 100644
index 0000000..c2c7897
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/pom.xml
@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>ambari-metrics</artifactId>
+ <groupId>org.apache.ambari</groupId>
+ <version>2.0.0.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>ambari-metrics-host-aggregator</artifactId>
+ <packaging>jar</packaging>
+
+ <name>ambari-metrics-host-aggregator</name>
+ <url>http://maven.apache.org</url>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>3.8.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>14.0.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.ambari</groupId>
+ <artifactId>ambari-metrics-common</artifactId>
+ <version>2.0.0.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ <version>2.5</version>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ <version>1.11</version>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ <version>1.11</version>
+ </dependency>
+ <dependency>
+ <groupId>javax.xml.bind</groupId>
+ <artifactId>jaxb-api</artifactId>
+ <version>2.2.2</version>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ <version>1.11</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>2.7.1.2.3.4.0-3347</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>1.6</version>
+ <configuration>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java
new file mode 100644
index 0000000..b1f60fa
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.codehaus.jackson.map.AnnotationIntrospector;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.Map;
+
+/**
+ * Abstract class that runs a thread that publishes metrics data to AMS collector in specified intervals.
+ */
+public abstract class AbstractMetricPublisherThread extends Thread {
+ protected int publishIntervalInSeconds;
+ protected String publishURL;
+ protected ObjectMapper objectMapper;
+ private Log LOG;
+ protected TimelineMetricsHolder timelineMetricsHolder;
+
+ public AbstractMetricPublisherThread(TimelineMetricsHolder timelineMetricsHolder, String publishURL, int publishIntervalInSeconds) {
+ LOG = LogFactory.getLog(this.getClass());
+ this.publishURL = publishURL;
+ this.publishIntervalInSeconds = publishIntervalInSeconds;
+ this.timelineMetricsHolder = timelineMetricsHolder;
+ objectMapper = new ObjectMapper();
+ AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
+ objectMapper.setAnnotationIntrospector(introspector);
+ objectMapper.getSerializationConfig()
+ .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
+ }
+
+ /**
+ * Publishes metrics to collector in specified intervals while not interrupted.
+ */
+ @Override
+ public void run() {
+ while (!isInterrupted()) {
+ try {
+ sleep(this.publishIntervalInSeconds * 1000);
+ } catch (InterruptedException e) {
+ //Ignore
+ }
+ try {
+ processAndPublishMetrics(getMetricsFromCache());
+ } catch (Exception e) {
+ LOG.error("Couldn't process and send metrics : ",e);
+ }
+ }
+ }
+
+ /**
+ * Processes and sends metrics to collector.
+ * @param metricsFromCache
+ * @throws Exception
+ */
+ protected void processAndPublishMetrics(Map<Long, TimelineMetrics> metricsFromCache) throws Exception {
+ if (metricsFromCache.size()==0) return;
+
+ LOG.info(String.format("Preparing %s timeline metrics for publishing", metricsFromCache.size()));
+ publishMetricsJson(processMetrics(metricsFromCache));
+ }
+
+ /**
+ * Returns metrics map. Source is based on implementation.
+ * @return
+ */
+ protected abstract Map<Long,TimelineMetrics> getMetricsFromCache();
+
+ /**
+ * Processes given metrics (aggregates or merges them) and converts them into json string that will be send to collector
+ * @param metricValues
+ * @return
+ */
+ protected abstract String processMetrics(Map<Long, TimelineMetrics> metricValues);
+
+ protected void publishMetricsJson(String jsonData) throws Exception {
+ int timeout = 5 * 1000;
+ HttpURLConnection connection = null;
+ if (this.publishURL == null) {
+ throw new IOException("Unknown URL. Unable to connect to metrics collector.");
+ }
+ LOG.info("Collector URL : " + publishURL);
+ connection = (HttpURLConnection) new URL(this.publishURL).openConnection();
+
+ connection.setRequestMethod("POST");
+ connection.setRequestProperty("Content-Type", "application/json");
+ connection.setRequestProperty("Connection", "Keep-Alive");
+ connection.setConnectTimeout(timeout);
+ connection.setReadTimeout(timeout);
+ connection.setDoOutput(true);
+
+ if (jsonData != null) {
+ try (OutputStream os = connection.getOutputStream()) {
+ os.write(jsonData.getBytes("UTF-8"));
+ }
+ }
+ int responseCode = connection.getResponseCode();
+ if (responseCode != 200) {
+ throw new Exception("responseCode is " + responseCode);
+ }
+ LOG.info("Successfully sent metrics.");
+ }
+
+ /**
+ * Interrupts the thread.
+ */
+ protected void stopPublisher() {
+ this.interrupt();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java
new file mode 100644
index 0000000..0540ec9
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricWithAggregatedValues;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * Thread that aggregates and publishes metrics to collector on specified interval.
+ */
+public class AggregatedMetricsPublisher extends AbstractMetricPublisherThread {
+
+ private Log LOG;
+
+ public AggregatedMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, String collectorURL, int interval) {
+ super(timelineMetricsHolder, collectorURL, interval);
+ LOG = LogFactory.getLog(this.getClass());
+ }
+
+ /**
+ * get metrics map form @TimelineMetricsHolder
+ * @return
+ */
+ @Override
+ protected Map<Long, TimelineMetrics> getMetricsFromCache() {
+ return timelineMetricsHolder.extractMetricsForAggregationPublishing();
+ }
+
+ /**
+ * Aggregates given metrics and converts them into json string that will be send to collector
+ * @param metricForAggregationValues
+ * @return
+ */
+ @Override
+ protected String processMetrics(Map<Long, TimelineMetrics> metricForAggregationValues) {
+ HashMap<String, TimelineMetrics> nameToMetricMap = new HashMap<>();
+ for (TimelineMetrics timelineMetrics : metricForAggregationValues.values()) {
+ for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) {
+ if (!nameToMetricMap.containsKey(timelineMetric.getMetricName())) {
+ nameToMetricMap.put(timelineMetric.getMetricName(), new TimelineMetrics());
+ }
+ nameToMetricMap.get(timelineMetric.getMetricName()).addOrMergeTimelineMetric(timelineMetric);
+ }
+ }
+ Set<TimelineMetricWithAggregatedValues> metricAggregateMap = new HashSet<>();
+ for (TimelineMetrics metrics : nameToMetricMap.values()) {
+ double sum = 0;
+ double max = Integer.MIN_VALUE;
+ double min = Integer.MAX_VALUE;
+ int count = 0;
+ for (TimelineMetric metric : metrics.getMetrics()) {
+ for (Double value : metric.getMetricValues().values()) {
+ sum+=value;
+ max = Math.max(max, value);
+ min = Math.min(min, value);
+ count++;
+ }
+ }
+ TimelineMetric tmpMetric = new TimelineMetric(metrics.getMetrics().get(0));
+ tmpMetric.setMetricValues(new TreeMap<Long, Double>());
+ metricAggregateMap.add(new TimelineMetricWithAggregatedValues(tmpMetric, new MetricHostAggregate(sum, count, 0d, max, min)));
+ }
+ String json = null;
+ try {
+ json = objectMapper.writeValueAsString(new AggregationResult(metricAggregateMap, System.currentTimeMillis()));
+ LOG.debug(json);
+ } catch (Exception e) {
+ LOG.error("Failed to convert result into json", e);
+ }
+
+ return json;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
new file mode 100644
index 0000000..c6b703b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+import com.sun.jersey.api.container.httpserver.HttpServerFactory;
+import com.sun.jersey.api.core.PackagesResourceConfig;
+import com.sun.jersey.api.core.ResourceConfig;
+import com.sun.net.httpserver.HttpServer;
+
+import javax.ws.rs.core.UriBuilder;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * WEB application with 2 publisher threads that processes received metrics and submits results to the collector
+ */
+public class AggregatorApplication
+{
+ private static final int STOP_SECONDS_DELAY = 0;
+ private static final int JOIN_SECONDS_TIMEOUT = 2;
+ private static String BASE_POST_URL = "%s://%s:%s/ws/v1/timeline/metrics";
+ private static String AGGREGATED_POST_PREFIX = "/aggregated";
+ private static final String METRICS_SITE_CONFIGURATION_FILE = "ams-site.xml";
+ private static Log LOG = LogFactory.getLog("AggregatorApplication.class");
+ private final int webApplicationPort;
+ private final int rawPublishingInterval;
+ private final int aggregationInterval;
+ private Configuration configuration;
+ private String [] collectorHosts;
+ private AggregatedMetricsPublisher aggregatePublisher;
+ private RawMetricsPublisher rawPublisher;
+ private TimelineMetricsHolder timelineMetricsHolder;
+ private HttpServer httpServer;
+
+ public AggregatorApplication(String collectorHosts) {
+ initConfiguration();
+ this.collectorHosts = collectorHosts.split(",");
+ this.aggregationInterval = configuration.getInt("timeline.metrics.host.aggregator.minute.interval", 300);
+ this.rawPublishingInterval = configuration.getInt("timeline.metrics.sink.report.interval", 60);
+ this.webApplicationPort = configuration.getInt("timeline.metrics.host.inmemory.aggregation.port", 61888);
+ this.timelineMetricsHolder = TimelineMetricsHolder.getInstance(rawPublishingInterval, aggregationInterval);
+ try {
+ this.httpServer = createHttpServer();
+ } catch (IOException e) {
+ LOG.error("Exception while starting HTTP server. Exiting", e);
+ System.exit(1);
+ }
+ }
+
+ private void initConfiguration() {
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ if (classLoader == null) {
+ classLoader = getClass().getClassLoader();
+ }
+
+ URL amsResUrl = classLoader.getResource(METRICS_SITE_CONFIGURATION_FILE);
+ LOG.info("Found metric service configuration: " + amsResUrl);
+ if (amsResUrl == null) {
+ throw new IllegalStateException("Unable to initialize the metrics " +
+ "subsystem. No ams-site present in the classpath.");
+ }
+ configuration = new Configuration(true);
+ try {
+ configuration.addResource(amsResUrl.toURI().toURL());
+ } catch (Exception e) {
+ LOG.error("Couldn't init configuration. ", e);
+ System.exit(1);
+ }
+ }
+
+ private String getHostName() {
+ String hostName = "localhost";
+ try {
+ hostName = InetAddress.getLocalHost().getCanonicalHostName();
+ } catch (UnknownHostException e) {
+ LOG.error(e);
+ }
+ return hostName;
+ }
+
+ private URI getURI() {
+ URI uri = UriBuilder.fromUri("http://" + getHostName() + "/").port(this.webApplicationPort).build();
+ LOG.info(String.format("Web server at %s", uri));
+ return uri;
+ }
+
+ private HttpServer createHttpServer() throws IOException {
+ ResourceConfig resourceConfig = new PackagesResourceConfig("org.apache.hadoop.metrics2.host.aggregator");
+ HashMap<String, Object> params = new HashMap();
+ params.put("com.sun.jersey.api.json.POJOMappingFeature", "true");
+ resourceConfig.setPropertiesAndFeatures(params);
+ return HttpServerFactory.create(getURI(), resourceConfig);
+ }
+
+ private void startWebServer() {
+ LOG.info("Starting web server.");
+ this.httpServer.start();
+ }
+
+ private void startAggregatePublisherThread() {
+ LOG.info("Starting aggregated metrics publisher.");
+ String collectorURL = buildBasicCollectorURL(collectorHosts[0]) + AGGREGATED_POST_PREFIX;
+ aggregatePublisher = new AggregatedMetricsPublisher(timelineMetricsHolder, collectorURL, aggregationInterval);
+ aggregatePublisher.start();
+ }
+
+ private void startRawPublisherThread() {
+ LOG.info("Starting raw metrics publisher.");
+ String collectorURL = buildBasicCollectorURL(collectorHosts[0]);
+ rawPublisher = new RawMetricsPublisher(timelineMetricsHolder, collectorURL, rawPublishingInterval);
+ rawPublisher.start();
+ }
+
+
+
+ private void stop() {
+ aggregatePublisher.stopPublisher();
+ rawPublisher.stopPublisher();
+ httpServer.stop(STOP_SECONDS_DELAY);
+ LOG.info("Stopped web server.");
+ try {
+ LOG.info("Waiting for threads to join.");
+ aggregatePublisher.join(JOIN_SECONDS_TIMEOUT * 1000);
+ rawPublisher.join(JOIN_SECONDS_TIMEOUT * 1000);
+ LOG.info("Gracefully stopped Aggregator Application.");
+ } catch (InterruptedException e) {
+ LOG.error("Received exception during stop : ", e);
+
+ }
+
+ }
+
+ private String buildBasicCollectorURL(String host) {
+ String port = configuration.get("timeline.metrics.service.webapp.address", "0.0.0.0:6188").split(":")[1];
+ String protocol = configuration.get("timeline.metrics.service.http.policy", "HTTP_ONLY").equalsIgnoreCase("HTTP_ONLY") ? "http" : "https";
+ return String.format(BASE_POST_URL, protocol, host, port);
+ }
+
+ public static void main( String[] args ) throws Exception {
+ LOG.info("Starting aggregator application");
+ if (args.length != 1) {
+ throw new Exception("This jar should be run with 1 argument - collector hosts separated with coma");
+ }
+
+ final AggregatorApplication app = new AggregatorApplication(args[0]);
+ app.startAggregatePublisherThread();
+ app.startRawPublisherThread();
+ app.startWebServer();
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ public void run() {
+ LOG.info("Stopping aggregator application");
+ app.stop();
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java
new file mode 100644
index 0000000..f96d0ed
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+
+
+import com.sun.jersey.spi.resource.Singleton;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+
+@Singleton
+@Path("/ws/v1/timeline")
+public class AggregatorWebService {
+ TimelineMetricsHolder metricsHolder = TimelineMetricsHolder.getInstance();
+
+ @GET
+ @Produces("text/json")
+ @Path("/metrics")
+ public Response helloWorld() throws IOException {
+ return Response.ok().build();
+ }
+
+ @POST
+ @Produces(MediaType.TEXT_PLAIN)
+ @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+ @Path("/metrics")
+ public Response postMetrics(
+ TimelineMetrics metrics) {
+ metricsHolder.putMetricsForAggregationPublishing(metrics);
+ metricsHolder.putMetricsForRawPublishing(metrics);
+ return Response.ok().build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java
new file mode 100644
index 0000000..f317ed9
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import java.util.Map;
+
+/**
+ * Thread that merges the raw metrics received since the last run into a single
+ * {@link TimelineMetrics} object and publishes it to the collector.
+ */
+public class RawMetricsPublisher extends AbstractMetricPublisherThread {
+  private final Log LOG = LogFactory.getLog(this.getClass());
+
+  /**
+   * @param timelineMetricsHolder holder the metrics are extracted from
+   * @param collectorURL URL the metrics are POSTed to
+   * @param interval publishing interval in seconds
+   */
+  public RawMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, String collectorURL, int interval) {
+    super(timelineMetricsHolder, collectorURL, interval);
+  }
+
+  /**
+   * @return metrics stored in the holder for raw publishing
+   */
+  @Override
+  protected Map<Long, TimelineMetrics> getMetricsFromCache() {
+    return timelineMetricsHolder.extractMetricsForRawPublishing();
+  }
+
+  /**
+   * Merges all given metrics into one {@link TimelineMetrics} object and
+   * serializes it.
+   *
+   * @param metricValues metrics to merge
+   * @return JSON payload, or null if serialization failed
+   */
+  @Override
+  protected String processMetrics(Map<Long, TimelineMetrics> metricValues) {
+    // collect every metric of every batch in a single container
+    TimelineMetrics merged = new TimelineMetrics();
+    for (TimelineMetrics batch : metricValues.values()) {
+      for (TimelineMetric metric : batch.getMetrics()) {
+        merged.addOrMergeTimelineMetric(metric);
+      }
+    }
+
+    // serialize the merged container
+    String payload = null;
+    try {
+      payload = objectMapper.writeValueAsString(merged);
+      LOG.debug(payload);
+    } catch (Exception e) {
+      LOG.error("Failed to convert result into json", e);
+    }
+    return payload;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java
new file mode 100644
index 0000000..b355c97
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Singleton class with 2 guava caches for raw and aggregated metrics storing
+ */
+public class TimelineMetricsHolder {
+ private static final int DEFAULT_RAW_CACHE_EXPIRE_TIME = 60;
+ private static final int DEFAULT_AGGREGATION_CACHE_EXPIRE_TIME = 300;
+ private Cache<Long, TimelineMetrics> aggregationMetricsCache;
+ private Cache<Long, TimelineMetrics> rawMetricsCache;
+ private static TimelineMetricsHolder instance = null;
+ //to ensure no metric values are expired
+ private static int EXPIRE_DELAY = 30;
+ ReadWriteLock aggregationCacheLock = new ReentrantReadWriteLock();
+ ReadWriteLock rawCacheLock = new ReentrantReadWriteLock();
+
+ private TimelineMetricsHolder(int rawCacheExpireTime, int aggregationCacheExpireTime) {
+ this.rawMetricsCache = CacheBuilder.newBuilder().expireAfterWrite(rawCacheExpireTime + EXPIRE_DELAY, TimeUnit.SECONDS).build();
+ this.aggregationMetricsCache = CacheBuilder.newBuilder().expireAfterWrite(aggregationCacheExpireTime + EXPIRE_DELAY, TimeUnit.SECONDS).build();
+ }
+
+ public static TimelineMetricsHolder getInstance(int rawCacheExpireTime, int aggregationCacheExpireTime) {
+ if (instance == null) {
+ instance = new TimelineMetricsHolder(rawCacheExpireTime, aggregationCacheExpireTime);
+ }
+ return instance;
+ }
+
+ /**
+ * Uses default expiration time for caches initialization if they are not initialized yet.
+ * @return
+ */
+ public static TimelineMetricsHolder getInstance() {
+ return getInstance(DEFAULT_RAW_CACHE_EXPIRE_TIME, DEFAULT_AGGREGATION_CACHE_EXPIRE_TIME);
+ }
+
+ public void putMetricsForAggregationPublishing(TimelineMetrics timelineMetrics) {
+ aggregationCacheLock.writeLock().lock();
+ aggregationMetricsCache.put(System.currentTimeMillis(), timelineMetrics);
+ aggregationCacheLock.writeLock().unlock();
+ }
+
+ public Map<Long, TimelineMetrics> extractMetricsForAggregationPublishing() {
+ return extractMetricsFromCacheWithLock(aggregationMetricsCache, aggregationCacheLock);
+ }
+
+ public void putMetricsForRawPublishing(TimelineMetrics metrics) {
+ rawCacheLock.writeLock().lock();
+ rawMetricsCache.put(System.currentTimeMillis(), metrics);
+ rawCacheLock.writeLock().unlock();
+ }
+
+ public Map<Long, TimelineMetrics> extractMetricsForRawPublishing() {
+ return extractMetricsFromCacheWithLock(rawMetricsCache, rawCacheLock);
+ }
+
+ /**
+ * Returns values from cache and clears the cache
+ * @param cache
+ * @param lock
+ * @return
+ */
+ private Map<Long, TimelineMetrics> extractMetricsFromCacheWithLock(Cache<Long, TimelineMetrics> cache, ReadWriteLock lock) {
+ lock.writeLock().lock();
+ Map<Long, TimelineMetrics> metricsMap = new TreeMap<>(cache.asMap());
+ cache.invalidateAll();
+ lock.writeLock().unlock();
+ return metricsMap;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor b/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor
index 967e133..9bbb271 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor
+++ b/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor
@@ -24,7 +24,7 @@ METRIC_MONITOR_PY_SCRIPT=${RESOURCE_MONITORING_DIR}/main.py
PIDFILE=/var/run/ambari-metrics-monitor/ambari-metrics-monitor.pid
OUTFILE=/var/log/ambari-metrics-monitor/ambari-metrics-monitor.out
-STOP_TIMEOUT=5
+STOP_TIMEOUT=10
OK=0
NOTOK=1
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py
new file mode 100644
index 0000000..2249e53
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py
@@ -0,0 +1,110 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import threading
+import subprocess
+import logging
+import urllib2
+
+logger = logging.getLogger()
+class Aggregator(threading.Thread):
+ def __init__(self, config, stop_handler):
+ threading.Thread.__init__(self)
+ self._config = config
+ self._stop_handler = stop_handler
+ self._aggregator_process = None
+ self._sleep_interval = config.get_collector_sleep_interval()
+ self.stopped = False
+
+ def run(self):
+ java_home = self._config.get_java_home()
+ collector_hosts = self._config.get_metrics_collector_hosts_as_string()
+ jvm_agrs = self._config.get_aggregator_jvm_agrs()
+ config_dir = self._config.get_config_dir()
+ class_name = "org.apache.hadoop.metrics2.host.aggregator.AggregatorApplication"
+ ams_log_file = "ambari-metrics-aggregator.log"
+ additional_classpath = ':{0}'.format(config_dir)
+ ams_log_dir = self._config.ams_monitor_log_dir()
+ logger.info('Starting Aggregator thread.')
+ cmd = "{0}/bin/java {1} -Dams.log.dir={2} -Dams.log.file={3} -cp /var/lib/ambari-metrics-monitor/lib/*{4} {5} {6}"\
+ .format(java_home, jvm_agrs, ams_log_dir, ams_log_file, additional_classpath, class_name, collector_hosts)
+
+ logger.info("Executing : {0}".format(cmd))
+
+ self._aggregator_process = subprocess.Popen([cmd], stdout = None, stderr = None, shell = True)
+ while not self.stopped:
+ if 0 == self._stop_handler.wait(self._sleep_interval):
+ break
+ pass
+ self.stop()
+
+ def stop(self):
+ self.stopped = True
+ if self._aggregator_process :
+ logger.info('Stopping Aggregator thread.')
+ self._aggregator_process.terminate()
+
+class AggregatorWatchdog(threading.Thread):
+ SLEEP_TIME = 30
+ CONNECTION_TIMEOUT = 5
+ AMS_AGGREGATOR_METRICS_CHECK_URL = "/ws/v1/timeline/metrics/"
+ def __init__(self, config, stop_handler):
+ threading.Thread.__init__(self)
+ self._config = config
+ self._stop_handler = stop_handler
+ self.URL = 'http://localhost:' + self._config.get_inmemory_aggregation_port() + self.AMS_AGGREGATOR_METRICS_CHECK_URL
+ self._is_ok = threading.Event()
+ self.set_is_ok(True)
+ self.stopped = False
+
+ def run(self):
+ logger.info('Starting Aggregator Watchdog thread.')
+ while not self.stopped:
+ if 0 == self._stop_handler.wait(self.SLEEP_TIME):
+ break
+ try:
+ conn = urllib2.urlopen(self.URL, timeout=self.CONNECTION_TIMEOUT)
+ self.set_is_ok(True)
+ except (KeyboardInterrupt, SystemExit):
+ raise
+ except Exception, e:
+ self.set_is_ok(False)
+ continue
+ if conn.code != 200:
+ self.set_is_ok(False)
+ continue
+ conn.close()
+
+ def is_ok(self):
+ return self._is_ok.is_set()
+
+ def set_is_ok(self, value):
+ if value == False and self.is_ok() != value:
+ logger.warning("Watcher couldn't connect to aggregator.")
+ self._is_ok.clear()
+ else:
+ self._is_ok.set()
+
+
+ def stop(self):
+ logger.info('Stopping watcher thread.')
+ self.stopped = True
+
+
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
index 2670e76..d1429ed 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
@@ -30,6 +30,8 @@ from ambari_commons.os_family_impl import OsFamilyImpl
# Abstraction for OS-dependent configuration defaults
#
class ConfigDefaults(object):
+ def get_config_dir(self):
+ # Base-class placeholder: does nothing here; the Windows and Linux defaults classes provide the value.
+ pass
def get_config_file_path(self):
pass
def get_metric_file_path(self):
@@ -40,11 +42,14 @@ class ConfigDefaults(object):
@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
class ConfigDefaultsWindows(ConfigDefaults):
def __init__(self):
+ self._CONFIG_DIR = "conf"
self._CONFIG_FILE_PATH = "conf\\metric_monitor.ini"
self._METRIC_FILE_PATH = "conf\\metric_groups.conf"
self._METRIC_FILE_PATH = "conf\\ca.pem"
pass
+ def get_config_dir(self):
+ # Returns the Windows default config directory set in __init__ ("conf").
+ return self._CONFIG_DIR
def get_config_file_path(self):
return self._CONFIG_FILE_PATH
def get_metric_file_path(self):
@@ -55,11 +60,13 @@ class ConfigDefaultsWindows(ConfigDefaults):
@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
class ConfigDefaultsLinux(ConfigDefaults):
def __init__(self):
+ self._CONFIG_DIR = "/etc/ambari-metrics-monitor/conf/"
self._CONFIG_FILE_PATH = "/etc/ambari-metrics-monitor/conf/metric_monitor.ini"
self._METRIC_FILE_PATH = "/etc/ambari-metrics-monitor/conf/metric_groups.conf"
self._CA_CERTS_FILE_PATH = "/etc/ambari-metrics-monitor/conf/ca.pem"
pass
-
+ def get_config_dir(self):
+ # Returns the default config directory set in __init__ ("/etc/ambari-metrics-monitor/conf/").
+ return self._CONFIG_DIR
def get_config_file_path(self):
return self._CONFIG_FILE_PATH
def get_metric_file_path(self):
@@ -71,6 +78,7 @@ configDefaults = ConfigDefaults()
config = ConfigParser.RawConfigParser()
+CONFIG_DIR = configDefaults.get_config_dir()
CONFIG_FILE_PATH = configDefaults.get_config_file_path()
METRIC_FILE_PATH = configDefaults.get_metric_file_path()
CA_CERTS_FILE_PATH = configDefaults.get_ca_certs_file_path()
@@ -191,6 +199,8 @@ class Configuration:
# No hostname script identified in the ambari agent conf
pass
pass
+ def get_config_dir(self):
+ # Returns the module-level CONFIG_DIR, which is taken from configDefaults.get_config_dir().
+ return CONFIG_DIR
def getConfig(self):
return self.config
@@ -214,10 +224,14 @@ class Configuration:
def get_hostname_config(self):
return self.get("default", "hostname", None)
- def get_metrics_collector_hosts(self):
+ def get_metrics_collector_hosts_as_list(self):
+ # Reads "metrics_servers" from the "default" section ("localhost" is passed as the fallback argument)
+ # and returns the value split on commas.
hosts = self.get("default", "metrics_servers", "localhost")
return hosts.split(",")
+ def get_metrics_collector_hosts_as_string(self):
+ hosts = self.get("default", "metrics_servers", "localhost")
+ return hosts
+
def get_failover_strategy(self):
return self.get("collector", "failover_strategy", ROUND_ROBIN_FAILOVER_STRATEGY)
@@ -239,6 +253,23 @@ class Configuration:
def is_server_https_enabled(self):
return "true" == str(self.get("collector", "https_enabled")).lower()
+ def get_java_home(self):
+ # Returns the "java_home" value of the "aggregation" section; no fallback argument is passed.
+ return self.get("aggregation", "java_home")
+
+ def is_inmemory_aggregation_enabled(self):
+ # True only when "host_in_memory_aggregation" in the "aggregation" section, converted to a
+ # lower-cased string, equals "true".
+ return "true" == str(self.get("aggregation", "host_in_memory_aggregation")).lower()
+
+ def get_inmemory_aggregation_port(self):
+ # Returns the "host_in_memory_aggregation_port" value of the "aggregation" section as stored;
+ # no fallback argument is passed and no conversion is done here.
+ return self.get("aggregation", "host_in_memory_aggregation_port")
+
+ def get_aggregator_jvm_agrs(self):
+ hosts = self.get("aggregation", "jvm_arguments", "-Xmx256m -Xms128m -XX:PermSize=68m")
+ return hosts
+
+ def ams_monitor_log_dir(self):
+ hosts = self.get("aggregation", "ams_monitor_log_dir", "/var/log/ambari-metrics-monitor")
+ return hosts
+
def is_set_instanceid(self):
return "true" == str(self.get("default", "set.instanceId", 'false')).lower()