You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2014/12/10 22:52:01 UTC

ambari git commit: AMBARI-8521. Add STORM metric sink implementation to enable sink to AMS. (Szilard Nemethy via swagle)

Repository: ambari
Updated Branches:
  refs/heads/trunk 391a53b40 -> 8e4a00308


AMBARI-8521. Add STORM metric sink implementation to enable sink to AMS. (Szilard Nemethy via swagle)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/8e4a0030
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/8e4a0030
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/8e4a0030

Branch: refs/heads/trunk
Commit: 8e4a00308147ba496325bd4e3766ca4f9fba9d54
Parents: 391a53b
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Wed Dec 10 13:51:44 2014 -0800
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Wed Dec 10 13:51:52 2014 -0800

----------------------------------------------------------------------
 .../ambari-metrics-storm-sink/pom.xml           | 181 +++++++++++++++++++
 .../src/main/assemblies/empty.xml               |  21 +++
 .../src/main/assemblies/sink.xml                |  34 ++++
 .../src/main/conf/storm-metrics2.properties.j2  |  21 +++
 .../sink/storm/StormTimelineMetricsSink.java    | 136 ++++++++++++++
 .../storm/StormTimelineMetricsSinkTest.java     |  68 +++++++
 ambari-metrics/pom.xml                          |   1 +
 .../services/STORM/package/scripts/params.py    |   5 +
 .../2.1/services/STORM/package/scripts/storm.py |   6 +
 .../templates/storm-metrics2.properties.j2      |  21 +++
 .../STORM/package/templates/storm.yaml.j2       |   6 +
 11 files changed, 500 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/8e4a0030/ambari-metrics/ambari-metrics-storm-sink/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/pom.xml b/ambari-metrics/ambari-metrics-storm-sink/pom.xml
new file mode 100644
index 0000000..9b8960c
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-storm-sink/pom.xml
@@ -0,0 +1,181 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>ambari-metrics</artifactId>
+    <groupId>org.apache.ambari</groupId>
+    <version>0.1.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>ambari-metrics-storm-sink</artifactId>
+  <version>0.1.0-SNAPSHOT</version>
+  <packaging>jar</packaging>
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.build.directory}/lib</outputDirectory>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <descriptors>
+            <descriptor>src/main/assemblies/sink.xml</descriptor>
+          </descriptors>
+          <tarLongFileMode>gnu</tarLongFileMode>
+        </configuration>
+        <executions>
+          <execution>
+            <id>build-tarball</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.0</version>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.8</version>
+        <executions>
+          <execution>
+            <id>parse-version</id>
+            <phase>validate</phase>
+            <goals>
+              <goal>parse-version</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>regex-property</id>
+            <goals>
+              <goal>regex-property</goal>
+            </goals>
+            <configuration>
+              <name>ambariVersion</name>
+              <value>${project.version}</value>
+              <regex>^([0-9]+)\.([0-9]+)\.([0-9]+)(\.|-).*</regex>
+              <replacement>$1.$2.$3</replacement>
+              <failIfNoMatch>false</failIfNoMatch>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>com.github.goldin</groupId>
+        <artifactId>copy-maven-plugin</artifactId>
+        <version>0.2.5</version>
+        <executions>
+          <execution>
+            <id>create-archive</id>
+            <phase>none</phase>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>rpm-maven-plugin</artifactId>
+        <version>2.0.1</version>
+        <executions>
+          <execution>
+            <phase>none</phase>
+            <goals>
+              <goal>rpm</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <name>ambari-metrics-storm-sink</name>
+          <copyright>2012, Apache Software Foundation</copyright>
+          <group>Development</group>
+          <description>Maven Recipe: RPM Package.</description>
+          <mappings>
+            <mapping>
+              <directory>/usr/lib/storm/lib</directory>
+              <filemode>644</filemode>
+              <username>root</username>
+              <groupname>root</groupname>
+              <sources>
+                <source>
+                  <location>target/${project.artifactId}-${project.version}.jar</location>
+                </source>
+                <source>
+                  <location>target/lib</location>
+                </source>
+              </sources>
+            </mapping>
+          </mappings>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>storm-core</artifactId>
+      <version>0.9.3</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.ambari</groupId>
+      <artifactId>ambari-metrics-common</artifactId>
+      <version>0.1.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+      <version>4.10</version>
+    </dependency>
+    <dependency>
+      <groupId>org.easymock</groupId>
+      <artifactId>easymock</artifactId>
+      <version>3.2</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-api-easymock</artifactId>
+      <version>1.4.9</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-module-junit4</artifactId>
+      <version>1.4.9</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/ambari/blob/8e4a0030/ambari-metrics/ambari-metrics-storm-sink/src/main/assemblies/empty.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/assemblies/empty.xml b/ambari-metrics/ambari-metrics-storm-sink/src/main/assemblies/empty.xml
new file mode 100644
index 0000000..35738b1
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/assemblies/empty.xml
@@ -0,0 +1,21 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+  
+       http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<assembly>
+    <id>empty</id>
+    <formats/>
+</assembly>

