You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/09/02 01:04:05 UTC

[1/4] storm git commit: each KafkaBolt has its own properties

Repository: storm
Updated Branches:
  refs/heads/master fec4b53fe -> 154e9ec55


each KafkaBolt has its own properties


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a062a408
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a062a408
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a062a408

Branch: refs/heads/master
Commit: a062a40845a808566c8d727a654654a47e8d3eb3
Parents: 528958c
Author: renkai <ga...@gmail.com>
Authored: Wed Aug 26 17:55:16 2015 +0800
Committer: renkai <ga...@gmail.com>
Committed: Wed Aug 26 17:55:16 2015 +0800

----------------------------------------------------------------------
 .../src/jvm/storm/kafka/bolt/KafkaBolt.java     | 15 +++++++--
 .../test/storm/kafka/bolt/KafkaBoltTest.java    | 32 ++++++++++++++++++++
 2 files changed, 45 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a062a408/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index 738b358..9a020a0 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -63,7 +63,8 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
     private OutputCollector collector;
     private TupleToKafkaMapper<K,V> mapper;
     private KafkaTopicSelector topicSelector;
-    /** 
+    private Properties boltSpecfiedProperties = new Properties();
+    /**
      * With default setting for fireAndForget and async, the callback is called when the sending succeeds.
      * By setting fireAndForget true, the send will not wait at all for kafka to ack.
      * "acks" setting in 0.8.2 Producer API config doesn't matter if fireAndForget is set.
@@ -72,6 +73,13 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
     private boolean fireAndForget = false;
     private boolean async = true;
 
+    public KafkaBolt(Properties boltSpecfiedProperties) {
+        this.boltSpecfiedProperties = boltSpecfiedProperties;
+    }
+
+    public KafkaBolt() {
+    }
+
     public KafkaBolt<K,V> withTupleToKafkaMapper(TupleToKafkaMapper<K,V> mapper) {
         this.mapper = mapper;
         return this;
@@ -96,7 +104,10 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
 
         Map configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES);
         Properties properties = new Properties();
-        properties.putAll(configMap);
+        if(configMap!= null)
+            properties.putAll(configMap);
+
+        properties.putAll(boltSpecfiedProperties);
         producer = new KafkaProducer<K, V>(properties);
         this.collector = collector;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/a062a408/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
index 05d138b..673f8ff 100644
--- a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
@@ -166,6 +166,22 @@ public class KafkaBoltTest {
         verifyMessage(keyString, messageString);
     }
 
+    /* test bolt specified properties */
+    @Test
+    public void executeWithBoltSpecifiedProperties() {
+        boolean async = false;
+        boolean fireAndForget = false;
+        bolt = defaultSerializerBoltWithSpecifiedProperties(async, fireAndForget);
+        String keyString = "test-key";
+        String messageString = "test-message";
+        byte[] key = keyString.getBytes();
+        byte[] message = messageString.getBytes();
+        Tuple tuple = generateTestTuple(key, message);
+        bolt.execute(tuple);
+        verify(collector).ack(tuple);
+        verifyMessage(keyString, messageString);
+    }
+
     private KafkaBolt generateStringSerializerBolt() {
         KafkaBolt bolt = new KafkaBolt();
         Properties props = new Properties();
@@ -198,6 +214,22 @@ public class KafkaBoltTest {
         return bolt;
     }
 
+    private KafkaBolt defaultSerializerBoltWithSpecifiedProperties(boolean async, boolean fireAndForget) {
+        Properties props = new Properties();
+        props.put("request.required.acks", "1");
+        props.put("serializer.class", "kafka.serializer.StringEncoder");
+        props.put("bootstrap.servers", broker.getBrokerConnectionString());
+        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+        props.put("metadata.fetch.timeout.ms", 1000);
+        props.put("linger.ms", 0);
+        KafkaBolt bolt = new KafkaBolt(props);
+        bolt.prepare(config, null, new OutputCollector(collector));
+        bolt.setAsync(async);
+        bolt.setFireAndForget(fireAndForget);
+        return bolt;
+    }
+
     @Test
     public void executeWithoutKey() throws Exception {
         String message = "value-234";


[3/4] storm git commit: Merge branch 'dev' of https://github.com/Renkai/storm into STORM-1010

Posted by ka...@apache.org.
Merge branch 'dev' of https://github.com/Renkai/storm into STORM-1010


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1b2bba95
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1b2bba95
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1b2bba95

Branch: refs/heads/master
Commit: 1b2bba952a5285738d4793122a9a5b4a7e3ad94e
Parents: fec4b53 c5d77ac
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Sep 2 07:45:23 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Sep 2 07:45:23 2015 +0900

----------------------------------------------------------------------
 .../src/jvm/storm/kafka/bolt/KafkaBolt.java     | 17 +++++++++--
 .../test/storm/kafka/bolt/KafkaBoltTest.java    | 32 ++++++++++++++++++++
 2 files changed, 47 insertions(+), 2 deletions(-)
----------------------------------------------------------------------



[2/4] storm git commit: use smart API instead of constructor in KafkaBolt

Posted by ka...@apache.org.
use smart API instead of constructor in KafkaBolt


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c5d77ace
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c5d77ace
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c5d77ace

Branch: refs/heads/master
Commit: c5d77ace8fa0cb2a11a0672941f2e9fa655aef4c
Parents: a062a40
Author: renkai <ga...@gmail.com>
Authored: Thu Aug 27 08:59:14 2015 +0800
Committer: renkai <ga...@gmail.com>
Committed: Thu Aug 27 08:59:14 2015 +0800

----------------------------------------------------------------------
 .../storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java | 12 +++++++-----
 .../src/test/storm/kafka/bolt/KafkaBoltTest.java        |  2 +-
 2 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c5d77ace/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index 9a020a0..2cca826 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -73,10 +73,6 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
     private boolean fireAndForget = false;
     private boolean async = true;
 
-    public KafkaBolt(Properties boltSpecfiedProperties) {
-        this.boltSpecfiedProperties = boltSpecfiedProperties;
-    }
-
     public KafkaBolt() {
     }
 
@@ -90,6 +86,11 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
         return this;
     }
 
+    public KafkaBolt<K,V> withProducerProperties(Properties producerProperties) {
+        this.boltSpecfiedProperties = producerProperties;
+        return this;
+    }
+
     @Override
     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
         //for backward compatibility.
@@ -106,8 +107,9 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
         Properties properties = new Properties();
         if(configMap!= null)
             properties.putAll(configMap);
+        if(boltSpecfiedProperties != null)
+            properties.putAll(boltSpecfiedProperties);
 
-        properties.putAll(boltSpecfiedProperties);
         producer = new KafkaProducer<K, V>(properties);
         this.collector = collector;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/c5d77ace/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
index 673f8ff..53d7c50 100644
--- a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
@@ -223,7 +223,7 @@ public class KafkaBoltTest {
         props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
         props.put("metadata.fetch.timeout.ms", 1000);
         props.put("linger.ms", 0);
-        KafkaBolt bolt = new KafkaBolt(props);
+        KafkaBolt bolt = new KafkaBolt().withProducerProperties(props);
         bolt.prepare(config, null, new OutputCollector(collector));
         bolt.setAsync(async);
         bolt.setFireAndForget(fireAndForget);


[4/4] storm git commit: add STORM-1010 to CHANGELOG.md

Posted by ka...@apache.org.
add STORM-1010 to CHANGELOG.md

* also add Renkai Ge to contributor list


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/154e9ec5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/154e9ec5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/154e9ec5

Branch: refs/heads/master
Commit: 154e9ec55deb4eea8fca8554e4d3b224bf337834
Parents: 1b2bba9
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Sep 2 08:03:42 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Sep 2 08:03:42 2015 +0900

----------------------------------------------------------------------
 CHANGELOG.md    | 1 +
 README.markdown | 1 +
 2 files changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/154e9ec5/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1698145..39cd49a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.11.0
+ * STORM-1010: Each KafkaBolt can have its own specified properties.
  * STORM-1008: Isolate the code for metric collection and retrieval from DisruptorQueue
  * STORM-991: General cleanup of the generics (storm.trident.spout package)
  * STORM-1000: Use static member classes when permitted 

http://git-wip-us.apache.org/repos/asf/storm/blob/154e9ec5/README.markdown
----------------------------------------------------------------------
diff --git a/README.markdown b/README.markdown
index de02346..591a6c7 100644
--- a/README.markdown
+++ b/README.markdown
@@ -223,6 +223,7 @@ under the License.
 * Drew Robb ([@drewrobb](https://github.com/drewrobb))
 * Frantz Mazoyer ([@fmazoyer](https://github.com/fmazoyer))
 * Dean de Bree ([@ddebree](https://github.com/ddebree))
+* Renkai Ge ([@Renkai](https://github.com/Renkai))
 
 ## Acknowledgements