You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by rl...@apache.org on 2017/06/09 18:47:16 UTC

[8/9] ambari git commit: AMBARI-21128 Add AMS HA support to local metrics aggregator application (dsen)

AMBARI-21128 Add AMS HA support to local metrics aggregator application (dsen)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/29f75089
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/29f75089
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/29f75089

Branch: refs/heads/branch-feature-AMBARI-20859
Commit: 29f750894754ed2112fcedaa0b2f5ec693b5cd0e
Parents: 190ecad
Author: Dmytro Sen <ds...@apache.org>
Authored: Fri Jun 9 14:36:11 2017 +0300
Committer: Dmytro Sen <ds...@apache.org>
Committed: Fri Jun 9 14:36:11 2017 +0300

----------------------------------------------------------------------
 .../timeline/AbstractTimelineMetricsSink.java   |   4 +-
 .../ambari-metrics-host-aggregator/pom.xml      |  30 +++-
 .../AbstractMetricPublisherThread.java          | 134 ---------------
 .../aggregator/AggregatedMetricsPublisher.java  | 101 -----------
 .../host/aggregator/AggregatorApplication.java  |  98 +++++++----
 .../host/aggregator/AggregatorWebService.java   |   2 +-
 .../host/aggregator/RawMetricsPublisher.java    |  60 -------
 .../host/aggregator/TimelineMetricsHolder.java  |  26 ++-
 .../sink/timeline/AbstractMetricPublisher.java  | 169 +++++++++++++++++++
 .../timeline/AggregatedMetricsPublisher.java    | 103 +++++++++++
 .../sink/timeline/RawMetricsPublisher.java      |  65 +++++++
 .../aggregator/AggregatorApplicationTest.java   |  55 ++++++
 .../aggregator/AggregatorWebServiceTest.java    | 135 +++++++++++++++
 .../aggregator/TimelineMetricsHolderTest.java   | 107 ++++++++++++
 .../timeline/AbstractMetricPublisherTest.java   |  82 +++++++++
 .../AggregatedMetricsPublisherTest.java         | 154 +++++++++++++++++
 .../sink/timeline/RawMetricsPublisherTest.java  | 151 +++++++++++++++++
 .../src/main/python/core/aggregator.py          |   6 +-
 .../src/main/python/core/controller.py          |   2 +-
 19 files changed, 1133 insertions(+), 351 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
