Posted to dev@storm.apache.org by wangzzu <gi...@git.apache.org> on 2018/08/03 09:20:34 UTC

[GitHub] storm pull request #2791: STORM-3176: KafkaSpout commit offset occurs Commit...

GitHub user wangzzu opened a pull request:

    https://github.com/apache/storm/pull/2791

    STORM-3176: KafkaSpout commit offset occurs CommitFailedException which leads to worker dead

    KafkaSpout commits offsets through the Consumer's synchronous commit API (commitSync, as the stack trace below shows). If the interval between consumer.poll() calls exceeds max.poll.interval.ms, or the consumer's heartbeat times out, the commit throws a CommitFailedException and the worker dies. The log looks like this:
    
    ```
    2018-07-31 19:19:03.341 o.a.s.util [ERROR] Async loop died!
    org.apache.mtkafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698) ~[stormjar.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577) ~[stormjar.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1126) ~[stormjar.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:XXX) ~[stormjar.jar:?]
    at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:430) ~[stormjar.jar:?]
    at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:264) ~[stormjar.jar:?]
    at org.apache.storm.daemon.executor$fn__10936$fn__10951$fn__10982.invoke(executor.clj:647) ~[storm-core-1.1.2-mt001.jar:?]
    at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [storm-core-1.1.2-mt001.jar:?]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]
    2018-07-31 19:19:03.342 o.a.s.d.executor [ERROR]
    ```
    
    I found that the consumer's own auto-commit path catches this exception instead of propagating it; the source code is:
    
    ```java
    private void maybeAutoCommitOffsetsSync(long timeoutMs) {
        if (autoCommitEnabled) {
            Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
            try {
                log.debug("Sending synchronous auto-commit of offsets {} for group {}", allConsumedOffsets, groupId);
                if (!commitOffsetsSync(allConsumedOffsets, timeoutMs))
                    log.debug("Auto-commit of offsets {} for group {} timed out before completion",
                            allConsumedOffsets, groupId);
            } catch (WakeupException | InterruptException e) {
                log.debug("Auto-commit of offsets {} for group {} was interrupted before completion",
                        allConsumedOffsets, groupId);
                // rethrow wakeups since they are triggered by the user
                throw e;
            } catch (Exception e) {
                // consistent with async auto-commit failures, we do not propagate the exception
                log.warn("Auto-commit of offsets {} failed for group {}: {}", allConsumedOffsets, groupId,
                        e.getMessage());
            }
        }
    }
    ```
    
    I think KafkaSpout should do the same: catch the exception so that the worker does not die. In addition, when a tuple fails, the spout should check whether the message's offset is greater than the last committed offset (the spout can guarantee that all messages with offsets below the last committed offset have been acked); if it is not, the message should not be retried.
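    
    To make the idea concrete, here is a minimal sketch. This is not the actual KafkaSpout code: the class, field, and method names below are made up for illustration, and the offset check assumes Kafka's convention that the committed offset is the next offset to consume.
    
    ```java
    import java.util.Map;
    
    import org.apache.kafka.clients.consumer.CommitFailedException;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    // Sketch only: illustrates the two proposed behaviors, not the real spout.
    class CommitSketch {
        private static final Logger LOG = LoggerFactory.getLogger(CommitSketch.class);
    
        private final KafkaConsumer<String, String> consumer;
    
        CommitSketch(KafkaConsumer<String, String> consumer) {
            this.consumer = consumer;
        }
    
        // 1. Catch CommitFailedException instead of letting it kill the worker,
        //    mirroring the consumer's own auto-commit path shown above.
        void commitAckedOffsets(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
            try {
                consumer.commitSync(offsetsToCommit);
            } catch (CommitFailedException e) {
                LOG.warn("Offset commit failed, offsets will be committed on a later attempt", e);
            }
        }
    
        // 2. Skip retrying a failed tuple whose offset is already covered by the
        //    last committed offset, since everything below it has been acked.
        boolean shouldRetry(long failedOffset, long lastCommittedOffset) {
            return failedOffset >= lastCommittedOffset;
        }
    }
    ```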


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/wangzzu/storm storm-kafka-client

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2791.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2791
    
----
commit 54005a2bd28be7928cee05d6500136d3f1fb926d
Author: wangmeng36 <wa...@...>
Date:   2018-08-03T08:44:40Z

    storm-kafka-client fix the CommitFailedException bug

----


---

[GitHub] storm issue #2791: STORM-3176: KafkaSpout commit offset occurs CommitFailedE...

Posted by wangzzu <gi...@git.apache.org>.
Github user wangzzu commented on the issue:

    https://github.com/apache/storm/pull/2791
  
    @srdo I found that we are using the deprecated Subscription subtypes actually, thx.


---

[GitHub] storm pull request #2791: STORM-3176: KafkaSpout commit offset occurs Commit...

Posted by wangzzu <gi...@git.apache.org>.
Github user wangzzu closed the pull request at:

    https://github.com/apache/storm/pull/2791


---

[GitHub] storm issue #2791: STORM-3176: KafkaSpout commit offset occurs CommitFailedE...

Posted by wangzzu <gi...@git.apache.org>.
Github user wangzzu commented on the issue:

    https://github.com/apache/storm/pull/2791
  
    @srdo Sorry, the version I used is 1.1.2. This problem has already been fixed in 1.2.0.


---

[GitHub] storm issue #2791: STORM-3176: KafkaSpout commit offset occurs CommitFailedE...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2791
  
    @wangzzu Hi, I'm not sure this should be happening. We're not using the `KafkaConsumer.subscribe` API anymore, so Kafka shouldn't be managing the partition assignment.
    
    Can you confirm that you're using storm-kafka-client 1.2.0? Could you please post your KafkaSpoutConfig? 
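    
    For reference, a minimal 1.2.x-style KafkaSpoutConfig built with the topic-name builder (rather than a Subscription subtype) looks roughly like the sketch below; the broker address, topic, and group id are placeholders, and the exact builder setters (e.g. setProp) may differ slightly between versions:
    
    ```java
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.storm.kafka.spout.KafkaSpout;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.topology.TopologyBuilder;
    
    public class KafkaSpoutConfigExample {
        public static void main(String[] args) {
            // Placeholder broker, topic, and group id values.
            KafkaSpoutConfig<String, String> spoutConfig =
                    KafkaSpoutConfig.builder("broker1:9092", "my-topic")
                            // Pass the consumer group through as a plain consumer property.
                            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "my-spout-group")
                            .build();
    
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("kafka-spout", new KafkaSpout<>(spoutConfig), 1);
        }
    }
    ```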


---

[GitHub] storm issue #2791: STORM-3176: KafkaSpout commit offset occurs CommitFailedE...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2791
  
    @wangzzu Yes, but it should also have been fixed in 1.1.2. Please make sure you're not using one of the deprecated Subscription subtypes, e.g. https://github.com/apache/storm/blob/v1.1.2/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java


---