Posted to issues@flink.apache.org by StephanEwen <gi...@git.apache.org> on 2016/09/27 19:12:50 UTC

[GitHub] flink pull request #2559: [FLINK-4702] [kafka connector] Commit offsets to Ka...

GitHub user StephanEwen opened a pull request:

    https://github.com/apache/flink/pull/2559

    [FLINK-4702] [kafka connector] Commit offsets to Kafka asynchronously

    The offset commit calls to Kafka may occasionally take a very long time. In that case, the notifyCheckpointComplete() method blocks for a long time, and the KafkaConsumer cannot make progress and cannot perform checkpoints.
    
    This pull request changes the offset committing to use Kafka's `commitAsync()` method.
    It also makes sure that no more than one commit is concurrently in progress, so that commit requests do not pile up.
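
    For reference, a condensed sketch of the change, following the diff quoted later in this thread (the warning text is abbreviated here; `commitInProgress` and `offsetCommitCallback` are the fields used in the PR):

        if (this.consumer != null) {
            synchronized (consumerLock) {
                if (!commitInProgress) {
                    // mark a commit as in flight and return immediately;
                    // the callback resets the flag once Kafka responds
                    commitInProgress = true;
                    this.consumer.commitAsync(offsetsToCommit, offsetCommitCallback);
                }
                else {
                    // a previous commit is still in progress -> skip this one and only log a warning
                    LOG.warn("Previous offset commit to Kafka not yet completed; skipping this commit.");
                }
            }
        }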

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink kafka_commit_async

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2559.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2559
    
----
commit eafba8600863c18e09397366485bcfc6ff44960f
Author: Stephan Ewen <se...@apache.org>
Date:   2016-09-27T18:59:35Z

    [FLINK-4702] [kafka connector] Commit offsets to Kafka asynchronously

----


---

[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offsets to Kafka asy...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2559
  
    @robert and @tzulitai What is your take on this?


---

[GitHub] flink pull request #2559: [FLINK-4702] [kafka connector] Commit offsets to Ka...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2559#discussion_r80914495
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
    @@ -285,7 +293,14 @@ public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets)
     
     		if (this.consumer != null) {
     			synchronized (consumerLock) {
    -				this.consumer.commitSync(offsetsToCommit);
    +				if (!commitInProgress) {
    +					commitInProgress = true;
    +					this.consumer.commitAsync(offsetsToCommit, offsetCommitCallback);
    +				}
    +				else {
    +					LOG.warn("Committing previous checkpoint's offsets to Kafka not completed. " +
    --- End diff --
    
    I agree, makes sense.


---

[GitHub] flink pull request #2559: [FLINK-4702] [kafka connector] Commit offsets to Ka...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2559#discussion_r80909904
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
    @@ -285,7 +293,14 @@ public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets)
     
     		if (this.consumer != null) {
     			synchronized (consumerLock) {
    -				this.consumer.commitSync(offsetsToCommit);
    +				if (!commitInProgress) {
    +					commitInProgress = true;
    +					this.consumer.commitAsync(offsetsToCommit, offsetCommitCallback);
    +				}
    +				else {
    +					LOG.warn("Committing previous checkpoint's offsets to Kafka not completed. " +
    --- End diff --
    
    Possibly, yes. But on the other hand, this should be pretty visible if it happens.
    I would expect that, with proper options to participate in group offset committing, most Flink jobs run without committing to Kafka/ZooKeeper.


---

[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offsets to Kafka asy...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2559
  
    @StephanEwen I think you've tagged the wrong Github ID for Robert ;)


---

[GitHub] flink pull request #2559: [FLINK-4702] [kafka connector] Commit offsets to Ka...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2559#discussion_r81089161
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
    @@ -301,4 +316,16 @@ public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets)
     		}
     		return result;
     	}
    +
    +	private class CommitCallback implements OffsetCommitCallback {
    +
    +		@Override
    +		public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
    +			commitInProgress = false;
    +
    +			if (exception != null) {
    +				LOG.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints", exception);
    --- End diff --
    
    Oops, this is actually correct, sorry.


---

[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offsets to Kafka asy...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2559
  
    Seems like currently only the 0.8 Kafka connector has tests related to offset committing (in `Kafka08ITCase`).
    
    My two cents for testing this for now: an IT test for correct offset committing back to Kafka in the 0.9 connector is sufficient (take a look at `Kafka08ITCase#testOffsetInZookeeper`, but replace `ZookeeperOffsetHandler` with the new `KafkaConsumer` methods).


---

[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offsets to Kafka asy...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2559
  
    @tzulitai Thanks for the thorough review!
    
    I don't understand why it would be a problem that the `commitSpecificOffsetsToKafka` method was designed to commit synchronously. The `FlinkKafkaConsumerBase` has the pending checkpoints (I think that is what you are referring to). It removes the HashMap of "offsets to commit" from the `pendingCheckpoints` Map synchronously, before even calling the fetcher to commit.
    After that, it looks to me like it makes no difference how that "offsets to commit" Map is used (sync or async)...
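
    (An illustrative paraphrase of the flow described above; this is not the actual `FlinkKafkaConsumerBase` code, and the field and method names are simplified:)

        // assumed shape: checkpoint ID -> offsets snapshotted for that checkpoint
        private final Map<Long, HashMap<KafkaTopicPartition, Long>> pendingCheckpoints = new HashMap<>();

        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            // the "offsets to commit" map is detached from pendingCheckpoints synchronously ...
            HashMap<KafkaTopicPartition, Long> offsetsToCommit = pendingCheckpoints.remove(checkpointId);
            if (offsetsToCommit != null) {
                // ... so whether the fetcher then commits it synchronously or asynchronously
                // makes no difference to the bookkeeping here
                fetcher.commitSpecificOffsetsToKafka(offsetsToCommit);
            }
        }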



---

[GitHub] flink pull request #2559: [FLINK-4702] [kafka connector] Commit offsets to Ka...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2559#discussion_r80907326
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
    @@ -285,7 +293,14 @@ public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets)
     
     		if (this.consumer != null) {
     			synchronized (consumerLock) {
    -				this.consumer.commitSync(offsetsToCommit);
    +				if (!commitInProgress) {
    +					commitInProgress = true;
    +					this.consumer.commitAsync(offsetsToCommit, offsetCommitCallback);
    +				}
    +				else {
    +					LOG.warn("Committing previous checkpoint's offsets to Kafka not completed. " +
    --- End diff --
    
    If the user sets a relatively short checkpoint interval, will this flood the log?


---

[GitHub] flink pull request #2559: [FLINK-4702] [kafka connector] Commit offsets to Ka...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen closed the pull request at:

    https://github.com/apache/flink/pull/2559


---

[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offsets to Kafka asy...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2559
  
    Btw, just curious: does the 0.8 Kafka connector have the same issue with synchronous committing? I haven't looked into the code for this, but I'm just wondering if we need a ticket for 0.8 too.


---

[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offsets to Kafka asy...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2559
  
    Just had a look at the API of `commitAsync`, and it seems like the offsets committed back to Kafka through this API (likewise for `commitSync`) need to be `lastProcessedMessageOffset + 1` ([https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitAsync(java.util.Map,%20org.apache.kafka.clients.consumer.OffsetCommitCallback)](https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitAsync(java.util.Map,%20org.apache.kafka.clients.consumer.OffsetCommitCallback))).
    
    The main effect is that, when starting from group offsets in Kafka, `FlinkKafkaConsumer09` currently starts from the wrong offset. There's a separate JIRA for this bug: [FLINK-4618](https://issues.apache.org/jira/browse/FLINK-4618).
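
    (To illustrate that offset semantic with the plain Kafka 0.9 client API; the topic and variable names here are made up:)

        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
        // Kafka expects the offset of the *next* record to read, hence the + 1
        offsetsToCommit.put(new TopicPartition("some-topic", 0), new OffsetAndMetadata(lastProcessedOffset + 1));
        consumer.commitAsync(offsetsToCommit, offsetCommitCallback);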
    
    Another contributor has already picked up FLINK-4618, so I'd say it's OK to leave this PR as it is. I'll keep an eye on the progress of FLINK-4618 and make sure it gets merged after this PR.
    
    Minus the above, this looks good to me. +1


---

[GitHub] flink pull request #2559: [FLINK-4702] [kafka connector] Commit offsets to Ka...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2559#discussion_r80903481
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
    @@ -86,6 +90,9 @@
     	/** Flag to mark the main work loop as alive */
     	private volatile boolean running = true;
     
    +	/** Flag indicating whether a commit of offsets to Kafka it currently happening */
    --- End diff --
    
    nit: it --> "is"?


---

[GitHub] flink pull request #2559: [FLINK-4702] [kafka connector] Commit offsets to Ka...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2559#discussion_r80906814
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
    @@ -301,4 +316,16 @@ public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets)
     		}
     		return result;
     	}
    +
    +	private class CommitCallback implements OffsetCommitCallback {
    +
    +		@Override
    +		public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
    +			commitInProgress = false;
    +
    +			if (exception != null) {
    +				LOG.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints", exception);
    --- End diff --
    
    The exception message isn't included in the log warning.


---

[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offsets to Kafka asy...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2559
  
    @StephanEwen 
    On a second look, I think the `commitSpecificOffsetsToKafka` method was designed to commit synchronously in the first place. `AbstractFetcher` holds a Map of all currently pending offsets to commit, keyed by checkpoint ID, and on every `notifyCheckpointComplete` the offsets are removed from that Map before `commitSpecificOffsetsToKafka` is called.
    
    So, for async committing, I think we need to stop cleaning up the offsets in `AbstractFetcher#notifyCheckpointComplete()` and instead clean them up in a new, separate callback handler method in `AbstractFetcher`.
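
    (One possible shape for that, purely as a sketch and not code from this PR: the concrete fetcher would call something like the following from Kafka's `OffsetCommitCallback`, so the pending entry is only removed once the async commit has actually completed:)

        // hypothetical callback handler in AbstractFetcher
        protected void onOffsetCommitComplete(long checkpointId, Exception exception) {
            // clean up the pending entry only after the async commit has finished
            pendingOffsetsToCommit.remove(checkpointId);
            if (exception != null) {
                LOG.warn("Async committing of offsets for checkpoint {} failed.", checkpointId, exception);
            }
        }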


---

[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offsets to Kafka asy...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2559
  
    Thanks @tzulitai for looking at this. I will leave the offset as it is, then (to be fixed via a follow-up).
    
    The Kafka 0.8 connector needs a similar change. This issue was encountered by a user, so I wanted to get the 0.9 fix in faster. I will do a follow-up for Kafka 0.8, and will also correct the issue tag ;-)
    
    I have no good idea how to test this, though, so any thoughts there are welcome!


---

[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offsets to Kafka asy...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2559
  
    Closing this in favor of #2574.


---

[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offsets to Kafka asy...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2559
  
    Actually, I just discovered that the problem is different altogether.
    
    While the KafkaConsumer is polling for new data (with a timeout), it holds the consumer lock. If no data arrives from Kafka, the lock is not released until the poll timeout is over.
    During that time, neither a `commitSync` nor a `commitAsync` call can be fired off. The `notifyCheckpointComplete` method hence blocks until the poll timeout expires and the lock is released.
    
    We can fix this by making sure that the consumer is "woken up" so it releases the lock, and by making the lock acquisition fair, so that the committer gets the lock next.
    
    For the sake of releasing the lock quickly in the committer method, the commit should still be asynchronous.
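
    (A sketch of that idea, assuming a fair `ReentrantLock` in place of the current `synchronized` block and using `KafkaConsumer#wakeup()`, the one thread-safe consumer method for interrupting a blocking `poll()`; this is illustrative only, not the final patch:)

        private final ReentrantLock consumerLock = new ReentrantLock(true);  // fair: a waiting committer gets the lock next

        void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
            consumer.wakeup();        // break the consumer thread out of a long poll(), so it releases the lock
            consumerLock.lock();
            try {
                consumer.commitAsync(offsets, offsetCommitCallback);  // returns quickly, so the lock is held only briefly
            } finally {
                consumerLock.unlock();
            }
        }

        // the poll loop then has to catch the WakeupException thrown by poll() and simply continue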


---