You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/08/25 05:05:22 UTC

[04/11] storm git commit: Declared stream in declareOutputFields if topicAsStreamId flag is true

Declared stream in declareOutputFields if topicAsStreamId flag is true


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/84a282c4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/84a282c4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/84a282c4

Branch: refs/heads/master
Commit: 84a282c48ac54ca2573464e4935cd652dd1f84a3
Parents: bfa3095
Author: rohan_agarwal <ro...@gmail.com>
Authored: Mon Aug 10 17:29:47 2015 +0530
Committer: rohan_agarwal <ro...@gmail.com>
Committed: Mon Aug 10 17:29:47 2015 +0530

----------------------------------------------------------------------
 external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java  |  6 +++++-
 .../storm-kafka/src/jvm/storm/kafka/PartitionManager.java | 10 ++++++----
 2 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/84a282c4/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
index f3bc3ea..3260ad1 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
@@ -182,7 +182,11 @@ public class KafkaSpout extends BaseRichSpout {
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(_spoutConfig.scheme.getOutputFields());
+	if (_spoutConfig.topicAsStreamId) {
+	    declarer.declareStream(_spoutConfig.topic, _spoutConfig.scheme.getOutputFields());
+	} else {
+            declarer.declare(_spoutConfig.scheme.getOutputFields());
+	}
     }
 
     private void commit() {

http://git-wip-us.apache.org/repos/asf/storm/blob/84a282c4/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index 77087ec..052d525 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -137,10 +137,12 @@ public class PartitionManager {
             }
             Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
             if (tups != null) {
-                for (List<Object> tup : tups) {
-		    if(_spoutConfig.topicAsStreamId) {
-                        collector.emit(_spoutConfig.topic, tup, new KafkaMessageId(_partition, toEmit.offset));
-	 	    } else {
+		if(_spoutConfig.topicAsStreamId) {
+	            for (List<Object> tup : tups) {
+			collector.emit(_spoutConfig.topic, tup, new KafkaMessageId(_partition, toEmit.offset));
+		    }
+		} else {
+		    for (List<Object> tup : tups) {
 			collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
 		    }
                 }