You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by XuMingmin <gi...@git.apache.org> on 2017/02/03 20:06:13 UTC

[GitHub] storm pull request #1919: fix: KafkaSpout is blocked in AutoCommitMode

GitHub user XuMingmin opened a pull request:

    https://github.com/apache/storm/pull/1919

    fix: KafkaSpout is blocked in AutoCommitMode

    overwrite https://github.com/apache/storm/pull/1863 to merge changes of STORM-2225 
    
    see https://issues.apache.org/jira/browse/STORM-2340 for more details

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/XuMingmin/storm master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1919.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1919
    
----
commit 5c5d11573485fc62f87e394bf48592a0d449bc7d
Author: mingmxu <mi...@ebay.com>
Date:   2017-02-03T20:03:37Z

    fix: KafkaSpout is blocked in AutoCommitMode

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1919: STORM-2340: fix At-Most-Once issue in KafkaSpout

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1919
  
    The spout as is doesn't support true at-most-once. When auto commit is on, the consumer periodically commits offsets. When auto commit is off and ackers is 0, the spout periodically commits offsets manually. In either case the spout could crash and replay tuples that were acked but not committed. If you need real at-most-once, the spout needs to commit offsets immediately when acked, and in that case you would also need to disable auto commit. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1919: fix: KafkaSpout is blocked in AutoCommitMode

