Posted to issues@flink.apache.org by tzulitai <gi...@git.apache.org> on 2017/02/21 15:18:11 UTC

[GitHub] flink pull request #3378: [FLINK-5849] [kafka] Move FlinkKafkaConsumer start...

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/3378

    [FLINK-5849] [kafka] Move FlinkKafkaConsumer start offset determination to open()

    This PR fixes a regression due to the recently merged #2509 (FLINK-4280).
    The new start position feature added in #2509 assumed that on restore, all offsets are defined. This does not hold if the restored checkpoint was taken before the fetcher was ever initialized or run.
    
    This PR fixes this by changing the following:
    1. Move the start position determination logic to `open()`. This ensures that when `snapshotState()` is called, we will always have defined offsets.
    2. Introduce special "magic offset values" to represent that a partition is to be started from either `EARLIEST`, `LATEST`, or `GROUP_OFFSETS`. These values are set as placeholders in `open()`. The consumer follows a lazy evaluation approach, replacing these magic values with actual offsets only when the fetcher actually starts running.
    
    Therefore, with this PR, if a checkpoint happens before a fetcher fully starts consuming all of its subscribed partitions, it will at least contain the "magic offset value" in the state, instead of an undefined offset like before.
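    To make the sentinel approach concrete, here is a minimal sketch. The sentinel values, class, and method names below are illustrative only, not the actual constants or structure of `FlinkKafkaConsumerBase`:

```java
import java.util.HashMap;
import java.util.Map;

// Hedged sketch of the "magic offset value" idea: all names and values here
// are hypothetical, not the real Flink Kafka consumer internals.
public class SentinelOffsets {

    // Magic values chosen outside the range of valid Kafka offsets (>= 0).
    static final long EARLIEST_SENTINEL = -1L;
    static final long LATEST_SENTINEL = -2L;
    static final long GROUP_OFFSETS_SENTINEL = -3L;

    final Map<String, Long> subscribedPartitionOffsets = new HashMap<>();

    // Called from open(): every subscribed partition gets a defined value,
    // so a snapshot taken before the fetcher runs never contains an
    // undefined offset.
    void initializeOnOpen(Iterable<String> partitions, long startupSentinel) {
        for (String partition : partitions) {
            subscribedPartitionOffsets.put(partition, startupSentinel);
        }
    }

    // Called lazily when the fetcher actually starts consuming: replace
    // sentinels with concrete offsets resolved from Kafka; partitions not
    // yet resolved keep their sentinel.
    void resolveOnFetch(Map<String, Long> resolvedOffsets) {
        subscribedPartitionOffsets.replaceAll((partition, offset) ->
                offset < 0 ? resolvedOffsets.getOrDefault(partition, offset) : offset);
    }

    public static void main(String[] args) {
        SentinelOffsets state = new SentinelOffsets();
        state.initializeOnOpen(java.util.Arrays.asList("t-0", "t-1"), EARLIEST_SENTINEL);
        // A checkpoint taken now would contain the sentinel, not an undefined offset.
        Map<String, Long> resolved = new HashMap<>();
        resolved.put("t-0", 42L);
        state.resolveOnFetch(resolved);
        System.out.println(state.subscribedPartitionOffsets);
    }
}
```

    The point of the sketch is only the ordering guarantee: offsets become defined in `open()`, before any `snapshotState()` can run, while the real offset lookup is deferred to fetch time.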

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-5849

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3378.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3378
    
----
commit 7e7bf1d106d4dc0d24fa6746e94ccdadbc06088e
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Date:   2017-02-21T15:05:32Z

    [FLINK-5849] [kafka] Move FlinkKafkaConsumer start offset determination to open()

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3378: [FLINK-5849] [kafka] Move FlinkKafkaConsumer start offset...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3378
  
    Tests pass, merging ..



[GitHub] flink pull request #3378: [FLINK-5849] [kafka] Move FlinkKafkaConsumer start...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3378#discussion_r103199655
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java ---
    @@ -42,13 +43,7 @@
     
     import java.io.Serializable;
     import java.lang.reflect.Field;
    -import java.util.ArrayList;
    -import java.util.Arrays;
    -import java.util.Collections;
    -import java.util.HashMap;
    -import java.util.HashSet;
    -import java.util.List;
    -import java.util.Set;
    +import java.util.*;
    --- End diff --
    
    Yikes, second time :/
    I think there's a setting to disable star imports in IntelliJ, will try to use it!



[GitHub] flink pull request #3378: [FLINK-5849] [kafka] Move FlinkKafkaConsumer start...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3378#discussion_r103196367
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -330,8 +315,49 @@ public void cancel() {
     	public void open(Configuration configuration) {
    --- End diff --
    
    I wonder if it makes sense to move the method above the `run()` method.
    That would make reading through the source code more logical.



[GitHub] flink issue #3378: [FLINK-5849] [kafka] Move FlinkKafkaConsumer start offset...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3378
  
    Rebased on `master`.
    
    Note about the changes to the partition assignment logic in deleted lines 538 - 553 and added lines 563 - 565 of `FlinkKafkaConsumerBase`:
    The change is irrelevant to this issue, but something I stumbled across when touching that part of the code. Problems:
    
    1. The `KafkaConsumerPartitionAssignmentTest` was testing a no-longer-used `assignPartitions` method, so the tests never covered the actual behaviour.
    
    2. Previously, the partition assignment was changed from the "modulo on KafkaTopicPartition hashes" approach to "pre-sorting the partition list and round-robin assigning". This change should actually break the tests in `KafkaConsumerPartitionAssignmentTest`, but didn't because, as mentioned above, the tests were exercising an unused method. The current approach will also be problematic for dynamically growing subscribed partition lists, because the sort order will change as the list grows with newly discovered partitions.
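    The instability described in point 2 can be illustrated with a small sketch (method and class names here are hypothetical, not Flink's actual assignment code). Each subtask takes the partitions at indices `i` with `i % numSubtasks == subtaskIndex` in the sorted list, so a newly discovered partition that sorts before existing ones shifts every index and silently reassigns partitions across subtasks:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hedged sketch of "pre-sort then round-robin" assignment; illustrative only.
public class RoundRobinAssignment {

    // Returns the partitions owned by one parallel subtask.
    static List<String> assign(List<String> allPartitions, int numSubtasks, int subtaskIndex) {
        List<String> sorted = new ArrayList<>(allPartitions);
        Collections.sort(sorted);
        List<String> mine = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i++) {
            if (i % numSubtasks == subtaskIndex) {
                mine.add(sorted.get(i));
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        // With partitions {topic-1, topic-2} and 2 subtasks, subtask 0 owns topic-1.
        System.out.println(assign(List.of("topic-1", "topic-2"), 2, 0));
        // Discovering topic-0 (which sorts first) shifts all indices:
        // subtask 0 now owns {topic-0, topic-2} and topic-1 moves to subtask 1.
        System.out.println(assign(List.of("topic-1", "topic-2", "topic-0"), 2, 0));
    }
}
```

    A hash-based scheme (e.g. modulo on a stable per-partition hash) keeps existing assignments fixed when new partitions appear, which is why the sort-based approach is problematic for dynamic partition discovery.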



[GitHub] flink issue #3378: [FLINK-5849] [kafka] Move FlinkKafkaConsumer start offset...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3378
  
    Thanks for the review @rmetzger!
    For the moving of `open()` to before `run()`, I've included that as the commit tagged with FLINK-5849.
    For the star import fix, I'm going to include that within a follow-up hotfix that cleans up all Flink Kafka connector tests of star & unused imports.
    
    Doing one final Travis run and merging this to `master` once it turns green.



[GitHub] flink pull request #3378: [FLINK-5849] [kafka] Move FlinkKafkaConsumer start...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3378#discussion_r103199733
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -330,8 +315,49 @@ public void cancel() {
     	public void open(Configuration configuration) {
    --- End diff --
    
    Makes sense, will change this.



[GitHub] flink pull request #3378: [FLINK-5849] [kafka] Move FlinkKafkaConsumer start...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3378#discussion_r103197252
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java ---
    @@ -42,13 +43,7 @@
     
     import java.io.Serializable;
     import java.lang.reflect.Field;
    -import java.util.ArrayList;
    -import java.util.Arrays;
    -import java.util.Collections;
    -import java.util.HashMap;
    -import java.util.HashSet;
    -import java.util.List;
    -import java.util.Set;
    +import java.util.*;
    --- End diff --
    
    Star imports are not wanted in Flink :)



[GitHub] flink pull request #3378: [FLINK-5849] [kafka] Move FlinkKafkaConsumer start...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3378