http://git-wip-us.apache.org/repos/asf/ambari/blob/8e4a0030/ambari-metrics/ambari-metrics-storm-sink/src/main/assemblies/sink.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/assemblies/sink.xml b/ambari-metrics/ambari-metrics-storm-sink/src/main/assemblies/sink.xml
new file mode 100644
index 0000000..21a6b36
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/assemblies/sink.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<assembly>
+  <!--This 'all' id is not appended to the produced bundle because we do this:
+    http://maven.apache.org/plugins/maven-assembly-plugin/faq.html#required-classifiers
+  -->
+  <id>dist</id>
+  <formats>
+    <format>dir</format>
+    <format>tar.gz</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <files>
+    <file>
+      <source>${project.build.directory}/${artifact.artifactId}-${artifact.version}.jar</source>
+      <outputDirectory>ambari-metrics-${project.version}/lib/ambari-metrics</outputDirectory>
+    </file>
+  </files>
+</assembly>

http://git-wip-us.apache.org/repos/asf/ambari/blob/8e4a0030/ambari-metrics/ambari-metrics-storm-sink/src/main/conf/storm-metrics2.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/conf/storm-metrics2.properties.j2 b/ambari-metrics/ambari-metrics-storm-sink/src/main/conf/storm-metrics2.properties.j2
new file mode 100644
index 0000000..6d333d5
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/conf/storm-metrics2.properties.j2
@@ -0,0 +1,21 @@
+{#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#}
+
+collector={{metric_collector_host}}:8188
+maxRowCacheSize=10000
+sendInterval=59000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/8e4a0030/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
new file mode 100644
index 0000000..a8f35e4
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.storm;
+
+import backtype.storm.metric.api.IMetricsConsumer;
+import backtype.storm.task.IErrorReporter;
+import backtype.storm.task.TopologyContext;
+import org.apache.commons.lang.ClassUtils;
+import org.apache.commons.lang.math.NumberUtils;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.metrics2.sink.timeline.base.AbstractTimelineMetricsSink;
+import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
+import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
+import org.apache.hadoop.metrics2.util.Servers;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implements IMetricsConsumer {
+  private SocketAddress socketAddress;
+  private String collectorUri;
+  private TimelineMetricsCache metricsCache;
+  private String hostname;
+
+  @Override
+  protected SocketAddress getServerSocketAddress() {
+    return socketAddress;
+  }
+
+  @Override
+  protected String getCollectorUri() {
+    return collectorUri;
+  }
+
+  @Override
+  public void prepare(Map map, Object o, TopologyContext topologyContext, IErrorReporter iErrorReporter) {
+    LOG.info("Preparing Storm Metrics Sink");
+    try {
+      hostname = InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException e) {
+      LOG.error("Could not identify hostname.");
+      throw new RuntimeException("Could not identify hostname.", e);
+    }
+    Configuration configuration = new Configuration("/storm-metrics2.properties");
+    int maxRowCacheSize = Integer.parseInt(configuration.getProperty(MAX_METRIC_ROW_CACHE_SIZE,
+        String.valueOf(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT)));
+    int metricsSendInterval = Integer.parseInt(configuration.getProperty(METRICS_SEND_INTERVAL,
+        String.valueOf(TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS)));
+    metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval);
+    collectorUri = "http://" + configuration.getProperty(COLLECTOR_HOST_PROPERTY) + "/ws/v1/timeline/metrics";
+    List<InetSocketAddress> socketAddresses =
+        Servers.parse(configuration.getProperty(configuration.getProperty(COLLECTOR_HOST_PROPERTY)), 8188);
+    if (socketAddresses != null && !socketAddresses.isEmpty()) {
+      socketAddress = socketAddresses.get(0);
+    }
+  }
+
+  @Override
+  public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
+    List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
+    for (DataPoint dataPoint : dataPoints) {
+      if (dataPoint.value != null && NumberUtils.isNumber(dataPoint.value.toString())) {
+        LOG.info(dataPoint.name + " = " + dataPoint.value);
+        TimelineMetric timelineMetric = createTimelineMetric(taskInfo.timestamp,
+            taskInfo.srcComponentId, dataPoint.name, dataPoint.value.toString());
+        // Put intermediate values into the cache until it is time to send
+        metricsCache.putTimelineMetric(timelineMetric);
+
+        TimelineMetric cachedMetric = metricsCache.getTimelineMetric(dataPoint.name);
+
+        if (cachedMetric != null) {
+          metricList.add(cachedMetric);
+        }
+      }
+    }
+
+    if (!metricList.isEmpty()) {
+      TimelineMetrics timelineMetrics = new TimelineMetrics();
+      timelineMetrics.setMetrics(metricList);
+      try {
+        emitMetrics(timelineMetrics);
+      } catch (IOException e) {
+        LOG.error("Unexpected error", e);
+      }
+    }
+  }
+
+  @Override
+  public void cleanup() {
+    LOG.info("Stopping Storm Metrics Sink");
+  }
+
+  private TimelineMetric createTimelineMetric(long currentTimeMillis, String component, String attributeName, String attributeValue) {
+    TimelineMetric timelineMetric = new TimelineMetric();
+    timelineMetric.setMetricName(attributeName);
+    timelineMetric.setHostName(hostname);
+    timelineMetric.setAppId(component);
+    timelineMetric.setStartTime(currentTimeMillis);
+    timelineMetric.setType(ClassUtils.getShortCanonicalName(
+        attributeValue, "Number"));
+    timelineMetric.getMetricValues().put(currentTimeMillis, Double.parseDouble(attributeValue));
+    return timelineMetric;
+  }
+
+  public void setMetricsCache(TimelineMetricsCache metricsCache) {
+    this.metricsCache = metricsCache;
+  }
+
+  public void setServerSocketAddress(SocketAddress socketAddress) {
+    this.socketAddress = socketAddress;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/8e4a0030/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
new file mode 100644
index 0000000..95a9329
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.storm;
+
+import backtype.storm.metric.api.IMetricsConsumer;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.Collections;
+
+import static org.easymock.EasyMock.*;
+
+public class StormTimelineMetricsSinkTest {
+  @Test
+  public void testNonNumericMetricMetricExclusion() throws InterruptedException, IOException {
+    StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
+    TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
+    stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
+    HttpClient httpClient = createNiceMock(HttpClient.class);
+    stormTimelineMetricsSink.setHttpClient(httpClient);
+    replay(timelineMetricsCache, httpClient);
+    stormTimelineMetricsSink.handleDataPoints(
+        new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
+        Collections.singleton(new IMetricsConsumer.DataPoint("key1", "value1")));
+    verify(timelineMetricsCache, httpClient);
+  }
+
+  @Test
+  public void testNumericMetricMetricSubmission() throws InterruptedException, IOException {
+    StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
+    TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
+    expect(timelineMetricsCache.getTimelineMetric("key1"))
+        .andReturn(new TimelineMetric()).once();
+    timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
+    expectLastCall().once();
+    stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
+    HttpClient httpClient = createNiceMock(HttpClient.class);
+    stormTimelineMetricsSink.setHttpClient(httpClient);
+    expect(httpClient.executeMethod(anyObject(PostMethod.class))).andReturn(200).once();
+    stormTimelineMetricsSink.setServerSocketAddress(createNiceMock(SocketAddress.class));
+    replay(timelineMetricsCache, httpClient);
+    stormTimelineMetricsSink.handleDataPoints(
+        new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
+        Collections.singleton(new IMetricsConsumer.DataPoint("key1", 42)));
+    verify(timelineMetricsCache, httpClient);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/8e4a0030/ambari-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/pom.xml b/ambari-metrics/pom.xml
index 12c571b..cdd2c62 100644
--- a/ambari-metrics/pom.xml
+++ b/ambari-metrics/pom.xml
@@ -29,6 +29,7 @@
     <module>ambari-metrics-common</module>
     <module>ambari-metrics-hadoop-sink</module>
     <module>ambari-metrics-flume-sink</module>
+    <module>ambari-metrics-storm-sink</module>
     <module>ambari-metrics-timelineservice</module>
     <module>ambari-metrics-host-monitoring</module>
   </modules>

http://git-wip-us.apache.org/repos/asf/ambari/blob/8e4a0030/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/params.py b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/params.py
index 7164ebd..ec74d77 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/params.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/params.py
@@ -83,3 +83,8 @@ if security_enabled:
     nimbus_jaas_principal = _nimbus_principal_name.replace('_HOST',nimbus_host.lower())
     nimbus_bare_jaas_principal = _nimbus_principal_name.replace('/_HOST','').replace('@'+kerberos_domain,'')
     nimbus_keytab_path = config['configurations']['storm-env']['nimbus_keytab']
+
+ams_collector_hosts = default("/clusterHostInfo/metric_collector_hosts", [])
+has_metric_collector = not len(ams_collector_hosts) == 0
+if has_metric_collector:
+  metric_collector_host = ams_collector_hosts[0]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/8e4a0030/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/storm.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/storm.py b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/storm.py
index 0462f97..fbc29d2 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/storm.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/scripts/storm.py
@@ -55,6 +55,12 @@ def storm():
        group=params.user_group
   )
 
+  if params.has_metric_collector:
+    File(format("{conf_dir}/storm-metrics2.properties"),
+        owner=params.storm_user,
+        group=params.user_group,
+        content=Template("storm-metrics2.properties.j2")
+    )
 
   File(format("{conf_dir}/storm-env.sh"),
     owner=params.storm_user,

http://git-wip-us.apache.org/repos/asf/ambari/blob/8e4a0030/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/templates/storm-metrics2.properties.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/templates/storm-metrics2.properties.j2 b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/templates/storm-metrics2.properties.j2
new file mode 100644
index 0000000..6d333d5
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/templates/storm-metrics2.properties.j2
@@ -0,0 +1,21 @@
+{#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#}
+
+collector={{metric_collector_host}}:8188
+maxRowCacheSize=10000
+sendInterval=59000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/8e4a0030/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/templates/storm.yaml.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/templates/storm.yaml.j2 b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/templates/storm.yaml.j2
index ffb92e4..e6eed41 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/templates/storm.yaml.j2
+++ b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/package/templates/storm.yaml.j2
@@ -57,4 +57,10 @@ ui.filter.params:
   "kerberos.keytab": "{{storm_ui_keytab_path}}"
   "kerberos.name.rules": "DEFAULT"
 supervisor.enable: true
+{% endif %}
+
+{% if has_metric_collector %}
+topology.metrics.consumer.register:
+  - class: "org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink"
+    parallelism.hint: 1
 {% endif %}
\ No newline at end of file