Posted to dev@storm.apache.org by hmcl <gi...@git.apache.org> on 2017/11/29 02:27:04 UTC

[GitHub] storm pull request #2438: STORM-2835: storm-kafka-client KafkaSpout can fail...

GitHub user hmcl opened a pull request:

    https://github.com/apache/storm/pull/2438

    STORM-2835: storm-kafka-client KafkaSpout can fail to remove all tuples from waitingToEmit

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/hmcl/storm-apache Apache_master_STORM-2385

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2438.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2438
    
----
commit e76d86a10cf6367acdaae1da071843ab8e83630e
Author: Hugo Louro <hm...@gmail.com>
Date:   2017-11-29T02:25:19Z

    STORM-2835: storm-kafka-client KafkaSpout can fail to remove all tuples from waitingToEmit

----


---

[GitHub] storm pull request #2438: STORM-2835: storm-kafka-client KafkaSpout can fail...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2438#discussion_r154863225
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -242,9 +242,7 @@ public void nextTuple() {
                         }
                     }
     
    -                if (waitingToEmit()) {
    -                    emit();
    -                }
    +                emitIfWaitingNotEmitted();
    --- End diff --
    
    If you're removing the `waitingToEmit()` check, I think you should check whether `waitingToEmit` is null in `emitIfWaitingNotEmitted()`.
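    A minimal sketch of that guard, assuming the spout keeps `waitingToEmit` as an `Iterator` field that stays null until the first poll. The class name `NullSafeEmitSketch` and the `emitOrRetryTuple` stub (which treats every record as emitted) are hypothetical stand-ins, not the real `KafkaSpout` code:
    
    ```java
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    
    // Hypothetical stand-in for KafkaSpout; records are plain strings here.
    final class NullSafeEmitSketch {
        // Assumed to be null until the first poll populates it.
        Iterator<String> waitingToEmit;
        final List<String> emitted = new ArrayList<>();
    
        // Hypothetical stub: pretends every record is emitted successfully.
        boolean emitOrRetryTuple(String record) {
            emitted.add(record);
            return true;
        }
    
        void emitIfWaitingNotEmitted() {
            // Without the outer waitingToEmit() check, the null case must be handled here.
            if (waitingToEmit == null) {
                return;
            }
            while (waitingToEmit.hasNext()) {
                final boolean wasEmitted = emitOrRetryTuple(waitingToEmit.next());
                waitingToEmit.remove();
                if (wasEmitted) {
                    break;
                }
            }
        }
    }
    ```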


---

[GitHub] storm issue #2438: STORM-2835: storm-kafka-client KafkaSpout can fail to rem...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2438
  
    @hmcl 
    I'm still +1, and I think @srdo agreed to make the change. Let's merge it in.


---

[GitHub] storm issue #2438: STORM-2835: storm-kafka-client KafkaSpout can fail to rem...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2438
  
    +1


---

[GitHub] storm issue #2438: STORM-2835: storm-kafka-client KafkaSpout can fail to rem...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the issue:

    https://github.com/apache/storm/pull/2438
  
    @HeartSaVioR, can we look at merging this if your comments have been addressed?


---

[GitHub] storm issue #2438: STORM-2835: storm-kafka-client KafkaSpout can fail to rem...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2438
  
    @srdo `waitingToEmit()` is also called from `poll()` to determine whether it should poll or not. Got it. Thanks for letting me know. :)


---

[GitHub] storm pull request #2438: STORM-2835: storm-kafka-client KafkaSpout can fail...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2438#discussion_r155007745
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -242,9 +242,7 @@ public void nextTuple() {
                         }
                     }
     
    -                if (waitingToEmit()) {
    -                    emit();
    -                }
    +                emitIfWaitingNotEmitted();
    --- End diff --
    
    Done


---

[GitHub] storm issue #2438: STORM-2835: storm-kafka-client KafkaSpout can fail to rem...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/2438
  
    @arunmahadevan @HeartSaVioR @srdo this patch has been merged into master and 1.x-branch


---

[GitHub] storm pull request #2438: STORM-2835: storm-kafka-client KafkaSpout can fail...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl closed the pull request at:

    https://github.com/apache/storm/pull/2438


---

[GitHub] storm issue #2438: STORM-2835: storm-kafka-client KafkaSpout can fail to rem...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/2438
  
    @srdo @HeartSaVioR thanks for the review. Can you please take another look? Thanks.


---

[GitHub] storm issue #2438: STORM-2835: storm-kafka-client KafkaSpout can fail to rem...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2438
  
    Sorry, I didn't notice the code had changed. I'll revoke my +1 and take a look. Sorry about that.


---

[GitHub] storm pull request #2438: STORM-2835: storm-kafka-client KafkaSpout can fail...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2438#discussion_r154863967
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -372,19 +370,23 @@ private void ackRetriableOffsetsIfCompactedAway(Map<TopicPartition, Long> earlie
         }
     
         // ======== emit  =========
    -    private void emit() {
    -        while (!emitTupleIfNotEmitted(waitingToEmit.next()) && waitingToEmit.hasNext()) {
    +    private void emitIfWaitingNotEmitted() {
    +        while (waitingToEmit.hasNext()) {
    +            final boolean emitted = emitOrRetryTuple(waitingToEmit.next());
                 waitingToEmit.remove();
    +            if (emitted) {
    +                break;
    +            }
             }
         }
     
         /**
    -     * Creates a tuple from the kafka record and emits it if it was not yet emitted.
    +     * Creates a tuple from the kafka record and emits it if it was never emitted or is ready to be retried
    --- End diff --
    
    nit: If I remember correctly, checkstyle checks for the trailing '.' in Javadoc. It is missing here.


---

[GitHub] storm pull request #2438: STORM-2835: storm-kafka-client KafkaSpout can fail...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2438#discussion_r154863797
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -242,9 +242,7 @@ public void nextTuple() {
                         }
                     }
     
    -                if (waitingToEmit()) {
    -                    emit();
    -                }
    +                emitIfWaitingNotEmitted();
    --- End diff --
    
    +1 Nice finding.


---

[GitHub] storm issue #2438: STORM-2835: storm-kafka-client KafkaSpout can fail to rem...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2438
  
    Could you clarify what the bug is here? I'm having trouble understanding why this could cause issues. `waitingToEmit` is an iterator on a list that we don't use for anything else, so it's not clear to me why we even need to call `waitingToEmit.remove()`.


---

[GitHub] storm issue #2438: STORM-2835: storm-kafka-client KafkaSpout can fail to rem...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2438
  
    So there's a `hasNext()` call in `waitingToEmit()`, which confused me...
    
    I agree the patch may not be technically required, but I'd like to propose removing `hasNext()` from `waitingToEmit()` and adopting this patch, which makes the `emit()` method easier to read.
    
    If we discarded the list right after the loop ended, I would prefer throwing out the whole list instead of removing elements one by one. That is not the case here, so I'm +1 on removing each element once it is no longer needed.


---

[GitHub] storm issue #2438: STORM-2835: storm-kafka-client KafkaSpout can fail to rem...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2438
  
    @hmcl Thanks for elaborating.
    
    As @HeartSaVioR mentions, there's a `hasNext` call in `waitingToEmit()`, which is called before this loop, so I don't think it's possible that we call `next` without calling `hasNext` first. That said, I agree that this change makes the code easier to read, so we should make the change regardless.
    
    Regarding `waitingToEmit.remove`, we aren't just failing to remove the last element from the list. We end up back in this loop in the next call to `nextTuple` if there are still unemitted records, so we're failing to remove any element that results in a tuple getting emitted, since we skip `waitingToEmit.remove` when `emitTupleIfNotEmitted` returns true. Since we never read any element from `waitingToEmit` more than once, I think we should remove elements as soon as they're read, i.e. something like 
    ```java
    while (waitingToEmit.hasNext()) {
      boolean emitted = emitTupleIfNotEmitted(waitingToEmit.next());
      waitingToEmit.remove();
      if (emitted) {
        break;
      }
    }
    ```
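    To make the difference concrete, here is a small self-contained simulation, assuming every record emits successfully on its first attempt. `EmitLoopDemo`, its stub `emitTupleIfNotEmitted`, and `remainingAfterDrain` are hypothetical; each iteration in `remainingAfterDrain` stands in for one `nextTuple()` call guarded by `hasNext()`:
    
    ```java
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    
    // Hypothetical simulation of the emit loop before and after the patch.
    final class EmitLoopDemo {
        final List<String> pending;            // backing list of polled records
        final Iterator<String> waitingToEmit;  // iterator over that list
        final List<String> emitted = new ArrayList<>();
    
        EmitLoopDemo(List<String> records) {
            this.pending = new ArrayList<>(records);
            this.waitingToEmit = pending.iterator();
        }
    
        // Hypothetical stub: every record is emitted on the first attempt.
        boolean emitTupleIfNotEmitted(String record) {
            emitted.add(record);
            return true;
        }
    
        // Pre-patch loop: remove() runs only for records that were NOT emitted.
        void emitOld() {
            while (!emitTupleIfNotEmitted(waitingToEmit.next()) && waitingToEmit.hasNext()) {
                waitingToEmit.remove();
            }
        }
    
        // Proposed loop: every record that is read is removed.
        void emitNew() {
            while (waitingToEmit.hasNext()) {
                boolean wasEmitted = emitTupleIfNotEmitted(waitingToEmit.next());
                waitingToEmit.remove();
                if (wasEmitted) {
                    break;
                }
            }
        }
    
        // Each iteration stands in for one nextTuple() call guarded by hasNext().
        static List<String> remainingAfterDrain(List<String> records, boolean useNewLoop) {
            EmitLoopDemo demo = new EmitLoopDemo(records);
            while (demo.waitingToEmit.hasNext()) {
                if (useNewLoop) {
                    demo.emitNew();
                } else {
                    demo.emitOld();
                }
            }
            return demo.pending;
        }
    }
    ```
    
    With input `r1, r2, r3`, the pre-patch loop leaves all three emitted records in the backing list, while the proposed loop leaves it empty.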
    
    @HeartSaVioR We might need to keep the `hasNext` call in `waitingToEmit()`, since `waitingToEmit()` is also used to decide whether to poll for new records. I think if we remove it, the spout will poll for more tuples even if there are still pending records in the `waitingToEmit` list.
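    A simplified sketch of why that `hasNext()` matters, assuming polling is gated only on `!waitingToEmit()` (the real spout may check other conditions as well). `PollGateDemo` and its `nextTuple(List)` signature are hypothetical:
    
    ```java
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    
    // Hypothetical model of nextTuple(): poll only when nothing is waiting to be emitted.
    final class PollGateDemo {
        Iterator<String> waitingToEmit; // null until the first poll
        int polls = 0;
    
        boolean waitingToEmit() {
            // hasNext() is what tells us whether polled records are still pending.
            return waitingToEmit != null && waitingToEmit.hasNext();
        }
    
        // Each call stands in for one nextTuple(); nextBatch is what a poll would return.
        void nextTuple(List<String> nextBatch) {
            if (!waitingToEmit()) {
                polls++;
                waitingToEmit = new ArrayList<>(nextBatch).iterator();
            }
            if (waitingToEmit()) {
                waitingToEmit.next(); // "emit" a single record
                waitingToEmit.remove();
            }
        }
    }
    ```
    
    With a three-record batch, three `nextTuple` calls trigger a single poll and the fourth triggers the next one; a check that ignored `hasNext()` could no longer tell pending records apart from an exhausted iterator.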


---

[GitHub] storm issue #2438: STORM-2835: storm-kafka-client KafkaSpout can fail to rem...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/2438
  
    Yes, you are right. Technically it is not necessary to remove the elements from the collection. Nevertheless, this code change addresses the following important issues:
    
    1. Calling `next()` before `hasNext()` can throw `NoSuchElementException`.
    2. It is good practice to remove the elements from the iterator (list), for two reasons:
       a. to relieve memory pressure. Otherwise, until a new poll happens, all the tuples will remain in memory.
       b. it makes sense from a system internals standpoint: once you emit the record/tuple, it should no longer be in `waitingToEmit`. Why should it be on this list if it has already been emitted?
    3. The new code follows a much more common pattern, and is hence better practice compared to the previous one.
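    Point 1 can be illustrated with a hypothetical check, assuming the emit loop is invoked without the outer `waitingToEmit()` guard on an already exhausted iterator. `UnguardedNextDemo` and the always-succeeding `tryEmit` stub are illustrative only:
    
    ```java
    import java.util.Iterator;
    import java.util.NoSuchElementException;
    
    final class UnguardedNextDemo {
        // Hypothetical stub: every record is emitted successfully.
        static boolean tryEmit(String record) {
            return true;
        }
    
        // Old loop shape: next() is called before any hasNext() check.
        static boolean oldLoopThrows(Iterator<String> it) {
            try {
                while (!tryEmit(it.next()) && it.hasNext()) {
                    it.remove();
                }
                return false;
            } catch (NoSuchElementException e) {
                return true;
            }
        }
    
        // New loop shape: hasNext() is checked before every next().
        static boolean newLoopThrows(Iterator<String> it) {
            try {
                while (it.hasNext()) {
                    boolean emitted = tryEmit(it.next());
                    it.remove();
                    if (emitted) {
                        break;
                    }
                }
                return false;
            } catch (NoSuchElementException e) {
                return true;
            }
        }
    }
    ```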
    
    The only reason I can think of not to remove the elements from this list is performance. If we feel it gives significantly more throughput, we can consider not removing the elements. However, even if we don't remove the elements, the code should be along these lines (perhaps refactored a bit to avoid the empty body in the while loop):
    
    ```java
    while (waitingToEmit.hasNext() && !emitTupleIfNotEmitted(waitingToEmit.next()));
    ```
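    One possible refactoring of that one-liner that avoids the empty loop body, sketched under the assumption that `emitTupleIfNotEmitted` returns `false` for records that were already emitted. `KeepListSketch`, its `alreadyEmitted` set, and `emitNext` are hypothetical:
    
    ```java
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Set;
    
    // Hypothetical variant that leaves records in the backing list.
    final class KeepListSketch {
        final Iterator<String> waitingToEmit;
        final Set<String> alreadyEmitted = new HashSet<>();
    
        KeepListSketch(List<String> records) {
            this.waitingToEmit = records.iterator();
        }
    
        // Hypothetical stub: emits a record only if it was not emitted before.
        boolean emitTupleIfNotEmitted(String record) {
            return alreadyEmitted.add(record);
        }
    
        // Same control flow as the one-liner, without an empty while body.
        // Returns the emitted record, or null if the iterator was exhausted first.
        String emitNext() {
            while (waitingToEmit.hasNext()) {
                String record = waitingToEmit.next();
                if (emitTupleIfNotEmitted(record)) {
                    return record;
                }
            }
            return null;
        }
    }
    ```
    
    The trade-off is the one described above: the backing list keeps every record until the next poll replaces it.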


---