Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/27 13:35:29 UTC

[GitHub] [beam] johnjcasey commented on a diff in pull request #22450: 21730 fix offset resetting

johnjcasey commented on code in PR #22450:
URL: https://github.com/apache/beam/pull/22450#discussion_r931071851


##########
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java:
##########
@@ -239,6 +252,94 @@ public void testKafkaIOReadsAndWritesCorrectlyInBatch() throws IOException {
     }
   }
 
+  // Because of existing limitations in streaming testing, this is verified via a combination of
+  // DoFns.  CrashOnExtra will throw an exception if we see any extra records beyond those we
+  // expect, and LogFn acts as a sink we can inspect using ExpectedLogs to verify that we got all
+  // those we expect.
+  @Test
+  public void testKafkaIOSDFResumesCorrectly() throws IOException {
+    roundtripElements("first-pass", 4, writePipeline, sdfReadPipeline);
+    roundtripElements("second-pass", 3, writePipeline2, sdfReadPipeline2);
+  }
+
+  private void roundtripElements(
+      String recordPrefix, Integer recordCount, TestPipeline wPipeline, TestPipeline rPipeline)
+      throws IOException {
+    AdminClient client =
+        AdminClient.create(
+            ImmutableMap.of("bootstrap.servers", options.getKafkaBootstrapServerAddresses()));
+    client.listTopics();
+    Map<Integer, String> records = new HashMap<>();
+    for (int i = 0; i < recordCount; i++) {
+      records.put(i, recordPrefix + "-" + i);
+    }
+
+    wPipeline
+        .apply("Generate Write Elements", Create.of(records))
+        .apply(
+            "Write to Kafka",
+            KafkaIO.<Integer, String>write()
+                .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+                .withTopic(options.getKafkaTopic() + "-resuming")
+                .withKeySerializer(IntegerSerializer.class)
+                .withValueSerializer(StringSerializer.class));
+
+    wPipeline.run().waitUntilFinish(Duration.standardSeconds(10));
+
+    rPipeline
+        .apply(
+            "Read from Kafka",
+            KafkaIO.<Integer, String>read()
+                .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+                .withConsumerConfigUpdates(
+                    ImmutableMap.of(
+                        "group.id",
+                        "resuming-group",
+                        "auto.offset.reset",
+                        "earliest",
+                        "enable.auto.commit",
+                        "true"))
+                .withTopic(options.getKafkaTopic() + "-resuming")
+                .withKeyDeserializer(IntegerDeserializer.class)
+                .withValueDeserializer(StringDeserializer.class)
+                .withoutMetadata())
+        .apply("Get Values", Values.create())
+        .apply(ParDo.of(new CrashOnExtra(records.values())))
+        .apply(ParDo.of(new LogFn()));
+
+    rPipeline.run().waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
+
+    for (String value : records.values()) {
+      kafkaIOITExpectedLogs.verifyError(value);
+    }
+  }
+
+  public static class CrashOnExtra extends DoFn<String, String> {
+    final Set<String> expected;
+
+    public CrashOnExtra(Collection<String> records) {
+      expected = new HashSet<>(records);
+    }
+
+    @ProcessElement
+    public void processElement(@Element String element, OutputReceiver<String> outputReceiver) {
+      if (!expected.contains(element)) {
+        throw new RuntimeException("Received unexpected element: " + element);
+      } else {
+        expected.remove(element);

Review Comment:
   I do that via the log and expected logs. It's essentially a workaround sink.
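
For readers unfamiliar with the pattern, here is a minimal sketch of the "workaround sink" idea in plain Java, without any Beam dependency. It is not the code from the PR: `WorkaroundSinkSketch`, `LogSink`, `sawAll`, and `run` are hypothetical names, and `LogSink` only stands in for the combination of `LogFn` and `ExpectedLogs` used in the test. The first stage throws on any element it does not expect (or sees twice), and the second stage records what passed through so the test can check afterwards that nothing was missing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class WorkaroundSinkSketch {

  // Stand-in for CrashOnExtra: rejects anything outside the expected set.
  // Each expected value is removed once seen, so a duplicate also fails.
  static class CrashOnExtra {
    private final Set<String> expected;

    CrashOnExtra(Collection<String> records) {
      expected = new HashSet<>(records);
    }

    String process(String element) {
      if (!expected.remove(element)) {
        throw new RuntimeException("Received unexpected element: " + element);
      }
      return element;
    }
  }

  // Hypothetical stand-in for LogFn plus ExpectedLogs: it remembers every
  // element it was handed so the caller can inspect them afterwards.
  static class LogSink {
    private final List<String> logged = new ArrayList<>();

    void log(String element) {
      logged.add(element);
    }

    boolean sawAll(Collection<String> expected) {
      return logged.containsAll(expected);
    }
  }

  // Pushes every received element through both stages, in order.
  static LogSink run(Collection<String> expected, List<String> received) {
    CrashOnExtra guard = new CrashOnExtra(expected);
    LogSink sink = new LogSink();
    for (String element : received) {
      sink.log(guard.process(element));
    }
    return sink;
  }

  public static void main(String[] args) {
    List<String> expected = Arrays.asList("first-pass-0", "first-pass-1");
    LogSink sink = run(expected, Arrays.asList("first-pass-1", "first-pass-0"));
    System.out.println("all expected values logged: " + sink.sawAll(expected));
  }
}
```

The two checks are complementary: the guard catches extra or repeated records as they arrive, while the sink check catches records that never arrived at all.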


