You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/08/25 05:05:22 UTC
[04/11] storm git commit: Declared stream in declareOutputFields if
topicAsStreamId flag is true
Declared stream in declareOutputFields if topicAsStreamId flag is true
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/84a282c4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/84a282c4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/84a282c4
Branch: refs/heads/master
Commit: 84a282c48ac54ca2573464e4935cd652dd1f84a3
Parents: bfa3095
Author: rohan_agarwal <ro...@gmail.com>
Authored: Mon Aug 10 17:29:47 2015 +0530
Committer: rohan_agarwal <ro...@gmail.com>
Committed: Mon Aug 10 17:29:47 2015 +0530
----------------------------------------------------------------------
external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java | 6 +++++-
.../storm-kafka/src/jvm/storm/kafka/PartitionManager.java | 10 ++++++----
2 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/84a282c4/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
index f3bc3ea..3260ad1 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
@@ -182,7 +182,11 @@ public class KafkaSpout extends BaseRichSpout {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(_spoutConfig.scheme.getOutputFields());
+ if (_spoutConfig.topicAsStreamId) {
+ declarer.declareStream(_spoutConfig.topic, _spoutConfig.scheme.getOutputFields());
+ } else {
+ declarer.declare(_spoutConfig.scheme.getOutputFields());
+ }
}
private void commit() {
http://git-wip-us.apache.org/repos/asf/storm/blob/84a282c4/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index 77087ec..052d525 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -137,10 +137,12 @@ public class PartitionManager {
}
Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
if (tups != null) {
- for (List<Object> tup : tups) {
- if(_spoutConfig.topicAsStreamId) {
- collector.emit(_spoutConfig.topic, tup, new KafkaMessageId(_partition, toEmit.offset));
- } else {
+ if(_spoutConfig.topicAsStreamId) {
+ for (List<Object> tup : tups) {
+ collector.emit(_spoutConfig.topic, tup, new KafkaMessageId(_partition, toEmit.offset));
+ }
+ } else {
+ for (List<Object> tup : tups) {
collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
}
}