You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ds...@apache.org on 2017/05/17 16:38:37 UTC

[1/3] ambari git commit: AMBARI-20758 Aggregate local metrics for minute aggregation time window (dsen)

Repository: ambari
Updated Branches:
  refs/heads/trunk 772be786d -> 041d353b0


http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/templates/flume-metrics2.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/templates/flume-metrics2.properties.j2 b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/templates/flume-metrics2.properties.j2
index b876a3d..28944ca 100644
--- a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/templates/flume-metrics2.properties.j2
+++ b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/templates/flume-metrics2.properties.j2
@@ -23,6 +23,8 @@ port={{metric_collector_port}}
 collectionFrequency={{metrics_collection_period}}000
 maxRowCacheSize=10000
 sendInterval={{metrics_report_interval}}000
+host_in_memory_aggregation = {{host_in_memory_aggregation}}
+host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
 
 instanceId={{cluster_name}}
 set.instanceId={{set_instanceId}}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/scripts/params_linux.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/scripts/params_linux.py b/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/scripts/params_linux.py
index efea167..d45aea6 100644
--- a/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/scripts/params_linux.py
+++ b/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/scripts/params_linux.py
@@ -184,6 +184,9 @@ if has_metric_collector:
 metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
 metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
 
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
+
 # if hbase is selected the hbase_rs_hosts, should not be empty, but still default just in case
 if 'slave_hosts' in config['clusterHostInfo']:
   rs_hosts = default('/clusterHostInfo/hbase_rs_hosts', '/clusterHostInfo/slave_hosts') #if hbase_rs_hosts not given it is assumed that region servers on same nodes as slaves

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-MASTER.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-MASTER.j2 b/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-MASTER.j2
index 24535c5..c8f2f13 100644
--- a/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-MASTER.j2
+++ b/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-MASTER.j2
@@ -78,6 +78,8 @@ hbase.sink.timeline.protocol={{metric_collector_protocol}}
 hbase.sink.timeline.port={{metric_collector_port}}
 hbase.sink.timeline.instanceId={{cluster_name}}
 hbase.sink.timeline.set.instanceId={{set_instanceId}}
+hbase.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+hbase.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
 
 # HTTPS properties
 hbase.sink.timeline.truststore.path = {{metric_truststore_path}}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-RS.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-RS.j2 b/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-RS.j2
index 9076269..f4e25e1 100644
--- a/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-RS.j2
+++ b/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-RS.j2
@@ -76,6 +76,8 @@ hbase.sink.timeline.protocol={{metric_collector_protocol}}
 hbase.sink.timeline.port={{metric_collector_port}}
 hbase.sink.timeline.instanceId={{cluster_name}}
 hbase.sink.timeline.set.instanceId={{set_instanceId}}
+hbase.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+hbase.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
 
 # HTTPS properties
 hbase.sink.timeline.truststore.path = {{metric_truststore_path}}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/configuration/hadoop-metrics2.properties.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/configuration/hadoop-metrics2.properties.xml b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/configuration/hadoop-metrics2.properties.xml
index fae61d3..4b03880 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/configuration/hadoop-metrics2.properties.xml
+++ b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/configuration/hadoop-metrics2.properties.xml
@@ -88,6 +88,8 @@ resourcemanager.sink.ganglia.tagsForPrefix.yarn=Queue
 *.sink.timeline.port={{metric_collector_port}}
 *.sink.timeline.instanceId={{cluster_name}}
 *.sink.timeline.set.instanceId={{set_instanceId}}
+*.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+*.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
 
 # HTTPS properties
 *.sink.timeline.truststore.path = {{metric_truststore_path}}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/params_linux.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/params_linux.py b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/params_linux.py
index d854451..c1128a5 100644
--- a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/params_linux.py
+++ b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/params_linux.py
@@ -566,6 +566,8 @@ if has_metric_collector:
 metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
 metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
 
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
 ########################################################
 ############# Atlas related params #####################
 ########################################################

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-hivemetastore.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-hivemetastore.properties.j2 b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-hivemetastore.properties.j2
index 9328f9f..d78a342 100644
--- a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-hivemetastore.properties.j2
+++ b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-hivemetastore.properties.j2
@@ -53,6 +53,8 @@
   hivemetastore.sink.timeline.collector.hosts={{ams_collector_hosts}}
   hivemetastore.sink.timeline.port={{metric_collector_port}}
   hivemetastore.sink.timeline.protocol={{metric_collector_protocol}}
+  hivemetastore.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+  hivemetastore.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
 
 
 {% endif %}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-hiveserver2.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-hiveserver2.properties.j2 b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-hiveserver2.properties.j2
index 9a7f9dc..1f496ef 100644
--- a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-hiveserver2.properties.j2
+++ b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-hiveserver2.properties.j2
@@ -53,5 +53,7 @@
   hiveserver2.sink.timeline.collector.hosts={{ams_collector_hosts}}
   hiveserver2.sink.timeline.port={{metric_collector_port}}
   hiveserver2.sink.timeline.protocol={{metric_collector_protocol}}
+  hiveserver2.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+  hiveserver2.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
 
 {% endif %}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-llapdaemon.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-llapdaemon.j2 b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-llapdaemon.j2
index e9fe024..01869c0 100644
--- a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-llapdaemon.j2
+++ b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-llapdaemon.j2
@@ -52,5 +52,7 @@
   llapdaemon.sink.timeline.collector.hosts={{ams_collector_hosts}}
   llapdaemon.sink.timeline.port={{metric_collector_port}}
   llapdaemon.sink.timeline.protocol={{metric_collector_protocol}}
+  llapdaemon.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+  llapdaemon.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
 
 {% endif %}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-llaptaskscheduler.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-llaptaskscheduler.j2 b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-llaptaskscheduler.j2
index bd7eb0c..2e25c4a 100644
--- a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-llaptaskscheduler.j2
+++ b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-llaptaskscheduler.j2
@@ -52,5 +52,7 @@
   llaptaskscheduler.sink.timeline.collector.hosts={{ams_collector_hosts}}
   llaptaskscheduler.sink.timeline.port={{metric_collector_port}}
   llaptaskscheduler.sink.timeline.protocol={{metric_collector_protocol}}
+  llaptaskscheduler.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+  llaptaskscheduler.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
 
 {% endif %}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/scripts/params_linux.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/scripts/params_linux.py b/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/scripts/params_linux.py
index c0ac535..a12d388 100644
--- a/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/scripts/params_linux.py
+++ b/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/scripts/params_linux.py
@@ -565,6 +565,9 @@ if has_metric_collector:
 metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
 metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
 
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
+
 ########################################################
 ############# Atlas related params #####################
 ########################################################

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-hivemetastore.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-hivemetastore.properties.j2 b/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-hivemetastore.properties.j2
index 9328f9f..d78a342 100644
--- a/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-hivemetastore.properties.j2
+++ b/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-hivemetastore.properties.j2
@@ -53,6 +53,8 @@
   hivemetastore.sink.timeline.collector.hosts={{ams_collector_hosts}}
   hivemetastore.sink.timeline.port={{metric_collector_port}}
   hivemetastore.sink.timeline.protocol={{metric_collector_protocol}}
+  hivemetastore.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+  hivemetastore.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
 
 
 {% endif %}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-hiveserver2.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-hiveserver2.properties.j2 b/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-hiveserver2.properties.j2
index 9a7f9dc..1f496ef 100644
--- a/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-hiveserver2.properties.j2
+++ b/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-hiveserver2.properties.j2
@@ -53,5 +53,7 @@
   hiveserver2.sink.timeline.collector.hosts={{ams_collector_hosts}}
   hiveserver2.sink.timeline.port={{metric_collector_port}}
   hiveserver2.sink.timeline.protocol={{metric_collector_protocol}}
+  hiveserver2.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+  hiveserver2.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
 
 {% endif %}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-llapdaemon.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-llapdaemon.j2 b/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-llapdaemon.j2
index e9fe024..01869c0 100644
--- a/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-llapdaemon.j2
+++ b/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-llapdaemon.j2
@@ -52,5 +52,7 @@
   llapdaemon.sink.timeline.collector.hosts={{ams_collector_hosts}}
   llapdaemon.sink.timeline.port={{metric_collector_port}}
   llapdaemon.sink.timeline.protocol={{metric_collector_protocol}}
+  llapdaemon.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+  llapdaemon.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
 
 {% endif %}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-llaptaskscheduler.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-llaptaskscheduler.j2 b/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-llaptaskscheduler.j2
index bd7eb0c..2e25c4a 100644
--- a/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-llaptaskscheduler.j2
+++ b/ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-llaptaskscheduler.j2
@@ -52,5 +52,7 @@
   llaptaskscheduler.sink.timeline.collector.hosts={{ams_collector_hosts}}
   llaptaskscheduler.sink.timeline.port={{metric_collector_port}}
   llaptaskscheduler.sink.timeline.protocol={{metric_collector_protocol}}
+  llaptaskscheduler.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+  llaptaskscheduler.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
 
 {% endif %}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/configuration/kafka-broker.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/configuration/kafka-broker.xml b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/configuration/kafka-broker.xml
index e01dacd..26e7a77 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/configuration/kafka-broker.xml
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/configuration/kafka-broker.xml
@@ -422,4 +422,15 @@
     <description>Timeline metrics reporter send interval</description>
     <on-ambari-upgrade add="true"/>
   </property>
+  <property>
+    <name>kafka.timeline.metrics.host_in_memory_aggregation</name>
+    <value>{{host_in_memory_aggregation}}</value>
+    <description>if set to "true" host metrics will be aggregated in memory on each host</description>
+    <on-ambari-upgrade add="true"/>
+  </property>
+  <property>
+    <name>kafka.timeline.metrics.host_in_memory_aggregation_port</name>
+    <value>{{host_in_memory_aggregation_port}}</value>
+    <on-ambari-upgrade add="true"/>
+  </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/params.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/params.py
index 5b0be54..9acc1ef 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/params.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/params.py
@@ -156,6 +156,9 @@ if has_metric_collector:
     metric_collector_protocol = 'https'
   else:
     metric_collector_protocol = 'http'
+
+  host_in_memory_aggregation = str(default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)).lower()
+  host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
   pass
 
 # Security-related params

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/params_linux.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/params_linux.py b/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/params_linux.py
index d9fae76..78ec165 100644
--- a/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/params_linux.py
+++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/params_linux.py
@@ -208,6 +208,8 @@ metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sin
 metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
 metric_collector_sink_jar = "/usr/lib/storm/lib/ambari-metrics-storm-sink-with-common-*.jar"
 metric_collector_legacy_sink_jar = "/usr/lib/storm/lib/ambari-metrics-storm-sink-legacy-with-common-*.jar"
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
 
 
 # Cluster Zookeeper quorum

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/config.yaml.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/config.yaml.j2 b/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/config.yaml.j2
index 51162e8..67b89c4 100644
--- a/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/config.yaml.j2
+++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/config.yaml.j2
@@ -61,6 +61,8 @@ metrics_collector:
   protocol: "{{metric_collector_protocol}}"
   port: "{{metric_collector_port}}"
   appId: "{{metric_collector_app_id}}"
+  host_in_memory_aggregation = {{host_in_memory_aggregation}}
+  host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
 
   # HTTPS settings
   truststore.path : "{{metric_truststore_path}}"

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/storm-metrics2.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/storm-metrics2.properties.j2 b/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/storm-metrics2.properties.j2
index 0501039..1dedffc 100644
--- a/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/storm-metrics2.properties.j2
+++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/storm-metrics2.properties.j2
@@ -25,6 +25,8 @@ sendInterval={{metrics_report_interval}}000
 clusterReporterAppId=nimbus
 instanceId={{cluster_name}}
 set.instanceId={{set_instanceId}}
+host_in_memory_aggregation = {{host_in_memory_aggregation}}
+host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
 
 # HTTPS properties
 truststore.path = {{metric_truststore_path}}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py
index 7282bb5..3488e75 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py
@@ -164,6 +164,9 @@ if has_metric_collector:
 metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
 metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
 
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
+
 # Cluster Zookeeper quorum
 zookeeper_quorum = None
 if has_zk_host:

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2 b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2
index 1b02a97..1f8499f 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2
@@ -77,6 +77,8 @@ resourcemanager.sink.ganglia.tagsForPrefix.yarn=Queue
 *.sink.timeline.port={{metric_collector_port}}
 *.sink.timeline.instanceId={{cluster_name}}
 *.sink.timeline.set.instanceId={{set_instanceId}}
+*.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+*.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
 
 # HTTPS properties
 *.sink.timeline.truststore.path = {{metric_truststore_path}}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/stacks/HDP/2.6/services/HDFS/configuration/hadoop-metrics2.properties.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.6/services/HDFS/configuration/hadoop-metrics2.properties.xml b/ambari-server/src/main/resources/stacks/HDP/2.6/services/HDFS/configuration/hadoop-metrics2.properties.xml
index fae61d3..4b03880 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.6/services/HDFS/configuration/hadoop-metrics2.properties.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.6/services/HDFS/configuration/hadoop-metrics2.properties.xml
@@ -88,6 +88,8 @@ resourcemanager.sink.ganglia.tagsForPrefix.yarn=Queue
 *.sink.timeline.port={{metric_collector_port}}
 *.sink.timeline.instanceId={{cluster_name}}
 *.sink.timeline.set.instanceId={{set_instanceId}}
