You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jo...@apache.org on 2017/05/18 14:01:58 UTC
[17/24] ambari git commit: AMBARI-20758 Aggregate local metrics for
minute aggregation time window (dsen)
AMBARI-20758 Aggregate local metrics for minute aggregation time window (dsen)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/041d353b
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/041d353b
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/041d353b
Branch: refs/heads/branch-feature-AMBARI-12556
Commit: 041d353b0d75b20b0322097e13a1701226e6fc97
Parents: 772be78
Author: Dmytro Sen <ds...@apache.org>
Authored: Wed May 17 19:38:29 2017 +0300
Committer: Dmytro Sen <ds...@apache.org>
Committed: Wed May 17 19:38:29 2017 +0300
----------------------------------------------------------------------
.../logfeeder/metrics/LogFeederAMSClient.java | 12 +-
ambari-metrics/ambari-metrics-assembly/pom.xml | 20 +++
.../src/main/assembly/monitor-windows.xml | 7 +
.../src/main/assembly/monitor.xml | 9 +-
.../timeline/AbstractTimelineMetricsSink.java | 24 ++-
.../sink/timeline/AggregationResult.java | 60 +++++++
.../metrics2/sink/timeline/MetricAggregate.java | 110 ++++++++++++
.../sink/timeline/MetricClusterAggregate.java | 73 ++++++++
.../sink/timeline/MetricHostAggregate.java | 81 +++++++++
.../metrics2/sink/timeline/TimelineMetric.java | 6 +-
.../TimelineMetricWithAggregatedValues.java | 65 +++++++
.../AbstractTimelineMetricSinkTest.java | 10 ++
.../availability/MetricCollectorHATest.java | 10 ++
.../cache/HandleConnectExceptionTest.java | 10 ++
.../sink/flume/FlumeTimelineMetricsSink.java | 16 ++
.../timeline/HadoopTimelineMetricsSink.java | 20 ++-
.../conf/unix/log4j.properties | 31 ++++
.../conf/windows/log4j.properties | 29 +++
.../ambari-metrics-host-aggregator/pom.xml | 120 +++++++++++++
.../AbstractMetricPublisherThread.java | 134 ++++++++++++++
.../aggregator/AggregatedMetricsPublisher.java | 101 +++++++++++
.../host/aggregator/AggregatorApplication.java | 180 +++++++++++++++++++
.../host/aggregator/AggregatorWebService.java | 56 ++++++
.../host/aggregator/RawMetricsPublisher.java | 60 +++++++
.../host/aggregator/TimelineMetricsHolder.java | 98 ++++++++++
.../conf/unix/ambari-metrics-monitor | 2 +-
.../src/main/python/core/aggregator.py | 110 ++++++++++++
.../src/main/python/core/config_reader.py | 35 +++-
.../src/main/python/core/controller.py | 28 +++
.../src/main/python/core/emitter.py | 8 +-
.../src/main/python/core/stop_handler.py | 3 +-
.../src/main/python/main.py | 6 +-
.../kafka/KafkaTimelineMetricsReporter.java | 17 ++
.../storm/StormTimelineMetricsReporter.java | 14 ++
.../sink/storm/StormTimelineMetricsSink.java | 14 ++
.../storm/StormTimelineMetricsReporter.java | 16 ++
.../sink/storm/StormTimelineMetricsSink.java | 16 ++
.../timeline/HBaseTimelineMetricStore.java | 29 ++-
.../metrics/timeline/PhoenixHBaseAccessor.java | 4 +-
.../timeline/TimelineMetricConfiguration.java | 2 +
.../metrics/timeline/TimelineMetricStore.java | 2 +
.../timeline/TimelineMetricsAggregatorSink.java | 4 +-
.../timeline/aggregators/MetricAggregate.java | 110 ------------
.../aggregators/MetricClusterAggregate.java | 73 --------
.../aggregators/MetricHostAggregate.java | 81 ---------
.../TimelineMetricAppAggregator.java | 1 +
.../TimelineMetricClusterAggregator.java | 2 +
.../TimelineMetricClusterAggregatorSecond.java | 1 +
.../TimelineMetricHostAggregator.java | 1 +
.../aggregators/TimelineMetricReadHelper.java | 2 +
.../webapp/TimelineWebServices.java | 31 ++++
.../timeline/ITPhoenixHBaseAccessor.java | 4 +-
.../metrics/timeline/MetricTestHelper.java | 2 +-
.../timeline/PhoenixHBaseAccessorTest.java | 4 +-
.../timeline/TestMetricHostAggregate.java | 8 +-
.../timeline/TestTimelineMetricStore.java | 6 +
.../TimelineMetricsAggregatorMemorySink.java | 4 +-
.../aggregators/ITClusterAggregator.java | 4 +-
.../aggregators/ITMetricAggregator.java | 13 +-
...melineMetricClusterAggregatorSecondTest.java | 1 +
ambari-metrics/pom.xml | 1 +
.../system/impl/AmbariMetricSinkImpl.java | 10 ++
.../1.6.1.2.2.0/package/scripts/params.py | 2 +
.../hadoop-metrics2-accumulo.properties.j2 | 3 +
.../0.1.0/configuration/ams-env.xml | 8 +
.../0.1.0/configuration/ams-site.xml | 11 ++
.../AMBARI_METRICS/0.1.0/metainfo.xml | 3 +
.../AMBARI_METRICS/0.1.0/package/scripts/ams.py | 30 ++++
.../0.1.0/package/scripts/params.py | 5 +
.../hadoop-metrics2-hbase.properties.j2 | 3 +
.../package/templates/metric_monitor.ini.j2 | 7 +
.../FLUME/1.4.0.2.0/package/scripts/params.py | 3 +
.../templates/flume-metrics2.properties.j2 | 2 +
.../0.96.0.2.0/package/scripts/params_linux.py | 3 +
...-metrics2-hbase.properties-GANGLIA-MASTER.j2 | 2 +
...doop-metrics2-hbase.properties-GANGLIA-RS.j2 | 2 +
.../hadoop-metrics2.properties.xml | 2 +
.../0.12.0.2.0/package/scripts/params_linux.py | 2 +
.../hadoop-metrics2-hivemetastore.properties.j2 | 2 +
.../hadoop-metrics2-hiveserver2.properties.j2 | 2 +
.../templates/hadoop-metrics2-llapdaemon.j2 | 2 +
.../hadoop-metrics2-llaptaskscheduler.j2 | 2 +
.../2.1.0.3.0/package/scripts/params_linux.py | 3 +
.../hadoop-metrics2-hivemetastore.properties.j2 | 2 +
.../hadoop-metrics2-hiveserver2.properties.j2 | 2 +
.../templates/hadoop-metrics2-llapdaemon.j2 | 2 +
.../hadoop-metrics2-llaptaskscheduler.j2 | 2 +
.../KAFKA/0.8.1/configuration/kafka-broker.xml | 11 ++
.../KAFKA/0.8.1/package/scripts/params.py | 3 +
.../STORM/0.9.1/package/scripts/params_linux.py | 2 +
.../0.9.1/package/templates/config.yaml.j2 | 2 +
.../templates/storm-metrics2.properties.j2 | 2 +
.../2.0.6/hooks/before-START/scripts/params.py | 3 +
.../templates/hadoop-metrics2.properties.j2 | 2 +
.../hadoop-metrics2.properties.xml | 2 +
.../3.0/hooks/before-START/scripts/params.py | 2 +
.../templates/hadoop-metrics2.properties.j2 | 2 +
.../system/impl/TestAmbariMetricsSinkImpl.java | 10 ++
.../2.0/hooks/before-START/scripts/params.py | 2 +
99 files changed, 1854 insertions(+), 307 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
index 2d1bf40..39526a5 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
@@ -89,6 +89,16 @@ public class LogFeederAMSClient extends AbstractTimelineMetricsSink {
}
@Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return false;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return 0;
+ }
+
+ @Override
protected boolean emitMetrics(TimelineMetrics metrics) {
return super.emitMetrics(metrics);
}
@@ -103,4 +113,4 @@ public class LogFeederAMSClient extends AbstractTimelineMetricsSink {
return collectorPort;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-assembly/pom.xml b/ambari-metrics/ambari-metrics-assembly/pom.xml
index a4b87de..6b81de5 100644
--- a/ambari-metrics/ambari-metrics-assembly/pom.xml
+++ b/ambari-metrics/ambari-metrics-assembly/pom.xml
@@ -35,6 +35,7 @@
<properties>
<collector.dir>${project.basedir}/../ambari-metrics-timelineservice</collector.dir>
<monitor.dir>${project.basedir}/../ambari-metrics-host-monitoring</monitor.dir>
+ <aggregator.dir>${project.basedir}/../ambari-metrics-host-aggregator</aggregator.dir>
<grafana.dir>${project.basedir}/../ambari-metrics-grafana</grafana.dir>
<hadoop-sink.dir>${project.basedir}/../ambari-metrics-hadoop-sink</hadoop-sink.dir>
<storm-sink.dir>${project.basedir}/../ambari-metrics-storm-sink</storm-sink.dir>
@@ -599,6 +600,19 @@
</sources>
</mapping>
<mapping>
+ <directory>/var/lib/ambari-metrics-monitor/lib</directory>
+ <sources>
+ <source>
+ <location>
+ ${aggregator.dir}/target/
+ </location>
+ <includes>
+ <include>ambari-metrics-host-aggregator-${project.version}.jar</include>
+ </includes>
+ </source>
+ </sources>
+ </mapping>
+ <mapping>
<directory>/etc/ambari-metrics-monitor/conf</directory>
<configuration>true</configuration>
</mapping>
@@ -744,6 +758,7 @@
<path>/var/run/ambari-metrics-grafana</path>
<path>/var/log/ambari-metrics-grafana</path>
<path>/var/lib/ambari-metrics-collector</path>
+ <path>/var/lib/ambari-metrics-monitor/lib</path>
<path>/var/lib/ambari-metrics-grafana</path>
<path>/usr/lib/ambari-metrics-hadoop-sink</path>
<path>/usr/lib/ambari-metrics-kafka-sink</path>
@@ -1331,6 +1346,11 @@
<type>pom</type>
<optional>true</optional>
</dependency>
+ <dependency>
+ <groupId>org.apache.ambari</groupId>
+ <artifactId>ambari-metrics-host-aggregator</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml
index ab309a1..d015d31 100644
--- a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml
+++ b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml
@@ -64,6 +64,13 @@
</includes>
</fileSet>
<fileSet>
+ <directory>${aggregator.dir}/conf/windows</directory>
+ <outputDirectory>conf</outputDirectory>
+ <includes>
+ <include>log4j.properties</include>
+ </includes>
+ </fileSet>
+ <fileSet>
<directory>${monitor.dir}/conf/windows</directory>
<outputDirectory>/</outputDirectory>
<includes>
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor.xml b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor.xml
index 99a41c3..448fe62 100644
--- a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor.xml
+++ b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor.xml
@@ -46,6 +46,13 @@
</includes>
</fileSet>
<fileSet>
+ <directory>${aggregator.dir}/conf/unix</directory>
+ <outputDirectory>conf</outputDirectory>
+ <includes>
+ <include>log4j.properties</include>
+ </includes>
+ </fileSet>
+ <fileSet>
<directory>${monitor.dir}/conf/unix</directory>
<outputDirectory>bin</outputDirectory>
<includes>
@@ -68,4 +75,4 @@
-</assembly>
\ No newline at end of file
+</assembly>
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
index 2c6fae2..a8dc571 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
@@ -78,6 +78,8 @@ public abstract class AbstractTimelineMetricsSink {
public static final String SSL_KEYSTORE_PATH_PROPERTY = "truststore.path";
public static final String SSL_KEYSTORE_TYPE_PROPERTY = "truststore.type";
public static final String SSL_KEYSTORE_PASSWORD_PROPERTY = "truststore.password";
+ public static final String HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY = "host_in_memory_aggregation";
+ public static final String HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY = "host_in_memory_aggregation_port";
public static final String COLLECTOR_LIVE_NODES_PATH = "/ws/v1/timeline/metrics/livenodes";
public static final String INSTANCE_ID_PROPERTY = "instanceId";
public static final String SET_INSTANCE_ID_PROPERTY = "set.instanceId";
@@ -241,8 +243,14 @@ public abstract class AbstractTimelineMetricsSink {
}
protected boolean emitMetrics(TimelineMetrics metrics) {
- String collectorHost = getCurrentCollectorHost();
- String connectUrl = getCollectorUri(collectorHost);
+ String connectUrl;
+ if (isHostInMemoryAggregationEnabled()) {
+ connectUrl = constructTimelineMetricUri("http", "localhost", String.valueOf(getHostInMemoryAggregationPort()));
+ } else {
+ String collectorHost = getCurrentCollectorHost();
+ connectUrl = getCollectorUri(collectorHost);
+ }
+
String jsonData = null;
LOG.debug("EmitMetrics connectUrl = " + connectUrl);
try {
@@ -562,4 +570,16 @@ public abstract class AbstractTimelineMetricsSink {
* @return String "host1"
*/
abstract protected String getHostname();
+
+ /**
+ * Check if host in-memory aggregation is enabled
+ * @return true when metrics are sent to the aggregator on localhost instead of a collector host
+ */
+ abstract protected boolean isHostInMemoryAggregationEnabled();
+
+ /**
+ * In memory aggregation port
+ * @return
+ */
+ abstract protected int getHostInMemoryAggregationPort();
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregationResult.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregationResult.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregationResult.java
new file mode 100644
index 0000000..c903e3d
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregationResult.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Set;
+
+@XmlRootElement(name="AggregationResult")
+public class AggregationResult { // JAXB bean holding a set of aggregated metrics plus a time value
+ protected Set<TimelineMetricWithAggregatedValues> result; // metrics paired with their aggregated values
+ protected Long timeInMilis; // NOTE(review): presumably a timestamp in milliseconds - confirm with the producer
+
+ @Override
+ public String toString() {
+ return "AggregationResult{" +
+ "result=" + result +
+ ", timeInMilis=" + timeInMilis +
+ '}';
+ }
+
+ public AggregationResult() { // no-arg constructor; fields stay null until set
+ }
+
+ public AggregationResult(Set<TimelineMetricWithAggregatedValues> result, Long timeInMilis) {
+ this.result = result;
+ this.timeInMilis = timeInMilis;
+ }
+ @XmlElement
+ public Set<TimelineMetricWithAggregatedValues> getResult() {
+ return result;
+ }
+
+ public void setResult(Set<TimelineMetricWithAggregatedValues> result) {
+ this.result = result;
+ }
+ @XmlElement
+ public Long getTimeInMilis() {
+ return timeInMilis;
+ }
+
+ public void setTimeInMilis(Long timeInMilis) {
+ this.timeInMilis = timeInMilis;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricAggregate.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricAggregate.java
new file mode 100644
index 0000000..84cba0e
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricAggregate.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.annotate.JsonSubTypes;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.IOException;
+
+/**
+* Base holder for the sum, deviation, max and min aggregate values of a metric; serializable to JSON via toJSON().
+*/
+@JsonSubTypes({@JsonSubTypes.Type(value = MetricClusterAggregate.class),
+ @JsonSubTypes.Type(value = MetricHostAggregate.class)})
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class MetricAggregate {
+ private static final ObjectMapper mapper = new ObjectMapper();
+
+ protected Double sum = 0.0;
+ protected Double deviation;
+ protected Double max = -Double.MAX_VALUE; // lowest finite double; Double.MIN_VALUE is the smallest POSITIVE value
+ protected Double min = Double.MAX_VALUE;
+
+ public MetricAggregate() {
+ }
+
+ MetricAggregate(Double sum, Double deviation, Double max,
+ Double min) {
+ this.sum = sum;
+ this.deviation = deviation;
+ this.max = max;
+ this.min = min;
+ }
+
+ public void updateSum(Double sum) { // adds the given value to the running sum
+ this.sum += sum;
+ }
+
+ public void updateMax(Double max) { // keeps the larger of the current and the given maximum
+ if (max > this.max) {
+ this.max = max;
+ }
+ }
+
+ public void updateMin(Double min) { // keeps the smaller of the current and the given minimum
+ if (min < this.min) {
+ this.min = min;
+ }
+ }
+
+ @JsonProperty("sum")
+ public Double getSum() {
+ return sum;
+ }
+
+ @JsonProperty("deviation")
+ public Double getDeviation() {
+ return deviation;
+ }
+
+ @JsonProperty("max")
+ public Double getMax() {
+ return max;
+ }
+
+ @JsonProperty("min")
+ public Double getMin() {
+ return min;
+ }
+
+ public void setSum(Double sum) {
+ this.sum = sum;
+ }
+
+ public void setDeviation(Double deviation) {
+ this.deviation = deviation;
+ }
+
+ public void setMax(Double max) {
+ this.max = max;
+ }
+
+ public void setMin(Double min) {
+ this.min = min;
+ }
+
+ public String toJSON() throws IOException { // serializes this aggregate with the shared ObjectMapper
+ return mapper.writeValueAsString(this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricClusterAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricClusterAggregate.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricClusterAggregate.java
new file mode 100644
index 0000000..7ef2c1d
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricClusterAggregate.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+* Metric aggregate that additionally tracks the number of hosts that contributed to it.
+*/
+public class MetricClusterAggregate extends MetricAggregate {
+ private int numberOfHosts;
+
+ @JsonCreator
+ public MetricClusterAggregate() {
+ }
+
+ public MetricClusterAggregate(Double sum, int numberOfHosts, Double deviation,
+ Double max, Double min) {
+ super(sum, deviation, max, min);
+ this.numberOfHosts = numberOfHosts;
+ }
+
+ @JsonProperty("numberOfHosts")
+ public int getNumberOfHosts() {
+ return numberOfHosts;
+ }
+
+ public void updateNumberOfHosts(int count) { // adds count to the running host total
+ this.numberOfHosts += count;
+ }
+
+ public void setNumberOfHosts(int numberOfHosts) {
+ this.numberOfHosts = numberOfHosts;
+ }
+
+ /**
+ * Merge another cluster aggregate into this one: updates max, min, sum and the host count
+ */
+ public void updateAggregates(MetricClusterAggregate hostAggregate) {
+ updateMax(hostAggregate.getMax());
+ updateMin(hostAggregate.getMin());
+ updateSum(hostAggregate.getSum());
+ updateNumberOfHosts(hostAggregate.getNumberOfHosts());
+ }
+
+ @Override
+ public String toString() {
+ return "MetricClusterAggregate{" +
+ "sum=" + sum +
+ ", numberOfHosts=" + numberOfHosts +
+ ", deviation=" + deviation +
+ ", max=" + max +
+ ", min=" + min +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricHostAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricHostAggregate.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricHostAggregate.java
new file mode 100644
index 0000000..e190913
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricHostAggregate.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * Represents a collection of minute based aggregation of values for
+ * resolution greater than a minute.
+ */
+public class MetricHostAggregate extends MetricAggregate {
+
+ private long numberOfSamples = 0;
+
+ @JsonCreator
+ public MetricHostAggregate() {
+ super(0.0, 0.0, -Double.MAX_VALUE, Double.MAX_VALUE); // max starts at the lowest finite double, not Double.MIN_VALUE (> 0)
+ }
+
+ public MetricHostAggregate(Double sum, int numberOfSamples,
+ Double deviation,
+ Double max, Double min) {
+ super(sum, deviation, max, min);
+ this.numberOfSamples = numberOfSamples;
+ }
+
+ @JsonProperty("numberOfSamples")
+ public long getNumberOfSamples() { // reports 1 when no samples were recorded, so callers never divide by zero
+ return numberOfSamples == 0 ? 1 : numberOfSamples;
+ }
+
+ public void updateNumberOfSamples(long count) {
+ this.numberOfSamples += count;
+ }
+
+ public void setNumberOfSamples(long numberOfSamples) {
+ this.numberOfSamples = numberOfSamples;
+ }
+
+ public double calculateAverage() {
+ return sum / getNumberOfSamples(); // zero-guarded count: avoids NaN/Infinity when no samples were recorded
+ }
+
+ /**
+ * Merge another host aggregate into this one: updates max, min, sum and the sample count
+ */
+ public void updateAggregates(MetricHostAggregate hostAggregate) {
+ updateMax(hostAggregate.getMax());
+ updateMin(hostAggregate.getMin());
+ updateSum(hostAggregate.getSum());
+ updateNumberOfSamples(hostAggregate.getNumberOfSamples());
+ }
+
+ @Override
+ public String toString() {
+ return "MetricHostAggregate{" +
+ "sum=" + sum +
+ ", numberOfSamples=" + numberOfSamples +
+ ", deviation=" + deviation +
+ ", max=" + max +
+ ", min=" + min +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
index 44c9d4a..edace52 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
@@ -45,7 +45,7 @@ public class TimelineMetric implements Comparable<TimelineMetric> {
private String type;
private String units;
private TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
- private Map<String, String> metadata = new HashMap<>();
+ private HashMap<String, String> metadata = new HashMap<>();
// default
public TimelineMetric() {
@@ -151,11 +151,11 @@ public class TimelineMetric implements Comparable<TimelineMetric> {
}
@XmlElement(name = "metadata")
- public Map<String,String> getMetadata () {
+ public HashMap<String,String> getMetadata () {
return metadata;
}
- public void setMetadata (Map<String,String> metadata) {
+ public void setMetadata (HashMap<String,String> metadata) {
this.metadata = metadata;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricWithAggregatedValues.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricWithAggregatedValues.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricWithAggregatedValues.java
new file mode 100644
index 0000000..626ac5f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricWithAggregatedValues.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+
+@XmlRootElement(name = "TimelineMetricWithAggregatedValues")
+@XmlAccessorType(XmlAccessType.NONE)
+public class TimelineMetricWithAggregatedValues { // JAXB bean pairing a TimelineMetric with a MetricHostAggregate
+ private TimelineMetric timelineMetric; // the metric itself
+ private MetricHostAggregate metricAggregate; // aggregate values associated with that metric
+
+ public TimelineMetricWithAggregatedValues() { // no-arg constructor; both fields stay null until set
+ }
+
+ public TimelineMetricWithAggregatedValues(TimelineMetric metric, MetricHostAggregate metricAggregate) {
+ timelineMetric = metric;
+ this.metricAggregate = metricAggregate;
+ }
+
+ @XmlElement
+ public MetricHostAggregate getMetricAggregate() {
+ return metricAggregate;
+ }
+ @XmlElement
+ public TimelineMetric getTimelineMetric() {
+ return timelineMetric;
+ }
+
+ public void setTimelineMetric(TimelineMetric timelineMetric) {
+ this.timelineMetric = timelineMetric;
+ }
+
+ public void setMetricAggregate(MetricHostAggregate metricAggregate) {
+ this.metricAggregate = metricAggregate;
+ }
+
+ @Override
+ public String toString() {
+ return "TimelineMetricWithAggregatedValues{" +
+ "timelineMetric=" + timelineMetric +
+ ", metricAggregate=" + metricAggregate +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
index 9b0cdbe..ce2cf79 100644
--- a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
+++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
@@ -90,6 +90,16 @@ public class AbstractTimelineMetricSinkTest {
}
@Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return true;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return 61888;
+ }
+
+ @Override
public boolean emitMetrics(TimelineMetrics metrics) {
super.init();
return super.emitMetrics(metrics);
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
index a393a96..f0174d5 100644
--- a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
+++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
@@ -192,5 +192,15 @@ public class MetricCollectorHATest {
protected String getHostname() {
return "h1";
}
+
+ @Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return true;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return 61888;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
index 32fe32e..4eb75eb 100644
--- a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
+++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
@@ -125,6 +125,16 @@ public class HandleConnectExceptionTest {
}
@Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return false;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return 61888;
+ }
+
+ @Override
public boolean emitMetrics(TimelineMetrics metrics) {
super.init();
return super.emitMetrics(metrics);
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
index 904c916..6277907 100644
--- a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
@@ -63,6 +63,9 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
private int timeoutSeconds = 10;
private boolean setInstanceId;
private String instanceId;
+ private boolean hostInMemoryAggregationEnabled;
+ private int hostInMemoryAggregationPort;
+
@Override
public void start() {
@@ -110,6 +113,9 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
port = configuration.getProperty(COLLECTOR_PORT, "6188");
setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false"));
instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY, "");
+ // Boolean.parseBoolean() parses the configured value; Boolean.getBoolean() would instead look up a JVM system property of that name.
+ hostInMemoryAggregationEnabled = Boolean.parseBoolean(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, "false"));
+ hostInMemoryAggregationPort = Integer.parseInt(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, "61888"));
// Initialize the collector write strategy
super.init();
@@ -162,6 +168,16 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
return hostname;
}
+ @Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return hostInMemoryAggregationEnabled;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return hostInMemoryAggregationPort;
+ }
+
public void setPollFrequency(long pollFrequency) {
this.pollFrequency = pollFrequency;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
index 11e16c2..c235c7c 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
@@ -75,6 +75,8 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
return t;
}
});
+ private int hostInMemoryAggregationPort;
+ private boolean hostInMemoryAggregationEnabled;
@Override
public void init(SubsetConfiguration conf) {
@@ -107,7 +109,8 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
protocol = conf.getString(COLLECTOR_PROTOCOL, "http");
collectorHosts = parseHostsStringArrayIntoCollection(conf.getStringArray(COLLECTOR_HOSTS_PROPERTY));
port = conf.getString(COLLECTOR_PORT, "6188");
-
+ hostInMemoryAggregationEnabled = conf.getBoolean(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, false); // default avoids NoSuchElementException when the key is absent
+ hostInMemoryAggregationPort = conf.getInt(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, 61888); // same default port the host aggregator application falls back to
if (collectorHosts.isEmpty()) {
LOG.error("No Metric collector configured.");
} else {
@@ -249,6 +252,16 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
}
@Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return hostInMemoryAggregationEnabled;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return hostInMemoryAggregationPort;
+ }
+
+ @Override
public void putMetrics(MetricsRecord record) {
try {
String recordName = record.name();
@@ -308,9 +321,10 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
int sbBaseLen = sb.length();
List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
- Map<String, String> metadata = null;
+ HashMap<String, String> metadata = null;
if (skipAggregation) {
- metadata = Collections.singletonMap("skipAggregation", "true");
+ metadata = new HashMap<>();
+ metadata.put("skipAggregation", "true");
}
long startTime = record.timestamp();
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/conf/unix/log4j.properties
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/conf/unix/log4j.properties b/ambari-metrics/ambari-metrics-host-aggregator/conf/unix/log4j.properties
new file mode 100644
index 0000000..d7ceedd
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/conf/unix/log4j.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Define some default values that can be overridden by system properties
+# Root logger option
+log4j.rootLogger=INFO,file
+
+# Direct log messages to a log file
+log4j.appender.file=org.apache.log4j.RollingFileAppender
+log4j.appender.file.File=/var/log/ambari-metrics-monitor/ambari-metrics-aggregator.log
+log4j.appender.file.MaxFileSize=80MB
+log4j.appender.file.MaxBackupIndex=60
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{ABSOLUTE} %5p [%t] %c{1}:%L - %m%n
+
+
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/conf/windows/log4j.properties
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/conf/windows/log4j.properties b/ambari-metrics/ambari-metrics-host-aggregator/conf/windows/log4j.properties
new file mode 100644
index 0000000..d9aabab
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/conf/windows/log4j.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Define some default values that can be overridden by system properties
+# Root logger option
+log4j.rootLogger=INFO,file
+
+# Direct log messages to a log file
+log4j.appender.file=org.apache.log4j.RollingFileAppender
+log4j.appender.file.File=\\var\\log\\ambari-metrics-monitor\\ambari-metrics-aggregator.log
+log4j.appender.file.MaxFileSize=80MB
+log4j.appender.file.MaxBackupIndex=60
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{ABSOLUTE} %5p [%t] %c{1}:%L - %m%n
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/pom.xml b/ambari-metrics/ambari-metrics-host-aggregator/pom.xml
new file mode 100644
index 0000000..c2c7897
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/pom.xml
@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>ambari-metrics</artifactId>
+ <groupId>org.apache.ambari</groupId>
+ <version>2.0.0.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>ambari-metrics-host-aggregator</artifactId>
+ <packaging>jar</packaging>
+
+ <name>ambari-metrics-host-aggregator</name>
+ <url>http://maven.apache.org</url>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>3.8.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>14.0.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.ambari</groupId>
+ <artifactId>ambari-metrics-common</artifactId>
+ <version>2.0.0.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ <version>2.5</version>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ <version>1.11</version>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ <version>1.11</version>
+ </dependency>
+ <dependency>
+ <groupId>javax.xml.bind</groupId>
+ <artifactId>jaxb-api</artifactId>
+ <version>2.2.2</version>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ <version>1.11</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>2.7.1.2.3.4.0-3347</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>1.6</version>
+ <configuration>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java
new file mode 100644
index 0000000..b1f60fa
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.codehaus.jackson.map.AnnotationIntrospector;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.Map;
+
+/**
+ * Abstract class that runs a thread that publishes metrics data to AMS collector in specified intervals.
+ */
+public abstract class AbstractMetricPublisherThread extends Thread {
+ protected int publishIntervalInSeconds;
+ protected String publishURL;
+ protected ObjectMapper objectMapper;
+ private Log LOG;
+ protected TimelineMetricsHolder timelineMetricsHolder;
+
+ public AbstractMetricPublisherThread(TimelineMetricsHolder timelineMetricsHolder, String publishURL, int publishIntervalInSeconds) {
+ LOG = LogFactory.getLog(this.getClass());
+ this.publishURL = publishURL;
+ this.publishIntervalInSeconds = publishIntervalInSeconds;
+ this.timelineMetricsHolder = timelineMetricsHolder;
+ objectMapper = new ObjectMapper();
+ AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
+ objectMapper.setAnnotationIntrospector(introspector);
+ objectMapper.getSerializationConfig()
+ .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
+ }
+
+ /**
+ * Publishes metrics to collector in specified intervals while not interrupted.
+ */
+ @Override
+ public void run() {
+ while (!isInterrupted()) {
+ try {
+ sleep(this.publishIntervalInSeconds * 1000);
+ } catch (InterruptedException e) {
+ //Ignore
+ }
+ try {
+ processAndPublishMetrics(getMetricsFromCache());
+ } catch (Exception e) {
+ LOG.error("Couldn't process and send metrics : ",e);
+ }
+ }
+ }
+
+ /**
+ * Processes and sends metrics to collector.
+ * @param metricsFromCache
+ * @throws Exception
+ */
+ protected void processAndPublishMetrics(Map<Long, TimelineMetrics> metricsFromCache) throws Exception {
+ if (metricsFromCache.size()==0) return;
+
+ LOG.info(String.format("Preparing %s timeline metrics for publishing", metricsFromCache.size()));
+ publishMetricsJson(processMetrics(metricsFromCache));
+ }
+
+ /**
+ * Returns metrics map. Source is based on implementation.
+ * @return
+ */
+ protected abstract Map<Long,TimelineMetrics> getMetricsFromCache();
+
+ /**
+ * Processes given metrics (aggregates or merges them) and converts them into json string that will be send to collector
+ * @param metricValues
+ * @return
+ */
+ protected abstract String processMetrics(Map<Long, TimelineMetrics> metricValues);
+
+ protected void publishMetricsJson(String jsonData) throws Exception {
+ int timeout = 5 * 1000;
+ HttpURLConnection connection = null;
+ if (this.publishURL == null) {
+ throw new IOException("Unknown URL. Unable to connect to metrics collector.");
+ }
+ LOG.info("Collector URL : " + publishURL);
+ connection = (HttpURLConnection) new URL(this.publishURL).openConnection();
+
+ connection.setRequestMethod("POST");
+ connection.setRequestProperty("Content-Type", "application/json");
+ connection.setRequestProperty("Connection", "Keep-Alive");
+ connection.setConnectTimeout(timeout);
+ connection.setReadTimeout(timeout);
+ connection.setDoOutput(true);
+
+ if (jsonData != null) {
+ try (OutputStream os = connection.getOutputStream()) {
+ os.write(jsonData.getBytes("UTF-8"));
+ }
+ }
+ int responseCode = connection.getResponseCode();
+ if (responseCode != 200) {
+ throw new Exception("responseCode is " + responseCode);
+ }
+ LOG.info("Successfully sent metrics.");
+ }
+
+ /**
+ * Interrupts the thread.
+ */
+ protected void stopPublisher() {
+ this.interrupt();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java
new file mode 100644
index 0000000..0540ec9
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricWithAggregatedValues;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * Thread that aggregates and publishes metrics to collector on specified interval.
+ */
+public class AggregatedMetricsPublisher extends AbstractMetricPublisherThread {
+
+  private Log LOG;
+
+  /**
+   * @param timelineMetricsHolder holder the metrics for aggregation are extracted from
+   * @param collectorURL URL the aggregated metrics are POSTed to
+   * @param interval pause between two aggregation cycles, in seconds
+   */
+  public AggregatedMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, String collectorURL, int interval) {
+    super(timelineMetricsHolder, collectorURL, interval);
+    LOG = LogFactory.getLog(this.getClass());
+  }
+
+  /**
+   * Gets the metrics map from {@link TimelineMetricsHolder}.
+   * @return the map returned by extractMetricsForAggregationPublishing()
+   */
+  @Override
+  protected Map<Long, TimelineMetrics> getMetricsFromCache() {
+    return timelineMetricsHolder.extractMetricsForAggregationPublishing();
+  }
+
+  /**
+   * Aggregates given metrics and converts them into the json string that will be sent to collector.
+   * Metrics are grouped by metric name; sum, count, max and min are computed over all values of a group.
+   * @param metricForAggregationValues metrics to aggregate
+   * @return the serialized {@link AggregationResult}, or null if serialization failed
+   */
+  @Override
+  protected String processMetrics(Map<Long, TimelineMetrics> metricForAggregationValues) {
+    // NOTE(review): grouping uses the metric name only; confirm that metrics of
+    // different apps/instances sharing a name are meant to be aggregated together.
+    Map<String, TimelineMetrics> nameToMetricMap = new HashMap<>();
+    for (TimelineMetrics timelineMetrics : metricForAggregationValues.values()) {
+      for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) {
+        String metricName = timelineMetric.getMetricName();
+        TimelineMetrics sameNameMetrics = nameToMetricMap.get(metricName);
+        if (sameNameMetrics == null) {
+          sameNameMetrics = new TimelineMetrics();
+          nameToMetricMap.put(metricName, sameNameMetrics);
+        }
+        sameNameMetrics.addOrMergeTimelineMetric(timelineMetric);
+      }
+    }
+    Set<TimelineMetricWithAggregatedValues> metricAggregateMap = new HashSet<>();
+    for (TimelineMetrics metrics : nameToMetricMap.values()) {
+      double sum = 0;
+      // Seed with the full finite double range. Integer.MIN_VALUE/MAX_VALUE gave
+      // wrong min/max for values outside the int range (e.g. large byte counters).
+      // Finite seeds are used rather than infinities so the values stay plain JSON numbers.
+      double max = -Double.MAX_VALUE;
+      double min = Double.MAX_VALUE;
+      int count = 0;
+      for (TimelineMetric metric : metrics.getMetrics()) {
+        for (Double value : metric.getMetricValues().values()) {
+          sum += value;
+          max = Math.max(max, value);
+          min = Math.min(min, value);
+          count++;
+        }
+      }
+      // The first metric of the group serves as the template; its values are
+      // replaced by an empty map because only the aggregate is published.
+      TimelineMetric tmpMetric = new TimelineMetric(metrics.getMetrics().get(0));
+      tmpMetric.setMetricValues(new TreeMap<Long, Double>());
+      metricAggregateMap.add(new TimelineMetricWithAggregatedValues(tmpMetric, new MetricHostAggregate(sum, count, 0d, max, min)));
+    }
+    String json = null;
+    try {
+      json = objectMapper.writeValueAsString(new AggregationResult(metricAggregateMap, System.currentTimeMillis()));
+      LOG.debug(json);
+    } catch (Exception e) {
+      LOG.error("Failed to convert result into json", e);
+    }
+
+    return json;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
new file mode 100644
index 0000000..c6b703b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+import com.sun.jersey.api.container.httpserver.HttpServerFactory;
+import com.sun.jersey.api.core.PackagesResourceConfig;
+import com.sun.jersey.api.core.ResourceConfig;
+import com.sun.net.httpserver.HttpServer;
+
+import javax.ws.rs.core.UriBuilder;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * WEB application with 2 publisher threads that processes received metrics and submits results to the collector
+ */
+public class AggregatorApplication
+{
+ private static final int STOP_SECONDS_DELAY = 0;
+ private static final int JOIN_SECONDS_TIMEOUT = 2;
+ private static String BASE_POST_URL = "%s://%s:%s/ws/v1/timeline/metrics";
+ private static String AGGREGATED_POST_PREFIX = "/aggregated";
+ private static final String METRICS_SITE_CONFIGURATION_FILE = "ams-site.xml";
+ private static Log LOG = LogFactory.getLog("AggregatorApplication.class");
+ private final int webApplicationPort;
+ private final int rawPublishingInterval;
+ private final int aggregationInterval;
+ private Configuration configuration;
+ private String [] collectorHosts;
+ private AggregatedMetricsPublisher aggregatePublisher;
+ private RawMetricsPublisher rawPublisher;
+ private TimelineMetricsHolder timelineMetricsHolder;
+ private HttpServer httpServer;
+
+ public AggregatorApplication(String collectorHosts) {
+ initConfiguration();
+ this.collectorHosts = collectorHosts.split(",");
+ this.aggregationInterval = configuration.getInt("timeline.metrics.host.aggregator.minute.interval", 300);
+ this.rawPublishingInterval = configuration.getInt("timeline.metrics.sink.report.interval", 60);
+ this.webApplicationPort = configuration.getInt("timeline.metrics.host.inmemory.aggregation.port", 61888);
+ this.timelineMetricsHolder = TimelineMetricsHolder.getInstance(rawPublishingInterval, aggregationInterval);
+ try {
+ this.httpServer = createHttpServer();
+ } catch (IOException e) {
+ LOG.error("Exception while starting HTTP server. Exiting", e);
+ System.exit(1);
+ }
+ }
+
+ private void initConfiguration() {
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ if (classLoader == null) {
+ classLoader = getClass().getClassLoader();
+ }
+
+ URL amsResUrl = classLoader.getResource(METRICS_SITE_CONFIGURATION_FILE);
+ LOG.info("Found metric service configuration: " + amsResUrl);
+ if (amsResUrl == null) {
+ throw new IllegalStateException("Unable to initialize the metrics " +
+ "subsystem. No ams-site present in the classpath.");
+ }
+ configuration = new Configuration(true);
+ try {
+ configuration.addResource(amsResUrl.toURI().toURL());
+ } catch (Exception e) {
+ LOG.error("Couldn't init configuration. ", e);
+ System.exit(1);
+ }
+ }
+
+ private String getHostName() {
+ String hostName = "localhost";
+ try {
+ hostName = InetAddress.getLocalHost().getCanonicalHostName();
+ } catch (UnknownHostException e) {
+ LOG.error(e);
+ }
+ return hostName;
+ }
+
+ private URI getURI() {
+ URI uri = UriBuilder.fromUri("http://" + getHostName() + "/").port(this.webApplicationPort).build();
+ LOG.info(String.format("Web server at %s", uri));
+ return uri;
+ }
+
+ private HttpServer createHttpServer() throws IOException {
+ ResourceConfig resourceConfig = new PackagesResourceConfig("org.apache.hadoop.metrics2.host.aggregator");
+ HashMap<String, Object> params = new HashMap();
+ params.put("com.sun.jersey.api.json.POJOMappingFeature", "true");
+ resourceConfig.setPropertiesAndFeatures(params);
+ return HttpServerFactory.create(getURI(), resourceConfig);
+ }
+
+ private void startWebServer() {
+ LOG.info("Starting web server.");
+ this.httpServer.start();
+ }
+
+ private void startAggregatePublisherThread() {
+ LOG.info("Starting aggregated metrics publisher.");
+ String collectorURL = buildBasicCollectorURL(collectorHosts[0]) + AGGREGATED_POST_PREFIX;
+ aggregatePublisher = new AggregatedMetricsPublisher(timelineMetricsHolder, collectorURL, aggregationInterval);
+ aggregatePublisher.start();
+ }
+
+ private void startRawPublisherThread() {
+ LOG.info("Starting raw metrics publisher.");
+ String collectorURL = buildBasicCollectorURL(collectorHosts[0]);
+ rawPublisher = new RawMetricsPublisher(timelineMetricsHolder, collectorURL, rawPublishingInterval);
+ rawPublisher.start();
+ }
+
+
+
+ private void stop() {
+ aggregatePublisher.stopPublisher();
+ rawPublisher.stopPublisher();
+ httpServer.stop(STOP_SECONDS_DELAY);
+ LOG.info("Stopped web server.");
+ try {
+ LOG.info("Waiting for threads to join.");
+ aggregatePublisher.join(JOIN_SECONDS_TIMEOUT * 1000);
+ rawPublisher.join(JOIN_SECONDS_TIMEOUT * 1000);
+ LOG.info("Gracefully stopped Aggregator Application.");
+ } catch (InterruptedException e) {
+ LOG.error("Received exception during stop : ", e);
+
+ }
+
+ }
+
+ private String buildBasicCollectorURL(String host) {
+ String port = configuration.get("timeline.metrics.service.webapp.address", "0.0.0.0:6188").split(":")[1];
+ String protocol = configuration.get("timeline.metrics.service.http.policy", "HTTP_ONLY").equalsIgnoreCase("HTTP_ONLY") ? "http" : "https";
+ return String.format(BASE_POST_URL, protocol, host, port);
+ }
+
+ public static void main( String[] args ) throws Exception {
+ LOG.info("Starting aggregator application");
+ if (args.length != 1) {
+ throw new Exception("This jar should be run with 1 argument - collector hosts separated with coma");
+ }
+
+ final AggregatorApplication app = new AggregatorApplication(args[0]);
+ app.startAggregatePublisherThread();
+ app.startRawPublisherThread();
+ app.startWebServer();
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ public void run() {
+ LOG.info("Stopping aggregator application");
+ app.stop();
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java
new file mode 100644
index 0000000..f96d0ed
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+
+
+import com.sun.jersey.spi.resource.Singleton;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+
+/**
+ * JAX-RS resource rooted at /ws/v1/timeline that accepts posted metrics and stores them
+ * in the shared {@link TimelineMetricsHolder} for both aggregation and raw publishing.
+ */
+@Singleton
+@Path("/ws/v1/timeline")
+public class AggregatorWebService {
+  TimelineMetricsHolder metricsHolder = TimelineMetricsHolder.getInstance();
+
+  /**
+   * Always answers 200 OK with an empty body, so a client can check that the
+   * service is reachable.
+   */
+  @GET
+  @Produces("text/json")
+  @Path("/metrics")
+  public Response helloWorld() throws IOException {
+    return Response.ok().build();
+  }
+
+  /**
+   * Stores the posted metrics in both holder caches.
+   *
+   * @param metrics metrics unmarshalled from the JSON request body
+   * @return 200 OK once stored, 400 Bad Request when no metrics were supplied
+   */
+  @POST
+  @Produces(MediaType.TEXT_PLAIN)
+  @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+  @Path("/metrics")
+  public Response postMetrics(
+      TimelineMetrics metrics) {
+    if (metrics == null) {
+      // presumably what the framework passes for a request without an entity;
+      // the holder's Guava cache rejects null values with a NullPointerException
+      return Response.status(Response.Status.BAD_REQUEST).build();
+    }
+    metricsHolder.putMetricsForAggregationPublishing(metrics);
+    metricsHolder.putMetricsForRawPublishing(metrics);
+    return Response.ok().build();
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java
new file mode 100644
index 0000000..f317ed9
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import java.util.Map;
+
+/**
+ * Publisher that takes the contents of the raw-publishing cache and serializes them
+ * as one merged TimelineMetrics JSON document.
+ */
+public class RawMetricsPublisher extends AbstractMetricPublisherThread {
+  private final Log LOG;
+
+  public RawMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, String collectorURL, int interval) {
+    super(timelineMetricsHolder, collectorURL, interval);
+    this.LOG = LogFactory.getLog(getClass());
+  }
+
+  /**
+   * @return whatever the holder extracts for raw publishing
+   */
+  @Override
+  protected Map<Long, TimelineMetrics> getMetricsFromCache() {
+    Map<Long, TimelineMetrics> extracted = timelineMetricsHolder.extractMetricsForRawPublishing();
+    return extracted;
+  }
+
+  /**
+   * Merges every cached batch into a single TimelineMetrics object and serializes it.
+   *
+   * @param metricValues cached batches; only the map values are used
+   * @return the JSON string, or null when serialization failed
+   */
+  @Override
+  protected String processMetrics(Map<Long, TimelineMetrics> metricValues) {
+    // fold all batches into one TimelineMetrics instance
+    TimelineMetrics merged = new TimelineMetrics();
+    for (TimelineMetrics batch : metricValues.values()) {
+      for (TimelineMetric metric : batch.getMetrics()) {
+        merged.addOrMergeTimelineMetric(metric);
+      }
+    }
+
+    // serialize the merged object; a failure is logged and reported as null
+    String serialized = null;
+    try {
+      serialized = objectMapper.writeValueAsString(merged);
+      LOG.debug(serialized);
+    } catch (Exception e) {
+      LOG.error("Failed to convert result into json", e);
+    }
+    return serialized;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java
new file mode 100644
index 0000000..b355c97
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Singleton class with 2 guava caches for raw and aggregated metrics storing
+ */
+public class TimelineMetricsHolder {
+ private static final int DEFAULT_RAW_CACHE_EXPIRE_TIME = 60;
+ private static final int DEFAULT_AGGREGATION_CACHE_EXPIRE_TIME = 300;
+ private Cache<Long, TimelineMetrics> aggregationMetricsCache;
+ private Cache<Long, TimelineMetrics> rawMetricsCache;
+ private static TimelineMetricsHolder instance = null;
+ //to ensure no metric values are expired
+ private static int EXPIRE_DELAY = 30;
+ ReadWriteLock aggregationCacheLock = new ReentrantReadWriteLock();
+ ReadWriteLock rawCacheLock = new ReentrantReadWriteLock();
+
+ private TimelineMetricsHolder(int rawCacheExpireTime, int aggregationCacheExpireTime) {
+ this.rawMetricsCache = CacheBuilder.newBuilder().expireAfterWrite(rawCacheExpireTime + EXPIRE_DELAY, TimeUnit.SECONDS).build();
+ this.aggregationMetricsCache = CacheBuilder.newBuilder().expireAfterWrite(aggregationCacheExpireTime + EXPIRE_DELAY, TimeUnit.SECONDS).build();
+ }
+
+ public static TimelineMetricsHolder getInstance(int rawCacheExpireTime, int aggregationCacheExpireTime) {
+ if (instance == null) {
+ instance = new TimelineMetricsHolder(rawCacheExpireTime, aggregationCacheExpireTime);
+ }
+ return instance;
+ }
+
+ /**
+ * Uses default expiration time for caches initialization if they are not initialized yet.
+ * @return
+ */
+ public static TimelineMetricsHolder getInstance() {
+ return getInstance(DEFAULT_RAW_CACHE_EXPIRE_TIME, DEFAULT_AGGREGATION_CACHE_EXPIRE_TIME);
+ }
+
+ public void putMetricsForAggregationPublishing(TimelineMetrics timelineMetrics) {
+ aggregationCacheLock.writeLock().lock();
+ aggregationMetricsCache.put(System.currentTimeMillis(), timelineMetrics);
+ aggregationCacheLock.writeLock().unlock();
+ }
+
+ public Map<Long, TimelineMetrics> extractMetricsForAggregationPublishing() {
+ return extractMetricsFromCacheWithLock(aggregationMetricsCache, aggregationCacheLock);
+ }
+
+ public void putMetricsForRawPublishing(TimelineMetrics metrics) {
+ rawCacheLock.writeLock().lock();
+ rawMetricsCache.put(System.currentTimeMillis(), metrics);
+ rawCacheLock.writeLock().unlock();
+ }
+
+ public Map<Long, TimelineMetrics> extractMetricsForRawPublishing() {
+ return extractMetricsFromCacheWithLock(rawMetricsCache, rawCacheLock);
+ }
+
+ /**
+ * Returns values from cache and clears the cache
+ * @param cache
+ * @param lock
+ * @return
+ */
+ private Map<Long, TimelineMetrics> extractMetricsFromCacheWithLock(Cache<Long, TimelineMetrics> cache, ReadWriteLock lock) {
+ lock.writeLock().lock();
+ Map<Long, TimelineMetrics> metricsMap = new TreeMap<>(cache.asMap());
+ cache.invalidateAll();
+ lock.writeLock().unlock();
+ return metricsMap;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor b/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor
index 967e133..9bbb271 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor
+++ b/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor
@@ -24,7 +24,7 @@ METRIC_MONITOR_PY_SCRIPT=${RESOURCE_MONITORING_DIR}/main.py
PIDFILE=/var/run/ambari-metrics-monitor/ambari-metrics-monitor.pid
OUTFILE=/var/log/ambari-metrics-monitor/ambari-metrics-monitor.out
-STOP_TIMEOUT=5
+STOP_TIMEOUT=10
OK=0
NOTOK=1
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py
new file mode 100644
index 0000000..2249e53
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py
@@ -0,0 +1,110 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import threading
+import subprocess
+import logging
+import urllib2
+
+logger = logging.getLogger()
+class Aggregator(threading.Thread):
+  """Thread that launches the Java AggregatorApplication as a child process.
+
+  After starting the child it waits on the stop handler until a stop is
+  requested, then terminates the child.
+  """
+
+  def __init__(self, config, stop_handler):
+    threading.Thread.__init__(self)
+    self._config = config
+    self._stop_handler = stop_handler
+    self._aggregator_process = None
+    self._sleep_interval = config.get_collector_sleep_interval()
+    self.stopped = False
+
+  def run(self):
+    """Start the aggregator JVM and block until a stop is requested."""
+    # function-scope import so the file's import block needs no change
+    import shlex
+
+    java_home = self._config.get_java_home()
+    collector_hosts = self._config.get_metrics_collector_hosts_as_string()
+    jvm_agrs = self._config.get_aggregator_jvm_agrs()
+    config_dir = self._config.get_config_dir()
+    class_name = "org.apache.hadoop.metrics2.host.aggregator.AggregatorApplication"
+    ams_log_file = "ambari-metrics-aggregator.log"
+    # the trailing '*' is passed to java literally; java expands classpath wildcards itself
+    classpath = '/var/lib/ambari-metrics-monitor/lib/*:{0}'.format(config_dir)
+    ams_log_dir = self._config.ams_monitor_log_dir()
+    logger.info('Starting Aggregator thread.')
+
+    # Argument list + shell=False: the child IS the JVM, so terminate() in stop()
+    # signals java itself instead of an intermediate shell that may leave it running.
+    cmd = ["{0}/bin/java".format(java_home)]
+    cmd.extend(shlex.split(jvm_agrs or ""))
+    cmd.extend([
+      "-Dams.log.dir={0}".format(ams_log_dir),
+      "-Dams.log.file={0}".format(ams_log_file),
+      "-cp", classpath,
+      class_name,
+      collector_hosts,
+    ])
+
+    logger.info("Executing : {0}".format(" ".join(cmd)))
+
+    self._aggregator_process = subprocess.Popen(cmd, stdout = None, stderr = None, shell = False)
+    while not self.stopped:
+      # a 0 result from the stop handler ends the loop (presumably: stop was signalled)
+      if 0 == self._stop_handler.wait(self._sleep_interval):
+        break
+    self.stop()
+
+  def stop(self):
+    """Mark the thread as stopped and terminate the child if it is still running."""
+    self.stopped = True
+    process = self._aggregator_process
+    # poll() is None only while the child is alive; stop() can be called more than once
+    if process and process.poll() is None:
+      logger.info('Stopping Aggregator thread.')
+      process.terminate()
+
+class AggregatorWatchdog(threading.Thread):
+  """Thread that periodically requests the local aggregator's metrics URL.
+
+  The outcome of the last check is exposed through is_ok().
+  """
+  SLEEP_TIME = 30  # passed to stop_handler.wait() between checks (presumably seconds)
+  CONNECTION_TIMEOUT = 5  # urlopen timeout, in seconds
+  AMS_AGGREGATOR_METRICS_CHECK_URL = "/ws/v1/timeline/metrics/"
+
+  def __init__(self, config, stop_handler):
+    threading.Thread.__init__(self)
+    self._config = config
+    self._stop_handler = stop_handler
+    self.URL = 'http://localhost:' + self._config.get_inmemory_aggregation_port() + self.AMS_AGGREGATOR_METRICS_CHECK_URL
+    self._is_ok = threading.Event()
+    self.set_is_ok(True)
+    self.stopped = False
+
+  def run(self):
+    """Check the aggregator URL in a loop until a stop is requested."""
+    logger.info('Starting Aggregator Watchdog thread.')
+    while not self.stopped:
+      # a 0 result from the stop handler ends the loop (presumably: stop was signalled)
+      if 0 == self._stop_handler.wait(self.SLEEP_TIME):
+        break
+      try:
+        conn = urllib2.urlopen(self.URL, timeout=self.CONNECTION_TIMEOUT)
+      except (KeyboardInterrupt, SystemExit):
+        raise
+      except Exception:
+        self.set_is_ok(False)
+        continue
+      try:
+        # only a 200 answer counts as healthy
+        self.set_is_ok(conn.code == 200)
+      finally:
+        # close on every path; the non-200 path used to skip close()
+        conn.close()
+
+  def is_ok(self):
+    """Return True when the last recorded check result was OK."""
+    return self._is_ok.is_set()
+
+  def set_is_ok(self, value):
+    """Record a check result; warn only on the OK -> not OK transition."""
+    if value:
+      self._is_ok.set()
+      return
+    if self.is_ok():
+      logger.warning("Watcher couldn't connect to aggregator.")
+    # Always clear on failure: previously a second consecutive failure fell
+    # into the else branch and set the flag back to OK.
+    self._is_ok.clear()
+
+  def stop(self):
+    """Ask the watchdog loop to finish."""
+    logger.info('Stopping watcher thread.')
+    self.stopped = True
+
+
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
index 2670e76..d1429ed 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
@@ -30,6 +30,8 @@ from ambari_commons.os_family_impl import OsFamilyImpl
# Abstraction for OS-dependent configuration defaults
#
class ConfigDefaults(object):
+ def get_config_dir(self):
+ """Return the configuration directory; this base implementation returns None."""
+ pass
def get_config_file_path(self):
pass
def get_metric_file_path(self):
@@ -40,11 +42,14 @@ class ConfigDefaults(object):
@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
class ConfigDefaultsWindows(ConfigDefaults):
def __init__(self):
+ self._CONFIG_DIR = "conf"
self._CONFIG_FILE_PATH = "conf\\metric_monitor.ini"
self._METRIC_FILE_PATH = "conf\\metric_groups.conf"
- self._METRIC_FILE_PATH = "conf\\ca.pem"
+ # ca.pem was assigned to _METRIC_FILE_PATH, clobbering the metric_groups.conf path above;
+ # use the same attribute name as ConfigDefaultsLinux
+ self._CA_CERTS_FILE_PATH = "conf\\ca.pem"
pass
+ def get_config_dir(self):
+ """Return the Windows configuration directory assigned in __init__."""
+ return self._CONFIG_DIR
def get_config_file_path(self):
return self._CONFIG_FILE_PATH
def get_metric_file_path(self):
@@ -55,11 +60,13 @@ class ConfigDefaultsWindows(ConfigDefaults):
@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
class ConfigDefaultsLinux(ConfigDefaults):
def __init__(self):
+ # directory that contains the three default files listed below
+ self._CONFIG_DIR = "/etc/ambari-metrics-monitor/conf/"
self._CONFIG_FILE_PATH = "/etc/ambari-metrics-monitor/conf/metric_monitor.ini"
self._METRIC_FILE_PATH = "/etc/ambari-metrics-monitor/conf/metric_groups.conf"
self._CA_CERTS_FILE_PATH = "/etc/ambari-metrics-monitor/conf/ca.pem"
pass
-
+ def get_config_dir(self):
+ """Return the Linux configuration directory assigned in __init__."""
+ return self._CONFIG_DIR
def get_config_file_path(self):
return self._CONFIG_FILE_PATH
def get_metric_file_path(self):
@@ -71,6 +78,7 @@ configDefaults = ConfigDefaults()
config = ConfigParser.RawConfigParser()
+ # default locations, read once from configDefaults when this module is imported
+ CONFIG_DIR = configDefaults.get_config_dir()
CONFIG_FILE_PATH = configDefaults.get_config_file_path()
METRIC_FILE_PATH = configDefaults.get_metric_file_path()
CA_CERTS_FILE_PATH = configDefaults.get_ca_certs_file_path()
@@ -191,6 +199,8 @@ class Configuration:
# No hostname script identified in the ambari agent conf
pass
pass
+ def get_config_dir(self):
+ """Return the module-level CONFIG_DIR value."""
+ return CONFIG_DIR
def getConfig(self):
return self.config
@@ -214,10 +224,14 @@ class Configuration:
def get_hostname_config(self):
return self.get("default", "hostname", None)
- def get_metrics_collector_hosts(self):
+ def get_metrics_collector_hosts_as_list(self):
+ """Return the "metrics_servers" value of the "default" section split on commas."""
hosts = self.get("default", "metrics_servers", "localhost")
return hosts.split(",")
+ def get_metrics_collector_hosts_as_string(self):
+ """Return the "metrics_servers" value of the "default" section unsplit, as one string."""
+ return self.get("default", "metrics_servers", "localhost")
+
def get_failover_strategy(self):
return self.get("collector", "failover_strategy", ROUND_ROBIN_FAILOVER_STRATEGY)
@@ -239,6 +253,23 @@ class Configuration:
def is_server_https_enabled(self):
return "true" == str(self.get("collector", "https_enabled")).lower()
+ def get_java_home(self):
+ """Return the "java_home" value of the "aggregation" section."""
+ return self.get("aggregation", "java_home")
+
+ def is_inmemory_aggregation_enabled(self):
+ """Return True only when "host_in_memory_aggregation" in "aggregation" is "true" (any letter case)."""
+ return "true" == str(self.get("aggregation", "host_in_memory_aggregation")).lower()
+
+ def get_inmemory_aggregation_port(self):
+ """Return the "host_in_memory_aggregation_port" value of the "aggregation" section (no default is passed)."""
+ return self.get("aggregation", "host_in_memory_aggregation_port")
+
+ def get_aggregator_jvm_agrs(self):
+ """Return the "jvm_arguments" value of the "aggregation" section as one string."""
+ return self.get("aggregation", "jvm_arguments", "-Xmx256m -Xms128m -XX:PermSize=68m")
+
+ def ams_monitor_log_dir(self):
+ """Return the "ams_monitor_log_dir" value of the "aggregation" section."""
+ return self.get("aggregation", "ams_monitor_log_dir", "/var/log/ambari-metrics-monitor")
+
def is_set_instanceid(self):
return "true" == str(self.get("default", "set.instanceId", 'false')).lower()