Posted to dev@storm.apache.org by "Haralds Ulmanis (JIRA)" <ji...@apache.org> on 2014/06/17 11:29:01 UTC
[jira] [Comment Edited] (STORM-353) Add configuration per kafka-bolt
[ https://issues.apache.org/jira/browse/STORM-353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14033597#comment-14033597 ]
Haralds Ulmanis edited comment on STORM-353 at 6/17/14 9:28 AM:
----------------------------------------------------------------
{noformat}
Ok, what about this:
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index b9ea948..a36e846 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -55,14 +55,36 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
     private OutputCollector collector;
     private String topic;
+    private Map boltConfig;
+
+    public KafkaBolt(Map boltConfig) {
+        this.boltConfig = boltConfig;
+    }
+
+    public KafkaBolt() {
+    }
+
     @Override
     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        Map configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES);
+        Map configMap;
+        if (boltConfig == null) {
+            configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES);
+            this.topic = (String) stormConf.get(TOPIC);
+        } else {
+            if (boltConfig.containsKey(KAFKA_BROKER_PROPERTIES)) {
+                configMap = (Map) boltConfig.get(KAFKA_BROKER_PROPERTIES);
+            } else {
+                configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES);
+            }
+            if (boltConfig.containsKey(TOPIC))
+                this.topic = (String) boltConfig.get(TOPIC);
+            else
+                this.topic = (String) stormConf.get(TOPIC);
+        }
         Properties properties = new Properties();
         properties.putAll(configMap);
         ProducerConfig config = new ProducerConfig(properties);
         producer = new Producer<K, V>(config);
-        this.topic = (String) stormConf.get(TOPIC);
         this.collector = collector;
     }
:) You can set the Kafka config, the topic, or both per bolt; anything not set on the bolt falls back to the Storm config.
Regarding "Also if you could turn this into a github pull request to apache/incubator-storm": I am not familiar with that process.
{noformat}
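The lookup order in the patch above (per-bolt value first, Storm config as the fallback) can be sketched without any Storm or Kafka dependencies. This is only an illustration, not part of the patch: the class name BoltConfigFallback and the resolve helper are hypothetical, the two key strings are placeholders standing in for the KafkaBolt constants, and the topic and broker values are example data.

```java
import java.util.HashMap;
import java.util.Map;

// Standalone sketch of the lookup order used in the patch above.
// No Storm or Kafka classes are involved; names and values are illustrative.
class BoltConfigFallback {
    // Placeholder key names (assumed for this sketch).
    static final String TOPIC = "topic";
    static final String KAFKA_BROKER_PROPERTIES = "kafka.broker.properties";

    // Return the per-bolt value when the bolt config defines the key,
    // otherwise fall back to the topology-wide Storm config.
    static Object resolve(Map<String, Object> boltConfig,
                          Map<String, Object> stormConf,
                          String key) {
        if (boltConfig != null && boltConfig.containsKey(key)) {
            return boltConfig.get(key);
        }
        return stormConf.get(key);
    }

    public static void main(String[] args) {
        // Topology-wide defaults (example values).
        Map<String, Object> brokerProps = new HashMap<>();
        brokerProps.put("metadata.broker.list", "localhost:9092");
        Map<String, Object> stormConf = new HashMap<>();
        stormConf.put(TOPIC, "default-topic");
        stormConf.put(KAFKA_BROKER_PROPERTIES, brokerProps);

        // A bolt that overrides only the topic.
        Map<String, Object> clicksBolt = new HashMap<>();
        clicksBolt.put(TOPIC, "clicks");

        System.out.println(resolve(clicksBolt, stormConf, TOPIC)); // clicks
        System.out.println(resolve(null, stormConf, TOPIC));       // default-topic
        // Broker properties are not overridden, so the shared map is returned.
        System.out.println(resolve(clicksBolt, stormConf, KAFKA_BROKER_PROPERTIES) == brokerProps); // true
    }
}
```

Unlike the patch, this sketch does the null check inside one helper, so the topic and the broker properties go through the same code path.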
was (Author: evilezh): the same text as above, without the noformat markers.
> Add configuration per kafka-bolt
> ---------------------------------
>
> Key: STORM-353
> URL: https://issues.apache.org/jira/browse/STORM-353
> Project: Apache Storm (Incubating)
> Issue Type: Improvement
> Reporter: Haralds Ulmanis
> Priority: Trivial
>
> Currently the Kafka bolt configuration is passed through the Storm configuration, which limits the ability to use more than one Kafka bolt/topic per topology.
> I suggest something like this:
> diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
> index b9ea948..2a78f84 100644
> --- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
> +++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
> @@ -55,14 +55,20 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
>      private OutputCollector collector;
>      private String topic;
>
> +    private Map boltConfig;
> +
> +    public KafkaBolt(Map boltConfig) {
> +        this.boltConfig = boltConfig;
> +    }
> +
>      @Override
>      public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
> -        Map configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES);
> +        Map configMap = (Map) boltConfig.get(KAFKA_BROKER_PROPERTIES);
>          Properties properties = new Properties();
>          properties.putAll(configMap);
>          ProducerConfig config = new ProducerConfig(properties);
>          producer = new Producer<K, V>(config);
> -        this.topic = (String) stormConf.get(TOPIC);
> +        this.topic = (String) boltConfig.get(TOPIC);
>          this.collector = collector;
>      }
>
> With this change you can initialize each bolt with its own topic and even its own zk servers.
> Use case:
> I'm using several output streams and want each stream to be published to its own topic.