+*.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+*.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
 
 # HTTPS properties
 *.sink.timeline.truststore.path = {{metric_truststore_path}}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/stacks/HDP/3.0/hooks/before-START/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/3.0/hooks/before-START/scripts/params.py b/ambari-server/src/main/resources/stacks/HDP/3.0/hooks/before-START/scripts/params.py
index 678bbde..a3830f7 100644
--- a/ambari-server/src/main/resources/stacks/HDP/3.0/hooks/before-START/scripts/params.py
+++ b/ambari-server/src/main/resources/stacks/HDP/3.0/hooks/before-START/scripts/params.py
@@ -158,6 +158,8 @@ if has_metric_collector:
   pass
 metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
 metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
 
 # Cluster Zookeeper quorum
 zookeeper_quorum = None

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/stacks/HDP/3.0/hooks/before-START/templates/hadoop-metrics2.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/3.0/hooks/before-START/templates/hadoop-metrics2.properties.j2 b/ambari-server/src/main/resources/stacks/HDP/3.0/hooks/before-START/templates/hadoop-metrics2.properties.j2
index 1b02a97..1f8499f 100644
--- a/ambari-server/src/main/resources/stacks/HDP/3.0/hooks/before-START/templates/hadoop-metrics2.properties.j2
+++ b/ambari-server/src/main/resources/stacks/HDP/3.0/hooks/before-START/templates/hadoop-metrics2.properties.j2
@@ -77,6 +77,8 @@ resourcemanager.sink.ganglia.tagsForPrefix.yarn=Queue
 *.sink.timeline.port={{metric_collector_port}}
 *.sink.timeline.instanceId={{cluster_name}}
 *.sink.timeline.set.instanceId={{set_instanceId}}
+*.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+*.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
 
 # HTTPS properties
 *.sink.timeline.truststore.path = {{metric_truststore_path}}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java
index 4a60892..3ee8ebc 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java
@@ -74,6 +74,16 @@ public class TestAmbariMetricsSinkImpl extends AbstractTimelineMetricsSink imple
   }
 
   @Override
+  protected boolean isHostInMemoryAggregationEnabled() {
+    return true;
+  }
+
+  @Override
+  protected int getHostInMemoryAggregationPort() {
+    return 61888;
+  }
+
+  @Override
   public void init(MetricsConfiguration configuration) {
 
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/contrib/management-packs/hdf-ambari-mpack/src/main/resources/stacks/HDF/2.0/hooks/before-START/scripts/params.py
----------------------------------------------------------------------
diff --git a/contrib/management-packs/hdf-ambari-mpack/src/main/resources/stacks/HDF/2.0/hooks/before-START/scripts/params.py b/contrib/management-packs/hdf-ambari-mpack/src/main/resources/stacks/HDF/2.0/hooks/before-START/scripts/params.py
index 6a70675..8cc876f 100644
--- a/contrib/management-packs/hdf-ambari-mpack/src/main/resources/stacks/HDF/2.0/hooks/before-START/scripts/params.py
+++ b/contrib/management-packs/hdf-ambari-mpack/src/main/resources/stacks/HDF/2.0/hooks/before-START/scripts/params.py
@@ -135,6 +135,8 @@ if has_metric_collector:
 metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
 metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
 
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
 #hadoop params
 
 if has_namenode or dfs_type == 'HCFS':


[2/3] ambari git commit: AMBARI-20758 Aggregate local metrics for minute aggregation time window (dsen)

Posted by ds...@apache.org.
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py
index c0feed5..e5da9ba 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py
@@ -27,6 +27,9 @@ from event_definition import HostMetricCollectEvent, ProcessMetricCollectEvent
 from metric_collector import MetricsCollector
 from emitter import Emitter
 from host_info import HostInfo
+from aggregator import Aggregator
+from aggregator import AggregatorWatchdog
+
 
 logger = logging.getLogger()
 
@@ -50,11 +53,15 @@ class Controller(threading.Thread):
     self.initialize_events_cache()
     self.emitter = Emitter(self.config, self.application_metric_map, stop_handler)
     self._t = None
+    self.aggregator = None
+    self.aggregator_watchdog = None
 
   def run(self):
     logger.info('Running Controller thread: %s' % threading.currentThread().getName())
 
     self.start_emitter()
+    if self.config.is_inmemory_aggregation_enabled():
+      self.start_aggregator_with_watchdog()
 
     # Wake every 5 seconds to push events to the queue
     while True:
@@ -62,6 +69,10 @@ class Controller(threading.Thread):
         logger.warn('Event Queue full!! Suspending further collections.')
       else:
         self.enqueque_events()
+      # restart aggregator if needed
+      if self.config.is_inmemory_aggregation_enabled() and not self.aggregator_watchdog.is_ok():
+        logger.warning("Aggregator is not available. Restarting aggregator.")
+        self.start_aggregator_with_watchdog()
       pass
       # Wait for the service stop event instead of sleeping blindly
       if 0 == self._stop_handler.wait(self.sleep_interval):
@@ -75,6 +86,12 @@ class Controller(threading.Thread):
     # The emitter thread should have stopped by now, just ensure it has shut
     # down properly
     self.emitter.join(5)
+
+    if self.config.is_inmemory_aggregation_enabled():
+      self.aggregator.stop()
+      self.aggregator_watchdog.stop()
+      self.aggregator.join(5)
+      self.aggregator_watchdog.join(5)
     pass
 
   # TODO: Optimize to not use Timer class and use the Queue instead
@@ -115,3 +132,14 @@ class Controller(threading.Thread):
 
   def start_emitter(self):
     self.emitter.start()
+
+  # Start aggregator and watcher threads
+  def start_aggregator_with_watchdog(self):
+    if self.aggregator:
+      self.aggregator.stop()
+    if self.aggregator_watchdog:
+      self.aggregator.stop()
+    self.aggregator = Aggregator(self.config, self._stop_handler)
+    self.aggregator_watchdog = AggregatorWatchdog(self.config, self._stop_handler)
+    self.aggregator.start()
+    self.aggregator_watchdog.start()

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
index e2a7f0d..77b8c23 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
@@ -44,10 +44,16 @@ class Emitter(threading.Thread):
     self._stop_handler = stop_handler
     self.application_metric_map = application_metric_map
     self.collector_port = config.get_server_port()
-    self.all_metrics_collector_hosts = config.get_metrics_collector_hosts()
+    self.all_metrics_collector_hosts = config.get_metrics_collector_hosts_as_list()
     self.is_server_https_enabled = config.is_server_https_enabled()
     self.set_instanceid = config.is_set_instanceid()
     self.instanceid = config.get_instanceid()
+    self.is_inmemory_aggregation_enabled = config.is_inmemory_aggregation_enabled()
+
+    if self.is_inmemory_aggregation_enabled:
+      self.collector_port = config.get_inmemory_aggregation_port()
+      self.all_metrics_collector_hosts = ['localhost']
+      self.is_server_https_enabled = False
 
     if self.is_server_https_enabled:
       self.ca_certs = config.get_ca_certs()

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py
index bfb6957..7a9fbec 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py
@@ -117,7 +117,8 @@ class StopHandlerLinux(StopHandler):
 
   def wait(self, timeout=None):
     # Stop process when stop event received
-    if self.stop_event.wait(timeout):
+    self.stop_event.wait(timeout)
+    if self.stop_event.isSet():
       logger.info("Stop event received")
       return 0
     # Timeout

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/main.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/main.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/main.py
index d218015..53d27f8 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/main.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/main.py
@@ -21,7 +21,7 @@ limitations under the License.
 import logging
 import os
 import sys
-
+import signal
 from ambari_commons.os_utils import remove_file
 
 from core.controller import Controller
@@ -73,6 +73,10 @@ def server_process_main(stop_handler, scmStatus=None):
   if scmStatus is not None:
     scmStatus.reportStarted()
 
+  # For some reason this is needed to catch system signals like SIGTERM
+  # TODO fix if possible
+  signal.pause()
+
   #The controller thread finishes when the stop event is signaled
   controller.join()
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
index 211e9cd..76b1c15 100644
--- a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
+++ b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
@@ -72,6 +72,8 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
   private static final String TIMELINE_METRICS_SSL_KEYSTORE_PASSWORD_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SSL_KEYSTORE_PASSWORD_PROPERTY;
   private static final String TIMELINE_METRICS_KAFKA_INSTANCE_ID_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + INSTANCE_ID_PROPERTY;
   private static final String TIMELINE_METRICS_KAFKA_SET_INSTANCE_ID_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SET_INSTANCE_ID_PROPERTY;
+  private static final String TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY;
+  private static final String TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY;
   private static final String TIMELINE_DEFAULT_HOST = "localhost";
   private static final String TIMELINE_DEFAULT_PORT = "6188";
   private static final String TIMELINE_DEFAULT_PROTOCOL = "http";
@@ -96,6 +98,8 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
   private String[] includedMetricsPrefixes;
   // Local cache to avoid prefix matching everytime
   private Set<String> excludedMetrics = new HashSet<>();
+  private boolean hostInMemoryAggregationEnabled;
+  private int hostInMemoryAggregationPort;
 
   @Override
   protected String getCollectorUri(String host) {
@@ -132,6 +136,17 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
     return hostname;
   }
 
+
+  @Override
+  protected boolean isHostInMemoryAggregationEnabled() {
+    return hostInMemoryAggregationEnabled;
+  }
+
+  @Override
+  protected int getHostInMemoryAggregationPort() {
+    return hostInMemoryAggregationPort;
+  }
+
   public void setMetricsCache(TimelineMetricsCache metricsCache) {
     this.metricsCache = metricsCache;
   }
@@ -169,6 +184,8 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
         instanceId = props.getString(TIMELINE_METRICS_KAFKA_INSTANCE_ID_PROPERTY);
         setInstanceId = props.getBoolean(TIMELINE_METRICS_KAFKA_SET_INSTANCE_ID_PROPERTY);
 
+        hostInMemoryAggregationEnabled = props.getBoolean(TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, false);
+        hostInMemoryAggregationPort = props.getInt(TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, 61888);
         setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval));
 
         if (metricCollectorProtocol.contains("https")) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
index 08f0598..24b2c8b 100644
--- a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
+++ b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
@@ -55,6 +55,8 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
   private NimbusClient nimbusClient;
   private String applicationId;
   private int timeoutSeconds;
+  private boolean hostInMemoryAggregationEnabled;
+  private int hostInMemoryAggregationPort;
 
   public StormTimelineMetricsReporter() {
 
@@ -96,6 +98,16 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
   }
 
   @Override
+  protected boolean isHostInMemoryAggregationEnabled() {
+    return hostInMemoryAggregationEnabled;
+  }
+
+  @Override
+  protected int getHostInMemoryAggregationPort() {
+    return hostInMemoryAggregationPort;
+  }
+
+  @Override
   public void prepare(Map conf) {
     LOG.info("Preparing Storm Metrics Reporter");
     try {
@@ -130,6 +142,8 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
       applicationId = cf.get(APP_ID).toString();
       setInstanceId = Boolean.getBoolean(cf.get(SET_INSTANCE_ID_PROPERTY).toString());
       instanceId = cf.get(INSTANCE_ID_PROPERTY).toString();
+      hostInMemoryAggregationEnabled = Boolean.valueOf(cf.get(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY).toString());
+      hostInMemoryAggregationPort = Integer.valueOf(cf.get(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY).toString());
 
       collectorUri = constructTimelineMetricUri(protocol, findPreferredCollectHost(), port);
       if (protocol.contains("https")) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index 20f60e1..c9c0538 100644
--- a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -61,6 +61,8 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
   private String applicationId;
   private boolean setInstanceId;
   private String instanceId;
+  private boolean hostInMemoryAggregationEnabled;
+  private int hostInMemoryAggregationPort;
 
   @Override
   protected String getCollectorUri(String host) {
@@ -98,6 +100,16 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
   }
 
   @Override
+  protected boolean isHostInMemoryAggregationEnabled() {
+    return hostInMemoryAggregationEnabled;
+  }
+
+  @Override
+  protected int getHostInMemoryAggregationPort() {
+    return hostInMemoryAggregationPort;
+  }
+
+  @Override
   public void prepare(Map map, Object o, TopologyContext topologyContext, IErrorReporter iErrorReporter) {
     LOG.info("Preparing Storm Metrics Sink");
     try {
@@ -126,6 +138,8 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
 
     instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY);
     setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false"));
+    hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY));
+    hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY));
     // Initialize the collector write strategy
     super.init();
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
index 14f160b..5b75065 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
@@ -50,6 +50,8 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
   private String instanceId;
   private String applicationId;
   private int timeoutSeconds;
+  private boolean hostInMemoryAggregationEnabled;
+  private int hostInMemoryAggregationPort;
 
   public StormTimelineMetricsReporter() {
 
@@ -91,6 +93,16 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
   }
 
   @Override
+  protected boolean isHostInMemoryAggregationEnabled() {
+    return hostInMemoryAggregationEnabled;
+  }
+
+  @Override
+  protected int getHostInMemoryAggregationPort() {
+    return hostInMemoryAggregationPort;
+  }
+
+  @Override
   public void prepare(Object registrationArgument) {
     LOG.info("Preparing Storm Metrics Reporter");
     try {
@@ -119,6 +131,10 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
       applicationId = configuration.getProperty(CLUSTER_REPORTER_APP_ID, DEFAULT_CLUSTER_REPORTER_APP_ID);
       setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY));
       instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY);
