Posted to github@beam.apache.org by "nbali (via GitHub)" <gi...@apache.org> on 2023/03/24 06:36:56 UTC

[GitHub] [beam] nbali opened a new issue, #25962: [Bug]: Java SDF-based Kafka ingestion can be stuck at certain offsets in certain conditions

nbali opened a new issue, #25962:
URL: https://github.com/apache/beam/issues/25962

   ### What happened?
   
   Scenario:
   Batch Kafka ingestion (`.withStartReadTime()`, `.withStopReadTime()`, experiments: `beam_fn_api`, `unsafely_attempt_to_process_unbounded_data_in_batch_mode`, `shuffle_mode=appliance`) reading a few billion Kafka records (a few hundred GB) from a few hundred KTable topic partitions. Some of these partitions have a tendency to get stuck.
   
   The issue started happening with 2.45 (and still does with 2.46), so my original idea was that some change introduced in 2.45 caused it. After some investigation, however, my guess is that the issue already existed but was hidden/suppressed, and some change only revealed/emphasized it.
   
   So I think the problematic part is this: https://github.com/apache/beam/blob/ddae966f3346fbe247486324cbf8a8a532895316/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java#L383-L388
   
   What causes the issue is that `consumer.poll` can return a `ConsumerRecords` that contains no records, so we assume we have reached the end, while there are still records remaining to be consumed. If you check the Kafka consumer implementation (https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1259-L1276 and https://github.com/apache/kafka/blob/f79c2a6e04129832eacec60bbc180173ecc37549/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java#L102-L104 ), you can see that `poll()` considers a `Fetch` "successful" if the "position has advanced", even if `numRecords` is 0.
   
   I'm not completely sure this is the reason, but it seems entirely plausible to me:
   Let's assume we have KTable Kafka topics with key compaction turned on, that they have existed for a long time, and that they have a very high compaction rate (only a few percent of the offsets still contain a record). It is easy to imagine that there are huge ranges of offsets with no records. That is exactly the case with our topics.
   
   What I see is that it polls, gets back an "empty" `ConsumerRecords` (0 records, but `positionAdvanced` is true), and returns `ProcessContinuation.resume()`. This repeats forever.
   
   The loop happens because we stop the current consumption with a return, and the next `processElement()` will `seek()` the consumer to the `startOffset` (aka `tracker.currentRestriction().getFrom()`, https://github.com/apache/beam/blob/ddae966f3346fbe247486324cbf8a8a532895316/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java#L376 ), but the previous `processElement()` never notified the tracker of any progress, so the `startOffset` stays the same.
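To make the loop described above concrete, here is a minimal, self-contained simulation. It does not use Kafka or Beam classes: the class, its methods, the offsets, and the `maxScan` knob are all hypothetical. A compacted partition is modelled as a sparse `TreeMap` of offsets, and one "poll" scans at most `maxScan` offsets, returning whatever records survive in that window while always advancing the position (the post-3.2.0 behaviour described above). An empty poll ends the call, and the next call seeks back to a restriction start that never moved past the gap.

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical simulation, not Beam or Kafka code: a compacted partition is a sparse map
// of offset -> value, and one "poll" scans at most maxScan offsets from the current
// position (a stand-in for the real byte/record fetch limits).
public class StuckLoopSimulation {

    // Records survive compaction only at offsets [0, 10) and [1000, 1010).
    static TreeMap<Long, String> compactedPartition() {
        TreeMap<Long, String> partition = new TreeMap<>();
        for (long offset = 0; offset < 10; offset++) partition.put(offset, "v" + offset);
        for (long offset = 1000; offset < 1010; offset++) partition.put(offset, "v" + offset);
        return partition;
    }

    // Mimics the reported loop: an empty poll ends the call (resume()), and the next call
    // seeks back to the restriction start, which was never moved past the gap.
    static long runBuggy(TreeMap<Long, String> partition, long endOffset, long maxScan, int maxCalls) {
        long restrictionStart = 0; // role of tracker.currentRestriction().getFrom()
        for (int call = 0; call < maxCalls && restrictionStart < endOffset; call++) {
            long position = restrictionStart; // consumer.seek(startOffset)
            while (true) {
                long upTo = Math.min(position + maxScan, endOffset);
                SortedMap<Long, String> batch = partition.subMap(position, upTo);
                position = upTo; // the position advances even if the batch is empty
                if (batch.isEmpty()) {
                    break; // treated as "caught up": return ProcessContinuation.resume()
                }
                restrictionStart = batch.lastKey() + 1; // tryClaim() each record in the batch
            }
        }
        return restrictionStart;
    }

    public static void main(String[] args) {
        TreeMap<Long, String> partition = compactedPartition();
        System.out.println("small fetches: " + runBuggy(partition, 1010, 100, 1_000));   // prints "small fetches: 10"
        System.out.println("large fetches: " + runBuggy(partition, 1010, 2_000, 1_000)); // prints "large fetches: 1010"
    }
}
```

With small fetches the restriction start is stuck at offset 10 no matter how many calls are made; with a fetch large enough to span the whole gap it gets through. This mirrors the observation that bigger fetches only alleviate the problem: they help exactly when a single poll happens to cross the gap.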
   
   Essentially, what https://github.com/apache/beam/pull/24205 fixed for the watermark in streaming jobs also has to be fixed for the tracker/restriction/offset.
   
   In order to solve this, we have to know whether the "empty" `ConsumerRecords` is really at the end of the stream, or just at a "gap".
   
   Some possible solutions I came up with:
   1. Contribute to [Kafka](https://github.com/apache/kafka) so the returned `ConsumerRecords` distinguishes between the two kinds of "empty" (essentially exposing `positionAdvanced`, or possibly the consumer position). - _No idea if they will accept it, and even if they do, it would take the longest to get into Beam._
   2. Compare `consumer.position()` before and after `consumer.poll()` to figure out whether we have "advanced" or not. - _This can detect the advancement, but I'm not sure whether the consumer position always matches the end of the returned range, and therefore whether it can be used to update the tracker._
   3. Compare the returned `ConsumerRecords` to `ConsumerRecords.EMPTY` - _I would say this is ugly as it's an implementation detail of the Kafka library, but we can test if they change it with unit tests_
   4. Check if `((HasProgress) tracker).getProgress().getWorkRemaining()` is positive. - _I'm not sure how up-to-date that information is, but it can certainly indicate if we are in a huge gap in the middle of the processing._
   5. Check whether this is a BOUNDED or UNBOUNDED consumption, as batch pipelines should return `.stop()` when they reach the end of the range anyway. - _This would fix the issue only when consuming Kafka in batch pipelines, but it could most likely happen with streaming pipelines too if they use the same SDF implementation._
   
   Once we know which case we have encountered, we can either continue the `while (true)` loop and `poll` the next batch, or, if we have the position/offset, update the tracker and return `.resume()`.
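A minimal sketch of option 2 combined with "continue the `while (true)` loop", again using a hypothetical in-memory `FakeConsumer` rather than the real `KafkaConsumer` (whose corresponding calls would be `position(TopicPartition)` and `poll(Duration)`). It assumes, as the simulation does, that the position after an empty poll really marks the end of the scanned range; whether the real consumer guarantees that is exactly the open question raised in option 2. The restriction start is only moved to just after the last record actually returned, so the sketch does not claim offsets past a trailing gap.

```java
import java.util.SortedMap;
import java.util.TreeMap;

public class PositionCheckSketch {

    // Hypothetical stand-in for a KafkaConsumer assigned to one compacted partition.
    static final class FakeConsumer {
        private final TreeMap<Long, String> partition;
        private final long endOffset;
        private final long maxScan;
        private long position;

        FakeConsumer(TreeMap<Long, String> partition, long endOffset, long maxScan) {
            this.partition = partition;
            this.endOffset = endOffset;
            this.maxScan = maxScan;
        }

        void seek(long offset) { position = offset; }

        long position() { return position; }

        // Like poll() in kafka-clients >= 3.2.0: may return no records yet advance the position.
        SortedMap<Long, String> poll() {
            long upTo = Math.min(position + maxScan, endOffset);
            SortedMap<Long, String> batch = partition.subMap(position, upTo);
            position = upTo;
            return batch;
        }
    }

    // Records survive compaction only at offsets [0, 10) and [1000, 1010).
    static TreeMap<Long, String> compactedPartition() {
        TreeMap<Long, String> partition = new TreeMap<>();
        for (long offset = 0; offset < 10; offset++) partition.put(offset, "v" + offset);
        for (long offset = 1000; offset < 1010; offset++) partition.put(offset, "v" + offset);
        return partition;
    }

    // One processElement() call using option 2: an empty poll only ends the call when the
    // position did not move. Returns the new restriction start.
    static long processElement(FakeConsumer consumer, long restrictionStart) {
        consumer.seek(restrictionStart);
        while (true) {
            long before = consumer.position();
            SortedMap<Long, String> batch = consumer.poll();
            if (batch.isEmpty()) {
                if (consumer.position() > before) {
                    continue; // a compacted gap, not the end: poll the next batch
                }
                return restrictionStart; // position unchanged: genuinely caught up, resume()
            }
            restrictionStart = batch.lastKey() + 1; // tryClaim() each record in the batch
        }
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer(compactedPartition(), 1010, 100);
        System.out.println("restriction start after one call: " + processElement(consumer, 0));
    }
}
```

In this model a single call walks across the 990-offset gap and reaches offset 1010, where the buggy loop stayed at 10.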
   
   FYI, increasing the batch/poll size via `ConsumerConfig.FETCH_MAX_BYTES_CONFIG`, `ConsumerConfig.MAX_POLL_RECORDS_CONFIG`, etc. alleviates the issue, but doesn't fix it completely.
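For reference, the mitigation above amounts to a consumer-config override. The sketch below only builds the map; the keys are the string values behind `ConsumerConfig.FETCH_MAX_BYTES_CONFIG`, `ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG`, and `ConsumerConfig.MAX_POLL_RECORDS_CONFIG`, and the numbers are illustrative assumptions, not tuned recommendations. In a pipeline such a map would typically be passed to `KafkaIO.read().withConsumerConfigUpdates(...)`.

```java
import java.util.HashMap;
import java.util.Map;

public class LargerFetchConfig {

    // Illustrative values only; they make each poll cover more offsets, which helps only
    // when a single fetch happens to span a compacted gap.
    static Map<String, Object> consumerConfigUpdates() {
        Map<String, Object> updates = new HashMap<>();
        updates.put("fetch.max.bytes", 100 * 1024 * 1024);          // ConsumerConfig.FETCH_MAX_BYTES_CONFIG
        updates.put("max.partition.fetch.bytes", 10 * 1024 * 1024); // ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG
        updates.put("max.poll.records", 5_000);                     // ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        return updates;
    }

    public static void main(String[] args) {
        System.out.println(consumerConfigUpdates());
    }
}
```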
   
   
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [X] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] johnjcasey commented on issue #25962: [Bug]: Java SDF-based Kafka ingestion can be stuck at certain offsets in certain conditions

Posted by "johnjcasey (via GitHub)" <gi...@apache.org>.
johnjcasey commented on issue #25962:
URL: https://github.com/apache/beam/issues/25962#issuecomment-1497921558

   Thank you for this. I agree this is a breaking change.
   
   I'd rather not do a fail-fast. I think 2 or 3 are the least intrusive, as we can then remove them when there is a fix in Kafka. I prefer 2, but I'm also not sure about the guarantees provided by `consumer.position()`.




[GitHub] [beam] nbali commented on issue #25962: [Bug]: Java SDF-based Kafka ingestion can be stuck at certain offsets in certain conditions

Posted by "nbali (via GitHub)" <gi...@apache.org>.
nbali commented on issue #25962:
URL: https://github.com/apache/beam/issues/25962#issuecomment-1497956296

   Well, if we don't explicitly forbid certain Kafka versions with a fail-fast, we have to support even the currently "bugged" versions. That means "_as we can then remove it when there is a fix in kafka_" will never happen. So if the goal is an implementation that supports pre-3.2.0, the currently bugged versions, possibly fixed future versions, and pretty much anything else that Kafka implementation details might change, then I would say none of the 5 proposals works perfectly.
   
   I would say the most generic solution would be to call `consumer.poll(KAFKA_POLL_TIMEOUT)` in a loop as well, and break the loop only if we received `rawRecords` that aren't empty OR the `KAFKA_POLL_TIMEOUT` has elapsed. That essentially simulates the pre-3.2.0 behaviour.
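The loop proposed above can be sketched as a small generic helper. This is a hypothetical illustration, not the code from the eventual PR: `pollUntilNonEmptyOrTimeout` is an invented name, `poll` stands in for something like `consumer::poll` (which in kafka-clients takes a `Duration`), and the clock is injected so the loop can be exercised without real waiting. Each poll receives only the remaining time budget, so the overall wait is bounded by the original timeout, as it was before 3.2.0.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.function.LongSupplier;

public class PollUntilNonEmpty {

    // Calls poll repeatedly, handing it the remaining time budget, until it returns a
    // non-empty batch or the overall timeout has elapsed; the last (possibly empty) batch
    // is returned. clockNanos is injected so the loop can be tested without sleeping.
    static <T> List<T> pollUntilNonEmptyOrTimeout(
            Function<Duration, List<T>> poll, Duration timeout, LongSupplier clockNanos) {
        long deadline = clockNanos.getAsLong() + timeout.toNanos();
        while (true) {
            long remainingNanos = Math.max(0L, deadline - clockNanos.getAsLong());
            List<T> batch = poll.apply(Duration.ofNanos(remainingNanos));
            if (!batch.isEmpty() || deadline - clockNanos.getAsLong() <= 0) {
                return batch;
            }
        }
    }

    public static void main(String[] args) {
        // Hypothetical poll: two empty batches (compacted gaps) before a real record.
        int[] calls = {0};
        Function<Duration, List<String>> fakePoll = remaining ->
                ++calls[0] < 3 ? Collections.<String>emptyList() : Collections.singletonList("record-1000");
        List<String> batch = pollUntilNonEmptyOrTimeout(fakePoll, Duration.ofSeconds(2), System::nanoTime);
        System.out.println(batch + " after " + calls[0] + " polls");
    }
}
```

An empty result from this helper means the full timeout passed without data, which restores the "empty means caught up for now" assumption the rest of the DoFn relies on.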
   




[GitHub] [beam] nbali commented on issue #25962: [Bug]: Java SDF-based Kafka ingestion can be stuck at certain offsets in certain conditions

Posted by "nbali (via GitHub)" <gi...@apache.org>.
nbali commented on issue #25962:
URL: https://github.com/apache/beam/issues/25962#issuecomment-1487941813

   https://issues.apache.org/jira/browse/KAFKA-14865




[GitHub] [beam] Abacn commented on issue #25962: [Bug]: Java SDF-based Kafka ingestion can be stuck at certain offsets in certain conditions

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on issue #25962:
URL: https://github.com/apache/beam/issues/25962#issuecomment-1489294702

   Thanks so much for the detailed investigation.
   
   > Essentially what was fixed with the watermark for streaming jobs with #24205 has to be also fixed for the tracker/restriction/offset.
   
   CC: @johnjcasey who did the fix for the watermark. A fix for the tracker would hopefully resolve this long-standing issue.
   
   
   > the generated jar and every dependency tool output if I deploy it with 2.44 it still uses 'org.apache.kafka:kafka-clients:2.4.1'
   
   The Kafka client is a `provided` dependency for Beam: https://github.com/apache/beam/blob/9c614557c51ad55230211f864e70f48ad0914326/sdks/java/io/kafka/build.gradle#L68
   
   so it should be possible to use a different version. Not sure why it still picked the default 2.4.1.
   
   Also, this reminds me that the Kafka version also matters for KafkaIO issues. We have seen incoming issues that were not caught by our tests. We may want to update the default Kafka version as well.




[GitHub] [beam] gabrywu commented on issue #25962: [Bug]: Java SDF-based Kafka ingestion can be stuck at certain offsets in certain conditions

Posted by "gabrywu (via GitHub)" <gi...@apache.org>.
gabrywu commented on issue #25962:
URL: https://github.com/apache/beam/issues/25962#issuecomment-1709376989

   #26142 did not resolve this issue; it's still there.




[GitHub] [beam] gabrywu commented on issue #25962: [Bug]: Java SDF-based Kafka ingestion can be stuck at certain offsets in certain conditions

Posted by "gabrywu (via GitHub)" <gi...@apache.org>.
gabrywu commented on issue #25962:
URL: https://github.com/apache/beam/issues/25962#issuecomment-1672379194

   I'm encountering the same issue!




[GitHub] [beam] johnjcasey closed issue #25962: [Bug]: Java SDF-based Kafka ingestion can be stuck at certain offsets in certain conditions

Posted by "johnjcasey (via GitHub)" <gi...@apache.org>.
johnjcasey closed issue #25962: [Bug]: Java SDF-based Kafka ingestion can be stuck at certain offsets in certain conditions
URL: https://github.com/apache/beam/issues/25962




Re: [I] [Bug]: Java SDF-based Kafka ingestion can be stuck at certain offsets in certain conditions [beam]

Posted by "johnjcasey (via GitHub)" <gi...@apache.org>.
johnjcasey commented on issue #25962:
URL: https://github.com/apache/beam/issues/25962#issuecomment-1755675738

   Yes, please create a new issue. This existing issue was fixed, even if the symptoms are similar.




[GitHub] [beam] nbali commented on issue #25962: [Bug]: Java SDF-based Kafka ingestion can be stuck at certain offsets in certain conditions

Posted by "nbali (via GitHub)" <gi...@apache.org>.
nbali commented on issue #25962:
URL: https://github.com/apache/beam/issues/25962#issuecomment-1498120210

   @johnjcasey https://github.com/apache/beam/pull/26142




[GitHub] [beam] gabrywu commented on issue #25962: [Bug]: Java SDF-based Kafka ingestion can be stuck at certain offsets in certain conditions

Posted by "gabrywu (via GitHub)" <gi...@apache.org>.
gabrywu commented on issue #25962:
URL: https://github.com/apache/beam/issues/25962#issuecomment-1711392689

   > @gabrywu isn't it stuck in a different way?
   
   It needs more investigation. If a Flink task manager runs out of memory (OOM), the reader is stuck when the task restarts.




[GitHub] [beam] nbali commented on issue #25962: [Bug]: Java SDF-based Kafka ingestion can be stuck at certain offsets in certain conditions

Posted by "nbali (via GitHub)" <gi...@apache.org>.
nbali commented on issue #25962:
URL: https://github.com/apache/beam/issues/25962#issuecomment-1709380086

   @gabrywu isn't it stuck in a different way? 




[GitHub] [beam] nbali commented on issue #25962: [Bug]: Java SDF-based Kafka ingestion can be stuck at certain offsets in certain conditions

Posted by "nbali (via GitHub)" <gi...@apache.org>.
nbali commented on issue #25962:
URL: https://github.com/apache/beam/issues/25962#issuecomment-1486178715

   I just noticed that despite having `'org.apache.kafka:kafka-clients:3.2.0'` in the POM (and in the generated jar and every dependency tool's output), if I deploy it with `2.44` it still uses `'org.apache.kafka:kafka-clients:2.4.1'`, while `2.45` uses `3.2.0` as expected. I'm not really sure why.
   
   Anyway, after I changed `3.2.0` to `2.4.1`, the issue stopped happening with the newer Beam versions as well.
   
   It turns out these changes were introduced in `3.2.0` due to https://issues.apache.org/jira/browse/KAFKA-12980
   https://github.com/apache/kafka/blob/3.1.2/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1238-L1251
   vs
   https://github.com/apache/kafka/blob/3.2.0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1242-L1260
   
   This is a breaking change for Beam. Either the implementation has to follow the changed consumer behaviour (basically as I described before), or it should fail fast with Kafka consumers of version >= `3.2.0`.
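For comparison, the fail-fast alternative mentioned above would boil down to a version check like the hypothetical sketch below (the thread later leaned toward simulating the pre-3.2.0 polling behaviour instead). The version string is passed in; in practice it could come from, for example, Kafka's `AppInfoParser.getVersion()`, which is what the "Kafka version: ..." log lines report. Only major and minor are compared, since 3.2.0 is the first release containing KAFKA-12980; suffixes such as `-ccs` are ignored.

```java
public class KafkaClientVersionCheck {

    // True for kafka-clients 3.2.0 and later, i.e. releases that include KAFKA-12980,
    // where poll() may return no records after advancing the position.
    static boolean hasNewEmptyPollSemantics(String kafkaClientVersion) {
        String[] parts = kafkaClientVersion.split("[.\\-]");
        int major = Integer.parseInt(parts[0]);
        int minor = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
        return major > 3 || (major == 3 && minor >= 2);
    }

    // Hypothetical fail-fast guard; the message wording is illustrative.
    static void failFastIfUnsupported(String kafkaClientVersion) {
        if (hasNewEmptyPollSemantics(kafkaClientVersion)) {
            throw new IllegalStateException("kafka-clients " + kafkaClientVersion
                    + " may return empty polls inside compacted gaps; use a version below 3.2.0");
        }
    }

    public static void main(String[] args) {
        for (String version : new String[] {"2.4.1", "3.1.2", "3.2.0", "3.5.1"}) {
            System.out.println(version + " -> " + hasNewEmptyPollSemantics(version));
        }
    }
}
```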
   




[GitHub] [beam] nbali commented on issue #25962: [Bug]: Java SDF-based Kafka ingestion can be stuck at certain offsets in certain conditions

Posted by "nbali (via GitHub)" <gi...@apache.org>.
nbali commented on issue #25962:
URL: https://github.com/apache/beam/issues/25962#issuecomment-1489509210

   I'm aware that it is `provided`, but I can't find any difference that causes the different behaviour between 2.44 and 2.45, or explains why 2.45 would use the custom version from my pom.xml while 2.44 uses 2.4.1 no matter what I do. Anyway, I quickly checked 2.43/2.44/2.45, and when I launch the flex template, all 3 show `Kafka version: 3.2.0` in the `Job logs`; 2.45 shows it in the `Worker logs` too, but both 2.43 and 2.44 show `Kafka version: 2.4.1` there.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] johnjcasey commented on issue #25962: [Bug]: Java SDF-based Kafka ingestion can be stuck at certain offsets in certain conditions

Posted by "johnjcasey (via GitHub)" <gi...@apache.org>.
johnjcasey commented on issue #25962:
URL: https://github.com/apache/beam/issues/25962#issuecomment-1498006075

   True. I hadn't thought through how my statements contradicted each other.
   
   I think doing the pre-3.2.0 simulation is a good short-term fix, though I'm beginning to believe we will eventually want to stop supporting older Kafka versions.




Re: [I] [Bug]: Java SDF-based Kafka ingestion can be stuck at certain offsets in certain conditions [beam]

Posted by "gabrywu (via GitHub)" <gi...@apache.org>.
gabrywu commented on issue #25962:
URL: https://github.com/apache/beam/issues/25962#issuecomment-1754763248

   @nbali our Kafka reader got stuck after this error:
   `org.apache.kafka.clients.FetchSessionHandler                 [] - [Consumer clientId=xxxconsumer-1, groupId=xxxconsumer] Error sending fetch request (sessionId=1286546196, epoch=546094) to node 4:
   org.apache.kafka.common.errors.DisconnectException: null`




Re: [I] [Bug]: Java SDF-based Kafka ingestion can be stuck at certain offsets in certain conditions [beam]

Posted by "gabrywu (via GitHub)" <gi...@apache.org>.
gabrywu commented on issue #25962:
URL: https://github.com/apache/beam/issues/25962#issuecomment-1754765036

   @johnjcasey should I create a new issue if the problem is still there?

