You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/08/25 05:05:21 UTC

[03/11] storm git commit: Added a topicAsStreamId flag to SpoutConfig

Added a topicAsStreamId flag to SpoutConfig


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bfa3095e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bfa3095e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bfa3095e

Branch: refs/heads/master
Commit: bfa3095ef871e0fdd76edef5d9ab1daf098b88b8
Parents: 80c82e3
Author: rohan_agarwal <ro...@gmail.com>
Authored: Mon Aug 10 14:35:26 2015 +0530
Committer: rohan_agarwal <ro...@gmail.com>
Committed: Mon Aug 10 14:35:26 2015 +0530

----------------------------------------------------------------------
 external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java | 6 +++++-
 external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java      | 3 +++
 2 files changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/bfa3095e/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index ce18677..77087ec 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -138,7 +138,11 @@ public class PartitionManager {
             Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
             if (tups != null) {
                 for (List<Object> tup : tups) {
-                    collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
+		    if(_spoutConfig.topicAsStreamId) {
+                        collector.emit(_spoutConfig.topic, tup, new KafkaMessageId(_partition, toEmit.offset));
+	 	    } else {
+			collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
+		    }
                 }
                 break;
             } else {

http://git-wip-us.apache.org/repos/asf/storm/blob/bfa3095e/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java b/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java
index 61d0b35..27edd7a 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java
@@ -27,6 +27,9 @@ public class SpoutConfig extends KafkaConfig implements Serializable {
     public String zkRoot = null;
     public String id = null;
 
+    // if true, the spout emits each tuple with the Kafka topic name as its stream ID; if false, no stream ID is passed to emit
+    public boolean topicAsStreamId = false;
+
     // setting for how often to save the current kafka offset to ZooKeeper
     public long stateUpdateIntervalMs = 2000;