Posted to commits@beam.apache.org by ch...@apache.org on 2022/07/27 14:56:35 UTC

[beam] branch master updated: 21730 fix offset resetting (#22450)

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d3ef59b625b 21730 fix offset resetting (#22450)
d3ef59b625b is described below

commit d3ef59b625b2a2e89e497ca8c7382de526efe601
Author: johnjcasey <95...@users.noreply.github.com>
AuthorDate: Wed Jul 27 10:56:27 2022 -0400

    21730 fix offset resetting (#22450)
    
    * 21730 fix offset resetting
    
    * update tests to skip kafka versions that do not work with SDF
---
 sdks/java/io/kafka/build.gradle                    |   1 +
 .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java       |   5 +-
 .../org/apache/beam/sdk/io/kafka/KafkaIOIT.java    | 101 +++++++++++++++++++++
 3 files changed, 103 insertions(+), 4 deletions(-)

diff --git a/sdks/java/io/kafka/build.gradle b/sdks/java/io/kafka/build.gradle
index 889b6a40e89..ec645db385d 100644
--- a/sdks/java/io/kafka/build.gradle
+++ b/sdks/java/io/kafka/build.gradle
@@ -146,6 +146,7 @@ kafkaVersions.each {kv ->
     filter {
       excludeTestsMatching "*InStreaming"
       if (!(kv.key in sdfKafkaVersions)) excludeTestsMatching "*DynamicPartitions" //admin client create partitions does not exist in kafka 0.11.0.3 and kafka sdf does not appear to work for kafka versions <2.0.1
+      if (!(kv.key in sdfKafkaVersions)) excludeTestsMatching "*SDFResumesCorrectly" //Kafka SDF does not work for kafka versions <2.0.1
     }
   }
 }
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index 7c674a6221a..28b3c4c7613 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -252,10 +252,7 @@ abstract class ReadFromKafkaDoFn<K, V>
   public OffsetRange initialRestriction(@Element KafkaSourceDescriptor kafkaSourceDescriptor) {
     Map<String, Object> updatedConsumerConfig =
         overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
-    try (Consumer<byte[], byte[]> offsetConsumer =
-        consumerFactoryFn.apply(
-            KafkaIOUtils.getOffsetConsumerConfig(
-                "initialOffset", offsetConsumerConfig, updatedConsumerConfig))) {
+    try (Consumer<byte[], byte[]> offsetConsumer = consumerFactoryFn.apply(updatedConsumerConfig)) {
       ConsumerSpEL.evaluateAssign(
           offsetConsumer, ImmutableList.of(kafkaSourceDescriptor.getTopicPartition()));
       long startOffset;
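
(Editorial note, not part of the commit: a minimal sketch of why the consumer config used in initialRestriction matters. With a plain Kafka consumer, position() resolves to the committed offset for the config's group.id when one exists and otherwise falls back to the auto.offset.reset policy, which is presumably why the offset consumer above is now built from updatedConsumerConfig rather than a separate offset-consumer config. The class and helper names below are illustrative assumptions, not Beam or Kafka API surface.)

  import java.util.Collections;
  import java.util.Map;
  import org.apache.kafka.clients.consumer.Consumer;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.common.TopicPartition;
  import org.apache.kafka.common.serialization.ByteArrayDeserializer;

  public class StartOffsetSketch {
    // Illustrative helper (hypothetical name): the offset a consumer built from
    // the given config would start reading from for one partition.
    static long startOffsetFor(Map<String, Object> consumerConfig, TopicPartition tp) {
      try (Consumer<byte[], byte[]> consumer =
          new KafkaConsumer<>(
              consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
        consumer.assign(Collections.singletonList(tp));
        // position() returns the committed offset for the config's group.id if one
        // exists; otherwise it falls back to the configured auto.offset.reset policy.
        return consumer.position(tp);
      }
    }
  }
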
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
index d38560667f3..0d91504e012 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
@@ -24,7 +24,9 @@ import static org.junit.Assert.assertNull;
 import com.google.cloud.Timestamp;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -47,6 +49,7 @@ import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
@@ -63,6 +66,7 @@ import org.apache.beam.sdk.transforms.Keys;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
@@ -87,6 +91,8 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.utility.DockerImageName;
 
@@ -116,6 +122,8 @@ public class KafkaIOIT {
 
   private static final String TIMESTAMP = Timestamp.now().toString();
 
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaIOIT.class);
+
   private static String expectedHashcode;
 
   private static SyntheticSourceOptions sourceOptions;
@@ -124,8 +132,12 @@ public class KafkaIOIT {
 
   private static InfluxDBSettings settings;
 
+  @Rule public ExpectedLogs kafkaIOITExpectedLogs = ExpectedLogs.none(KafkaIOIT.class);
+
   @Rule public TestPipeline writePipeline = TestPipeline.create();
 
+  @Rule public TestPipeline writePipeline2 = TestPipeline.create();
+
   @Rule public TestPipeline readPipeline = TestPipeline.create();
 
   private static ExperimentalOptions sdfPipelineOptions;
@@ -138,6 +150,7 @@ public class KafkaIOIT {
   }
 
   @Rule public TestPipeline sdfReadPipeline = TestPipeline.fromOptions(sdfPipelineOptions);
+  @Rule public TestPipeline sdfReadPipeline2 = TestPipeline.fromOptions(sdfPipelineOptions);
 
   private static KafkaContainer kafkaContainer;
 
@@ -239,6 +252,94 @@ public class KafkaIOIT {
     }
   }
 
+  // Because of existing limitations in streaming testing, this is verified via a combination of
+  // DoFns.  CrashOnExtra will throw an exception if we see any extra records beyond those we
+  // expect, and LogFn acts as a sink we can inspect using ExpectedLogs to verify that we got all
+  // those we expect.
+  @Test
+  public void testKafkaIOSDFResumesCorrectly() throws IOException {
+    roundtripElements("first-pass", 4, writePipeline, sdfReadPipeline);
+    roundtripElements("second-pass", 3, writePipeline2, sdfReadPipeline2);
+  }
+
+  private void roundtripElements(
+      String recordPrefix, Integer recordCount, TestPipeline wPipeline, TestPipeline rPipeline)
+      throws IOException {
+    AdminClient client =
+        AdminClient.create(
+            ImmutableMap.of("bootstrap.servers", options.getKafkaBootstrapServerAddresses()));
+    client.listTopics();
+    Map<Integer, String> records = new HashMap<>();
+    for (int i = 0; i < recordCount; i++) {
+      records.put(i, recordPrefix + "-" + i);
+    }
+
+    wPipeline
+        .apply("Generate Write Elements", Create.of(records))
+        .apply(
+            "Write to Kafka",
+            KafkaIO.<Integer, String>write()
+                .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+                .withTopic(options.getKafkaTopic() + "-resuming")
+                .withKeySerializer(IntegerSerializer.class)
+                .withValueSerializer(StringSerializer.class));
+
+    wPipeline.run().waitUntilFinish(Duration.standardSeconds(10));
+
+    rPipeline
+        .apply(
+            "Read from Kafka",
+            KafkaIO.<Integer, String>read()
+                .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+                .withConsumerConfigUpdates(
+                    ImmutableMap.of(
+                        "group.id",
+                        "resuming-group",
+                        "auto.offset.reset",
+                        "earliest",
+                        "enable.auto.commit",
+                        "true"))
+                .withTopic(options.getKafkaTopic() + "-resuming")
+                .withKeyDeserializer(IntegerDeserializer.class)
+                .withValueDeserializer(StringDeserializer.class)
+                .withoutMetadata())
+        .apply("Get Values", Values.create())
+        .apply(ParDo.of(new CrashOnExtra(records.values())))
+        .apply(ParDo.of(new LogFn()));
+
+    rPipeline.run().waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
+
+    for (String value : records.values()) {
+      kafkaIOITExpectedLogs.verifyError(value);
+    }
+  }
+
+  public static class CrashOnExtra extends DoFn<String, String> {
+    final Set<String> expected;
+
+    public CrashOnExtra(Collection<String> records) {
+      expected = new HashSet<>(records);
+    }
+
+    @ProcessElement
+    public void processElement(@Element String element, OutputReceiver<String> outputReceiver) {
+      if (!expected.contains(element)) {
+        throw new RuntimeException("Received unexpected element: " + element);
+      } else {
+        expected.remove(element);
+        outputReceiver.output(element);
+      }
+    }
+  }
+
+  public static class LogFn extends DoFn<String, String> {
+    @ProcessElement
+    public void processElement(@Element String element, OutputReceiver<String> outputReceiver) {
+      LOG.error(element);
+      outputReceiver.output(element);
+    }
+  }
+
   // This test roundtrips a single KV<Null,Null> to verify that externalWithMetadata
   // can handle null keys and values correctly.
   @Test