You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2015/03/07 05:01:35 UTC

ambari git commit: AMBARI-9970. Metrics are absent for Storm. (mpapyrkovskyy via swagle)

Repository: ambari
Updated Branches:
  refs/heads/branch-2.0.0 3f9ae9ef0 -> 913c086a4


AMBARI-9970. Metrics are absent for Storm. (mpapyrkovskyy via swagle)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/913c086a
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/913c086a
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/913c086a

Branch: refs/heads/branch-2.0.0
Commit: 913c086a4626b0925e1f7d7a104b46b5579d9b29
Parents: 3f9ae9e
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Fri Mar 6 20:01:23 2015 -0800
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Fri Mar 6 20:01:23 2015 -0800

----------------------------------------------------------------------
 .../timeline/AbstractTimelineMetricsSink.java   | 26 ++++++--------------
 .../cache/HandleConnectExceptionTest.java       |  4 ---
 .../sink/flume/FlumeTimelineMetricsSink.java    | 13 ----------
 .../timeline/HadoopTimelineMetricsSink.java     |  7 ------
 .../kafka/KafkaTimelineMetricsReporter.java     | 15 -----------
 .../storm/StormTimelineMetricsReporter.java     | 21 +++++-----------
 .../sink/storm/StormTimelineMetricsSink.java    | 17 -------------
 .../storm/StormTimelineMetricsSinkTest.java     |  1 -
 8 files changed, 14 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/913c086a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
index 4f5c6a1..fd4cacd 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
@@ -19,10 +19,6 @@ package org.apache.hadoop.metrics2.sink.timeline;
 
 import java.io.IOException;
 import java.net.ConnectException;
-import java.net.SocketAddress;
-
-import java.io.IOException;
-import java.net.SocketAddress;
 
 import org.apache.commons.httpclient.HttpClient;
 import org.apache.commons.httpclient.methods.PostMethod;
