Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/05/27 03:35:33 UTC

[GitHub] [flink] PatrickRen opened a new pull request, #19828: [FLINK-27762][connector/kafka] KafkaPartitionSplitReader#wakeup should only unblock KafkaConsumer#poll invocation

PatrickRen opened a new pull request, #19828:
URL: https://github.com/apache/flink/pull/19828

   ## What is the purpose of the change
   
   This pull request changes `KafkaPartitionSplitReader` so that `wakeup()` only wakes up a blocking `KafkaConsumer#poll` call. Otherwise the wake-up may be deferred to the next `ConsumerNetworkClient#poll` call made while handling new splits, which then throws a `WakeupException`.
   
   ## Brief change log
   
   - Add a volatile variable `isPolling` to make sure the wakeup method only unblocks KafkaConsumer#poll
   - Add new unit test for `KafkaPartitionSplitReader`
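   
   The `isPolling` guard from the change log could look roughly like the sketch below. `StubConsumer` and `GuardedSplitReader` are hypothetical stand-ins written for illustration, not the classes touched by this PR. Note that the check and the `wakeup()` call are not atomic, so `poll()` can still return between them.
   
   ```java
   import java.util.concurrent.atomic.AtomicInteger;
   
   /** Hypothetical stand-in for KafkaConsumer; it only counts wakeup() calls. */
   class StubConsumer {
       final AtomicInteger wakeups = new AtomicInteger();
       // Hook that lets a caller simulate another thread acting while poll() is blocked.
       Runnable duringPoll = () -> {};
   
       void wakeup() {
           wakeups.incrementAndGet();
       }
   
       void poll() {
           // A real consumer would block here until records arrive or wakeup() is called.
           duringPoll.run();
       }
   }
   
   /** Sketch of a split reader that forwards wakeUp() only while poll() is in progress. */
   class GuardedSplitReader {
       private final StubConsumer consumer;
       // volatile so that the thread calling wakeUp() sees the fetcher thread's latest write.
       private volatile boolean isPolling = false;
   
       GuardedSplitReader(StubConsumer consumer) {
           this.consumer = consumer;
       }
   
       void fetch() {
           isPolling = true;
           try {
               consumer.poll();
           } finally {
               isPolling = false;
           }
       }
   
       void wakeUp() {
           // Check-then-act: not atomic with respect to fetch().
           if (isPolling) {
               consumer.wakeup();
           }
       }
   }
   ```
   
   With this guard, a `wakeUp()` issued outside `fetch()` is dropped instead of being held by the consumer.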
   
   
   ## Verifying this change
   
   This change is covered by the new `KafkaPartitionSplitReaderTest#testWakeupThenAssign` case.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)
     - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] leonardBang commented on a diff in pull request #19828: [FLINK-27762][connector/kafka] Catch WakeupException and retry KafkaConsumer invocations in split assignment

Posted by GitBox <gi...@apache.org>.
leonardBang commented on code in PR #19828:
URL: https://github.com/apache/flink/pull/19828#discussion_r897508261


##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java:
##########
@@ -398,6 +400,32 @@ private void maybeRegisterKafkaConsumerMetrics(
         }
     }
 
+    /**
+     * Catch {@link WakeupException} in Kafka consumer call and retry the invocation on exception.
+     *
+     * <p>This helper function handles a race condition as below:
+     *
+     * <ol>
+     *   <li>Fetcher thread finishes a {@link KafkaConsumer#poll(Duration)} call
+     *   <li>Task thread assigns new splits so invokes {@link #wakeUp()}, then the wakeup is
+     *       recorded and held by the consumer
+     *   <li>Later fetcher thread invokes {@link #handleSplitsChanges(SplitsChange)}, and
+     *       interactions with consumer will throw {@link WakeupException} because of the previously
+     *       held wakeup in the consumer
+     * </ol>
+     *
+     * <p>Under this case we need to catch the {@link WakeupException} and retry the operation.
+     */
+    private <V> V retryOnWakeup(Supplier<V> consumerCall) {
+        while (true) {
+            try {
+                return consumerCall.get();
+            } catch (WakeupException we) {
+                // Do nothing here and the loop will retry the consumer call.

Review Comment:
   Please add a log statement here; it will help with troubleshooting if we cannot leave the loop.





[GitHub] [flink] leonardBang merged pull request #19828: [FLINK-27762][connector/kafka] Catch WakeupException and retry KafkaConsumer invocations in split assignment

Posted by GitBox <gi...@apache.org>.
leonardBang merged PR #19828:
URL: https://github.com/apache/flink/pull/19828




[GitHub] [flink] zou-can commented on pull request #19828: [FLINK-27762][connector/kafka] Catch WakeupException and retry KafkaConsumer invocations in split assignment

Posted by GitBox <gi...@apache.org>.
zou-can commented on PR #19828:
URL: https://github.com/apache/flink/pull/19828#issuecomment-1156236831

   @PatrickRen It looks good. Thanks for your efforts!




[GitHub] [flink] leonardBang commented on a diff in pull request #19828: [FLINK-27762][connector/kafka] Catch WakeupException and retry KafkaConsumer invocations in split assignment

Posted by GitBox <gi...@apache.org>.
leonardBang commented on code in PR #19828:
URL: https://github.com/apache/flink/pull/19828#discussion_r897589085


##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java:
##########
@@ -398,6 +407,34 @@ private void maybeRegisterKafkaConsumerMetrics(
         }
     }
 
+    /**
+     * Catch {@link WakeupException} in Kafka consumer call and retry the invocation on exception.
+     *
+     * <p>This helper function handles a race condition as below:
+     *
+     * <ol>
+     *   <li>Fetcher thread finishes a {@link KafkaConsumer#poll(Duration)} call
+     *   <li>Task thread assigns new splits so invokes {@link #wakeUp()}, then the wakeup is
+     *       recorded and held by the consumer
+     *   <li>Later fetcher thread invokes {@link #handleSplitsChanges(SplitsChange)}, and
+     *       interactions with consumer will throw {@link WakeupException} because of the previously
+     *       held wakeup in the consumer
+     * </ol>
+     *
+     * <p>Under this case we need to catch the {@link WakeupException} and retry the operation.
+     */
+    private <V> V retryOnWakeup(Supplier<V> consumerCall, String description) {
+        while (true) {

Review Comment:
   I think we'd better remove the loop: all calls on the consumer are thread-safe, which means we can hit the `WakeupException` at most once, so we can simply retry in the catch block.
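   
   A minimal sketch of the single-retry variant suggested above, assuming that a held wakeup is cleared once it has been thrown. `SketchWakeupException` and `SingleRetry` are hypothetical names used so that the example compiles without Kafka on the classpath; in the connector the exception would be Kafka's `WakeupException`.
   
   ```java
   import java.util.function.Supplier;
   
   /** Hypothetical stand-in for Kafka's WakeupException. */
   class SketchWakeupException extends RuntimeException {}
   
   class SingleRetry {
       /** Runs the call; if a held wakeup interrupts it, runs it exactly once more. */
       static <V> V retryOnceOnWakeup(Supplier<V> consumerCall) {
           try {
               return consumerCall.get();
           } catch (SketchWakeupException we) {
               // Assumption: the held wakeup was consumed by the failed call.
               // A second exception is not caught and propagates to the caller.
               return consumerCall.get();
           }
       }
   }
   ```
   
   Compared with `while (true)`, this cannot spin forever, but a second wakeup that arrives between the two attempts would surface as an exception.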



##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java:
##########
@@ -398,6 +407,34 @@ private void maybeRegisterKafkaConsumerMetrics(
         }
     }
 
+    /**
+     * Catch {@link WakeupException} in Kafka consumer call and retry the invocation on exception.
+     *
+     * <p>This helper function handles a race condition as below:
+     *
+     * <ol>
+     *   <li>Fetcher thread finishes a {@link KafkaConsumer#poll(Duration)} call
+     *   <li>Task thread assigns new splits so invokes {@link #wakeUp()}, then the wakeup is
+     *       recorded and held by the consumer
+     *   <li>Later fetcher thread invokes {@link #handleSplitsChanges(SplitsChange)}, and
+     *       interactions with consumer will throw {@link WakeupException} because of the previously
+     *       held wakeup in the consumer
+     * </ol>
+     *
+     * <p>Under this case we need to catch the {@link WakeupException} and retry the operation.
+     */
+    private <V> V retryOnWakeup(Supplier<V> consumerCall, String description) {
+        while (true) {
+            try {
+                return consumerCall.get();
+            } catch (WakeupException we) {
+                LOG.debug(

Review Comment:
   A debug-level log is of little use for troubleshooting, because we would still need to change the logger config of the running job when the exception happens. Could we raise it to INFO level?





[GitHub] [flink] zou-can commented on pull request #19828: [FLINK-27762][connector/kafka] KafkaPartitionSplitReader#wakeup should only unblock KafkaConsumer#poll invocation

Posted by GitBox <gi...@apache.org>.
zou-can commented on PR #19828:
URL: https://github.com/apache/flink/pull/19828#issuecomment-1140717266

   As I commented on https://issues.apache.org/jira/browse/FLINK-27762:
   
   The exception we hit is thrown in **KafkaPartitionSplitReader#removeEmptySplits**, but I can't find anything in that method that handles it.
   ```
   // KafkaPartitionSplitReader#removeEmptySplits
   
   private void removeEmptySplits() {
       List<TopicPartition> emptyPartitions = new ArrayList<>();
      
       for (TopicPartition tp : consumer.assignment()) {
           // WakeupException is thrown here.
           // Since KafkaConsumer#wakeup was called earlier, the KafkaConsumer#position call in the 'if' statement below throws WakeupException.
           if (consumer.position(tp) >= getStoppingOffset(tp)) {
               emptyPartitions.add(tp);
           }
       }
   
      // ignore irrelevant code...     
   } 
   ```
   What I'm focusing on is **how to handle WakeupException** after it is thrown.
   
   This is what is done in **KafkaPartitionSplitReader#fetch**:
   ```
   // KafkaPartitionSplitReader#fetch
   
   public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch() throws IOException {
       ConsumerRecords<byte[], byte[]> consumerRecords;
       try {
           consumerRecords = consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
       } catch (WakeupException we) {
           // catch exception and return empty result.
           return new KafkaPartitionSplitRecords(
                   ConsumerRecords.empty(), kafkaSourceReaderMetrics);
       }
   
      // ignore irrelevant code...     
   }
   ```
   Maybe we should catch this exception in **KafkaPartitionSplitReader#removeEmptySplits** and retry **KafkaConsumer#position**.
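   
   To make the suggestion concrete, here is a rough, self-contained simulation. `HeldWakeupConsumer`, `HeldWakeupException`, and `EmptySplitFinder` are hypothetical stand-ins, partitions are plain `int` indexes rather than `TopicPartition`, and the stub only assumes that a wakeup is held until the next call and then cleared.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.atomic.AtomicBoolean;
   
   /** Hypothetical stand-in for Kafka's WakeupException. */
   class HeldWakeupException extends RuntimeException {}
   
   /** Hypothetical stub that holds a wakeup until the next position() call. */
   class HeldWakeupConsumer {
       private final AtomicBoolean pendingWakeup = new AtomicBoolean(false);
       private final long[] positions;
   
       HeldWakeupConsumer(long... positions) {
           this.positions = positions;
       }
   
       void wakeup() {
           pendingWakeup.set(true);
       }
   
       long position(int partition) {
           // Clear the flag before throwing, so that the next call goes through.
           if (pendingWakeup.getAndSet(false)) {
               throw new HeldWakeupException();
           }
           return positions[partition];
       }
   }
   
   class EmptySplitFinder {
       /** Returns the partitions whose position has reached the stopping offset. */
       static List<Integer> findEmptyPartitions(HeldWakeupConsumer consumer, long[] stoppingOffsets) {
           List<Integer> emptyPartitions = new ArrayList<>();
           for (int tp = 0; tp < stoppingOffsets.length; tp++) {
               long position;
               try {
                   position = consumer.position(tp);
               } catch (HeldWakeupException we) {
                   // The held wakeup was consumed above, so ask again.
                   position = consumer.position(tp);
               }
               if (position >= stoppingOffsets[tp]) {
                   emptyPartitions.add(tp);
               }
           }
           return emptyPartitions;
       }
   }
   ```
   
   Calling `wakeup()` on the stub before `findEmptyPartitions` reproduces the failing sequence: the first `position` call throws and the retry returns the offset.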
   
   




[GitHub] [flink] PatrickRen commented on pull request #19828: [FLINK-27762][connector/kafka] Catch WakeupException and retry KafkaConsumer invocations in split assignment

Posted by GitBox <gi...@apache.org>.
PatrickRen commented on PR #19828:
URL: https://github.com/apache/flink/pull/19828#issuecomment-1154652936

   Thanks for the review @zou-can! Initially I tried to fix this at the root, so that `KafkaPartitionSplitReader#wakeup` only wakes up a blocking `KafkaConsumer#poll` invocation, but I realized that this is not possible because the Kafka consumer doesn't expose such an API. I have updated my code to add a wrapper around Kafka consumer calls that catches `WakeupException` and retries the call.




[GitHub] [flink] leonardBang commented on pull request #19828: [FLINK-27762][connector/kafka] Catch WakeupException and retry KafkaConsumer invocations in split assignment

Posted by GitBox <gi...@apache.org>.
leonardBang commented on PR #19828:
URL: https://github.com/apache/flink/pull/19828#issuecomment-1156418089

   @PatrickRen Could you also open PRs for release-1.15 and release-1.14?




[GitHub] [flink] PatrickRen commented on pull request #19828: [FLINK-27762][connector/kafka] Catch WakeupException and retry KafkaConsumer invocations in split assignment

Posted by GitBox <gi...@apache.org>.
PatrickRen commented on PR #19828:
URL: https://github.com/apache/flink/pull/19828#issuecomment-1154653485

   @becketqin @leonardBang Please take a look when you are available. Thanks!




[GitHub] [flink] flinkbot commented on pull request #19828: [FLINK-27762][connector/kafka] KafkaPartitionSplitReader#wakeup should only unblock KafkaConsumer#poll invocation

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #19828:
URL: https://github.com/apache/flink/pull/19828#issuecomment-1139247931

   ## CI report:
   
   * fb07199d6a64b60d986c364edb348ceed9178d33 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>