+
+      hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY));
+      hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY));
+
       if (protocol.contains("https")) {
         String trustStorePath = configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).trim();
         String trustStoreType = configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).trim();

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index 425201c..320e177 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -70,6 +70,8 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
   private String applicationId;
   private String instanceId;
   private boolean setInstanceId;
+  private boolean hostInMemoryAggregationEnabled;
+  private int hostInMemoryAggregationPort;
 
   @Override
   protected String getCollectorUri(String host) {
@@ -107,6 +109,16 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
   }
 
   @Override
+  protected boolean isHostInMemoryAggregationEnabled() {
+    return hostInMemoryAggregationEnabled;
+  }
+
+  @Override
+  protected int getHostInMemoryAggregationPort() {
+    return hostInMemoryAggregationPort;
+  }
+
+  @Override
   public void prepare(Map map, Object o, TopologyContext topologyContext, IErrorReporter iErrorReporter) {
     LOG.info("Preparing Storm Metrics Sink");
     try {
@@ -137,6 +149,10 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     port = configuration.getProperty(COLLECTOR_PORT, "6188");
     instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY);
     setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false"));
+
+    hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY));
+    hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY));
+
     // Initialize the collector write strategy
     super.init();
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index c242a2f..f984253 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -24,10 +24,13 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
 import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.Precision;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricWithAggregatedValues;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
 import org.apache.hadoop.service.AbstractService;
@@ -41,6 +44,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConditionBuilder;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.TopNCondition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.SeriesAggregateFunction;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.TimelineMetricsSeriesAggregateFunction;
@@ -62,6 +66,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
@@ -152,10 +157,14 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
       scheduleAggregatorThread(dailyClusterAggregator);
 
       // Start the minute host aggregator
-      TimelineMetricAggregator minuteHostAggregator =
-        TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(
-          hBaseAccessor, metricsConf, haController);
-      scheduleAggregatorThread(minuteHostAggregator);
+      if (Boolean.parseBoolean(metricsConf.get(TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION, "true"))) {
+        LOG.info("timeline.metrics.host.inmemory.aggregation is set to True, disabling host minute aggregation on collector");
+      } else {
+        TimelineMetricAggregator minuteHostAggregator =
+          TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(
+            hBaseAccessor, metricsConf, haController);
+        scheduleAggregatorThread(minuteHostAggregator);
+      }
 
       // Start the hourly host aggregator
       TimelineMetricAggregator hourlyHostAggregator =
@@ -390,6 +399,18 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
   }
 
   @Override
+  public TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException {
+    Map<TimelineMetric, MetricHostAggregate> aggregateMap = new HashMap<>();
+    for (TimelineMetricWithAggregatedValues entry : aggregationResult.getResult()) {
+      aggregateMap.put(entry.getTimelineMetric(), entry.getMetricAggregate());
+    }
+    hBaseAccessor.saveHostAggregateRecords(aggregateMap, PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME);
+
+
+    return new TimelinePutResponse();
+  }
+
+  @Override
   public Map<String, Map<String,Set<String>>> getInstanceHostsMetadata(String instanceId, String appId)
           throws SQLException, IOException {
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index fb369e8..3b2a119 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.RetryCounterFactory;
 import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.Precision;
 import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
@@ -40,8 +42,6 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index 0d5042f..023465b 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -296,6 +296,8 @@ public class TimelineMetricConfiguration {
 
   public static final String AMSHBASE_METRICS_WHITESLIST_FILE = "amshbase_metrics_whitelist";
 
+  public static final String TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION = "timeline.metrics.host.inmemory.aggregation";
+
   private Configuration hbaseConf;
   private Configuration metricsConf;
   private Configuration amsEnvConf;

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
index bde09cb..d052d54 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
 import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
 import org.apache.hadoop.metrics2.sink.timeline.Precision;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
@@ -80,6 +81,7 @@ public interface TimelineMetricStore {
    */
   Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(String query) throws SQLException, IOException;
 
+  TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException;
   /**
    * Returns all hosts that have written metrics with the apps on the host
    * @return { hostname : [ appIds ] }

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java
index 65d54c0..7b03b30 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java
@@ -19,10 +19,10 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 
 import java.util.Map;
 
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.Precision;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
 
 /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricAggregate.java
deleted file mode 100644
index 825ac25..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricAggregate.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.codehaus.jackson.annotate.JsonProperty;
-import org.codehaus.jackson.annotate.JsonSubTypes;
-import org.codehaus.jackson.map.ObjectMapper;
-
-import java.io.IOException;
-
-/**
-*
-*/
-@JsonSubTypes({@JsonSubTypes.Type(value = MetricClusterAggregate.class),
-  @JsonSubTypes.Type(value = MetricHostAggregate.class)})
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-public class MetricAggregate {
-  private static final ObjectMapper mapper = new ObjectMapper();
-
-  protected Double sum = 0.0;
-  protected Double deviation;
-  protected Double max = Double.MIN_VALUE;
-  protected Double min = Double.MAX_VALUE;
-
-  public MetricAggregate() {
-  }
-
-  MetricAggregate(Double sum, Double deviation, Double max,
-                  Double min) {
-    this.sum = sum;
-    this.deviation = deviation;
-    this.max = max;
-    this.min = min;
-  }
-
-  public void updateSum(Double sum) {
-    this.sum += sum;
-  }
-
-  public void updateMax(Double max) {
-    if (max > this.max) {
-      this.max = max;
-    }
-  }
-
-  public void updateMin(Double min) {
-    if (min < this.min) {
-      this.min = min;
-    }
-  }
-
-  @JsonProperty("sum")
-  public Double getSum() {
-    return sum;
-  }
-
-  @JsonProperty("deviation")
-  public Double getDeviation() {
-    return deviation;
-  }
-
-  @JsonProperty("max")
-  public Double getMax() {
-    return max;
-  }
-
-  @JsonProperty("min")
-  public Double getMin() {
-    return min;
-  }
-
-  public void setSum(Double sum) {
-    this.sum = sum;
-  }
-
-  public void setDeviation(Double deviation) {
-    this.deviation = deviation;
-  }
-
-  public void setMax(Double max) {
-    this.max = max;
-  }
-
-  public void setMin(Double min) {
-    this.min = min;
-  }
-
-  public String toJSON() throws IOException {
-    return mapper.writeValueAsString(this);
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricClusterAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricClusterAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricClusterAggregate.java
deleted file mode 100644
index 9c837b6..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricClusterAggregate.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-
-
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-/**
-*
-*/
-public class MetricClusterAggregate extends MetricAggregate {
-  private int numberOfHosts;
-
-  @JsonCreator
-  public MetricClusterAggregate() {
-  }
-
-  public MetricClusterAggregate(Double sum, int numberOfHosts, Double deviation,
-                         Double max, Double min) {
-    super(sum, deviation, max, min);
-    this.numberOfHosts = numberOfHosts;
-  }
-
-  @JsonProperty("numberOfHosts")
-  public int getNumberOfHosts() {
-    return numberOfHosts;
-  }
-
-  public void updateNumberOfHosts(int count) {
-    this.numberOfHosts += count;
-  }
-
-  public void setNumberOfHosts(int numberOfHosts) {
-    this.numberOfHosts = numberOfHosts;
-  }
-
-  /**
-   * Find and update min, max and avg for a minute
-   */
-  public void updateAggregates(MetricClusterAggregate hostAggregate) {
-    updateMax(hostAggregate.getMax());
-    updateMin(hostAggregate.getMin());
-    updateSum(hostAggregate.getSum());
-    updateNumberOfHosts(hostAggregate.getNumberOfHosts());
-  }
-
-  @Override
-  public String toString() {
-    return "MetricAggregate{" +
-      "sum=" + sum +
-      ", numberOfHosts=" + numberOfHosts +
-      ", deviation=" + deviation +
-      ", max=" + max +
-      ", min=" + min +
-      '}';
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricHostAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricHostAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricHostAggregate.java
deleted file mode 100644
index 340ec75..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricHostAggregate.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-
-
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-/**
- * Represents a collection of minute based aggregation of values for
- * resolution greater than a minute.
- */
-public class MetricHostAggregate extends MetricAggregate {
-
-  private long numberOfSamples = 0;
-
-  @JsonCreator
-  public MetricHostAggregate() {
-    super(0.0, 0.0, Double.MIN_VALUE, Double.MAX_VALUE);
-  }
-
-  public MetricHostAggregate(Double sum, int numberOfSamples,
-                             Double deviation,
-                             Double max, Double min) {
-    super(sum, deviation, max, min);
-    this.numberOfSamples = numberOfSamples;
-  }
-
-  @JsonProperty("numberOfSamples")
-  public long getNumberOfSamples() {
-    return numberOfSamples == 0 ? 1 : numberOfSamples;
-  }
-
-  public void updateNumberOfSamples(long count) {
-    this.numberOfSamples += count;
-  }
-
-  public void setNumberOfSamples(long numberOfSamples) {
-    this.numberOfSamples = numberOfSamples;
-  }
-
-  public double getAvg() {
-    return sum / numberOfSamples;
-  }
-
-  /**
-   * Find and update min, max and avg for a minute
-   */
-  public void updateAggregates(MetricHostAggregate hostAggregate) {
-    updateMax(hostAggregate.getMax());
-    updateMin(hostAggregate.getMin());
-    updateSum(hostAggregate.getSum());
-    updateNumberOfSamples(hostAggregate.getNumberOfSamples());
-  }
-
-  @Override
-  public String toString() {
-    return "MetricHostAggregate{" +
-      "sum=" + sum +
-      ", numberOfSamples=" + numberOfSamples +
-      ", deviation=" + deviation +
-      ", max=" + max +
-      ", min=" + min +
-      '}';
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
index 44aca03..9eaf456 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
@@ -21,6 +21,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricsFilter;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
index 0934356..ba16b43 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
index a5a3499..34b1f9b 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
@@ -38,6 +38,7 @@ import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.PostProcessingUtil;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
index 0ea9c08..a17433b 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
index b5f49fb..672f85f 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
 
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
index 9da921a..50cfb08 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
@@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
 import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
 import org.apache.hadoop.metrics2.sink.timeline.PrecisionLimitExceededException;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
@@ -285,6 +286,36 @@ public class TimelineWebServices {
     }
   }
 
+  /**
+   * Store the given metrics into the timeline store, and return errors that
+   * happened during storing.
+   */
+  @Path("/metrics/aggregated")
+  @POST
+  @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+  public TimelinePutResponse postAggregatedMetrics(
+    @Context HttpServletRequest req,
+    @Context HttpServletResponse res,
+    AggregationResult metrics) {
+
+    init(res);
+    if (metrics == null) {
+      return new TimelinePutResponse();
+    }
+
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Storing aggregated metrics: " +
+                TimelineUtils.dumpTimelineRecordtoJSON(metrics, true));
+      }
+
+      return timelineMetricStore.putHostAggregatedMetrics(metrics);
+    } catch (Exception e) {
+      LOG.error("Error saving metrics.", e);
+      throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
+    }
+  }
+
   @Path("/containermetrics")
   @POST
   @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
index 0087fd9..d5baaef 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
@@ -26,12 +26,12 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.Precision;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java
index 37ec134..7eeb9c4 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java
@@ -17,9 +17,9 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
 
 import java.util.Arrays;

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
index a910cc2..d668178 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
@@ -22,11 +22,11 @@ import com.google.common.collect.Multimap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.Precision;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
index 44f48e8..3009163 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
@@ -18,7 +18,7 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
   .timeline;
 
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.junit.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -34,7 +34,7 @@ public class TestMetricHostAggregate {
     assertThat(aggregate.getSum()).isEqualTo(3.0);
     assertThat(aggregate.getMin()).isEqualTo(1.0);
     assertThat(aggregate.getMax()).isEqualTo(2.0);
-    assertThat(aggregate.getAvg()).isEqualTo(3.0 / 2);
+    assertThat(aggregate.calculateAverage()).isEqualTo(3.0 / 2);
   }
 
   @Test
@@ -50,7 +50,7 @@ public class TestMetricHostAggregate {
     assertThat(aggregate.getSum()).isEqualTo(12.0);
     assertThat(aggregate.getMin()).isEqualTo(0.5);
     assertThat(aggregate.getMax()).isEqualTo(7.5);
-    assertThat(aggregate.getAvg()).isEqualTo((3.0 + 8.0 + 1.0) / 5);
+    assertThat(aggregate.calculateAverage()).isEqualTo((3.0 + 8.0 + 1.0) / 5);
   }
 
   static MetricHostAggregate createAggregate (Double sum, Double min,
@@ -63,4 +63,4 @@ public class TestMetricHostAggregate {
     aggregate.setNumberOfSamples(samplesCount);
     return aggregate;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
index f00906e..ac2f9d7 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
 import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
 import org.apache.hadoop.metrics2.sink.timeline.Precision;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
@@ -92,6 +93,11 @@ public class TestTimelineMetricStore implements TimelineMetricStore {
   }
 
   @Override
+  public TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException {
+    return null;
+  }
+
+  @Override
   public Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException {
     return Collections.emptyMap();
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java
index fa0cfe9..53f6f6c 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java
@@ -17,10 +17,10 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.Precision;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
 
 import java.util.Collections;

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
index f083731..07fd85d 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
@@ -20,13 +20,13 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 
 import junit.framework.Assert;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractMiniHBaseClusterTest;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java
index 9873643..75b3f91 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractMiniHBaseClusterTest;
@@ -124,14 +125,14 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
         assertEquals(0.0, currentHostAggregate.getMin());
         assertEquals(20, currentHostAggregate.getNumberOfSamples());
         assertEquals(15.0, currentHostAggregate.getSum());
-        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+        assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
         count++;
       } else if ("mem_free".equals(currentMetric.getMetricName())) {
         assertEquals(2.0, currentHostAggregate.getMax());
         assertEquals(0.0, currentHostAggregate.getMin());
         assertEquals(20, currentHostAggregate.getNumberOfSamples());
         assertEquals(15.0, currentHostAggregate.getSum());
-        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+        assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
         count++;
       } else {
         fail("Unexpected entry");
@@ -198,7 +199,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
         assertEquals(0.0, currentHostAggregate.getMin());
         assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples());
         assertEquals(12 * 15.0, currentHostAggregate.getSum());
-        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+        assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
       }
     }
   }
@@ -260,7 +261,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
         assertEquals(0.0, currentHostAggregate.getMin());
         assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples());
         assertEquals(12 * 15.0, currentHostAggregate.getSum());
-        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+        assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
       }
     }
   }
