You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by hmcl <gi...@git.apache.org> on 2017/10/23 00:53:23 UTC

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

GitHub user hmcl opened a pull request:

    https://github.com/apache/storm/pull/2380

    STORM-2781: Refactor storm-kafka-client KafkaSpout Processing Guarantees

     - Define processing guarantees as AT_LEAST_ONCE, AT_MOST_ONCE, NONE
     - Refactor method name from setForceEnableTupleTracking to setTupleTrackingEnforced
     - Throw IllegalStateException instead of IllegalArgumentException if spout attempts to emit an already committed message
     - Update documentation to reflect these changes

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/hmcl/storm-apache Apache_master_STORM-2781_KSProcGtees

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2380.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2380
    
----
commit 6a5c3c4b043a8ddd1224fb14f55a512b810d05b7
Author: Hugo Louro <hm...@gmail.com>
Date:   2017-10-23T00:44:54Z

    STORM-2781: Refactor storm-kafka-client KafkaSpout Processing Guarantees
    
     - Define processing guarantees as AT_LEAST_ONCE, AT_MOST_ONCE, NONE
     - Refactor method name from setForceEnableTupleTracking to setTupleTrackingEnforced
     - Throw IllegalStateException instead of IllegalArgumentException if spout attempts to emit an already committed message
     - Update documentation to reflect these changes

