You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2016/06/30 18:14:26 UTC

ambari git commit: AMBARI-17249 : Storm metrics sink should include worker host and port to metric name when metrics are coming from SystemBolt (Jungtaek Lim via avijayan)

Repository: ambari
Updated Branches:
  refs/heads/branch-2.4 a08c207b5 -> 0b3fc3f1c


 AMBARI-17249 : Storm metrics sink should include worker host and port to metric name when metrics are coming from SystemBolt (Jungtaek Lim via avijayan)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/0b3fc3f1
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/0b3fc3f1
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/0b3fc3f1

Branch: refs/heads/branch-2.4
Commit: 0b3fc3f1cb079d4d99590364eafa9cfd2d0b91ff
Parents: a08c207
Author: Aravindan Vijayan <av...@hortonworks.com>
Authored: Thu Jun 30 11:14:14 2016 -0700
Committer: Aravindan Vijayan <av...@hortonworks.com>
Committed: Thu Jun 30 11:14:14 2016 -0700

----------------------------------------------------------------------
 .../sink/storm/StormTimelineMetricsSink.java    | 27 ++++++++++++++++----
 .../storm/StormTimelineMetricsSinkTest.java     | 18 +++++++++++++
 2 files changed, 40 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/0b3fc3f1/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index 89906d8..6ab12e1 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -40,6 +40,8 @@ import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCach
 import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT;
 
 public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implements IMetricsConsumer {
+  public static final int SYSTEM_BOLT_TASK_ID = -1;
+
   private String collectorUri;
   private TimelineMetricsCache metricsCache;
   private String hostname;
@@ -96,9 +98,17 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
       List<DataPoint> populatedDataPoints = populateDataPoints(dataPoint);
 
       for (DataPoint populatedDataPoint : populatedDataPoints) {
-        TimelineMetric timelineMetric = createTimelineMetric(taskInfo.timestamp * 1000,
-            taskInfo.srcComponentId, taskInfo.srcTaskId, taskInfo.srcWorkerHost, populatedDataPoint.name,
-            Double.valueOf(populatedDataPoint.value.toString()));
+        String metricName;
+        if (taskInfo.srcTaskId == SYSTEM_BOLT_TASK_ID) {
+          metricName = createMetricNameForSystemBolt(taskInfo, populatedDataPoint.name);
+        } else {
+          metricName = createMetricName(taskInfo.srcComponentId, taskInfo.srcTaskId, populatedDataPoint.name);
+        }
+
+        LOG.debug("populated datapoint: " + metricName + " = " + populatedDataPoint.value);
+
+        TimelineMetric timelineMetric = createTimelineMetric(taskInfo.timestamp * 1000, taskInfo.srcWorkerHost,
+            metricName, Double.valueOf(populatedDataPoint.value.toString()));
 
         // Put intermediate values into the cache until it is time to send
         metricsCache.putTimelineMetric(timelineMetric);
@@ -176,10 +186,10 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     }
   }
 
-  private TimelineMetric createTimelineMetric(long currentTimeMillis, String componentId, int taskId, String hostName,
+  private TimelineMetric createTimelineMetric(long currentTimeMillis, String hostName,
       String attributeName, Double attributeValue) {
     TimelineMetric timelineMetric = new TimelineMetric();
-    timelineMetric.setMetricName(createMetricName(componentId, taskId, attributeName));
+    timelineMetric.setMetricName(attributeName);
     timelineMetric.setHostName(hostName);
     timelineMetric.setAppId(topologyName);
     timelineMetric.setStartTime(currentTimeMillis);
@@ -195,6 +205,13 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     return metricName.replace('_', '-');
   }
 
+  private String createMetricNameForSystemBolt(TaskInfo taskInfo, String attributeName) {
+    String metricName = taskInfo.srcComponentId + "." + taskInfo.srcWorkerHost + "." +
+        taskInfo.srcWorkerPort + "." + attributeName;
+    // since '._' is treated as a special character (separator), it should be replaced
+    return metricName.replace('_', '-');
+  }
+
   public void setMetricsCache(TimelineMetricsCache metricsCache) {
     this.metricsCache = metricsCache;
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/0b3fc3f1/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
index 557d088..e582a95 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.metrics2.sink.storm;
 
+import static org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink.SYSTEM_BOLT_TASK_ID;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.createNiceMock;
 import static org.easymock.EasyMock.expect;
@@ -68,6 +69,23 @@ public class StormTimelineMetricsSinkTest {
 
   @Test
   @Ignore // TODO: Fix for failover
+  public void testNumericMetricFromSystemBoltMetricSubmission() throws InterruptedException, IOException {
+    StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
+    TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
+    expect(timelineMetricsCache.getTimelineMetric("testComponent.localhost.1234.key1"))
+        .andReturn(new TimelineMetric()).once();
+    timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
+    expectLastCall().once();
+    stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
+    replay(timelineMetricsCache);
+    stormTimelineMetricsSink.handleDataPoints(
+        new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", SYSTEM_BOLT_TASK_ID, 20000L, 60),
+        Collections.singleton(new IMetricsConsumer.DataPoint("key1", 42)));
+    verify(timelineMetricsCache);
+  }
+
+  @Test
+  @Ignore // TODO: Fix for failover
   public void testMapMetricMetricSubmission() throws InterruptedException, IOException {
     StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
     TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);