Posted to dev@storm.apache.org by srdo <gi...@git.apache.org> on 2018/01/27 19:19:20 UTC

[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

GitHub user srdo opened a pull request:

    https://github.com/apache/storm/pull/2537

    STORM-2914: Implement ProcessingGuarantee.NONE in the spout instead of using enable.auto.commit
    
    https://issues.apache.org/jira/browse/STORM-2914
    
    This change makes the spout commit offsets asynchronously when it is using ProcessingGuarantee.NONE. This closely mirrors the consumer's behavior when enable.auto.commit is true, but we need to do it manually due to https://issues.apache.org/jira/browse/STORM-2913. A nice side effect is that the commit timer setting in KafkaSpoutConfig now affects NONE as well as AT_LEAST_ONCE.
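    
    For illustration, a minimal sketch of the helper this approach boils down to (it mirrors the diff fragments quoted later in this thread and assumes the spout's kafkaConsumer field; the merged code may differ):
    
        import java.util.HashMap;
        import java.util.Map;
        import java.util.Set;
        import org.apache.kafka.clients.consumer.OffsetAndMetadata;
        import org.apache.kafka.common.TopicPartition;
    
        // Called from nextTuple() when the commit timer expires and the
        // processing guarantee is NONE: commit the consumer's current fetch
        // position for every assigned partition, without waiting for acks.
        private void commitConsumedOffsets(Set<TopicPartition> assignedPartitions) {
            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
            for (TopicPartition tp : assignedPartitions) {
                // position(tp) is the offset of the next record that will be
                // fetched, which is what enable.auto.commit would have committed.
                offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp)));
            }
            // Commit asynchronously so nextTuple() is never blocked; no callback.
            kafkaConsumer.commitAsync(offsetsToCommit, null);
        }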

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/srdo/storm STORM-2914

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2537.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2537
    
----
commit 3e09a1d718f8d40424e7ac5d574efe7a74706cf8
Author: Stig Rohde Døssing <sr...@...>
Date:   2018-01-27T18:22:07Z

    STORM-2914: Implement ProcessingGuarantee.NONE in the spout instead of using enable.auto.commit

----


---

[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2537#discussion_r165850087
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -453,37 +451,33 @@ public Builder(String bootstrapServers, Subscription subscription) {
             return builder;
         }
     
    -    private static void setAutoCommitMode(Builder<?, ?> builder) {
    +    private static void setPropsToFitProcessingGuarantee(Builder<?, ?> builder) {
             if (builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
    -            throw new IllegalArgumentException("Do not set " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " manually."
    -                + " Instead use KafkaSpoutConfig.Builder.setProcessingGuarantee");
    +            throw new IllegalArgumentException("Do not set " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
    --- End diff --
    
    This isn't javadoc, so linking won't work. Changed the message to be friendlier.


---

[GitHub] storm issue #2537: STORM-2914: Implement ProcessingGuarantee.NONE in the spo...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/2537
  
    +1. Let's squash, and as far as I am concerned it is good to merge. Once this is squashed, can you please rebase STORM-2913? Thanks.


---

[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2537#discussion_r165830248
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -453,37 +451,33 @@ public Builder(String bootstrapServers, Subscription subscription) {
             return builder;
         }
     
    -    private static void setAutoCommitMode(Builder<?, ?> builder) {
    +    private static void setPropsToFitProcessingGuarantee(Builder<?, ?> builder) {
    --- End diff --
    
    setKafkaPropsForProcessingGuarantee


---

[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2537#discussion_r165850084
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -210,23 +215,26 @@ public Builder(String bootstrapServers, Subscription subscription) {
             }
     
             /**
    -         * Set a {@link KafkaConsumer} property.
    +         * Set a {@link KafkaConsumer} property. Please don't set enable.auto.commit, instead set the {@link ProcessingGuarantee}
    --- End diff --
    
    Moved it to the documentation


---

[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2537#discussion_r165830031
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -89,7 +89,7 @@
         private transient KafkaSpoutRetryService retryService;
         // Handles tuple events (emit, ack etc.)
         private transient KafkaTupleListener tupleListener;
    -    // timer == null if processing guarantee is none or at-most-once
    +    // timer == null if processing guarantee is at-most-once
    --- End diff --
    
    The comment should say "timer == null only if the processing guarantee is at-most-once".


---

[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2537#discussion_r165830142
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -519,6 +519,15 @@ private boolean isEmitTuple(List<Object> tuple) {
             return tuple != null || kafkaSpoutConfig.isEmitNullTuples();
         }
     
    +    private void commitConsumedOffsets(Set<TopicPartition> assignedPartitions) {
    --- End diff --
    
    perhaps the name of this method should be "commitFetchedOffsetsAsync" based on the javadoc for [kafkaConsumer.position(tp)](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1396)
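    
    A small illustration of the distinction (hypothetical fragment of the helper; the position() contract is paraphrased from the linked javadoc):
    
        // After poll() returns records up to offset N for a partition,
        // position(tp) is N + 1 - the next offset to *fetch* - regardless of
        // whether those records have been processed by the topology yet.
        long nextFetchOffset = kafkaConsumer.position(tp);
        offsetsToCommit.put(tp, new OffsetAndMetadata(nextFetchOffset));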


---

[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2537#discussion_r165851895
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -143,28 +146,28 @@ public String toString() {
     
         /**
          * This enum controls when the tuple with the {@link ConsumerRecord} for an offset is marked as processed,
    -     * i.e. when the offset can be committed to Kafka.
    +     * i.e. when the offset can be committed to Kafka. The default value is AT_LEAST_ONCE.
          * The commit interval is controlled by {@link KafkaSpoutConfig#getOffsetsCommitPeriodMs() }, if the mode commits on an interval.
          * NO_GUARANTEE may be removed in a later release without warning, we're still evaluating whether it makes sense to keep.
    -     *
    -     * <ul>
    -     * <li>AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed and acked (at least once). If
    -     * a tuple fails or times out it will be re-emitted, as controlled by the {@link KafkaSpoutRetryService}. Commits on the defined
    -     * interval.</li>
    -     * <br/>
    -     * <li>AT_MOST_ONCE - every offset will be committed to Kafka right after being polled but before being emitted to the downstream
    -     * components of the topology. The commit interval is ignored. This mode guarantees that the offset is processed at most once by
    -     * ensuring the spout won't retry tuples that fail or time out after the commit to Kafka has been done.</li>
    -     * <br/>
    -     * <li>NO_GUARANTEE - the polled offsets are ready to commit immediately after being polled. The offsets are committed periodically,
    -     * i.e. a message may be processed 0, 1 or more times. This behavior is similar to setting enable.auto.commit=true in the consumer, but
    -     * allows the spout to control when commits occur. Commits on the defined interval. </li>
    -     * </ul>
          */
         @InterfaceStability.Unstable
         public enum ProcessingGuarantee {
    +        /**
    +         * An offset is ready to commit only after the corresponding tuple has been processed and acked (at least once). If a tuple fails or
    +         * times out it will be re-emitted, as controlled by the {@link KafkaSpoutRetryService}. Commits on the defined interval.
    +         */
             AT_LEAST_ONCE,
    +        /**
    +         * Every offset will be committed to Kafka right after being polled but before being emitted to the downstream components of the
    +         * topology. The commit interval is ignored. This mode guarantees that the offset is processed at most once by ensuring the spout
    +         * won't retry tuples that fail or time out after the commit to Kafka has been done
    +         */
             AT_MOST_ONCE,
    +        /**
    +         * The polled offsets are ready to commit immediately after being polled. The offsets are committed periodically, i.e. a message may
    +         * be processed 0, 1 or more times. This behavior is similar to setting enable.auto.commit=true in the consumer, but allows the
    +         * spout to control when commits occur. Commits on the defined interval
    --- End diff --
    
    Commits are made asynchronously on the defined interval.
    
    Should we also say specifically that for A_L_O and A_M_O the commits are done synchronously?
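    
    A sketch of how that note could read in the enum javadoc (the wording is illustrative, not the merged text):
    
        /**
         * AT_LEAST_ONCE - acked offsets are committed synchronously on the commit interval.
         * AT_MOST_ONCE  - polled offsets are committed synchronously right after the poll.
         * NO_GUARANTEE  - polled offsets are committed asynchronously on the commit interval.
         */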


---

[GitHub] storm issue #2537: STORM-2914: Implement ProcessingGuarantee.NONE in the spo...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/2537
  
    @srdo I left the [comment](https://issues.apache.org/jira/browse/STORM-2914?focusedCommentId=16346102&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16346102) on the JIRA.


---

[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2537#discussion_r165830275
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -453,37 +451,33 @@ public Builder(String bootstrapServers, Subscription subscription) {
             return builder;
         }
     
    -    private static void setAutoCommitMode(Builder<?, ?> builder) {
    +    private static void setPropsToFitProcessingGuarantee(Builder<?, ?> builder) {
             if (builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
    -            throw new IllegalArgumentException("Do not set " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " manually."
    -                + " Instead use KafkaSpoutConfig.Builder.setProcessingGuarantee");
    +            throw new IllegalArgumentException("Do not set " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
    --- End diff --
    
    message should be: 
    
    Kafka enable.auto.commit is not supported. Please set the desired ProcessingGuarantee using {@link setProcessingGuarantee}


---

[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2537#discussion_r165830313
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java ---
    @@ -196,13 +206,37 @@ public void testAtMostOnceModeDoesNotCommitAckedTuples() throws Exception {
         }
         
         @Test
    -    public void testAnyTimesModeDoesNotCommitAckedTuples() throws Exception {
    -        //When tuple tracking is enabled, the spout must not commit acked tuples in any-times mode because committing is managed by the consumer
    +    public void testNoneModeCommitsPolledTuples() throws Exception {
    --- End diff --
    
    testProcessingGuaranteeNoGuaranteeCommitsPolledTuples


---

[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2537#discussion_r165830316
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java ---
    @@ -196,13 +206,37 @@ public void testAtMostOnceModeDoesNotCommitAckedTuples() throws Exception {
         }
         
         @Test
    -    public void testAnyTimesModeDoesNotCommitAckedTuples() throws Exception {
    -        //When tuple tracking is enabled, the spout must not commit acked tuples in any-times mode because committing is managed by the consumer
    +    public void testNoneModeCommitsPolledTuples() throws Exception {
    +        //When using the none guarantee, the spout must commit tuples periodically, regardless of whether they've been acked
    --- End diff --
    
    no-guarantee


---

[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2537#discussion_r165852132
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -519,6 +519,15 @@ private boolean isEmitTuple(List<Object> tuple) {
             return tuple != null || kafkaSpoutConfig.isEmitNullTuples();
         }
     
    +    private void commitConsumedOffsets(Set<TopicPartition> assignedPartitions) {
    +        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
    +        for (TopicPartition tp : assignedPartitions) {
    +            offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp)));
    +        }
    +        kafkaConsumer.commitAsync(offsetsToCommit, null);
    --- End diff --
    
    agree


---

[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2537#discussion_r165830085
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -307,8 +307,12 @@ public void nextTuple() {
                     kafkaSpoutConfig.getSubscription().refreshAssignment();
                 }
     
    -            if (shouldCommit()) {
    -                commitOffsetsForAckedTuples(kafkaConsumer.assignment());
    +            if (commitTimer != null && commitTimer.isExpiredResetOnTrue()) { //commit timer is null for AT_MOST_ONCE mode
    --- End diff --
    
    I would say this comment is redundant here. We already say that on the field, so I suggest that we just remove it.


---

[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2537


---

[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2537#discussion_r165830279
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java ---
    @@ -85,4 +90,12 @@ public void testMetricsTimeBucketSizeInSecs() {
     
             assertEquals(100, conf.getMetricsTimeBucketSizeInSecs());
         }
    +    
    +    @Test
    +    public void testEnableAutoCommitIsBanned() {
    --- End diff --
    
    testEnableAutoCommitNotSupported


---

[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2537#discussion_r165830302
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java ---
    @@ -153,8 +163,8 @@ public void testAtMostOnceModeCannotReplayTuples() throws Exception {
         }
     
         @Test
    -    public void testAnyTimesModeCannotReplayTuples() throws Exception {
    -        //When tuple tracking is enabled, the spout must not replay tuples in any-times mode
    +    public void testNoneModeCannotReplayTuples() throws Exception {
    +        //When tuple tracking is enabled, the spout must not replay tuples in none mode
    --- End diff --
    
    no-guarantee


---

[GitHub] storm issue #2537: STORM-2914: Implement ProcessingGuarantee.NONE in the spo...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2537
  
    @hmcl Addressed your comments. Also fixed the storm-kafka-client docs to use release-specific links instead of linking to the 1.0.x branch. I've removed some of the listings of enum values in favor of linking to the enum javadoc; I don't think it's a good idea to duplicate that information. It's better if we just link to the javadoc, so the documentation doesn't get out of sync.


---

[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2537#discussion_r165852904
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -453,37 +459,33 @@ public Builder(String bootstrapServers, Subscription subscription) {
             return builder;
         }
     
    -    private static void setAutoCommitMode(Builder<?, ?> builder) {
    +    private static void setKafkaPropsForProcessingGuarantee(Builder<?, ?> builder) {
             if (builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
    -            throw new IllegalArgumentException("Do not set " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " manually."
    -                + " Instead use KafkaSpoutConfig.Builder.setProcessingGuarantee");
    +            throw new IllegalStateException("The KafkaConsumer " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
    +                + " setting is not supported. You can configure similar behavior through KafkaSpoutConfig.Builder.setProcessingGuarantee");
             }
    -        if (builder.processingGuarantee == ProcessingGuarantee.NONE) {
    -            builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    -        } else {
    -            String autoOffsetResetPolicy = (String)builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
    -            if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
    -                if (autoOffsetResetPolicy == null) {
    -                    /*
    -                    If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured
    -                    for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range
    -                    error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer 
    -                    requests an offset that was deleted.
    -                     */
    -                    builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    -                } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) {
    -                    LOG.warn("Cannot guarantee at-least-once processing with auto.offset.reset.policy other than 'earliest' or 'none'."
    -                        + " Some messages may be skipped.");
    -                }
    -            } else if (builder.processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) {
    -                if (autoOffsetResetPolicy != null
    -                    && (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) {
    -                    LOG.warn("Cannot guarantee at-most-once processing with auto.offset.reset.policy other than 'latest' or 'none'."
    -                        + " Some messages may be processed more than once.");
    -                }
    +        String autoOffsetResetPolicy = (String) builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
    +        if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
    +            if (autoOffsetResetPolicy == null) {
    +                /*
    +                 * If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured
    +                 * for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range
    +                 * error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer
    +                 * requests an offset that was deleted.
    +                 */
    +                builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    --- End diff --
    
    Good point, will add the log


---

[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2537#discussion_r165852072
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -453,37 +459,33 @@ public Builder(String bootstrapServers, Subscription subscription) {
             return builder;
         }
     
    -    private static void setAutoCommitMode(Builder<?, ?> builder) {
    +    private static void setKafkaPropsForProcessingGuarantee(Builder<?, ?> builder) {
             if (builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
    -            throw new IllegalArgumentException("Do not set " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " manually."
    -                + " Instead use KafkaSpoutConfig.Builder.setProcessingGuarantee");
    +            throw new IllegalStateException("The KafkaConsumer " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
    +                + " setting is not supported. You can configure similar behavior through KafkaSpoutConfig.Builder.setProcessingGuarantee");
             }
    -        if (builder.processingGuarantee == ProcessingGuarantee.NONE) {
    -            builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    -        } else {
    -            String autoOffsetResetPolicy = (String)builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
    -            if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
    -                if (autoOffsetResetPolicy == null) {
    -                    /*
    -                    If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured
    -                    for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range
    -                    error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer 
    -                    requests an offset that was deleted.
    -                     */
    -                    builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    -                } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) {
    -                    LOG.warn("Cannot guarantee at-least-once processing with auto.offset.reset.policy other than 'earliest' or 'none'."
    -                        + " Some messages may be skipped.");
    -                }
    -            } else if (builder.processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) {
    -                if (autoOffsetResetPolicy != null
    -                    && (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) {
    -                    LOG.warn("Cannot guarantee at-most-once processing with auto.offset.reset.policy other than 'latest' or 'none'."
    -                        + " Some messages may be processed more than once.");
    -                }
    +        String autoOffsetResetPolicy = (String) builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
    +        if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
    +            if (autoOffsetResetPolicy == null) {
    +                /*
    +                 * If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured
    +                 * for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range
    +                 * error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer
    +                 * requests an offset that was deleted.
    +                 */
    +                builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    +            } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) {
    +                LOG.warn("Cannot guarantee at-least-once processing with auto.offset.reset.policy other than 'earliest' or 'none'."
    +                    + " Some messages may be skipped.");
    +            }
    +        } else if (builder.processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) {
    +            if (autoOffsetResetPolicy != null
    +                && (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) {
    +                LOG.warn("Cannot guarantee at-most-once processing with auto.offset.reset.policy other than 'latest' or 'none'."
    +                    + " Some messages may be processed more than once.");
                 }
    -            builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
             }
    +        builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    --- End diff --
    
    We should print an INFO level log here saying:
    
    LOG.info("Set Kafka property {} to {}", ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);


---

[GitHub] storm issue #2537: STORM-2914: Implement ProcessingGuarantee.NONE in the spo...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2537
  
    @srdo 
    I've found some divergence between the master and 1.x branches. Could you raise a pull request against the 1.x branch as well? I'm OK with a PR containing the two commits (STORM-2914/STORM-2913).


---

[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2537#discussion_r165852060
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -453,37 +459,33 @@ public Builder(String bootstrapServers, Subscription subscription) {
             return builder;
         }
     
    -    private static void setAutoCommitMode(Builder<?, ?> builder) {
    +    private static void setKafkaPropsForProcessingGuarantee(Builder<?, ?> builder) {
             if (builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
    -            throw new IllegalArgumentException("Do not set " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " manually."
    -                + " Instead use KafkaSpoutConfig.Builder.setProcessingGuarantee");
    +            throw new IllegalStateException("The KafkaConsumer " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
    +                + " setting is not supported. You can configure similar behavior through KafkaSpoutConfig.Builder.setProcessingGuarantee");
             }
    -        if (builder.processingGuarantee == ProcessingGuarantee.NONE) {
    -            builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    -        } else {
    -            String autoOffsetResetPolicy = (String)builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
    -            if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
    -                if (autoOffsetResetPolicy == null) {
    -                    /*
    -                    If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured
    -                    for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range
    -                    error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer 
    -                    requests an offset that was deleted.
    -                     */
    -                    builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    -                } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) {
    -                    LOG.warn("Cannot guarantee at-least-once processing with auto.offset.reset.policy other than 'earliest' or 'none'."
    -                        + " Some messages may be skipped.");
    -                }
    -            } else if (builder.processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) {
    -                if (autoOffsetResetPolicy != null
    -                    && (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) {
    -                    LOG.warn("Cannot guarantee at-most-once processing with auto.offset.reset.policy other than 'latest' or 'none'."
    -                        + " Some messages may be processed more than once.");
    -                }
    +        String autoOffsetResetPolicy = (String) builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
    +        if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
    +            if (autoOffsetResetPolicy == null) {
    +                /*
    +                 * If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured
    +                 * for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range
    +                 * error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer
    +                 * requests an offset that was deleted.
    +                 */
    +                builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    --- End diff --
    
    We should print an INFO level log here saying:
    
    LOG.info("Set Kafka property {} to {}", ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");


---

[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2537#discussion_r165830237
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -453,37 +451,33 @@ public Builder(String bootstrapServers, Subscription subscription) {
             return builder;
         }
     
    -    private static void setAutoCommitMode(Builder<?, ?> builder) {
    +    private static void setPropsToFitProcessingGuarantee(Builder<?, ?> builder) {
             if (builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
    -            throw new IllegalArgumentException("Do not set " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " manually."
    -                + " Instead use KafkaSpoutConfig.Builder.setProcessingGuarantee");
    +            throw new IllegalArgumentException("Do not set " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
    --- End diff --
    
    IllegalStateException


---

[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2537#discussion_r165850081
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -519,6 +519,15 @@ private boolean isEmitTuple(List<Object> tuple) {
             return tuple != null || kafkaSpoutConfig.isEmitNullTuples();
         }
     
    +    private void commitConsumedOffsets(Set<TopicPartition> assignedPartitions) {
    +        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
    +        for (TopicPartition tp : assignedPartitions) {
    +            offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp)));
    +        }
    +        kafkaConsumer.commitAsync(offsetsToCommit, null);
    --- End diff --
    
    I don't think it improves readability; it also looks a little weird to have a constant null field at the top of the class, IMO.


---

[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2537#discussion_r165830294
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java ---
    @@ -109,7 +119,7 @@ public void testAtMostOnceModeDisregardsMaxUncommittedOffsets() throws Exception
         }
     
         @Test
    -    public void testAnyTimesModeDisregardsMaxUncommittedOffsets() throws Exception {
    +    public void testNoneModeDisregardsMaxUncommittedOffsets() throws Exception {
    --- End diff --
    
    testProcessingGuaranteeNoGuaranteeDisregardsMaxUncommittedOffsets


---

[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2537#discussion_r165830061
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -133,8 +133,8 @@ public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputC
     
             tupleListener = kafkaSpoutConfig.getTupleListener();
     
    -        if (isAtLeastOnceProcessing()) {
    -            // Only used if the spout should commit an offset to Kafka only after the corresponding tuple has been acked.
    +        if (kafkaSpoutConfig.getProcessingGuarantee() != KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
    +            // In at-most-once mode the offsets are committed after every poll, so the timer is not used
    --- End diff --
    
    In at-most-once mode the offsets are committed after every poll and not periodically as controlled by the timer.


---

[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2537#discussion_r165829893
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -210,23 +215,26 @@ public Builder(String bootstrapServers, Subscription subscription) {
             }
     
             /**
    -         * Set a {@link KafkaConsumer} property.
    +         * Set a {@link KafkaConsumer} property. Please don't set enable.auto.commit, instead set the {@link ProcessingGuarantee}
    --- End diff --
    
    Should we leave this info here, or add it to the [s-k-c documentation](https://github.com/apache/storm/blob/master/docs/storm-kafka-client.md)? I would say it  belongs in s-k-c documentation. However, if we find it is too important, we can leave it here. Regardless of the location, I would write something along the lines: 
    
    "the Kafka property enable.auto.commit is not supported and if set will throw an exception. All other Kafka properties that control Kafka auto commit mechanism, if set will be ignored.


---

[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2537#discussion_r165830219
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -519,6 +519,15 @@ private boolean isEmitTuple(List<Object> tuple) {
             return tuple != null || kafkaSpoutConfig.isEmitNullTuples();
         }
     
    +    private void commitConsumedOffsets(Set<TopicPartition> assignedPartitions) {
    +        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
    +        for (TopicPartition tp : assignedPartitions) {
    +            offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp)));
    +        }
    +        kafkaConsumer.commitAsync(offsetsToCommit, null);
    --- End diff --
    
    I would create a constant called NO_CALLBACK to improve readability
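    
    A minimal sketch of what that would look like (hypothetical; as the follow-up comments in this thread show, the suggestion was ultimately not adopted):
    
        import org.apache.kafka.clients.consumer.OffsetCommitCallback;
    
        // Field at the top of KafkaSpout, documenting that no completion
        // callback is wanted for the async commit.
        private static final OffsetCommitCallback NO_CALLBACK = null;
    
        // Call site:
        kafkaConsumer.commitAsync(offsetsToCommit, NO_CALLBACK);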


---

[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2537#discussion_r165830301
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java ---
    @@ -153,8 +163,8 @@ public void testAtMostOnceModeCannotReplayTuples() throws Exception {
         }
     
         @Test
    -    public void testAnyTimesModeCannotReplayTuples() throws Exception {
    -        //When tuple tracking is enabled, the spout must not replay tuples in any-times mode
    +    public void testNoneModeCannotReplayTuples() throws Exception {
    --- End diff --
    
    testProcessingGuaranteeNoGuaranteeCannotReplayTuples


---

[GitHub] storm issue #2537: STORM-2914: Implement ProcessingGuarantee.NONE in the spo...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2537
  
    @hmcl Updated to address your list at https://issues.apache.org/jira/browse/STORM-2914?focusedCommentId=16351147&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16351147. Let me know if you're happy with the changes, and I'll squash.


---

[GitHub] storm issue #2537: STORM-2914: Implement ProcessingGuarantee.NONE in the spo...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2537
  
    @hmcl Thanks for the review, caught a lot of oversights :)


---

[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2537#discussion_r165830283
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java ---
    @@ -85,4 +90,12 @@ public void testMetricsTimeBucketSizeInSecs() {
     
             assertEquals(100, conf.getMetricsTimeBucketSizeInSecs());
         }
    +    
    +    @Test
    +    public void testEnableAutoCommitIsBanned() {
    +        expectedException.expect(IllegalArgumentException.class);
    --- End diff --
    
    IllegalStateException.class


---

[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2537#discussion_r165852894
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -143,28 +146,28 @@ public String toString() {
     
         /**
          * This enum controls when the tuple with the {@link ConsumerRecord} for an offset is marked as processed,
    -     * i.e. when the offset can be committed to Kafka.
    +     * i.e. when the offset can be committed to Kafka. The default value is AT_LEAST_ONCE.
          * The commit interval is controlled by {@link KafkaSpoutConfig#getOffsetsCommitPeriodMs() }, if the mode commits on an interval.
          * NO_GUARANTEE may be removed in a later release without warning, we're still evaluating whether it makes sense to keep.
    -     *
    -     * <ul>
    -     * <li>AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed and acked (at least once). If
    -     * a tuple fails or times out it will be re-emitted, as controlled by the {@link KafkaSpoutRetryService}. Commits on the defined
    -     * interval.</li>
    -     * <br/>
    -     * <li>AT_MOST_ONCE - every offset will be committed to Kafka right after being polled but before being emitted to the downstream
    -     * components of the topology. The commit interval is ignored. This mode guarantees that the offset is processed at most once by
    -     * ensuring the spout won't retry tuples that fail or time out after the commit to Kafka has been done.</li>
    -     * <br/>
    -     * <li>NO_GUARANTEE - the polled offsets are ready to commit immediately after being polled. The offsets are committed periodically,
    -     * i.e. a message may be processed 0, 1 or more times. This behavior is similar to setting enable.auto.commit=true in the consumer, but
    -     * allows the spout to control when commits occur. Commits on the defined interval. </li>
    -     * </ul>
          */
         @InterfaceStability.Unstable
         public enum ProcessingGuarantee {
    +        /**
    +         * An offset is ready to commit only after the corresponding tuple has been processed and acked (at least once). If a tuple fails or
    +         * times out it will be re-emitted, as controlled by the {@link KafkaSpoutRetryService}. Commits on the defined interval.
    +         */
             AT_LEAST_ONCE,
    +        /**
    +         * Every offset will be committed to Kafka right after being polled but before being emitted to the downstream components of the
    +         * topology. The commit interval is ignored. This mode guarantees that the offset is processed at most once by ensuring the spout
    +         * won't retry tuples that fail or time out after the commit to Kafka has been done
    +         */
             AT_MOST_ONCE,
    +        /**
    +         * The polled offsets are ready to commit immediately after being polled. The offsets are committed periodically, i.e. a message may
    +         * be processed 0, 1 or more times. This behavior is similar to setting enable.auto.commit=true in the consumer, but allows the
    +         * spout to control when commits occur. Commits on the defined interval
    --- End diff --
    
    Yes, will add notes to all of them


---

[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2537#discussion_r165830128
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -307,8 +307,12 @@ public void nextTuple() {
                     kafkaSpoutConfig.getSubscription().refreshAssignment();
                 }
     
    -            if (shouldCommit()) {
    -                commitOffsetsForAckedTuples(kafkaConsumer.assignment());
    +            if (commitTimer != null && commitTimer.isExpiredResetOnTrue()) { //commit timer is null for AT_MOST_ONCE mode
    +                if (isAtLeastOnceProcessing()) {
    +                    commitOffsetsForAckedTuples(kafkaConsumer.assignment());
    +                } else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NONE) {
    --- End diff --
    
    We can delete the if condition and leave only the else, because when timer != null the processing guarantee is either at-least-once or no-guarantee. If we want to make that clear, we can add a line comment.
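    
    A sketch of the simplification (it assumes the surrounding code from the diff above; the method name in the NONE branch is taken from the other diffs in this thread):
    
        if (commitTimer != null && commitTimer.isExpiredResetOnTrue()) {
            if (isAtLeastOnceProcessing()) {
                commitOffsetsForAckedTuples(kafkaConsumer.assignment());
            } else {
                // The commit timer is only created for AT_LEAST_ONCE and NONE,
                // so this branch can only be reached in NONE mode.
                commitConsumedOffsets(kafkaConsumer.assignment());
            }
        }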


---