You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/09/02 01:04:06 UTC
[2/4] storm git commit: use smart API instead of constructor in
KafkaBolt
use smart API instead of constructor in KafkaBolt
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c5d77ace
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c5d77ace
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c5d77ace
Branch: refs/heads/master
Commit: c5d77ace8fa0cb2a11a0672941f2e9fa655aef4c
Parents: a062a40
Author: renkai <ga...@gmail.com>
Authored: Thu Aug 27 08:59:14 2015 +0800
Committer: renkai <ga...@gmail.com>
Committed: Thu Aug 27 08:59:14 2015 +0800
----------------------------------------------------------------------
.../storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java | 12 +++++++-----
.../src/test/storm/kafka/bolt/KafkaBoltTest.java | 2 +-
2 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/c5d77ace/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index 9a020a0..2cca826 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -73,10 +73,6 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
private boolean fireAndForget = false;
private boolean async = true;
- public KafkaBolt(Properties boltSpecfiedProperties) {
- this.boltSpecfiedProperties = boltSpecfiedProperties;
- }
-
public KafkaBolt() {
}
@@ -90,6 +86,11 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
return this;
}
+ public KafkaBolt<K,V> withProducerProperties(Properties producerProperties) {
+ this.boltSpecfiedProperties = producerProperties;
+ return this;
+ }
+
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
//for backward compatibility.
@@ -106,8 +107,9 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
Properties properties = new Properties();
if(configMap!= null)
properties.putAll(configMap);
+ if(boltSpecfiedProperties != null)
+ properties.putAll(boltSpecfiedProperties);
- properties.putAll(boltSpecfiedProperties);
producer = new KafkaProducer<K, V>(properties);
this.collector = collector;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c5d77ace/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
index 673f8ff..53d7c50 100644
--- a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
@@ -223,7 +223,7 @@ public class KafkaBoltTest {
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("metadata.fetch.timeout.ms", 1000);
props.put("linger.ms", 0);
- KafkaBolt bolt = new KafkaBolt(props);
+ KafkaBolt bolt = new KafkaBolt().withProducerProperties(props);
bolt.prepare(config, null, new OutputCollector(collector));
bolt.setAsync(async);
bolt.setFireAndForget(fireAndForget);