You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2016/06/09 17:09:43 UTC
ambari git commit: AMBARI-16946 Storm Metrics Sink has high chance to
discard some datapoints (Jungtaek Lim via avijayan)
Repository: ambari
Updated Branches:
refs/heads/branch-2.4 3ffb0e848 -> 0f9ba099a
AMBARI-16946 Storm Metrics Sink has high chance to discard some datapoints (Jungtaek Lim via avijayan)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/0f9ba099
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/0f9ba099
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/0f9ba099
Branch: refs/heads/branch-2.4
Commit: 0f9ba099a1a3c18a99c06ec71f8b631d59c623ad
Parents: 3ffb0e8
Author: Aravindan Vijayan <av...@hortonworks.com>
Authored: Thu Jun 9 10:08:24 2016 -0700
Committer: Aravindan Vijayan <av...@hortonworks.com>
Committed: Thu Jun 9 10:09:35 2016 -0700
----------------------------------------------------------------------
.../sink/storm/StormTimelineMetricsSink.java | 92 ++++++++++++++++----
.../storm/StormTimelineMetricsSinkTest.java | 29 +++++-
2 files changed, 104 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/0f9ba099/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index 91c63b9..89906d8 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -21,12 +21,10 @@ package org.apache.hadoop.metrics2.sink.storm;
import backtype.storm.metric.api.IMetricsConsumer;
import backtype.storm.task.IErrorReporter;
import backtype.storm.task.TopologyContext;
-
import org.apache.commons.lang3.ClassUtils;
-import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;
import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
@@ -38,13 +36,15 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
-import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.*;
+import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS;
+import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT;
public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implements IMetricsConsumer {
private String collectorUri;
private TimelineMetricsCache metricsCache;
private String hostname;
private int timeoutSeconds;
+ private String topologyName;
@Override
protected String getCollectorUri() {
@@ -84,20 +84,26 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
String trustStorePwd = configuration.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
}
+ this.topologyName = removeNonce(topologyContext.getStormId());
}
@Override
public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
+
for (DataPoint dataPoint : dataPoints) {
- if (dataPoint.value != null && NumberUtils.isNumber(dataPoint.value.toString())) {
- LOG.debug(dataPoint.name + " = " + dataPoint.value);
- TimelineMetric timelineMetric = createTimelineMetric(taskInfo.timestamp,
- taskInfo.srcComponentId, dataPoint.name, dataPoint.value.toString());
+ LOG.debug(dataPoint.name + " = " + dataPoint.value);
+ List<DataPoint> populatedDataPoints = populateDataPoints(dataPoint);
+
+ for (DataPoint populatedDataPoint : populatedDataPoints) {
+ TimelineMetric timelineMetric = createTimelineMetric(taskInfo.timestamp * 1000,
+ taskInfo.srcComponentId, taskInfo.srcTaskId, taskInfo.srcWorkerHost, populatedDataPoint.name,
+ Double.valueOf(populatedDataPoint.value.toString()));
+
// Put intermediate values into the cache until it is time to send
metricsCache.putTimelineMetric(timelineMetric);
- TimelineMetric cachedMetric = metricsCache.getTimelineMetric(dataPoint.name);
+ TimelineMetric cachedMetric = metricsCache.getTimelineMetric(timelineMetric.getMetricName());
if (cachedMetric != null) {
metricList.add(cachedMetric);
@@ -108,6 +114,7 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
if (!metricList.isEmpty()) {
TimelineMetrics timelineMetrics = new TimelineMetrics();
timelineMetrics.setMetrics(metricList);
+
try {
emitMetrics(timelineMetrics);
} catch (UnableToConnectException uce) {
@@ -121,20 +128,75 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
LOG.info("Stopping Storm Metrics Sink");
}
- private TimelineMetric createTimelineMetric(long currentTimeMillis, String component, String attributeName, String attributeValue) {
+ private String removeNonce(String topologyId) {
+ return topologyId.substring(0, topologyId.substring(0, topologyId.lastIndexOf("-")).lastIndexOf("-"));
+ }
+
+ private List<DataPoint> populateDataPoints(DataPoint dataPoint) {
+ List<DataPoint> dataPoints = new ArrayList<>();
+
+ if (dataPoint.value == null) {
+ LOG.warn("Data point with name " + dataPoint.name + " is null. Discarding." + dataPoint.name);
+ } else if (dataPoint.value instanceof Map) {
+ Map<String, Object> dataMap = (Map<String, Object>) dataPoint.value;
+
+ for (Map.Entry<String, Object> entry : dataMap.entrySet()) {
+ Double value = convertValueToDouble(entry.getKey(), entry.getValue());
+ if (value != null) {
+ dataPoints.add(new DataPoint(dataPoint.name + "." + entry.getKey(), value));
+ }
+ }
+ } else {
+ Double value = convertValueToDouble(dataPoint.name, dataPoint.value);
+ if (value != null) {
+ dataPoints.add(new DataPoint(dataPoint.name, value));
+ }
+ }
+
+ return dataPoints;
+ }
+
+ private Double convertValueToDouble(String metricName, Object value) {
+ if (value instanceof Number) {
+ return ((Number) value).doubleValue();
+ } else if (value instanceof String) {
+ try {
+ return Double.parseDouble((String) value);
+ } catch (NumberFormatException e) {
+ LOG.warn("Data point with name " + metricName + " doesn't have number format value " +
+ value + ". Discarding.");
+ }
+
+ return null;
+ } else {
+ LOG.warn("Data point with name " + metricName + " has value " + value +
+ " which is not supported. Discarding.");
+
+ return null;
+ }
+ }
+
+ private TimelineMetric createTimelineMetric(long currentTimeMillis, String componentId, int taskId, String hostName,
+ String attributeName, Double attributeValue) {
TimelineMetric timelineMetric = new TimelineMetric();
- timelineMetric.setMetricName(attributeName);
- timelineMetric.setHostName(hostname);
- timelineMetric.setAppId(component);
+ timelineMetric.setMetricName(createMetricName(componentId, taskId, attributeName));
+ timelineMetric.setHostName(hostName);
+ timelineMetric.setAppId(topologyName);
timelineMetric.setStartTime(currentTimeMillis);
timelineMetric.setType(ClassUtils.getShortCanonicalName(
attributeValue, "Number"));
- timelineMetric.getMetricValues().put(currentTimeMillis, Double.parseDouble(attributeValue));
+ timelineMetric.getMetricValues().put(currentTimeMillis, attributeValue);
return timelineMetric;
}
+ private String createMetricName(String componentId, int taskId, String attributeName) {
+ String metricName = componentId + "." + taskId + "." + attributeName;
+ // since '._' is treat as special character (separator) so it should be replaced
+ return metricName.replace('_', '-');
+ }
+
public void setMetricsCache(TimelineMetricsCache metricsCache) {
this.metricsCache = metricsCache;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/0f9ba099/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
index 3f139da..557d088 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
@@ -28,9 +28,12 @@ import static org.easymock.EasyMock.verify;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
+import org.junit.Ignore;
import org.junit.Test;
import backtype.storm.metric.api.IMetricsConsumer;
@@ -52,8 +55,7 @@ public class StormTimelineMetricsSinkTest {
public void testNumericMetricMetricSubmission() throws InterruptedException, IOException {
StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
- expect(timelineMetricsCache.getTimelineMetric("key1"))
- .andReturn(new TimelineMetric()).once();
+ expect(timelineMetricsCache.getTimelineMetric("testComponent.42.key1")).andReturn(new TimelineMetric()).once();
timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
expectLastCall().once();
stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
@@ -63,4 +65,27 @@ public class StormTimelineMetricsSinkTest {
Collections.singleton(new IMetricsConsumer.DataPoint("key1", 42)));
verify(timelineMetricsCache);
}
+
+ @Test
+ @Ignore // TODO: Fix for failover
+ public void testMapMetricMetricSubmission() throws InterruptedException, IOException {
+ StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
+ TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
+ expect(timelineMetricsCache.getTimelineMetric("testComponent.42.key1.field1"))
+ .andReturn(new TimelineMetric()).once();
+ expect(timelineMetricsCache.getTimelineMetric("testComponent.42.key1.field2"))
+ .andReturn(new TimelineMetric()).once();
+ timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
+ expectLastCall().once();
+ stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
+ replay(timelineMetricsCache);
+
+ Map<String, Object> valueMap = new HashMap<>();
+ valueMap.put("field1", 53);
+ valueMap.put("field2", 64.12);
+ stormTimelineMetricsSink.handleDataPoints(
+ new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
+ Collections.singleton(new IMetricsConsumer.DataPoint("key1", valueMap)));
+ verify(timelineMetricsCache);
+ }
}