@@ -309,14 +310,14 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
         assertEquals(0.0, currentHostAggregate.getMin());
         assertEquals(20, currentHostAggregate.getNumberOfSamples());
         assertEquals(15.0, currentHostAggregate.getSum());
-        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+        assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
         count++;
       } else if ("mem_free".equals(currentMetric.getMetricName())) {
         assertEquals(2.0, currentHostAggregate.getMax());
         assertEquals(0.0, currentHostAggregate.getMin());
         assertEquals(20, currentHostAggregate.getNumberOfSamples());
         assertEquals(15.0, currentHostAggregate.getSum());
-        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+        assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
         count++;
       } else {
         fail("Unexpected entry");

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
index 78db11d..6541b2c 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.TreeMap;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/pom.xml b/ambari-metrics/pom.xml
index 2d88912..02f9574 100644
--- a/ambari-metrics/pom.xml
+++ b/ambari-metrics/pom.xml
@@ -33,6 +33,7 @@
     <module>ambari-metrics-host-monitoring</module>
     <module>ambari-metrics-grafana</module>
     <module>ambari-metrics-assembly</module>
+    <module>ambari-metrics-host-aggregator</module>
   </modules>
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java
index 8d1f63f..a0765bf 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java
@@ -300,6 +300,16 @@ public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements
     return hostName;
   }
 
+  @Override
+  protected boolean isHostInMemoryAggregationEnabled() {
+    return false;
+  }
+
+  @Override
+  protected int getHostInMemoryAggregationPort() {
+    return 0;
+  }
+
   private List<TimelineMetric> getFilteredMetricList(List<SingleMetric> metrics) {
     final List<TimelineMetric> metricList = new ArrayList<>();
     for (SingleMetric metric : metrics) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/scripts/params.py
index 150b0a8..5d21514 100644
--- a/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/scripts/params.py
+++ b/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/scripts/params.py
@@ -153,6 +153,8 @@ if has_metric_collector:
   pass
 metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
 metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
 
 # if accumulo is selected accumulo_tserver_hosts should not be empty, but still default just in case
 if 'slave_hosts' in config['clusterHostInfo']:

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/templates/hadoop-metrics2-accumulo.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/templates/hadoop-metrics2-accumulo.properties.j2 b/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/templates/hadoop-metrics2-accumulo.properties.j2
index 6873c85..742ea3c 100644
--- a/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/templates/hadoop-metrics2-accumulo.properties.j2
+++ b/ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/templates/hadoop-metrics2-accumulo.properties.j2
@@ -16,6 +16,9 @@
 # Poll collectors every {{metrics_report_interval}} seconds
 *.period={{metrics_collection_period}}
 
+*.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+*.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
+
 {% if has_metric_collector %}
 
 *.timeline.plugin.urls=file:///usr/lib/ambari-metrics-hadoop-sink/ambari-metrics-hadoop-sink.jar

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml
index cb66537..4d33661 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml
@@ -101,6 +101,14 @@
     <on-ambari-upgrade add="true"/>
   </property>
   <property>
+    <name>timeline.metrics.host.inmemory.aggregation.jvm.arguments</name>
+    <value>-Xmx256m -Xms128m -XX:PermSize=68m</value>
+    <description>
+      Local aggregator jvm extra arguments separated with spaces
+    </description>
+    <on-ambari-upgrade add="true"/>
+  </property>
+  <property>
     <name>timeline.metrics.skip.network.interfaces.patterns</name>
     <value>None</value>
     <description>

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
index 8e1671e..1b085f6 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
@@ -787,4 +787,15 @@
     <value>{{cluster_zookeeper_clientPort}}</value>
     <on-ambari-upgrade add="true"/>
   </property>
+  <property>
+    <name>timeline.metrics.host.inmemory.aggregation</name>
+    <value>false</value>
+    <description>if set to "true" host metrics will be aggregated in memory on each host</description>
+    <on-ambari-upgrade add="true"/>
+  </property>
+  <property>
+    <name>timeline.metrics.host.inmemory.aggregation.port</name>
+    <value>61888</value>
+    <on-ambari-upgrade add="true"/>
+  </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
index 740a91a..9031b46 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
@@ -93,6 +93,9 @@
               <primary>true</primary>
             </log>
           </logs>
+          <configuration-dependencies>
+            <config-type>ams-site</config-type>
+          </configuration-dependencies>
         </component>
 
         <component>

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py
index a929847..f49d47d 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py
@@ -163,6 +163,20 @@ def ams(name=None):
               create_parents = True
     )
 
+    if params.host_in_memory_aggregation and params.log4j_props is not None:
+      File(os.path.join(params.ams_monitor_conf_dir, "log4j.properties"),
+           owner=params.ams_user,
+           content=params.log4j_props
+           )
+
+    XmlConfig("ams-site.xml",
+              conf_dir=params.ams_monitor_conf_dir,
+              configurations=params.config['configurations']['ams-site'],
+              configuration_attributes=params.config['configuration_attributes']['ams-site'],
+              owner=params.ams_user,
+              group=params.user_group
+              )
+
     TemplateConfig(
       os.path.join(params.ams_monitor_conf_dir, "metric_monitor.ini"),
       owner=params.ams_user,
@@ -366,6 +380,22 @@ def ams(name=None, action=None):
               create_parents = True
     )
 
