Posted to issues@flink.apache.org by EAlexRojas <gi...@git.apache.org> on 2018/05/11 13:36:37 UTC

[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

GitHub user EAlexRojas opened a pull request:

    https://github.com/apache/flink/pull/5991

    [FLINK-9303] [kafka] Add support for dynamically unassigning partitions from the Kafka consumer when they become unavailable

    ## What is the purpose of the change
    
    This pull request adds an option on the Kafka consumer to check for unavailable partitions and unassign them from the consumer. That way, the consumer does not request records from invalid partitions, which prevents log noise.
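    
    A minimal usage sketch (assumptions: the exact configuration key string and consumer version are not shown in this PR, so the `flink.check-unavailable-topics` key and `FlinkKafkaConsumer010` below are placeholders; `flink.partition-discovery.interval-millis` is the existing partition discovery key):
    
        import java.util.Properties;
        import java.util.regex.Pattern;
        
        import org.apache.flink.api.common.serialization.SimpleStringSchema;
        import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
        
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "demo");
        // existing option: enable periodic partition discovery
        props.setProperty("flink.partition-discovery.interval-millis", "10000");
        // placeholder key for the option introduced in this PR
        props.setProperty("flink.check-unavailable-topics", "true");
        
        // subscribe with a topic pattern, so topics can appear and disappear
        FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(
                Pattern.compile("my-topics-.*"),
                new SimpleStringSchema(),
                props);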
    
    ## Brief change log
    
    - Modify the partition discovery system to check not only for new partitions, but also for partitions that are no longer available (a diffing sketch follows this list).
    - Check whether partitions recovered from state are still available.
    - Add an option on the Kafka consumer to activate these checks.
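    
    A hypothetical sketch of the discovery-side diffing (not the PR's actual code; `fetchedPartitions` and `currentlyAssigned` are assumed inputs):
    
        // partitions currently visible in Kafka vs. partitions currently assigned
        Set<TopicPartition> fetched = new HashSet<>(fetchedPartitions);
        
        // newly appeared partitions: visible but not yet assigned
        List<TopicPartition> newPartitions = fetched.stream()
                .filter(tp -> !currentlyAssigned.contains(tp))
                .collect(Collectors.toList());
        
        // unavailable partitions: assigned but no longer visible
        Set<TopicPartition> partitionsToBeRemoved = currentlyAssigned.stream()
                .filter(tp -> !fetched.contains(tp))
                .collect(Collectors.toSet());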
    
    ## Verifying this change
    
    This change was manually verified as follows:
    - Create a job with a Kafka consumer listening to a topic pattern, with partition discovery activated and the property introduced in this PR set to true.
    - Configure Kafka with the following properties: 
       delete.topic.enable=true
       auto.create.topics.enable=false
    - Create some topics matching the pattern.
    - Run the job.
    - While running, remove some of the topics.
    - Verify the partitions are unassigned and the job continues running without log noise.
    
    *I guess this could be covered by e2e tests, but I'm not familiar with the system in place.*
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes)
      - If yes, how is the feature documented? (JavaDocs)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/EAlexRojas/flink kafka-unassign-partitions-fix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5991.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5991
    
----
commit a17d0dcdeaac5b2508f4748d08fd4cb879fa5033
Author: EAlexRojas <al...@...>
Date:   2018-04-18T14:35:57Z

    [FLINK-9303] [kafka] Add support for dynamically unassigning partitions from the Kafka consumer when they become unavailable
    - Check for unavailable partitions recovered from state
    - Use a Kafka consumer option to activate these validations

----


---

[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5991#discussion_r190150419
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -235,7 +243,8 @@ public FlinkKafkaConsumerBase(
     			Pattern topicPattern,
     			KeyedDeserializationSchema<T> deserializer,
     			long discoveryIntervalMillis,
    -			boolean useMetrics) {
    +			boolean useMetrics,
    +			boolean checkUnavailablePartitions) {
    --- End diff --
    
    Why do we want this to be configurable? Is there any case where we would prefer to leave them untouched?


---

[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

Posted by EAlexRojas <gi...@git.apache.org>.
Github user EAlexRojas commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5991#discussion_r190519624
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -235,7 +243,8 @@ public FlinkKafkaConsumerBase(
     			Pattern topicPattern,
     			KeyedDeserializationSchema<T> deserializer,
     			long discoveryIntervalMillis,
    -			boolean useMetrics) {
    +			boolean useMetrics,
    +			boolean checkUnavailablePartitions) {
    --- End diff --
    
    I did it that way only because this is something new, so I thought you might want it to be configurable. But you are right, I cannot think of a case where we would prefer to keep the unavailable partitions.
    I'll update the PR to make it the default behaviour, if that's OK with you.


---

[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

Posted by tedyu <gi...@git.apache.org>.
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5991#discussion_r187647828
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
    @@ -80,6 +82,9 @@
     	/** The queue of unassigned partitions that we need to assign to the Kafka consumer. */
     	private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue;
     
    +	/** The list of partitions to be removed from kafka consumer. */
    +	private final List<TopicPartition> partitionsToBeRemoved;
    --- End diff --
    
    Should this be a Set to facilitate fast lookup?


---

[GitHub] flink issue #5991: [FLINK-9303] [kafka] Adding support for unassign dynamica...

Posted by EAlexRojas <gi...@git.apache.org>.
Github user EAlexRojas commented on the issue:

    https://github.com/apache/flink/pull/5991
  
    PR updated, taking the comments into account.


---

[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

Posted by EAlexRojas <gi...@git.apache.org>.
Github user EAlexRojas commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5991#discussion_r188564095
  
    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java ---
    @@ -221,7 +221,8 @@ private FlinkKafkaConsumer08(
     				getLong(
     					checkNotNull(props, "props"),
     					KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
    -				!getBoolean(props, KEY_DISABLE_METRICS, false));
    +				!getBoolean(props, KEY_DISABLE_METRICS, false),
    +				getBoolean(props, KEY_CHECK_UNAVAILABLE_TOPICS, false));
    --- End diff --
    
    You're right, I'll change it


---

[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

Posted by EAlexRojas <gi...@git.apache.org>.
Github user EAlexRojas commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5991#discussion_r190529981
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
    @@ -374,8 +385,8 @@ void setOffsetsToCommit(
     	 * <p>This method is exposed for testing purposes.
     	 */
     	@VisibleForTesting
    -	void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions) throws Exception {
    -		if (newPartitions.size() == 0) {
    +	void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions, Set<TopicPartition> partitionsToBeRemoved) throws Exception {
    --- End diff --
    
    I thought about it, but my only concern is the case where we'd have both partitions to add and partitions to remove...
    `consumerCallBridge.assignPartitions()` takes the whole new list of partitions, so in that case we would need to wait for the first assignment (e.g. adding new partitions) before doing the second assignment (e.g. removing partitions) in order to have a consistent list of partitions.
    I think we should try to have only one call to `consumerCallBridge.assignPartitions()`.
    
    Maybe I could refactor the part where partitions are removed from the old partitions into a separate private method like `removeFromOldPartitions()`?
    
    What do you think?
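    
    A hypothetical shape for that helper inside `KafkaConsumerThread` (illustrative only, not the PR's actual code):
    
        /** Drops entries for unavailable partitions, so that the single
         *  consumerCallBridge.assignPartitions(...) call sees a consistent list. */
        private static void removeFromOldPartitions(
                List<KafkaTopicPartitionState<TopicPartition>> oldPartitions,
                Set<TopicPartition> partitionsToBeRemoved) {
            oldPartitions.removeIf(p -> partitionsToBeRemoved.contains(p.getKafkaPartitionHandle()));
        }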


---

[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

Posted by EAlexRojas <gi...@git.apache.org>.
Github user EAlexRojas commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5991#discussion_r190518142
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
    @@ -80,6 +83,9 @@
     	/** The queue of unassigned partitions that we need to assign to the Kafka consumer. */
     	private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue;
     
    +	/** The list of partitions to be removed from kafka consumer. */
    +	private final Set<TopicPartition> partitionsToBeRemoved;
    --- End diff --
    
    From my understanding, for unassigned partitions we can use a queue because it does not matter which consumer picks up the new partitions.
    But we cannot use a queue for partitions to be removed, because we can only remove a partition from the consumer that is actually subscribed to it.
    Does that make sense?
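    
    A sketch of that ownership constraint (assumed names, illustrative only): each consumer thread filters its own assignment against the shared removal set, whereas a queue would hand a removal to an arbitrary thread.
    
        // partitions THIS thread must drop: the intersection of its own
        // assignment with the shared removal set
        Set<TopicPartition> myRemovals = new HashSet<>();
        for (TopicPartition tp : myAssignedPartitions) {
            if (partitionsToBeRemoved.contains(tp)) {
                myRemovals.add(tp);
            }
        }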


---

[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

Posted by EAlexRojas <gi...@git.apache.org>.
Github user EAlexRojas commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5991#discussion_r188596584
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
    @@ -80,6 +82,9 @@
     	/** The queue of unassigned partitions that we need to assign to the Kafka consumer. */
     	private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue;
     
    +	/** The list of partitions to be removed from kafka consumer. */
    +	private final List<TopicPartition> partitionsToBeRemoved;
    --- End diff --
    
    You are right, a Set would be better for all the calls to the `contains()` method.
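    
    For illustration only (not from the PR), the lookup cost is what motivates the change:
    
        List<TopicPartition> asList = new ArrayList<>(partitions); // contains() scans: O(n)
        Set<TopicPartition> asSet = new HashSet<>(partitions);     // contains() hashes: O(1) on average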


---

[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5991#discussion_r190153528
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
    @@ -80,6 +83,9 @@
     	/** The queue of unassigned partitions that we need to assign to the Kafka consumer. */
     	private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue;
     
    +	/** The list of partitions to be removed from kafka consumer. */
    +	private final Set<TopicPartition> partitionsToBeRemoved;
    --- End diff --
    
    Would it actually make more sense to have a queue for this, like how we are handling unassigned new partitions via the `unassignedPartitionsQueue`? The fact that this is a set means that we will eventually need to remove entries from it anyway.


---

[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5991#discussion_r190152570
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
    @@ -374,8 +385,8 @@ void setOffsetsToCommit(
     	 * <p>This method is exposed for testing purposes.
     	 */
     	@VisibleForTesting
    -	void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions) throws Exception {
    -		if (newPartitions.size() == 0) {
    +	void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions, Set<TopicPartition> partitionsToBeRemoved) throws Exception {
    --- End diff --
    
    I have the feeling that this method is way too complex now, to the point that it might make more sense to break it up into 2 different methods - `addPartitionsToAssignment` and `removePartitionsFromAssignment`.
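    
    A hypothetical skeleton of that split inside `KafkaConsumerThread` (method names from the comment above; bodies are illustrative only):
    
        @VisibleForTesting
        void addPartitionsToAssignment(List<KafkaTopicPartitionState<TopicPartition>> newPartitions) throws Exception {
            if (newPartitions.isEmpty()) {
                return;
            }
            // merge newPartitions into the current assignment, seek to their
            // restored offsets, and call consumerCallBridge.assignPartitions(...)
        }
        
        @VisibleForTesting
        void removePartitionsFromAssignment(Set<TopicPartition> partitionsToBeRemoved) throws Exception {
            if (partitionsToBeRemoved.isEmpty()) {
                return;
            }
            // drop the removed partitions from the current assignment and
            // call consumerCallBridge.assignPartitions(...) with what remains
        }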


---

[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

Posted by tedyu <gi...@git.apache.org>.
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5991#discussion_r187648458
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
    @@ -374,8 +384,8 @@ void setOffsetsToCommit(
     	 * <p>This method is exposed for testing purposes.
     	 */
     	@VisibleForTesting
    -	void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions) throws Exception {
    -		if (newPartitions.size() == 0) {
    +	void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions, List<TopicPartition> partitionsToBeRemoved) throws Exception {
    +		if (newPartitions.size() == 0 && partitionsToBeRemoved.isEmpty()) {
    --- End diff --
    
    `size() == 0` -> `isEmpty()`
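    
    Applied to the guard above, the suggestion would read as follows (sketch, assuming the method returns early when there is nothing to do):
    
        if (newPartitions.isEmpty() && partitionsToBeRemoved.isEmpty()) {
            return;
        }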


---

[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

Posted by EAlexRojas <gi...@git.apache.org>.
Github user EAlexRojas commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5991#discussion_r190530647
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
    @@ -240,7 +249,9 @@ public void run() {
     						newPartitions = unassignedPartitionsQueue.getBatchBlocking();
     					}
     					if (newPartitions != null) {
    -						reassignPartitions(newPartitions);
    +						reassignPartitions(newPartitions, new HashSet<>());
    --- End diff --
    
    I just realized this should actually be
    `reassignPartitions(newPartitions, partitionsToBeRemoved);`


---

[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

Posted by tedyu <gi...@git.apache.org>.
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5991#discussion_r187647275
  
    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java ---
    @@ -221,7 +221,8 @@ private FlinkKafkaConsumer08(
     				getLong(
     					checkNotNull(props, "props"),
     					KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
    -				!getBoolean(props, KEY_DISABLE_METRICS, false));
    +				!getBoolean(props, KEY_DISABLE_METRICS, false),
    +				getBoolean(props, KEY_CHECK_UNAVAILABLE_TOPICS, false));
    --- End diff --
    
    Should this be named `KEY_CHECK_UNAVAILABLE_PARTITIONS`?


---