index fddf4b3..644d978 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
@@ -88,7 +88,7 @@ public abstract class AbstractTimelineMetricsSink {
   private static final String WWW_AUTHENTICATE = "WWW-Authenticate";
   private static final String NEGOTIATE = "Negotiate";
 
-  protected static final AtomicInteger failedCollectorConnectionsCounter = new AtomicInteger(0);
+  protected final AtomicInteger failedCollectorConnectionsCounter = new AtomicInteger(0);
   public static int NUMBER_OF_SKIPPED_COLLECTOR_EXCEPTIONS = 100;
   protected static final AtomicInteger nullCollectorCounter = new AtomicInteger(0);
   public static int NUMBER_OF_NULL_COLLECTOR_EXCEPTIONS = 20;
@@ -120,7 +120,7 @@ public abstract class AbstractTimelineMetricsSink {
   private volatile boolean isInitializedForHA = false;
 
   @SuppressWarnings("all")
-  private final int RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER = 5;
+  private final int RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER = 3;
 
   private final Gson gson = new Gson();
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-host-aggregator/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/pom.xml b/ambari-metrics/ambari-metrics-host-aggregator/pom.xml
index 0598bef..24432dd 100644
--- a/ambari-metrics/ambari-metrics-host-aggregator/pom.xml
+++ b/ambari-metrics/ambari-metrics-host-aggregator/pom.xml
@@ -38,12 +38,6 @@
 
     <dependencies>
         <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>3.8.1</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
             <version>14.0.1</version>
@@ -83,6 +77,30 @@
             <artifactId>hadoop-common</artifactId>
             <version>2.7.1.2.3.4.0-3347</version>
         </dependency>
+        <dependency>
+            <groupId>com.sun.jersey.jersey-test-framework</groupId>
+            <artifactId>jersey-test-framework-core</artifactId>
+            <version>1.11</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.jersey.jersey-test-framework</groupId>
+            <artifactId>jersey-test-framework-grizzly2</artifactId>
+            <version>1.11</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.easymock</groupId>
+            <artifactId>easymock</artifactId>
+            <version>3.4</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.2</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java
deleted file mode 100644
index b1f60fa..0000000
--- a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.metrics2.host.aggregator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.codehaus.jackson.map.AnnotationIntrospector;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.Map;
-
-/**
- * Abstract class that runs a thread that publishes metrics data to AMS collector in specified intervals.
- */
-public abstract class AbstractMetricPublisherThread extends Thread {
-    protected int publishIntervalInSeconds;
-    protected String publishURL;
-    protected ObjectMapper objectMapper;
-    private Log LOG;
-    protected TimelineMetricsHolder timelineMetricsHolder;
-
-    public AbstractMetricPublisherThread(TimelineMetricsHolder timelineMetricsHolder, String publishURL, int publishIntervalInSeconds) {
-        LOG = LogFactory.getLog(this.getClass());
-        this.publishURL = publishURL;
-        this.publishIntervalInSeconds = publishIntervalInSeconds;
-        this.timelineMetricsHolder = timelineMetricsHolder;
-        objectMapper = new ObjectMapper();
-        AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
-        objectMapper.setAnnotationIntrospector(introspector);
-        objectMapper.getSerializationConfig()
-                .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
-    }
-
-    /**
-     * Publishes metrics to collector in specified intervals while not interrupted.
-     */
-    @Override
-    public void run() {
-        while (!isInterrupted()) {
-            try {
-                sleep(this.publishIntervalInSeconds * 1000);
-            } catch (InterruptedException e) {
-                //Ignore
-            }
-            try {
-                processAndPublishMetrics(getMetricsFromCache());
-            } catch (Exception e) {
-                LOG.error("Couldn't process and send metrics : ",e);
-            }
-        }
-    }
-
-    /**
-     * Processes and sends metrics to collector.
-     * @param metricsFromCache
-     * @throws Exception
-     */
-    protected void processAndPublishMetrics(Map<Long, TimelineMetrics> metricsFromCache) throws Exception {
-        if (metricsFromCache.size()==0) return;
-
-        LOG.info(String.format("Preparing %s timeline metrics for publishing", metricsFromCache.size()));
-        publishMetricsJson(processMetrics(metricsFromCache));
-    }
-
-    /**
-     * Returns metrics map. Source is based on implementation.
-     * @return
-     */
-    protected abstract Map<Long,TimelineMetrics> getMetricsFromCache();
-
-    /**
-     * Processes given metrics (aggregates or merges them) and converts them into json string that will be send to collector
-     * @param metricValues
-     * @return
-     */
-    protected abstract String processMetrics(Map<Long, TimelineMetrics> metricValues);
-
-    protected void publishMetricsJson(String jsonData) throws Exception {
-        int timeout = 5 * 1000;
-        HttpURLConnection connection = null;
-        if (this.publishURL == null) {
-            throw new IOException("Unknown URL. Unable to connect to metrics collector.");
-        }
-        LOG.info("Collector URL : " + publishURL);
-        connection = (HttpURLConnection) new URL(this.publishURL).openConnection();
-
-        connection.setRequestMethod("POST");
-        connection.setRequestProperty("Content-Type", "application/json");
-        connection.setRequestProperty("Connection", "Keep-Alive");
-        connection.setConnectTimeout(timeout);
-        connection.setReadTimeout(timeout);
-        connection.setDoOutput(true);
-
-        if (jsonData != null) {
-            try (OutputStream os = connection.getOutputStream()) {
-                os.write(jsonData.getBytes("UTF-8"));
-            }
-        }
-        int responseCode = connection.getResponseCode();
-        if (responseCode != 200) {
-            throw new Exception("responseCode is " + responseCode);
-        }
-        LOG.info("Successfully sent metrics.");
-    }
-
-    /**
-     * Interrupts the thread.
-     */
-    protected void stopPublisher() {
-        this.interrupt();
-    }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java
deleted file mode 100644
index 0540ec9..0000000
--- a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.metrics2.host.aggregator;
-
-
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
-import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricWithAggregatedValues;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-
-/**
- * Thread that aggregates and publishes metrics to collector on specified interval.
- */
-public class AggregatedMetricsPublisher extends AbstractMetricPublisherThread {
-
-    private Log LOG;
-
-    public AggregatedMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, String collectorURL, int interval) {
-        super(timelineMetricsHolder, collectorURL, interval);
-        LOG = LogFactory.getLog(this.getClass());
-    }
-
-    /**
-     * get metrics map form @TimelineMetricsHolder
-     * @return
-     */
-    @Override
-    protected Map<Long, TimelineMetrics> getMetricsFromCache() {
-        return timelineMetricsHolder.extractMetricsForAggregationPublishing();
-    }
-
-    /**
-     * Aggregates given metrics and converts them into json string that will be send to collector
-     * @param metricForAggregationValues
-     * @return
-     */
-    @Override
-    protected String processMetrics(Map<Long, TimelineMetrics> metricForAggregationValues) {
-        HashMap<String, TimelineMetrics> nameToMetricMap = new HashMap<>();
-        for (TimelineMetrics timelineMetrics : metricForAggregationValues.values()) {
-            for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) {
-                if (!nameToMetricMap.containsKey(timelineMetric.getMetricName())) {
-                    nameToMetricMap.put(timelineMetric.getMetricName(), new TimelineMetrics());
-                }
-                nameToMetricMap.get(timelineMetric.getMetricName()).addOrMergeTimelineMetric(timelineMetric);
-            }
-        }
-        Set<TimelineMetricWithAggregatedValues> metricAggregateMap = new HashSet<>();
-        for (TimelineMetrics metrics : nameToMetricMap.values()) {
-            double sum = 0;
-            double max = Integer.MIN_VALUE;
-            double min = Integer.MAX_VALUE;
-            int count = 0;
-            for (TimelineMetric metric : metrics.getMetrics()) {
-                for (Double value : metric.getMetricValues().values()) {
-                    sum+=value;
-                    max = Math.max(max, value);
-                    min = Math.min(min, value);
-                    count++;
-                }
-            }
-            TimelineMetric tmpMetric = new TimelineMetric(metrics.getMetrics().get(0));
-            tmpMetric.setMetricValues(new TreeMap<Long, Double>());
-            metricAggregateMap.add(new TimelineMetricWithAggregatedValues(tmpMetric, new MetricHostAggregate(sum, count, 0d, max, min)));
-        }
-        String json = null;
-        try {
-            json = objectMapper.writeValueAsString(new AggregationResult(metricAggregateMap, System.currentTimeMillis()));
-            LOG.debug(json);
-        } catch (Exception e) {
-            LOG.error("Failed to convert result into json", e);
-        }
-
-        return json;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
index c6b703b..1e5cc82 100644
--- a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
@@ -33,6 +33,9 @@ import java.util.HashMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.AbstractMetricPublisher;
+import org.apache.hadoop.metrics2.sink.timeline.AggregatedMetricsPublisher;
+import org.apache.hadoop.metrics2.sink.timeline.RawMetricsPublisher;
 
 /**
  * WEB application with 2 publisher threads that processes received metrics and submits results to the collector
@@ -40,24 +43,25 @@ import org.apache.hadoop.conf.Configuration;
 public class AggregatorApplication
 {
     private static final int STOP_SECONDS_DELAY = 0;
-    private static final int JOIN_SECONDS_TIMEOUT = 2;
-    private static String BASE_POST_URL = "%s://%s:%s/ws/v1/timeline/metrics";
-    private static String AGGREGATED_POST_PREFIX = "/aggregated";
+    private static final int JOIN_SECONDS_TIMEOUT = 5;
     private static final String METRICS_SITE_CONFIGURATION_FILE = "ams-site.xml";
-    private static Log LOG = LogFactory.getLog("AggregatorApplication.class");
+    private Log LOG;
     private final int webApplicationPort;
     private final int rawPublishingInterval;
     private final int aggregationInterval;
     private Configuration configuration;
-    private String [] collectorHosts;
-    private AggregatedMetricsPublisher aggregatePublisher;
-    private RawMetricsPublisher rawPublisher;
+    private Thread aggregatePublisherThread;
+    private Thread rawPublisherThread;
     private TimelineMetricsHolder timelineMetricsHolder;
     private HttpServer httpServer;
 
-    public AggregatorApplication(String collectorHosts) {
+    public AggregatorApplication(String hostname, String collectorHosts) {
+        LOG = LogFactory.getLog(this.getClass());
+        configuration = new Configuration(true);
         initConfiguration();
-        this.collectorHosts = collectorHosts.split(",");
+        configuration.set("timeline.metrics.collector.hosts", collectorHosts);
+        configuration.set("timeline.metrics.hostname", hostname);
+        configuration.set("timeline.metrics.zk.quorum", getZkQuorumFromConfiguration());
         this.aggregationInterval = configuration.getInt("timeline.metrics.host.aggregator.minute.interval", 300);
         this.rawPublishingInterval = configuration.getInt("timeline.metrics.sink.report.interval", 60);
         this.webApplicationPort = configuration.getInt("timeline.metrics.host.inmemory.aggregation.port", 61888);
@@ -70,7 +74,13 @@ public class AggregatorApplication
         }
     }
 
-    private void initConfiguration() {
+    private String getZkQuorumFromConfiguration() {
+        String zkClientPort = configuration.getTrimmed("cluster.zookeeper.property.clientPort", "2181");
+        String zkServerHosts = configuration.getTrimmed("cluster.zookeeper.quorum", "");
+        return getZkConnectionUrl(zkClientPort, zkServerHosts);
+    }
+
+    protected void initConfiguration() {
         ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
         if (classLoader == null) {
             classLoader = getClass().getClassLoader();
@@ -82,7 +92,7 @@ public class AggregatorApplication
             throw new IllegalStateException("Unable to initialize the metrics " +
                     "subsystem. No ams-site present in the classpath.");
         }
-        configuration = new Configuration(true);
+
         try {
             configuration.addResource(amsResUrl.toURI().toURL());
         } catch (Exception e) {
@@ -91,7 +101,7 @@ public class AggregatorApplication
         }
     }
 
-    private String getHostName() {
+    protected String getHostName() {
         String hostName = "localhost";
         try {
             hostName = InetAddress.getLocalHost().getCanonicalHostName();
@@ -101,13 +111,13 @@ public class AggregatorApplication
         return hostName;
     }
 
-    private URI getURI() {
+    protected URI getURI() {
         URI uri = UriBuilder.fromUri("http://" + getHostName() + "/").port(this.webApplicationPort).build();
         LOG.info(String.format("Web server at %s", uri));
         return uri;
     }
 
-    private HttpServer createHttpServer() throws IOException {
+    protected HttpServer createHttpServer() throws IOException {
         ResourceConfig resourceConfig = new PackagesResourceConfig("org.apache.hadoop.metrics2.host.aggregator");
         HashMap<String, Object> params = new HashMap();
         params.put("com.sun.jersey.api.json.POJOMappingFeature", "true");
@@ -122,29 +132,30 @@ public class AggregatorApplication
 
     private void startAggregatePublisherThread() {
         LOG.info("Starting aggregated metrics publisher.");
-        String collectorURL = buildBasicCollectorURL(collectorHosts[0]) + AGGREGATED_POST_PREFIX;
-        aggregatePublisher = new AggregatedMetricsPublisher(timelineMetricsHolder, collectorURL, aggregationInterval);
-        aggregatePublisher.start();
+        AbstractMetricPublisher metricPublisher = new AggregatedMetricsPublisher(timelineMetricsHolder, configuration, aggregationInterval);
+        aggregatePublisherThread = new Thread(metricPublisher);
+        aggregatePublisherThread.start();
     }
 
     private void startRawPublisherThread() {
         LOG.info("Starting raw metrics publisher.");
-        String collectorURL = buildBasicCollectorURL(collectorHosts[0]);
-        rawPublisher = new RawMetricsPublisher(timelineMetricsHolder, collectorURL, rawPublishingInterval);
-        rawPublisher.start();
+        AbstractMetricPublisher metricPublisher = new RawMetricsPublisher(timelineMetricsHolder, configuration, rawPublishingInterval);
+        rawPublisherThread = new Thread(metricPublisher); // keep aggregatePublisherThread intact so stop() can interrupt/join both threads
+        rawPublisherThread.start();
     }
 
 
 
     private void stop() {
-        aggregatePublisher.stopPublisher();
-        rawPublisher.stopPublisher();
+        LOG.info("Stopping aggregator application");
+        aggregatePublisherThread.interrupt();
+        rawPublisherThread.interrupt();
         httpServer.stop(STOP_SECONDS_DELAY);
         LOG.info("Stopped web server.");
         try {
             LOG.info("Waiting for threads to join.");
-            aggregatePublisher.join(JOIN_SECONDS_TIMEOUT * 1000);
-            rawPublisher.join(JOIN_SECONDS_TIMEOUT * 1000);
+            aggregatePublisherThread.join(JOIN_SECONDS_TIMEOUT * 1000);
+            rawPublisherThread.join(JOIN_SECONDS_TIMEOUT * 1000);
             LOG.info("Gracefully stopped Aggregator Application.");
         } catch (InterruptedException e) {
             LOG.error("Received exception during stop : ", e);
@@ -153,28 +164,43 @@ public class AggregatorApplication
 
     }
 
-    private String buildBasicCollectorURL(String host) {
-        String port = configuration.get("timeline.metrics.service.webapp.address", "0.0.0.0:6188").split(":")[1];
-        String protocol = configuration.get("timeline.metrics.service.http.policy", "HTTP_ONLY").equalsIgnoreCase("HTTP_ONLY") ? "http" : "https";
-        return String.format(BASE_POST_URL, protocol, host, port);
+    private String getZkConnectionUrl(String zkClientPort, String zkQuorum) {
+        StringBuilder sb = new StringBuilder();
+        String[] quorumParts = zkQuorum.split(",");
+        String prefix = "";
+        for (String part : quorumParts) {
+            sb.append(prefix);
+            sb.append(part.trim());
+            if (!part.contains(":")) {
+                sb.append(":");
+                sb.append(zkClientPort);
+            }
+            prefix = ",";
+        }
+        return sb.toString();
     }
 
     public static void main( String[] args ) throws Exception {
-        LOG.info("Starting aggregator application");
-        if (args.length != 1) {
-            throw new Exception("This jar should be run with 1 argument - collector hosts separated with coma");
+        if (args.length != 2) {
+            throw new Exception("This jar should be executed with 2 arguments : 1st - current host name, " +
+                    "2nd - collector hosts separated with coma");
         }
 
-        final AggregatorApplication app = new AggregatorApplication(args[0]);
-        app.startAggregatePublisherThread();
-        app.startRawPublisherThread();
-        app.startWebServer();
+        final AggregatorApplication app = new AggregatorApplication(args[0], args[1]);
+
+        app.startWebServerAndPublishersThreads();
 
         Runtime.getRuntime().addShutdownHook(new Thread() {
             public void run() {
-                LOG.info("Stopping aggregator application");
                 app.stop();
             }
         });
     }
+
+    private void startWebServerAndPublishersThreads() {
+        LOG.info("Starting aggregator application");
+        startAggregatePublisherThread();
+        startRawPublisherThread();
+        startWebServer();
+    }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java
index f96d0ed..b151209 100644
--- a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java
@@ -39,7 +39,7 @@ public class AggregatorWebService {
     @GET
     @Produces("text/json")
     @Path("/metrics")
-    public Response helloWorld() throws IOException {
+    public Response getOkResponse() throws IOException {
         return Response.ok().build();
     }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java
deleted file mode 100644
index f317ed9..0000000
--- a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.metrics2.host.aggregator;
-
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-
-import java.util.Map;
-
-public class RawMetricsPublisher extends AbstractMetricPublisherThread {
-    private final Log LOG;
-
-    public RawMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, String collectorURL, int interval) {
-        super(timelineMetricsHolder, collectorURL, interval);
-        LOG = LogFactory.getLog(this.getClass());
-    }
-
-
-    @Override
-    protected Map<Long, TimelineMetrics> getMetricsFromCache() {
-        return timelineMetricsHolder.extractMetricsForRawPublishing();
-    }
-
-    @Override
-    protected String processMetrics(Map<Long, TimelineMetrics> metricValues) {
-        //merge everything in one TimelineMetrics object
-        TimelineMetrics timelineMetrics = new TimelineMetrics();
-        for (TimelineMetrics metrics : metricValues.values()) {
-            for (TimelineMetric timelineMetric : metrics.getMetrics())
-                timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
-        }
-        //map TimelineMetrics to json string
-        String json = null;
-        try {
-            json = objectMapper.writeValueAsString(timelineMetrics);
-            LOG.debug(json);
-        } catch (Exception e) {
-            LOG.error("Failed to convert result into json", e);
-        }
-        return json;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java
index b355c97..03b6542 100644
--- a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java
@@ -19,8 +19,10 @@ package org.apache.hadoop.metrics2.host.aggregator;
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
@@ -33,8 +35,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 public class TimelineMetricsHolder {
     private static final int DEFAULT_RAW_CACHE_EXPIRE_TIME = 60;
     private static final int DEFAULT_AGGREGATION_CACHE_EXPIRE_TIME = 300;
-    private Cache<Long, TimelineMetrics> aggregationMetricsCache;
-    private Cache<Long, TimelineMetrics> rawMetricsCache;
+    private Cache<String, TimelineMetrics> aggregationMetricsCache;
+    private Cache<String, TimelineMetrics> rawMetricsCache;
     private static TimelineMetricsHolder instance = null;
     //to ensure no metric values are expired
     private static int EXPIRE_DELAY = 30;
@@ -63,21 +65,29 @@ public class TimelineMetricsHolder {
 
     public void putMetricsForAggregationPublishing(TimelineMetrics timelineMetrics) {
         aggregationCacheLock.writeLock().lock();
-        aggregationMetricsCache.put(System.currentTimeMillis(), timelineMetrics);
+        aggregationMetricsCache.put(calculateCacheKey(timelineMetrics), timelineMetrics);
         aggregationCacheLock.writeLock().unlock();
     }
 
-    public Map<Long, TimelineMetrics> extractMetricsForAggregationPublishing() {
+    private String calculateCacheKey(TimelineMetrics timelineMetrics) {
+        // Key: appId of the first metric (nothing for an empty batch) followed by the current millis.
+        // NOTE(review): same app + same millisecond yields the same key - confirm overwriting is acceptable.
+        List<TimelineMetric> batch = timelineMetrics.getMetrics();
+        String prefix = batch.isEmpty() ? "" : batch.get(0).getAppId();
+        return prefix + System.currentTimeMillis();
+    }
+
+    public Map<String, TimelineMetrics> extractMetricsForAggregationPublishing() {
         return extractMetricsFromCacheWithLock(aggregationMetricsCache, aggregationCacheLock);
     }
 
     public void putMetricsForRawPublishing(TimelineMetrics metrics) {
         rawCacheLock.writeLock().lock();
-        rawMetricsCache.put(System.currentTimeMillis(), metrics);
+        rawMetricsCache.put(calculateCacheKey(metrics), metrics);
         rawCacheLock.writeLock().unlock();
     }
 
-    public Map<Long, TimelineMetrics> extractMetricsForRawPublishing() {
+    public Map<String, TimelineMetrics> extractMetricsForRawPublishing() {
         return extractMetricsFromCacheWithLock(rawMetricsCache, rawCacheLock);
     }
 
@@ -87,9 +97,9 @@ public class TimelineMetricsHolder {
      * @param lock
      * @return
      */
-    private Map<Long, TimelineMetrics> extractMetricsFromCacheWithLock(Cache<Long, TimelineMetrics> cache, ReadWriteLock lock) {
+    private Map<String, TimelineMetrics> extractMetricsFromCacheWithLock(Cache<String, TimelineMetrics> cache, ReadWriteLock lock) {
         lock.writeLock().lock();
-        Map<Long, TimelineMetrics> metricsMap = new TreeMap<>(cache.asMap());
+        Map<String, TimelineMetrics> metricsMap = new TreeMap<>(cache.asMap());
         cache.invalidateAll();
         lock.writeLock().unlock();
         return metricsMap;

http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java
new file mode 100644
index 0000000..5af115f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Abstract class that runs a thread that publishes metrics data to AMS collector in specified intervals.
+ */
+public abstract class AbstractMetricPublisher extends AbstractTimelineMetricsSink implements Runnable {
+
+    private static final String AMS_SITE_SSL_KEYSTORE_PATH_PROPERTY = "ssl.server.truststore.location";
+    private static final String AMS_SITE_SSL_KEYSTORE_TYPE_PROPERTY = "ssl.server.truststore.password";
+    private static final String AMS_SITE_SSL_KEYSTORE_PASSWORD_PROPERTY = "ssl.server.truststore.type";
+    private static final String AMS_SITE_HTTP_POLICY_PROPERTY = "timeline.metrics.service.http.policy";
+    private static final String AMS_SITE_COLLECTOR_WEBAPP_ADDRESS_PROPERTY = "timeline.metrics.service.webapp.address";
+    private static final String PUBLISHER_COLLECTOR_HOSTS_PROPERTY = "timeline.metrics.collector.hosts";
+    private static final String PUBLISHER_ZOOKEEPER_QUORUM_PROPERTY = "timeline.metrics.zk.quorum";
+    private static final String PUBLISHER_HOSTNAME_PROPERTY = "timeline.metrics.hostname";
+    protected static String BASE_POST_URL = "%s://%s:%s/ws/v1/timeline/metrics";
+    protected int publishIntervalInSeconds;
+    private Log LOG;
+    protected TimelineMetricsHolder timelineMetricsHolder;
+    protected Configuration configuration;
+    private String collectorProtocol;
+    private String collectorPort;
+    private Collection<String> collectorHosts;
+    private String hostname;
+    private String zkQuorum;
+
+    public AbstractMetricPublisher(TimelineMetricsHolder timelineMetricsHolder, Configuration configuration, int publishIntervalInSeconds) {
+        LOG = LogFactory.getLog(this.getClass());
+        this.configuration = configuration;
+        this.publishIntervalInSeconds = publishIntervalInSeconds;
+        this.timelineMetricsHolder = timelineMetricsHolder;
+        configure();
+    }
+
+    protected void configure() {
+        collectorProtocol = configuration.get(AMS_SITE_HTTP_POLICY_PROPERTY, "HTTP_ONLY").equalsIgnoreCase("HTTP_ONLY") ? "http" : "https";
+        collectorPort = configuration.getTrimmed(AMS_SITE_COLLECTOR_WEBAPP_ADDRESS_PROPERTY, "0.0.0.0:6188").split(":")[1];
+        collectorHosts = parseHostsStringIntoCollection(configuration.getTrimmed(PUBLISHER_COLLECTOR_HOSTS_PROPERTY, ""));
+        zkQuorum = configuration.get(PUBLISHER_ZOOKEEPER_QUORUM_PROPERTY, "");
+        hostname = configuration.get(PUBLISHER_HOSTNAME_PROPERTY, "localhost");
+        collectorHosts = parseHostsStringIntoCollection(configuration.get(PUBLISHER_COLLECTOR_HOSTS_PROPERTY, ""));
+        if (collectorHosts.isEmpty()) {
+            LOG.error("No Metric collector configured.");
+        } else {
+            if (collectorProtocol.contains("https")) {
+                String trustStorePath = configuration.get(AMS_SITE_SSL_KEYSTORE_PATH_PROPERTY).trim();
+                String trustStoreType = configuration.get(AMS_SITE_SSL_KEYSTORE_TYPE_PROPERTY).trim();
+                String trustStorePwd = configuration.get(AMS_SITE_SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
+                loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
+            }
+        }
+    }
+
+    /**
+     * Publishes metrics to collector in specified intervals while not interrupted.
+     */
+    @Override
+    public void run() {
+        while (!Thread.currentThread().isInterrupted()) {
+            try {
+                Thread.sleep(this.publishIntervalInSeconds * 1000);
+            } catch (InterruptedException e) {
+                //Ignore
+            }
+            try {
+                processAndPublishMetrics(getMetricsFromCache());
+            } catch (Exception e) {
+                //ignore
+            }
+        }
+    }
+
+    /**
+     * Processes and sends metrics to collector.
+     * @param metricsFromCache
+     * @throws Exception
+     */
+    protected void processAndPublishMetrics(Map<String, TimelineMetrics> metricsFromCache) throws Exception {
+        if (metricsFromCache.size()==0) return;
+
+        LOG.info(String.format("Preparing %s timeline metrics for publishing", metricsFromCache.size()));
+        emitMetricsJson(getCollectorUri(getCurrentCollectorHost()), processMetrics(metricsFromCache));
+    }
+
+    /**
+     * Returns metrics map. Source is based on implementation.
+     * @return
+     */
+    protected abstract Map<String,TimelineMetrics> getMetricsFromCache();
+
+    /**
+     * Processes given metrics (aggregates or merges them) and converts them into json string that will be send to collector
+     * @param metricValues
+     * @return
+     */
+    protected abstract String processMetrics(Map<String, TimelineMetrics> metricValues);
+
+    protected abstract String getPostUrl();
+
+    @Override
+    protected String getCollectorUri(String host) {
+        return String.format(getPostUrl(), getCollectorProtocol(), host, getCollectorPort());
+    }
+
+    @Override
+    protected String getCollectorProtocol() {
+        return collectorProtocol;
+    }
+
+    @Override
+    protected String getCollectorPort() {
+        return collectorPort;
+    }
+
+    @Override
+    protected int getTimeoutSeconds() {
+        return DEFAULT_POST_TIMEOUT_SECONDS;
+    }
+
+    @Override
+    protected String getZookeeperQuorum() {
+        return zkQuorum;
+    }
+
+    @Override
+    protected Collection<String> getConfiguredCollectorHosts() {
+        return collectorHosts;
+    }
+
+    @Override
+    protected String getHostname() {
+        return hostname;
+    }
+
+    @Override
+    protected boolean isHostInMemoryAggregationEnabled() {
+        return false;
+    }
+
+    @Override
+    protected int getHostInMemoryAggregationPort() {
+        return 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java
new file mode 100644
index 0000000..c8dffab
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * Thread that aggregates and publishes metrics to collector on specified interval.
+ */
+public class AggregatedMetricsPublisher extends AbstractMetricPublisher {
+    private static final String AGGREGATED_POST_PREFIX = "/aggregated";
+    private final Log LOG;
+
+    public AggregatedMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, Configuration configuration, int interval) {
+        super(timelineMetricsHolder, configuration, interval);
+        LOG = LogFactory.getLog(this.getClass());
+    }
+
+    /**
+     * Gets the metrics map from {@link TimelineMetricsHolder}.
+     * @return metrics waiting for aggregation
+     */
+    @Override
+    protected Map<String, TimelineMetrics> getMetricsFromCache() {
+        return timelineMetricsHolder.extractMetricsForAggregationPublishing();
+    }
+
+    /**
+     * Aggregates given metrics and converts them into json string that will be sent to collector.
+     * Metrics are grouped by metric name; for every group the sum, count, max and min over all
+     * values are computed.
+     * @param metricForAggregationValues metrics to aggregate
+     * @return json of the aggregation result, or null when serialization failed
+     */
+    @Override
+    protected String processMetrics(Map<String, TimelineMetrics> metricForAggregationValues) {
+        HashMap<String, TimelineMetrics> nameToMetricMap = new HashMap<>();
+        for (TimelineMetrics timelineMetrics : metricForAggregationValues.values()) {
+            for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) {
+                TimelineMetrics sameNameMetrics = nameToMetricMap.get(timelineMetric.getMetricName());
+                if (sameNameMetrics == null) {
+                    sameNameMetrics = new TimelineMetrics();
+                    nameToMetricMap.put(timelineMetric.getMetricName(), sameNameMetrics);
+                }
+                sameNameMetrics.addOrMergeTimelineMetric(timelineMetric);
+            }
+        }
+        Set<TimelineMetricWithAggregatedValues> metricAggregateMap = new HashSet<>();
+        for (TimelineMetrics metrics : nameToMetricMap.values()) {
+            double sum = 0;
+            // Seed with the double range: the former Integer bounds gave a wrong min/max for
+            // metrics whose values all lie outside the int range (e.g. large byte counters).
+            double max = -Double.MAX_VALUE;
+            double min = Double.MAX_VALUE;
+            int count = 0;
+            for (TimelineMetric metric : metrics.getMetrics()) {
+                for (Double value : metric.getMetricValues().values()) {
+                    sum += value;
+                    max = Math.max(max, value);
+                    min = Math.min(min, value);
+                    count++;
+                }
+            }
+            // Every group holds at least one metric; the first one supplies the metadata and its
+            // raw values are replaced by an empty map.
+            TimelineMetric tmpMetric = new TimelineMetric(metrics.getMetrics().get(0));
+            tmpMetric.setMetricValues(new TreeMap<Long, Double>());
+            metricAggregateMap.add(new TimelineMetricWithAggregatedValues(tmpMetric, new MetricHostAggregate(sum, count, 0d, max, min)));
+        }
+        String json = null;
+        try {
+            json = mapper.writeValueAsString(new AggregationResult(metricAggregateMap, System.currentTimeMillis()));
+            LOG.debug(json);
+        } catch (Exception e) {
+            LOG.error("Failed to convert result into json", e);
+        }
+
+        return json;
+    }
+
+    /**
+     * @return the base post URL template with the aggregated suffix appended
+     */
+    @Override
+    protected String getPostUrl() {
+        return BASE_POST_URL + AGGREGATED_POST_PREFIX;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java
new file mode 100644
index 0000000..89addb7
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
+
+import java.util.Map;
+
+/**
+ * Publisher that merges the cached raw metrics into one object and posts it to the base metrics URL.
+ */
+public class RawMetricsPublisher extends AbstractMetricPublisher {
+    private final Log LOG;
+
+    public RawMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, Configuration configuration, int interval) {
+        super(timelineMetricsHolder, configuration, interval);
+        LOG = LogFactory.getLog(getClass());
+    }
+
+    /**
+     * @return the metrics extracted from the holder's raw publishing cache
+     */
+    @Override
+    protected Map<String, TimelineMetrics> getMetricsFromCache() {
+        return timelineMetricsHolder.extractMetricsForRawPublishing();
+    }
+
+    /**
+     * Merges all given metrics into a single TimelineMetrics object and serializes it.
+     * @param metricValues metrics to merge
+     * @return json of the merged metrics, or null when serialization failed
+     */
+    @Override
+    protected String processMetrics(Map<String, TimelineMetrics> metricValues) {
+        TimelineMetrics merged = new TimelineMetrics();
+        for (TimelineMetrics batch : metricValues.values()) {
+            for (TimelineMetric metric : batch.getMetrics()) {
+                merged.addOrMergeTimelineMetric(metric);
+            }
+        }
+
+        String payload = null;
+        try {
+            payload = mapper.writeValueAsString(merged);
+            LOG.debug(payload);
+        } catch (Exception e) {
+            LOG.error("Failed to convert result into json", e);
+        }
+        return payload;
+    }
+
+    /**
+     * @return the unmodified base post URL template
+     */
+    @Override
+    protected String getPostUrl() {
+        return BASE_POST_URL;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplicationTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplicationTest.java b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplicationTest.java
new file mode 100644
index 0000000..ea72d17
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplicationTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.net.URI;
+
+import static org.easymock.EasyMock.createMockBuilder;
+
+
+public class AggregatorApplicationTest {
+    /**
+     * main() is expected to throw when it gets fewer arguments than it needs.
+     */
+    @Test
+    public void testMainNotEnoughArguments() {
+        assertMainThrows(new String[0]);
+        assertMainThrows(new String[1]);
+    }
+
+    /**
+     * Runs main() with the given arguments and fails the test unless an Exception is thrown.
+     */
+    private static void assertMainThrows(String[] args) {
+        try {
+            AggregatorApplication.main(args);
+        } catch (Exception e) {
+            //expected
+            return;
+        }
+        // Reported outside the try block: the former "throw new Exception" inside it was swallowed
+        // by the catch above, so the test could never fail.
+        Assert.fail("main() should have thrown for " + args.length + " argument(s)");
+    }
+
+    /**
+     * The application URI is built from the host name and port 61888.
+     */
+    @Test
+    public void testGetURI() {
+        AggregatorApplication aggregatorApplicationMock = createMockBuilder(AggregatorApplication.class)
+                .withConstructor("", "")
+                .addMockedMethod("createHttpServer")
+                .addMockedMethod("initConfiguration").createMock();
+
+        URI uri = aggregatorApplicationMock.getURI();
+        Assert.assertEquals("http://" + aggregatorApplicationMock.getHostName() + ":61888/", uri.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebServiceTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebServiceTest.java b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebServiceTest.java
new file mode 100644
index 0000000..736fd06
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebServiceTest.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.host.aggregator;
+
+
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.test.framework.JerseyTest;
+import com.sun.jersey.test.framework.WebAppDescriptor;
+import com.sun.jersey.test.framework.spi.container.TestContainerFactory;
+import com.sun.jersey.test.framework.spi.container.grizzly2.GrizzlyTestContainerFactory;
+import junit.framework.Assert;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.codehaus.jackson.jaxrs.JacksonJaxbJsonProvider;
+import org.junit.Test;
+
+
+import javax.ws.rs.core.MediaType;
+
+import java.util.Collection;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Exercises the aggregator web service through the Jersey test framework on a Grizzly container.
+ */
+public class AggregatorWebServiceTest extends JerseyTest {
+    public AggregatorWebServiceTest() {
+        super(new WebAppDescriptor.Builder(
+                "org.apache.hadoop.metrics2.host.aggregator")
+                .contextPath("jersey-guice-filter")
+                .servletPath("/")
+                .clientConfig(new DefaultClientConfig(JacksonJaxbJsonProvider.class))
+                .build());
+    }
+
+    @Override
+    public TestContainerFactory getTestContainerFactory() {
+        return new GrizzlyTestContainerFactory();
+    }
+
+    /**
+     * @return the web resource for ws/v1/timeline/metrics
+     */
+    private WebResource metricsResource() {
+        return resource().path("ws").path("v1").path("timeline").path("metrics");
+    }
+
+    /**
+     * Checks that the map holds exactly one batch with exactly one metric of the given app,
+     * stored under a key that contains the app id.
+     */
+    private static void assertSingleMetricOfApp(Map<String, TimelineMetrics> extracted, String appId) {
+        Assert.assertEquals(1, extracted.size());
+        for (String cacheKey : extracted.keySet()) {
+            Assert.assertTrue(cacheKey.contains(appId));
+        }
+        Assert.assertEquals(1, extracted.values().size());
+
+        TimelineMetrics batch = extracted.values().iterator().next();
+        Assert.assertEquals(1, batch.getMetrics().size());
+        Assert.assertEquals(appId, batch.getMetrics().get(0).getAppId());
+    }
+
+    @Test
+    public void testOkResponse() {
+        ClientResponse response = metricsResource()
+                .accept("text/json")
+                .get(ClientResponse.class);
+        assertEquals(200, response.getStatus());
+        assertEquals("text/json", response.getType().toString());
+    }
+
+    @Test
+    public void testWrongPath() {
+        ClientResponse response = metricsResource().path("aggregated")
+                .accept("text/json")
+                .get(ClientResponse.class);
+        assertEquals(404, response.getStatus());
+    }
+
+
+    @Test
+    public void testMetricsPost() {
+        TimelineMetricsHolder holder = TimelineMetricsHolder.getInstance();
+
+        // start from empty caches
+        holder.extractMetricsForAggregationPublishing();
+        holder.extractMetricsForRawPublishing();
+
+        TimelineMetrics posted = TimelineMetricsHolderTest.getTimelineMetricsWithAppID("appid");
+        ClientResponse response = metricsResource()
+                .accept(MediaType.TEXT_PLAIN)
+                .post(ClientResponse.class, posted);
+        assertEquals(200, response.getStatus());
+        assertEquals(MediaType.TEXT_PLAIN, response.getType().toString());
+
+        // the posted metrics must have landed in both caches
+        Map<String, TimelineMetrics> aggregationMap = holder.extractMetricsForAggregationPublishing();
+        Map<String, TimelineMetrics> rawMap = holder.extractMetricsForRawPublishing();
+        assertSingleMetricOfApp(aggregationMap, "appid");
+        assertSingleMetricOfApp(rawMap, "appid");
+
+        //Cache should be empty after extraction
+        aggregationMap = holder.extractMetricsForAggregationPublishing();
+        rawMap = holder.extractMetricsForRawPublishing();
+        Assert.assertEquals(0, aggregationMap.size());
+        Assert.assertEquals(0, rawMap.size());
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolderTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolderTest.java b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolderTest.java
new file mode 100644
index 0000000..7d8ebf4
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolderTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+import junit.framework.Assert;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.Map;
+
+
+/**
+ * Tests for the TimelineMetricsHolder singleton and its two caches.
+ */
+public class TimelineMetricsHolderTest {
+
+    /**
+     * Resets the private static "instance" field so the next getInstance() call builds a new holder.
+     */
+    public void clearHolderSingleton() throws NoSuchFieldException, IllegalAccessException {
+        Field instanceField = TimelineMetricsHolder.class.getDeclaredField("instance");
+        instanceField.setAccessible(true);
+        // the field is static, so the target object argument is not used
+        instanceField.set(null, null);
+    }
+
+    @Test
+    public void testGetInstanceDefaultValues() throws Exception {
+        clearHolderSingleton();
+        Assert.assertNotNull(TimelineMetricsHolder.getInstance());
+    }
+
+    @Test
+    public void testGetInstanceWithParameters() throws Exception {
+        clearHolderSingleton();
+        Assert.assertNotNull(TimelineMetricsHolder.getInstance(1,2));
+    }
+
+    @Test
+    public void testCache() throws Exception {
+        clearHolderSingleton();
+        TimelineMetricsHolder holder = TimelineMetricsHolder.getInstance(4,4);
+        holder.putMetricsForAggregationPublishing(getTimelineMetricsWithAppID("aggr"));
+        holder.putMetricsForRawPublishing(getTimelineMetricsWithAppID("raw"));
+
+        Map<String, TimelineMetrics> aggregationMap = holder.extractMetricsForAggregationPublishing();
+        Map<String, TimelineMetrics> rawMap = holder.extractMetricsForRawPublishing();
+        assertSingleMetricOfApp(aggregationMap, "aggr");
+        assertSingleMetricOfApp(rawMap, "raw");
+
+        //Cache should be empty after extraction
+        aggregationMap = holder.extractMetricsForAggregationPublishing();
+        rawMap = holder.extractMetricsForRawPublishing();
+        Assert.assertEquals(0, aggregationMap.size());
+        Assert.assertEquals(0, rawMap.size());
+    }
+
+    /**
+     * Checks that the map holds exactly one batch with exactly one metric of the given app,
+     * stored under a key that contains the app id.
+     */
+    private static void assertSingleMetricOfApp(Map<String, TimelineMetrics> extracted, String appId) {
+        Assert.assertEquals(1, extracted.size());
+        for (String cacheKey : extracted.keySet()) {
+            Assert.assertTrue(cacheKey.contains(appId));
+        }
+        Assert.assertEquals(1, extracted.values().size());
+
+        TimelineMetrics batch = extracted.values().iterator().next();
+        Assert.assertEquals(1, batch.getMetrics().size());
+        Assert.assertEquals(appId, batch.getMetrics().get(0).getAppId());
+    }
+
+    /**
+     * Builds a TimelineMetrics object that contains a single metric with the given app id.
+     */
+    public static TimelineMetrics getTimelineMetricsWithAppID(String appId) {
+        TimelineMetric metric = new TimelineMetric();
+        metric.setAppId(appId);
+        TimelineMetrics container = new TimelineMetrics();
+        container.addOrMergeTimelineMetric(metric);
+        return container;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisherTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisherTest.java b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisherTest.java
new file mode 100644
index 0000000..a8ddbee
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisherTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolderTest;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.createMockBuilder;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+public class AbstractMetricPublisherTest {
+    @Test // expects processAndPublishMetrics() to ask for the host, build the URI, render JSON, then emit it
+    public void testProcessAndPublishMetrics() throws Exception {
+        AbstractMetricPublisher publisherMock =
+                createMockBuilder(RawMetricsPublisher.class)
+                        .withConstructor(TimelineMetricsHolder.getInstance(), new Configuration(), 60)
+                        .addMockedMethod("processMetrics")
+                        .addMockedMethod("getCollectorUri")
+                        .addMockedMethod("emitMetricsJson")
+                        .addMockedMethod("getCurrentCollectorHost").createStrictMock();
+        // Partial strict mock: the four methods above are stubbed, processAndPublishMetrics() itself is not.
+        TimelineMetricsHolder.getInstance().putMetricsForRawPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("raw"));
+        expect(publisherMock.getCurrentCollectorHost()).andReturn("collectorhost").once();
+        expect(publisherMock.getCollectorUri(anyString())).andReturn("https://collectorhost:11/metrics").once();
+        expect(publisherMock.processMetrics(anyObject(Map.class))).andReturn("{metrics}").once();
+        expect(publisherMock.emitMetricsJson("https://collectorhost:11/metrics", "{metrics}")).andReturn(true).once(); // must receive the stubbed URI and JSON verbatim
+
+        replay(publisherMock);
+
+        publisherMock.processAndPublishMetrics(TimelineMetricsHolder.getInstance().extractMetricsForRawPublishing());
+
+        verify(publisherMock);
+    }
+
+    @Test // expects exactly one processAndPublishMetrics() call while the publisher runs on a thread
+    public void testRunAndStop() throws Exception {
+        AbstractMetricPublisher publisherMock = createMockBuilder(RawMetricsPublisher.class)
+                .withConstructor(TimelineMetricsHolder.getInstance(), new Configuration(), 1)
+                .addMockedMethod("processAndPublishMetrics").createStrictMock();
+        publisherMock.processAndPublishMetrics(anyObject(Map.class));
+        expectLastCall().times(1);
+        // NOTE(review): presumably run() polls isInterrupted() on its own thread between cycles; confirm in AbstractMetricPublisher.
+        // Thread wrapper with only isInterrupted() stubbed: false on the first poll, true on the second.
+        Thread t = createMockBuilder(Thread.class)
+                .withConstructor(publisherMock)
+                .addMockedMethod("isInterrupted").createStrictMock();
+        expect(t.isInterrupted()).andReturn(false).once();
+        expect(t.isInterrupted()).andReturn(true).once();
+
+        replay(publisherMock, t);
+
+        t.start();
+        // Bounded join instead of a fixed sleep: returns once the thread ends, waits 2222 ms at most.
+        t.join(2222);
+
+        verify(publisherMock, t);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisherTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisherTest.java b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisherTest.java
new file mode 100644
index 0000000..3413052
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisherTest.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import junit.framework.Assert;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolderTest;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class AggregatedMetricsPublisherTest {
+
+    @Test // aggregated JSON must report sum/max/min/numberOfSamples for each of three metrics
+    public void testProcessMetrics() throws Exception {
+        Configuration configuration = new Configuration();
+        TimelineMetricsHolder timelineMetricsHolder = TimelineMetricsHolder.getInstance();
+        timelineMetricsHolder.extractMetricsForAggregationPublishing(); // results discarded: just empties the holder before seeding
+        timelineMetricsHolder.extractMetricsForRawPublishing();
+        // Three samples per metric (1..3, 4..6, 7..9), hence the expected sums 6, 15 and 24 below.
+        TreeMap<Long, Double> metric1App1Metrics = new TreeMap<>();
+        metric1App1Metrics.put(1L, 1d);
+        metric1App1Metrics.put(2L, 2d);
+        metric1App1Metrics.put(3L, 3d);
+        timelineMetricsHolder.putMetricsForAggregationPublishing(getTimelineMetricsForAppId("metricName1", "app1", metric1App1Metrics));
+
+        TreeMap<Long, Double> metric2App2Metrics = new TreeMap<>();
+        metric2App2Metrics.put(1L, 4d);
+        metric2App2Metrics.put(2L, 5d);
+        metric2App2Metrics.put(3L, 6d);
+        timelineMetricsHolder.putMetricsForAggregationPublishing(getTimelineMetricsForAppId("metricName2", "app2", metric2App2Metrics));
+
+        TreeMap<Long, Double> metric3App3Metrics = new TreeMap<>();
+        metric3App3Metrics.put(1L, 7d);
+        metric3App3Metrics.put(2L, 8d);
+        metric3App3Metrics.put(3L, 9d);
+
+        timelineMetricsHolder.putMetricsForAggregationPublishing(getTimelineMetricsForAppId("metricName3", "app3", metric3App3Metrics));
+
+
+        AggregatedMetricsPublisher aggregatedMetricsPublisher =
+                new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 60);
+
+        String aggregatedJson = aggregatedMetricsPublisher.processMetrics(timelineMetricsHolder.extractMetricsForAggregationPublishing());
+        String expectedMetric1App1 = "{\"timelineMetric\":{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName1\",\"appid\":\"app1\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":6.0,\"deviation\":0.0,\"max\":3.0,\"min\":1.0,\"numberOfSamples\":3}}";
+        String expectedMetric2App2 = "{\"timelineMetric\":{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName2\",\"appid\":\"app2\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":15.0,\"deviation\":0.0,\"max\":6.0,\"min\":4.0,\"numberOfSamples\":3}}";
+        String expectedMetric3App3 = "{\"timelineMetric\":{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName3\",\"appid\":\"app3\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":24.0,\"deviation\":0.0,\"max\":9.0,\"min\":7.0,\"numberOfSamples\":3}}";
+        Assert.assertNotNull(aggregatedJson);
+        Assert.assertTrue(aggregatedJson.contains(expectedMetric1App1)); // substring checks: the order of entries in the JSON is not asserted
+        Assert.assertTrue(aggregatedJson.contains(expectedMetric3App3));
+        Assert.assertTrue(aggregatedJson.contains(expectedMetric2App2));
+    }
+
+    @Test // getPostUrl() should return the URL template ending in /metrics/aggregated
+    public void testGetPostUrl() {
+        Configuration configuration = new Configuration();
+        AggregatedMetricsPublisher aggregatedMetricsPublisher =
+                new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+        String actualURL = aggregatedMetricsPublisher.getPostUrl();
+        String expectedURL = "%s://%s:%s/ws/v1/timeline/metrics/aggregated"; // the three %s placeholders are expected unfilled
+        Assert.assertNotNull(actualURL);
+        Assert.assertEquals(expectedURL, actualURL);
+    }
+
+    @Test // getCollectorUri(host) under default, HTTPS_ONLY and custom-port configurations
+    public void testGetCollectorUri() {
+        //default configuration
+        Configuration configuration = new Configuration();
+        AbstractMetricPublisher aggregatedMetricsPublisher =
+                new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+        String actualURL = aggregatedMetricsPublisher.getCollectorUri("c6401.ambari.apache.org");
+        String expectedURL = "http://c6401.ambari.apache.org:6188/ws/v1/timeline/metrics/aggregated"; // expected with nothing configured: http and port 6188
+        Assert.assertNotNull(actualURL);
+        Assert.assertEquals(expectedURL, actualURL);
+
+        //https configuration
+        configuration = new Configuration();
+        configuration.set("timeline.metrics.service.http.policy", "HTTPS_ONLY");
+        aggregatedMetricsPublisher =
+                new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+        actualURL = aggregatedMetricsPublisher.getCollectorUri("c6402.ambari.apache.org");
+        expectedURL = "https://c6402.ambari.apache.org:6188/ws/v1/timeline/metrics/aggregated"; // only the scheme changes; port is still 6188
+        Assert.assertNotNull(actualURL);
+        Assert.assertEquals(expectedURL, actualURL);
+
+        //custom port configuration
+        configuration = new Configuration();
+        configuration.set("timeline.metrics.service.webapp.address", "0.0.0.0:8888");
+        aggregatedMetricsPublisher =
+                new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+        actualURL = aggregatedMetricsPublisher.getCollectorUri("c6403.ambari.apache.org");
+        expectedURL = "http://c6403.ambari.apache.org:8888/ws/v1/timeline/metrics/aggregated"; // port taken from the address; host is the method argument, not 0.0.0.0
+        Assert.assertNotNull(actualURL);
+        Assert.assertEquals(expectedURL, actualURL);
+    }
+
+    @Test // getMetricsFromCache() on the aggregated publisher must return only aggregation-side entries
+    public void testGetMetricsFromCache() throws InterruptedException {
+        TimelineMetricsHolder timelineMetricsHolder = TimelineMetricsHolder.getInstance(4,4); // NOTE(review): meaning of (4,4) is not visible here; confirm in TimelineMetricsHolder
+        timelineMetricsHolder.extractMetricsForAggregationPublishing(); // results discarded: just empties the holder before seeding
+        timelineMetricsHolder.extractMetricsForRawPublishing();
+        // Two aggregation entries and one raw entry go in; only the two aggregation entries are expected back.
+        timelineMetricsHolder.putMetricsForAggregationPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("aggr1"));
+        timelineMetricsHolder.putMetricsForRawPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("raw"));
+        timelineMetricsHolder.putMetricsForAggregationPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("aggr2"));
+
+        Configuration configuration = new Configuration();
+        AggregatedMetricsPublisher aggregatedMetricsPublisher =
+                new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+
+        Map<String, TimelineMetrics> metricsFromCache = aggregatedMetricsPublisher.getMetricsFromCache();
+        Assert.assertNotNull(metricsFromCache);
+        Collection<TimelineMetrics> actualTimelineMetrics = metricsFromCache.values();
+        Assert.assertNotNull(actualTimelineMetrics);
+        Assert.assertEquals(2, actualTimelineMetrics.size());
+
+        for (TimelineMetrics timelineMetrics : actualTimelineMetrics) {
+            List<TimelineMetric> metrics = timelineMetrics.getMetrics();
+            Assert.assertEquals(1, metrics.size());
+            Assert.assertTrue(metrics.get(0).getAppId().contains("aggr")); // matches "aggr1"/"aggr2" and excludes "raw"
+        }
+
+    }
+
+    TimelineMetrics getTimelineMetricsForAppId(String metricName, String appId, TreeMap<Long, Double> metricValues) { // fixture: one fully populated metric wrapped in a TimelineMetrics
+        TimelineMetric timelineMetric = new TimelineMetric();
+        timelineMetric.setMetricName(metricName);
+        timelineMetric.setAppId(appId);
+        timelineMetric.setMetricValues(metricValues); // the caller's map is handed over as-is
+        TimelineMetrics timelineMetrics = new TimelineMetrics();
+        timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+        return timelineMetrics;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisherTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisherTest.java b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisherTest.java
new file mode 100644
index 0000000..60510d2
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisherTest.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolderTest;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+
+public class RawMetricsPublisherTest {
+    @Test // raw JSON must carry all three metrics with their individual samples
+    public void testProcessMetrics() throws Exception {
+        Configuration configuration = new Configuration();
+        TimelineMetricsHolder timelineMetricsHolder = TimelineMetricsHolder.getInstance();
+        // The two extract calls below discard their results; they only empty the holder before seeding.
+        timelineMetricsHolder.extractMetricsForAggregationPublishing();
+        timelineMetricsHolder.extractMetricsForRawPublishing();
+
+        TreeMap<Long, Double> metric1App1Metrics = new TreeMap<>();
+        metric1App1Metrics.put(1L, 1d);
+        metric1App1Metrics.put(2L, 2d);
+        metric1App1Metrics.put(3L, 3d);
+        timelineMetricsHolder.putMetricsForRawPublishing(getTimelineMetricsForAppId("metricName1", "app1", metric1App1Metrics));
+
+        TreeMap<Long, Double> metric2App2Metrics = new TreeMap<>();
+        metric2App2Metrics.put(1L, 4d);
+        metric2App2Metrics.put(2L, 5d);
+        metric2App2Metrics.put(3L, 6d);
+        timelineMetricsHolder.putMetricsForRawPublishing(getTimelineMetricsForAppId("metricName2", "app2", metric2App2Metrics));
+
+        TreeMap<Long, Double> metric3App3Metrics = new TreeMap<>();
+        metric3App3Metrics.put(1L, 7d);
+        metric3App3Metrics.put(2L, 8d);
+        metric3App3Metrics.put(3L, 9d);
+
+        timelineMetricsHolder.putMetricsForRawPublishing(getTimelineMetricsForAppId("metricName3", "app3", metric3App3Metrics));
+
+
+        RawMetricsPublisher rawMetricsPublisher =
+                new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 60);
+
+        String rawJson = rawMetricsPublisher.processMetrics(timelineMetricsHolder.extractMetricsForRawPublishing());
+        String expectedResult = "{\"metrics\":[{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName1\",\"appid\":\"app1\",\"starttime\":0,\"metrics\":{\"1\":1.0,\"2\":2.0,\"3\":3.0}},{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName2\",\"appid\":\"app2\",\"starttime\":0,\"metrics\":{\"1\":4.0,\"2\":5.0,\"3\":6.0}},{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName3\",\"appid\":\"app3\",\"starttime\":0,\"metrics\":{\"1\":7.0,\"2\":8.0,\"3\":9.0}}]}";
+        Assert.assertNotNull(rawJson);
+        Assert.assertEquals(expectedResult, rawJson); // exact string comparison, so the order of the three entries matters
+    }
+
+    @Test // getPostUrl() should return the URL template ending in /ws/v1/timeline/metrics
+    public void testGetPostUrl() {
+        Configuration configuration = new Configuration();
+        RawMetricsPublisher rawMetricsPublisher =
+                new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+        String actualURL = rawMetricsPublisher.getPostUrl();
+        String expectedURL = "%s://%s:%s/ws/v1/timeline/metrics"; // the three %s placeholders are expected unfilled
+        Assert.assertNotNull(actualURL);
+        Assert.assertEquals(expectedURL, actualURL);
+    }
+
+    @Test // getCollectorUri(host) under default, HTTPS_ONLY and custom-port configurations
+    public void testGetCollectorUri() {
+        //default configuration
+        Configuration configuration = new Configuration();
+        AbstractMetricPublisher rawMetricsPublisher =
+                new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+        String actualURL = rawMetricsPublisher.getCollectorUri("c6401.ambari.apache.org");
+        String expectedURL = "http://c6401.ambari.apache.org:6188/ws/v1/timeline/metrics"; // expected with nothing configured: http and port 6188
+        Assert.assertNotNull(actualURL);
+        Assert.assertEquals(expectedURL, actualURL);
+
+        //https configuration
+        configuration = new Configuration();
+        configuration.set("timeline.metrics.service.http.policy", "HTTPS_ONLY");
+        rawMetricsPublisher =
+                new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+        actualURL = rawMetricsPublisher.getCollectorUri("c6402.ambari.apache.org");
+        expectedURL = "https://c6402.ambari.apache.org:6188/ws/v1/timeline/metrics"; // only the scheme changes; port is still 6188
+        Assert.assertNotNull(actualURL);
+        Assert.assertEquals(expectedURL, actualURL);
+
+        //custom port configuration
+        configuration = new Configuration();
+        configuration.set("timeline.metrics.service.webapp.address", "0.0.0.0:8888");
+        rawMetricsPublisher =
+                new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+        actualURL = rawMetricsPublisher.getCollectorUri("c6403.ambari.apache.org");
+        expectedURL = "http://c6403.ambari.apache.org:8888/ws/v1/timeline/metrics"; // port taken from the address; host is the method argument, not 0.0.0.0
+        Assert.assertNotNull(actualURL);
+        Assert.assertEquals(expectedURL, actualURL);
+    }
+
+    @Test // getMetricsFromCache() on the raw publisher must return only raw-side entries
+    public void testGetMetricsFromCache() throws InterruptedException {
+        // NOTE(review): the meaning of the (4,4) arguments is not visible here; confirm in TimelineMetricsHolder.
+        TimelineMetricsHolder timelineMetricsHolder = TimelineMetricsHolder.getInstance(4,4);
+        timelineMetricsHolder.extractMetricsForAggregationPublishing(); // results discarded: just empties the holder before seeding
+        timelineMetricsHolder.extractMetricsForRawPublishing();
+        // Two raw entries and one aggregation entry go in; only the two raw entries are expected back.
+        timelineMetricsHolder.putMetricsForRawPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("raw1"));
+        timelineMetricsHolder.putMetricsForAggregationPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("aggr"));
+        timelineMetricsHolder.putMetricsForRawPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("raw2"));
+
+        Configuration configuration = new Configuration();
+        RawMetricsPublisher rawMetricsPublisher =
+                new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+
+        Map<String, TimelineMetrics> metricsFromCache = rawMetricsPublisher.getMetricsFromCache();
+        Assert.assertNotNull(metricsFromCache);
+        Collection<TimelineMetrics> actualTimelineMetrics = metricsFromCache.values();
+        Assert.assertNotNull(actualTimelineMetrics);
+        Assert.assertEquals(2, actualTimelineMetrics.size());
+
+        for (TimelineMetrics timelineMetrics : actualTimelineMetrics) {
+            List<TimelineMetric> metrics = timelineMetrics.getMetrics();
+            Assert.assertEquals(1, metrics.size());
+            Assert.assertTrue(metrics.get(0).getAppId().contains("raw")); // matches "raw1"/"raw2" and excludes "aggr"
+        }
+
+    }
+
+    TimelineMetrics getTimelineMetricsForAppId(String metricName, String appId, TreeMap<Long, Double> metricValues) { // fixture: one fully populated metric wrapped in a TimelineMetrics
+        TimelineMetric timelineMetric = new TimelineMetric();
+        timelineMetric.setMetricName(metricName);
+        timelineMetric.setAppId(appId);
+        timelineMetric.setMetricValues(metricValues); // the caller's map is handed over as-is
+        TimelineMetrics timelineMetrics = new TimelineMetrics();
+        timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+        return timelineMetrics;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py
index 2249e53..ba05e9b 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py
@@ -42,9 +42,10 @@ class Aggregator(threading.Thread):
     ams_log_file = "ambari-metrics-aggregator.log"
     additional_classpath = ':{0}'.format(config_dir)
     ams_log_dir = self._config.ams_monitor_log_dir()
+    hostname = self._config.get_hostname_config()
     logger.info('Starting Aggregator thread.')
-    cmd = "{0}/bin/java {1} -Dams.log.dir={2} -Dams.log.file={3} -cp /var/lib/ambari-metrics-monitor/lib/*{4} {5} {6}"\
-      .format(java_home, jvm_agrs, ams_log_dir, ams_log_file, additional_classpath, class_name, collector_hosts)
+    cmd = "{0}/bin/java {1} -Dams.log.dir={2} -Dams.log.file={3} -cp /var/lib/ambari-metrics-monitor/lib/*{4} {5} {6} {7}"\
+      .format(java_home, jvm_agrs, ams_log_dir, ams_log_file, additional_classpath, class_name, hostname, collector_hosts)
 
     logger.info("Executing : {0}".format(cmd))
 
@@ -60,6 +61,7 @@ class Aggregator(threading.Thread):
     if self._aggregator_process :
       logger.info('Stopping Aggregator thread.')
       self._aggregator_process.terminate()
+      self._aggregator_process = None
 
 class AggregatorWatchdog(threading.Thread):
   SLEEP_TIME = 30