Posted to commits@ambari.apache.org by av...@apache.org on 2016/06/09 17:09:43 UTC

ambari git commit: AMBARI-16946 Storm Metrics Sink has a high chance of discarding some datapoints (Jungtaek Lim via avijayan)

Repository: ambari
Updated Branches:
  refs/heads/branch-2.4 3ffb0e848 -> 0f9ba099a


AMBARI-16946 Storm Metrics Sink has a high chance of discarding some datapoints (Jungtaek Lim via avijayan)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/0f9ba099
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/0f9ba099
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/0f9ba099

Branch: refs/heads/branch-2.4
Commit: 0f9ba099a1a3c18a99c06ec71f8b631d59c623ad
Parents: 3ffb0e8
Author: Aravindan Vijayan <av...@hortonworks.com>
Authored: Thu Jun 9 10:08:24 2016 -0700
Committer: Aravindan Vijayan <av...@hortonworks.com>
Committed: Thu Jun 9 10:09:35 2016 -0700

----------------------------------------------------------------------
 .../sink/storm/StormTimelineMetricsSink.java    | 92 ++++++++++++++++----
 .../storm/StormTimelineMetricsSinkTest.java     | 29 +++++-
 2 files changed, 104 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
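
For context on the fix: the previous sink kept only datapoints whose raw value parsed as a number (via NumberUtils.isNumber), so Map-valued metrics, such as Storm's built-in per-stream counters, were silently dropped. The new populateDataPoints flattens a Map value into one numeric datapoint per entry. Below is a minimal standalone sketch of that flattening behavior; it is a simplified re-implementation for illustration only (the class name and sample values are made up), not the sink class itself.

    import java.util.HashMap;
    import java.util.Map;

    // Simplified re-implementation of the new flattening logic, for
    // illustration only (the real code lives in StormTimelineMetricsSink).
    public class FlattenSketch {

      // A Map-valued datapoint becomes one "name.key" point per entry whose
      // value converts to a double; non-numeric entries are skipped, which
      // the real sink logs and discards.
      static Map<String, Double> flatten(String name, Object value) {
        Map<String, Double> points = new HashMap<>();
        if (value instanceof Map) {
          for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
            Double d = toDouble(entry.getValue());
            if (d != null) {
              points.put(name + "." + entry.getKey(), d);
            }
          }
        } else {
          Double d = toDouble(value);
          if (d != null) {
            points.put(name, d);
          }
        }
        return points;
      }

      // Mirrors convertValueToDouble: Numbers and numeric Strings pass,
      // everything else yields null.
      static Double toDouble(Object value) {
        if (value instanceof Number) {
          return ((Number) value).doubleValue();
        }
        if (value instanceof String) {
          try {
            return Double.parseDouble((String) value);
          } catch (NumberFormatException ignored) {
            return null;
          }
        }
        return null;
      }

      public static void main(String[] args) {
        Map<String, Object> perStreamCounts = new HashMap<>();
        perStreamCounts.put("default", 10L);
        perStreamCounts.put("errors", 2);
        // Before this commit the whole datapoint was dropped; now it prints
        // {__emit-count.default=10.0, __emit-count.errors=2.0}
        // (entry order may vary with HashMap).
        System.out.println(flatten("__emit-count", perStreamCounts));
      }
    }
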


http://git-wip-us.apache.org/repos/asf/ambari/blob/0f9ba099/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index 91c63b9..89906d8 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -21,12 +21,10 @@ package org.apache.hadoop.metrics2.sink.storm;
 import backtype.storm.metric.api.IMetricsConsumer;
 import backtype.storm.task.IErrorReporter;
 import backtype.storm.task.TopologyContext;
-
 import org.apache.commons.lang3.ClassUtils;
-import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;
 import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
 import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
@@ -38,13 +36,15 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.*;
+import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS;
+import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT;
 
 public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implements IMetricsConsumer {
   private String collectorUri;
   private TimelineMetricsCache metricsCache;
   private String hostname;
   private int timeoutSeconds;
+  private String topologyName;
 
   @Override
   protected String getCollectorUri() {
@@ -84,20 +84,26 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
       String trustStorePwd = configuration.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
       loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
     }
+    this.topologyName = removeNonce(topologyContext.getStormId());
   }
 
   @Override
   public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
     List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
+
     for (DataPoint dataPoint : dataPoints) {
-      if (dataPoint.value != null && NumberUtils.isNumber(dataPoint.value.toString())) {
-        LOG.debug(dataPoint.name + " = " + dataPoint.value);
-        TimelineMetric timelineMetric = createTimelineMetric(taskInfo.timestamp,
-            taskInfo.srcComponentId, dataPoint.name, dataPoint.value.toString());
+      LOG.debug(dataPoint.name + " = " + dataPoint.value);
+      List<DataPoint> populatedDataPoints = populateDataPoints(dataPoint);
+
+      for (DataPoint populatedDataPoint : populatedDataPoints) {
+        TimelineMetric timelineMetric = createTimelineMetric(taskInfo.timestamp * 1000,
+            taskInfo.srcComponentId, taskInfo.srcTaskId, taskInfo.srcWorkerHost, populatedDataPoint.name,
+            Double.valueOf(populatedDataPoint.value.toString()));
+
         // Put intermediate values into the cache until it is time to send
         metricsCache.putTimelineMetric(timelineMetric);
 
-        TimelineMetric cachedMetric = metricsCache.getTimelineMetric(dataPoint.name);
+        TimelineMetric cachedMetric = metricsCache.getTimelineMetric(timelineMetric.getMetricName());
 
         if (cachedMetric != null) {
           metricList.add(cachedMetric);
@@ -108,6 +114,7 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     if (!metricList.isEmpty()) {
       TimelineMetrics timelineMetrics = new TimelineMetrics();
       timelineMetrics.setMetrics(metricList);
+
       try {
         emitMetrics(timelineMetrics);
       } catch (UnableToConnectException uce) {
@@ -121,20 +128,75 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     LOG.info("Stopping Storm Metrics Sink");
   }
 
-  private TimelineMetric createTimelineMetric(long currentTimeMillis, String component, String attributeName, String attributeValue) {
+  private String removeNonce(String topologyId) {
+    return topologyId.substring(0, topologyId.substring(0, topologyId.lastIndexOf("-")).lastIndexOf("-"));
+  }
+
+  private List<DataPoint> populateDataPoints(DataPoint dataPoint) {
+    List<DataPoint> dataPoints = new ArrayList<>();
+
+    if (dataPoint.value == null) {
+      LOG.warn("Data point with name " + dataPoint.name + " is null. Discarding." + dataPoint.name);
+    } else if (dataPoint.value instanceof Map) {
+      Map<String, Object> dataMap = (Map<String, Object>) dataPoint.value;
+
+      for (Map.Entry<String, Object> entry : dataMap.entrySet()) {
+        Double value = convertValueToDouble(entry.getKey(), entry.getValue());
+        if (value != null) {
+          dataPoints.add(new DataPoint(dataPoint.name + "." + entry.getKey(), value));
+        }
+      }
+    } else {
+      Double value = convertValueToDouble(dataPoint.name, dataPoint.value);
+      if (value != null) {
+        dataPoints.add(new DataPoint(dataPoint.name, value));
+      }
+    }
+
+    return dataPoints;
+  }
+
+  private Double convertValueToDouble(String metricName, Object value) {
+    if (value instanceof Number) {
+      return ((Number) value).doubleValue();
+    } else if (value instanceof String) {
+      try {
+        return Double.parseDouble((String) value);
+      } catch (NumberFormatException e) {
+        LOG.warn("Data point with name " + metricName + " doesn't have number format value " +
+            value + ". Discarding.");
+      }
+
+      return null;
+    } else {
+      LOG.warn("Data point with name " + metricName + " has value " + value +
+          " which is not supported. Discarding.");
+
+      return null;
+    }
+  }
+
+  private TimelineMetric createTimelineMetric(long currentTimeMillis, String componentId, int taskId, String hostName,
+      String attributeName, Double attributeValue) {
     TimelineMetric timelineMetric = new TimelineMetric();
-    timelineMetric.setMetricName(attributeName);
-    timelineMetric.setHostName(hostname);
-    timelineMetric.setAppId(component);
+    timelineMetric.setMetricName(createMetricName(componentId, taskId, attributeName));
+    timelineMetric.setHostName(hostName);
+    timelineMetric.setAppId(topologyName);
     timelineMetric.setStartTime(currentTimeMillis);
     timelineMetric.setType(ClassUtils.getShortCanonicalName(
         attributeValue, "Number"));
-    timelineMetric.getMetricValues().put(currentTimeMillis, Double.parseDouble(attributeValue));
+    timelineMetric.getMetricValues().put(currentTimeMillis, attributeValue);
     return timelineMetric;
   }
 
+  private String createMetricName(String componentId, int taskId, String attributeName) {
+    String metricName = componentId + "." + taskId + "." + attributeName;
+    // since '._' is treated as a special character (separator), '_' should be replaced
+    return metricName.replace('_', '-');
+  }
+
   public void setMetricsCache(TimelineMetricsCache metricsCache) {
     this.metricsCache = metricsCache;
   }
 
-}
+}
\ No newline at end of file
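
Two helper behaviors above are easy to miss. removeNonce strips the trailing "-<counter>-<timestamp>" nonce from the Storm topology id to recover the plain topology name, which becomes the metric appId; createMetricName scopes each metric by component and task id and swaps '_' for '-' since '._' is treated as a separator. A quick standalone sketch, using a made-up topology id:

    // Standalone sketch of removeNonce and createMetricName (re-typed for
    // illustration; "wordcount-1-1465487489" is a hypothetical Storm id).
    public class NamingSketch {
      public static void main(String[] args) {
        String topologyId = "wordcount-1-1465487489";
        // Drop the last two '-'-separated segments (counter and timestamp).
        String topologyName = topologyId.substring(0,
            topologyId.substring(0, topologyId.lastIndexOf("-")).lastIndexOf("-"));
        System.out.println(topologyName);  // wordcount

        // Scope by component and task, then replace '_' with '-'.
        String metricName = ("testComponent" + "." + 42 + "." + "__emit-count")
            .replace('_', '-');
        System.out.println(metricName);    // testComponent.42.--emit-count
      }
    }
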

http://git-wip-us.apache.org/repos/asf/ambari/blob/0f9ba099/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
index 3f139da..557d088 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
@@ -28,9 +28,12 @@ import static org.easymock.EasyMock.verify;
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import backtype.storm.metric.api.IMetricsConsumer;
@@ -52,8 +55,7 @@ public class StormTimelineMetricsSinkTest {
   public void testNumericMetricMetricSubmission() throws InterruptedException, IOException {
     StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
     TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
-    expect(timelineMetricsCache.getTimelineMetric("key1"))
-        .andReturn(new TimelineMetric()).once();
+    expect(timelineMetricsCache.getTimelineMetric("testComponent.42.key1")).andReturn(new TimelineMetric()).once();
     timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
     expectLastCall().once();
     stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
@@ -63,4 +65,27 @@ public class StormTimelineMetricsSinkTest {
         Collections.singleton(new IMetricsConsumer.DataPoint("key1", 42)));
     verify(timelineMetricsCache);
   }
+
+  @Test
+  @Ignore // TODO: Fix for failover
+  public void testMapMetricMetricSubmission() throws InterruptedException, IOException {
+    StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
+    TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
+    expect(timelineMetricsCache.getTimelineMetric("testComponent.42.key1.field1"))
+        .andReturn(new TimelineMetric()).once();
+    expect(timelineMetricsCache.getTimelineMetric("testComponent.42.key1.field2"))
+        .andReturn(new TimelineMetric()).once();
+    timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
+    expectLastCall().once();
+    stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
+    replay(timelineMetricsCache);
+
+    Map<String, Object> valueMap = new HashMap<>();
+    valueMap.put("field1", 53);
+    valueMap.put("field2", 64.12);
+    stormTimelineMetricsSink.handleDataPoints(
+        new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
+        Collections.singleton(new IMetricsConsumer.DataPoint("key1", valueMap)));
+    verify(timelineMetricsCache);
+  }
 }
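
Finally, to make the new test's expectations concrete: the two valueMap entries flatten into two scoped cache keys, and the 20000L TaskInfo timestamp is multiplied by 1000 in the sink, which suggests Storm reports second-resolution timestamps. A short walkthrough sketch (key format taken from the test; the millisecond reading is inferred from the taskInfo.timestamp * 1000 in the diff):

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Walkthrough of the cache keys and start time implied by
    // testMapMetricMetricSubmission above.
    public class MapTestWalkthrough {
      public static void main(String[] args) {
        Map<String, Object> valueMap = new LinkedHashMap<>();
        valueMap.put("field1", 53);
        valueMap.put("field2", 64.12);

        long startTimeMillis = 20000L * 1000;  // taskInfo.timestamp * 1000

        for (Map.Entry<String, Object> e : valueMap.entrySet()) {
          // componentId "." taskId "." dataPointName "." mapKey, '_' -> '-'
          String key = ("testComponent.42.key1." + e.getKey()).replace('_', '-');
          System.out.println(key + " = " + e.getValue() + " @ " + startTimeMillis);
        }
        // Prints:
        //   testComponent.42.key1.field1 = 53 @ 20000000
        //   testComponent.42.key1.field2 = 64.12 @ 20000000
      }
    }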