You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by msaunier-poctu <gi...@git.apache.org> on 2015/04/07 15:16:27 UTC

[GitHub] storm pull request: Storm-697: Support for Emitting Kafka Message ...

Github user msaunier-poctu commented on the pull request:

    https://github.com/apache/storm/pull/454#issuecomment-90546074
  
    :+1:
    
    TridentKafkaEmitter should be updated to support emitting offset in Trident :
    
    
    ```diff
    diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
    index 94bf134..dc9bb6d 100644
    --- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
    +++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
    @@ -113,7 +113,7 @@ public class TridentKafkaEmitter {
             ByteBufferMessageSet msgs = fetchMessages(consumer, partition, offset);
             long endoffset = offset;
             for (MessageAndOffset msg : msgs) {
    -            emit(collector, msg.message());
    +            emit(collector, msg.message(),partition,msg.offset());
                 endoffset = msg.nextOffset();
             }
             Map newMeta = new HashMap();
    @@ -160,14 +160,19 @@ public class TridentKafkaEmitter {
                     if (offset > nextOffset) {
                         throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
                     }
    -                emit(collector, msg.message());
    +                emit(collector, msg.message(),partition,msg.offset());
                     offset = msg.nextOffset();
                 }
             }
         }
     
    -    private void emit(TridentCollector collector, Message msg) {
    -        Iterable<List<Object>> values = KafkaUtils.generateTuples(_config, msg);
    +    private void emit(TridentCollector collector, Message msg, Partition partition, long offset) {
    +        Iterable<List<Object>> values;
    +        if(_config.tupleMetaData) {
    +            values = KafkaUtils.generateTuples(_config, msg, partition, offset);
    +        }else{
    +            values = KafkaUtils.generateTuples(_config, msg);
    +        }
             if (values != null) {
                 for (List<Object> value : values) {
                     collector.emit(value);
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---