You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/08/25 05:05:21 UTC
[03/11] storm git commit: Added a topicAsStreamId flag to SpoutConfig
Added a topicAsStreamId flag to SpoutConfig
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bfa3095e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bfa3095e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bfa3095e
Branch: refs/heads/master
Commit: bfa3095ef871e0fdd76edef5d9ab1daf098b88b8
Parents: 80c82e3
Author: rohan_agarwal <ro...@gmail.com>
Authored: Mon Aug 10 14:35:26 2015 +0530
Committer: rohan_agarwal <ro...@gmail.com>
Committed: Mon Aug 10 14:35:26 2015 +0530
----------------------------------------------------------------------
external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java | 6 +++++-
external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java | 3 +++
2 files changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/bfa3095e/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index ce18677..77087ec 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -138,7 +138,11 @@ public class PartitionManager {
Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
if (tups != null) {
for (List<Object> tup : tups) {
- collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
+ if(_spoutConfig.topicAsStreamId) {
+ collector.emit(_spoutConfig.topic, tup, new KafkaMessageId(_partition, toEmit.offset));
+ } else {
+ collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
+ }
}
break;
} else {
http://git-wip-us.apache.org/repos/asf/storm/blob/bfa3095e/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java b/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java
index 61d0b35..27edd7a 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java
@@ -27,6 +27,9 @@ public class SpoutConfig extends KafkaConfig implements Serializable {
public String zkRoot = null;
public String id = null;
+ // if set to true, the spout emits each tuple on a stream whose ID is the Kafka topic name
+ public boolean topicAsStreamId = false;
+
// setting for how often to save the current kafka offset to ZooKeeper
public long stateUpdateIntervalMs = 2000;