You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2014/12/10 22:52:01 UTC
ambari git commit: AMBARI-8521. Add STORM metric sink implementation
to enable sink to AMS. (Szilard Nemethy via swagle)
Repository: ambari
Updated Branches:
refs/heads/trunk 391a53b40 -> 8e4a00308
AMBARI-8521. Add STORM metric sink implementation to enable sink to AMS. (Szilard Nemethy via swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/8e4a0030
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/8e4a0030
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/8e4a0030
Branch: refs/heads/trunk
Commit: 8e4a00308147ba496325bd4e3766ca4f9fba9d54
Parents: 391a53b
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Wed Dec 10 13:51:44 2014 -0800
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Wed Dec 10 13:51:52 2014 -0800
----------------------------------------------------------------------
.../ambari-metrics-storm-sink/pom.xml | 181 +++++++++++++++++++
.../src/main/assemblies/empty.xml | 21 +++
.../src/main/assemblies/sink.xml | 34 ++++
.../src/main/conf/storm-metrics2.properties.j2 | 21 +++
.../sink/storm/StormTimelineMetricsSink.java | 136 ++++++++++++++
.../storm/StormTimelineMetricsSinkTest.java | 68 +++++++
ambari-metrics/pom.xml | 1 +
.../services/STORM/package/scripts/params.py | 5 +
.../2.1/services/STORM/package/scripts/storm.py | 6 +
.../templates/storm-metrics2.properties.j2 | 21 +++
.../STORM/package/templates/storm.yaml.j2 | 6 +
11 files changed, 500 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/8e4a0030/ambari-metrics/ambari-metrics-storm-sink/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/pom.xml b/ambari-metrics/ambari-metrics-storm-sink/pom.xml
new file mode 100644
index 0000000..9b8960c
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-storm-sink/pom.xml
@@ -0,0 +1,181 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+ http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>ambari-metrics</artifactId>
+ <groupId>org.apache.ambari</groupId>
+ <version>0.1.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>ambari-metrics-storm-sink</artifactId>
+ <version>0.1.0-SNAPSHOT</version>
+ <packaging>jar</packaging>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${project.build.directory}/lib</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assemblies/sink.xml</descriptor>
+ </descriptors>
+ <tarLongFileMode>gnu</tarLongFileMode>
+ </configuration>
+ <executions>
+ <execution>
+ <id>build-tarball</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.0</version>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.8</version>
+ <executions>
+ <execution>
+ <id>parse-version</id>
+ <phase>validate</phase>
+ <goals>
+ <goal>parse-version</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>regex-property</id>
+ <goals>
+ <goal>regex-property</goal>
+ </goals>
+ <configuration>
+ <name>ambariVersion</name>
+ <value>${project.version}</value>
+ <regex>^([0-9]+)\.([0-9]+)\.([0-9]+)(\.|-).*</regex>
+ <replacement>$1.$2.$3</replacement>
+ <failIfNoMatch>false</failIfNoMatch>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <!-- Binds the 'create-archive' execution to the non-existent phase 'none' so it
+ never runs for this module. NOTE(review): 'create-archive' is presumably
+ declared in a parent POM; confirm that is what is being switched off. -->
+ <plugin>
+ <groupId>com.github.goldin</groupId>
+ <artifactId>copy-maven-plugin</artifactId>
+ <version>0.2.5</version>
+ <executions>
+ <execution>
+ <id>create-archive</id>
+ <phase>none</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <!-- RPM packaging definition for the Storm sink. The 'rpm' goal is bound to
+ phase 'none', so it is not executed by the normal lifecycle. The single
+ mapping places the module jar and the contents of target/lib under
+ /usr/lib/storm/lib, owned by root:root with mode 644. target/lib is the
+ directory the maven-dependency-plugin execution above copies dependencies
+ into (assuming the default build directory). -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>rpm-maven-plugin</artifactId>
+ <version>2.0.1</version>
+ <executions>
+ <execution>
+ <phase>none</phase>
+ <goals>
+ <goal>rpm</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <name>ambari-metrics-storm-sink</name>
+ <copyright>2012, Apache Software Foundation</copyright>
+ <group>Development</group>
+ <description>Maven Recipe: RPM Package.</description>
+ <mappings>
+ <mapping>
+ <directory>/usr/lib/storm/lib</directory>
+ <filemode>644</filemode>
+ <username>root</username>
+ <groupname>root</groupname>
+ <sources>
+ <source>
+ <location>target/${project.artifactId}-${project.version}.jar</location>
+ </source>
+ <source>
+ <location>target/lib</location>
+ </source>
+ </sources>
+ </mapping>
+ </mappings>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>0.9.3</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.ambari</groupId>
+ <artifactId>ambari-metrics-common</artifactId>
+ <version>0.1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ <version>4.10</version>
+ </dependency>
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <version>3.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-easymock</artifactId>
+ <version>1.4.9</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <version>1.4.9</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/ambari/blob/8e4a0030/ambari-metrics/ambari-metrics-storm-sink/src/main/assemblies/empty.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/assemblies/empty.xml b/ambari-metrics/ambari-metrics-storm-sink/src/main/assemblies/empty.xml
new file mode 100644
index 0000000..35738b1
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/assemblies/empty.xml
@@ -0,0 +1,21 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<assembly>
+ <id>empty</id>
+ <formats/>
+</assembly>
http://git-wip-us.apache.org/repos/asf/ambari/blob/8e4a0030/ambari-metrics/ambari-metrics-storm-sink/src/main/assemblies/sink.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/assemblies/sink.xml b/ambari-metrics/ambari-metrics-storm-sink/src/main/assemblies/sink.xml
new file mode 100644
index 0000000..21a6b36
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/assemblies/sink.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<assembly>
+ <!--This 'dist' id is not appended to the produced bundle because we do this:
+ http://maven.apache.org/plugins/maven-assembly-plugin/faq.html#required-classifiers
+ -->
+ <id>dist</id>
+ <formats>
+ <format>dir</format>
+ <format>tar.gz</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <files>
+ <file>
+ <source>${project.build.directory}/${artifact.artifactId}-${artifact.version}.jar</source>
+ <outputDirectory>ambari-metrics-${project.version}/lib/ambari-metrics</outputDirectory>
+ </file>
+ </files>
+</assembly>
http://git-wip-us.apache.org/repos/asf/ambari/blob/8e4a0030/ambari-metrics/ambari-metrics-storm-sink/src/main/conf/storm-metrics2.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/conf/storm-metrics2.properties.j2 b/ambari-metrics/ambari-metrics-storm-sink/src/main/conf/storm-metrics2.properties.j2
new file mode 100644
index 0000000..6d333d5
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/conf/storm-metrics2.properties.j2
@@ -0,0 +1,21 @@
+{#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#}
+
+collector={{metric_collector_host}}:8188
+maxRowCacheSize=10000
+sendInterval=59000
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/8e4a0030/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
new file mode 100644
index 0000000..a8f35e4
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.storm;
+
+import backtype.storm.metric.api.IMetricsConsumer;
+import backtype.storm.task.IErrorReporter;
+import backtype.storm.task.TopologyContext;
+import org.apache.commons.lang.ClassUtils;
+import org.apache.commons.lang.math.NumberUtils;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.metrics2.sink.timeline.base.AbstractTimelineMetricsSink;
+import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
+import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
+import org.apache.hadoop.metrics2.util.Servers;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Storm metrics consumer that converts numeric data points into
+ * {@link TimelineMetric}s, buffers them in a {@link TimelineMetricsCache} and
+ * emits whatever the cache hands back through
+ * {@link AbstractTimelineMetricsSink#emitMetrics}.
+ *
+ * Settings are read from the classpath resource
+ * <code>/storm-metrics2.properties</code> in {@link #prepare}.
+ */
+public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implements IMetricsConsumer {
+  /** Address of the metrics collector; stays null if no address could be parsed. */
+  private SocketAddress socketAddress;
+  /** Full HTTP URI of the collector's timeline metrics endpoint. */
+  private String collectorUri;
+  /** Buffer for metric values between sends. */
+  private TimelineMetricsCache metricsCache;
+  /** Local host name, attached to every metric created by this sink. */
+  private String hostname;
+
+  @Override
+  protected SocketAddress getServerSocketAddress() {
+    return socketAddress;
+  }
+
+  @Override
+  protected String getCollectorUri() {
+    return collectorUri;
+  }
+
+  /**
+   * Resolves the local host name, loads the sink configuration and sets up the
+   * metrics cache, the collector URI and the collector socket address.
+   *
+   * None of the arguments supplied by Storm are used by this implementation.
+   *
+   * @throws RuntimeException if the local host name cannot be resolved
+   * @throws NumberFormatException if the configured cache size or send
+   *         interval is not an integer
+   */
+  @Override
+  public void prepare(Map map, Object o, TopologyContext topologyContext, IErrorReporter iErrorReporter) {
+    LOG.info("Preparing Storm Metrics Sink");
+    try {
+      hostname = InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException e) {
+      LOG.error("Could not identify hostname.");
+      throw new RuntimeException("Could not identify hostname.", e);
+    }
+    Configuration configuration = new Configuration("/storm-metrics2.properties");
+    int maxRowCacheSize = Integer.parseInt(configuration.getProperty(MAX_METRIC_ROW_CACHE_SIZE,
+        String.valueOf(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT)));
+    int metricsSendInterval = Integer.parseInt(configuration.getProperty(METRICS_SEND_INTERVAL,
+        String.valueOf(TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS)));
+    metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval);
+
+    // Read the collector "host:port" once and derive both the URI and the
+    // socket address from it. The value itself must be handed to
+    // Servers.parse; using it as a property key would look up a property
+    // that is not expected to exist.
+    String collectorHostPort = configuration.getProperty(COLLECTOR_HOST_PROPERTY);
+    collectorUri = "http://" + collectorHostPort + "/ws/v1/timeline/metrics";
+    List<InetSocketAddress> socketAddresses = Servers.parse(collectorHostPort, 8188);
+    if (socketAddresses != null && !socketAddresses.isEmpty()) {
+      socketAddress = socketAddresses.get(0);
+    }
+  }
+
+  /**
+   * Caches every numeric data point and emits the metrics the cache returns.
+   *
+   * Data points whose value is null or whose string form is not accepted by
+   * {@link NumberUtils#isNumber(String)} are ignored. An {@link IOException}
+   * from emitting is logged and not rethrown.
+   *
+   * @param taskInfo   source of the timestamp and the component id, which is
+   *                   used as the metric's app id
+   * @param dataPoints data points reported by Storm for that task
+   */
+  @Override
+  public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
+    List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
+    for (DataPoint dataPoint : dataPoints) {
+      if (dataPoint.value != null && NumberUtils.isNumber(dataPoint.value.toString())) {
+        // NOTE(review): this logs every data point at INFO level; consider
+        // lowering to DEBUG if the volume turns out to be a problem.
+        LOG.info(dataPoint.name + " = " + dataPoint.value);
+        TimelineMetric timelineMetric = createTimelineMetric(taskInfo.timestamp,
+            taskInfo.srcComponentId, dataPoint.name, dataPoint.value.toString());
+        // Put intermediate values into the cache until it is time to send
+        metricsCache.putTimelineMetric(timelineMetric);
+
+        TimelineMetric cachedMetric = metricsCache.getTimelineMetric(dataPoint.name);
+
+        // A null result means there is nothing to send for this name yet.
+        if (cachedMetric != null) {
+          metricList.add(cachedMetric);
+        }
+      }
+    }
+
+    if (!metricList.isEmpty()) {
+      TimelineMetrics timelineMetrics = new TimelineMetrics();
+      timelineMetrics.setMetrics(metricList);
+      try {
+        emitMetrics(timelineMetrics);
+      } catch (IOException e) {
+        LOG.error("Unexpected error", e);
+      }
+    }
+  }
+
+  @Override
+  public void cleanup() {
+    LOG.info("Stopping Storm Metrics Sink");
+  }
+
+  /**
+   * Builds a single-value metric for this host.
+   *
+   * @param currentTimeMillis timestamp used as both the start time and the key
+   *                          of the single metric value
+   * @param component         value stored as the metric's app id
+   * @param attributeName     metric name
+   * @param attributeValue    numeric value in string form; parsed with
+   *                          {@link Double#parseDouble(String)}
+   * @return the populated metric
+   */
+  private TimelineMetric createTimelineMetric(long currentTimeMillis, String component, String attributeName, String attributeValue) {
+    TimelineMetric timelineMetric = new TimelineMetric();
+    timelineMetric.setMetricName(attributeName);
+    timelineMetric.setHostName(hostname);
+    timelineMetric.setAppId(component);
+    timelineMetric.setStartTime(currentTimeMillis);
+    // NOTE(review): attributeValue is always a String here, so the type looks
+    // like it is always derived from String rather than from the original
+    // value's class - confirm whether that is intended.
+    timelineMetric.setType(ClassUtils.getShortCanonicalName(
+        attributeValue, "Number"));
+    timelineMetric.getMetricValues().put(currentTimeMillis, Double.parseDouble(attributeValue));
+    return timelineMetric;
+  }
+
+  /** Replaces the metrics cache; used by the unit test to inject a mock. */
+  public void setMetricsCache(TimelineMetricsCache metricsCache) {
+    this.metricsCache = metricsCache;
+  }
+
+  /** Replaces the collector socket address; used by the unit test to inject a mock. */
+  public void setServerSocketAddress(SocketAddress socketAddress) {
+    this.socketAddress = socketAddress;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8e4a0030/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
new file mode 100644
index 0000000..95a9329
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.storm;
+
+import backtype.storm.metric.api.IMetricsConsumer;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.Collections;
+
+import static org.easymock.EasyMock.*;
+
+/**
+ * Unit tests for {@link StormTimelineMetricsSink#handleDataPoints} using mocked
+ * cache and HTTP client collaborators.
+ */
+public class StormTimelineMetricsSinkTest {
+  /** Builds the task descriptor that both tests report data points for. */
+  private static IMetricsConsumer.TaskInfo newTaskInfo() {
+    return new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60);
+  }
+
+  /** A non-numeric value must touch neither the cache nor the HTTP client. */
+  @Test
+  public void testNonNumericMetricMetricExclusion() throws InterruptedException, IOException {
+    StormTimelineMetricsSink sink = new StormTimelineMetricsSink();
+    TimelineMetricsCache cache = createNiceMock(TimelineMetricsCache.class);
+    HttpClient client = createNiceMock(HttpClient.class);
+    sink.setMetricsCache(cache);
+    sink.setHttpClient(client);
+    replay(cache, client);
+
+    IMetricsConsumer.DataPoint textPoint = new IMetricsConsumer.DataPoint("key1", "value1");
+    sink.handleDataPoints(newTaskInfo(), Collections.singleton(textPoint));
+
+    verify(cache, client);
+  }
+
+  /** A numeric value must be cached once, read back once and posted once. */
+  @Test
+  public void testNumericMetricMetricSubmission() throws InterruptedException, IOException {
+    StormTimelineMetricsSink sink = new StormTimelineMetricsSink();
+
+    TimelineMetricsCache cache = createNiceMock(TimelineMetricsCache.class);
+    expect(cache.getTimelineMetric("key1"))
+        .andReturn(new TimelineMetric()).once();
+    cache.putTimelineMetric(anyObject(TimelineMetric.class));
+    expectLastCall().once();
+    sink.setMetricsCache(cache);
+
+    HttpClient client = createNiceMock(HttpClient.class);
+    sink.setHttpClient(client);
+    expect(client.executeMethod(anyObject(PostMethod.class))).andReturn(200).once();
+
+    SocketAddress collectorAddress = createNiceMock(SocketAddress.class);
+    sink.setServerSocketAddress(collectorAddress);
+    replay(cache, client);
+
+    IMetricsConsumer.DataPoint numericPoint = new IMetricsConsumer.DataPoint("key1", 42);
+    sink.handleDataPoints(newTaskInfo(), Collections.singleton(numericPoint));
+
+    verify(cache, client);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/8e4a0030/ambari-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/pom.xml b/ambari-metrics/pom.xml
index 12c571b..cdd2c62 100644
--- a/ambari-metrics/pom.xml
+++ b/ambari-metrics/pom.xml
@@ -29,6 +29,7 @@
<module>ambari-metrics-common</module>
<module>ambari-metrics-hadoop-sink</module>
<module>ambari-metrics-flume-sink</module>
+ <module>ambari-metrics-storm-sink</module>
<module>ambari-metrics-timelineservice</module>
<module>ambari-metrics-host-monitoring</module>
</modules>
http://git-wip-us.apache.org/repos/asf/ambari/blob/8e4a0030/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/params.py b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/params.py
index 7164ebd..ec74d77 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/params.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/params.py
@@ -83,3 +83,8 @@ if security_enabled:
nimbus_jaas_principal = _nimbus_principal_name.replace('_HOST',nimbus_host.lower())
nimbus_bare_jaas_principal = _nimbus_principal_name.replace('/_HOST','').replace('@'+kerberos_domain,'')
nimbus_keytab_path = config['configurations']['storm-env']['nimbus_keytab']
+
+ams_collector_hosts = default("/clusterHostInfo/metric_collector_hosts", [])
+has_metric_collector = not len(ams_collector_hosts) == 0
+if has_metric_collector:
+ metric_collector_host = ams_collector_hosts[0]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/8e4a0030/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/storm.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/storm.py b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/storm.py
index 0462f97..fbc29d2 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/storm.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/storm.py
@@ -55,6 +55,12 @@ def storm():
group=params.user_group
)
+ if params.has_metric_collector:
+ File(format("{conf_dir}/storm-metrics2.properties"),
+ owner=params.storm_user,
+ group=params.user_group,
+ content=Template("storm-metrics2.properties.j2")
+ )
File(format("{conf_dir}/storm-env.sh"),
owner=params.storm_user,
http://git-wip-us.apache.org/repos/asf/ambari/blob/8e4a0030/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/templates/storm-metrics2.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/templates/storm-metrics2.properties.j2 b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/templates/storm-metrics2.properties.j2
new file mode 100644
index 0000000..6d333d5
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/templates/storm-metrics2.properties.j2
@@ -0,0 +1,21 @@
+{#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#}
+
+collector={{metric_collector_host}}:8188
+maxRowCacheSize=10000
+sendInterval=59000
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/8e4a0030/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/templates/storm.yaml.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/templates/storm.yaml.j2 b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/templates/storm.yaml.j2
index ffb92e4..e6eed41 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/templates/storm.yaml.j2
+++ b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/templates/storm.yaml.j2
@@ -57,4 +57,10 @@ ui.filter.params:
"kerberos.keytab": "{{storm_ui_keytab_path}}"
"kerberos.name.rules": "DEFAULT"
supervisor.enable: true
+{% endif %}
+
+{% if has_metric_collector %}
+topology.metrics.consumer.register:
+ - class: "org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink"
+ parallelism.hint: 1
{% endif %}
\ No newline at end of file