@@ -63,19 +59,15 @@ public abstract class AbstractTimelineMetricsSink {
     try {
       String jsonData = mapper.writeValueAsString(metrics);
 
-      SocketAddress socketAddress = getServerSocketAddress();
+      StringRequestEntity requestEntity = new StringRequestEntity(jsonData, "application/json", "UTF-8");
 
-      if (socketAddress != null) {
-        StringRequestEntity requestEntity = new StringRequestEntity(jsonData, "application/json", "UTF-8");
-        
-        PostMethod postMethod = new PostMethod(connectUrl);
-        postMethod.setRequestEntity(requestEntity);
-        int statusCode = httpClient.executeMethod(postMethod);
-        if (statusCode != 200) {
-          LOG.info("Unable to POST metrics to collector, " + connectUrl);
-        } else {
-          LOG.debug("Metrics posted to Collector " + connectUrl);
-        }
+      PostMethod postMethod = new PostMethod(connectUrl);
+      postMethod.setRequestEntity(requestEntity);
+      int statusCode = httpClient.executeMethod(postMethod);
+      if (statusCode != 200) {
+        LOG.info("Unable to POST metrics to collector, " + connectUrl);
+      } else {
+        LOG.debug("Metrics posted to Collector " + connectUrl);
       }
     } catch (ConnectException e) {
       throw new UnableToConnectException(e).setConnectUrl(connectUrl);
@@ -86,7 +78,5 @@ public abstract class AbstractTimelineMetricsSink {
     this.httpClient = httpClient;
   }
 
-  abstract protected SocketAddress getServerSocketAddress();
-
   abstract protected String getCollectorUri();
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/913c086a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
index 450906a..2786e3c 100644
--- a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
+++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
@@ -65,10 +65,6 @@ public class HandleConnectExceptionTest {
   }
   class TestTimelineMetricsSink extends AbstractTimelineMetricsSink{
     @Override
-    protected SocketAddress getServerSocketAddress() {
-      return new InetSocketAddress("host", 13);
-    }
-    @Override
     protected String getCollectorUri() {
       return COLLECTOR_URL;
     }

http://git-wip-us.apache.org/repos/asf/ambari/blob/913c086a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
index 9e66c99..a6137af 100644
--- a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
@@ -30,11 +30,9 @@ import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;
 import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
 import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
-import org.apache.hadoop.metrics2.sink.util.Servers;
 
 import java.io.IOException;
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
@@ -50,7 +48,6 @@ import java.util.concurrent.TimeUnit;
 
 
 public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implements MonitorService {
-  private SocketAddress socketAddress;
   private String collectorUri;
   private TimelineMetricsCache metricsCache;
   private ScheduledExecutorService scheduledExecutorService;
@@ -94,11 +91,6 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     String collectorHostname = configuration.getProperty(COLLECTOR_HOST_PROPERTY);
     String port = configuration.getProperty(COLLECTOR_PORT_PROPERTY);
     collectorUri = "http://" + collectorHostname + ":" + port + "/ws/v1/timeline/metrics";
-    List<InetSocketAddress> socketAddresses =
-      Servers.parse(collectorHostname, Integer.valueOf(port));
-    if (socketAddresses != null && !socketAddresses.isEmpty()) {
-      socketAddress = socketAddresses.get(0);
-    }
     pollFrequency = Long.parseLong(configuration.getProperty("collectionFrequency"));
 
     String[] metrics = configuration.getProperty(COUNTER_METRICS_PROPERTY).trim().split(",");
@@ -106,11 +98,6 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
   }
 
   @Override
-  public SocketAddress getServerSocketAddress() {
-    return socketAddress;
-  }
-
-  @Override
   public String getCollectorUri() {
     return collectorUri;
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/913c086a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
index 06f6011..9ecb0ed 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
@@ -131,13 +131,6 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
     return conf.getPrefix();
   }
 
-  protected SocketAddress getServerSocketAddress() {
-    if (metricsServers != null && !metricsServers.isEmpty()) {
-      return metricsServers.get(0);
-    }
-    return null;
-  }
-
   @Override
   protected String getCollectorUri() {
     return collectorUri;

http://git-wip-us.apache.org/repos/asf/ambari/blob/913c086a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
index 1f44494..cc365bd 100644
--- a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
+++ b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
@@ -20,8 +20,6 @@ package org.apache.hadoop.metrics2.sink.kafka;
 
 import java.io.IOException;
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
@@ -40,7 +38,6 @@ import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
-import org.apache.hadoop.metrics2.sink.util.Servers;
 
 import com.yammer.metrics.Metrics;
 import com.yammer.metrics.core.Counter;
@@ -73,16 +70,10 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink im
   private final Object lock = new Object();
   private String collectorUri;
   private String hostname;
-  private SocketAddress socketAddress;
   private TimelineScheduledReporter reporter;
   private TimelineMetricsCache metricsCache;
 
   @Override
-  protected SocketAddress getServerSocketAddress() {
-    return socketAddress;
-  }
-
-  @Override
   protected String getCollectorUri() {
     return collectorUri;
   }
@@ -110,18 +101,12 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink im
         String metricCollectorPort = props.getString(TIMELINE_PORT_PROPERTY, TIMELINE_DEFAULT_PORT);
         setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval));
         collectorUri = "http://" + metricCollectorHost + ":" + metricCollectorPort + "/ws/v1/timeline/metrics";
-        List<InetSocketAddress> socketAddresses = Servers.parse(metricCollectorHost,
-            Integer.parseInt(metricCollectorPort));
-        if (socketAddresses != null && !socketAddresses.isEmpty()) {
-          socketAddress = socketAddresses.get(0);
-        }
         initializeReporter();
         if (props.getBoolean(TIMELINE_REPORTER_ENABLED_PROPERTY, false)) {
           startReporter(metricsConfig.pollingIntervalSecs());
         }
         if (LOG.isTraceEnabled()) {
           LOG.trace("CollectorUri = " + collectorUri);
-          LOG.trace("SocketAddress = " + socketAddress);
           LOG.trace("MetricsSendInterval = " + metricsSendInterval);
           LOG.trace("MaxRowCacheSize = " + maxRowCacheSize);
         }

http://git-wip-us.apache.org/repos/asf/ambari/blob/913c086a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
index 2d4baa3..89fc2ca 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
@@ -29,11 +29,9 @@ import org.apache.commons.lang.Validate;
 import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.metrics2.sink.util.Servers;
+import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;
 
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
@@ -48,7 +46,6 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
   public static final String APP_ID = "appId";
 
   private String hostname;
-  private SocketAddress socketAddress;
   private String collectorUri;
   private NimbusClient nimbusClient;
   private String applicationId;
@@ -58,11 +55,6 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
   }
 
   @Override
-  protected SocketAddress getServerSocketAddress() {
-    return this.socketAddress;
-  }
-
-  @Override
   protected String getCollectorUri() {
     return this.collectorUri;
   }
@@ -85,11 +77,6 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
       String port = cf.get(COLLECTOR_PORT).toString();
       applicationId = cf.get(APP_ID).toString();
       collectorUri = "http://" + collectorHostname + ":" + port + "/ws/v1/timeline/metrics";
-      List<InetSocketAddress> socketAddresses =
-        Servers.parse(collectorHostname, Integer.valueOf(port));
-      if (socketAddresses != null && !socketAddresses.isEmpty()) {
-        socketAddress = socketAddresses.get(0);
-      }
     } catch (Exception e) {
       LOG.warn("Could not initialize metrics collector, please specify host, " +
         "port under $STORM_HOME/conf/config.yaml ", e);
@@ -139,7 +126,11 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
     TimelineMetrics timelineMetrics = new TimelineMetrics();
     timelineMetrics.setMetrics(totalMetrics);
 
-    emitMetrics(timelineMetrics);
+    try {
+      emitMetrics(timelineMetrics);
+    } catch (UnableToConnectException e) {
+      LOG.warn("Unable to connect to Metrics Collector " + e.getConnectUrl() + ". " + e.getMessage());
+    }
 
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/913c086a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index dd0e72f..767695b 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -30,12 +30,9 @@ import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;
 import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
 import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
-import org.apache.hadoop.metrics2.sink.util.Servers;
 
 import java.io.IOException;
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -43,17 +40,11 @@ import java.util.List;
 import java.util.Map;
 
 public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implements IMetricsConsumer {
-  private SocketAddress socketAddress;
   private String collectorUri;
   private TimelineMetricsCache metricsCache;
   private String hostname;
 
   @Override
-  protected SocketAddress getServerSocketAddress() {
-    return socketAddress;
-  }
-
-  @Override
   protected String getCollectorUri() {
     return collectorUri;
   }
@@ -74,11 +65,6 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
         String.valueOf(TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS)));
     metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval);
     collectorUri = "http://" + configuration.getProperty(COLLECTOR_HOST_PROPERTY) + ":" + configuration.getProperty(COLLECTOR_PORT_PROPERTY) + "/ws/v1/timeline/metrics";
-    List<InetSocketAddress> socketAddresses =
-        Servers.parse(configuration.getProperty(configuration.getProperty(COLLECTOR_HOST_PROPERTY)), Integer.valueOf(configuration.getProperty(COLLECTOR_PORT_PROPERTY)));
-    if (socketAddresses != null && !socketAddresses.isEmpty()) {
-      socketAddress = socketAddresses.get(0);
-    }
   }
 
   @Override
@@ -134,7 +120,4 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     this.metricsCache = metricsCache;
   }
 
-  public void setServerSocketAddress(SocketAddress socketAddress) {
-    this.socketAddress = socketAddress;
-  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/913c086a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
index 15021e5..a0600e5 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
@@ -64,7 +64,6 @@ public class StormTimelineMetricsSinkTest {
     HttpClient httpClient = createNiceMock(HttpClient.class);
     stormTimelineMetricsSink.setHttpClient(httpClient);
     expect(httpClient.executeMethod(anyObject(PostMethod.class))).andReturn(200).once();
-    stormTimelineMetricsSink.setServerSocketAddress(createNiceMock(SocketAddress.class));
     replay(timelineMetricsCache, httpClient);
     stormTimelineMetricsSink.handleDataPoints(
         new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),