Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/13 10:09:47 UTC

[GitHub] [flink] PatrickRen opened a new pull request, #19456: [FLINK-27041][connector/kafka] Catch IllegalStateException in KafkaPartitionSplitReader.fetch() to handle no valid partition case

PatrickRen opened a new pull request, #19456:
URL: https://github.com/apache/flink/pull/19456

   ## What is the purpose of the change
   
   This pull request fixes a bug where `KafkaSource` throws an exception and fails if a `SourceReader` is assigned empty splits.
   
   ## Brief change log
   
   - Catch IllegalStateException in KafkaPartitionSplitReader.fetch() to handle no valid partition case
   - Add case to test consuming empty partitions
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - Add unit and integration test cases for consuming empty Kafka topics and partitions
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)
     - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] PatrickRen commented on pull request #19456: [FLINK-27041][connector/kafka] Catch IllegalStateException in KafkaPartitionSplitReader.fetch() to handle no valid partition case

Posted by GitBox <gi...@apache.org>.
PatrickRen commented on PR #19456:
URL: https://github.com/apache/flink/pull/19456#issuecomment-1154675519

   @flinkbot run azure




[GitHub] [flink] mas-chen commented on a diff in pull request #19456: [FLINK-27041][connector/kafka] Catch IllegalStateException in KafkaPartitionSplitReader.fetch() to handle no valid partition case

Posted by GitBox <gi...@apache.org>.
mas-chen commented on code in PR #19456:
URL: https://github.com/apache/flink/pull/19456#discussion_r851041663


##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java:
##########
@@ -131,12 +138,7 @@ public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch() throws IOExce
             kafkaSourceReaderMetrics.maybeAddRecordsLagMetric(consumer, tp);
         }
 
-        // Some splits are discovered as empty when handling split additions. These splits should be
-        // added to finished splits to clean up states in split fetcher and source reader.
-        if (!emptySplits.isEmpty()) {
-            recordsBySplits.finishedSplits.addAll(emptySplits);
-            emptySplits.clear();
-        }
+        markEmptySplitsAsFinished(recordsBySplits);

Review Comment:
   Good catch!





[GitHub] [flink] mas-chen commented on a diff in pull request #19456: [FLINK-27041][connector/kafka] Catch IllegalStateException in KafkaPartitionSplitReader.fetch() to handle no valid partition case

Posted by GitBox <gi...@apache.org>.
mas-chen commented on code in PR #19456:
URL: https://github.com/apache/flink/pull/19456#discussion_r849792841


##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java:
##########
@@ -98,9 +98,16 @@ public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch() throws IOExce
         ConsumerRecords<byte[], byte[]> consumerRecords;
         try {
             consumerRecords = consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
-        } catch (WakeupException we) {
-            return new KafkaPartitionSplitRecords(
-                    ConsumerRecords.empty(), kafkaSourceReaderMetrics);
+        } catch (WakeupException | IllegalStateException e) {
+            // IllegalStateException will be thrown if the consumer is not assigned any partitions.
+            // This happens if all assigned partitions are invalid or empty (starting offset >=

Review Comment:
   Makes sense. Would it be even better to avoid sending the split from the enumerator if the split is invalid?





[GitHub] [flink] PatrickRen commented on a diff in pull request #19456: [FLINK-27041][connector/kafka] Catch IllegalStateException in KafkaPartitionSplitReader.fetch() to handle no valid partition case

Posted by GitBox <gi...@apache.org>.
PatrickRen commented on code in PR #19456:
URL: https://github.com/apache/flink/pull/19456#discussion_r894248255


##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java:
##########
@@ -98,9 +98,16 @@ public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch() throws IOExce
         ConsumerRecords<byte[], byte[]> consumerRecords;
         try {
             consumerRecords = consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
-        } catch (WakeupException we) {
-            return new KafkaPartitionSplitRecords(
-                    ConsumerRecords.empty(), kafkaSourceReaderMetrics);
+        } catch (WakeupException | IllegalStateException e) {
+            // IllegalStateException will be thrown if the consumer is not assigned any partitions.
+            // This happens if all assigned partitions are invalid or empty (starting offset >=

Review Comment:
   I think it should be "all", which refers to all partitions assigned to **this** consumer. This exception won't be thrown if just some partitions are invalid.
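
To illustrate the point above, here is a minimal, self-contained sketch of the catch-and-return-empty pattern. It does not use the Kafka client: `StubConsumer` and `fetch` are hypothetical stand-ins, and the only behavior borrowed from the thread is that polling a consumer with no assigned partitions raises `IllegalStateException`, while a consumer with at least one assigned partition polls normally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class EmptyAssignmentSketch {

    /** Hypothetical stand-in for a consumer; this is NOT the Kafka client API. */
    static final class StubConsumer {
        private final List<String> assignment = new ArrayList<>();

        void assign(List<String> partitions) {
            assignment.clear();
            assignment.addAll(partitions);
        }

        List<String> poll() {
            // Assumption taken from the thread: polling fails only when
            // *no* partition at all is assigned to this consumer.
            if (assignment.isEmpty()) {
                throw new IllegalStateException("Consumer is not assigned to any partitions");
            }
            List<String> records = new ArrayList<>();
            for (String partition : assignment) {
                records.add("record-from-" + partition);
            }
            return records;
        }
    }

    /** Returns an empty batch instead of failing when nothing is assigned. */
    static List<String> fetch(StubConsumer consumer) {
        try {
            return consumer.poll();
        } catch (IllegalStateException e) {
            return Collections.emptyList();
        }
    }

    public static void main(String[] args) {
        StubConsumer consumer = new StubConsumer();
        System.out.println(fetch(consumer)); // nothing assigned yet
        consumer.assign(Arrays.asList("topic-0", "topic-1"));
        System.out.println(fetch(consumer)); // two partitions assigned
    }
}
```

In the sketch, a consumer that still holds one valid partition never reaches the `catch` branch, which matches the "all, not some" reading of the code comment.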





[GitHub] [flink] flinkbot commented on pull request #19456: [FLINK-27041][connector/kafka] Catch IllegalStateException in KafkaPartitionSplitReader.fetch() to handle no valid partition case

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #19456:
URL: https://github.com/apache/flink/pull/19456#issuecomment-1097870877

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "d6c5ebea58677a317f1bd88070083195f22bb5d8",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "d6c5ebea58677a317f1bd88070083195f22bb5d8",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d6c5ebea58677a317f1bd88070083195f22bb5d8 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] leonardBang merged pull request #19456: [FLINK-27041][connector/kafka] Catch IllegalStateException in KafkaPartitionSplitReader.fetch() to handle no valid partition case

Posted by GitBox <gi...@apache.org>.
leonardBang merged PR #19456:
URL: https://github.com/apache/flink/pull/19456




[GitHub] [flink] PatrickRen commented on a diff in pull request #19456: [FLINK-27041][connector/kafka] Catch IllegalStateException in KafkaPartitionSplitReader.fetch() to handle no valid partition case

Posted by GitBox <gi...@apache.org>.
PatrickRen commented on code in PR #19456:
URL: https://github.com/apache/flink/pull/19456#discussion_r851005151


##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java:
##########
@@ -131,12 +138,7 @@ public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch() throws IOExce
             kafkaSourceReaderMetrics.maybeAddRecordsLagMetric(consumer, tp);
         }
 
-        // Some splits are discovered as empty when handling split additions. These splits should be
-        // added to finished splits to clean up states in split fetcher and source reader.
-        if (!emptySplits.isEmpty()) {
-            recordsBySplits.finishedSplits.addAll(emptySplits);
-            emptySplits.clear();
-        }
+        markEmptySplitsAsFinished(recordsBySplits);

Review Comment:
   The stopping offset of unbounded partitions is set to `Long.MAX_VALUE`, so they won't be treated as empty. 
   https://github.com/apache/flink/blob/release-1.14.4/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L311-L331
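
A rough sketch of why an unbounded partition is never classified as empty, assuming the emptiness check compares the starting offset against the stopping offset with `>=` (as the truncated code comment in the diff suggests). The class, the string partition keys, and the method names are hypothetical and only model the idea; they are not copied from the linked source.

```java
import java.util.HashMap;
import java.util.Map;

final class EmptySplitCheckSketch {

    // Hypothetical bookkeeping: only bounded partitions get an entry here.
    private final Map<String, Long> stoppingOffsets = new HashMap<>();

    void setStoppingOffset(String partition, long stoppingOffset) {
        stoppingOffsets.put(partition, stoppingOffset);
    }

    /** Partitions without an explicit stopping offset fall back to Long.MAX_VALUE. */
    long getStoppingOffset(String partition) {
        return stoppingOffsets.getOrDefault(partition, Long.MAX_VALUE);
    }

    /** A split is considered empty when there is nothing left to read before the stopping offset. */
    boolean isEmpty(String partition, long startingOffset) {
        return startingOffset >= getStoppingOffset(partition);
    }

    public static void main(String[] args) {
        EmptySplitCheckSketch check = new EmptySplitCheckSketch();
        check.setStoppingOffset("bounded-0", 0L);
        System.out.println("bounded-0 empty: " + check.isEmpty("bounded-0", 0L));
        System.out.println("unbounded-0 empty: " + check.isEmpty("unbounded-0", 0L));
    }
}
```

Under this assumption, a streaming (unbounded) partition that currently holds no records still compares below `Long.MAX_VALUE`, so it is never marked as finished.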





[GitHub] [flink] mas-chen commented on a diff in pull request #19456: [FLINK-27041][connector/kafka] Catch IllegalStateException in KafkaPartitionSplitReader.fetch() to handle no valid partition case

Posted by GitBox <gi...@apache.org>.
mas-chen commented on code in PR #19456:
URL: https://github.com/apache/flink/pull/19456#discussion_r850797500


##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java:
##########
@@ -131,12 +138,7 @@ public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch() throws IOExce
             kafkaSourceReaderMetrics.maybeAddRecordsLagMetric(consumer, tp);
         }
 
-        // Some splits are discovered as empty when handling split additions. These splits should be
-        // added to finished splits to clean up states in split fetcher and source reader.
-        if (!emptySplits.isEmpty()) {
-            recordsBySplits.finishedSplits.addAll(emptySplits);
-            emptySplits.clear();
-        }
+        markEmptySplitsAsFinished(recordsBySplits);

Review Comment:
   We should only finish empty splits in bounded mode, right? Empty partitions may eventually contain data in streaming mode.



##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java:
##########
@@ -98,9 +98,16 @@ public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch() throws IOExce
         ConsumerRecords<byte[], byte[]> consumerRecords;
         try {
             consumerRecords = consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
-        } catch (WakeupException we) {
-            return new KafkaPartitionSplitRecords(
-                    ConsumerRecords.empty(), kafkaSourceReaderMetrics);
+        } catch (WakeupException | IllegalStateException e) {
+            // IllegalStateException will be thrown if the consumer is not assigned any partitions.
+            // This happens if all assigned partitions are invalid or empty (starting offset >=

Review Comment:
   The design makes sense, but I was thinking in terms of the specified offsets initializer. Looks like it is handled properly by the enumerator in that case though.





[GitHub] [flink] leonardBang commented on a diff in pull request #19456: [FLINK-27041][connector/kafka] Catch IllegalStateException in KafkaPartitionSplitReader.fetch() to handle no valid partition case

Posted by GitBox <gi...@apache.org>.
leonardBang commented on code in PR #19456:
URL: https://github.com/apache/flink/pull/19456#discussion_r892310026


##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java:
##########
@@ -98,9 +98,16 @@ public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch() throws IOExce
         ConsumerRecords<byte[], byte[]> consumerRecords;
         try {
             consumerRecords = consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
-        } catch (WakeupException we) {
-            return new KafkaPartitionSplitRecords(
-                    ConsumerRecords.empty(), kafkaSourceReaderMetrics);
+        } catch (WakeupException | IllegalStateException e) {
+            // IllegalStateException will be thrown if the consumer is not assigned any partitions.
+            // This happens if all assigned partitions are invalid or empty (starting offset >=

Review Comment:
    > all assigned partitions are invalid
   
   Should 'all' be 'some of'?



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java:
##########
@@ -443,13 +502,15 @@ public void invoke(PartitionAndValue value, Context context) {
                                 .add(partitionAndValue.value));
         resultPerPartition.forEach(
                 (tp, values) -> {
-                    int firstExpectedValue = Integer.parseInt(tp.substring(tp.indexOf('-') + 1));
+                    int firstExpectedValue =
+                            Integer.parseInt(tp.substring(tp.lastIndexOf('-') + 1));

Review Comment:
   We can add a note here:
   the elements of each partition are an auto-generated integer sequence starting from the partition number.
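
For context on the `indexOf` to `lastIndexOf` change in this hunk, the sketch below assumes that `tp` is a string of the form `<topic>-<partition>` and that the new tests use topic names containing hyphens (for example a UUID suffix). The class and method names are hypothetical.

```java
final class PartitionSuffixSketch {

    /** Old approach: breaks as soon as the topic name itself contains a hyphen. */
    static int partitionWithIndexOf(String tp) {
        return Integer.parseInt(tp.substring(tp.indexOf('-') + 1));
    }

    /** New approach: only the text after the last hyphen is treated as the partition number. */
    static int partitionWithLastIndexOf(String tp) {
        return Integer.parseInt(tp.substring(tp.lastIndexOf('-') + 1));
    }

    public static void main(String[] args) {
        String tp = "topicWithEmptyPartitions-123e4567-e89b-12d3-a456-426614174000-5";
        System.out.println(partitionWithLastIndexOf(tp));
        try {
            partitionWithIndexOf(tp);
        } catch (NumberFormatException e) {
            System.out.println("indexOf variant failed: " + e.getMessage());
        }
    }
}
```

For a hyphen-free topic name both variants agree, which is presumably why the older tests did not hit the problem.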



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java:
##########
@@ -309,6 +309,65 @@ public void processElement(
                             });
             env.execute();
         }
+
+        @Test
+        public void testConsumingEmptyTopic() throws Throwable {
+            String emptyTopic = "emptyTopic-" + UUID.randomUUID();
+            KafkaSourceTestEnv.createTestTopic(emptyTopic, 3, 1);
+            KafkaSource<PartitionAndValue> source =
+                    KafkaSource.<PartitionAndValue>builder()
+                            .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
+                            .setTopics(emptyTopic)
+                            .setGroupId("empty-topic-test")
+                            .setDeserializer(new TestingKafkaRecordDeserializationSchema(false))
+                            .setStartingOffsets(OffsetsInitializer.earliest())
+                            .setBounded(OffsetsInitializer.latest())
+                            .build();
+            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+            env.setParallelism(1);
+            try (CloseableIterator<PartitionAndValue> iterator =
+                    env.fromSource(
+                                    source,
+                                    WatermarkStrategy.noWatermarks(),
+                                    "testConsumingEmptyTopic")
+                            .executeAndCollect()) {
+                assertThat(iterator.hasNext()).isFalse();
+            }
+        }
+
+        @Test
+        public void testConsumingTopicWithEmptyPartitions() throws Throwable {
+            String topicWithEmptyPartitions = "topicWithEmptyPartitions-" + UUID.randomUUID();
+            KafkaSourceTestEnv.createTestTopic(
+                    topicWithEmptyPartitions, KafkaSourceTestEnv.NUM_PARTITIONS, 1);
+            List<ProducerRecord<String, Integer>> records =
+                    KafkaSourceTestEnv.getRecordsForTopicWithoutTimestamp(topicWithEmptyPartitions);
+            // Only keep records in partition 5
+            int partitionWithRecords = 5;
+            records.removeIf(record -> record.partition() != partitionWithRecords);
+            KafkaSourceTestEnv.produceToKafka(records);
+            KafkaSourceTestEnv.setupEarliestOffsets(
+                    Collections.singletonList(
+                            new TopicPartition(topicWithEmptyPartitions, partitionWithRecords)));
+
+            KafkaSource<PartitionAndValue> source =
+                    KafkaSource.<PartitionAndValue>builder()
+                            .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
+                            .setTopics(topicWithEmptyPartitions)
+                            .setGroupId("topic-with-empty-partition-test")
+                            .setDeserializer(new TestingKafkaRecordDeserializationSchema(false))
+                            .setStartingOffsets(OffsetsInitializer.earliest())
+                            .setBounded(OffsetsInitializer.latest())
+                            .build();
+            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+            env.setParallelism(2);

Review Comment:
   Could we use the default parallelism?
   What is the purpose of setting a different parallelism in these test cases?





[GitHub] [flink] PatrickRen commented on a diff in pull request #19456: [FLINK-27041][connector/kafka] Catch IllegalStateException in KafkaPartitionSplitReader.fetch() to handle no valid partition case

Posted by GitBox <gi...@apache.org>.
PatrickRen commented on code in PR #19456:
URL: https://github.com/apache/flink/pull/19456#discussion_r894256191


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java:
##########
@@ -309,6 +309,65 @@ public void processElement(
                             });
             env.execute();
         }
+
+        @Test
+        public void testConsumingEmptyTopic() throws Throwable {
+            String emptyTopic = "emptyTopic-" + UUID.randomUUID();
+            KafkaSourceTestEnv.createTestTopic(emptyTopic, 3, 1);
+            KafkaSource<PartitionAndValue> source =
+                    KafkaSource.<PartitionAndValue>builder()
+                            .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
+                            .setTopics(emptyTopic)
+                            .setGroupId("empty-topic-test")
+                            .setDeserializer(new TestingKafkaRecordDeserializationSchema(false))
+                            .setStartingOffsets(OffsetsInitializer.earliest())
+                            .setBounded(OffsetsInitializer.latest())
+                            .build();
+            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+            env.setParallelism(1);
+            try (CloseableIterator<PartitionAndValue> iterator =
+                    env.fromSource(
+                                    source,
+                                    WatermarkStrategy.noWatermarks(),
+                                    "testConsumingEmptyTopic")
+                            .executeAndCollect()) {
+                assertThat(iterator.hasNext()).isFalse();
+            }
+        }
+
+        @Test
+        public void testConsumingTopicWithEmptyPartitions() throws Throwable {
+            String topicWithEmptyPartitions = "topicWithEmptyPartitions-" + UUID.randomUUID();
+            KafkaSourceTestEnv.createTestTopic(
+                    topicWithEmptyPartitions, KafkaSourceTestEnv.NUM_PARTITIONS, 1);
+            List<ProducerRecord<String, Integer>> records =
+                    KafkaSourceTestEnv.getRecordsForTopicWithoutTimestamp(topicWithEmptyPartitions);
+            // Only keep records in partition 5
+            int partitionWithRecords = 5;
+            records.removeIf(record -> record.partition() != partitionWithRecords);
+            KafkaSourceTestEnv.produceToKafka(records);
+            KafkaSourceTestEnv.setupEarliestOffsets(
+                    Collections.singletonList(
+                            new TopicPartition(topicWithEmptyPartitions, partitionWithRecords)));
+
+            KafkaSource<PartitionAndValue> source =
+                    KafkaSource.<PartitionAndValue>builder()
+                            .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
+                            .setTopics(topicWithEmptyPartitions)
+                            .setGroupId("topic-with-empty-partition-test")
+                            .setDeserializer(new TestingKafkaRecordDeserializationSchema(false))
+                            .setStartingOffsets(OffsetsInitializer.earliest())
+                            .setBounded(OffsetsInitializer.latest())
+                            .build();
+            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+            env.setParallelism(2);

Review Comment:
   I'm afraid the default parallelism will be quite large on CI (the number of CPU cores) and occupy too many resources. Setting the parallelism to 2 is enough to reproduce the issue.
   
   This test covers the case where only some partitions are empty, so that readers assigned empty partitions can be closed without an exception while the other readers process messages as expected.





[GitHub] [flink] PatrickRen commented on a diff in pull request #19456: [FLINK-27041][connector/kafka] Catch IllegalStateException in KafkaPartitionSplitReader.fetch() to handle no valid partition case

Posted by GitBox <gi...@apache.org>.
PatrickRen commented on code in PR #19456:
URL: https://github.com/apache/flink/pull/19456#discussion_r850166745


##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java:
##########
@@ -98,9 +98,16 @@ public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch() throws IOExce
         ConsumerRecords<byte[], byte[]> consumerRecords;
         try {
             consumerRecords = consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
-        } catch (WakeupException we) {
-            return new KafkaPartitionSplitRecords(
-                    ConsumerRecords.empty(), kafkaSourceReaderMetrics);
+        } catch (WakeupException | IllegalStateException e) {
+            // IllegalStateException will be thrown if the consumer is not assigned any partitions.
+            // This happens if all assigned partitions are invalid or empty (starting offset >=

Review Comment:
   Thanks for the review! I think it's by design that some kinds of starting / stopping offsets, such as earliest, latest and committed, are resolved on the `SourceReader` (see `ReaderHandledOffsetsInitializer`) to avoid race conditions, for example the earliest offset changing because of log deletion while the split is being delivered from the enumerator on the JM to the reader on the TM.
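
The race described above can be pictured with a small sketch. Everything here is hypothetical and simplified: `EARLIEST_MARKER` is an illustrative sentinel, and `resolveOnReader` stands in for whatever the reader does when it turns such a marker into a concrete offset. The idea is only that the split carries a marker and the reader looks up the real offset at read time.

```java
import java.util.function.LongSupplier;

final class LazyEarliestOffsetSketch {

    // Hypothetical sentinel meaning "resolve to the earliest offset on the reader".
    static final long EARLIEST_MARKER = -2L;

    /** Concrete offsets pass through; the marker is resolved against the current log state. */
    static long resolveOnReader(long splitStartingOffset, LongSupplier currentEarliestOffset) {
        return splitStartingOffset == EARLIEST_MARKER
                ? currentEarliestOffset.getAsLong()
                : splitStartingOffset;
    }

    public static void main(String[] args) {
        long[] logStartOffset = {0L};

        long eagerlyResolved = logStartOffset[0]; // resolved when the split is created
        long lazilyResolved = EARLIEST_MARKER;    // marker travels with the split instead

        logStartOffset[0] = 50L;                  // log deletion happens while the split is in flight

        System.out.println("eager=" + eagerlyResolved
                + ", lazy=" + resolveOnReader(lazilyResolved, () -> logStartOffset[0]));
    }
}
```

In this model the eagerly resolved offset points at data that no longer exists by the time the reader starts, while the marker resolves to the current earliest offset.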


