You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/09/02 01:04:06 UTC

[2/4] storm git commit: use smart API instead of constructor in KafkaBolt

Use a fluent API (withProducerProperties) instead of a constructor argument to pass producer properties in KafkaBolt


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c5d77ace
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c5d77ace
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c5d77ace

Branch: refs/heads/master
Commit: c5d77ace8fa0cb2a11a0672941f2e9fa655aef4c
Parents: a062a40
Author: renkai <ga...@gmail.com>
Authored: Thu Aug 27 08:59:14 2015 +0800
Committer: renkai <ga...@gmail.com>
Committed: Thu Aug 27 08:59:14 2015 +0800

----------------------------------------------------------------------
 .../storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java | 12 +++++++-----
 .../src/test/storm/kafka/bolt/KafkaBoltTest.java        |  2 +-
 2 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c5d77ace/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index 9a020a0..2cca826 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -73,10 +73,6 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
     private boolean fireAndForget = false;
     private boolean async = true;
 
-    public KafkaBolt(Properties boltSpecfiedProperties) {
-        this.boltSpecfiedProperties = boltSpecfiedProperties;
-    }
-
     public KafkaBolt() {
     }
 
@@ -90,6 +86,11 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
         return this;
     }
 
+    public KafkaBolt<K,V> withProducerProperties(Properties producerProperties) {
+        this.boltSpecfiedProperties = producerProperties;
+        return this;
+    }
+
     @Override
     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
         //for backward compatibility.
@@ -106,8 +107,9 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
         Properties properties = new Properties();
         if(configMap!= null)
             properties.putAll(configMap);
+        if(boltSpecfiedProperties != null)
+            properties.putAll(boltSpecfiedProperties);
 
-        properties.putAll(boltSpecfiedProperties);
         producer = new KafkaProducer<K, V>(properties);
         this.collector = collector;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/c5d77ace/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
index 673f8ff..53d7c50 100644
--- a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
@@ -223,7 +223,7 @@ public class KafkaBoltTest {
         props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
         props.put("metadata.fetch.timeout.ms", 1000);
         props.put("linger.ms", 0);
-        KafkaBolt bolt = new KafkaBolt(props);
+        KafkaBolt bolt = new KafkaBolt().withProducerProperties(props);
         bolt.prepare(config, null, new OutputCollector(collector));
         bolt.setAsync(async);
         bolt.setFireAndForget(fireAndForget);