You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ds...@apache.org on 2017/06/09 11:36:21 UTC
[2/2] ambari git commit: AMBARI-21128 Add AMS HA support to local
metrics aggregator application (dsen)
AMBARI-21128 Add AMS HA support to local metrics aggregator application (dsen)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/29f75089
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/29f75089
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/29f75089
Branch: refs/heads/trunk
Commit: 29f750894754ed2112fcedaa0b2f5ec693b5cd0e
Parents: 190ecad
Author: Dmytro Sen <ds...@apache.org>
Authored: Fri Jun 9 14:36:11 2017 +0300
Committer: Dmytro Sen <ds...@apache.org>
Committed: Fri Jun 9 14:36:11 2017 +0300
----------------------------------------------------------------------
.../timeline/AbstractTimelineMetricsSink.java | 4 +-
.../ambari-metrics-host-aggregator/pom.xml | 30 +++-
.../AbstractMetricPublisherThread.java | 134 ---------------
.../aggregator/AggregatedMetricsPublisher.java | 101 -----------
.../host/aggregator/AggregatorApplication.java | 98 +++++++----
.../host/aggregator/AggregatorWebService.java | 2 +-
.../host/aggregator/RawMetricsPublisher.java | 60 -------
.../host/aggregator/TimelineMetricsHolder.java | 26 ++-
.../sink/timeline/AbstractMetricPublisher.java | 169 +++++++++++++++++++
.../timeline/AggregatedMetricsPublisher.java | 103 +++++++++++
.../sink/timeline/RawMetricsPublisher.java | 65 +++++++
.../aggregator/AggregatorApplicationTest.java | 55 ++++++
.../aggregator/AggregatorWebServiceTest.java | 135 +++++++++++++++
.../aggregator/TimelineMetricsHolderTest.java | 107 ++++++++++++
.../timeline/AbstractMetricPublisherTest.java | 82 +++++++++
.../AggregatedMetricsPublisherTest.java | 154 +++++++++++++++++
.../sink/timeline/RawMetricsPublisherTest.java | 151 +++++++++++++++++
.../src/main/python/core/aggregator.py | 6 +-
.../src/main/python/core/controller.py | 2 +-
19 files changed, 1133 insertions(+), 351 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
index fddf4b3..644d978 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
@@ -88,7 +88,7 @@ public abstract class AbstractTimelineMetricsSink {
private static final String WWW_AUTHENTICATE = "WWW-Authenticate";
private static final String NEGOTIATE = "Negotiate";
- protected static final AtomicInteger failedCollectorConnectionsCounter = new AtomicInteger(0);
+ protected final AtomicInteger failedCollectorConnectionsCounter = new AtomicInteger(0);
public static int NUMBER_OF_SKIPPED_COLLECTOR_EXCEPTIONS = 100;
protected static final AtomicInteger nullCollectorCounter = new AtomicInteger(0);
public static int NUMBER_OF_NULL_COLLECTOR_EXCEPTIONS = 20;
@@ -120,7 +120,7 @@ public abstract class AbstractTimelineMetricsSink {
private volatile boolean isInitializedForHA = false;
@SuppressWarnings("all")
- private final int RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER = 5;
+ private final int RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER = 3;
private final Gson gson = new Gson();
http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-host-aggregator/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/pom.xml b/ambari-metrics/ambari-metrics-host-aggregator/pom.xml
index 0598bef..24432dd 100644
--- a/ambari-metrics/ambari-metrics-host-aggregator/pom.xml
+++ b/ambari-metrics/ambari-metrics-host-aggregator/pom.xml
@@ -38,12 +38,6 @@
<dependencies>
<dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>3.8.1</version>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>14.0.1</version>
@@ -83,6 +77,30 @@
<artifactId>hadoop-common</artifactId>
<version>2.7.1.2.3.4.0-3347</version>
</dependency>
+ <dependency>
+ <groupId>com.sun.jersey.jersey-test-framework</groupId>
+ <artifactId>jersey-test-framework-core</artifactId>
+ <version>1.11</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey.jersey-test-framework</groupId>
+ <artifactId>jersey-test-framework-grizzly2</artifactId>
+ <version>1.11</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <version>3.4</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.2</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java
deleted file mode 100644
index b1f60fa..0000000
--- a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.metrics2.host.aggregator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.codehaus.jackson.map.AnnotationIntrospector;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.Map;
-
-/**
- * Abstract class that runs a thread that publishes metrics data to AMS collector in specified intervals.
- */
-public abstract class AbstractMetricPublisherThread extends Thread {
- protected int publishIntervalInSeconds;
- protected String publishURL;
- protected ObjectMapper objectMapper;
- private Log LOG;
- protected TimelineMetricsHolder timelineMetricsHolder;
-
- public AbstractMetricPublisherThread(TimelineMetricsHolder timelineMetricsHolder, String publishURL, int publishIntervalInSeconds) {
- LOG = LogFactory.getLog(this.getClass());
- this.publishURL = publishURL;
- this.publishIntervalInSeconds = publishIntervalInSeconds;
- this.timelineMetricsHolder = timelineMetricsHolder;
- objectMapper = new ObjectMapper();
- AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
- objectMapper.setAnnotationIntrospector(introspector);
- objectMapper.getSerializationConfig()
- .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
- }
-
- /**
- * Publishes metrics to collector in specified intervals while not interrupted.
- */
- @Override
- public void run() {
- while (!isInterrupted()) {
- try {
- sleep(this.publishIntervalInSeconds * 1000);
- } catch (InterruptedException e) {
- //Ignore
- }
- try {
- processAndPublishMetrics(getMetricsFromCache());
- } catch (Exception e) {
- LOG.error("Couldn't process and send metrics : ",e);
- }
- }
- }
-
- /**
- * Processes and sends metrics to collector.
- * @param metricsFromCache
- * @throws Exception
- */
- protected void processAndPublishMetrics(Map<Long, TimelineMetrics> metricsFromCache) throws Exception {
- if (metricsFromCache.size()==0) return;
-
- LOG.info(String.format("Preparing %s timeline metrics for publishing", metricsFromCache.size()));
- publishMetricsJson(processMetrics(metricsFromCache));
- }
-
- /**
- * Returns metrics map. Source is based on implementation.
- * @return
- */
- protected abstract Map<Long,TimelineMetrics> getMetricsFromCache();
-
- /**
- * Processes given metrics (aggregates or merges them) and converts them into json string that will be send to collector
- * @param metricValues
- * @return
- */
- protected abstract String processMetrics(Map<Long, TimelineMetrics> metricValues);
-
- protected void publishMetricsJson(String jsonData) throws Exception {
- int timeout = 5 * 1000;
- HttpURLConnection connection = null;
- if (this.publishURL == null) {
- throw new IOException("Unknown URL. Unable to connect to metrics collector.");
- }
- LOG.info("Collector URL : " + publishURL);
- connection = (HttpURLConnection) new URL(this.publishURL).openConnection();
-
- connection.setRequestMethod("POST");
- connection.setRequestProperty("Content-Type", "application/json");
- connection.setRequestProperty("Connection", "Keep-Alive");
- connection.setConnectTimeout(timeout);
- connection.setReadTimeout(timeout);
- connection.setDoOutput(true);
-
- if (jsonData != null) {
- try (OutputStream os = connection.getOutputStream()) {
- os.write(jsonData.getBytes("UTF-8"));
- }
- }
- int responseCode = connection.getResponseCode();
- if (responseCode != 200) {
- throw new Exception("responseCode is " + responseCode);
- }
- LOG.info("Successfully sent metrics.");
- }
-
- /**
- * Interrupts the thread.
- */
- protected void stopPublisher() {
- this.interrupt();
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java
deleted file mode 100644
index 0540ec9..0000000
--- a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.metrics2.host.aggregator;
-
-
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
-import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricWithAggregatedValues;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-
-/**
- * Thread that aggregates and publishes metrics to collector on specified interval.
- */
-public class AggregatedMetricsPublisher extends AbstractMetricPublisherThread {
-
- private Log LOG;
-
- public AggregatedMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, String collectorURL, int interval) {
- super(timelineMetricsHolder, collectorURL, interval);
- LOG = LogFactory.getLog(this.getClass());
- }
-
- /**
- * get metrics map form @TimelineMetricsHolder
- * @return
- */
- @Override
- protected Map<Long, TimelineMetrics> getMetricsFromCache() {
- return timelineMetricsHolder.extractMetricsForAggregationPublishing();
- }
-
- /**
- * Aggregates given metrics and converts them into json string that will be send to collector
- * @param metricForAggregationValues
- * @return
- */
- @Override
- protected String processMetrics(Map<Long, TimelineMetrics> metricForAggregationValues) {
- HashMap<String, TimelineMetrics> nameToMetricMap = new HashMap<>();
- for (TimelineMetrics timelineMetrics : metricForAggregationValues.values()) {
- for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) {
- if (!nameToMetricMap.containsKey(timelineMetric.getMetricName())) {
- nameToMetricMap.put(timelineMetric.getMetricName(), new TimelineMetrics());
- }
- nameToMetricMap.get(timelineMetric.getMetricName()).addOrMergeTimelineMetric(timelineMetric);
- }
- }
- Set<TimelineMetricWithAggregatedValues> metricAggregateMap = new HashSet<>();
- for (TimelineMetrics metrics : nameToMetricMap.values()) {
- double sum = 0;
- double max = Integer.MIN_VALUE;
- double min = Integer.MAX_VALUE;
- int count = 0;
- for (TimelineMetric metric : metrics.getMetrics()) {
- for (Double value : metric.getMetricValues().values()) {
- sum+=value;
- max = Math.max(max, value);
- min = Math.min(min, value);
- count++;
- }
- }
- TimelineMetric tmpMetric = new TimelineMetric(metrics.getMetrics().get(0));
- tmpMetric.setMetricValues(new TreeMap<Long, Double>());
- metricAggregateMap.add(new TimelineMetricWithAggregatedValues(tmpMetric, new MetricHostAggregate(sum, count, 0d, max, min)));
- }
- String json = null;
- try {
- json = objectMapper.writeValueAsString(new AggregationResult(metricAggregateMap, System.currentTimeMillis()));
- LOG.debug(json);
- } catch (Exception e) {
- LOG.error("Failed to convert result into json", e);
- }
-
- return json;
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
index c6b703b..1e5cc82 100644
--- a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
@@ -33,6 +33,9 @@ import java.util.HashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.AbstractMetricPublisher;
+import org.apache.hadoop.metrics2.sink.timeline.AggregatedMetricsPublisher;
+import org.apache.hadoop.metrics2.sink.timeline.RawMetricsPublisher;
/**
* WEB application with 2 publisher threads that processes received metrics and submits results to the collector
@@ -40,24 +43,25 @@ import org.apache.hadoop.conf.Configuration;
public class AggregatorApplication
{
private static final int STOP_SECONDS_DELAY = 0;
- private static final int JOIN_SECONDS_TIMEOUT = 2;
- private static String BASE_POST_URL = "%s://%s:%s/ws/v1/timeline/metrics";
- private static String AGGREGATED_POST_PREFIX = "/aggregated";
+ private static final int JOIN_SECONDS_TIMEOUT = 5;
private static final String METRICS_SITE_CONFIGURATION_FILE = "ams-site.xml";
- private static Log LOG = LogFactory.getLog("AggregatorApplication.class");
+ private Log LOG;
private final int webApplicationPort;
private final int rawPublishingInterval;
private final int aggregationInterval;
private Configuration configuration;
- private String [] collectorHosts;
- private AggregatedMetricsPublisher aggregatePublisher;
- private RawMetricsPublisher rawPublisher;
+ private Thread aggregatePublisherThread;
+ private Thread rawPublisherThread;
private TimelineMetricsHolder timelineMetricsHolder;
private HttpServer httpServer;
- public AggregatorApplication(String collectorHosts) {
+ public AggregatorApplication(String hostname, String collectorHosts) {
+ LOG = LogFactory.getLog(this.getClass());
+ configuration = new Configuration(true);
initConfiguration();
- this.collectorHosts = collectorHosts.split(",");
+ configuration.set("timeline.metrics.collector.hosts", collectorHosts);
+ configuration.set("timeline.metrics.hostname", hostname);
+ configuration.set("timeline.metrics.zk.quorum", getZkQuorumFromConfiguration());
this.aggregationInterval = configuration.getInt("timeline.metrics.host.aggregator.minute.interval", 300);
this.rawPublishingInterval = configuration.getInt("timeline.metrics.sink.report.interval", 60);
this.webApplicationPort = configuration.getInt("timeline.metrics.host.inmemory.aggregation.port", 61888);
@@ -70,7 +74,13 @@ public class AggregatorApplication
}
}
- private void initConfiguration() {
+ private String getZkQuorumFromConfiguration() {
+ String zkClientPort = configuration.getTrimmed("cluster.zookeeper.property.clientPort", "2181");
+ String zkServerHosts = configuration.getTrimmed("cluster.zookeeper.quorum", "");
+ return getZkConnectionUrl(zkClientPort, zkServerHosts);
+ }
+
+ protected void initConfiguration() {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
if (classLoader == null) {
classLoader = getClass().getClassLoader();
@@ -82,7 +92,7 @@ public class AggregatorApplication
throw new IllegalStateException("Unable to initialize the metrics " +
"subsystem. No ams-site present in the classpath.");
}
- configuration = new Configuration(true);
+
try {
configuration.addResource(amsResUrl.toURI().toURL());
} catch (Exception e) {
@@ -91,7 +101,7 @@ public class AggregatorApplication
}
}
- private String getHostName() {
+ protected String getHostName() {
String hostName = "localhost";
try {
hostName = InetAddress.getLocalHost().getCanonicalHostName();
@@ -101,13 +111,13 @@ public class AggregatorApplication
return hostName;
}
- private URI getURI() {
+ protected URI getURI() {
URI uri = UriBuilder.fromUri("http://" + getHostName() + "/").port(this.webApplicationPort).build();
LOG.info(String.format("Web server at %s", uri));
return uri;
}
- private HttpServer createHttpServer() throws IOException {
+ protected HttpServer createHttpServer() throws IOException {
ResourceConfig resourceConfig = new PackagesResourceConfig("org.apache.hadoop.metrics2.host.aggregator");
HashMap<String, Object> params = new HashMap();
params.put("com.sun.jersey.api.json.POJOMappingFeature", "true");
@@ -122,29 +132,30 @@ public class AggregatorApplication
private void startAggregatePublisherThread() {
LOG.info("Starting aggregated metrics publisher.");
- String collectorURL = buildBasicCollectorURL(collectorHosts[0]) + AGGREGATED_POST_PREFIX;
- aggregatePublisher = new AggregatedMetricsPublisher(timelineMetricsHolder, collectorURL, aggregationInterval);
- aggregatePublisher.start();
+ AbstractMetricPublisher metricPublisher = new AggregatedMetricsPublisher(timelineMetricsHolder, configuration, aggregationInterval);
+ aggregatePublisherThread = new Thread(metricPublisher);
+ aggregatePublisherThread.start();
}
private void startRawPublisherThread() {
LOG.info("Starting raw metrics publisher.");
- String collectorURL = buildBasicCollectorURL(collectorHosts[0]);
- rawPublisher = new RawMetricsPublisher(timelineMetricsHolder, collectorURL, rawPublishingInterval);
- rawPublisher.start();
+ AbstractMetricPublisher metricPublisher = new RawMetricsPublisher(timelineMetricsHolder, configuration, rawPublishingInterval);
+ rawPublisherThread = new Thread(metricPublisher); // must not reassign aggregatePublisherThread: stop() interrupts and joins both threads
+ rawPublisherThread.start();
}
private void stop() {
- aggregatePublisher.stopPublisher();
- rawPublisher.stopPublisher();
+ LOG.info("Stopping aggregator application");
+ aggregatePublisherThread.interrupt();
+ rawPublisherThread.interrupt();
httpServer.stop(STOP_SECONDS_DELAY);
LOG.info("Stopped web server.");
try {
LOG.info("Waiting for threads to join.");
- aggregatePublisher.join(JOIN_SECONDS_TIMEOUT * 1000);
- rawPublisher.join(JOIN_SECONDS_TIMEOUT * 1000);
+ aggregatePublisherThread.join(JOIN_SECONDS_TIMEOUT * 1000);
+ rawPublisherThread.join(JOIN_SECONDS_TIMEOUT * 1000);
LOG.info("Gracefully stopped Aggregator Application.");
} catch (InterruptedException e) {
LOG.error("Received exception during stop : ", e);
@@ -153,28 +164,43 @@ public class AggregatorApplication
}
- private String buildBasicCollectorURL(String host) {
- String port = configuration.get("timeline.metrics.service.webapp.address", "0.0.0.0:6188").split(":")[1];
- String protocol = configuration.get("timeline.metrics.service.http.policy", "HTTP_ONLY").equalsIgnoreCase("HTTP_ONLY") ? "http" : "https";
- return String.format(BASE_POST_URL, protocol, host, port);
+ private String getZkConnectionUrl(String zkClientPort, String zkQuorum) {
+ // Builds "host1:port,host2:port"; entries without an explicit port get zkClientPort appended.
+ StringBuilder sb = new StringBuilder();
+ for (String part : zkQuorum.split(",")) {
+ String host = part.trim();
+ if (host.isEmpty()) {
+ continue; // "".split(",") yields [""]; skip blanks instead of emitting a bare ":<port>"
+ }
+ if (sb.length() > 0) {
+ sb.append(",");
+ }
+ sb.append(host.contains(":") ? host : host + ":" + zkClientPort);
+ }
+ return sb.toString();
}
public static void main( String[] args ) throws Exception {
- LOG.info("Starting aggregator application");
- if (args.length != 1) {
- throw new Exception("This jar should be run with 1 argument - collector hosts separated with coma");
+ if (args.length != 2) {
+ throw new Exception("This jar should be executed with 2 arguments : 1st - current host name, " +
+ "2nd - collector hosts separated with coma");
}
- final AggregatorApplication app = new AggregatorApplication(args[0]);
- app.startAggregatePublisherThread();
- app.startRawPublisherThread();
- app.startWebServer();
+ final AggregatorApplication app = new AggregatorApplication(args[0], args[1]);
+
+ app.startWebServerAndPublishersThreads();
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
- LOG.info("Stopping aggregator application");
app.stop();
}
});
}
+
+ private void startWebServerAndPublishersThreads() {
+ LOG.info("Starting aggregator application");
+ startAggregatePublisherThread();
+ startRawPublisherThread();
+ startWebServer();
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java
index f96d0ed..b151209 100644
--- a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java
@@ -39,7 +39,7 @@ public class AggregatorWebService {
@GET
@Produces("text/json")
@Path("/metrics")
- public Response helloWorld() throws IOException {
+ public Response getOkResponse() throws IOException {
return Response.ok().build();
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java
deleted file mode 100644
index f317ed9..0000000
--- a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.metrics2.host.aggregator;
-
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-
-import java.util.Map;
-
-public class RawMetricsPublisher extends AbstractMetricPublisherThread {
- private final Log LOG;
-
- public RawMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, String collectorURL, int interval) {
- super(timelineMetricsHolder, collectorURL, interval);
- LOG = LogFactory.getLog(this.getClass());
- }
-
-
- @Override
- protected Map<Long, TimelineMetrics> getMetricsFromCache() {
- return timelineMetricsHolder.extractMetricsForRawPublishing();
- }
-
- @Override
- protected String processMetrics(Map<Long, TimelineMetrics> metricValues) {
- //merge everything in one TimelineMetrics object
- TimelineMetrics timelineMetrics = new TimelineMetrics();
- for (TimelineMetrics metrics : metricValues.values()) {
- for (TimelineMetric timelineMetric : metrics.getMetrics())
- timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
- }
- //map TimelineMetrics to json string
- String json = null;
- try {
- json = objectMapper.writeValueAsString(timelineMetrics);
- LOG.debug(json);
- } catch (Exception e) {
- LOG.error("Failed to convert result into json", e);
- }
- return json;
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java
index b355c97..03b6542 100644
--- a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java
@@ -19,8 +19,10 @@ package org.apache.hadoop.metrics2.host.aggregator;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
@@ -33,8 +35,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
public class TimelineMetricsHolder {
private static final int DEFAULT_RAW_CACHE_EXPIRE_TIME = 60;
private static final int DEFAULT_AGGREGATION_CACHE_EXPIRE_TIME = 300;
- private Cache<Long, TimelineMetrics> aggregationMetricsCache;
- private Cache<Long, TimelineMetrics> rawMetricsCache;
+ private Cache<String, TimelineMetrics> aggregationMetricsCache;
+ private Cache<String, TimelineMetrics> rawMetricsCache;
private static TimelineMetricsHolder instance = null;
//to ensure no metric values are expired
private static int EXPIRE_DELAY = 30;
@@ -63,21 +65,29 @@ public class TimelineMetricsHolder {
public void putMetricsForAggregationPublishing(TimelineMetrics timelineMetrics) {
aggregationCacheLock.writeLock().lock();
- aggregationMetricsCache.put(System.currentTimeMillis(), timelineMetrics);
+ aggregationMetricsCache.put(calculateCacheKey(timelineMetrics), timelineMetrics);
aggregationCacheLock.writeLock().unlock();
}
- public Map<Long, TimelineMetrics> extractMetricsForAggregationPublishing() {
+ private String calculateCacheKey(TimelineMetrics timelineMetrics) {
+ List<TimelineMetric> metrics = timelineMetrics.getMetrics();
+ if (metrics.size() > 0) {
+ return metrics.get(0).getAppId() + System.currentTimeMillis();
+ }
+ return String.valueOf(System.currentTimeMillis());
+ }
+
+ public Map<String, TimelineMetrics> extractMetricsForAggregationPublishing() {
return extractMetricsFromCacheWithLock(aggregationMetricsCache, aggregationCacheLock);
}
public void putMetricsForRawPublishing(TimelineMetrics metrics) {
rawCacheLock.writeLock().lock();
- rawMetricsCache.put(System.currentTimeMillis(), metrics);
+ rawMetricsCache.put(calculateCacheKey(metrics), metrics);
rawCacheLock.writeLock().unlock();
}
- public Map<Long, TimelineMetrics> extractMetricsForRawPublishing() {
+ public Map<String, TimelineMetrics> extractMetricsForRawPublishing() {
return extractMetricsFromCacheWithLock(rawMetricsCache, rawCacheLock);
}
@@ -87,9 +97,9 @@ public class TimelineMetricsHolder {
* @param lock
* @return
*/
- private Map<Long, TimelineMetrics> extractMetricsFromCacheWithLock(Cache<Long, TimelineMetrics> cache, ReadWriteLock lock) {
+ private Map<String, TimelineMetrics> extractMetricsFromCacheWithLock(Cache<String, TimelineMetrics> cache, ReadWriteLock lock) {
lock.writeLock().lock();
- Map<Long, TimelineMetrics> metricsMap = new TreeMap<>(cache.asMap());
+ Map<String, TimelineMetrics> metricsMap = new TreeMap<>(cache.asMap());
cache.invalidateAll();
lock.writeLock().unlock();
return metricsMap;
http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java
new file mode 100644
index 0000000..5af115f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Abstract class that runs a thread that publishes metrics data to AMS collector in specified intervals.
+ */
+public abstract class AbstractMetricPublisher extends AbstractTimelineMetricsSink implements Runnable {
+
+  // Truststore settings; they are read only when the collector is reached over https.
+  // Each constant name now matches the property key it holds (type and password were swapped).
+  private static final String AMS_SITE_SSL_KEYSTORE_PATH_PROPERTY = "ssl.server.truststore.location";
+  private static final String AMS_SITE_SSL_KEYSTORE_TYPE_PROPERTY = "ssl.server.truststore.type";
+  private static final String AMS_SITE_SSL_KEYSTORE_PASSWORD_PROPERTY = "ssl.server.truststore.password";
+  private static final String AMS_SITE_HTTP_POLICY_PROPERTY = "timeline.metrics.service.http.policy";
+  private static final String AMS_SITE_COLLECTOR_WEBAPP_ADDRESS_PROPERTY = "timeline.metrics.service.webapp.address";
+  private static final String PUBLISHER_COLLECTOR_HOSTS_PROPERTY = "timeline.metrics.collector.hosts";
+  private static final String PUBLISHER_ZOOKEEPER_QUORUM_PROPERTY = "timeline.metrics.zk.quorum";
+  private static final String PUBLISHER_HOSTNAME_PROPERTY = "timeline.metrics.hostname";
+  private static final String DEFAULT_COLLECTOR_WEBAPP_ADDRESS = "0.0.0.0:6188";
+  private static final String DEFAULT_COLLECTOR_PORT = "6188";
+  /** Format string taking protocol, host and port, in that order. */
+  protected static String BASE_POST_URL = "%s://%s:%s/ws/v1/timeline/metrics";
+  protected int publishIntervalInSeconds;
+  private Log LOG;
+  protected TimelineMetricsHolder timelineMetricsHolder;
+  protected Configuration configuration;
+  private String collectorProtocol;
+  private String collectorPort;
+  private Collection<String> collectorHosts;
+  private String hostname;
+  private String zkQuorum;
+
+  /**
+   * Stores the collaborators and reads the publisher settings from the configuration.
+   *
+   * @param timelineMetricsHolder cache the metrics are taken from
+   * @param configuration source of collector, zookeeper and SSL settings
+   * @param publishIntervalInSeconds pause between two publishing attempts, in seconds
+   */
+  public AbstractMetricPublisher(TimelineMetricsHolder timelineMetricsHolder, Configuration configuration, int publishIntervalInSeconds) {
+    // The logger must be assigned first: configure() logs.
+    LOG = LogFactory.getLog(this.getClass());
+    this.configuration = configuration;
+    this.publishIntervalInSeconds = publishIntervalInSeconds;
+    this.timelineMetricsHolder = timelineMetricsHolder;
+    configure();
+  }
+
+  /**
+   * Reads protocol, port, collector hosts, zookeeper quorum and host name from the configuration
+   * and loads the truststore when the collector protocol is https.
+   *
+   * @throws IllegalArgumentException if https is selected and a truststore property is missing
+   */
+  protected void configure() {
+    collectorProtocol = configuration.get(AMS_SITE_HTTP_POLICY_PROPERTY, "HTTP_ONLY").equalsIgnoreCase("HTTP_ONLY") ? "http" : "https";
+    collectorPort = parsePort(configuration.getTrimmed(AMS_SITE_COLLECTOR_WEBAPP_ADDRESS_PROPERTY, DEFAULT_COLLECTOR_WEBAPP_ADDRESS));
+    zkQuorum = configuration.get(PUBLISHER_ZOOKEEPER_QUORUM_PROPERTY, "");
+    hostname = configuration.get(PUBLISHER_HOSTNAME_PROPERTY, "localhost");
+    // Parsed once; the original parsed the same property twice and kept only the second result.
+    collectorHosts = parseHostsStringIntoCollection(configuration.getTrimmed(PUBLISHER_COLLECTOR_HOSTS_PROPERTY, ""));
+    if (collectorHosts.isEmpty()) {
+      LOG.error("No Metric collector configured.");
+      return;
+    }
+    if ("https".equals(collectorProtocol)) {
+      String trustStorePath = requireTrimmed(AMS_SITE_SSL_KEYSTORE_PATH_PROPERTY);
+      String trustStoreType = requireTrimmed(AMS_SITE_SSL_KEYSTORE_TYPE_PROPERTY);
+      String trustStorePwd = requireTrimmed(AMS_SITE_SSL_KEYSTORE_PASSWORD_PROPERTY);
+      loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
+    }
+  }
+
+  /**
+   * Returns the trimmed value of a mandatory property, failing with a message that names the
+   * property instead of an anonymous NullPointerException.
+   */
+  private String requireTrimmed(String property) {
+    String value = configuration.getTrimmed(property);
+    if (value == null) {
+      throw new IllegalArgumentException("Required property is not set: " + property);
+    }
+    return value;
+  }
+
+  /**
+   * Extracts the port from a "host:port" address, falling back to the default collector port
+   * when the address carries no port.
+   */
+  private String parsePort(String webappAddress) {
+    int separator = webappAddress.lastIndexOf(':');
+    if (separator < 0 || separator == webappAddress.length() - 1) {
+      LOG.warn("No port found in collector webapp address '" + webappAddress + "', using " + DEFAULT_COLLECTOR_PORT);
+      return DEFAULT_COLLECTOR_PORT;
+    }
+    return webappAddress.substring(separator + 1);
+  }
+
+  /**
+   * Publishes metrics to collector in specified intervals while not interrupted.
+   */
+  @Override
+  public void run() {
+    while (!Thread.currentThread().isInterrupted()) {
+      try {
+        // long arithmetic, so a large interval cannot overflow int
+        Thread.sleep(publishIntervalInSeconds * 1000L);
+      } catch (InterruptedException e) {
+        // sleep() clears the interrupt flag when it throws; restore it and stop,
+        // otherwise the loop condition would never observe the interruption.
+        Thread.currentThread().interrupt();
+        break;
+      }
+      try {
+        processAndPublishMetrics(getMetricsFromCache());
+      } catch (Exception e) {
+        // Keep the publisher alive, but do not hide the failure.
+        LOG.warn("Failed to publish metrics to collector", e);
+      }
+    }
+  }
+
+  /**
+   * Processes and sends metrics to collector. Does nothing when the given map is empty or
+   * when the metrics could not be converted into json.
+   *
+   * @param metricsFromCache metrics taken from the cache
+   * @throws Exception if processing or emitting fails
+   */
+  protected void processAndPublishMetrics(Map<String, TimelineMetrics> metricsFromCache) throws Exception {
+    if (metricsFromCache.isEmpty()) {
+      return;
+    }
+
+    LOG.info(String.format("Preparing %s timeline metrics for publishing", metricsFromCache.size()));
+    // Same evaluation order as before: resolve the collector uri first, then build the payload.
+    String collectorUri = getCollectorUri(getCurrentCollectorHost());
+    String json = processMetrics(metricsFromCache);
+    if (json == null) {
+      LOG.warn("Metrics could not be converted into json, nothing is sent to " + collectorUri);
+      return;
+    }
+    emitMetricsJson(collectorUri, json);
+  }
+
+  /**
+   * Returns metrics map. Source is based on implementation.
+   *
+   * @return metrics to publish
+   */
+  protected abstract Map<String,TimelineMetrics> getMetricsFromCache();
+
+  /**
+   * Processes given metrics (aggregates or merges them) and converts them into json string that will be sent to collector.
+   *
+   * @param metricValues metrics taken from the cache
+   * @return json payload for the collector
+   */
+  protected abstract String processMetrics(Map<String, TimelineMetrics> metricValues);
+
+  /**
+   * @return format string of the target url, taking protocol, host and port in that order
+   */
+  protected abstract String getPostUrl();
+
+  @Override
+  protected String getCollectorUri(String host) {
+    return String.format(getPostUrl(), getCollectorProtocol(), host, getCollectorPort());
+  }
+
+  @Override
+  protected String getCollectorProtocol() {
+    return collectorProtocol;
+  }
+
+  @Override
+  protected String getCollectorPort() {
+    return collectorPort;
+  }
+
+  @Override
+  protected int getTimeoutSeconds() {
+    return DEFAULT_POST_TIMEOUT_SECONDS;
+  }
+
+  @Override
+  protected String getZookeeperQuorum() {
+    return zkQuorum;
+  }
+
+  @Override
+  protected Collection<String> getConfiguredCollectorHosts() {
+    return collectorHosts;
+  }
+
+  @Override
+  protected String getHostname() {
+    return hostname;
+  }
+
+  @Override
+  protected boolean isHostInMemoryAggregationEnabled() {
+    return false;
+  }
+
+  @Override
+  protected int getHostInMemoryAggregationPort() {
+    return 0;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java
new file mode 100644
index 0000000..c8dffab
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * Thread that aggregates and publishes metrics to collector on specified interval.
+ */
+public class AggregatedMetricsPublisher extends AbstractMetricPublisher {
+  private static final String AGGREGATED_POST_PREFIX = "/aggregated";
+  private final Log LOG;
+
+  /**
+   * @param timelineMetricsHolder cache the metrics are taken from
+   * @param configuration publisher configuration, handed to the parent class
+   * @param interval pause between two publishing attempts, in seconds
+   */
+  public AggregatedMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, Configuration configuration, int interval) {
+    super(timelineMetricsHolder, configuration, interval);
+    LOG = LogFactory.getLog(this.getClass());
+  }
+
+  /**
+   * Gets the metrics map from {@link TimelineMetricsHolder}.
+   *
+   * @return metrics queued for aggregation
+   */
+  @Override
+  protected Map<String, TimelineMetrics> getMetricsFromCache() {
+    return timelineMetricsHolder.extractMetricsForAggregationPublishing();
+  }
+
+  /**
+   * Aggregates given metrics and converts them into json string that will be sent to collector.
+   * Metrics are grouped by metric name; for every group the sum, count, max and min over all
+   * data points are computed.
+   *
+   * @param metricForAggregationValues metrics taken from the cache
+   * @return json representation of the aggregation result, or null if serialization failed
+   */
+  @Override
+  protected String processMetrics(Map<String, TimelineMetrics> metricForAggregationValues) {
+    Map<String, TimelineMetrics> nameToMetricMap = new HashMap<>();
+    for (TimelineMetrics timelineMetrics : metricForAggregationValues.values()) {
+      for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) {
+        String metricName = timelineMetric.getMetricName();
+        TimelineMetrics sameNameMetrics = nameToMetricMap.get(metricName);
+        if (sameNameMetrics == null) {
+          sameNameMetrics = new TimelineMetrics();
+          nameToMetricMap.put(metricName, sameNameMetrics);
+        }
+        sameNameMetrics.addOrMergeTimelineMetric(timelineMetric);
+      }
+    }
+    Set<TimelineMetricWithAggregatedValues> metricAggregateMap = new HashSet<>();
+    for (TimelineMetrics metrics : nameToMetricMap.values()) {
+      double sum = 0;
+      // Seed with the double range, not the int range: with Integer.MAX_VALUE as the initial
+      // minimum, a series whose values all exceed 2^31 (e.g. byte counters) reported a wrong min.
+      // Finite seeds are used so a group without data points still yields finite numbers.
+      double max = -Double.MAX_VALUE;
+      double min = Double.MAX_VALUE;
+      int count = 0;
+      for (TimelineMetric metric : metrics.getMetrics()) {
+        for (Double value : metric.getMetricValues().values()) {
+          if (value == null) {
+            // a missing data point must not break the whole batch with an unboxing NPE
+            continue;
+          }
+          sum += value;
+          max = Math.max(max, value);
+          min = Math.min(min, value);
+          count++;
+        }
+      }
+      // The first metric of the group is copied as the carrier of the group's attributes;
+      // its raw values are replaced by an empty map because only the aggregate is sent.
+      TimelineMetric tmpMetric = new TimelineMetric(metrics.getMetrics().get(0));
+      tmpMetric.setMetricValues(new TreeMap<Long, Double>());
+      metricAggregateMap.add(new TimelineMetricWithAggregatedValues(tmpMetric, new MetricHostAggregate(sum, count, 0d, max, min)));
+    }
+    String json = null;
+    try {
+      json = mapper.writeValueAsString(new AggregationResult(metricAggregateMap, System.currentTimeMillis()));
+      LOG.debug(json);
+    } catch (Exception e) {
+      LOG.error("Failed to convert result into json", e);
+    }
+
+    return json;
+  }
+
+  /**
+   * @return base post url extended with the aggregated-metrics suffix
+   */
+  @Override
+  protected String getPostUrl() {
+    return BASE_POST_URL + AGGREGATED_POST_PREFIX;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java
new file mode 100644
index 0000000..89addb7
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
+
+import java.util.Map;
+
+/**
+ * Publisher that merges the cached raw metrics into a single payload and sends it to the collector.
+ */
+public class RawMetricsPublisher extends AbstractMetricPublisher {
+  private final Log LOG;
+
+  /**
+   * @param timelineMetricsHolder cache the metrics are taken from
+   * @param configuration publisher configuration, handed to the parent class
+   * @param interval pause between two publishing attempts, in seconds
+   */
+  public RawMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, Configuration configuration, int interval) {
+    super(timelineMetricsHolder, configuration, interval);
+    LOG = LogFactory.getLog(this.getClass());
+  }
+
+  /**
+   * @return metrics queued for raw publishing
+   */
+  @Override
+  protected Map<String, TimelineMetrics> getMetricsFromCache() {
+    return timelineMetricsHolder.extractMetricsForRawPublishing();
+  }
+
+  /**
+   * Merges every cached metric into one TimelineMetrics object and serializes it.
+   *
+   * @param metricValues metrics taken from the cache
+   * @return json representation of the merged metrics, or null if serialization failed
+   */
+  @Override
+  protected String processMetrics(Map<String, TimelineMetrics> metricValues) {
+    // collect all cached entries in a single container
+    TimelineMetrics merged = new TimelineMetrics();
+    for (TimelineMetrics cachedEntry : metricValues.values()) {
+      for (TimelineMetric cachedMetric : cachedEntry.getMetrics()) {
+        merged.addOrMergeTimelineMetric(cachedMetric);
+      }
+    }
+
+    // serialize the container
+    String payload = null;
+    try {
+      payload = mapper.writeValueAsString(merged);
+      LOG.debug(payload);
+    } catch (Exception e) {
+      LOG.error("Failed to convert result into json", e);
+    }
+    return payload;
+  }
+
+  /**
+   * @return the base post url, unchanged
+   */
+  @Override
+  protected String getPostUrl() {
+    return BASE_POST_URL;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplicationTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplicationTest.java b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplicationTest.java
new file mode 100644
index 0000000..ea72d17
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplicationTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.net.URI;
+
+import static org.easymock.EasyMock.createMockBuilder;
+
+
+/**
+ * Tests for the command line entry point and URI construction of AggregatorApplication.
+ */
+public class AggregatorApplicationTest {
+
+  /**
+   * main() is expected to reject an argument array that is too short.
+   */
+  @Test
+  public void testMainNotEnoughArguments() {
+    assertMainRejects(new String[0]);
+    assertMainRejects(new String[1]);
+  }
+
+  /**
+   * Fails the test unless main() throws for the given arguments.
+   * The previous version threw a plain Exception inside the try block to signal "main() did not
+   * throw"; that exception was caught by the same catch clause, so the test could never fail.
+   */
+  private static void assertMainRejects(String[] args) {
+    try {
+      AggregatorApplication.main(args);
+    } catch (Exception e) {
+      // expected
+      return;
+    }
+    Assert.fail("main() accepted " + args.length + " argument(s)");
+  }
+
+  /**
+   * The URI is expected to be built from the application's host name and port 61888.
+   */
+  @Test
+  public void testGetURI() {
+    // Partial mock: the constructor runs, but server creation and configuration loading are stubbed out.
+    AggregatorApplication aggregatorApplicationMock = createMockBuilder(AggregatorApplication.class)
+        .withConstructor("", "")
+        .addMockedMethod("createHttpServer")
+        .addMockedMethod("initConfiguration").createMock();
+
+    URI uri = aggregatorApplicationMock.getURI();
+    Assert.assertEquals("http://" + aggregatorApplicationMock.getHostName() + ":61888/", uri.toString());
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebServiceTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebServiceTest.java b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebServiceTest.java
new file mode 100644
index 0000000..736fd06
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebServiceTest.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.host.aggregator;
+
+
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.test.framework.JerseyTest;
+import com.sun.jersey.test.framework.WebAppDescriptor;
+import com.sun.jersey.test.framework.spi.container.TestContainerFactory;
+import com.sun.jersey.test.framework.spi.container.grizzly2.GrizzlyTestContainerFactory;
+import junit.framework.Assert;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.codehaus.jackson.jaxrs.JacksonJaxbJsonProvider;
+import org.junit.Test;
+
+
+import javax.ws.rs.core.MediaType;
+
+import java.util.Collection;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Runs the aggregator web service in a Grizzly test container and checks its metrics endpoint.
+ */
+public class AggregatorWebServiceTest extends JerseyTest {
+  public AggregatorWebServiceTest() {
+    super(new WebAppDescriptor.Builder(
+        "org.apache.hadoop.metrics2.host.aggregator")
+        .contextPath("jersey-guice-filter")
+        .servletPath("/")
+        .clientConfig(new DefaultClientConfig(JacksonJaxbJsonProvider.class))
+        .build());
+  }
+
+  @Override
+  public TestContainerFactory getTestContainerFactory() {
+    return new GrizzlyTestContainerFactory();
+  }
+
+  /** Resource pointing at ws/v1/timeline/metrics of the test container. */
+  private WebResource metricsResource() {
+    return resource().path("ws").path("v1").path("timeline").path("metrics");
+  }
+
+  /** A GET on the metrics path answers 200 with the text/json media type. */
+  @Test
+  public void testOkResponse() {
+    ClientResponse response = metricsResource()
+        .accept("text/json")
+        .get(ClientResponse.class);
+    assertEquals(200, response.getStatus());
+    assertEquals("text/json", response.getType().toString());
+  }
+
+  /** A GET on the "aggregated" sub path answers 404. */
+  @Test
+  public void testWrongPath() {
+    ClientResponse response = metricsResource().path("aggregated")
+        .accept("text/json")
+        .get(ClientResponse.class);
+    assertEquals(404, response.getStatus());
+  }
+
+  /** Posted metrics show up once in both caches, and extraction empties the caches. */
+  @Test
+  public void testMetricsPost() {
+    TimelineMetricsHolder holder = TimelineMetricsHolder.getInstance();
+
+    // drain whatever other tests left in the shared singleton
+    holder.extractMetricsForAggregationPublishing();
+    holder.extractMetricsForRawPublishing();
+
+    TimelineMetrics posted = TimelineMetricsHolderTest.getTimelineMetricsWithAppID("appid");
+    ClientResponse response = metricsResource()
+        .accept(MediaType.TEXT_PLAIN)
+        .post(ClientResponse.class, posted);
+    assertEquals(200, response.getStatus());
+    assertEquals(MediaType.TEXT_PLAIN, response.getType().toString());
+
+    Map<String, TimelineMetrics> aggregationMap = holder.extractMetricsForAggregationPublishing();
+    Map<String, TimelineMetrics> rawMap = holder.extractMetricsForRawPublishing();
+
+    Assert.assertEquals(1, aggregationMap.size());
+    Assert.assertEquals(1, rawMap.size());
+
+    // cache keys are expected to contain the app id
+    for (String cacheKey : aggregationMap.keySet()) {
+      Assert.assertTrue(cacheKey.contains("appid"));
+    }
+    for (String cacheKey : rawMap.keySet()) {
+      Assert.assertTrue(cacheKey.contains("appid"));
+    }
+
+    Collection<TimelineMetrics> aggregationValues = aggregationMap.values();
+    Collection<TimelineMetrics> rawValues = rawMap.values();
+
+    Assert.assertEquals(1, aggregationValues.size());
+    Assert.assertEquals(1, rawValues.size());
+
+    TimelineMetrics aggregationEntry = aggregationValues.iterator().next();
+    TimelineMetrics rawEntry = rawValues.iterator().next();
+
+    Assert.assertEquals(1, aggregationEntry.getMetrics().size());
+    Assert.assertEquals(1, rawEntry.getMetrics().size());
+
+    Assert.assertEquals("appid", aggregationEntry.getMetrics().get(0).getAppId());
+    Assert.assertEquals("appid", rawEntry.getMetrics().get(0).getAppId());
+
+    //Cache should be empty after extraction
+    Assert.assertEquals(0, holder.extractMetricsForAggregationPublishing().size());
+    Assert.assertEquals(0, holder.extractMetricsForRawPublishing().size());
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolderTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolderTest.java b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolderTest.java
new file mode 100644
index 0000000..7d8ebf4
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolderTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+import junit.framework.Assert;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.Map;
+
+
+/**
+ * Tests for the TimelineMetricsHolder singleton and its two caches.
+ */
+public class TimelineMetricsHolderTest {
+  private TimelineMetricsHolder timelineMetricsHolderInstance;
+
+  /**
+   * Resets the singleton through reflection so the next getInstance() call builds a new holder.
+   */
+  public void clearHolderSingleton() throws NoSuchFieldException, IllegalAccessException {
+    Class<?> timelineMetricHolderClass = TimelineMetricsHolder.class;
+    Field field = timelineMetricHolderClass.getDeclaredField("instance");
+    field.setAccessible(true);
+    // A static field takes no target object; the previous code passed the Field itself,
+    // which only worked because the target is ignored for static fields.
+    field.set(null, null);
+  }
+
+  @Test
+  public void testGetInstanceDefaultValues() throws Exception {
+    clearHolderSingleton();
+    Assert.assertNotNull(TimelineMetricsHolder.getInstance());
+  }
+
+  @Test
+  public void testGetInstanceWithParameters() throws Exception {
+    clearHolderSingleton();
+    Assert.assertNotNull(TimelineMetricsHolder.getInstance(1,2));
+  }
+
+  /**
+   * Entries put into the two caches come back once, under keys containing the app id,
+   * and the caches are empty after extraction.
+   */
+  @Test
+  public void testCache() throws Exception {
+    clearHolderSingleton();
+    timelineMetricsHolderInstance = TimelineMetricsHolder.getInstance(4,4);
+    timelineMetricsHolderInstance.putMetricsForAggregationPublishing(getTimelineMetricsWithAppID("aggr"));
+    timelineMetricsHolderInstance.putMetricsForRawPublishing(getTimelineMetricsWithAppID("raw"));
+
+    Map<String, TimelineMetrics> aggregationMap = timelineMetricsHolderInstance.extractMetricsForAggregationPublishing();
+    Map<String, TimelineMetrics> rawMap = timelineMetricsHolderInstance.extractMetricsForRawPublishing();
+
+    Assert.assertEquals(1, aggregationMap.size());
+    Assert.assertEquals(1, rawMap.size());
+
+    Collection<TimelineMetrics> aggregationCollection = aggregationMap.values();
+    Collection<TimelineMetrics> rawCollection = rawMap.values();
+
+    Collection<String> aggregationCollectionKeys = aggregationMap.keySet();
+    Collection<String> rawCollectionKeys = rawMap.keySet();
+
+    for (String key : aggregationCollectionKeys) {
+      Assert.assertTrue(key.contains("aggr"));
+    }
+
+    for (String key : rawCollectionKeys) {
+      Assert.assertTrue(key.contains("raw"));
+    }
+
+    Assert.assertEquals(1, aggregationCollection.size());
+    Assert.assertEquals(1, rawCollection.size());
+
+    TimelineMetrics aggregationTimelineMetrics = aggregationCollection.iterator().next();
+    TimelineMetrics rawTimelineMetrics = rawCollection.iterator().next();
+
+    Assert.assertEquals(1, aggregationTimelineMetrics.getMetrics().size());
+    Assert.assertEquals(1, rawTimelineMetrics.getMetrics().size());
+
+    Assert.assertEquals("aggr", aggregationTimelineMetrics.getMetrics().get(0).getAppId());
+    Assert.assertEquals("raw", rawTimelineMetrics.getMetrics().get(0).getAppId());
+
+    aggregationMap = timelineMetricsHolderInstance.extractMetricsForAggregationPublishing();
+    rawMap = timelineMetricsHolderInstance.extractMetricsForRawPublishing();
+
+    //Cache should be empty after extraction
+    Assert.assertEquals(0, aggregationMap.size());
+    Assert.assertEquals(0, rawMap.size());
+  }
+
+  /**
+   * Builds a TimelineMetrics object holding a single metric with the given app id.
+   *
+   * @param appId app id set on the metric
+   * @return container with exactly one metric
+   */
+  public static TimelineMetrics getTimelineMetricsWithAppID(String appId) {
+    TimelineMetric timelineMetric = new TimelineMetric();
+    timelineMetric.setAppId(appId);
+    TimelineMetrics timelineMetrics = new TimelineMetrics();
+    timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+    return timelineMetrics;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisherTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisherTest.java b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisherTest.java
new file mode 100644
index 0000000..a8ddbee
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisherTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolderTest;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.createMockBuilder;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+// Tests for the publishing flow and the run loop of AbstractMetricPublisher,
+// exercised through partial mocks of RawMetricsPublisher.
+public class AbstractMetricPublisherTest {
+ // Verifies that one cached entry leads to exactly one call of each mocked method.
+ // The mock is strict, so the calls must also happen in the order they are recorded below.
+ @Test
+ public void testProcessAndPublishMetrics() throws Exception {
+ AbstractMetricPublisher publisherMock =
+ createMockBuilder(RawMetricsPublisher.class)
+ .withConstructor(TimelineMetricsHolder.getInstance(), new Configuration(), 60)
+ .addMockedMethod("processMetrics")
+ .addMockedMethod("getCollectorUri")
+ .addMockedMethod("emitMetricsJson")
+ .addMockedMethod("getCurrentCollectorHost").createStrictMock();
+
+ // Put one entry into the raw cache so the map passed to the publisher is not empty.
+ TimelineMetricsHolder.getInstance().putMetricsForRawPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("raw"));
+ expect(publisherMock.getCurrentCollectorHost()).andReturn("collectorhost").once();
+ expect(publisherMock.getCollectorUri(anyString())).andReturn("https://collectorhost:11/metrics").once();
+ expect(publisherMock.processMetrics(anyObject(Map.class))).andReturn("{metrics}").once();
+ // The uri and json returned by the stubs above must be handed to emitMetricsJson unchanged.
+ expect(publisherMock.emitMetricsJson("https://collectorhost:11/metrics", "{metrics}")).andReturn(true).once();
+
+ replay(publisherMock);
+
+ publisherMock.processAndPublishMetrics(TimelineMetricsHolder.getInstance().extractMetricsForRawPublishing());
+
+ verify(publisherMock);
+ }
+
+ // Verifies that the run loop publishes once and then stops when the thread reports interruption.
+ @Test
+ public void testRunAndStop() throws Exception {
+ // Publisher with an interval of 1; only processAndPublishMetrics is mocked.
+ AbstractMetricPublisher publisherMock = createMockBuilder(RawMetricsPublisher.class)
+ .withConstructor(TimelineMetricsHolder.getInstance(), new Configuration(), 1)
+ .addMockedMethod("processAndPublishMetrics").createStrictMock();
+ publisherMock.processAndPublishMetrics(anyObject(Map.class));
+ expectLastCall().times(1);
+
+
+ // Thread whose isInterrupted() is scripted: false for the first check, true for the second.
+ Thread t = createMockBuilder(Thread.class)
+ .withConstructor(publisherMock)
+ .addMockedMethod("isInterrupted").createStrictMock();
+ expect(t.isInterrupted()).andReturn(false).once();
+ expect(t.isInterrupted()).andReturn(true).once();
+
+ replay(publisherMock, t);
+
+ t.start();
+
+ // Wait on the test thread before verifying, so the publisher thread has time to run.
+ Thread.sleep(2222);
+
+ verify(publisherMock, t);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisherTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisherTest.java b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisherTest.java
new file mode 100644
index 0000000..3413052
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisherTest.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import junit.framework.Assert;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolderTest;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class AggregatedMetricsPublisherTest {
+
+ @Test
+ public void testProcessMetrics() throws Exception {
+ Configuration configuration = new Configuration();
+ TimelineMetricsHolder timelineMetricsHolder = TimelineMetricsHolder.getInstance();
+ timelineMetricsHolder.extractMetricsForAggregationPublishing();
+ timelineMetricsHolder.extractMetricsForRawPublishing();
+
+ TreeMap<Long, Double> metric1App1Metrics = new TreeMap<>();
+ metric1App1Metrics.put(1L, 1d);
+ metric1App1Metrics.put(2L, 2d);
+ metric1App1Metrics.put(3L, 3d);
+ timelineMetricsHolder.putMetricsForAggregationPublishing(getTimelineMetricsForAppId("metricName1", "app1", metric1App1Metrics));
+
+ TreeMap<Long, Double> metric2App2Metrics = new TreeMap<>();
+ metric2App2Metrics.put(1L, 4d);
+ metric2App2Metrics.put(2L, 5d);
+ metric2App2Metrics.put(3L, 6d);
+ timelineMetricsHolder.putMetricsForAggregationPublishing(getTimelineMetricsForAppId("metricName2", "app2", metric2App2Metrics));
+
+ TreeMap<Long, Double> metric3App3Metrics = new TreeMap<>();
+ metric3App3Metrics.put(1L, 7d);
+ metric3App3Metrics.put(2L, 8d);
+ metric3App3Metrics.put(3L, 9d);
+
+ timelineMetricsHolder.putMetricsForAggregationPublishing(getTimelineMetricsForAppId("metricName3", "app3", metric3App3Metrics));
+
+
+ AggregatedMetricsPublisher aggregatedMetricsPublisher =
+ new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 60);
+
+ String aggregatedJson = aggregatedMetricsPublisher.processMetrics(timelineMetricsHolder.extractMetricsForAggregationPublishing());
+ String expectedMetric1App1 = "{\"timelineMetric\":{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName1\",\"appid\":\"app1\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":6.0,\"deviation\":0.0,\"max\":3.0,\"min\":1.0,\"numberOfSamples\":3}}";
+ String expectedMetric2App2 = "{\"timelineMetric\":{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName2\",\"appid\":\"app2\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":15.0,\"deviation\":0.0,\"max\":6.0,\"min\":4.0,\"numberOfSamples\":3}}";
+ String expectedMetric3App3 = "{\"timelineMetric\":{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName3\",\"appid\":\"app3\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":24.0,\"deviation\":0.0,\"max\":9.0,\"min\":7.0,\"numberOfSamples\":3}}";
+ Assert.assertNotNull(aggregatedJson);
+ Assert.assertTrue(aggregatedJson.contains(expectedMetric1App1));
+ Assert.assertTrue(aggregatedJson.contains(expectedMetric3App3));
+ Assert.assertTrue(aggregatedJson.contains(expectedMetric2App2));
+ }
+
+ @Test
+ public void testGetPostUrl() {
+ Configuration configuration = new Configuration();
+ AggregatedMetricsPublisher aggregatedMetricsPublisher =
+ new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+ String actualURL = aggregatedMetricsPublisher.getPostUrl();
+ String expectedURL = "%s://%s:%s/ws/v1/timeline/metrics/aggregated";
+ Assert.assertNotNull(actualURL);
+ Assert.assertEquals(expectedURL, actualURL);
+ }
+
+ @Test
+ public void testGetCollectorUri() {
+ //default configuration
+ Configuration configuration = new Configuration();
+ AbstractMetricPublisher aggregatedMetricsPublisher =
+ new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+ String actualURL = aggregatedMetricsPublisher.getCollectorUri("c6401.ambari.apache.org");
+ String expectedURL = "http://c6401.ambari.apache.org:6188/ws/v1/timeline/metrics/aggregated";
+ Assert.assertNotNull(actualURL);
+ Assert.assertEquals(expectedURL, actualURL);
+
+ //https configuration
+ configuration = new Configuration();
+ configuration.set("timeline.metrics.service.http.policy", "HTTPS_ONLY");
+ aggregatedMetricsPublisher =
+ new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+ actualURL = aggregatedMetricsPublisher.getCollectorUri("c6402.ambari.apache.org");
+ expectedURL = "https://c6402.ambari.apache.org:6188/ws/v1/timeline/metrics/aggregated";
+ Assert.assertNotNull(actualURL);
+ Assert.assertEquals(expectedURL, actualURL);
+
+ //custom port configuration
+ configuration = new Configuration();
+ configuration.set("timeline.metrics.service.webapp.address", "0.0.0.0:8888");
+ aggregatedMetricsPublisher =
+ new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+ actualURL = aggregatedMetricsPublisher.getCollectorUri("c6403.ambari.apache.org");
+ expectedURL = "http://c6403.ambari.apache.org:8888/ws/v1/timeline/metrics/aggregated";
+ Assert.assertNotNull(actualURL);
+ Assert.assertEquals(expectedURL, actualURL);
+ }
+
+ @Test
+ public void testGetMetricsFromCache() throws InterruptedException {
+ TimelineMetricsHolder timelineMetricsHolder = TimelineMetricsHolder.getInstance(4,4);
+ timelineMetricsHolder.extractMetricsForAggregationPublishing();
+ timelineMetricsHolder.extractMetricsForRawPublishing();
+
+ timelineMetricsHolder.putMetricsForAggregationPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("aggr1"));
+ timelineMetricsHolder.putMetricsForRawPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("raw"));
+ timelineMetricsHolder.putMetricsForAggregationPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("aggr2"));
+
+ Configuration configuration = new Configuration();
+ AggregatedMetricsPublisher aggregatedMetricsPublisher =
+ new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+
+ Map<String, TimelineMetrics> metricsFromCache = aggregatedMetricsPublisher.getMetricsFromCache();
+ Assert.assertNotNull(metricsFromCache);
+ Collection<TimelineMetrics> actualTimelineMetrics = metricsFromCache.values();
+ Assert.assertNotNull(actualTimelineMetrics);
+ Assert.assertEquals(2, actualTimelineMetrics.size());
+
+ for (TimelineMetrics timelineMetrics : actualTimelineMetrics) {
+ List<TimelineMetric> metrics = timelineMetrics.getMetrics();
+ Assert.assertEquals(1, metrics.size());
+ Assert.assertTrue(metrics.get(0).getAppId().contains("aggr"));
+ }
+
+ }
+
+ TimelineMetrics getTimelineMetricsForAppId(String metricName, String appId, TreeMap<Long, Double> metricValues) {
+ TimelineMetric timelineMetric = new TimelineMetric();
+ timelineMetric.setMetricName(metricName);
+ timelineMetric.setAppId(appId);
+ timelineMetric.setMetricValues(metricValues);
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
+ timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+ return timelineMetrics;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisherTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisherTest.java b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisherTest.java
new file mode 100644
index 0000000..60510d2
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisherTest.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolderTest;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+
+/**
+ * Unit tests for {@link RawMetricsPublisher}: the JSON returned by {@code processMetrics},
+ * the POST URL template, collector URI construction and reading metrics from the cache.
+ */
+public class RawMetricsPublisherTest {
+
+  /**
+   * The processed JSON must equal the expected document listing the three raw metrics and their values.
+   */
+  @Test
+  public void testProcessMetrics() throws Exception {
+    TimelineMetricsHolder holder = TimelineMetricsHolder.getInstance();
+    // Extract and discard whatever the holder currently returns before adding test data.
+    holder.extractMetricsForAggregationPublishing();
+    holder.extractMetricsForRawPublishing();
+
+    holder.putMetricsForRawPublishing(
+        getTimelineMetricsForAppId("metricName1", "app1", valuesForKeysOneToThree(1d, 2d, 3d)));
+    holder.putMetricsForRawPublishing(
+        getTimelineMetricsForAppId("metricName2", "app2", valuesForKeysOneToThree(4d, 5d, 6d)));
+    holder.putMetricsForRawPublishing(
+        getTimelineMetricsForAppId("metricName3", "app3", valuesForKeysOneToThree(7d, 8d, 9d)));
+
+    RawMetricsPublisher publisher =
+        new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), new Configuration(), 60);
+    String rawJson = publisher.processMetrics(holder.extractMetricsForRawPublishing());
+
+    String expectedJson = "{\"metrics\":[{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName1\",\"appid\":\"app1\",\"starttime\":0,\"metrics\":{\"1\":1.0,\"2\":2.0,\"3\":3.0}},{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName2\",\"appid\":\"app2\",\"starttime\":0,\"metrics\":{\"1\":4.0,\"2\":5.0,\"3\":6.0}},{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName3\",\"appid\":\"app3\",\"starttime\":0,\"metrics\":{\"1\":7.0,\"2\":8.0,\"3\":9.0}}]}";
+    Assert.assertNotNull(rawJson);
+    Assert.assertEquals(expectedJson, rawJson);
+  }
+
+  /** The POST URL is expected to be the unformatted template ending in {@code /metrics}. */
+  @Test
+  public void testGetPostUrl() {
+    RawMetricsPublisher publisher =
+        new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), new Configuration(), 1);
+    String postUrl = publisher.getPostUrl();
+    Assert.assertNotNull(postUrl);
+    Assert.assertEquals("%s://%s:%s/ws/v1/timeline/metrics", postUrl);
+  }
+
+  /** The collector URI must reflect the default, HTTPS-only and custom-port configurations. */
+  @Test
+  public void testGetCollectorUri() {
+    // default configuration
+    assertCollectorUri(new Configuration(), "c6401.ambari.apache.org",
+        "http://c6401.ambari.apache.org:6188/ws/v1/timeline/metrics");
+
+    // https configuration
+    Configuration httpsOnly = new Configuration();
+    httpsOnly.set("timeline.metrics.service.http.policy", "HTTPS_ONLY");
+    assertCollectorUri(httpsOnly, "c6402.ambari.apache.org",
+        "https://c6402.ambari.apache.org:6188/ws/v1/timeline/metrics");
+
+    // custom port configuration
+    Configuration customPort = new Configuration();
+    customPort.set("timeline.metrics.service.webapp.address", "0.0.0.0:8888");
+    assertCollectorUri(customPort, "c6403.ambari.apache.org",
+        "http://c6403.ambari.apache.org:8888/ws/v1/timeline/metrics");
+  }
+
+  /** Builds a publisher from {@code configuration} and checks the URI it returns for {@code host}. */
+  private void assertCollectorUri(Configuration configuration, String host, String expectedUri) {
+    AbstractMetricPublisher publisher =
+        new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+    String actualUri = publisher.getCollectorUri(host);
+    Assert.assertNotNull(actualUri);
+    Assert.assertEquals(expectedUri, actualUri);
+  }
+
+  /** Only the two raw entries, not the aggregation one, are expected back from the cache. */
+  @Test
+  public void testGetMetricsFromCache() throws InterruptedException {
+
+    TimelineMetricsHolder holder = TimelineMetricsHolder.getInstance(4,4);
+    holder.extractMetricsForAggregationPublishing();
+    holder.extractMetricsForRawPublishing();
+
+    // Two raw entries surrounding one aggregation entry.
+    holder.putMetricsForRawPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("raw1"));
+    holder.putMetricsForAggregationPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("aggr"));
+    holder.putMetricsForRawPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("raw2"));
+
+    RawMetricsPublisher publisher =
+        new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), new Configuration(), 1);
+    Map<String, TimelineMetrics> cached = publisher.getMetricsFromCache();
+    Assert.assertNotNull(cached);
+
+    Collection<TimelineMetrics> cachedValues = cached.values();
+    Assert.assertNotNull(cachedValues);
+    Assert.assertEquals(2, cachedValues.size());
+    for (TimelineMetrics entry : cachedValues) {
+      List<TimelineMetric> metrics = entry.getMetrics();
+      Assert.assertEquals(1, metrics.size());
+      Assert.assertTrue(metrics.get(0).getAppId().contains("raw"));
+    }
+  }
+
+  /** Returns a sorted map holding the given values under the keys 1, 2 and 3. */
+  private static TreeMap<Long, Double> valuesForKeysOneToThree(double first, double second, double third) {
+    TreeMap<Long, Double> values = new TreeMap<>();
+    values.put(1L, first);
+    values.put(2L, second);
+    values.put(3L, third);
+    return values;
+  }
+
+  /** Wraps a single metric with the given name, app id and values into a {@link TimelineMetrics}. */
+  TimelineMetrics getTimelineMetricsForAppId(String metricName, String appId, TreeMap<Long, Double> metricValues) {
+    TimelineMetric metric = new TimelineMetric();
+    metric.setMetricName(metricName);
+    metric.setAppId(appId);
+    metric.setMetricValues(metricValues);
+
+    TimelineMetrics wrapper = new TimelineMetrics();
+    wrapper.addOrMergeTimelineMetric(metric);
+    return wrapper;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/29f75089/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py
index 2249e53..ba05e9b 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py
@@ -42,9 +42,10 @@ class Aggregator(threading.Thread):
ams_log_file = "ambari-metrics-aggregator.log"
additional_classpath = ':{0}'.format(config_dir)
ams_log_dir = self._config.ams_monitor_log_dir()
+ hostname = self._config.get_hostname_config()
logger.info('Starting Aggregator thread.')
- cmd = "{0}/bin/java {1} -Dams.log.dir={2} -Dams.log.file={3} -cp /var/lib/ambari-metrics-monitor/lib/*{4} {5} {6}"\
- .format(java_home, jvm_agrs, ams_log_dir, ams_log_file, additional_classpath, class_name, collector_hosts)
+ cmd = "{0}/bin/java {1} -Dams.log.dir={2} -Dams.log.file={3} -cp /var/lib/ambari-metrics-monitor/lib/*{4} {5} {6} {7}"\
+ .format(java_home, jvm_agrs, ams_log_dir, ams_log_file, additional_classpath, class_name, hostname, collector_hosts)
logger.info("Executing : {0}".format(cmd))
@@ -60,6 +61,7 @@ class Aggregator(threading.Thread):
if self._aggregator_process :
logger.info('Stopping Aggregator thread.')
self._aggregator_process.terminate()
+ self._aggregator_process = None
class AggregatorWatchdog(threading.Thread):
SLEEP_TIME = 30