+    if params.host_in_memory_aggregation and params.log4j_props is not None:
+      File(format("{params.ams_monitor_conf_dir}/log4j.properties"),
+           mode=0644,
+           group=params.user_group,
+           owner=params.ams_user,
+           content=InlineTemplate(params.log4j_props)
+           )
+
+    XmlConfig("ams-site.xml",
+              conf_dir=params.ams_monitor_conf_dir,
+              configurations=params.config['configurations']['ams-site'],
+              configuration_attributes=params.config['configuration_attributes']['ams-site'],
+              owner=params.ams_user,
+              group=params.user_group
+              )
+
     Execute(format("{sudo} chown -R {ams_user}:{user_group} {ams_monitor_log_dir}")
             )
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
index 50dde1c..b8c14f4 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
@@ -224,6 +224,11 @@ metrics_collector_heapsize = check_append_heap_property(str(metrics_collector_he
 master_heapsize = check_append_heap_property(str(master_heapsize), "m")
 regionserver_heapsize = check_append_heap_property(str(regionserver_heapsize), "m")
 
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
+host_in_memory_aggregation_jvm_arguments = default("/configurations/ams-env/timeline.metrics.host.inmemory.aggregation.jvm.arguments",
+                                                   "-Xmx256m -Xms128m -XX:PermSize=68m")
+
 regionserver_xmn_max = default('/configurations/ams-hbase-env/hbase_regionserver_xmn_max', None)
 if regionserver_xmn_max:
   regionserver_xmn_max = int(trim_heap_property(str(regionserver_xmn_max), "m"))

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/hadoop-metrics2-hbase.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/hadoop-metrics2-hbase.properties.j2 b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/hadoop-metrics2-hbase.properties.j2
index 9729bbe..bb0db4f 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/hadoop-metrics2-hbase.properties.j2
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/hadoop-metrics2-hbase.properties.j2
@@ -58,6 +58,9 @@ rpc.protocol={{metric_collector_protocol}}
 
 *.timeline.plugin.urls=file:///usr/lib/ambari-metrics-hadoop-sink/ambari-metrics-hadoop-sink.jar
 *.sink.timeline.slave.host.name={{hostname}}
+*.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+*.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
+
 hbase.sink.timeline.class=org.apache.hadoop.metrics2.sink.timeline.HadoopTimelineMetricsSink
 hbase.sink.timeline.period={{metrics_collection_period}}
 hbase.sink.timeline.sendInterval={{metrics_report_interval}}000

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2 b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2
index 769ad67..b7dee50 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2
@@ -38,3 +38,10 @@ failover_strategy = {{failover_strategy}}
 failover_strategy_blacklisted_interval_seconds = {{failover_strategy_blacklisted_interval_seconds}}
 port = {{metric_collector_port}}
 https_enabled = {{metric_collector_https_enabled}}
+
+[aggregation]
+host_in_memory_aggregation = {{host_in_memory_aggregation}}
+host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
+java_home = {{java64_home}}
+jvm_arguments = {{host_in_memory_aggregation_jvm_arguments}}
+ams_monitor_log_dir = {{ams_monitor_log_dir}}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py
index 86a290f..0e0c9aa 100644
--- a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py
+++ b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py
@@ -124,6 +124,9 @@ if has_metric_collector:
 metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
 metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
 
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
+
 # Cluster Zookeeper quorum
 zookeeper_quorum = None
 if not len(default("/clusterHostInfo/zookeeper_hosts", [])) == 0:


[3/3] ambari git commit: AMBARI-20758 Aggregate local metrics for minute aggregation time window (dsen)

Posted by ds...@apache.org.
AMBARI-20758 Aggregate local metrics for minute aggregation time window (dsen)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/041d353b
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/041d353b
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/041d353b

Branch: refs/heads/trunk
Commit: 041d353b0d75b20b0322097e13a1701226e6fc97
Parents: 772be78
Author: Dmytro Sen <ds...@apache.org>
Authored: Wed May 17 19:38:29 2017 +0300
Committer: Dmytro Sen <ds...@apache.org>
Committed: Wed May 17 19:38:29 2017 +0300

----------------------------------------------------------------------
 .../logfeeder/metrics/LogFeederAMSClient.java   |  12 +-
 ambari-metrics/ambari-metrics-assembly/pom.xml  |  20 +++
 .../src/main/assembly/monitor-windows.xml       |   7 +
 .../src/main/assembly/monitor.xml               |   9 +-
 .../timeline/AbstractTimelineMetricsSink.java   |  24 ++-
 .../sink/timeline/AggregationResult.java        |  60 +++++++
 .../metrics2/sink/timeline/MetricAggregate.java | 110 ++++++++++++
 .../sink/timeline/MetricClusterAggregate.java   |  73 ++++++++
 .../sink/timeline/MetricHostAggregate.java      |  81 +++++++++
 .../metrics2/sink/timeline/TimelineMetric.java  |   6 +-
 .../TimelineMetricWithAggregatedValues.java     |  65 +++++++
 .../AbstractTimelineMetricSinkTest.java         |  10 ++
 .../availability/MetricCollectorHATest.java     |  10 ++
 .../cache/HandleConnectExceptionTest.java       |  10 ++
 .../sink/flume/FlumeTimelineMetricsSink.java    |  16 ++
 .../timeline/HadoopTimelineMetricsSink.java     |  20 ++-
 .../conf/unix/log4j.properties                  |  31 ++++
 .../conf/windows/log4j.properties               |  29 +++
 .../ambari-metrics-host-aggregator/pom.xml      | 120 +++++++++++++
 .../AbstractMetricPublisherThread.java          | 134 ++++++++++++++
 .../aggregator/AggregatedMetricsPublisher.java  | 101 +++++++++++
 .../host/aggregator/AggregatorApplication.java  | 180 +++++++++++++++++++
 .../host/aggregator/AggregatorWebService.java   |  56 ++++++
 .../host/aggregator/RawMetricsPublisher.java    |  60 +++++++
 .../host/aggregator/TimelineMetricsHolder.java  |  98 ++++++++++
 .../conf/unix/ambari-metrics-monitor            |   2 +-
 .../src/main/python/core/aggregator.py          | 110 ++++++++++++
 .../src/main/python/core/config_reader.py       |  35 +++-
 .../src/main/python/core/controller.py          |  28 +++
 .../src/main/python/core/emitter.py             |   8 +-
 .../src/main/python/core/stop_handler.py        |   3 +-
 .../src/main/python/main.py                     |   6 +-
 .../kafka/KafkaTimelineMetricsReporter.java     |  17 ++
 .../storm/StormTimelineMetricsReporter.java     |  14 ++
 .../sink/storm/StormTimelineMetricsSink.java    |  14 ++
 .../storm/StormTimelineMetricsReporter.java     |  16 ++
 .../sink/storm/StormTimelineMetricsSink.java    |  16 ++
 .../timeline/HBaseTimelineMetricStore.java      |  29 ++-
 .../metrics/timeline/PhoenixHBaseAccessor.java  |   4 +-
 .../timeline/TimelineMetricConfiguration.java   |   2 +
 .../metrics/timeline/TimelineMetricStore.java   |   2 +
 .../timeline/TimelineMetricsAggregatorSink.java |   4 +-
 .../timeline/aggregators/MetricAggregate.java   | 110 ------------
 .../aggregators/MetricClusterAggregate.java     |  73 --------
 .../aggregators/MetricHostAggregate.java        |  81 ---------
 .../TimelineMetricAppAggregator.java            |   1 +
 .../TimelineMetricClusterAggregator.java        |   2 +
 .../TimelineMetricClusterAggregatorSecond.java  |   1 +
 .../TimelineMetricHostAggregator.java           |   1 +
 .../aggregators/TimelineMetricReadHelper.java   |   2 +
 .../webapp/TimelineWebServices.java             |  31 ++++
 .../timeline/ITPhoenixHBaseAccessor.java        |   4 +-
 .../metrics/timeline/MetricTestHelper.java      |   2 +-
 .../timeline/PhoenixHBaseAccessorTest.java      |   4 +-
 .../timeline/TestMetricHostAggregate.java       |   8 +-
 .../timeline/TestTimelineMetricStore.java       |   6 +
 .../TimelineMetricsAggregatorMemorySink.java    |   4 +-
 .../aggregators/ITClusterAggregator.java        |   4 +-
 .../aggregators/ITMetricAggregator.java         |  13 +-
 ...melineMetricClusterAggregatorSecondTest.java |   1 +
 ambari-metrics/pom.xml                          |   1 +
 .../system/impl/AmbariMetricSinkImpl.java       |  10 ++
 .../1.6.1.2.2.0/package/scripts/params.py       |   2 +
 .../hadoop-metrics2-accumulo.properties.j2      |   3 +
 .../0.1.0/configuration/ams-env.xml             |   8 +
 .../0.1.0/configuration/ams-site.xml            |  11 ++
 .../AMBARI_METRICS/0.1.0/metainfo.xml           |   3 +
 .../AMBARI_METRICS/0.1.0/package/scripts/ams.py |  30 ++++
 .../0.1.0/package/scripts/params.py             |   5 +
 .../hadoop-metrics2-hbase.properties.j2         |   3 +
 .../package/templates/metric_monitor.ini.j2     |   7 +
 .../FLUME/1.4.0.2.0/package/scripts/params.py   |   3 +
 .../templates/flume-metrics2.properties.j2      |   2 +
 .../0.96.0.2.0/package/scripts/params_linux.py  |   3 +
 ...-metrics2-hbase.properties-GANGLIA-MASTER.j2 |   2 +
 ...doop-metrics2-hbase.properties-GANGLIA-RS.j2 |   2 +
 .../hadoop-metrics2.properties.xml              |   2 +
 .../0.12.0.2.0/package/scripts/params_linux.py  |   2 +
 .../hadoop-metrics2-hivemetastore.properties.j2 |   2 +
 .../hadoop-metrics2-hiveserver2.properties.j2   |   2 +
 .../templates/hadoop-metrics2-llapdaemon.j2     |   2 +
 .../hadoop-metrics2-llaptaskscheduler.j2        |   2 +
 .../2.1.0.3.0/package/scripts/params_linux.py   |   3 +
 .../hadoop-metrics2-hivemetastore.properties.j2 |   2 +
 .../hadoop-metrics2-hiveserver2.properties.j2   |   2 +
 .../templates/hadoop-metrics2-llapdaemon.j2     |   2 +
 .../hadoop-metrics2-llaptaskscheduler.j2        |   2 +
 .../KAFKA/0.8.1/configuration/kafka-broker.xml  |  11 ++
 .../KAFKA/0.8.1/package/scripts/params.py       |   3 +
 .../STORM/0.9.1/package/scripts/params_linux.py |   2 +
 .../0.9.1/package/templates/config.yaml.j2      |   2 +
 .../templates/storm-metrics2.properties.j2      |   2 +
 .../2.0.6/hooks/before-START/scripts/params.py  |   3 +
 .../templates/hadoop-metrics2.properties.j2     |   2 +
 .../hadoop-metrics2.properties.xml              |   2 +
 .../3.0/hooks/before-START/scripts/params.py    |   2 +
 .../templates/hadoop-metrics2.properties.j2     |   2 +
 .../system/impl/TestAmbariMetricsSinkImpl.java  |  10 ++
 .../2.0/hooks/before-START/scripts/params.py    |   2 +
 99 files changed, 1854 insertions(+), 307 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
index 2d1bf40..39526a5 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
@@ -89,6 +89,16 @@ public class LogFeederAMSClient extends AbstractTimelineMetricsSink {
   }
 
   @Override
+  protected boolean isHostInMemoryAggregationEnabled() {
+    return false;
+  }
+
+  @Override
+  protected int getHostInMemoryAggregationPort() {
+    return 0;
+  }
+
+  @Override
   protected boolean emitMetrics(TimelineMetrics metrics) {
     return super.emitMetrics(metrics);
   }
@@ -103,4 +113,4 @@ public class LogFeederAMSClient extends AbstractTimelineMetricsSink {
     return collectorPort;
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-assembly/pom.xml b/ambari-metrics/ambari-metrics-assembly/pom.xml
index a4b87de..6b81de5 100644
--- a/ambari-metrics/ambari-metrics-assembly/pom.xml
+++ b/ambari-metrics/ambari-metrics-assembly/pom.xml
@@ -35,6 +35,7 @@
   <properties>
     <collector.dir>${project.basedir}/../ambari-metrics-timelineservice</collector.dir>
     <monitor.dir>${project.basedir}/../ambari-metrics-host-monitoring</monitor.dir>
+    <aggregator.dir>${project.basedir}/../ambari-metrics-host-aggregator</aggregator.dir>
     <grafana.dir>${project.basedir}/../ambari-metrics-grafana</grafana.dir>
     <hadoop-sink.dir>${project.basedir}/../ambari-metrics-hadoop-sink</hadoop-sink.dir>
     <storm-sink.dir>${project.basedir}/../ambari-metrics-storm-sink</storm-sink.dir>
@@ -599,6 +600,19 @@
                       </sources>
                     </mapping>
                     <mapping>
+                      <directory>/var/lib/ambari-metrics-monitor/lib</directory>
+                      <sources>
+                        <source>
+                          <location>
+                            ${aggregator.dir}/target/
+                          </location>
+                          <includes>
+                            <include>ambari-metrics-host-aggregator-${project.version}.jar</include>
+                          </includes>
+                        </source>
+                      </sources>
+                    </mapping>
+                    <mapping>
                       <directory>/etc/ambari-metrics-monitor/conf</directory>
                       <configuration>true</configuration>
                     </mapping>
@@ -744,6 +758,7 @@
                     <path>/var/run/ambari-metrics-grafana</path>
                     <path>/var/log/ambari-metrics-grafana</path>
                     <path>/var/lib/ambari-metrics-collector</path>
+                    <path>/var/lib/ambari-metrics-monitor/lib</path>
                     <path>/var/lib/ambari-metrics-grafana</path>
                     <path>/usr/lib/ambari-metrics-hadoop-sink</path>
                     <path>/usr/lib/ambari-metrics-kafka-sink</path>
@@ -1331,6 +1346,11 @@
       <type>pom</type>
       <optional>true</optional>
     </dependency>
+    <dependency>
+      <groupId>org.apache.ambari</groupId>
+      <artifactId>ambari-metrics-host-aggregator</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
 
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml
index ab309a1..d015d31 100644
--- a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml
+++ b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml
@@ -64,6 +64,13 @@
       </includes>
     </fileSet>
     <fileSet>
+      <directory>${aggregator.dir}/conf/windows</directory>
+      <outputDirectory>conf</outputDirectory>
+      <includes>
+        <include>log4j.properties</include>
+      </includes>
+    </fileSet>
+    <fileSet>
       <directory>${monitor.dir}/conf/windows</directory>
       <outputDirectory>/</outputDirectory>
       <includes>

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor.xml b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor.xml
index 99a41c3..448fe62 100644
--- a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor.xml
+++ b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor.xml
@@ -46,6 +46,13 @@
       </includes>
     </fileSet>
     <fileSet>
+      <directory>${aggregator.dir}/conf/unix</directory>
+      <outputDirectory>conf</outputDirectory>
+      <includes>
+        <include>log4j.properties</include>
+      </includes>
+    </fileSet>
+    <fileSet>
       <directory>${monitor.dir}/conf/unix</directory>
       <outputDirectory>bin</outputDirectory>
       <includes>
@@ -68,4 +75,4 @@
 
 
 
-</assembly>
\ No newline at end of file
+</assembly>

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
index 2c6fae2..a8dc571 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
@@ -78,6 +78,8 @@ public abstract class AbstractTimelineMetricsSink {
   public static final String SSL_KEYSTORE_PATH_PROPERTY = "truststore.path";
   public static final String SSL_KEYSTORE_TYPE_PROPERTY = "truststore.type";
   public static final String SSL_KEYSTORE_PASSWORD_PROPERTY = "truststore.password";
+  public static final String HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY = "host_in_memory_aggregation";
+  public static final String HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY = "host_in_memory_aggregation_port";
   public static final String COLLECTOR_LIVE_NODES_PATH = "/ws/v1/timeline/metrics/livenodes";
   public static final String INSTANCE_ID_PROPERTY = "instanceId";
   public static final String SET_INSTANCE_ID_PROPERTY = "set.instanceId";
@@ -241,8 +243,14 @@ public abstract class AbstractTimelineMetricsSink {
   }
 
   protected boolean emitMetrics(TimelineMetrics metrics) {
-    String collectorHost = getCurrentCollectorHost();
-    String connectUrl = getCollectorUri(collectorHost);
+    String connectUrl;
+    if (isHostInMemoryAggregationEnabled()) {
+      connectUrl = constructTimelineMetricUri("http", "localhost", String.valueOf(getHostInMemoryAggregationPort()));
+    } else {
+      String collectorHost  = getCurrentCollectorHost();
+      connectUrl = getCollectorUri(collectorHost);
+    }
+
     String jsonData = null;
     LOG.debug("EmitMetrics connectUrl = "  + connectUrl);
     try {
@@ -562,4 +570,16 @@ public abstract class AbstractTimelineMetricsSink {
    * @return String "host1"
    */
   abstract protected String getHostname();
+
+  /**
+   * Check if host in-memory aggregation is enabled
+   * @return
+   */
+  abstract protected boolean isHostInMemoryAggregationEnabled();
+
+  /**
+   * In memory aggregation port
+   * @return
+   */
+  abstract protected int getHostInMemoryAggregationPort();
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregationResult.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregationResult.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregationResult.java
new file mode 100644
index 0000000..c903e3d
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregationResult.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Set;
+
+@XmlRootElement(name="AggregationResult")
+public class AggregationResult {
+    protected Set<TimelineMetricWithAggregatedValues> result;
+    protected Long timeInMilis;
+
+    @Override
+    public String toString() {
+        return "AggregationResult{" +
+                "result=" + result +
+                ", timeInMilis=" + timeInMilis +
+                '}';
+    }
+
+    public AggregationResult() {
+    }
+
+    public AggregationResult(Set<TimelineMetricWithAggregatedValues> result, Long timeInMilis) {
+        this.result = result;
+        this.timeInMilis = timeInMilis;
+    }
+    @XmlElement
+    public Set<TimelineMetricWithAggregatedValues> getResult() {
+        return result;
+    }
+
+    public void setResult(Set<TimelineMetricWithAggregatedValues> result) {
+        this.result = result;
+    }
+    @XmlElement
+    public Long getTimeInMilis() {
+        return timeInMilis;
+    }
+
+    public void setTimeInMilis(Long timeInMilis) {
+        this.timeInMilis = timeInMilis;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricAggregate.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricAggregate.java
new file mode 100644
index 0000000..84cba0e
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricAggregate.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.annotate.JsonSubTypes;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.IOException;
+
+/**
+*
+*/
+@JsonSubTypes({@JsonSubTypes.Type(value = MetricClusterAggregate.class),
+  @JsonSubTypes.Type(value = MetricHostAggregate.class)})
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class MetricAggregate {
+  private static final ObjectMapper mapper = new ObjectMapper();
+
+  protected Double sum = 0.0;
+  protected Double deviation;
+  protected Double max = Double.MIN_VALUE;
+  protected Double min = Double.MAX_VALUE;
+
+  public MetricAggregate() {
+  }
+
+  MetricAggregate(Double sum, Double deviation, Double max,
+                  Double min) {
+    this.sum = sum;
+    this.deviation = deviation;
+    this.max = max;
+    this.min = min;
+  }
+
+  public void updateSum(Double sum) {
+    this.sum += sum;
+  }
+
+  public void updateMax(Double max) {
+    if (max > this.max) {
+      this.max = max;
+    }
+  }
+
+  public void updateMin(Double min) {
+    if (min < this.min) {
+      this.min = min;
+    }
+  }
+
+  @JsonProperty("sum")
+  public Double getSum() {
+    return sum;
+  }
+
+  @JsonProperty("deviation")
+  public Double getDeviation() {
+    return deviation;
+  }
+
+  @JsonProperty("max")
+  public Double getMax() {
+    return max;
+  }
+
+  @JsonProperty("min")
+  public Double getMin() {
+    return min;
+  }
+
+  public void setSum(Double sum) {
+    this.sum = sum;
+  }
+
+  public void setDeviation(Double deviation) {
+    this.deviation = deviation;
+  }
+
+  public void setMax(Double max) {
+    this.max = max;
+  }
+
+  public void setMin(Double min) {
+    this.min = min;
+  }
+
+  public String toJSON() throws IOException {
+    return mapper.writeValueAsString(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricClusterAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricClusterAggregate.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricClusterAggregate.java
new file mode 100644
index 0000000..7ef2c1d
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricClusterAggregate.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+*
+*/
+public class MetricClusterAggregate extends MetricAggregate {
+  private int numberOfHosts;
+
+  @JsonCreator
+  public MetricClusterAggregate() {
+  }
+
+  public MetricClusterAggregate(Double sum, int numberOfHosts, Double deviation,
+                         Double max, Double min) {
+    super(sum, deviation, max, min);
+    this.numberOfHosts = numberOfHosts;
+  }
+
+  @JsonProperty("numberOfHosts")
+  public int getNumberOfHosts() {
+    return numberOfHosts;
+  }
+
+  public void updateNumberOfHosts(int count) {
+    this.numberOfHosts += count;
+  }
+
+  public void setNumberOfHosts(int numberOfHosts) {
+    this.numberOfHosts = numberOfHosts;
+  }
+
+  /**
+   * Find and update min, max and avg for a minute
+   */
+  public void updateAggregates(MetricClusterAggregate hostAggregate) {
+    updateMax(hostAggregate.getMax());
+    updateMin(hostAggregate.getMin());
+    updateSum(hostAggregate.getSum());
+    updateNumberOfHosts(hostAggregate.getNumberOfHosts());
+  }
+
+  @Override
+  public String toString() {
+    return "MetricAggregate{" +
+      "sum=" + sum +
+      ", numberOfHosts=" + numberOfHosts +
+      ", deviation=" + deviation +
+      ", max=" + max +
+      ", min=" + min +
+      '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricHostAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricHostAggregate.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricHostAggregate.java
new file mode 100644
index 0000000..e190913
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricHostAggregate.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * Represents a collection of minute based aggregation of values for
+ * resolution greater than a minute.
+ */
+public class MetricHostAggregate extends MetricAggregate {
+
+  private long numberOfSamples = 0;
+
+  @JsonCreator
+  public MetricHostAggregate() {
+    super(0.0, 0.0, Double.MIN_VALUE, Double.MAX_VALUE);
+  }
+
+  public MetricHostAggregate(Double sum, int numberOfSamples,
+                             Double deviation,
+                             Double max, Double min) {
+    super(sum, deviation, max, min);
+    this.numberOfSamples = numberOfSamples;
+  }
+
+  @JsonProperty("numberOfSamples")
+  public long getNumberOfSamples() {
+    return numberOfSamples == 0 ? 1 : numberOfSamples;
+  }
+
+  public void updateNumberOfSamples(long count) {
+    this.numberOfSamples += count;
+  }
+
+  public void setNumberOfSamples(long numberOfSamples) {
+    this.numberOfSamples = numberOfSamples;
+  }
+
+  public double calculateAverage() {
+    return sum / numberOfSamples;
+  }
+
+  /**
+   * Find and update min, max and avg for a minute
+   */
+  public void updateAggregates(MetricHostAggregate hostAggregate) {
+    updateMax(hostAggregate.getMax());
+    updateMin(hostAggregate.getMin());
+    updateSum(hostAggregate.getSum());
+    updateNumberOfSamples(hostAggregate.getNumberOfSamples());
+  }
+
+  @Override
+  public String toString() {
+    return "MetricHostAggregate{" +
+      "sum=" + sum +
+      ", numberOfSamples=" + numberOfSamples +
+      ", deviation=" + deviation +
+      ", max=" + max +
+      ", min=" + min +
+      '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
index 44c9d4a..edace52 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
@@ -45,7 +45,7 @@ public class TimelineMetric implements Comparable<TimelineMetric> {
   private String type;
   private String units;
   private TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
-  private Map<String, String> metadata = new HashMap<>();
+  private HashMap<String, String> metadata = new HashMap<>();
 
   // default
   public TimelineMetric() {
@@ -151,11 +151,11 @@ public class TimelineMetric implements Comparable<TimelineMetric> {
   }
 
   @XmlElement(name = "metadata")
-  public Map<String,String> getMetadata () {
+  public HashMap<String,String> getMetadata () {
     return metadata;
   }
 
-  public void setMetadata (Map<String,String> metadata) {
+  public void setMetadata (HashMap<String,String> metadata) {
     this.metadata = metadata;
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricWithAggregatedValues.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricWithAggregatedValues.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricWithAggregatedValues.java
new file mode 100644
index 0000000..626ac5f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricWithAggregatedValues.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+
+@XmlRootElement(name = "TimelineMetricWithAggregatedValues")
+@XmlAccessorType(XmlAccessType.NONE)
+public class TimelineMetricWithAggregatedValues {
+    private TimelineMetric timelineMetric;
+    private MetricHostAggregate metricAggregate;
+
+    public TimelineMetricWithAggregatedValues() {
+    }
+
+    public TimelineMetricWithAggregatedValues(TimelineMetric metric, MetricHostAggregate metricAggregate) {
+        timelineMetric = metric;
+        this.metricAggregate = metricAggregate;
+    }
+
+    @XmlElement
+    public MetricHostAggregate getMetricAggregate() {
+        return metricAggregate;
+    }
+    @XmlElement
+    public TimelineMetric getTimelineMetric() {
+        return timelineMetric;
+    }
+
+    public void setTimelineMetric(TimelineMetric timelineMetric) {
+        this.timelineMetric = timelineMetric;
+    }
+
+    public void setMetricAggregate(MetricHostAggregate metricAggregate) {
+        this.metricAggregate = metricAggregate;
+    }
+
+    @Override
+    public String toString() {
+        return "TimelineMetricWithAggregatedValues{" +
+                "timelineMetric=" + timelineMetric +
+                ", metricAggregate=" + metricAggregate +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
index 9b0cdbe..ce2cf79 100644
--- a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
+++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
@@ -90,6 +90,16 @@ public class AbstractTimelineMetricSinkTest {
     }
 
     @Override
+    protected boolean isHostInMemoryAggregationEnabled() {
+      return true;
+    }
+
+    @Override
+    protected int getHostInMemoryAggregationPort() {
+      return 61888;
+    }
+
+    @Override
     public boolean emitMetrics(TimelineMetrics metrics) {
       super.init();
       return super.emitMetrics(metrics);

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
index a393a96..f0174d5 100644
--- a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
+++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
@@ -192,5 +192,15 @@ public class MetricCollectorHATest {
     protected String getHostname() {
       return "h1";
     }
+
+    @Override
+    protected boolean isHostInMemoryAggregationEnabled() {
+      return true;
+    }
+
+    @Override
+    protected int getHostInMemoryAggregationPort() {
+      return 61888;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
index 32fe32e..4eb75eb 100644
--- a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
+++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
@@ -125,6 +125,16 @@ public class HandleConnectExceptionTest {
     }
 
     @Override
+    protected boolean isHostInMemoryAggregationEnabled() {
+      return false;
+    }
+
+    @Override
+    protected int getHostInMemoryAggregationPort() {
+      return 61888;
+    }
+
+    @Override
     public boolean emitMetrics(TimelineMetrics metrics) {
       super.init();
       return super.emitMetrics(metrics);

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
index 904c916..6277907 100644
--- a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
@@ -63,6 +63,9 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
   private int timeoutSeconds = 10;
   private boolean setInstanceId;
   private String instanceId;
+  private boolean hostInMemoryAggregationEnabled;
+  private int hostInMemoryAggregationPort;
+
 
   @Override
   public void start() {
@@ -110,6 +113,9 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     port = configuration.getProperty(COLLECTOR_PORT, "6188");
     setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false"));
     instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY, "");
+
+    hostInMemoryAggregationEnabled = Boolean.getBoolean(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY));
+    hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY));
     // Initialize the collector write strategy
     super.init();
 
@@ -162,6 +168,16 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     return hostname;
   }
 
+  @Override
+  protected boolean isHostInMemoryAggregationEnabled() {
+    return hostInMemoryAggregationEnabled;
+  }
+
+  @Override
+  protected int getHostInMemoryAggregationPort() {
+    return hostInMemoryAggregationPort;
+  }
+
   public void setPollFrequency(long pollFrequency) {
     this.pollFrequency = pollFrequency;
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
index 11e16c2..c235c7c 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
@@ -75,6 +75,8 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
       return t;
     }
   });
+  private int hostInMemoryAggregationPort;
+  private boolean hostInMemoryAggregationEnabled;
 
   @Override
   public void init(SubsetConfiguration conf) {
@@ -107,7 +109,8 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
     protocol = conf.getString(COLLECTOR_PROTOCOL, "http");
     collectorHosts = parseHostsStringArrayIntoCollection(conf.getStringArray(COLLECTOR_HOSTS_PROPERTY));
     port = conf.getString(COLLECTOR_PORT, "6188");
-
+    hostInMemoryAggregationEnabled = conf.getBoolean(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY);
+    hostInMemoryAggregationPort = conf.getInt(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY);
     if (collectorHosts.isEmpty()) {
       LOG.error("No Metric collector configured.");
     } else {
@@ -249,6 +252,16 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
   }
 
   @Override
+  protected boolean isHostInMemoryAggregationEnabled() {
+    return hostInMemoryAggregationEnabled;
+  }
+
+  @Override
+  protected int getHostInMemoryAggregationPort() {
+    return hostInMemoryAggregationPort;
+  }
+
+  @Override
   public void putMetrics(MetricsRecord record) {
     try {
       String recordName = record.name();
@@ -308,9 +321,10 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
 
       int sbBaseLen = sb.length();
       List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
-      Map<String, String> metadata = null;
+      HashMap<String, String> metadata = null;
       if (skipAggregation) {
-        metadata = Collections.singletonMap("skipAggregation", "true");
+        metadata = new HashMap<>();
+        metadata.put("skipAggregation", "true");
       }
       long startTime = record.timestamp();
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/conf/unix/log4j.properties
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/conf/unix/log4j.properties b/ambari-metrics/ambari-metrics-host-aggregator/conf/unix/log4j.properties
new file mode 100644
index 0000000..d7ceedd
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/conf/unix/log4j.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Define some default values that can be overridden by system properties
+# Root logger option
+log4j.rootLogger=INFO,file
+
+# Direct log messages to a log file
+log4j.appender.file=org.apache.log4j.RollingFileAppender
+log4j.appender.file.File=/var/log/ambari-metrics-monitor/ambari-metrics-aggregator.log
+log4j.appender.file.MaxFileSize=80MB
+log4j.appender.file.MaxBackupIndex=60
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{ABSOLUTE} %5p [%t] %c{1}:%L - %m%n
+
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/conf/windows/log4j.properties
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/conf/windows/log4j.properties b/ambari-metrics/ambari-metrics-host-aggregator/conf/windows/log4j.properties
new file mode 100644
index 0000000..d9aabab
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/conf/windows/log4j.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Define some default values that can be overridden by system properties
+# Root logger option
+log4j.rootLogger=INFO,file
+
+# Direct log messages to a log file
+log4j.appender.file=org.apache.log4j.RollingFileAppender
+log4j.appender.file.File=\\var\\log\\ambari-metrics-monitor\\ambari-metrics-aggregator.log
+log4j.appender.file.MaxFileSize=80MB
+log4j.appender.file.MaxBackupIndex=60
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{ABSOLUTE} %5p [%t] %c{1}:%L - %m%n

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/pom.xml b/ambari-metrics/ambari-metrics-host-aggregator/pom.xml
new file mode 100644
index 0000000..c2c7897
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/pom.xml
@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>ambari-metrics</artifactId>
+        <groupId>org.apache.ambari</groupId>
+        <version>2.0.0.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>ambari-metrics-host-aggregator</artifactId>
+    <packaging>jar</packaging>
+
+    <name>ambari-metrics-host-aggregator</name>
+    <url>http://maven.apache.org</url>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>3.8.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>14.0.1</version>
+        </dependency>
+        <dependency>
+              <groupId>org.apache.ambari</groupId>
+              <artifactId>ambari-metrics-common</artifactId>
+              <version>2.0.0.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>javax.servlet</groupId>
+            <artifactId>servlet-api</artifactId>
+            <version>2.5</version>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-json</artifactId>
+            <version>1.11</version>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-server</artifactId>
+            <version>1.11</version>
+        </dependency>
+        <dependency>
+            <groupId>javax.xml.bind</groupId>
+            <artifactId>jaxb-api</artifactId>
+            <version>2.2.2</version>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-core</artifactId>
+            <version>1.11</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>2.7.1.2.3.4.0-3347</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>1.6</version>
+                <configuration>
+                    <createDependencyReducedPom>true</createDependencyReducedPom>
+                    <filters>
+                        <filter>
+                            <artifact>*:*</artifact>
+                            <excludes>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                            </excludes>
+                        </filter>
+                    </filters>
+                </configuration>
+
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java
new file mode 100644
index 0000000..b1f60fa
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.codehaus.jackson.map.AnnotationIntrospector;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.Map;
+
+/**
+ * Abstract class that runs a thread that publishes metrics data to AMS collector in specified intervals.
+ */
+public abstract class AbstractMetricPublisherThread extends Thread {
+    protected int publishIntervalInSeconds;
+    protected String publishURL;
+    protected ObjectMapper objectMapper;
+    private Log LOG;
+    protected TimelineMetricsHolder timelineMetricsHolder;
+
+    public AbstractMetricPublisherThread(TimelineMetricsHolder timelineMetricsHolder, String publishURL, int publishIntervalInSeconds) {
+        LOG = LogFactory.getLog(this.getClass());
+        this.publishURL = publishURL;
+        this.publishIntervalInSeconds = publishIntervalInSeconds;
+        this.timelineMetricsHolder = timelineMetricsHolder;
+        objectMapper = new ObjectMapper();
+        AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
+        objectMapper.setAnnotationIntrospector(introspector);
+        objectMapper.getSerializationConfig()
+                .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
+    }
+
+    /**
+     * Publishes metrics to collector in specified intervals while not interrupted.
+     */
+    @Override
+    public void run() {
+        while (!isInterrupted()) {
+            try {
+                sleep(this.publishIntervalInSeconds * 1000);
+            } catch (InterruptedException e) {
+                //Ignore
+            }
+            try {
+                processAndPublishMetrics(getMetricsFromCache());
+            } catch (Exception e) {
+                LOG.error("Couldn't process and send metrics : ",e);
+            }
+        }
+    }
+
+    /**
+     * Processes and sends metrics to collector.
+     * @param metricsFromCache
+     * @throws Exception
+     */
+    protected void processAndPublishMetrics(Map<Long, TimelineMetrics> metricsFromCache) throws Exception {
+        if (metricsFromCache.size()==0) return;
+
+        LOG.info(String.format("Preparing %s timeline metrics for publishing", metricsFromCache.size()));
+        publishMetricsJson(processMetrics(metricsFromCache));
+    }
+
+    /**
+     * Returns metrics map. Source is based on implementation.
+     * @return
+     */
+    protected abstract Map<Long,TimelineMetrics> getMetricsFromCache();
+
+    /**
+     * Processes given metrics (aggregates or merges them) and converts them into json string that will be send to collector
+     * @param metricValues
+     * @return
+     */
+    protected abstract String processMetrics(Map<Long, TimelineMetrics> metricValues);
+
+    protected void publishMetricsJson(String jsonData) throws Exception {
+        int timeout = 5 * 1000;
+        HttpURLConnection connection = null;
+        if (this.publishURL == null) {
+            throw new IOException("Unknown URL. Unable to connect to metrics collector.");
+        }
+        LOG.info("Collector URL : " + publishURL);
+        connection = (HttpURLConnection) new URL(this.publishURL).openConnection();
+
+        connection.setRequestMethod("POST");
+        connection.setRequestProperty("Content-Type", "application/json");
+        connection.setRequestProperty("Connection", "Keep-Alive");
+        connection.setConnectTimeout(timeout);
+        connection.setReadTimeout(timeout);
+        connection.setDoOutput(true);
+
+        if (jsonData != null) {
+            try (OutputStream os = connection.getOutputStream()) {
+                os.write(jsonData.getBytes("UTF-8"));
+            }
+        }
+        int responseCode = connection.getResponseCode();
+        if (responseCode != 200) {
+            throw new Exception("responseCode is " + responseCode);
+        }
+        LOG.info("Successfully sent metrics.");
+    }
+
+    /**
+     * Interrupts the thread.
+     */
+    protected void stopPublisher() {
+        this.interrupt();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java
new file mode 100644
index 0000000..0540ec9
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricWithAggregatedValues;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * Thread that aggregates and publishes metrics to collector on specified interval.
+ */
+public class AggregatedMetricsPublisher extends AbstractMetricPublisherThread {
+
+    private Log LOG;
+
+    public AggregatedMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, String collectorURL, int interval) {
+        super(timelineMetricsHolder, collectorURL, interval);
+        LOG = LogFactory.getLog(this.getClass());
+    }
+
+    /**
+     * get metrics map form @TimelineMetricsHolder
+     * @return
+     */
+    @Override
+    protected Map<Long, TimelineMetrics> getMetricsFromCache() {
+        return timelineMetricsHolder.extractMetricsForAggregationPublishing();
+    }
+
+    /**
+     * Aggregates given metrics and converts them into json string that will be send to collector
+     * @param metricForAggregationValues
+     * @return
+     */
+    @Override
+    protected String processMetrics(Map<Long, TimelineMetrics> metricForAggregationValues) {
+        HashMap<String, TimelineMetrics> nameToMetricMap = new HashMap<>();
+        for (TimelineMetrics timelineMetrics : metricForAggregationValues.values()) {
+            for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) {
+                if (!nameToMetricMap.containsKey(timelineMetric.getMetricName())) {
+                    nameToMetricMap.put(timelineMetric.getMetricName(), new TimelineMetrics());
+                }
+                nameToMetricMap.get(timelineMetric.getMetricName()).addOrMergeTimelineMetric(timelineMetric);
+            }
+        }
+        Set<TimelineMetricWithAggregatedValues> metricAggregateMap = new HashSet<>();
+        for (TimelineMetrics metrics : nameToMetricMap.values()) {
+            double sum = 0;
+            double max = Integer.MIN_VALUE;
+            double min = Integer.MAX_VALUE;
+            int count = 0;
+            for (TimelineMetric metric : metrics.getMetrics()) {
+                for (Double value : metric.getMetricValues().values()) {
+                    sum+=value;
+                    max = Math.max(max, value);
+                    min = Math.min(min, value);
+                    count++;
+                }
+            }
+            TimelineMetric tmpMetric = new TimelineMetric(metrics.getMetrics().get(0));
+            tmpMetric.setMetricValues(new TreeMap<Long, Double>());
+            metricAggregateMap.add(new TimelineMetricWithAggregatedValues(tmpMetric, new MetricHostAggregate(sum, count, 0d, max, min)));
+        }
+        String json = null;
+        try {
+            json = objectMapper.writeValueAsString(new AggregationResult(metricAggregateMap, System.currentTimeMillis()));
+            LOG.debug(json);
+        } catch (Exception e) {
+            LOG.error("Failed to convert result into json", e);
+        }
+
+        return json;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
new file mode 100644
index 0000000..c6b703b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+import com.sun.jersey.api.container.httpserver.HttpServerFactory;
+import com.sun.jersey.api.core.PackagesResourceConfig;
+import com.sun.jersey.api.core.ResourceConfig;
+import com.sun.net.httpserver.HttpServer;
+
+import javax.ws.rs.core.UriBuilder;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * WEB application with 2 publisher threads that processes received metrics and submits results to the collector
+ */
+public class AggregatorApplication
+{
+    private static final int STOP_SECONDS_DELAY = 0;
+    private static final int JOIN_SECONDS_TIMEOUT = 2;
+    private static String BASE_POST_URL = "%s://%s:%s/ws/v1/timeline/metrics";
+    private static String AGGREGATED_POST_PREFIX = "/aggregated";
+    private static final String METRICS_SITE_CONFIGURATION_FILE = "ams-site.xml";
+    private static Log LOG = LogFactory.getLog("AggregatorApplication.class");
+    private final int webApplicationPort;
+    private final int rawPublishingInterval;
+    private final int aggregationInterval;
+    private Configuration configuration;
+    private String [] collectorHosts;
+    private AggregatedMetricsPublisher aggregatePublisher;
+    private RawMetricsPublisher rawPublisher;
+    private TimelineMetricsHolder timelineMetricsHolder;
+    private HttpServer httpServer;
+
+    public AggregatorApplication(String collectorHosts) {
+        initConfiguration();
+        this.collectorHosts = collectorHosts.split(",");
+        this.aggregationInterval = configuration.getInt("timeline.metrics.host.aggregator.minute.interval", 300);
+        this.rawPublishingInterval = configuration.getInt("timeline.metrics.sink.report.interval", 60);
+        this.webApplicationPort = configuration.getInt("timeline.metrics.host.inmemory.aggregation.port", 61888);
+        this.timelineMetricsHolder = TimelineMetricsHolder.getInstance(rawPublishingInterval, aggregationInterval);
+        try {
+            this.httpServer = createHttpServer();
+        } catch (IOException e) {
+            LOG.error("Exception while starting HTTP server. Exiting", e);
+            System.exit(1);
+        }
+    }
+
+    private void initConfiguration() {
+        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+        if (classLoader == null) {
+            classLoader = getClass().getClassLoader();
+        }
+
+        URL amsResUrl = classLoader.getResource(METRICS_SITE_CONFIGURATION_FILE);
+        LOG.info("Found metric service configuration: " + amsResUrl);
+        if (amsResUrl == null) {
+            throw new IllegalStateException("Unable to initialize the metrics " +
+                    "subsystem. No ams-site present in the classpath.");
+        }
+        configuration = new Configuration(true);
+        try {
+            configuration.addResource(amsResUrl.toURI().toURL());
+        } catch (Exception e) {
+            LOG.error("Couldn't init configuration. ", e);
+            System.exit(1);
+        }
+    }
+
+    private String getHostName() {
+        String hostName = "localhost";
+        try {
+            hostName = InetAddress.getLocalHost().getCanonicalHostName();
+        } catch (UnknownHostException e) {
+            LOG.error(e);
+        }
+        return hostName;
+    }
+
+    private URI getURI() {
+        URI uri = UriBuilder.fromUri("http://" + getHostName() + "/").port(this.webApplicationPort).build();
+        LOG.info(String.format("Web server at %s", uri));
+        return uri;
+    }
+
+    private HttpServer createHttpServer() throws IOException {
+        ResourceConfig resourceConfig = new PackagesResourceConfig("org.apache.hadoop.metrics2.host.aggregator");
+        HashMap<String, Object> params = new HashMap();
+        params.put("com.sun.jersey.api.json.POJOMappingFeature", "true");
+        resourceConfig.setPropertiesAndFeatures(params);
+        return HttpServerFactory.create(getURI(), resourceConfig);
+    }
+
+    private void startWebServer() {
+        LOG.info("Starting web server.");
+        this.httpServer.start();
+    }
+
+    private void startAggregatePublisherThread() {
+        LOG.info("Starting aggregated metrics publisher.");
+        String collectorURL = buildBasicCollectorURL(collectorHosts[0]) + AGGREGATED_POST_PREFIX;
+        aggregatePublisher = new AggregatedMetricsPublisher(timelineMetricsHolder, collectorURL, aggregationInterval);
+        aggregatePublisher.start();
+    }
+
+    private void startRawPublisherThread() {
+        LOG.info("Starting raw metrics publisher.");
+        String collectorURL = buildBasicCollectorURL(collectorHosts[0]);
+        rawPublisher = new RawMetricsPublisher(timelineMetricsHolder, collectorURL, rawPublishingInterval);
+        rawPublisher.start();
+    }
+
+
+
+    private void stop() {
+        aggregatePublisher.stopPublisher();
+        rawPublisher.stopPublisher();
+        httpServer.stop(STOP_SECONDS_DELAY);
+        LOG.info("Stopped web server.");
+        try {
+            LOG.info("Waiting for threads to join.");
+            aggregatePublisher.join(JOIN_SECONDS_TIMEOUT * 1000);
+            rawPublisher.join(JOIN_SECONDS_TIMEOUT * 1000);
+            LOG.info("Gracefully stopped Aggregator Application.");
+        } catch (InterruptedException e) {
+            LOG.error("Received exception during stop : ", e);
+
+        }
+
+    }
+
+    private String buildBasicCollectorURL(String host) {
+        String port = configuration.get("timeline.metrics.service.webapp.address", "0.0.0.0:6188").split(":")[1];
+        String protocol = configuration.get("timeline.metrics.service.http.policy", "HTTP_ONLY").equalsIgnoreCase("HTTP_ONLY") ? "http" : "https";
+        return String.format(BASE_POST_URL, protocol, host, port);
+    }
+
+    public static void main( String[] args ) throws Exception {
+        LOG.info("Starting aggregator application");
+        if (args.length != 1) {
+            throw new Exception("This jar should be run with 1 argument - collector hosts separated with coma");
+        }
+
+        final AggregatorApplication app = new AggregatorApplication(args[0]);
+        app.startAggregatePublisherThread();
+        app.startRawPublisherThread();
+        app.startWebServer();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            public void run() {
+                LOG.info("Stopping aggregator application");
+                app.stop();
+            }
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java
new file mode 100644
index 0000000..f96d0ed
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+
+
+import com.sun.jersey.spi.resource.Singleton;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+
+@Singleton
+@Path("/ws/v1/timeline")
+public class AggregatorWebService {
+    TimelineMetricsHolder metricsHolder = TimelineMetricsHolder.getInstance();
+
+    @GET
+    @Produces("text/json")
+    @Path("/metrics")
+    public Response helloWorld() throws IOException {
+        return Response.ok().build();
+    }
+
+    @POST
+    @Produces(MediaType.TEXT_PLAIN)
+    @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+    @Path("/metrics")
+    public Response postMetrics(
+            TimelineMetrics metrics) {
+        metricsHolder.putMetricsForAggregationPublishing(metrics);
+        metricsHolder.putMetricsForRawPublishing(metrics);
+        return Response.ok().build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java
new file mode 100644
index 0000000..f317ed9
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import java.util.Map;
+
+public class RawMetricsPublisher extends AbstractMetricPublisherThread {
+    private final Log LOG;
+
+    public RawMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, String collectorURL, int interval) {
+        super(timelineMetricsHolder, collectorURL, interval);
+        LOG = LogFactory.getLog(this.getClass());
+    }
+
+
+    @Override
+    protected Map<Long, TimelineMetrics> getMetricsFromCache() {
+        return timelineMetricsHolder.extractMetricsForRawPublishing();
+    }
+
+    @Override
+    protected String processMetrics(Map<Long, TimelineMetrics> metricValues) {
+        //merge everything in one TimelineMetrics object
+        TimelineMetrics timelineMetrics = new TimelineMetrics();
+        for (TimelineMetrics metrics : metricValues.values()) {
+            for (TimelineMetric timelineMetric : metrics.getMetrics())
+                timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+        }
+        //map TimelineMetrics to json string
+        String json = null;
+        try {
+            json = objectMapper.writeValueAsString(timelineMetrics);
+            LOG.debug(json);
+        } catch (Exception e) {
+            LOG.error("Failed to convert result into json", e);
+        }
+        return json;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java
new file mode 100644
index 0000000..b355c97
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Singleton class with 2 guava caches for raw and aggregated metrics storing
+ */
+public class TimelineMetricsHolder {
+    private static final int DEFAULT_RAW_CACHE_EXPIRE_TIME = 60;
+    private static final int DEFAULT_AGGREGATION_CACHE_EXPIRE_TIME = 300;
+    private Cache<Long, TimelineMetrics> aggregationMetricsCache;
+    private Cache<Long, TimelineMetrics> rawMetricsCache;
+    private static TimelineMetricsHolder instance = null;
+    //to ensure no metric values are expired
+    private static int EXPIRE_DELAY = 30;
+    ReadWriteLock aggregationCacheLock = new ReentrantReadWriteLock();
+    ReadWriteLock rawCacheLock = new ReentrantReadWriteLock();
+
+    private TimelineMetricsHolder(int rawCacheExpireTime, int aggregationCacheExpireTime) {
+        this.rawMetricsCache = CacheBuilder.newBuilder().expireAfterWrite(rawCacheExpireTime + EXPIRE_DELAY, TimeUnit.SECONDS).build();
+        this.aggregationMetricsCache = CacheBuilder.newBuilder().expireAfterWrite(aggregationCacheExpireTime + EXPIRE_DELAY, TimeUnit.SECONDS).build();
+    }
+
+    public static TimelineMetricsHolder getInstance(int rawCacheExpireTime, int aggregationCacheExpireTime) {
+        if (instance == null) {
+            instance = new TimelineMetricsHolder(rawCacheExpireTime, aggregationCacheExpireTime);
+        }
+        return instance;
+    }
+
+    /**
+     * Uses default expiration time for caches initialization if they are not initialized yet.
+     * @return
+     */
+    public static TimelineMetricsHolder getInstance() {
+        return getInstance(DEFAULT_RAW_CACHE_EXPIRE_TIME, DEFAULT_AGGREGATION_CACHE_EXPIRE_TIME);
+    }
+
+    public void putMetricsForAggregationPublishing(TimelineMetrics timelineMetrics) {
+        aggregationCacheLock.writeLock().lock();
+        aggregationMetricsCache.put(System.currentTimeMillis(), timelineMetrics);
+        aggregationCacheLock.writeLock().unlock();
+    }
+
+    public Map<Long, TimelineMetrics> extractMetricsForAggregationPublishing() {
+        return extractMetricsFromCacheWithLock(aggregationMetricsCache, aggregationCacheLock);
+    }
+
+    public void putMetricsForRawPublishing(TimelineMetrics metrics) {
+        rawCacheLock.writeLock().lock();
+        rawMetricsCache.put(System.currentTimeMillis(), metrics);
+        rawCacheLock.writeLock().unlock();
+    }
+
+    public Map<Long, TimelineMetrics> extractMetricsForRawPublishing() {
+        return extractMetricsFromCacheWithLock(rawMetricsCache, rawCacheLock);
+    }
+
+    /**
+     * Returns values from cache and clears the cache
+     * @param cache
+     * @param lock
+     * @return
+     */
+    private Map<Long, TimelineMetrics> extractMetricsFromCacheWithLock(Cache<Long, TimelineMetrics> cache, ReadWriteLock lock) {
+        lock.writeLock().lock();
+        Map<Long, TimelineMetrics> metricsMap = new TreeMap<>(cache.asMap());
+        cache.invalidateAll();
+        lock.writeLock().unlock();
+        return metricsMap;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor b/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor
index 967e133..9bbb271 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor
+++ b/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor
@@ -24,7 +24,7 @@ METRIC_MONITOR_PY_SCRIPT=${RESOURCE_MONITORING_DIR}/main.py
 PIDFILE=/var/run/ambari-metrics-monitor/ambari-metrics-monitor.pid
 OUTFILE=/var/log/ambari-metrics-monitor/ambari-metrics-monitor.out
 
-STOP_TIMEOUT=5
+STOP_TIMEOUT=10
 
 OK=0
 NOTOK=1

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py
new file mode 100644
index 0000000..2249e53
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py
@@ -0,0 +1,110 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import threading
+import subprocess
+import logging
+import urllib2
+
+logger = logging.getLogger()
+class Aggregator(threading.Thread):
+  def __init__(self, config, stop_handler):
+    threading.Thread.__init__(self)
+    self._config = config
+    self._stop_handler = stop_handler
+    self._aggregator_process = None
+    self._sleep_interval = config.get_collector_sleep_interval()
+    self.stopped = False
+
+  def run(self):
+    java_home = self._config.get_java_home()
+    collector_hosts = self._config.get_metrics_collector_hosts_as_string()
+    jvm_agrs = self._config.get_aggregator_jvm_agrs()
+    config_dir = self._config.get_config_dir()
+    class_name = "org.apache.hadoop.metrics2.host.aggregator.AggregatorApplication"
+    ams_log_file = "ambari-metrics-aggregator.log"
+    additional_classpath = ':{0}'.format(config_dir)
+    ams_log_dir = self._config.ams_monitor_log_dir()
+    logger.info('Starting Aggregator thread.')
+    cmd = "{0}/bin/java {1} -Dams.log.dir={2} -Dams.log.file={3} -cp /var/lib/ambari-metrics-monitor/lib/*{4} {5} {6}"\
+      .format(java_home, jvm_agrs, ams_log_dir, ams_log_file, additional_classpath, class_name, collector_hosts)
+
+    logger.info("Executing : {0}".format(cmd))
+
+    self._aggregator_process = subprocess.Popen([cmd], stdout = None, stderr = None, shell = True)
+    while not self.stopped:
+      if 0 == self._stop_handler.wait(self._sleep_interval):
+        break
+    pass
+    self.stop()
+
+  def stop(self):
+    self.stopped = True
+    if self._aggregator_process :
+      logger.info('Stopping Aggregator thread.')
+      self._aggregator_process.terminate()
+
+class AggregatorWatchdog(threading.Thread):
+  SLEEP_TIME = 30
+  CONNECTION_TIMEOUT = 5
+  AMS_AGGREGATOR_METRICS_CHECK_URL = "/ws/v1/timeline/metrics/"
+  def __init__(self, config, stop_handler):
+    threading.Thread.__init__(self)
+    self._config = config
+    self._stop_handler = stop_handler
+    self.URL = 'http://localhost:' + self._config.get_inmemory_aggregation_port() + self.AMS_AGGREGATOR_METRICS_CHECK_URL
+    self._is_ok = threading.Event()
+    self.set_is_ok(True)
+    self.stopped = False
+
+  def run(self):
+    logger.info('Starting Aggregator Watchdog thread.')
+    while not self.stopped:
+      if 0 == self._stop_handler.wait(self.SLEEP_TIME):
+        break
+      try:
+        conn = urllib2.urlopen(self.URL, timeout=self.CONNECTION_TIMEOUT)
+        self.set_is_ok(True)
+      except (KeyboardInterrupt, SystemExit):
+        raise
+      except Exception, e:
+        self.set_is_ok(False)
+        continue
+      if conn.code != 200:
+        self.set_is_ok(False)
+        continue
+      conn.close()
+
+  def is_ok(self):
+    return self._is_ok.is_set()
+
+  def set_is_ok(self, value):
+    if value == False and self.is_ok() != value:
+      logger.warning("Watcher couldn't connect to aggregator.")
+      self._is_ok.clear()
+    else:
+      self._is_ok.set()
+
+
+  def stop(self):
+    logger.info('Stopping watcher thread.')
+    self.stopped = True
+
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
index 2670e76..d1429ed 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
@@ -30,6 +30,8 @@ from ambari_commons.os_family_impl import OsFamilyImpl
 # Abstraction for OS-dependent configuration defaults
 #
 class ConfigDefaults(object):
+  def get_config_dir(self):
+    pass
   def get_config_file_path(self):
     pass
   def get_metric_file_path(self):
@@ -40,11 +42,14 @@ class ConfigDefaults(object):
 @OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
 class ConfigDefaultsWindows(ConfigDefaults):
   def __init__(self):
+    self._CONFIG_DIR = "conf"
     self._CONFIG_FILE_PATH = "conf\\metric_monitor.ini"
     self._METRIC_FILE_PATH = "conf\\metric_groups.conf"
     self._METRIC_FILE_PATH = "conf\\ca.pem"
     pass
 
+  def get_config_dir(self):
+    return self._CONFIG_DIR
   def get_config_file_path(self):
     return self._CONFIG_FILE_PATH
   def get_metric_file_path(self):
@@ -55,11 +60,13 @@ class ConfigDefaultsWindows(ConfigDefaults):
 @OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
 class ConfigDefaultsLinux(ConfigDefaults):
   def __init__(self):
+    self._CONFIG_DIR = "/etc/ambari-metrics-monitor/conf/"
     self._CONFIG_FILE_PATH = "/etc/ambari-metrics-monitor/conf/metric_monitor.ini"
     self._METRIC_FILE_PATH = "/etc/ambari-metrics-monitor/conf/metric_groups.conf"
     self._CA_CERTS_FILE_PATH = "/etc/ambari-metrics-monitor/conf/ca.pem"
     pass
-
+  def get_config_dir(self):
+    return self._CONFIG_DIR
   def get_config_file_path(self):
     return self._CONFIG_FILE_PATH
   def get_metric_file_path(self):
@@ -71,6 +78,7 @@ configDefaults = ConfigDefaults()
 
 config = ConfigParser.RawConfigParser()
 
+CONFIG_DIR = configDefaults.get_config_dir()
 CONFIG_FILE_PATH = configDefaults.get_config_file_path()
 METRIC_FILE_PATH = configDefaults.get_metric_file_path()
 CA_CERTS_FILE_PATH = configDefaults.get_ca_certs_file_path()
@@ -191,6 +199,8 @@ class Configuration:
         # No hostname script identified in the ambari agent conf
         pass
     pass
+  def get_config_dir(self):
+    return CONFIG_DIR
 
   def getConfig(self):
     return self.config
@@ -214,10 +224,14 @@ class Configuration:
   def get_hostname_config(self):
     return self.get("default", "hostname", None)
 
-  def get_metrics_collector_hosts(self):
+  def get_metrics_collector_hosts_as_list(self):
     hosts = self.get("default", "metrics_servers", "localhost")
     return hosts.split(",")
 
+  def get_metrics_collector_hosts_as_string(self):
+    hosts = self.get("default", "metrics_servers", "localhost")
+    return hosts
+
   def get_failover_strategy(self):
     return self.get("collector", "failover_strategy", ROUND_ROBIN_FAILOVER_STRATEGY)
 
@@ -239,6 +253,23 @@ class Configuration:
   def is_server_https_enabled(self):
     return "true" == str(self.get("collector", "https_enabled")).lower()
 
+  def get_java_home(self):
+    return self.get("aggregation", "java_home")
+
+  def is_inmemory_aggregation_enabled(self):
+    return "true" == str(self.get("aggregation", "host_in_memory_aggregation")).lower()
+
+  def get_inmemory_aggregation_port(self):
+    return self.get("aggregation", "host_in_memory_aggregation_port")
+
+  def get_aggregator_jvm_agrs(self):
+    hosts = self.get("aggregation", "jvm_arguments", "-Xmx256m -Xms128m -XX:PermSize=68m")
+    return hosts
+
+  def ams_monitor_log_dir(self):
+    hosts = self.get("aggregation", "ams_monitor_log_dir", "/var/log/ambari-metrics-monitor")
+    return hosts
+
   def is_set_instanceid(self):
     return "true" == str(self.get("default", "set.instanceId", 'false')).lower()