Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/11/29 17:58:33 UTC

[GitHub] [beam] kennknowles commented on a diff in pull request #24205: Update Kafka watermark based on policy when records poll empty

kennknowles commented on code in PR #24205:
URL: https://github.com/apache/beam/pull/24205#discussion_r1035087107


##########
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java:
##########
@@ -545,6 +547,68 @@ private PipelineResult runWithStopReadingFn(
     return readResult;
   }
 
+  @Test
+  public void testWatermarkUpdateWithSparseMessages() throws IOException {
+    AdminClient client =
+        AdminClient.create(
+            ImmutableMap.of("bootstrap.servers", options.getKafkaBootstrapServerAddresses()));
+
+    String topicName = "SparseDataTopicPartition-" + UUID.randomUUID();
+    Map<Integer, String> records = new HashMap<>();
+    for (int i = 0; i < 5; i++) {
+      records.put(i, String.valueOf(i));
+    }
+
+    try {
+      client.createTopics(ImmutableSet.of(new NewTopic(topicName, 1, (short) 1)));
+
+      writePipeline
+          .apply("Generate Write Elements", Create.of(records))
+          .apply(
+              "Write to Kafka",
+              KafkaIO.<Integer, String>write()
+                  .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+                  .withTopic(topicName)
+                  .withKeySerializer(IntegerSerializer.class)
+                  .withValueSerializer(StringSerializer.class));
+
+      writePipeline.run().waitUntilFinish(Duration.standardSeconds(15));
+
+      client.createPartitions(ImmutableMap.of(topicName, NewPartitions.increaseTo(3)));
+
+      PCollection<String> values =
+          sdfReadPipeline
+              .apply(
+                  "Read from Kafka",
+                  KafkaIO.<Integer, String>read()
+                      .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+                      .withConsumerConfigUpdates(ImmutableMap.of("auto.offset.reset", "earliest"))
+                      .withTopic(topicName)
+                      .withKeyDeserializer(IntegerDeserializer.class)
+                      .withValueDeserializer(StringDeserializer.class)
+                      .withoutMetadata())
+              .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
+              .apply("GroupKey", GroupByKey.create())
+              .apply("GetValues", Values.create())
+              .apply(
+                  "Flatten",
+                  FlatMapElements.into(TypeDescriptor.of(String.class)).via(iterable -> iterable));
+
+      PAssert.that(values).containsInAnyOrder("0", "1", "2", "3", "4");
+
+      PipelineResult readResult = sdfReadPipeline.run();
+
+      PipelineResult.State readState =
+          readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout() / 2));

Review Comment:
   Does this test fail before your change? I would expect all the output to be produced anyway, because of `waitUntilFinish`. Is the setup that you wait only half the read timeout, so you are relying on the timestamp policy to advance the watermark? If so, it would be good to have an assertion, or some other way of making sure the test cannot pass spuriously.
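One way to see the concern is a toy model in plain Java. It is hypothetical and not Beam or KafkaIO code: `SparseWatermarkModel`, `windowFiresBeforeShutdown`, and the millisecond values are made up for illustration. It assumes that, with the change, an empty poll moves the watermark up to the poll time (a processing-time-style policy), and that without it the watermark stays at the newest record timestamp. A one-minute fixed window fires once the watermark passes its end.

```java
/**
 * Hypothetical toy model, not Beam or KafkaIO code. It only illustrates why a
 * check on final output cannot tell whether the watermark advanced while the
 * source was still running.
 */
public class SparseWatermarkModel {

  /** End of a one-minute fixed window starting at 0, in milliseconds. */
  static final long WINDOW_END_MILLIS = 60_000L;

  /**
   * Returns true if the watermark passes the window end before shutdown.
   *
   * @param recordTimestamps event times of the records actually read
   * @param emptyPollTimes wall-clock times of later polls that returned no records
   * @param advanceOnEmptyPoll if true, an empty poll moves the watermark up to the
   *     poll time (assumed policy); if false, it stays at the newest record timestamp
   */
  public static boolean windowFiresBeforeShutdown(
      long[] recordTimestamps, long[] emptyPollTimes, boolean advanceOnEmptyPoll) {
    long watermark = Long.MIN_VALUE;
    for (long ts : recordTimestamps) {
      watermark = Math.max(watermark, ts); // watermark tracks the newest record
    }
    if (advanceOnEmptyPoll) {
      for (long pollTime : emptyPollTimes) {
        watermark = Math.max(watermark, pollTime); // an empty poll still moves it forward
      }
    }
    // The watermark never decreases, so its last value before shutdown decides firing.
    return watermark >= WINDOW_END_MILLIS;
  }

  public static void main(String[] args) {
    long[] records = {1_000L, 2_000L, 3_000L, 4_000L, 5_000L}; // five records early in the window
    long[] emptyPolls = {30_000L, 61_000L, 90_000L}; // later polls that return nothing
    // prints "policy applied on empty poll: true"
    System.out.println(
        "policy applied on empty poll: " + windowFiresBeforeShutdown(records, emptyPolls, true));
    // prints "policy not applied on empty poll: false"
    System.out.println(
        "policy not applied on empty poll: " + windowFiresBeforeShutdown(records, emptyPolls, false));
  }
}
```

In this model, shutdown is assumed to set the watermark to infinity and flush every open window, so both runs would emit the five values in the end, and a check on final output alone passes either way. A check that only holds when the window fires while the source is still running, like the `true`/`false` contrast above, is the kind of assertion that rules out a spurious success.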


