Posted to github@beam.apache.org by "nbali (via GitHub)" <gi...@apache.org> on 2023/04/05 20:35:44 UTC

[GitHub] [beam] nbali opened a new pull request, #26142: #25962 Fix for Java SDF-based Kafka ingestion being stuck at certain offsets in certain conditions

nbali opened a new pull request, #26142:
URL: https://github.com/apache/beam/pull/26142

   fixes #25962
   
   This fix simulates the behaviour of `org.apache.kafka:kafka-clients` prior to `3.2.0`, even if a newer version is being used.
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [x] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - ~[ ] Update `CHANGES.md` with noteworthy changes.~
    - ~[ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).~
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] johnjcasey commented on a diff in pull request #26142: #25962 Fix for Java SDF-based Kafka ingestion being stuck at certain offsets in certain conditions

Posted by "johnjcasey (via GitHub)" <gi...@apache.org>.
johnjcasey commented on code in PR #26142:
URL: https://github.com/apache/beam/pull/26142#discussion_r1164195084


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -424,6 +425,29 @@ public ProcessContinuation processElement(
     }
   }
 
+  // see https://github.com/apache/beam/issues/25962
+  private ConsumerRecords<byte[], byte[]> poll(
+      Consumer<byte[], byte[]> consumer, TopicPartition topicPartition) {
+    final Stopwatch sw = Stopwatch.createStarted();
+    long previousPosition = -1;
+    while (true) {
+      final ConsumerRecords<byte[], byte[]> rawRecords =
+          consumer.poll(KAFKA_POLL_TIMEOUT.minus(sw.elapsed()));

Review Comment:
   Based on a test failure, it looks like this timeout can become negative, which we need to avoid.
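
A minimal sketch of one way to keep the remaining timeout from going negative, assuming both values are `java.time.Duration`. The `PollTimeouts` class and its `remaining` method are hypothetical names used for illustration only; this is not necessarily how the PR resolved the issue.

```java
import java.time.Duration;

/** Hypothetical helper, not part of Beam: computes a non-negative remaining timeout. */
final class PollTimeouts {
  private PollTimeouts() {}

  /** Returns total minus elapsed, clamped to zero so the result is never negative. */
  static Duration remaining(Duration total, Duration elapsed) {
    Duration left = total.minus(elapsed);
    return left.isNegative() ? Duration.ZERO : left;
  }
}
```

Under this assumption, the call in the diff could pass `PollTimeouts.remaining(KAFKA_POLL_TIMEOUT, sw.elapsed())` to `consumer.poll(...)` instead of the raw difference.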





[GitHub] [beam] nbali commented on a diff in pull request #26142: #25962 Fix for Java SDF-based Kafka ingestion being stuck at certain offsets in certain conditions

Posted by "nbali (via GitHub)" <gi...@apache.org>.
nbali commented on code in PR #26142:
URL: https://github.com/apache/beam/pull/26142#discussion_r1159017618


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -424,6 +425,29 @@ public ProcessContinuation processElement(
     }
   }
 
+  // see https://github.com/apache/beam/issues/25962
+  private ConsumerRecords<byte[], byte[]> poll(
+      Consumer<byte[], byte[]> consumer, TopicPartition topicPartition) {
+    final Stopwatch sw = Stopwatch.createStarted();
+    long previousPosition = -1;
+    while (true) {
+      final ConsumerRecords<byte[], byte[]> rawRecords =
+          consumer.poll(KAFKA_POLL_TIMEOUT.minus(sw.elapsed()));
+      if (!rawRecords.isEmpty()) {
+        // return as we have found some entries
+        return rawRecords;
+      }
+      if (previousPosition == (previousPosition = consumer.position(topicPartition))) {

Review Comment:
   (end of stream -> no offset change possible)
   pre-3.2.0+EOS -> poll should block for timeout already, return empty, *both position and timeout check should return*
   post-3.2.0+EOS -> poll should block for timeout already, return empty, *both position and timeout check should return*
   
   (big gap -> timeout isn't enough to iterate over it)
   pre-3.2.0+big gap -> poll should block for timeout already, return empty, *timeout check should return*
   post-3.2.0+big gap -> poll returns empty immediately, *every check fails*
   
   (small gap -> timeout is enough to iterate over it)
   pre-3.2.0+small gap -> poll blocks until it returns a non-empty result, *non-empty check should return*
   post-3.2.0+small gap -> poll returns empty immediately, *every check fails*
   
   So _theoretically_ there isn't any scenario where the position check triggers the return, but nothing else.
   





[GitHub] [beam] johnjcasey commented on pull request #26142: #25962 Fix for Java SDF-based Kafka ingestion being stuck at certain offsets in certain conditions

Posted by "johnjcasey (via GitHub)" <gi...@apache.org>.
johnjcasey commented on PR #26142:
URL: https://github.com/apache/beam/pull/26142#issuecomment-1542586976

   LGTM




[GitHub] [beam] nbali commented on pull request #26142: #25962 Fix for Java SDF-based Kafka ingestion being stuck at certain offsets in certain conditions

Posted by "nbali (via GitHub)" <gi...@apache.org>.
nbali commented on PR #26142:
URL: https://github.com/apache/beam/pull/26142#issuecomment-1541026638

   .




[GitHub] [beam] nbali commented on a diff in pull request #26142: #25962 Fix for Java SDF-based Kafka ingestion being stuck at certain offsets in certain conditions

Posted by "nbali (via GitHub)" <gi...@apache.org>.
nbali commented on code in PR #26142:
URL: https://github.com/apache/beam/pull/26142#discussion_r1159017618


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -424,6 +425,29 @@ public ProcessContinuation processElement(
     }
   }
 
+  // see https://github.com/apache/beam/issues/25962
+  private ConsumerRecords<byte[], byte[]> poll(
+      Consumer<byte[], byte[]> consumer, TopicPartition topicPartition) {
+    final Stopwatch sw = Stopwatch.createStarted();
+    long previousPosition = -1;
+    while (true) {
+      final ConsumerRecords<byte[], byte[]> rawRecords =
+          consumer.poll(KAFKA_POLL_TIMEOUT.minus(sw.elapsed()));
+      if (!rawRecords.isEmpty()) {
+        // return as we have found some entries
+        return rawRecords;
+      }
+      if (previousPosition == (previousPosition = consumer.position(topicPartition))) {

Review Comment:
   (end of stream -> no offset change possible)
   pre-3.2.0+EOS -> poll should block for timeout already, return empty, *both position and timeout check should return*
   post-3.2.0+EOS -> poll should block for timeout already, return empty, *both position and timeout check should return*
   
   (big gap -> timeout isn't enough to iterate over it)
   pre-3.2.0+big gap -> poll should block for timeout already, return empty, *timeout check should return*
   post-3.2.0+big gap -> poll returns empty immediately, *every check fails*
   
   (small gap -> timeout is enough to iterate over it)
   pre-3.2.0+small gap -> poll blocks until it returns a non-empty result, *non-empty check should return*
   post-3.2.0+small gap -> poll returns empty immediately, *every check fails*
   
   So _theoretically_ there isn't any scenario where the position check triggers the return, but nothing else.
   





[GitHub] [beam] johnjcasey commented on a diff in pull request #26142: #25962 Fix for Java SDF-based Kafka ingestion being stuck at certain offsets in certain conditions

Posted by "johnjcasey (via GitHub)" <gi...@apache.org>.
johnjcasey commented on code in PR #26142:
URL: https://github.com/apache/beam/pull/26142#discussion_r1164193380


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -424,6 +425,29 @@ public ProcessContinuation processElement(
     }
   }
 
+  // see https://github.com/apache/beam/issues/25962
+  private ConsumerRecords<byte[], byte[]> poll(
+      Consumer<byte[], byte[]> consumer, TopicPartition topicPartition) {
+    final Stopwatch sw = Stopwatch.createStarted();
+    long previousPosition = -1;
+    while (true) {
+      final ConsumerRecords<byte[], byte[]> rawRecords =
+          consumer.poll(KAFKA_POLL_TIMEOUT.minus(sw.elapsed()));
+      if (!rawRecords.isEmpty()) {
+        // return as we have found some entries
+        return rawRecords;
+      }
+      if (previousPosition == (previousPosition = consumer.position(topicPartition))) {

Review Comment:
   I'm ok either way here. Based on your analysis, I don't think we need this, but I don't think it hurts performance enough to matter either.





[GitHub] [beam] nbali commented on a diff in pull request #26142: #25962 Fix for Java SDF-based Kafka ingestion being stuck at certain offsets in certain conditions

Posted by "nbali (via GitHub)" <gi...@apache.org>.
nbali commented on code in PR #26142:
URL: https://github.com/apache/beam/pull/26142#discussion_r1183013252


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -424,6 +425,29 @@ public ProcessContinuation processElement(
     }
   }
 
+  // see https://github.com/apache/beam/issues/25962
+  private ConsumerRecords<byte[], byte[]> poll(
+      Consumer<byte[], byte[]> consumer, TopicPartition topicPartition) {
+    final Stopwatch sw = Stopwatch.createStarted();
+    long previousPosition = -1;
+    while (true) {
+      final ConsumerRecords<byte[], byte[]> rawRecords =
+          consumer.poll(KAFKA_POLL_TIMEOUT.minus(sw.elapsed()));

Review Comment:
   Should be fixed now.





[GitHub] [beam] nbali commented on a diff in pull request #26142: #25962 Fix for Java SDF-based Kafka ingestion being stuck at certain offsets in certain conditions

Posted by "nbali (via GitHub)" <gi...@apache.org>.
nbali commented on code in PR #26142:
URL: https://github.com/apache/beam/pull/26142#discussion_r1159000162


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -424,6 +425,29 @@ public ProcessContinuation processElement(
     }
   }
 
+  // see https://github.com/apache/beam/issues/25962
+  private ConsumerRecords<byte[], byte[]> poll(
+      Consumer<byte[], byte[]> consumer, TopicPartition topicPartition) {
+    final Stopwatch sw = Stopwatch.createStarted();
+    long previousPosition = -1;
+    while (true) {
+      final ConsumerRecords<byte[], byte[]> rawRecords =
+          consumer.poll(KAFKA_POLL_TIMEOUT.minus(sw.elapsed()));
+      if (!rawRecords.isEmpty()) {
+        // return as we have found some entries
+        return rawRecords;
+      }
+      if (previousPosition == (previousPosition = consumer.position(topicPartition))) {

Review Comment:
   I'm not completely sure we need this. There is a pretty high chance it works flawlessly even without this.
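
The condition under discussion compares the old position with the new one while storing the new one in the same expression. This works because Java evaluates the left operand of `==` before the right one. A small standalone sketch of that idiom follows; `PositionCheckDemo` and `unchangedSinceLastCall` are hypothetical names, not code from the PR.

```java
/** Hypothetical demo class; illustrates operand evaluation order only. */
final class PositionCheckDemo {
  private long previousPosition = -1;

  /** Stores newPosition and reports whether it equals the previously stored value. */
  boolean unchangedSinceLastCall(long newPosition) {
    // The left operand (old value) is read first; only then does the
    // assignment on the right run and yield the new value for comparison.
    return previousPosition == (previousPosition = newPosition);
  }
}
```

Because the initial value is `-1`, the first call reports a change for any non-negative position, so a check of this shape can only fire from the second loop iteration onwards.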





[GitHub] [beam] nbali commented on a diff in pull request #26142: #25962 Fix for Java SDF-based Kafka ingestion being stuck at certain offsets in certain conditions

Posted by "nbali (via GitHub)" <gi...@apache.org>.
nbali commented on code in PR #26142:
URL: https://github.com/apache/beam/pull/26142#discussion_r1159017618


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -424,6 +425,29 @@ public ProcessContinuation processElement(
     }
   }
 
+  // see https://github.com/apache/beam/issues/25962
+  private ConsumerRecords<byte[], byte[]> poll(
+      Consumer<byte[], byte[]> consumer, TopicPartition topicPartition) {
+    final Stopwatch sw = Stopwatch.createStarted();
+    long previousPosition = -1;
+    while (true) {
+      final ConsumerRecords<byte[], byte[]> rawRecords =
+          consumer.poll(KAFKA_POLL_TIMEOUT.minus(sw.elapsed()));
+      if (!rawRecords.isEmpty()) {
+        // return as we have found some entries
+        return rawRecords;
+      }
+      if (previousPosition == (previousPosition = consumer.position(topicPartition))) {

Review Comment:
   (end of stream -> no offset change possible)
   pre-3.2.0+EOS -> poll should block for timeout already, return empty, *both position and timeout check should return*
   post-3.2.0+EOS -> poll should block for timeout already, return empty, *both position and timeout check should return*
   
   (big gap -> timeout isn't enough to iterate over it)
   pre-3.2.0+big gap -> poll should block for timeout already, return empty, *timeout check should return*
   post-3.2.0+big gap -> poll returns empty immediately, *every check fails, but after multiple loops the timeout check should return*
   
   (small gap -> timeout is enough to iterate over it)
   pre-3.2.0+small gap -> poll blocks until it returns a non-empty result, *non-empty check should return*
   post-3.2.0+small gap -> poll returns empty immediately, *every check fails, but after multiple loops the non-empty check should return*
   
   So _theoretically_ there isn't any scenario where the position check triggers the return, but nothing else.
   





[GitHub] [beam] github-actions[bot] commented on pull request #26142: #25962 Fix for Java SDF-based Kafka ingestion being stuck at certain offsets in certain conditions

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26142:
URL: https://github.com/apache/beam/pull/26142#issuecomment-1516223819

   Reminder, please take a look at this pr: @kennknowles @chamikaramj 




[GitHub] [beam] nbali commented on a diff in pull request #26142: #25962 Fix for Java SDF-based Kafka ingestion being stuck at certain offsets in certain conditions

Posted by "nbali (via GitHub)" <gi...@apache.org>.
nbali commented on code in PR #26142:
URL: https://github.com/apache/beam/pull/26142#discussion_r1159017618


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -424,6 +425,29 @@ public ProcessContinuation processElement(
     }
   }
 
+  // see https://github.com/apache/beam/issues/25962
+  private ConsumerRecords<byte[], byte[]> poll(
+      Consumer<byte[], byte[]> consumer, TopicPartition topicPartition) {
+    final Stopwatch sw = Stopwatch.createStarted();
+    long previousPosition = -1;
+    while (true) {
+      final ConsumerRecords<byte[], byte[]> rawRecords =
+          consumer.poll(KAFKA_POLL_TIMEOUT.minus(sw.elapsed()));
+      if (!rawRecords.isEmpty()) {
+        // return as we have found some entries
+        return rawRecords;
+      }
+      if (previousPosition == (previousPosition = consumer.position(topicPartition))) {

Review Comment:
   (end of stream -> no offset change possible)
   pre-3.2.0+EOS -> poll should block for timeout already, return empty, *both position and timeout check should return*
   post-3.2.0+EOS -> poll should block for timeout already, return empty, *both position and timeout check should return*
   
   (big gap -> timeout isn't enough to iterate over it)
   pre-3.2.0+big gap -> poll should block for timeout already, return empty, *timeout check should return*
   post-3.2.0+big gap -> poll returns empty immediately, *every check fails, but after multiple loops the timeout check should return*
   
   (small gap -> timeout is enough to iterate over it)
   pre-3.2.0+small gap -> poll blocks until it returns a non-empty result, *non-empty check should return*
   post-3.2.0+small gap -> poll returns empty immediately, *every check fails*
   
   So _theoretically_ there isn't any scenario where the position check triggers the return, but nothing else.
   





[GitHub] [beam] github-actions[bot] commented on pull request #26142: #25962 Fix for Java SDF-based Kafka ingestion being stuck at certain offsets in certain conditions

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26142:
URL: https://github.com/apache/beam/pull/26142#issuecomment-1498161013

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @kennknowles for label java.
   R: @chamikaramj for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).




[GitHub] [beam] nbali commented on a diff in pull request #26142: #25962 Fix for Java SDF-based Kafka ingestion being stuck at certain offsets in certain conditions

Posted by "nbali (via GitHub)" <gi...@apache.org>.
nbali commented on code in PR #26142:
URL: https://github.com/apache/beam/pull/26142#discussion_r1159017618


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -424,6 +425,29 @@ public ProcessContinuation processElement(
     }
   }
 
+  // see https://github.com/apache/beam/issues/25962
+  private ConsumerRecords<byte[], byte[]> poll(
+      Consumer<byte[], byte[]> consumer, TopicPartition topicPartition) {
+    final Stopwatch sw = Stopwatch.createStarted();
+    long previousPosition = -1;
+    while (true) {
+      final ConsumerRecords<byte[], byte[]> rawRecords =
+          consumer.poll(KAFKA_POLL_TIMEOUT.minus(sw.elapsed()));
+      if (!rawRecords.isEmpty()) {
+        // return as we have found some entries
+        return rawRecords;
+      }
+      if (previousPosition == (previousPosition = consumer.position(topicPartition))) {

Review Comment:
   (end of stream -> no offset change possible)
   pre-3.2.0+EOS -> poll should block for timeout already, return empty, *both position and timeout check should return*
   post-3.2.0+EOS -> poll should block for timeout already, return empty, *both position and timeout check should return*
   
   (big gap -> timeout isn't enough to iterate over it)
   pre-3.2.0+big gap -> poll should block for timeout already, return empty, *timeout check should return*
   post-3.2.0+big gap -> poll returns empty immediately, *every check fails, but after multiple loops the timeout check should return*
   
   (small gap -> timeout is enough to iterate over it)
   pre-3.2.0+small gap -> poll blocks until it returns a non-empty result, *non-empty check should return*
   post-3.2.0+small gap -> poll returns empty immediately, *every check fails, but after multiple loops the non-empty check should return*
   
   So _theoretically_ there isn't any scenario where the position check triggers the return, but nothing else in the very same loop.
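
The post-3.2.0 gap scenarios above can be replayed with a small simulation. This is only a sketch under stated assumptions: `PollLoopSketch`, `FakeConsumer`, `GapConsumer`, `Exit` and `pollUntil` are hypothetical names, the clock is simulated, and the order of the three checks (non-empty, position, timeout) is assumed from the visible part of the diff. It does not use the real Kafka client.

```java
import java.util.function.LongSupplier;

/** Hypothetical simulation; none of these names come from Beam or kafka-clients. */
final class PollLoopSketch {
  private PollLoopSketch() {}

  /** Which of the three checks ended the loop. */
  enum Exit { NON_EMPTY, POSITION_UNCHANGED, TIMEOUT }

  /** Minimal stand-in for a consumer bound to a single partition. */
  interface FakeConsumer {
    int poll(long timeoutMs); // number of records returned
    long position();
  }

  /** Scripted consumer: every poll costs msPerPoll of fake time and moves one offset forward. */
  static final class GapConsumer implements FakeConsumer {
    private final long msPerPoll;
    private int gapLeft;
    private long nowMs = 0;
    private long position = 0;

    GapConsumer(int gapLength, long msPerPoll) {
      this.gapLeft = gapLength;
      this.msPerPoll = msPerPoll;
    }

    long nowMs() {
      return nowMs;
    }

    @Override
    public int poll(long timeoutMs) {
      nowMs += msPerPoll; // pretend the call took this long
      position++;         // skips one gap offset or consumes one record
      if (gapLeft > 0) {
        gapLeft--;
        return 0;         // still inside the gap: empty result, returned immediately
      }
      return 1;           // past the gap: one record
    }

    @Override
    public long position() {
      return position;
    }
  }

  /** Polls repeatedly until one of the three checks fires, and reports which one. */
  static Exit pollUntil(FakeConsumer consumer, long timeoutMs, LongSupplier clockMs) {
    final long start = clockMs.getAsLong();
    long previousPosition = -1;
    while (true) {
      // Remaining time is clamped at zero so it is never negative.
      long remaining = Math.max(0L, timeoutMs - (clockMs.getAsLong() - start));
      if (consumer.poll(remaining) > 0) {
        return Exit.NON_EMPTY;
      }
      if (previousPosition == (previousPosition = consumer.position())) {
        return Exit.POSITION_UNCHANGED;
      }
      if (clockMs.getAsLong() - start >= timeoutMs) {
        return Exit.TIMEOUT;
      }
    }
  }
}
```

With a 10 ms timeout and 1 ms per poll, `new GapConsumer(3, 1)` (small gap) ends with `NON_EMPTY` after a few loops, while `new GapConsumer(100, 1)` (big gap) ends with `TIMEOUT`. In both runs the position keeps changing, so the position check never fires.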
   





[GitHub] [beam] nbali commented on a diff in pull request #26142: #25962 Fix for Java SDF-based Kafka ingestion being stuck at certain offsets in certain conditions

Posted by "nbali (via GitHub)" <gi...@apache.org>.
nbali commented on code in PR #26142:
URL: https://github.com/apache/beam/pull/26142#discussion_r1159017618


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -424,6 +425,29 @@ public ProcessContinuation processElement(
     }
   }
 
+  // see https://github.com/apache/beam/issues/25962
+  private ConsumerRecords<byte[], byte[]> poll(
+      Consumer<byte[], byte[]> consumer, TopicPartition topicPartition) {
+    final Stopwatch sw = Stopwatch.createStarted();
+    long previousPosition = -1;
+    while (true) {
+      final ConsumerRecords<byte[], byte[]> rawRecords =
+          consumer.poll(KAFKA_POLL_TIMEOUT.minus(sw.elapsed()));
+      if (!rawRecords.isEmpty()) {
+        // return as we have found some entries
+        return rawRecords;
+      }
+      if (previousPosition == (previousPosition = consumer.position(topicPartition))) {

Review Comment:
   (end of stream -> no offset change possible)
   pre-3.2.0+EOS -> poll should block for timeout already, return empty, *both position and timeout check should return*
   post-3.2.0+EOS -> poll should block for timeout already, return empty, *both position and timeout check should return*
   
   (small gap -> timeout is enough to iterate over it)
   pre-3.2.0+small gap -> poll blocks until it returns a non-empty result, *non-empty check should return*
   post-3.2.0+small gap -> poll returns empty immediately, *every check fails*
   
   (big gap -> timeout isn't enough to iterate over it)
   pre-3.2.0+big gap -> poll should block for timeout already, return empty, *timeout check should return*
   post-3.2.0+big gap -> poll returns empty immediately, *every check fails*
   
   So _theoretically_ there isn't any scenario where the position check triggers the return, but nothing else.
   





[GitHub] [beam] Abacn commented on pull request #26142: #25962 Fix for Java SDF-based Kafka ingestion being stuck at certain offsets in certain conditions

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #26142:
URL: https://github.com/apache/beam/pull/26142#issuecomment-1500342302

   Run Java_Kafka_IO_Direct PreCommit




[GitHub] [beam] Abacn commented on pull request #26142: #25962 Fix for Java SDF-based Kafka ingestion being stuck at certain offsets in certain conditions

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #26142:
URL: https://github.com/apache/beam/pull/26142#issuecomment-1516465579

   waiting on author




[GitHub] [beam] johnjcasey merged pull request #26142: #25962 Fix for Java SDF-based Kafka ingestion being stuck at certain offsets in certain conditions

Posted by "johnjcasey (via GitHub)" <gi...@apache.org>.
johnjcasey merged PR #26142:
URL: https://github.com/apache/beam/pull/26142




[GitHub] [beam] nbali commented on pull request #26142: #25962 Fix for Java SDF-based Kafka ingestion being stuck at certain offsets in certain conditions

Posted by "nbali (via GitHub)" <gi...@apache.org>.
nbali commented on PR #26142:
URL: https://github.com/apache/beam/pull/26142#issuecomment-1504112799

   Run Java_Kafka_IO_Direct PreCommit




[GitHub] [beam] johnjcasey commented on pull request #26142: #25962 Fix for Java SDF-based Kafka ingestion being stuck at certain offsets in certain conditions

Posted by "johnjcasey (via GitHub)" <gi...@apache.org>.
johnjcasey commented on PR #26142:
URL: https://github.com/apache/beam/pull/26142#issuecomment-1505349657

   Structurally, this looks good to me. There are some test failures, and I'll defer to your judgement on whether we need the extra checking block or not