Posted by XuMingmin <gi...@git.apache.org>.
Github user XuMingmin commented on the issue:

    https://github.com/apache/storm/pull/1919
  
    closed https://github.com/apache/storm/pull/1863.
    https://github.com/apache/storm/pull/1919 is compatible with STORM-2225. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1919: STORM-2340: fix At-Most-Once issue in KafkaSpout

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1919
  
    Please see https://github.com/apache/storm/pull/1919#discussion_r99888764 and https://storm.apache.org/releases/1.0.0/javadocs/org/apache/storm/Config.html#TOPOLOGY_ACKER_EXECUTORS. If ack=0 Storm will call `ack` immediately on emit.
    
    Yes, some tuples may be replayed if the spout crashes. What I'm saying is you can't currently configure the spout to commit after every ack. So it's not really at-most-once, so we shouldn't call it that. It's confusing.
    
    Emitting with null message id is an option, but then you have to make sure that emitted tuples leave no state behind (i.e. they aren't put in `emitted`). It also prevents you from getting best-effort replaying unless you explicitly check for ackers in `emitIfNotEmitted`. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1919: STORM-2340: fix At-Most-Once issue in KafkaSpout

Posted by XuMingmin <gi...@git.apache.org>.
Github user XuMingmin commented on the issue:

    https://github.com/apache/storm/pull/1919
  
    it makes sense for me to totally rely on kafka autocommit in at-most-once semantic, and leverages manually offset management for at-least-once, not mix the two offset options.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1919: STORM-2340: fix AutoCommitMode issue in KafkaSpout

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/1919


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1919: STORM-2340: fix AutoCommitMode issue in KafkaSpout

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1919
  
    @srdo 
    I think it's up to the Spout implementation and we don't need to consider ordering if we don't rely on ack() method.
    
    If the spout is aware of the count of ackers (0 or more) and data source provides the way to ack synchronously, nextTuple() method can have two branches (ack vs non-ack) which handle ack/emit accordingly, and ack() method can don't do anything when no acker is activated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1919: fix: KafkaSpout is blocked in AutoCommitMode

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/1919
  
    @XuMingmin does this PR make the initial [PR](https://github.com/apache/storm/pull/1863) obsolete? If so, can you please close that one? If not, I suggest that we have only one PR (compatible with STORM-2225) to address this issue.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1919: fix: KafkaSpout is blocked in AutoCommitMode

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/1919
  
    @XuMingmin Could you also include a STORM JIRA number in the description of this JIRA?  That is how Apache is able to tie the pull request to a corresponding JIRA.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1919: STORM-2340: fix AutoCommitMode issue in KafkaSpout

Posted by XuMingmin <gi...@git.apache.org>.
Github user XuMingmin commented on the issue:

    https://github.com/apache/storm/pull/1919
  
    @srdo agree, that's why I remove the word *at-most-once* in this conversation to make it clear. 
    
    I haven't followed the latest Storm version for some time, I remember no later than 0.9.* people call it *at-most-once* when acker=0.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1919: STORM-2340: fix AutoCommitMode issue in KafkaSpout

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1919
  
    @XuMingmin You shouldn't assume that `KafkaConsumer.close` is called when crashing. If the executor running the spout crashes (or throws an uncaught Exception), `close` is not called as far as I know. If the spout is reassigned to another worker, Storm will just `kill -9` the JVM running the spout. I'm not sure `ISpout.close` is actually ever called in a non-local cluster. See https://storm.apache.org/releases/0.9.7/javadocs/backtype/storm/spout/ISpout.html#close--
    
    If you need real at-most-once, the spout would probably need to commit offsets after every ack.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1919: STORM-2340: fix AutoCommitMode issue in KafkaSpout

Posted by XuMingmin <gi...@git.apache.org>.
Github user XuMingmin commented on the issue:

    https://github.com/apache/storm/pull/1919
  
    Do we really need a `commitSync` call for every `emit`? That would be lots of calls. The consumer reads data in batch.
    The pseudo-at-most-once, either case1 or case 3, is good option, as a performance trade-off .
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1919: STORM-2340: fix At-Most-Once issue in KafkaSpout

Posted by XuMingmin <gi...@git.apache.org>.
Github user XuMingmin commented on the issue:

    https://github.com/apache/storm/pull/1919
  
    when ack=0, explicit ack()/fail() should not take affects as no AckExecutor is running.
    
    Regarding to spout crash, possiblely some tuples are replayed in a periodically commit manner, unless offset is committed after every emit, that's a trade-off for performance.
    
    Another point, may need to skip messageId in SpoutOutputCollector.emit() when auto-commit, to disable ack permanently:
    >> public List<Integer> emit(List<Object> tuple)  Emits a tuple to the default output stream with a null message id. Storm will not track this message so ack and fail will never be called for this tuple. The emitted values must be immutable.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1919: STORM-2340: fix At-Most-Once issue in KafkaSpout

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1919
  
    Autocommit=true + ack=0 means the consumer will commit periodically. Tuples cannot fail. Messages may be replayed if the spout crashes (it may not have committed them because it only happens periodically). This is not true at-most-once, since it is only at-most-once if the spout doesn't crash.
    
    Autocommit=true + ack>0. Acked tuples are handled identically to the case above. Failed tuples are replayed, but only if the spout doesn't crash. This configuration could make sense if you just want best-effort replaying, or you're running a topology with other types of spouts as well.
    
    Autocommit=false + ack=0 will cause Storm to ack tuples immediately. The spout will commit them periodically (https://github.com/XuMingmin/storm/blob/717edc8e7ea46ef3392ec1402235a764039070ff/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L241). This is identical behavior to the first case, except the commit handling has moved from the consumer to the spout.
    
    Autocommit=false + ack>0 is at-least-once.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1919: STORM-2340: fix AutoCommitMode issue in KafkaSpout

Posted by XuMingmin <gi...@git.apache.org>.
Github user XuMingmin commented on the issue:

    https://github.com/apache/storm/pull/1919
  
    I would prefer to keep case 1 for at-most-once, when user want to avoid manage the offset additional in `KafkaSpout`. 
    
    IMO, there's no difference for guarantee between case 3 and case 1. It's achieved either committed by `KafkaSpout` or by `KafkaConsumer`.  Back to the discussion before, I think case 1 can also guarantee after reading code of `KafkaSpout` as below:
      *it's assumed that `KafkaSpout.close()` is called in any crash, otherwise no for both  *
    1. `close()` call `shutdown`;
    2. when `consumerAutoCommitMode=false`, `commitOffsetsForAckedTuples()` is called to commit offset;
    3. then `kafkaConsumer.close()` in the `finally` block would commit the latest offset if `consumerAutoCommitMode=true`, as
     ~~~Java
    KafkaConsuer.close() 
      -> ClientUtils.closeQuietly(coordinator, "coordinator", firstException);
        -> ConsumerCoordinator.close()
          -> ConsumerCoordinator.maybeAutoCommitOffsetsSync();
            -> commitOffsetsSync(subscriptions.allConsumed());
    ~~~



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1919: STORM-2340: fix At-Most-Once issue in KafkaSpout

Posted by XuMingmin <gi...@git.apache.org>.
Github user XuMingmin commented on the issue:

    https://github.com/apache/storm/pull/1919
  
    @srdo  @revans2, any comments ?  


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1919: STORM-2340: fix AutoCommitMode issue in KafkaSpout

Posted by XuMingmin <gi...@git.apache.org>.
Github user XuMingmin commented on the issue:

    https://github.com/apache/storm/pull/1919
  
    @srdo if `kafkaConsumer.close()` is not guaranteed to call, strictly, there's no at-most-once guarantee for either case 3 or case 1 so far. And case 1 equals case 3 after this PR.  


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1919: STORM-2340: fix AutoCommitMode issue in KafkaSpout

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1919
  
    What I'm referring is the blog doc: https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0.9-consumer-client/
    
    Referring to the doc, "at most once" can be achieved with below code snippet *with no autocommit enabled*:
    ```
    try {
      while (running) {
      ConsumerRecords<String, String> records = consumer.poll(1000);
    
      try {
        consumer.commitSync();
        for (ConsumerRecord<String, String> record : records)
          System.out.println(record.offset() + ": " + record.value());
        } catch (CommitFailedException e) {
          // application specific failure handling
        }
      }
    } finally {
      consumer.close();
    }
    ```
    
    It calls commitSync for every poll, not every emit (println in the code).
    Again I'm not familiar with new Kafka API so if we think the above code snippet has some other issues I'm completely OK to not providing strict at-most-once.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1919: STORM-2340: fix At-Most-Once issue in KafkaSpout

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1919#discussion_r100547347
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -314,15 +316,26 @@ private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
                 boolean isScheduled = retryService.isScheduled(msgId);
                 if (!isScheduled || retryService.isReady(msgId)) {   // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
                     final List<Object> tuple = kafkaSpoutConfig.getTranslator().apply(record);
    -                if (tuple instanceof KafkaTuple) {
    -                    collector.emit(((KafkaTuple)tuple).getStream(), tuple, msgId);
    -                } else {
    -                    collector.emit(tuple, msgId);
    -                }
    -                emitted.add(msgId);
    -                numUncommittedOffsets++;
    -                if (isScheduled) { // Was scheduled for retry, now being re-emitted. Remove from schedule.
    -                    retryService.remove(msgId);
    +                
    +                if(consumerAutoCommitMode){
    +                	if (tuple instanceof KafkaTuple) {
    +                        collector.emit(((KafkaTuple)tuple).getStream(), tuple);
    +                    } else {
    +                        collector.emit(tuple);
    +                    }
    --- End diff --
    
    Could we put in a comment saying something like when autocommit is enabled don't bother to track anything. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1919: STORM-2340: fix At-Most-Once issue in KafkaSpout

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1919#discussion_r99891846
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -107,7 +107,8 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect
     
             // Offset management
             firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
    -        consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode();
    +        // with AutoCommitMode, topology works in 'At-Most-Once' mode, and offset will be periodically committed in the background by Kafka consumer
    --- End diff --
    
    Can we add this to the readme https://github.com/apache/storm/blob/master/docs/storm-kafka-client.md
    
    We don't document auto-commit there, but if people are using it we should have something explaining how to use it and the ramifications of turning it on.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1919: STORM-2340: fix At-Most-Once issue in KafkaSpout

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1919#discussion_r99882134
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -319,8 +320,10 @@ private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
                     } else {
                         collector.emit(tuple, msgId);
                     }
    -                emitted.add(msgId);
    -                numUncommittedOffsets++;
    +                if(!consumerAutoCommitMode){//only need to track in none-AutoCommitMode
    +                    emitted.add(msgId);
    --- End diff --
    
    It might be good to check if there are any ackers before adding the tuple to emitted. Otherwise people who are using auto commit and no ackers will run into OOME when emitted gets sufficiently large.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1919: STORM-2340: fix AutoCommitMode issue in KafkaSpout

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1919
  
    @HeartSaVioR Right, acking a batch at a time is better. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1919: STORM-2340: fix At-Most-Once issue in KafkaSpout

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/1919
  
    This is the thing with auto-commit.  auto-commit removes some of the overhead of tracking outstanding messages, but also violates a lot of the guarantees that storm has in place.
    
    To truly get at most once processing you need to commit the message as soon as it is emitted.   Otherwise a crash can still result in messages being replayed.  If you want at least once processing you have to ack the message only after it is fully processed.  Storm makes this work out of the box for most spouts that commit the message when ack is called.
    
    With auto-commit it makes it so we could replay messages even in at most once processing and that we might not replay messages in at least once processing.  It is "I really don't care what messages I see processing".


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1919: STORM-2340: fix At-Most-Once issue in KafkaSpout

Posted by XuMingmin <gi...@git.apache.org>.
Github user XuMingmin commented on the issue:

    https://github.com/apache/storm/pull/1919
  
    remove at-most-once for better describe the changes; 
    emit null msgId when AutoCommitMode to disable tuple track in storm;


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1919: STORM-2340: fix AutoCommitMode issue in KafkaSpout

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1919
  
    @srdo @XuMingmin 
    If my understanding is right, at-most-once can be guaranteed with this step: 
    1. pull the data from datasource
    2. send ack to the datasource
    3. emit the data to the downstreams
    
    Loosening the requirement that there will be no crash between emitting the data and sending ack to the datasource, we can swap 2 and 3, and that's what we're often referring to.
    
    So yes case 3 should explicitly ack to the datasource and data should be emitted only when sending ack succeeds. I'm not familiar with Kafka new API, but if `KafkaConsumer.commitSync` guarantees ack, we should use this for case 3.
    
    Please correct me if I'm missing here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1919: STORM-2340: fix At-Most-Once issue in KafkaSpout

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1919#discussion_r100546788
  
    --- Diff: docs/storm-kafka-client.md ---
    @@ -345,3 +345,27 @@ Currently the Kafka spout has has the following default values, which have shown
     * offset.commit.period.ms = 30000   (30s)
     * max.uncommitted.offsets = 10000000
     <br/>
    +
    +# Kafka AutoCommitMode 
    +
    +If reliability isn't important to you -- that is, you don't care about losing tuples in failure situations --, and want to remove the overhead of tuple tracking, then you can run a KafkaSpout with AutoCommitMode.
    +
    +To enable it, you need to:
    +* set Config.TOPOLOGY_ACKERS to 0;
    +* enable *AutoCommitMode* in Kafka consumer configuration; 
    +
    +Here's one example to set AutoCommitMode in KafkaSpout:
    +```java
    +Map<String, Object> props = new HashMap<String, Object>();
    +props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    +KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
    +		.builder(String bootstrapServers, String ... topics)
    +		.setProp(props)
    --- End diff --
    
    nit: set prop works with a key and a value, so it would be smaller to just do
    ```
    KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
    		.builder(String bootstrapServers, String ... topics)
    		.setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
                    .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
    		.build();
    ```



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1919: STORM-2340: fix At-Most-Once issue in KafkaSpout

Posted by XuMingmin <gi...@git.apache.org>.
Github user XuMingmin commented on the issue:

    https://github.com/apache/storm/pull/1919
  
    @revans2 updated. thanks for the notes!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1919: STORM-2340: fix AutoCommitMode issue in KafkaSpout

Posted by XuMingmin <gi...@git.apache.org>.
Github user XuMingmin commented on the issue:

    https://github.com/apache/storm/pull/1919
  
    @HeartSaVioR  instinctively I think your code can meet at-most-once. Records that are pulled and not yet emitted are discarded if any crash. 
    
    Still, I'd want to limit the scope of this task, to close it first to fix case 1, and have a new task(STORM-2357) to figure out what's the proper solution to achieve exactly at-most-once. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1919: STORM-2340: fix At-Most-Once issue in KafkaSpout

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1919#discussion_r99888764
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -319,8 +320,10 @@ private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
                     } else {
                         collector.emit(tuple, msgId);
                     }
    -                emitted.add(msgId);
    -                numUncommittedOffsets++;
    +                if(!consumerAutoCommitMode){//only need to track in none-AutoCommitMode
    +                    emitted.add(msgId);
    --- End diff --
    
    That is not needed.  If there are no ackers or if acking is disabled for some other reason ```ack``` is called as part of the call to emit.
    
    https://github.com/apache/storm/blob/master/storm-core/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java#L142


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1919: STORM-2340: fix AutoCommitMode issue in KafkaSpout

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1919
  
    Case 3 is normally what we think about non-ack in Storm spout, and it's not same as case 1 since it can guarantee at most once.
    
    So one way to keep our delivery guarantee is forcing autocommit to true (not selectable from user), which I'm not sure we're all satisfied. What do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1919: STORM-2340: fix At-Most-Once issue in KafkaSpout

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1919
  
    LGTM. It doesn't support running with auto commit mode and getting best-effort retry, but I don't know if that's a real use case, so maybe it doesn't matter.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1919: STORM-2340: fix AutoCommitMode issue in KafkaSpout

Posted by XuMingmin <gi...@git.apache.org>.
Github user XuMingmin commented on the issue:

    https://github.com/apache/storm/pull/1919
  
    thanks @revans2 , updated doc as advised.
    
    Regarding to anchor, as ack() is called immediately when acker=0, though it's not needed to track.
    
    Originally, this ticket is focused to solve case 1. let's have a separated one to address case 2 and case 3?
      1). autocommit=true + ack=0
      2). autocommit=true + ack>0 //ack track is running idle, to-be-confirm
      3). autocommit=fale + ack=0 //ack immediately, similar as case 1, to-be-confirm
      4). autocommit=false + ack>0


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1919: STORM-2340: fix At-Most-Once issue in KafkaSpout

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1919
  
    There are two different settings influencing retry behavior here. Auto commit on the spout, and number of ackers configured for Storm (https://storm.apache.org/releases/1.0.0/javadocs/org/apache/storm/Config.html#TOPOLOGY_ACKER_EXECUTORS). You can configure Storm to require tuple acking, while also using auto commit for this spout, in which case the spout will retry tuples that fail, but only on a best-effort basis (i.e. if the spout crashes, tuples will not be replayed). In that case, the spout actually needs to track tuples even though auto commit is on. You can also set acker executors to 0, which will cause Storm to just ack tuples immediately when emitted. In that case it doesn't hurt anything if the spout adds tuples to emitted, because they'll just get removed again when ack() is called.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1919: STORM-2340: fix AutoCommitMode issue in KafkaSpout

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1919
  
    @XuMingmin 
    OK agree that it's beyond the PR's scope. We still need to sort out case 1 to 4, but this PR itself looks great to me.
    +1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1919: STORM-2340: fix AutoCommitMode issue in KafkaSpout

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1919
  
    @HeartSaVioR That's pretty much my understanding of this as well. I'm not sure what order the ack/emit happens in when ackers=0, but if it's ack followed by emit, then it should be enough to put a `KafkaConsumer.commitSync` call in `KafkaSpout.ack`. `commitSync` guarantees that the committed offset is properly committed when it returns.
    
    I agree that this is how it should work for case 3.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1919: STORM-2340: fix AutoCommitMode issue in KafkaSpout

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1919
  
    Yes, that's what I mean. It's not really at-most-once. For real at-most-once, the spout should call `KafkaConsumer.commitSync` when it receives an ack, not periodically (and then you'd need to disable auto-commit). 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1919: STORM-2340: fix At-Most-Once issue in KafkaSpout

Posted by XuMingmin <gi...@git.apache.org>.
Github user XuMingmin commented on the issue:

    https://github.com/apache/storm/pull/1919
  
    The situations in my mind:
    1. autocommit=true + ack=0, --> work, at-most-once
    2. autocommit=true + ack>0 --> work, at-most-once ( acker be idle ? )
    3. autocommit=fale + ack=0 --> not work, blocked when numUncommittedOffsets < maxUncommittedOffsets;
    4. autocommit=false + ack>0. --> work, at-least-once



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1919: STORM-2340: fix At-Most-Once issue in KafkaSpout

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1919#discussion_r99868716
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -319,8 +320,10 @@ private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
                     } else {
                         collector.emit(tuple, msgId);
                     }
    -                emitted.add(msgId);
    -                numUncommittedOffsets++;
    +                if(!consumerAutoCommitMode){//only need to track in none-AutoCommitMode
    +                    emitted.add(msgId);
    --- End diff --
    
    I think we still want to track emitted even in auto commit mode.  Replaying a failed tuple will not work unless it is added to emitted and even with auto commit some people may want to replay tuples.
    
    numUncommittedOffsets is only ever decremented in auto commit mode so that part of the change is fine.  Although, it would be good to add a javadoc to numUncommittedOffsets indicating that it is not modified in auto commit mode.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1919: STORM-2340: fix At-Most-Once issue in KafkaSpout

Posted by XuMingmin <gi...@git.apache.org>.
Github user XuMingmin commented on the issue:

    https://github.com/apache/storm/pull/1919
  
    Good point to document the impact, is preparing an update PR. 
    
    According to my understand, there's no replay for AT-MOST-ONCE mode in storm. We use this mode when data missing is not a concern, and want to remove the overhead of tuple tracking.
    
    With auto-commit enabled, the consumer's offset will be periodically committed in the background by Kafka consumer, so no need to track commit offset status with emitted/numUncommittedOffsets manually.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1919: STORM-2340: fix At-Most-Once issue in KafkaSpout

Posted by XuMingmin <gi...@git.apache.org>.
Github user XuMingmin commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1919#discussion_r99892349
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -107,7 +107,8 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect
     
             // Offset management
             firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
    -        consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode();
    +        // with AutoCommitMode, topology works in 'At-Most-Once' mode, and offset will be periodically committed in the background by Kafka consumer
    --- End diff --
    
    agree, let me update it


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1919: STORM-2340: fix At-Most-Once issue in KafkaSpout

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1919#discussion_r100547080
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -314,15 +316,26 @@ private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
                 boolean isScheduled = retryService.isScheduled(msgId);
                 if (!isScheduled || retryService.isReady(msgId)) {   // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
                     final List<Object> tuple = kafkaSpoutConfig.getTranslator().apply(record);
    -                if (tuple instanceof KafkaTuple) {
    -                    collector.emit(((KafkaTuple)tuple).getStream(), tuple, msgId);
    -                } else {
    -                    collector.emit(tuple, msgId);
    -                }
    -                emitted.add(msgId);
    -                numUncommittedOffsets++;
    -                if (isScheduled) { // Was scheduled for retry, now being re-emitted. Remove from schedule.
    -                    retryService.remove(msgId);
    +                
    +                if(consumerAutoCommitMode){
    +                	if (tuple instanceof KafkaTuple) {
    --- End diff --
    
    nit: indentation


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1919: STORM-2340: fix At-Most-Once issue in KafkaSpout

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1919#discussion_r99889319
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -319,8 +320,10 @@ private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
                     } else {
                         collector.emit(tuple, msgId);
                     }
    -                emitted.add(msgId);
    -                numUncommittedOffsets++;
    +                if(!consumerAutoCommitMode){//only need to track in none-AutoCommitMode
    +                    emitted.add(msgId);
    --- End diff --
    
    Sorry, nevermind. Just occured to me that Storm will call ack immediately if it's configured that way :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---