----


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r147053713
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -438,55 +440,53 @@ private void commitOffsetsForAckedTuples() {
         // ======== Ack =======
         @Override
         public void ack(Object messageId) {
    -        if (!isAtLeastOnce()) {
    -            // Only need to keep track of acked tuples if commits are done based on acks
    -            return;
    -        }
    -
    +        // Only need to keep track of acked tuples if commits to Kafka are done after a tuple ack is received
             final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
    -        if (!emitted.contains(msgId)) {
    -            if (msgId.isEmitted()) {
    -                LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
    -                    + "came from a topic-partition that this consumer group instance is no longer tracking "
    -                    + "due to rebalance/partition reassignment. No action taken.", msgId);
    +        if (isAtLeastOnceProcessing()) {
    --- End diff --
    
    @hmcl I agree that 
    ```
    if condition == false
      do nothing
    else
      do something
    ```
    is bad, but as @HeartSaVioR mentions early return (guard clause) isn't the same thing. Instead of writing
    ```
    if precondition == true
      rest of function
    return
    ```
    it can make the function more readable to write 
    ```
    if precondition == false
      return
    rest of function
    ```
    because the guarding `if` now doesn't cover the rest of the function, and the indendation level is reduced. See https://blog.codinghorror.com/flattening-arrow-code/ or https://en.wikibooks.org/wiki/Computer_Programming/Coding_Style/Minimize_nesting#Early_return.
    
    I agree that since there's no code after the `if` closes it's not that important. Will leave it up to your judgement.


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r146325123
  
    --- Diff: docs/storm-kafka-client.md ---
    @@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been
     * max.uncommitted.offsets = 10000000
     <br/>
     
    -# Messaging reliability modes
    +# Processing Guarantees
     
    -In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed.
    +The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked
    +as processed, i.e. when the offset is committed to Kafka. For AT_LEAST_ONCE and AT_MOST_ONCE guarantees the spout controls when
    +the commit happens. When the guarantee is NONE Kafka controls when the commit happens.
    +
    +* AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once)
    +     and acked. If a tuple fails or times-out it will be re-emitted. A tuple can be processed more than once if for instance
    --- End diff --
    
    Nit: "times-out" -> "times out". 


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r146997432
  
    --- Diff: docs/storm-kafka-client.md ---
    @@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been
     * max.uncommitted.offsets = 10000000
     <br/>
     
    -# Messaging reliability modes
    +# Processing Guarantees
     
    -In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed.
    +The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked
    --- End diff --
    
    Maybe we can come up with a bit better wording for this, but I really don't think that we should say that an offset is marked as processed. Offsets are committed, not processed. ConsumerRecords, wrapped by Tuples are processed.


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r146328865
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -438,55 +440,53 @@ private void commitOffsetsForAckedTuples() {
         // ======== Ack =======
         @Override
         public void ack(Object messageId) {
    -        if (!isAtLeastOnce()) {
    -            // Only need to keep track of acked tuples if commits are done based on acks
    -            return;
    -        }
    -
    +        // Only need to keep track of acked tuples if commits to Kafka are done after a tuple ack is received
             final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
    -        if (!emitted.contains(msgId)) {
    -            if (msgId.isEmitted()) {
    -                LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
    -                    + "came from a topic-partition that this consumer group instance is no longer tracking "
    -                    + "due to rebalance/partition reassignment. No action taken.", msgId);
    +        if (isAtLeastOnceProcessing()) {
    --- End diff --
    
    I think this is worse. Putting a guard clause at the top decreases indentation for the whole function and is more readable IMO.


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r146996683
  
    --- Diff: docs/storm-kafka-client.md ---
    @@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been
     * max.uncommitted.offsets = 10000000
     <br/>
     
    -# Messaging reliability modes
    +# Processing Guarantees
     
    -In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed.
    +The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked
    --- End diff --
    
    Well, this is tricky because Storm does not process offsets, storm processes tuples. More exactly, it processes tuples that contain ConsumerRecord's. The offset is just part of the  ConsumerRecord, which also contains key, val, etc... We commit the offset, but by committing the offset we are technically marking that the tuple was processed because even if the tuple fails, it won't be retried (processed again).


---

[GitHub] storm issue #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout Proces...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/2380
  
    I have squashed the commits and addressed the early return issue.


---

[GitHub] storm issue #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout Proces...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2380
  
    +1, good work.


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r146327983
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -255,26 +255,25 @@ private void throwKafkaConsumerInterruptedException() {
         }
     
         private boolean commit() {
    -        return isAtLeastOnce() && commitTimer.isExpiredResetOnTrue();    // timer != null for non auto commit mode
    +        return isAtLeastOnceProcessing() && commitTimer.isExpiredResetOnTrue();    // timer != null for non auto commit mode
         }
     
         private boolean poll() {
             final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets();
             final int readyMessageCount = retryService.readyMessageCount();
             final boolean poll = !waitingToEmit()
    -            //Check that the number of uncommitted, nonretriable tuples is less than the maxUncommittedOffsets limit
    -            //Accounting for retriable tuples this way still guarantees that the limit is followed on a per partition basis,
    -            //and prevents locking up the spout when there are too many retriable tuples
    -            && (numUncommittedOffsets - readyMessageCount < maxUncommittedOffsets
    -            || !isAtLeastOnce());
    +                // Check that the number of uncommitted, non-retriable tuples is less than the maxUncommittedOffsets limit.
    --- End diff --
    
    I think we indent to 4 spaces in the rest of the file?


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r146326837
  
    --- Diff: docs/storm-kafka-client.md ---
    @@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been
     * max.uncommitted.offsets = 10000000
     <br/>
     
    -# Messaging reliability modes
    +# Processing Guarantees
     
    -In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed.
    +The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked
    +as processed, i.e. when the offset is committed to Kafka. For AT_LEAST_ONCE and AT_MOST_ONCE guarantees the spout controls when
    +the commit happens. When the guarantee is NONE Kafka controls when the commit happens.
    +
    +* AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once)
    +     and acked. If a tuple fails or times-out it will be re-emitted. A tuple can be processed more than once if for instance
    +     the ack gets lost.
    +
    +* AT_MOST_ONCE - every offset will be committed to Kafka right after being polled but before being emitted
    +     to the downstream components of the topology. It guarantees that the offset is processed at-most-once because it
    +     won't retry tuples that fail or timeout after the commit to Kafka has been done.
    +
    +* NONE - the polled offsets are committed to Kafka periodically as controlled by the Kafka properties
    +     "enable.auto.commit" and "auto.commit.interval.ms". Because the spout does not control when the commit happens
    +     it cannot give any message processing guarantees, i.e. a message may be processed 0, 1 or more times.
    +     This option requires "enable.auto.commit=true". If "enable.auto.commit=false" an exception will be thrown.
    +
    +To set the processing guarantee use the `KafkaSpoutConfig.Builder.setProcessingGuarantee` method as follows:
     
    -To set the processing guarantee, use the KafkaSpoutConfig.Builder.setProcessingGuarantee method, e.g.
     ```java
     KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
       .builder(String bootstrapServers, String ... topics)
       .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
     ```
     
    -The spout will disable tuple tracking for emitted tuples by default when you use at-most-once or any-times. In some cases you may want to enable tracking anyway, because tuple tracking is necessary for some features of Storm, e.g. showing complete latency in Storm UI, or enabling backpressure through the `Config.TOPOLOGY_MAX_SPOUT_PENDING` parameter.
    +# Tuple Tracking
    +
    +By default the spout only tracks emitted tuples when the processing guarantee is AT_LEAST_ONCE. It may be necessary to track
    +emitted tuples with other processing guarantees to benefit of Storm features such as showing complete latency in the UI,
    +or enabling backpressure with Config.TOPOLOGY_MAX_SPOUT_PENDING.
     
    -If you need to enable tracking, use the KafkaSpoutConfig.Builder.setForceEnableTupleTracking method, e.g.
     ```java
     KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
       .builder(String bootstrapServers, String ... topics)
       .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
    -  .setForceEnableTupleTracking(true)
    +  .setTupleTrackingEnforced(true)
     ```
     
    -Note that this setting has no effect in at-least-once mode, where tuple tracking is always enabled.
    \ No newline at end of file
    +Note: This setting has no effect with AT_LEAST_ONCE processing guarantees where tuple tracking is required and therefore always enabled.
    --- End diff --
    
    "with AT_LEAST_ONCE processing guarantees" -> "with the AT_LEAST_ONCE processing guarantee"


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2380


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r147015173
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -438,55 +440,53 @@ private void commitOffsetsForAckedTuples() {
         // ======== Ack =======
         @Override
         public void ack(Object messageId) {
    -        if (!isAtLeastOnce()) {
    -            // Only need to keep track of acked tuples if commits are done based on acks
    -            return;
    -        }
    -
    +        // Only need to keep track of acked tuples if commits to Kafka are done after a tuple ack is received
             final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
    -        if (!emitted.contains(msgId)) {
    -            if (msgId.isEmitted()) {
    -                LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
    -                    + "came from a topic-partition that this consumer group instance is no longer tracking "
    -                    + "due to rebalance/partition reassignment. No action taken.", msgId);
    +        if (isAtLeastOnceProcessing()) {
    --- End diff --
    
    I disagree. Any method that has only one if condition and nothing else, should be 
    ```java 
    if (condition == true) 
         do_action;
    ```
    imho it is counter natural to have code like 
    ```java
    if (condition = false) 
        do_nothing;
    else 
         do_action;
    ```
    which is what basically the early return is doing.
    
    There are also lengthier reasons related to the semantics of OOP, but I just think that in general wherever possible one should have code like if (condtion==true) do action.


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r147012706
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -78,17 +78,17 @@
         private transient KafkaSpoutRetryService retryService;
         // Handles tuple events (emit, ack etc.)
         private transient KafkaTupleListener tupleListener;
    -    // timer == null for modes other than at-least-once
    +    // timer == null if processing guarantee is other than at-least-once
    --- End diff --
    
    Done


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r147050462
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -336,22 +335,25 @@ private void emit() {
         private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
             final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
             final KafkaSpoutMessageId msgId = retryService.getMessageId(tp, record.offset());
    +
             if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) {   // has been acked
                 LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
    -        } else if (emitted.contains(msgId)) {   // has been emitted and it's pending ack or fail
    +        } else if (emitted.contains(msgId)) {   // has been emitted and it is pending ack or fail
                 LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
             } else {
    -            Validate.isTrue(kafkaConsumer.committed(tp) == null || kafkaConsumer.committed(tp).offset() < kafkaConsumer.position(tp),
    -                "The spout is about to emit a message that has already been committed."
    -                + " This should never occur, and indicates a bug in the spout");
    +            if (kafkaConsumer.committed(tp) != null && (kafkaConsumer.committed(tp).offset() >= kafkaConsumer.position(tp))) {
    --- End diff --
    
    Makes sense, thanks.


---

[GitHub] storm issue #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout Proces...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2380
  
    +1 again. The test failure is unrelated. Merged to master. There's a conflict when merging to 1.x, so please open another PR for that branch. Thanks.


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r147013717
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -255,26 +255,25 @@ private void throwKafkaConsumerInterruptedException() {
         }
     
         private boolean commit() {
    -        return isAtLeastOnce() && commitTimer.isExpiredResetOnTrue();    // timer != null for non auto commit mode
    +        return isAtLeastOnceProcessing() && commitTimer.isExpiredResetOnTrue();    // timer != null for non auto commit mode
         }
     
         private boolean poll() {
             final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets();
             final int readyMessageCount = retryService.readyMessageCount();
             final boolean poll = !waitingToEmit()
    -            //Check that the number of uncommitted, nonretriable tuples is less than the maxUncommittedOffsets limit
    -            //Accounting for retriable tuples this way still guarantees that the limit is followed on a per partition basis,
    -            //and prevents locking up the spout when there are too many retriable tuples
    -            && (numUncommittedOffsets - readyMessageCount < maxUncommittedOffsets
    -            || !isAtLeastOnce());
    +                // Check that the number of uncommitted, non-retriable tuples is less than the maxUncommittedOffsets limit.
    --- End diff --
    
    Done.


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r147015438
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -438,55 +440,53 @@ private void commitOffsetsForAckedTuples() {
         // ======== Ack =======
         @Override
         public void ack(Object messageId) {
    -        if (!isAtLeastOnce()) {
    -            // Only need to keep track of acked tuples if commits are done based on acks
    -            return;
    -        }
    -
    +        // Only need to keep track of acked tuples if commits to Kafka are done after a tuple ack is received
    --- End diff --
    
    Done.


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r147012458
  
    --- Diff: docs/storm-kafka-client.md ---
    @@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been
     * max.uncommitted.offsets = 10000000
     <br/>
     
    -# Messaging reliability modes
    +# Processing Guarantees
     
    -In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed.
    +The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked
    +as processed, i.e. when the offset is committed to Kafka. For AT_LEAST_ONCE and AT_MOST_ONCE guarantees the spout controls when
    +the commit happens. When the guarantee is NONE Kafka controls when the commit happens.
    +
    +* AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once)
    +     and acked. If a tuple fails or times-out it will be re-emitted. A tuple can be processed more than once if for instance
    +     the ack gets lost.
    +
    +* AT_MOST_ONCE - every offset will be committed to Kafka right after being polled but before being emitted
    --- End diff --
    
    Done


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r147012506
  
    --- Diff: docs/storm-kafka-client.md ---
    @@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been
     * max.uncommitted.offsets = 10000000
     <br/>
     
    -# Messaging reliability modes
    +# Processing Guarantees
     
    -In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed.
    +The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked
    +as processed, i.e. when the offset is committed to Kafka. For AT_LEAST_ONCE and AT_MOST_ONCE guarantees the spout controls when
    +the commit happens. When the guarantee is NONE Kafka controls when the commit happens.
    +
    +* AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once)
    +     and acked. If a tuple fails or times-out it will be re-emitted. A tuple can be processed more than once if for instance
    +     the ack gets lost.
    +
    +* AT_MOST_ONCE - every offset will be committed to Kafka right after being polled but before being emitted
    +     to the downstream components of the topology. It guarantees that the offset is processed at-most-once because it
    +     won't retry tuples that fail or timeout after the commit to Kafka has been done.
    +
    +* NONE - the polled offsets are committed to Kafka periodically as controlled by the Kafka properties
    +     "enable.auto.commit" and "auto.commit.interval.ms". Because the spout does not control when the commit happens
    +     it cannot give any message processing guarantees, i.e. a message may be processed 0, 1 or more times.
    +     This option requires "enable.auto.commit=true". If "enable.auto.commit=false" an exception will be thrown.
    +
    +To set the processing guarantee use the `KafkaSpoutConfig.Builder.setProcessingGuarantee` method as follows:
     
    -To set the processing guarantee, use the KafkaSpoutConfig.Builder.setProcessingGuarantee method, e.g.
     ```java
     KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
       .builder(String bootstrapServers, String ... topics)
       .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
     ```
     
    -The spout will disable tuple tracking for emitted tuples by default when you use at-most-once or any-times. In some cases you may want to enable tracking anyway, because tuple tracking is necessary for some features of Storm, e.g. showing complete latency in Storm UI, or enabling backpressure through the `Config.TOPOLOGY_MAX_SPOUT_PENDING` parameter.
    +# Tuple Tracking
    +
    +By default the spout only tracks emitted tuples when the processing guarantee is AT_LEAST_ONCE. It may be necessary to track
    +emitted tuples with other processing guarantees to benefit of Storm features such as showing complete latency in the UI,
    +or enabling backpressure with Config.TOPOLOGY_MAX_SPOUT_PENDING.
     
    -If you need to enable tracking, use the KafkaSpoutConfig.Builder.setForceEnableTupleTracking method, e.g.
     ```java
     KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
       .builder(String bootstrapServers, String ... topics)
       .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
    -  .setForceEnableTupleTracking(true)
    +  .setTupleTrackingEnforced(true)
     ```
     
    -Note that this setting has no effect in at-least-once mode, where tuple tracking is always enabled.
    \ No newline at end of file
    +Note: This setting has no effect with AT_LEAST_ONCE processing guarantees where tuple tracking is required and therefore always enabled.
    --- End diff --
    
    Done


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r146327143
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -78,17 +78,17 @@
         private transient KafkaSpoutRetryService retryService;
         // Handles tuple events (emit, ack etc.)
         private transient KafkaTupleListener tupleListener;
    -    // timer == null for modes other than at-least-once
    +    // timer == null if processing guarantee is other than at-least-once
         private transient Timer commitTimer;
         // Flag indicating that the spout is still undergoing initialization process.
         private transient boolean initialized;
         // Initialization is only complete after the first call to  KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
     
         // Tuples that were successfully acked/emitted. These tuples will be committed periodically when the commit timer expires,
    -    //or after a consumer rebalance, or during close/deactivate. Always empty if not using at-least-once mode.
    +    // or after a consumer rebalance, or during close/deactivate. Always empty if not using at-least-once processing guarantee.
         private transient Map<TopicPartition, OffsetManager> offsetManagers;
         // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed.
    -    // Always empty if not using at-least-once mode.
    +    // Always empty if processing guarantee is other than at-least-once.
    --- End diff --
    
    Same as for the timer comment


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r146329730
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -438,55 +440,53 @@ private void commitOffsetsForAckedTuples() {
         // ======== Ack =======
         @Override
         public void ack(Object messageId) {
    -        if (!isAtLeastOnce()) {
    -            // Only need to keep track of acked tuples if commits are done based on acks
    -            return;
    -        }
    -
    +        // Only need to keep track of acked tuples if commits to Kafka are done after a tuple ack is received
             final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
    -        if (!emitted.contains(msgId)) {
    -            if (msgId.isEmitted()) {
    -                LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
    -                    + "came from a topic-partition that this consumer group instance is no longer tracking "
    -                    + "due to rebalance/partition reassignment. No action taken.", msgId);
    +        if (isAtLeastOnceProcessing()) {
    +            if (!emitted.contains(msgId)) {
    +                if (msgId.isEmitted()) {
    +                    LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
    +                        + "came from a topic-partition that this consumer group instance is no longer tracking "
    +                        + "due to rebalance/partition reassignment. No action taken.", msgId);
    +                } else {
    +                    LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId);
    +                }
                 } else {
    -                LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId);
    +                Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked."
    +                    + " This should never occur barring errors in the RetryService implementation or the spout code.");
    +                offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
    +                emitted.remove(msgId);
                 }
    -        } else {
    -            Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked."
    -                + " This should never occur barring errors in the RetryService implementation or the spout code.");
    -            offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
    -            emitted.remove(msgId);
    +            tupleListener.onAck(msgId);
             }
    -        tupleListener.onAck(msgId);
         }
     
         // ======== Fail =======
         @Override
         public void fail(Object messageId) {
    -        if (!isAtLeastOnce()) {
    -            // Only need to keep track of failed tuples if commits are done based on acks
    -            return;
    -        }
    +        // Only need to keep track of failed tuples if commits to Kafka are done after a tuple ack is received
    --- End diff --
    
    "after a tuple ack is received" -> "based on received tuple acks"


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r147013620
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -125,8 +125,8 @@ public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputC
     
             tupleListener = kafkaSpoutConfig.getTupleListener();
     
    -        if (isAtLeastOnce()) {
    -            // Only used if the spout commits offsets for acked tuples
    +        if (isAtLeastOnceProcessing()) {
    +            // Only used if the spout should commit to Kafka an offset only after its tuple has been acked.
    --- End diff --
    
    Done.


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r146329456
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -438,55 +440,53 @@ private void commitOffsetsForAckedTuples() {
         // ======== Ack =======
         @Override
         public void ack(Object messageId) {
    -        if (!isAtLeastOnce()) {
    -            // Only need to keep track of acked tuples if commits are done based on acks
    -            return;
    -        }
    -
    +        // Only need to keep track of acked tuples if commits to Kafka are done after a tuple ack is received
             final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
    -        if (!emitted.contains(msgId)) {
    -            if (msgId.isEmitted()) {
    -                LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
    -                    + "came from a topic-partition that this consumer group instance is no longer tracking "
    -                    + "due to rebalance/partition reassignment. No action taken.", msgId);
    +        if (isAtLeastOnceProcessing()) {
    +            if (!emitted.contains(msgId)) {
    +                if (msgId.isEmitted()) {
    +                    LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
    +                        + "came from a topic-partition that this consumer group instance is no longer tracking "
    +                        + "due to rebalance/partition reassignment. No action taken.", msgId);
    +                } else {
    +                    LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId);
    +                }
                 } else {
    -                LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId);
    +                Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked."
    +                    + " This should never occur barring errors in the RetryService implementation or the spout code.");
    +                offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
    +                emitted.remove(msgId);
                 }
    -        } else {
    -            Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked."
    -                + " This should never occur barring errors in the RetryService implementation or the spout code.");
    -            offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
    -            emitted.remove(msgId);
    +            tupleListener.onAck(msgId);
             }
    -        tupleListener.onAck(msgId);
         }
     
         // ======== Fail =======
         @Override
         public void fail(Object messageId) {
    -        if (!isAtLeastOnce()) {
    -            // Only need to keep track of failed tuples if commits are done based on acks
    -            return;
    -        }
    +        // Only need to keep track of failed tuples if commits to Kafka are done after a tuple ack is received
    +        if (isAtLeastOnceProcessing()) {
    --- End diff --
    
    Same as for ack


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r146329596
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -438,55 +440,53 @@ private void commitOffsetsForAckedTuples() {
         // ======== Ack =======
         @Override
         public void ack(Object messageId) {
    -        if (!isAtLeastOnce()) {
    -            // Only need to keep track of acked tuples if commits are done based on acks
    -            return;
    -        }
    -
    +        // Only need to keep track of acked tuples if commits to Kafka are done after a tuple ack is received
    --- End diff --
    
    "after a tuple ack is received" -> "based on received tuple acks"


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r147050615
  
    --- Diff: docs/storm-kafka-client.md ---
    @@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been
     * max.uncommitted.offsets = 10000000
     <br/>
     
    -# Messaging reliability modes
    +# Processing Guarantees
     
    -In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed.
    +The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked
    --- End diff --
    
    I like the new wording, thanks.


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r147012467
  
    --- Diff: docs/storm-kafka-client.md ---
    @@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been
     * max.uncommitted.offsets = 10000000
     <br/>
     
    -# Messaging reliability modes
    +# Processing Guarantees
     
    -In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed.
    +The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked
    +as processed, i.e. when the offset is committed to Kafka. For AT_LEAST_ONCE and AT_MOST_ONCE guarantees the spout controls when
    +the commit happens. When the guarantee is NONE Kafka controls when the commit happens.
    +
    +* AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once)
    +     and acked. If a tuple fails or times-out it will be re-emitted. A tuple can be processed more than once if for instance
    +     the ack gets lost.
    +
    +* AT_MOST_ONCE - every offset will be committed to Kafka right after being polled but before being emitted
    +     to the downstream components of the topology. It guarantees that the offset is processed at-most-once because it
    --- End diff --
    
    Done


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r147025782
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -438,55 +440,53 @@ private void commitOffsetsForAckedTuples() {
         // ======== Ack =======
         @Override
         public void ack(Object messageId) {
    -        if (!isAtLeastOnce()) {
    -            // Only need to keep track of acked tuples if commits are done based on acks
    -            return;
    -        }
    -
    +        // Only need to keep track of acked tuples if commits to Kafka are done after a tuple ack is received
             final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
    -        if (!emitted.contains(msgId)) {
    -            if (msgId.isEmitted()) {
    -                LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
    -                    + "came from a topic-partition that this consumer group instance is no longer tracking "
    -                    + "due to rebalance/partition reassignment. No action taken.", msgId);
    +        if (isAtLeastOnceProcessing()) {
    --- End diff --
    
    Btw I don't think it is a big deal to decide and stick with specific approach, cause there's no statement after if statement. 
    @srdo Could we leave the decision of this review comment to @hmcl?


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r147025420
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -438,55 +440,53 @@ private void commitOffsetsForAckedTuples() {
         // ======== Ack =======
         @Override
         public void ack(Object messageId) {
    -        if (!isAtLeastOnce()) {
    -            // Only need to keep track of acked tuples if commits are done based on acks
    -            return;
    -        }
    -
    +        // Only need to keep track of acked tuples if commits to Kafka are done after a tuple ack is received
             final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
    -        if (!emitted.contains(msgId)) {
    -            if (msgId.isEmitted()) {
    -                LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
    -                    + "came from a topic-partition that this consumer group instance is no longer tracking "
    -                    + "due to rebalance/partition reassignment. No action taken.", msgId);
    +        if (isAtLeastOnceProcessing()) {
    --- End diff --
    
    I know there's a huge debate on if else vs early return, but I'm in favor of early return, especially it is used as some pruning on early stage of logic.
    Sure I'm not in favor of second statements (if do nothing else do action) but early return is completely different as do nothing, as it does pruning the logic branch and no need to follow the if statement and statements after end of if statement.


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r147022902
  
    --- Diff: docs/storm-kafka-client.md ---
    @@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been
     * max.uncommitted.offsets = 10000000
     <br/>
     
    -# Messaging reliability modes
    +# Processing Guarantees
     
    -In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed.
    +The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked
    +as processed, i.e. when the offset is committed to Kafka. For AT_LEAST_ONCE and AT_MOST_ONCE guarantees the spout controls when
    +the commit happens. When the guarantee is NONE Kafka controls when the commit happens.
    +
    +* AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once)
    +     and acked. If a tuple fails or times-out it will be re-emitted. A tuple can be processed more than once if for instance
    +     the ack gets lost.
    +
    +* AT_MOST_ONCE - every offset will be committed to Kafka right after being polled but before being emitted
    +     to the downstream components of the topology. It guarantees that the offset is processed at-most-once because it
    +     won't retry tuples that fail or timeout after the commit to Kafka has been done.
    +
    +* NONE - the polled offsets are committed to Kafka periodically as controlled by the Kafka properties
    +     "enable.auto.commit" and "auto.commit.interval.ms". Because the spout does not control when the commit happens
    +     it cannot give any message processing guarantees, i.e. a message may be processed 0, 1 or more times.
    +     This option requires "enable.auto.commit=true". If "enable.auto.commit=false" an exception will be thrown.
    +
    +To set the processing guarantee use the `KafkaSpoutConfig.Builder.setProcessingGuarantee` method as follows:
     
    -To set the processing guarantee, use the KafkaSpoutConfig.Builder.setProcessingGuarantee method, e.g.
     ```java
     KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
       .builder(String bootstrapServers, String ... topics)
       .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
     ```
     
    -The spout will disable tuple tracking for emitted tuples by default when you use at-most-once or any-times. In some cases you may want to enable tracking anyway, because tuple tracking is necessary for some features of Storm, e.g. showing complete latency in Storm UI, or enabling backpressure through the `Config.TOPOLOGY_MAX_SPOUT_PENDING` parameter.
    +# Tuple Tracking
    +
    +By default the spout only tracks emitted tuples when the processing guarantee is AT_LEAST_ONCE. It may be necessary to track
    +emitted tuples with other processing guarantees to benefit of Storm features such as showing complete latency in the UI,
    --- End diff --
    
    Done


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r147014032
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -336,22 +335,25 @@ private void emit() {
         private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
             final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
             final KafkaSpoutMessageId msgId = retryService.getMessageId(tp, record.offset());
    +
             if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) {   // has been acked
                 LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
    -        } else if (emitted.contains(msgId)) {   // has been emitted and it's pending ack or fail
    +        } else if (emitted.contains(msgId)) {   // has been emitted and it is pending ack or fail
                 LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
             } else {
    -            Validate.isTrue(kafkaConsumer.committed(tp) == null || kafkaConsumer.committed(tp).offset() < kafkaConsumer.position(tp),
    -                "The spout is about to emit a message that has already been committed."
    -                + " This should never occur, and indicates a bug in the spout");
    +            if (kafkaConsumer.committed(tp) != null && (kafkaConsumer.committed(tp).offset() >= kafkaConsumer.position(tp))) {
    --- End diff --
    
    Validate throws an IllegalArgumentException where the correct exception here is IllegalStateException. Furthermore, Validate in my opinion as a confusing API - Validate(true) throws an exception if false. It is misleading to me. I would rather leave it like this.


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r147013385
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -78,17 +78,17 @@
         private transient KafkaSpoutRetryService retryService;
         // Handles tuple events (emit, ack etc.)
         private transient KafkaTupleListener tupleListener;
    -    // timer == null for modes other than at-least-once
    +    // timer == null if processing guarantee is other than at-least-once
         private transient Timer commitTimer;
         // Flag indicating that the spout is still undergoing initialization process.
         private transient boolean initialized;
         // Initialization is only complete after the first call to  KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
     
         // Tuples that were successfully acked/emitted. These tuples will be committed periodically when the commit timer expires,
    -    //or after a consumer rebalance, or during close/deactivate. Always empty if not using at-least-once mode.
    +    // or after a consumer rebalance, or during close/deactivate. Always empty if not using at-least-once processing guarantee.
    --- End diff --
    
    Done. Removed the double negation.


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r146327422
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -125,8 +125,8 @@ public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputC
     
             tupleListener = kafkaSpoutConfig.getTupleListener();
     
    -        if (isAtLeastOnce()) {
    -            // Only used if the spout commits offsets for acked tuples
    +        if (isAtLeastOnceProcessing()) {
    +            // Only used if the spout should commit to Kafka an offset only after its tuple has been acked.
    --- End diff --
    
    "Should commit an offset to Kafka only after the corresponding tuple has been acked"


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r147015407
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -438,55 +440,53 @@ private void commitOffsetsForAckedTuples() {
         // ======== Ack =======
         @Override
         public void ack(Object messageId) {
    -        if (!isAtLeastOnce()) {
    -            // Only need to keep track of acked tuples if commits are done based on acks
    -            return;
    -        }
    -
    +        // Only need to keep track of acked tuples if commits to Kafka are done after a tuple ack is received
             final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
    -        if (!emitted.contains(msgId)) {
    -            if (msgId.isEmitted()) {
    -                LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
    -                    + "came from a topic-partition that this consumer group instance is no longer tracking "
    -                    + "due to rebalance/partition reassignment. No action taken.", msgId);
    +        if (isAtLeastOnceProcessing()) {
    +            if (!emitted.contains(msgId)) {
    +                if (msgId.isEmitted()) {
    +                    LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
    +                        + "came from a topic-partition that this consumer group instance is no longer tracking "
    +                        + "due to rebalance/partition reassignment. No action taken.", msgId);
    +                } else {
    +                    LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId);
    +                }
                 } else {
    -                LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId);
    +                Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked."
    +                    + " This should never occur barring errors in the RetryService implementation or the spout code.");
    +                offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
    +                emitted.remove(msgId);
                 }
    -        } else {
    -            Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked."
    -                + " This should never occur barring errors in the RetryService implementation or the spout code.");
    -            offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
    -            emitted.remove(msgId);
    +            tupleListener.onAck(msgId);
             }
    -        tupleListener.onAck(msgId);
         }
     
         // ======== Fail =======
         @Override
         public void fail(Object messageId) {
    -        if (!isAtLeastOnce()) {
    -            // Only need to keep track of failed tuples if commits are done based on acks
    -            return;
    -        }
    +        // Only need to keep track of failed tuples if commits to Kafka are done after a tuple ack is received
    +        if (isAtLeastOnceProcessing()) {
    --- End diff --
    
    Same comment above


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r147016997
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -438,55 +440,53 @@ private void commitOffsetsForAckedTuples() {
         // ======== Ack =======
         @Override
         public void ack(Object messageId) {
    -        if (!isAtLeastOnce()) {
    -            // Only need to keep track of acked tuples if commits are done based on acks
    -            return;
    -        }
    -
    +        // Only need to keep track of acked tuples if commits to Kafka are done after a tuple ack is received
             final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
    -        if (!emitted.contains(msgId)) {
    -            if (msgId.isEmitted()) {
    -                LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
    -                    + "came from a topic-partition that this consumer group instance is no longer tracking "
    -                    + "due to rebalance/partition reassignment. No action taken.", msgId);
    +        if (isAtLeastOnceProcessing()) {
    +            if (!emitted.contains(msgId)) {
    +                if (msgId.isEmitted()) {
    +                    LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
    +                        + "came from a topic-partition that this consumer group instance is no longer tracking "
    +                        + "due to rebalance/partition reassignment. No action taken.", msgId);
    +                } else {
    +                    LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId);
    +                }
                 } else {
    -                LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId);
    +                Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked."
    +                    + " This should never occur barring errors in the RetryService implementation or the spout code.");
    +                offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
    +                emitted.remove(msgId);
                 }
    -        } else {
    -            Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked."
    -                + " This should never occur barring errors in the RetryService implementation or the spout code.");
    -            offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
    -            emitted.remove(msgId);
    +            tupleListener.onAck(msgId);
             }
    -        tupleListener.onAck(msgId);
         }
     
         // ======== Fail =======
         @Override
         public void fail(Object messageId) {
    -        if (!isAtLeastOnce()) {
    -            // Only need to keep track of failed tuples if commits are done based on acks
    -            return;
    -        }
    +        // Only need to keep track of failed tuples if commits to Kafka are done after a tuple ack is received
    --- End diff --
    
    Done.


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r146327070
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -78,17 +78,17 @@
         private transient KafkaSpoutRetryService retryService;
         // Handles tuple events (emit, ack etc.)
         private transient KafkaTupleListener tupleListener;
    -    // timer == null for modes other than at-least-once
    +    // timer == null if processing guarantee is other than at-least-once
         private transient Timer commitTimer;
         // Flag indicating that the spout is still undergoing initialization process.
         private transient boolean initialized;
         // Initialization is only complete after the first call to  KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
     
         // Tuples that were successfully acked/emitted. These tuples will be committed periodically when the commit timer expires,
    -    //or after a consumer rebalance, or during close/deactivate. Always empty if not using at-least-once mode.
    +    // or after a consumer rebalance, or during close/deactivate. Always empty if not using at-least-once processing guarantee.
    --- End diff --
    
    "the at-least-once processing guarantee"


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r146329393
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -336,22 +335,25 @@ private void emit() {
         private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
             final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
             final KafkaSpoutMessageId msgId = retryService.getMessageId(tp, record.offset());
    +
             if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) {   // has been acked
                 LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
    -        } else if (emitted.contains(msgId)) {   // has been emitted and it's pending ack or fail
    +        } else if (emitted.contains(msgId)) {   // has been emitted and it is pending ack or fail
                 LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
             } else {
    -            Validate.isTrue(kafkaConsumer.committed(tp) == null || kafkaConsumer.committed(tp).offset() < kafkaConsumer.position(tp),
    -                "The spout is about to emit a message that has already been committed."
    -                + " This should never occur, and indicates a bug in the spout");
    +            if (kafkaConsumer.committed(tp) != null && (kafkaConsumer.committed(tp).offset() >= kafkaConsumer.position(tp))) {
    --- End diff --
    
    I like the new error message, but why get rid of Validate?


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r146997051
  
    --- Diff: docs/storm-kafka-client.md ---
    @@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been
     * max.uncommitted.offsets = 10000000
     <br/>
     
    -# Messaging reliability modes
    +# Processing Guarantees
     
    -In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed.
    +The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked
    +as processed, i.e. when the offset is committed to Kafka. For AT_LEAST_ONCE and AT_MOST_ONCE guarantees the spout controls when
    +the commit happens. When the guarantee is NONE Kafka controls when the commit happens.
    +
    +* AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once)
    +     and acked. If a tuple fails or times-out it will be re-emitted. A tuple can be processed more than once if for instance
    --- End diff --
    
    Done


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r146326263
  
    --- Diff: docs/storm-kafka-client.md ---
    @@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been
     * max.uncommitted.offsets = 10000000
     <br/>
     
    -# Messaging reliability modes
    +# Processing Guarantees
     
    -In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed.
    +The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked
    +as processed, i.e. when the offset is committed to Kafka. For AT_LEAST_ONCE and AT_MOST_ONCE guarantees the spout controls when
    +the commit happens. When the guarantee is NONE Kafka controls when the commit happens.
    +
    +* AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once)
    +     and acked. If a tuple fails or times-out it will be re-emitted. A tuple can be processed more than once if for instance
    +     the ack gets lost.
    +
    +* AT_MOST_ONCE - every offset will be committed to Kafka right after being polled but before being emitted
    +     to the downstream components of the topology. It guarantees that the offset is processed at-most-once because it
    +     won't retry tuples that fail or timeout after the commit to Kafka has been done.
    +
    +* NONE - the polled offsets are committed to Kafka periodically as controlled by the Kafka properties
    +     "enable.auto.commit" and "auto.commit.interval.ms". Because the spout does not control when the commit happens
    +     it cannot give any message processing guarantees, i.e. a message may be processed 0, 1 or more times.
    +     This option requires "enable.auto.commit=true". If "enable.auto.commit=false" an exception will be thrown.
    +
    +To set the processing guarantee use the `KafkaSpoutConfig.Builder.setProcessingGuarantee` method as follows:
     
    -To set the processing guarantee, use the KafkaSpoutConfig.Builder.setProcessingGuarantee method, e.g.
     ```java
     KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
       .builder(String bootstrapServers, String ... topics)
       .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
     ```
     
    -The spout will disable tuple tracking for emitted tuples by default when you use at-most-once or any-times. In some cases you may want to enable tracking anyway, because tuple tracking is necessary for some features of Storm, e.g. showing complete latency in Storm UI, or enabling backpressure through the `Config.TOPOLOGY_MAX_SPOUT_PENDING` parameter.
    +# Tuple Tracking
    +
    +By default the spout only tracks emitted tuples when the processing guarantee is AT_LEAST_ONCE. It may be necessary to track
    +emitted tuples with other processing guarantees to benefit of Storm features such as showing complete latency in the UI,
    --- End diff --
    
    "to benefit of" -> "to benefit from"


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r146326949
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -78,17 +78,17 @@
         private transient KafkaSpoutRetryService retryService;
         // Handles tuple events (emit, ack etc.)
         private transient KafkaTupleListener tupleListener;
    -    // timer == null for modes other than at-least-once
    +    // timer == null if processing guarantee is other than at-least-once
    --- End diff --
    
    Probably nicer to write "if processing guarantee is not at-least-once"


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r146324431
  
    --- Diff: docs/storm-kafka-client.md ---
    @@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been
     * max.uncommitted.offsets = 10000000
     <br/>
     
    -# Messaging reliability modes
    +# Processing Guarantees
     
    -In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed.
    +The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked
    +as processed, i.e. when the offset is committed to Kafka. For AT_LEAST_ONCE and AT_MOST_ONCE guarantees the spout controls when
    --- End diff --
    
    "For AT_LEAST_ONCE" -> "For the AT_LEAST_ONCE"


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r146325771
  
    --- Diff: docs/storm-kafka-client.md ---
    @@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been
     * max.uncommitted.offsets = 10000000
     <br/>
     
    -# Messaging reliability modes
    +# Processing Guarantees
     
    -In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed.
    +The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked
    +as processed, i.e. when the offset is committed to Kafka. For AT_LEAST_ONCE and AT_MOST_ONCE guarantees the spout controls when
    +the commit happens. When the guarantee is NONE Kafka controls when the commit happens.
    +
    +* AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once)
    +     and acked. If a tuple fails or times-out it will be re-emitted. A tuple can be processed more than once if for instance
    +     the ack gets lost.
    +
    +* AT_MOST_ONCE - every offset will be committed to Kafka right after being polled but before being emitted
    +     to the downstream components of the topology. It guarantees that the offset is processed at-most-once because it
    +     won't retry tuples that fail or timeout after the commit to Kafka has been done.
    --- End diff --
    
    Consider dropping " after the commit to Kafka has been done". The spout just won't retry tuples period.


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r146325303
  
    --- Diff: docs/storm-kafka-client.md ---
    @@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been
     * max.uncommitted.offsets = 10000000
     <br/>
     
    -# Messaging reliability modes
    +# Processing Guarantees
     
    -In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed.
    +The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked
    +as processed, i.e. when the offset is committed to Kafka. For AT_LEAST_ONCE and AT_MOST_ONCE guarantees the spout controls when
    +the commit happens. When the guarantee is NONE Kafka controls when the commit happens.
    +
    +* AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once)
    +     and acked. If a tuple fails or times-out it will be re-emitted. A tuple can be processed more than once if for instance
    +     the ack gets lost.
    +
    +* AT_MOST_ONCE - every offset will be committed to Kafka right after being polled but before being emitted
    +     to the downstream components of the topology. It guarantees that the offset is processed at-most-once because it
    --- End diff --
    
    "It guarantees" -> "This setting guarantees". "because it won't retry" -> "because the spout won't retry". Also nit: "at-most-once" -> "at most once"


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r147026299
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -336,22 +335,25 @@ private void emit() {
         private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
             final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
             final KafkaSpoutMessageId msgId = retryService.getMessageId(tp, record.offset());
    +
             if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) {   // has been acked
                 LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
    -        } else if (emitted.contains(msgId)) {   // has been emitted and it's pending ack or fail
    +        } else if (emitted.contains(msgId)) {   // has been emitted and it is pending ack or fail
                 LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
             } else {
    -            Validate.isTrue(kafkaConsumer.committed(tp) == null || kafkaConsumer.committed(tp).offset() < kafkaConsumer.position(tp),
    -                "The spout is about to emit a message that has already been committed."
    -                + " This should never occur, and indicates a bug in the spout");
    +            if (kafkaConsumer.committed(tp) != null && (kafkaConsumer.committed(tp).offset() >= kafkaConsumer.position(tp))) {
    --- End diff --
    
    For me the behavior of Validate.isTrue() is not confusing (we have more classes from Guava like checking null), but throwing appropriate exception is more important to me.


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r147013419
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -78,17 +78,17 @@
         private transient KafkaSpoutRetryService retryService;
         // Handles tuple events (emit, ack etc.)
         private transient KafkaTupleListener tupleListener;
    -    // timer == null for modes other than at-least-once
    +    // timer == null if processing guarantee is other than at-least-once
         private transient Timer commitTimer;
         // Flag indicating that the spout is still undergoing initialization process.
         private transient boolean initialized;
         // Initialization is only complete after the first call to  KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
     
         // Tuples that were successfully acked/emitted. These tuples will be committed periodically when the commit timer expires,
    -    //or after a consumer rebalance, or during close/deactivate. Always empty if not using at-least-once mode.
    +    // or after a consumer rebalance, or during close/deactivate. Always empty if not using at-least-once processing guarantee.
         private transient Map<TopicPartition, OffsetManager> offsetManagers;
         // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed.
    -    // Always empty if not using at-least-once mode.
    +    // Always empty if processing guarantee is other than at-least-once.
    --- End diff --
    
    Done. Removed the double negation.


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r146324267
  
    --- Diff: docs/storm-kafka-client.md ---
    @@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been
     * max.uncommitted.offsets = 10000000
     <br/>
     
    -# Messaging reliability modes
    +# Processing Guarantees
     
    -In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed.
    +The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked
    --- End diff --
    
    "... when the tuple with the ConsumerRecord for an offset" -> "when an offset", since the offsets are marked as processed, not the tuples or records.


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r146325232
  
    --- Diff: docs/storm-kafka-client.md ---
    @@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been
     * max.uncommitted.offsets = 10000000
     <br/>
     
    -# Messaging reliability modes
    +# Processing Guarantees
     
    -In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed.
    +The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked
    +as processed, i.e. when the offset is committed to Kafka. For AT_LEAST_ONCE and AT_MOST_ONCE guarantees the spout controls when
    +the commit happens. When the guarantee is NONE Kafka controls when the commit happens.
    +
    +* AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once)
    +     and acked. If a tuple fails or times-out it will be re-emitted. A tuple can be processed more than once if for instance
    +     the ack gets lost.
    +
    +* AT_MOST_ONCE - every offset will be committed to Kafka right after being polled but before being emitted
    --- End diff --
    
    "Every offset" -> "Offsets"


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r146996955
  
    --- Diff: docs/storm-kafka-client.md ---
    @@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been
     * max.uncommitted.offsets = 10000000
     <br/>
     
    -# Messaging reliability modes
    +# Processing Guarantees
     
    -In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed.
    +The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked
    +as processed, i.e. when the offset is committed to Kafka. For AT_LEAST_ONCE and AT_MOST_ONCE guarantees the spout controls when
    --- End diff --
    
    Done


---

[GitHub] storm pull request #2380: STORM-2781: Refactor storm-kafka-client KafkaSpout...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r147012486
  
    --- Diff: docs/storm-kafka-client.md ---
    @@ -298,25 +298,44 @@ Currently the Kafka spout has has the following default values, which have been
     * max.uncommitted.offsets = 10000000
     <br/>
     
    -# Messaging reliability modes
    +# Processing Guarantees
     
    -In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed.
    +The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when the tuple with the `ConsumerRecord` for an offset is marked
    +as processed, i.e. when the offset is committed to Kafka. For AT_LEAST_ONCE and AT_MOST_ONCE guarantees the spout controls when
    +the commit happens. When the guarantee is NONE Kafka controls when the commit happens.
    +
    +* AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once)
    +     and acked. If a tuple fails or times-out it will be re-emitted. A tuple can be processed more than once if for instance
    +     the ack gets lost.
    +
    +* AT_MOST_ONCE - every offset will be committed to Kafka right after being polled but before being emitted
    +     to the downstream components of the topology. It guarantees that the offset is processed at-most-once because it
    +     won't retry tuples that fail or timeout after the commit to Kafka has been done.
    --- End diff --
    
    Done


---