You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/04/21 21:44:19 UTC

[20/50] [abbrv] git commit: Make the spout's Storm Metrics' time bucket size configurable via TridentKafkaConfig. Leave the default time bucket size at 60 seconds.

Make the time bucket size of the spout's Storm metrics configurable via
TridentKafkaConfig (new field metricsTimeBucketSizeInSecs). The default
bucket size remains 60 seconds.

Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/b6b1f1d8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/b6b1f1d8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/b6b1f1d8

Branch: refs/heads/master
Commit: b6b1f1d8fb5a56cf14c44ff46aa513e0fa4e99ab
Parents: 312408a
Author: Danijel Schiavuzzi <da...@infobip.com>
Authored: Tue Feb 11 15:28:59 2014 +0100
Committer: Danijel Schiavuzzi <da...@infobip.com>
Committed: Tue Feb 11 15:28:59 2014 +0100

----------------------------------------------------------------------
 pom.xml                                              | 4 ++--
 src/jvm/storm/kafka/KafkaConfig.java                 | 1 +
 src/jvm/storm/kafka/trident/TridentKafkaEmitter.java | 6 +++---
 3 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b6b1f1d8/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 910041a..8dbd9a9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
     <groupId>net.wurstmeister.storm</groupId>
     <artifactId>storm-kafka-0.8-plus</artifactId>
     <packaging>jar</packaging>
-    <version>0.4.0-SNAPSHOT</version>
+    <version>0.4.0-configurable-metrics-emit-interval-SNAPSHOT</version>
     <name>storm-kafka-0.8-plus</name>
     <description>Storm module for kafka &gt; 0.8</description>
     <licenses>
@@ -170,7 +170,7 @@
         <dependency>
             <groupId>storm</groupId>
             <artifactId>storm</artifactId>
-            <version>0.9.0</version>
+            <version>0.9.0.1</version>
             <scope>provided</scope>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b6b1f1d8/src/jvm/storm/kafka/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/KafkaConfig.java b/src/jvm/storm/kafka/KafkaConfig.java
index dddcead..8ef2a88 100644
--- a/src/jvm/storm/kafka/KafkaConfig.java
+++ b/src/jvm/storm/kafka/KafkaConfig.java
@@ -18,6 +18,7 @@ public class KafkaConfig implements Serializable {
     public boolean forceFromStart = false;
     public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
     public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
+    public int metricsTimeBucketSizeInSecs = 60;
 
     public KafkaConfig(BrokerHosts hosts, String topic) {
         this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b6b1f1d8/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index fbbbd4b..66785f0 100644
--- a/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -49,9 +49,9 @@ public class TridentKafkaEmitter {
         _connections = new DynamicPartitionConnections(_config, KafkaUtils.makeBrokerReader(conf, _config));
         _topologyName = (String) conf.get(Config.TOPOLOGY_NAME);
         _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_config.topic, _connections);
-        context.registerMetric("kafkaOffset", _kafkaOffsetMetric, 60);
-        _kafkaMeanFetchLatencyMetric = context.registerMetric("kafkaFetchAvg", new MeanReducer(), 60);
-        _kafkaMaxFetchLatencyMetric = context.registerMetric("kafkaFetchMax", new MaxMetric(), 60);
+        context.registerMetric("kafkaOffset", _kafkaOffsetMetric, _config.metricsTimeBucketSizeInSecs);
+        _kafkaMeanFetchLatencyMetric = context.registerMetric("kafkaFetchAvg", new MeanReducer(), _config.metricsTimeBucketSizeInSecs);
+        _kafkaMaxFetchLatencyMetric = context.registerMetric("kafkaFetchMax", new MaxMetric(), _config.metricsTimeBucketSizeInSecs);
     }