You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/09/02 01:04:05 UTC
[1/4] storm git commit: each KafkaBolt have its own properties
Repository: storm
Updated Branches:
refs/heads/master fec4b53fe -> 154e9ec55
each KafkaBolt have its own properties
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a062a408
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a062a408
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a062a408
Branch: refs/heads/master
Commit: a062a40845a808566c8d727a654654a47e8d3eb3
Parents: 528958c
Author: renkai <ga...@gmail.com>
Authored: Wed Aug 26 17:55:16 2015 +0800
Committer: renkai <ga...@gmail.com>
Committed: Wed Aug 26 17:55:16 2015 +0800
----------------------------------------------------------------------
.../src/jvm/storm/kafka/bolt/KafkaBolt.java | 15 +++++++--
.../test/storm/kafka/bolt/KafkaBoltTest.java | 32 ++++++++++++++++++++
2 files changed, 45 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/a062a408/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index 738b358..9a020a0 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -63,7 +63,8 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
private OutputCollector collector;
private TupleToKafkaMapper<K,V> mapper;
private KafkaTopicSelector topicSelector;
- /**
+ private Properties boltSpecfiedProperties = new Properties();
+ /**
* With default setting for fireAndForget and async, the callback is called when the sending succeeds.
* By setting fireAndForget true, the send will not wait at all for kafka to ack.
* "acks" setting in 0.8.2 Producer API config doesn't matter if fireAndForget is set.
@@ -72,6 +73,13 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
private boolean fireAndForget = false;
private boolean async = true;
+ public KafkaBolt(Properties boltSpecfiedProperties) {
+ this.boltSpecfiedProperties = boltSpecfiedProperties;
+ }
+
+ public KafkaBolt() {
+ }
+
public KafkaBolt<K,V> withTupleToKafkaMapper(TupleToKafkaMapper<K,V> mapper) {
this.mapper = mapper;
return this;
@@ -96,7 +104,10 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
Map configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES);
Properties properties = new Properties();
- properties.putAll(configMap);
+ if(configMap!= null)
+ properties.putAll(configMap);
+
+ properties.putAll(boltSpecfiedProperties);
producer = new KafkaProducer<K, V>(properties);
this.collector = collector;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/a062a408/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
index 05d138b..673f8ff 100644
--- a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
@@ -166,6 +166,22 @@ public class KafkaBoltTest {
verifyMessage(keyString, messageString);
}
+ /* test bolt specified properties */
+ @Test
+ public void executeWithBoltSpecifiedProperties() {
+ boolean async = false;
+ boolean fireAndForget = false;
+ bolt = defaultSerializerBoltWithSpecifiedProperties(async, fireAndForget);
+ String keyString = "test-key";
+ String messageString = "test-message";
+ byte[] key = keyString.getBytes();
+ byte[] message = messageString.getBytes();
+ Tuple tuple = generateTestTuple(key, message);
+ bolt.execute(tuple);
+ verify(collector).ack(tuple);
+ verifyMessage(keyString, messageString);
+ }
+
private KafkaBolt generateStringSerializerBolt() {
KafkaBolt bolt = new KafkaBolt();
Properties props = new Properties();
@@ -198,6 +214,22 @@ public class KafkaBoltTest {
return bolt;
}
+ private KafkaBolt defaultSerializerBoltWithSpecifiedProperties(boolean async, boolean fireAndForget) {
+ Properties props = new Properties();
+ props.put("request.required.acks", "1");
+ props.put("serializer.class", "kafka.serializer.StringEncoder");
+ props.put("bootstrap.servers", broker.getBrokerConnectionString());
+ props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+ props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+ props.put("metadata.fetch.timeout.ms", 1000);
+ props.put("linger.ms", 0);
+ KafkaBolt bolt = new KafkaBolt(props);
+ bolt.prepare(config, null, new OutputCollector(collector));
+ bolt.setAsync(async);
+ bolt.setFireAndForget(fireAndForget);
+ return bolt;
+ }
+
@Test
public void executeWithoutKey() throws Exception {
String message = "value-234";
[3/4] storm git commit: Merge branch 'dev' of
https://github.com/Renkai/storm into STORM-1010
Posted by ka...@apache.org.
Merge branch 'dev' of https://github.com/Renkai/storm into STORM-1010
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1b2bba95
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1b2bba95
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1b2bba95
Branch: refs/heads/master
Commit: 1b2bba952a5285738d4793122a9a5b4a7e3ad94e
Parents: fec4b53 c5d77ac
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Sep 2 07:45:23 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Sep 2 07:45:23 2015 +0900
----------------------------------------------------------------------
.../src/jvm/storm/kafka/bolt/KafkaBolt.java | 17 +++++++++--
.../test/storm/kafka/bolt/KafkaBoltTest.java | 32 ++++++++++++++++++++
2 files changed, 47 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
[2/4] storm git commit: use smart API instead of constructor in
KafkaBolt
Posted by ka...@apache.org.
use smart API instead of constructor in KafkaBolt
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c5d77ace
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c5d77ace
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c5d77ace
Branch: refs/heads/master
Commit: c5d77ace8fa0cb2a11a0672941f2e9fa655aef4c
Parents: a062a40
Author: renkai <ga...@gmail.com>
Authored: Thu Aug 27 08:59:14 2015 +0800
Committer: renkai <ga...@gmail.com>
Committed: Thu Aug 27 08:59:14 2015 +0800
----------------------------------------------------------------------
.../storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java | 12 +++++++-----
.../src/test/storm/kafka/bolt/KafkaBoltTest.java | 2 +-
2 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/c5d77ace/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index 9a020a0..2cca826 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -73,10 +73,6 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
private boolean fireAndForget = false;
private boolean async = true;
- public KafkaBolt(Properties boltSpecfiedProperties) {
- this.boltSpecfiedProperties = boltSpecfiedProperties;
- }
-
public KafkaBolt() {
}
@@ -90,6 +86,11 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
return this;
}
+ public KafkaBolt<K,V> withProducerProperties(Properties producerProperties) {
+ this.boltSpecfiedProperties = producerProperties;
+ return this;
+ }
+
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
//for backward compatibility.
@@ -106,8 +107,9 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
Properties properties = new Properties();
if(configMap!= null)
properties.putAll(configMap);
+ if(boltSpecfiedProperties != null)
+ properties.putAll(boltSpecfiedProperties);
- properties.putAll(boltSpecfiedProperties);
producer = new KafkaProducer<K, V>(properties);
this.collector = collector;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c5d77ace/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
index 673f8ff..53d7c50 100644
--- a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
@@ -223,7 +223,7 @@ public class KafkaBoltTest {
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("metadata.fetch.timeout.ms", 1000);
props.put("linger.ms", 0);
- KafkaBolt bolt = new KafkaBolt(props);
+ KafkaBolt bolt = new KafkaBolt().withProducerProperties(props);
bolt.prepare(config, null, new OutputCollector(collector));
bolt.setAsync(async);
bolt.setFireAndForget(fireAndForget);
[4/4] storm git commit: add STORM-1010 to CHANGELOG.md
Posted by ka...@apache.org.
add STORM-1010 to CHANGELOG.md
* also add Renkai Ge to contributor list
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/154e9ec5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/154e9ec5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/154e9ec5
Branch: refs/heads/master
Commit: 154e9ec55deb4eea8fca8554e4d3b224bf337834
Parents: 1b2bba9
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Sep 2 08:03:42 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Sep 2 08:03:42 2015 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
README.markdown | 1 +
2 files changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/154e9ec5/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1698145..39cd49a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.11.0
+ * STORM-1010: Each KafkaBolt could have a specified properties.
* STORM-1008: Isolate the code for metric collection and retrieval from DisruptorQueue
* STORM-991: General cleanup of the generics (storm.trident.spout package)
* STORM-1000: Use static member classes when permitted
http://git-wip-us.apache.org/repos/asf/storm/blob/154e9ec5/README.markdown
----------------------------------------------------------------------
diff --git a/README.markdown b/README.markdown
index de02346..591a6c7 100644
--- a/README.markdown
+++ b/README.markdown
@@ -223,6 +223,7 @@ under the License.
* Drew Robb ([@drewrobb](https://github.com/drewrobb))
* Frantz Mazoyer ([@fmazoyer](https://github.com/fmazoyer))
* Dean de Bree ([@ddebree](https://github.com/ddebree))
+* Renkai Ge ([@Renkai](https://github.com/Renkai))
## Acknowledgements