Posted to dev@storm.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2015/11/23 12:06:10 UTC

[jira] [Commented] (STORM-826) As a storm developer I’d like to use the new kafka producer API to reduce dependencies and use long term supported kafka apis

    [ https://issues.apache.org/jira/browse/STORM-826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15021957#comment-15021957 ] 

ASF GitHub Bot commented on STORM-826:
--------------------------------------

Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/572#discussion_r45591441
  
    --- Diff: external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java ---
    @@ -102,12 +114,40 @@ public void execute(Tuple input) {
                 key = mapper.getKeyFromTuple(input);
                 message = mapper.getMessageFromTuple(input);
                 topic = topicSelector.getTopic(input);
    -            if(topic != null ) {
    -                producer.send(new KeyedMessage<K, V>(topic, key, message));
    +            if (topic != null ) {
    +                Callback callback = null;
    +
    +                if (!fireAndForget && async) {
    +                    callback = new Callback() {
    +                        @Override
    +                        public void onCompletion(RecordMetadata ignored, Exception e) {
    +                            synchronized (collector) {
    +                                if (e != null) {
    +                                    collector.reportError(e);
    +                                    collector.fail(input);
    +                                } else {
    +                                    collector.ack(input);
    +                                }
    +                            }
    +                        }
    +                    };
    +                }
    +                Future<RecordMetadata> result = producer.send(new ProducerRecord<K, V>(topic, key, message), callback);
    +                if (!async) {
    +                    try {
    +                        result.get();
    +                        collector.ack(input);
    --- End diff --
    
    Does this call take place in the same executor thread?
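
For context on the question above, here is a minimal sketch of the two paths in the diff. It assumes the behaviour described in the Kafka producer documentation, where a send callback is run on the producer's I/O thread rather than on the caller's thread. `FakeProducer`, `Callback`, `syncAckThread` and `asyncAckThread` are hypothetical stand-ins written for this illustration; they are not part of Kafka or Storm. The sketch only records which thread would perform the ack in each mode.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExecutorThreadSketch {
    // Hypothetical stand-in for the producer callback interface.
    interface Callback { void onCompletion(Exception e); }

    // Hypothetical stand-in for a producer: one background "I/O" thread
    // completes each send and invokes the callback on that same thread.
    static final class FakeProducer implements AutoCloseable {
        private final ExecutorService io = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "fake-producer-io");
            t.setDaemon(true);
            return t;
        });

        Future<String> send(String record, Callback callback) {
            return io.submit(() -> {
                if (callback != null) {
                    callback.onCompletion(null); // runs on the I/O thread
                }
                return record;
            });
        }

        @Override
        public void close() { io.shutdown(); }
    }

    // Sync path: block on the future, then "ack" on the calling thread.
    static String syncAckThread() throws Exception {
        try (FakeProducer producer = new FakeProducer()) {
            producer.send("msg", null).get();
            return Thread.currentThread().getName();
        }
    }

    // Async path: the "ack" happens inside the callback, on the I/O thread.
    static String asyncAckThread() throws Exception {
        try (FakeProducer producer = new FakeProducer()) {
            final String[] ackThread = new String[1];
            // get() is used here only so the sketch can read the result safely.
            producer.send("msg", e -> ackThread[0] = Thread.currentThread().getName()).get();
            return ackThread[0];
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("sync ack on:  " + syncAckThread());
        System.out.println("async ack on: " + asyncAckThread());
    }
}
```

Under these assumptions, the blocking path acks on whichever thread called `execute`, while the callback path acks on a different thread, which would explain why the diff wraps the collector calls in `synchronized (collector)`.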


> As a storm developer I’d like to use the new kafka producer API to reduce dependencies and use long term supported kafka apis 
> ------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: STORM-826
>                 URL: https://issues.apache.org/jira/browse/STORM-826
>             Project: Apache Storm
>          Issue Type: Story
>          Components: storm-kafka
>            Reporter: Thomas Becker
>            Assignee: Zhuo Liu
>             Fix For: 0.11.0
>
>



