You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 00:13:11 UTC
[1/2] storm git commit: [STORM-2442]modify the Usage Examples in
storm-kafka-client's README.md
Repository: storm
Updated Branches:
refs/heads/1.0.x-branch 93ca92cf4 -> bb138f222
[STORM-2442]modify the Usage Examples in storm-kafka-client's README.md
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/33c57e86
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/33c57e86
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/33c57e86
Branch: refs/heads/1.0.x-branch
Commit: 33c57e86e597cb3834ad860f7c9020ac39f26637
Parents: 4f4c7b4
Author: liuzhaokun <li...@zte.com.cn>
Authored: Fri Mar 31 15:19:03 2017 +0800
Committer: \u5218\u5146\u576410206665 <li...@zte.com.cn>
Committed: Fri Mar 31 15:19:03 2017 +0800
----------------------------------------------------------------------
external/storm-kafka-client/README.md | 34 ++++++++++++++----------------
1 file changed, 16 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/33c57e86/external/storm-kafka-client/README.md
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/README.md b/external/storm-kafka-client/README.md
index 5093875..fba5766 100644
--- a/external/storm-kafka-client/README.md
+++ b/external/storm-kafka-client/README.md
@@ -17,33 +17,31 @@ Multiple topics can use the same `KafkaSpoutTupleBuilder` implementation, as lon
The code snippet bellow is extracted from the example in the module [test] (https://github.com/apache/storm/tree/master/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test). Please refer to this module for more detail
```java
-KafkaSpout<String,String> kafkaSpout = new KafkaSpout<>(kafkaSpoutConfig);
-
-KafkaSpoutConfig kafkaSpoutConfig = new KafkaSpoutConfig.Builder<String, String>(kafkaConsumerProps, kafkaSpoutStreams, tuplesBuilder, retryService)
- .setOffsetCommitPeriodMs(10_000)
- .setFirstPollOffsetStrategy(EARLIEST)
- .setMaxUncommittedOffsets(250)
- .build();
-
-
-KafkaSpoutStreams kafkaSpoutStreams = new KafkaSpoutStreams.Builder(outputFields, STREAMS[0], new String[]{TOPICS[0], TOPICS[1]})
+ KafkaSpoutStreams kafkaSpoutStreams = new KafkaSpoutStreams.Builder(outputFields, STREAMS[0], new String[]{TOPICS[0], TOPICS[1]})
.addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]}) // contents of topic test2 sent to test_stream
.addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]}) // contents of topic test2 sent to test2_stream
.build();
-Map<String, Object> kafkaConsumerProps = new HashMap<>();
- props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:9092");
- props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
- props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
- props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
+ Map<String, Object> kafkaConsumerProps = new HashMap<>();
+ kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:9092");
+ kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
+ kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
+ kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
-KafkaSpoutTuplesBuilder<String, String> tuplesBuilder = new KafkaSpoutTuplesBuilder.Builder<>(
+ KafkaSpoutTuplesBuilder<String, String> tuplesBuilder = new KafkaSpoutTuplesBuilder.Builder<>(
new TopicsTest0Test1TupleBuilder<String, String>(TOPICS[0], TOPICS[1]),
new TopicTest2TupleBuilder<String, String>(TOPICS[2]))
.build();
-KafkaSpoutRetryService retryService = new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),
- TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));
+ KafkaSpoutRetryService retryService = new KafkaSpoutRetryExponentialBackoff(new KafkaSpoutRetryExponentialBackoff.TimeInterval(500, TimeUnit.MICROSECONDS),
+ KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2), Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));
+
+ KafkaSpoutConfig kafkaSpoutConfig = new KafkaSpoutConfig.Builder<String, String>(kafkaConsumerProps, kafkaSpoutStreams, tuplesBuilder, retryService)
+ .setOffsetCommitPeriodMs(10_000)
+ .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)
+ .setMaxUncommittedOffsets(250)
+ .build();
+ KafkaSpout<String,String> kafkaSpout = new KafkaSpout<>(kafkaSpoutConfig);
```
### Create a simple Toplogy using the Kafka Spout:
[2/2] storm git commit: Merge branch 'kafka-client' of
https://github.com/liu-zhaokun/storm into STORM-2442-1.0.x-merge
Posted by ka...@apache.org.
Merge branch 'kafka-client' of https://github.com/liu-zhaokun/storm into STORM-2442-1.0.x-merge
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bb138f22
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bb138f22
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bb138f22
Branch: refs/heads/1.0.x-branch
Commit: bb138f22220b625c793e7043cab4d0bb8c5e510d
Parents: 93ca92c 33c57e8
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Apr 5 09:12:55 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Apr 5 09:12:55 2017 +0900
----------------------------------------------------------------------
external/storm-kafka-client/README.md | 34 ++++++++++++++----------------
1 file changed, 16 insertions(+), 18 deletions(-)
----------------------------------------------------------------------