Posted to issues@flink.apache.org by tzulitai <gi...@git.apache.org> on 2018/03/05 09:42:10 UTC

[GitHub] flink pull request #5634: [FLINK-5479] [kafka] Idleness detection for period...

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/5634

    [FLINK-5479] [kafka] Idleness detection for periodic per-partition watermarks in FlinkKafkaConsumer

    ## What is the purpose of the change
    
    This commit adds the capability to detect idle partitions in the `FlinkKafkaConsumer` when using periodic per-partition watermark generation. Users set the partition idle timeout using `setPartitionIdleTimeout(long)`. The value of the timeout determines how long an idle partition may block watermark advancement downstream.
    
    
    ## Brief change log
    
    - Adds a `setPartitionIdleTimeout(long)` configuration method
    - Modifies `KafkaTopicPartitionStateWithPeriodicWatermarks` to keep track of necessary information to determine partition idleness.
    - Adds idleness detection logic to `AbstractFetcher.PeriodicWatermarkEmitter`.
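
The change log above describes per-partition state that records recent activity plus an emitter that skips idle partitions. A minimal Java sketch of that idea, assuming wall-clock times in milliseconds and a single consumer subtask. The class, fields, and methods below are hypothetical and are not the PR's actual `KafkaTopicPartitionStateWithPeriodicWatermarks` or `AbstractFetcher.PeriodicWatermarkEmitter` code:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: timeout-based per-partition idleness for periodic watermarks.
class IdleAwareWatermarkSketch {

    // Per-partition state: highest event timestamp seen and when the last record arrived.
    static final class PartitionState {
        long watermark = Long.MIN_VALUE;
        long lastRecordTimeMillis;

        PartitionState(long nowMillis) {
            this.lastRecordTimeMillis = nowMillis;
        }
    }

    private final Map<Integer, PartitionState> partitions = new HashMap<>();
    private final long partitionIdleTimeoutMillis;

    IdleAwareWatermarkSketch(long partitionIdleTimeoutMillis) {
        this.partitionIdleTimeoutMillis = partitionIdleTimeoutMillis;
    }

    void addPartition(int partition, long nowMillis) {
        partitions.put(partition, new PartitionState(nowMillis));
    }

    // Per record: refresh the activity time and advance the partition's watermark.
    void onRecord(int partition, long eventTimestamp, long nowMillis) {
        PartitionState state = partitions.get(partition);
        if (state == null) {
            throw new IllegalArgumentException("Unknown partition: " + partition);
        }
        state.lastRecordTimeMillis = nowMillis;
        state.watermark = Math.max(state.watermark, eventTimestamp);
    }

    // Periodic call: minimum watermark over non-idle partitions, or Long.MIN_VALUE
    // if every partition is idle (nothing to emit).
    long currentWatermark(long nowMillis) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (PartitionState state : partitions.values()) {
            boolean idle = nowMillis - state.lastRecordTimeMillis > partitionIdleTimeoutMillis;
            if (!idle) {
                anyActive = true;
                min = Math.min(min, state.watermark);
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }
}
```

Because idleness here depends only on elapsed time since a partition's last record, a partition that is merely waiting while the consumer is busy with other partitions can also be classified as idle, which is the weakness raised later in this thread.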
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
      - The S3 file system connector: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (**yes** / no)
      - If yes, how is the feature documented? (not applicable / docs / **JavaDocs** / not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-5479

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5634.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5634
    
----
commit d7b95ca1c8bb85f77dd6b83becf8fc1d5cccb810
Author: Tzu-Li (Gordon) Tai <tz...@...>
Date:   2018-03-05T09:35:02Z

    [FLINK-5479] [kafka] Idleness detection for periodic per-partition watermarks
    
    This commit adds the capability to detect idle partitions in the
    FlinkKafkaConsumer when using periodic per-partition watermark
    generation. Users set the partition idle timeout using
    `setPartitionIdleTimeout(long)`. The value of the timeout determines how
    long an idle partition may block watermark advancement downstream.

----


---

[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on the issue:

    https://github.com/apache/flink/pull/5634
  
    @tzulitai @StephanEwen the current idleness detection in the source context isn't a replacement for what is required to deal with an inactive partition (or Kinesis shard). When a connector subtask consumes multiple partitions and one is idle, it should still be possible to generate a watermark. This can only be solved outside of the connector when the multiple source partitions are visible (as they would be for an operator with multiple input streams).


---

[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5634
  
    A few points to mention:
    
    1. @StephanEwen there is already common idleness detection implemented within `SourceContext`s, see `StreamSourceContexts`. The idleness detection, however, is currently always disabled and we do not allow users to configure it.
    
    2. I agree that we shouldn't add anything more to the connector; I have also discussed this offline with @aljoscha. We should maybe do this only as part of the new connector rework that @tweise and I were talking about, for 1.6.


---

[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5634
  
    @tweise @tzulitai 
    
    I would suggest solving this the following way, which should be both simple and cover our cases:
    
      - We extend the current periodic watermark generators for idleness. We could do that, for example, by maintaining a record counter and, each time the generator is asked whether to generate a watermark, remembering the last counter value and a System.nanoTime() timestamp. If no record has arrived for too long, return a special watermark object that indicates "idle". Alternatively, change the return type to return either 'none', 'idle', or 'watermark'.
    
      - The Kinesis Consumer needs per-shard watermarks, the same way the Kafka Consumer has them. That part needs to be added to the Kinesis consumer anyway.
    
    That way, we automatically get per-shard idleness in Kinesis and per-partition idleness in Kafka without doing anything specific for the source connectors.
    
    We can then also remove the idleness logic from the source context - it would be duplicate there.
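
The first bullet of this proposal could look roughly like the following Java sketch. It is an illustration under stated assumptions, not an existing Flink API: `WatermarkStatus`, `WatermarkResult`, and `IdlenessAwarePeriodicGenerator` are hypothetical names, timestamps are assumed to be ascending (so the watermark is simply the highest timestamp seen), and the clock is injected as a `LongSupplier` so that `System::nanoTime` can be replaced in tests.

```java
import java.util.function.LongSupplier;

// Hypothetical tri-state outcome of a periodic watermark check.
enum WatermarkStatus { NONE, IDLE, WATERMARK }

final class WatermarkResult {
    final WatermarkStatus status;
    final long watermark; // only meaningful when status == WATERMARK

    private WatermarkResult(WatermarkStatus status, long watermark) {
        this.status = status;
        this.watermark = watermark;
    }

    static WatermarkResult none() { return new WatermarkResult(WatermarkStatus.NONE, Long.MIN_VALUE); }
    static WatermarkResult idle() { return new WatermarkResult(WatermarkStatus.IDLE, Long.MIN_VALUE); }
    static WatermarkResult of(long watermark) { return new WatermarkResult(WatermarkStatus.WATERMARK, watermark); }
}

// Hypothetical periodic generator that detects idleness with a record counter and a clock.
class IdlenessAwarePeriodicGenerator {
    private final long idleTimeoutNanos;
    private final LongSupplier nanoClock; // System::nanoTime in production
    private long recordCount;
    private long lastSeenCount;
    private long lastProgressNanos;
    private long maxTimestamp = Long.MIN_VALUE;
    private long lastEmitted = Long.MIN_VALUE;

    IdlenessAwarePeriodicGenerator(long idleTimeoutNanos, LongSupplier nanoClock) {
        this.idleTimeoutNanos = idleTimeoutNanos;
        this.nanoClock = nanoClock;
        this.lastProgressNanos = nanoClock.getAsLong();
    }

    // Per-record path: only a counter increment and a max, no clock access.
    void onRecord(long eventTimestamp) {
        recordCount++;
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Periodic path: compare the counter with the value remembered at the previous check.
    WatermarkResult onPeriodicCheck() {
        long now = nanoClock.getAsLong();
        if (recordCount != lastSeenCount) {
            // Records arrived since the last check: remember the counter and the time.
            lastSeenCount = recordCount;
            lastProgressNanos = now;
        } else if (now - lastProgressNanos > idleTimeoutNanos) {
            return WatermarkResult.idle();
        }
        if (maxTimestamp > lastEmitted) {
            lastEmitted = maxTimestamp;
            return WatermarkResult.of(maxTimestamp);
        }
        return WatermarkResult.none();
    }
}
```

Keeping the clock read out of `onRecord` leaves the per-record path as cheap as a counter increment. Comparing elapsed time as `now - lastProgressNanos` (rather than comparing raw `System.nanoTime()` values) is the comparison style the `System.nanoTime()` documentation recommends, since the counter may overflow.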


---

[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...

Posted by juhoautio <gi...@git.apache.org>.
Github user juhoautio commented on the issue:

    https://github.com/apache/flink/pull/5634
  
    @tzulitai did you ever test your code? I tried it and it allowed watermarks to proceed but apparently too aggressively, as it caused a lot of data loss.
    
    I'm looking for a quick fix for this issue, as it seems that FLINK-5479 won't be fixed any time soon. So I would very much like to hear whether you have been able to fix this in some lighter way.
    
    My understanding of your PR is that it doesn't work reliably because it just seems to add an internal timeout that could be exceeded whenever the consumer is, for example, busy consuming other partitions. Please comment if this perception is wrong.
    
    I'm thinking that it should instead get the information that a partition was idle from the Kafka client, and only in that case (an empty result from the client) create a newer watermark for that partition. It shouldn't put the partition into some idle state, and it shouldn't create newer watermarks periodically without any connection to another empty result from the client. New watermarks should only be generated as a callback of the Kafka client result?
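
One possible reading of this proposal, sketched in Java: no timer is involved, and a partition stops holding back the combined watermark only for a poll result in which it returned no records. `PollDrivenWatermarks` and the per-poll map of maximum event timestamps are hypothetical. The sketch also assumes that an empty per-partition result means the partition is caught up, which the real Kafka client does not guarantee (for example, `max.poll.records` caps how many records a single poll returns, so a partition with pending data may be absent from a result).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical sketch: watermarks driven only by poll results, with no idle timer.
class PollDrivenWatermarks {
    // Highest event timestamp seen so far, per assigned partition.
    private final Map<Integer, Long> partitionWatermarks = new HashMap<>();
    private long lastEmitted = Long.MIN_VALUE;

    void assign(int partition) {
        partitionWatermarks.put(partition, Long.MIN_VALUE);
    }

    // Callback for one poll result. The map holds the max event timestamp for each
    // partition that returned records in this poll; assigned partitions absent from it
    // are treated as having returned an empty result. Returns a new combined watermark
    // only if this result lets it advance.
    OptionalLong onPollResult(Map<Integer, Long> maxTimestampPerPartition) {
        long candidate = Long.MAX_VALUE;
        for (Map.Entry<Integer, Long> entry : partitionWatermarks.entrySet()) {
            Long maxTs = maxTimestampPerPartition.get(entry.getKey());
            if (maxTs == null) {
                continue; // empty result for this partition: it does not hold back this watermark
            }
            long updated = Math.max(entry.getValue(), maxTs);
            entry.setValue(updated);
            candidate = Math.min(candidate, updated);
        }
        if (candidate == Long.MAX_VALUE || candidate <= lastEmitted) {
            return OptionalLong.empty(); // no partition returned records, or no progress
        }
        lastEmitted = candidate;
        return OptionalLong.of(candidate);
    }
}
```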


---

[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on the issue:

    https://github.com/apache/flink/pull/5634
  
    I think the ideal would be that idleness would occur only for tail reads, i.e. due to a timeout from `kafkaConsumer.poll(pollTimeout)`. In other words, an intermittent connection issue would ideally not trigger idleness.
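
This distinction could be expressed as a small state machine over the outcome of each fetch-loop iteration, sketched in Java below. It assumes, hypothetically, that the fetch loop can tell a poll that waited out its full `pollTimeout` and returned nothing from one that failed; how to obtain that signal from the Kafka client is left open here. `PollOutcome` and `TailReadIdleness` are illustrative names, and requiring several consecutive empty polls (rather than a single one) is an added design choice.

```java
// Hypothetical classification of one fetch-loop iteration.
enum PollOutcome { RECORDS, EMPTY_AFTER_TIMEOUT, FAILED }

// Marks the source idle only after consecutive tail reads, i.e. polls that waited for the
// full poll timeout and returned nothing. Failures never count toward idleness.
class TailReadIdleness {
    private final int emptyPollsBeforeIdle;
    private int consecutiveEmptyPolls;
    private boolean idle;

    TailReadIdleness(int emptyPollsBeforeIdle) {
        if (emptyPollsBeforeIdle < 1) {
            throw new IllegalArgumentException("emptyPollsBeforeIdle must be >= 1");
        }
        this.emptyPollsBeforeIdle = emptyPollsBeforeIdle;
    }

    // Returns true if the source should be considered idle after this outcome.
    boolean onPoll(PollOutcome outcome) {
        switch (outcome) {
            case RECORDS:
                consecutiveEmptyPolls = 0;
                idle = false;
                break;
            case EMPTY_AFTER_TIMEOUT:
                consecutiveEmptyPolls++;
                if (consecutiveEmptyPolls >= emptyPollsBeforeIdle) {
                    idle = true;
                }
                break;
            case FAILED:
                // Unknown whether we are at the tail: stay conservative and hold back watermarks.
                consecutiveEmptyPolls = 0;
                idle = false;
                break;
            default:
                throw new IllegalStateException("Unknown outcome: " + outcome);
        }
        return idle;
    }
}
```

Treating a failure as "not idle" is the conservative choice: during a connection problem the consumer cannot know whether unread data is waiting, so it keeps holding back the watermark.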


---

[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on the issue:

    https://github.com/apache/flink/pull/5634
  
    This is a good proposal, it should also survive a general connector refactor that will be necessary to address other code duplication. The Kinesis ticket is https://issues.apache.org/jira/browse/FLINK-5697 and I will add a reference back to this thread. I would be happy to add the watermark support based on the revised generator. Perhaps it would be good to recognize the special "idle" watermark in SourceContext also?


---

[GitHub] flink pull request #5634: [FLINK-5479] [kafka] Idleness detection for period...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai closed the pull request at:

    https://github.com/apache/flink/pull/5634


---

[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5634
  
    @juhoautio
    
    Your concerns about this fix are quite correct, which is why this PR was closed in the first place: it introduces a lot of ill-defined semantics.
    
    Regarding your thought on relating idleness to empty results returned from Kafka: I think that is a good approach, and it should also capture Eron's comment quite well.


---

[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5634
  
    I'm closing this PR now, as it seems the overall agreement is that we want to implement this differently, or at least not touch the Kafka connector code any more now.


---

[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5634
  
    I would suggest approaching this in a different way:
    
      1. Idleness detection is something that watermark generation benefits from in general, not just in Kafka.
      2. Unless there is a very strong reason, I would not want to add anything more to the Kafka Connector. This connector implementation is already very big. We have seen multiple issues in the past where the Kafka Connector's complexity was the cause of problems.


---

[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on the issue:

    https://github.com/apache/flink/pull/5634
  
    There was a related discussion on the mailing list; this and several other features could be provided by a common connector framework. Such an initiative is a much larger effort, though, and it is not clear to me that users can wait. The Kinesis consumer has virtually identical requirements, and we have already written custom code for it.


---

[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5634
  
    Just saw a good comment from @EronWright 
    
    > I think the ideal would be that idleness would occur only for tail reads, i.e. due to a timeout from `kafkaConsumer.poll(pollTimeout)`. In other words, an intermittent connection issue would ideally not trigger idleness.
    
    Let's see if we can get that into the design somehow, without putting overly specific logic inside the Kafka Consumer (making the Kafka Consumer more complex is my personal pet peeve).


---