You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2016/06/30 18:14:26 UTC
ambari git commit: AMBARI-17249 : Storm metrics sink should include
worker host and port to metric name when metrics are coming from SystemBolt
(Jungtaek Lim via avijayan)
Repository: ambari
Updated Branches:
refs/heads/branch-2.4 a08c207b5 -> 0b3fc3f1c
AMBARI-17249 : Storm metrics sink should include worker host and port to metric name when metrics are coming from SystemBolt (Jungtaek Lim via avijayan)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/0b3fc3f1
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/0b3fc3f1
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/0b3fc3f1
Branch: refs/heads/branch-2.4
Commit: 0b3fc3f1cb079d4d99590364eafa9cfd2d0b91ff
Parents: a08c207
Author: Aravindan Vijayan <av...@hortonworks.com>
Authored: Thu Jun 30 11:14:14 2016 -0700
Committer: Aravindan Vijayan <av...@hortonworks.com>
Committed: Thu Jun 30 11:14:14 2016 -0700
----------------------------------------------------------------------
.../sink/storm/StormTimelineMetricsSink.java | 27 ++++++++++++++++----
.../storm/StormTimelineMetricsSinkTest.java | 18 +++++++++++++
2 files changed, 40 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/0b3fc3f1/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index 89906d8..6ab12e1 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -40,6 +40,8 @@ import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCach
import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT;
public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implements IMetricsConsumer {
+ public static final int SYSTEM_BOLT_TASK_ID = -1;
+
private String collectorUri;
private TimelineMetricsCache metricsCache;
private String hostname;
@@ -96,9 +98,17 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
List<DataPoint> populatedDataPoints = populateDataPoints(dataPoint);
for (DataPoint populatedDataPoint : populatedDataPoints) {
- TimelineMetric timelineMetric = createTimelineMetric(taskInfo.timestamp * 1000,
- taskInfo.srcComponentId, taskInfo.srcTaskId, taskInfo.srcWorkerHost, populatedDataPoint.name,
- Double.valueOf(populatedDataPoint.value.toString()));
+ String metricName;
+ if (taskInfo.srcTaskId == SYSTEM_BOLT_TASK_ID) {
+ metricName = createMetricNameForSystemBolt(taskInfo, populatedDataPoint.name);
+ } else {
+ metricName = createMetricName(taskInfo.srcComponentId, taskInfo.srcTaskId, populatedDataPoint.name);
+ }
+
+ LOG.debug("populated datapoint: " + metricName + " = " + populatedDataPoint.value);
+
+ TimelineMetric timelineMetric = createTimelineMetric(taskInfo.timestamp * 1000, taskInfo.srcWorkerHost,
+ metricName, Double.valueOf(populatedDataPoint.value.toString()));
// Put intermediate values into the cache until it is time to send
metricsCache.putTimelineMetric(timelineMetric);
@@ -176,10 +186,10 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
}
}
- private TimelineMetric createTimelineMetric(long currentTimeMillis, String componentId, int taskId, String hostName,
+ private TimelineMetric createTimelineMetric(long currentTimeMillis, String hostName,
String attributeName, Double attributeValue) {
TimelineMetric timelineMetric = new TimelineMetric();
- timelineMetric.setMetricName(createMetricName(componentId, taskId, attributeName));
+ timelineMetric.setMetricName(attributeName);
timelineMetric.setHostName(hostName);
timelineMetric.setAppId(topologyName);
timelineMetric.setStartTime(currentTimeMillis);
@@ -195,6 +205,13 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
return metricName.replace('_', '-');
}
+ private String createMetricNameForSystemBolt(TaskInfo taskInfo, String attributeName) {
+ String metricName = taskInfo.srcComponentId + "." + taskInfo.srcWorkerHost + "." +
+ taskInfo.srcWorkerPort + "." + attributeName;
+ // since '._' is treat as special character (separator) so it should be replaced
+ return metricName.replace('_', '-');
+ }
+
public void setMetricsCache(TimelineMetricsCache metricsCache) {
this.metricsCache = metricsCache;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0b3fc3f1/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
index 557d088..e582a95 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.metrics2.sink.storm;
+import static org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink.SYSTEM_BOLT_TASK_ID;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.createNiceMock;
import static org.easymock.EasyMock.expect;
@@ -68,6 +69,23 @@ public class StormTimelineMetricsSinkTest {
@Test
@Ignore // TODO: Fix for failover
+ public void testNumericMetricFromSystemBoltMetricSubmission() throws InterruptedException, IOException {
+ StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
+ TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
+ expect(timelineMetricsCache.getTimelineMetric("testComponent.localhost.1234.key1"))
+ .andReturn(new TimelineMetric()).once();
+ timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
+ expectLastCall().once();
+ stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
+ replay(timelineMetricsCache);
+ stormTimelineMetricsSink.handleDataPoints(
+ new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", SYSTEM_BOLT_TASK_ID, 20000L, 60),
+ Collections.singleton(new IMetricsConsumer.DataPoint("key1", 42)));
+ verify(timelineMetricsCache);
+ }
+
+ @Test
+ @Ignore // TODO: Fix for failover
public void testMapMetricMetricSubmission() throws InterruptedException, IOException {
StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);