You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2015/03/07 05:01:35 UTC
ambari git commit: AMBARI-9970. Metrics are absent for Storm. (mpapyrkovskyy via swagle)
Repository: ambari
Updated Branches:
refs/heads/branch-2.0.0 3f9ae9ef0 -> 913c086a4
AMBARI-9970. Metrics are absent for Storm. (mpapyrkovskyy via swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/913c086a
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/913c086a
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/913c086a
Branch: refs/heads/branch-2.0.0
Commit: 913c086a4626b0925e1f7d7a104b46b5579d9b29
Parents: 3f9ae9e
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Fri Mar 6 20:01:23 2015 -0800
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Fri Mar 6 20:01:23 2015 -0800
----------------------------------------------------------------------
.../timeline/AbstractTimelineMetricsSink.java | 26 ++++++--------------
.../cache/HandleConnectExceptionTest.java | 4 ---
.../sink/flume/FlumeTimelineMetricsSink.java | 13 ----------
.../timeline/HadoopTimelineMetricsSink.java | 7 ------
.../kafka/KafkaTimelineMetricsReporter.java | 15 -----------
.../storm/StormTimelineMetricsReporter.java | 21 +++++-----------
.../sink/storm/StormTimelineMetricsSink.java | 17 -------------
.../storm/StormTimelineMetricsSinkTest.java | 1 -
8 files changed, 14 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/913c086a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
index 4f5c6a1..fd4cacd 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
@@ -19,10 +19,6 @@ package org.apache.hadoop.metrics2.sink.timeline;
import java.io.IOException;
import java.net.ConnectException;
-import java.net.SocketAddress;
-
-import java.io.IOException;
-import java.net.SocketAddress;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.PostMethod;
@@ -63,19 +59,15 @@ public abstract class AbstractTimelineMetricsSink {
try {
String jsonData = mapper.writeValueAsString(metrics);
- SocketAddress socketAddress = getServerSocketAddress();
+ StringRequestEntity requestEntity = new StringRequestEntity(jsonData, "application/json", "UTF-8");
- if (socketAddress != null) {
- StringRequestEntity requestEntity = new StringRequestEntity(jsonData, "application/json", "UTF-8");
-
- PostMethod postMethod = new PostMethod(connectUrl);
- postMethod.setRequestEntity(requestEntity);
- int statusCode = httpClient.executeMethod(postMethod);
- if (statusCode != 200) {
- LOG.info("Unable to POST metrics to collector, " + connectUrl);
- } else {
- LOG.debug("Metrics posted to Collector " + connectUrl);
- }
+ PostMethod postMethod = new PostMethod(connectUrl);
+ postMethod.setRequestEntity(requestEntity);
+ int statusCode = httpClient.executeMethod(postMethod);
+ if (statusCode != 200) {
+ LOG.info("Unable to POST metrics to collector, " + connectUrl);
+ } else {
+ LOG.debug("Metrics posted to Collector " + connectUrl);
}
} catch (ConnectException e) {
throw new UnableToConnectException(e).setConnectUrl(connectUrl);
@@ -86,7 +78,5 @@ public abstract class AbstractTimelineMetricsSink {
this.httpClient = httpClient;
}
- abstract protected SocketAddress getServerSocketAddress();
-
abstract protected String getCollectorUri();
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/913c086a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
index 450906a..2786e3c 100644
--- a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
+++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
@@ -65,10 +65,6 @@ public class HandleConnectExceptionTest {
}
class TestTimelineMetricsSink extends AbstractTimelineMetricsSink{
@Override
- protected SocketAddress getServerSocketAddress() {
- return new InetSocketAddress("host", 13);
- }
- @Override
protected String getCollectorUri() {
return COLLECTOR_URL;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/913c086a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
index 9e66c99..a6137af 100644
--- a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
@@ -30,11 +30,9 @@ import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;
import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
-import org.apache.hadoop.metrics2.sink.util.Servers;
import java.io.IOException;
import java.net.InetAddress;
-import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
@@ -50,7 +48,6 @@ import java.util.concurrent.TimeUnit;
public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implements MonitorService {
- private SocketAddress socketAddress;
private String collectorUri;
private TimelineMetricsCache metricsCache;
private ScheduledExecutorService scheduledExecutorService;
@@ -94,11 +91,6 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
String collectorHostname = configuration.getProperty(COLLECTOR_HOST_PROPERTY);
String port = configuration.getProperty(COLLECTOR_PORT_PROPERTY);
collectorUri = "http://" + collectorHostname + ":" + port + "/ws/v1/timeline/metrics";
- List<InetSocketAddress> socketAddresses =
- Servers.parse(collectorHostname, Integer.valueOf(port));
- if (socketAddresses != null && !socketAddresses.isEmpty()) {
- socketAddress = socketAddresses.get(0);
- }
pollFrequency = Long.parseLong(configuration.getProperty("collectionFrequency"));
String[] metrics = configuration.getProperty(COUNTER_METRICS_PROPERTY).trim().split(",");
@@ -106,11 +98,6 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
}
@Override
- public SocketAddress getServerSocketAddress() {
- return socketAddress;
- }
-
- @Override
public String getCollectorUri() {
return collectorUri;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/913c086a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
index 06f6011..9ecb0ed 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
@@ -131,13 +131,6 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
return conf.getPrefix();
}
- protected SocketAddress getServerSocketAddress() {
- if (metricsServers != null && !metricsServers.isEmpty()) {
- return metricsServers.get(0);
- }
- return null;
- }
-
@Override
protected String getCollectorUri() {
return collectorUri;
http://git-wip-us.apache.org/repos/asf/ambari/blob/913c086a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
index 1f44494..cc365bd 100644
--- a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
+++ b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
@@ -20,8 +20,6 @@ package org.apache.hadoop.metrics2.sink.kafka;
import java.io.IOException;
import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
@@ -40,7 +38,6 @@ import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
-import org.apache.hadoop.metrics2.sink.util.Servers;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Counter;
@@ -73,16 +70,10 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink im
private final Object lock = new Object();
private String collectorUri;
private String hostname;
- private SocketAddress socketAddress;
private TimelineScheduledReporter reporter;
private TimelineMetricsCache metricsCache;
@Override
- protected SocketAddress getServerSocketAddress() {
- return socketAddress;
- }
-
- @Override
protected String getCollectorUri() {
return collectorUri;
}
@@ -110,18 +101,12 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink im
String metricCollectorPort = props.getString(TIMELINE_PORT_PROPERTY, TIMELINE_DEFAULT_PORT);
setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval));
collectorUri = "http://" + metricCollectorHost + ":" + metricCollectorPort + "/ws/v1/timeline/metrics";
- List<InetSocketAddress> socketAddresses = Servers.parse(metricCollectorHost,
- Integer.parseInt(metricCollectorPort));
- if (socketAddresses != null && !socketAddresses.isEmpty()) {
- socketAddress = socketAddresses.get(0);
- }
initializeReporter();
if (props.getBoolean(TIMELINE_REPORTER_ENABLED_PROPERTY, false)) {
startReporter(metricsConfig.pollingIntervalSecs());
}
if (LOG.isTraceEnabled()) {
LOG.trace("CollectorUri = " + collectorUri);
- LOG.trace("SocketAddress = " + socketAddress);
LOG.trace("MetricsSendInterval = " + metricsSendInterval);
LOG.trace("MaxRowCacheSize = " + maxRowCacheSize);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/913c086a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
index 2d4baa3..89fc2ca 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
@@ -29,11 +29,9 @@ import org.apache.commons.lang.Validate;
import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.metrics2.sink.util.Servers;
+import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;
import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
@@ -48,7 +46,6 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
public static final String APP_ID = "appId";
private String hostname;
- private SocketAddress socketAddress;
private String collectorUri;
private NimbusClient nimbusClient;
private String applicationId;
@@ -58,11 +55,6 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
}
@Override
- protected SocketAddress getServerSocketAddress() {
- return this.socketAddress;
- }
-
- @Override
protected String getCollectorUri() {
return this.collectorUri;
}
@@ -85,11 +77,6 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
String port = cf.get(COLLECTOR_PORT).toString();
applicationId = cf.get(APP_ID).toString();
collectorUri = "http://" + collectorHostname + ":" + port + "/ws/v1/timeline/metrics";
- List<InetSocketAddress> socketAddresses =
- Servers.parse(collectorHostname, Integer.valueOf(port));
- if (socketAddresses != null && !socketAddresses.isEmpty()) {
- socketAddress = socketAddresses.get(0);
- }
} catch (Exception e) {
LOG.warn("Could not initialize metrics collector, please specify host, " +
"port under $STORM_HOME/conf/config.yaml ", e);
@@ -139,7 +126,11 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
TimelineMetrics timelineMetrics = new TimelineMetrics();
timelineMetrics.setMetrics(totalMetrics);
- emitMetrics(timelineMetrics);
+ try {
+ emitMetrics(timelineMetrics);
+ } catch (UnableToConnectException e) {
+ LOG.warn("Unable to connect to Metrics Collector " + e.getConnectUrl() + ". " + e.getMessage());
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/913c086a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index dd0e72f..767695b 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -30,12 +30,9 @@ import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;
import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
-import org.apache.hadoop.metrics2.sink.util.Servers;
import java.io.IOException;
import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
@@ -43,17 +40,11 @@ import java.util.List;
import java.util.Map;
public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implements IMetricsConsumer {
- private SocketAddress socketAddress;
private String collectorUri;
private TimelineMetricsCache metricsCache;
private String hostname;
@Override
- protected SocketAddress getServerSocketAddress() {
- return socketAddress;
- }
-
- @Override
protected String getCollectorUri() {
return collectorUri;
}
@@ -74,11 +65,6 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
String.valueOf(TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS)));
metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval);
collectorUri = "http://" + configuration.getProperty(COLLECTOR_HOST_PROPERTY) + ":" + configuration.getProperty(COLLECTOR_PORT_PROPERTY) + "/ws/v1/timeline/metrics";
- List<InetSocketAddress> socketAddresses =
- Servers.parse(configuration.getProperty(configuration.getProperty(COLLECTOR_HOST_PROPERTY)), Integer.valueOf(configuration.getProperty(COLLECTOR_PORT_PROPERTY)));
- if (socketAddresses != null && !socketAddresses.isEmpty()) {
- socketAddress = socketAddresses.get(0);
- }
}
@Override
@@ -134,7 +120,4 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
this.metricsCache = metricsCache;
}
- public void setServerSocketAddress(SocketAddress socketAddress) {
- this.socketAddress = socketAddress;
- }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/913c086a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
index 15021e5..a0600e5 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
@@ -64,7 +64,6 @@ public class StormTimelineMetricsSinkTest {
HttpClient httpClient = createNiceMock(HttpClient.class);
stormTimelineMetricsSink.setHttpClient(httpClient);
expect(httpClient.executeMethod(anyObject(PostMethod.class))).andReturn(200).once();
- stormTimelineMetricsSink.setServerSocketAddress(createNiceMock(SocketAddress.class));
replay(timelineMetricsCache, httpClient);
stormTimelineMetricsSink.handleDataPoints(
new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),