You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/04 22:51:31 UTC

[3/6] storm git commit: set output stream id in kafkaSpout

set output stream id in kafkaSpout


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/87a4a8b4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/87a4a8b4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/87a4a8b4

Branch: refs/heads/master
Commit: 87a4a8b4382de5617d463b23188f18e4f7e7f03c
Parents: 5261160
Author: Zhiqiang-He <ab...@qq.com>
Authored: Tue Nov 17 17:14:54 2015 +0800
Committer: Zhiqiang-He <ab...@qq.com>
Committed: Tue Nov 17 17:14:54 2015 +0800

----------------------------------------------------------------------
 external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java       | 3 +++
 external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java | 5 +++++
 external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java      | 3 +++
 3 files changed, 11 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/87a4a8b4/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
index d16659f..ce34ae5 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
@@ -23,6 +23,7 @@ import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseRichSpout;
+import com.google.common.base.Strings;
 import kafka.message.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -192,6 +193,8 @@ public class KafkaSpout extends BaseRichSpout {
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         if (_spoutConfig.topicAsStreamId) {
             declarer.declareStream(_spoutConfig.topic, _spoutConfig.scheme.getOutputFields());
+        } else if (!Strings.isNullOrEmpty(_spoutConfig.outputStreamId)) {
+            declarer.declareStream(_spoutConfig.outputStreamId, _spoutConfig.scheme.getOutputFields());
         } else {
             declarer.declare(_spoutConfig.scheme.getOutputFields());
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/87a4a8b4/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index d1b9f48..48c4892 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -24,6 +24,7 @@ import backtype.storm.metric.api.MeanReducer;
 import backtype.storm.metric.api.ReducedMetric;
 import backtype.storm.spout.SpoutOutputCollector;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 
 import kafka.javaapi.consumer.SimpleConsumer;
@@ -153,6 +154,10 @@ public class PartitionManager {
                     for (List<Object> tup : tups) {
                         collector.emit(_spoutConfig.topic, tup, new KafkaMessageId(_partition, toEmit.offset));
                     }
+                } else if (!Strings.isNullOrEmpty(_spoutConfig.outputStreamId)) {
+                    for (List<Object> tup : tups) {
+                        collector.emit(_spoutConfig.outputStreamId, tup, new KafkaMessageId(_partition, toEmit.offset));
+                    }
                 } else {
                     for (List<Object> tup : tups) {
                         collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));

http://git-wip-us.apache.org/repos/asf/storm/blob/87a4a8b4/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java b/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java
index 27edd7a..828ffa7 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java
@@ -30,6 +30,9 @@ public class SpoutConfig extends KafkaConfig implements Serializable {
     // if set to true, spout will set Kafka topic as the emitted Stream ID
     public boolean topicAsStreamId = false;
 
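+    // stream ID to emit on when topicAsStreamId is false; ignored (the topic is used) when topicAsStreamId is true; the default stream is used if null or empty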
+    public String outputStreamId;
+
     // setting for how often to save the current kafka offset to ZooKeeper
     public long stateUpdateIntervalMs = 2000;