Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/11/16 19:50:03 UTC

[GitHub] [beam] johnjcasey opened a new pull request, #24205: Update watermark based on policy when records poll empty, to ensure w…

johnjcasey opened a new pull request, #24205:
URL: https://github.com/apache/beam/pull/24205

   …atermark can advance even on empty partitions
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] johnjcasey merged pull request #24205: Update Kafka watermark based on policy when records poll empty

Posted by GitBox <gi...@apache.org>.
johnjcasey merged PR #24205:
URL: https://github.com/apache/beam/pull/24205




[GitHub] [beam] johnjcasey commented on pull request #24205: Update Kafka watermark based on policy when records poll empty

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #24205:
URL: https://github.com/apache/beam/pull/24205#issuecomment-1355939620

   @kennknowles this is ready for review again. The three pending checks pass on Jenkins, but that status doesn't seem to be propagating back to GitHub.




[GitHub] [beam] johnjcasey commented on pull request #24205: Update Kafka watermark based on policy when records poll empty

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #24205:
URL: https://github.com/apache/beam/pull/24205#issuecomment-1366805833

   run Java_Examples_Dataflow precommit




[GitHub] [beam] johnjcasey commented on pull request #24205: Update Kafka watermark based on policy when records poll empty

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #24205:
URL: https://github.com/apache/beam/pull/24205#issuecomment-1366805693

   Run Java_Examples_Dataflow_Java17 precommit




[GitHub] [beam] johnjcasey commented on pull request #24205: Update Kafka watermark based on policy when records poll empty

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #24205:
URL: https://github.com/apache/beam/pull/24205#issuecomment-1324212684

   Run Java Precommit




[GitHub] [beam] Abacn commented on a diff in pull request #24205: Update Kafka watermark based on policy when records poll empty

Posted by GitBox <gi...@apache.org>.
Abacn commented on code in PR #24205:
URL: https://github.com/apache/beam/pull/24205#discussion_r1030629314


##########
sdks/java/io/kafka/build.gradle:
##########
@@ -148,6 +148,7 @@ kafkaVersions.each {kv ->
       if (!(kv.key in sdfKafkaVersions)) excludeTestsMatching "*DynamicPartitions" //admin client create partitions does not exist in kafka 0.11.0.3 and kafka sdf does not appear to work for kafka versions <2.0.1
       if (!(kv.key in sdfKafkaVersions)) excludeTestsMatching "*SDFResumesCorrectly" //Kafka SDF does not work for kafka versions <2.0.1
       if (!(kv.key in sdfKafkaVersions)) excludeTestsMatching "*StopReadingFunction" //Kafka SDF does not work for kafka versions <2.0.1
+      if (!(kv.key in sdfKafkaVersions)) excludeTestsMatching "*WatermarkUpdateWithSparseMessages" //Kafka SDF does not work for kafka versions <2.0.1

Review Comment:
   Minor: since there are multiple excludes with the same condition, consider grouping them under a single check:
   ```
   if (!(kv.key in sdfKafkaVersions)) {
     excludeTestsMatching ...
     excludeTestsMatching ...
   }
   ```



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -430,6 +429,19 @@ public ProcessContinuation processElement(
     }
   }
 
+  private TimestampPolicyContext updateWatermark(

Review Comment:
   Consider naming this `updateWatermarkManually`?
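
For context, the idea in the PR title (advance the watermark from the policy when a poll returns no records) can be sketched without Beam. This is only an illustration under stated assumptions: `IdleAwarePolicy`, `onEmptyPoll`, and the "now minus idle delay" fallback are hypothetical names and behavior chosen for the example. They do not reproduce Beam's `TimestampPolicy` API or the body of `updateWatermark`, which the diff excerpt above does not show.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

final class EmptyPollWatermarkSketch {

  /** Hypothetical policy: follows record timestamps, and falls back to wall clock when idle. */
  static final class IdleAwarePolicy {
    private final Duration idleDelay;
    private Instant watermark = Instant.EPOCH;

    IdleAwarePolicy(Duration idleDelay) {
      this.idleDelay = idleDelay;
    }

    /** A record was read: the watermark follows the largest timestamp seen so far. */
    void onRecord(Instant recordTimestamp) {
      if (recordTimestamp.isAfter(watermark)) {
        watermark = recordTimestamp;
      }
    }

    /** Nothing was read: assume the partition is idle and move toward (now - idleDelay). */
    void onEmptyPoll(Instant now) {
      Instant candidate = now.minus(idleDelay);
      if (candidate.isAfter(watermark)) {
        watermark = candidate; // never moves backwards
      }
    }

    Instant getWatermark() {
      return watermark;
    }
  }

  /** One poll iteration: the policy is consulted even when the poll is empty. */
  static Instant process(IdleAwarePolicy policy, List<Instant> polledTimestamps, Instant now) {
    if (polledTimestamps.isEmpty()) {
      policy.onEmptyPoll(now);
    } else {
      polledTimestamps.forEach(policy::onRecord);
    }
    return policy.getWatermark();
  }
}
```

Without the empty-poll branch, a partition that never receives data would keep its watermark at the initial value and could hold back downstream windows indefinitely.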





[GitHub] [beam] johnjcasey commented on pull request #24205: Update Kafka watermark based on policy when records poll empty

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #24205:
URL: https://github.com/apache/beam/pull/24205#issuecomment-1353730060

   Run Java_Examples_Dataflow PreCommit




[GitHub] [beam] johnjcasey commented on pull request #24205: Update Kafka watermark based on policy when records poll empty

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #24205:
URL: https://github.com/apache/beam/pull/24205#issuecomment-1324224385

   r: @Abacn 




[GitHub] [beam] johnjcasey commented on pull request #24205: Update Kafka watermark based on policy when records poll empty

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #24205:
URL: https://github.com/apache/beam/pull/24205#issuecomment-1366805534

   Run Spotless Precommit




[GitHub] [beam] johnjcasey commented on a diff in pull request #24205: Update Kafka watermark based on policy when records poll empty

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on code in PR #24205:
URL: https://github.com/apache/beam/pull/24205#discussion_r1049782192


##########
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java:
##########
@@ -545,6 +547,68 @@ private PipelineResult runWithStopReadingFn(
     return readResult;
   }
 
+  @Test
+  public void testWatermarkUpdateWithSparseMessages() throws IOException {
+    AdminClient client =
+        AdminClient.create(
+            ImmutableMap.of("bootstrap.servers", options.getKafkaBootstrapServerAddresses()));
+
+    String topicName = "SparseDataTopicPartition-" + UUID.randomUUID();
+    Map<Integer, String> records = new HashMap<>();
+    for (int i = 0; i < 5; i++) {
+      records.put(i, String.valueOf(i));
+    }
+
+    try {
+      client.createTopics(ImmutableSet.of(new NewTopic(topicName, 1, (short) 1)));
+
+      writePipeline
+          .apply("Generate Write Elements", Create.of(records))
+          .apply(
+              "Write to Kafka",
+              KafkaIO.<Integer, String>write()
+                  .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+                  .withTopic(topicName)
+                  .withKeySerializer(IntegerSerializer.class)
+                  .withValueSerializer(StringSerializer.class));
+
+      writePipeline.run().waitUntilFinish(Duration.standardSeconds(15));
+
+      client.createPartitions(ImmutableMap.of(topicName, NewPartitions.increaseTo(3)));
+
+      PCollection<String> values =
+          sdfReadPipeline
+              .apply(
+                  "Read from Kafka",
+                  KafkaIO.<Integer, String>read()
+                      .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+                      .withConsumerConfigUpdates(ImmutableMap.of("auto.offset.reset", "earliest"))
+                      .withTopic(topicName)
+                      .withKeyDeserializer(IntegerDeserializer.class)
+                      .withValueDeserializer(StringDeserializer.class)
+                      .withoutMetadata())
+              .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
+              .apply("GroupKey", GroupByKey.create())
+              .apply("GetValues", Values.create())
+              .apply(
+                  "Flatten",
+                  FlatMapElements.into(TypeDescriptor.of(String.class)).via(iterable -> iterable));
+
+      PAssert.that(values).containsInAnyOrder("0", "1", "2", "3", "4");
+
+      PipelineResult readResult = sdfReadPipeline.run();
+
+      PipelineResult.State readState =
+          readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout() / 2));

Review Comment:
   No, the `/ 2` is a copy-paste from another test case. I hadn't thought about `waitUntilFinish` forcing the watermark to advance, but that makes sense. I need to rework this test case.





[GitHub] [beam] johnjcasey commented on pull request #24205: Update Kafka watermark based on policy when records poll empty

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #24205:
URL: https://github.com/apache/beam/pull/24205#issuecomment-1324165757

   run java precommit




[GitHub] [beam] kennknowles commented on a diff in pull request #24205: Update Kafka watermark based on policy when records poll empty

Posted by GitBox <gi...@apache.org>.
kennknowles commented on code in PR #24205:
URL: https://github.com/apache/beam/pull/24205#discussion_r1035087107


##########
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java:
##########
@@ -545,6 +547,68 @@ private PipelineResult runWithStopReadingFn(
     return readResult;
   }
 
+  @Test
+  public void testWatermarkUpdateWithSparseMessages() throws IOException {
+    AdminClient client =
+        AdminClient.create(
+            ImmutableMap.of("bootstrap.servers", options.getKafkaBootstrapServerAddresses()));
+
+    String topicName = "SparseDataTopicPartition-" + UUID.randomUUID();
+    Map<Integer, String> records = new HashMap<>();
+    for (int i = 0; i < 5; i++) {
+      records.put(i, String.valueOf(i));
+    }
+
+    try {
+      client.createTopics(ImmutableSet.of(new NewTopic(topicName, 1, (short) 1)));
+
+      writePipeline
+          .apply("Generate Write Elements", Create.of(records))
+          .apply(
+              "Write to Kafka",
+              KafkaIO.<Integer, String>write()
+                  .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+                  .withTopic(topicName)
+                  .withKeySerializer(IntegerSerializer.class)
+                  .withValueSerializer(StringSerializer.class));
+
+      writePipeline.run().waitUntilFinish(Duration.standardSeconds(15));
+
+      client.createPartitions(ImmutableMap.of(topicName, NewPartitions.increaseTo(3)));
+
+      PCollection<String> values =
+          sdfReadPipeline
+              .apply(
+                  "Read from Kafka",
+                  KafkaIO.<Integer, String>read()
+                      .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+                      .withConsumerConfigUpdates(ImmutableMap.of("auto.offset.reset", "earliest"))
+                      .withTopic(topicName)
+                      .withKeyDeserializer(IntegerDeserializer.class)
+                      .withValueDeserializer(StringDeserializer.class)
+                      .withoutMetadata())
+              .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
+              .apply("GroupKey", GroupByKey.create())
+              .apply("GetValues", Values.create())
+              .apply(
+                  "Flatten",
+                  FlatMapElements.into(TypeDescriptor.of(String.class)).via(iterable -> iterable));
+
+      PAssert.that(values).containsInAnyOrder("0", "1", "2", "3", "4");
+
+      PipelineResult readResult = sdfReadPipeline.run();
+
+      PipelineResult.State readState =
+          readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout() / 2));

Review Comment:
   Does this test fail before your change? I would expect all the output to be produced based on `waitUntilFinish`. Is the setup that you only wait half as long as the read timeout, so you are depending on the timestamp policy to advance the watermark? In that case, it would be good to have some assertion or other way of making sure you don't get a spurious success.
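
One possible shape for such a guard, sketched in plain Java rather than against Beam: require the expected output to be observed before a deadline that is shorter than the point at which a forced finish would flush it anyway. The class and method names here are hypothetical, and the `CompletableFuture` merely stands in for "the grouped output was produced"; how a real integration test would observe that signal is left open.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class SpuriousSuccessGuardSketch {

  /**
   * Returns true only if the output completed within the deadline, so a result that shows up
   * later (for example, only because the pipeline was shut down) does not count as success.
   */
  static <T> boolean arrivedBefore(CompletableFuture<T> output, Duration deadline) {
    try {
      output.get(deadline.toMillis(), TimeUnit.MILLISECONDS);
      return true;
    } catch (TimeoutException e) {
      return false; // output was not produced in time
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for output", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("output computation failed", e);
    }
  }
}
```

A test built this way fails when the output only appears after the deadline, which is the case the original change is meant to eliminate.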





[GitHub] [beam] johnjcasey commented on pull request #24205: Update Kafka watermark based on policy when records poll empty

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #24205:
URL: https://github.com/apache/beam/pull/24205#issuecomment-1324249986

   r: @klk for watermark stuff




[GitHub] [beam] johnjcasey commented on pull request #24205: Update Kafka watermark based on policy when records poll empty

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #24205:
URL: https://github.com/apache/beam/pull/24205#issuecomment-1353730546

   Run Spotless PreCommit




[GitHub] [beam] johnjcasey commented on pull request #24205: Update Kafka watermark based on policy when records poll empty

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #24205:
URL: https://github.com/apache/beam/pull/24205#issuecomment-1376165391

   agreed




[GitHub] [beam] johnjcasey commented on pull request #24205: Update Kafka watermark based on policy when records poll empty

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #24205:
URL: https://github.com/apache/beam/pull/24205#issuecomment-1353730281

   Run Java_Examples_Dataflow PreCommit




[GitHub] [beam] johnjcasey commented on pull request #24205: Update Kafka watermark based on policy when records poll empty

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #24205:
URL: https://github.com/apache/beam/pull/24205#issuecomment-1353730910

   Run Whitespace PreCommit




[GitHub] [beam] johnjcasey commented on pull request #24205: Update Kafka watermark based on policy when records poll empty

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #24205:
URL: https://github.com/apache/beam/pull/24205#issuecomment-1353257737

   closes #22809




[GitHub] [beam] github-actions[bot] commented on pull request #24205: Update Kafka watermark based on policy when records poll empty

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #24205:
URL: https://github.com/apache/beam/pull/24205#issuecomment-1324225268

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control




[GitHub] [beam] johnjcasey commented on pull request #24205: Update Kafka watermark based on policy when records poll empty

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #24205:
URL: https://github.com/apache/beam/pull/24205#issuecomment-1324212898

   The test failures in recent runs have been flakes.




[GitHub] [beam] github-actions[bot] commented on pull request #24205: Update Kafka watermark based on policy when records poll empty

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #24205:
URL: https://github.com/apache/beam/pull/24205#issuecomment-1324149852

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @lukecwik for label java.
   R: @chamikaramj for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).




[GitHub] [beam] johnjcasey commented on pull request #24205: Update Kafka watermark based on policy when records poll empty

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on PR #24205:
URL: https://github.com/apache/beam/pull/24205#issuecomment-1324250318

   r: @kennknowles for watermark stuff

