Posted to commits@beam.apache.org by ch...@apache.org on 2022/07/27 14:56:35 UTC
[beam] branch master updated: 21730 fix offset resetting (#22450)
This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d3ef59b625b 21730 fix offset resetting (#22450)
d3ef59b625b is described below
commit d3ef59b625b2a2e89e497ca8c7382de526efe601
Author: johnjcasey <95...@users.noreply.github.com>
AuthorDate: Wed Jul 27 10:56:27 2022 -0400
21730 fix offset resetting (#22450)
* 21730 fix offset resetting
* update tests to skip kafka versions that do not work with SDF
---
sdks/java/io/kafka/build.gradle | 1 +
.../beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 5 +-
.../org/apache/beam/sdk/io/kafka/KafkaIOIT.java | 101 +++++++++++++++++++++
3 files changed, 103 insertions(+), 4 deletions(-)
diff --git a/sdks/java/io/kafka/build.gradle b/sdks/java/io/kafka/build.gradle
index 889b6a40e89..ec645db385d 100644
--- a/sdks/java/io/kafka/build.gradle
+++ b/sdks/java/io/kafka/build.gradle
@@ -146,6 +146,7 @@ kafkaVersions.each {kv ->
filter {
excludeTestsMatching "*InStreaming"
if (!(kv.key in sdfKafkaVersions)) excludeTestsMatching "*DynamicPartitions" //admin client create partitions does not exist in kafka 0.11.0.3 and kafka sdf does not appear to work for kafka versions <2.0.1
+ if (!(kv.key in sdfKafkaVersions)) excludeTestsMatching "*SDFResumesCorrectly" //Kafka SDF does not work for kafka versions <2.0.1
}
}
}
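
The filter above gates the new resume test at the Gradle level, mirroring the existing *DynamicPartitions exclusion for clients older than 2.0.1. The same gating could also be expressed inside the test itself with JUnit's Assume; the sketch below is illustrative only — the isSdfSupported helper and the kafka.clients.version property are hypothetical and not part of this commit:

    import static org.junit.Assume.assumeTrue;

    import org.junit.Test;

    public class KafkaVersionGateSketch {

      // Hypothetical source for the client version; the real build selects it
      // via the kafkaVersions matrix in build.gradle, not a system property.
      private static final String KAFKA_CLIENT_VERSION =
          System.getProperty("kafka.clients.version", "2.0.1");

      // Returns true iff version >= 2.0.1, the oldest client the Kafka SDF
      // read appears to work with.
      private static boolean isSdfSupported(String version) {
        String[] v = version.split("\\.");
        int[] min = {2, 0, 1};
        for (int i = 0; i < 3; i++) {
          int part = i < v.length ? Integer.parseInt(v[i]) : 0;
          if (part != min[i]) {
            return part > min[i];
          }
        }
        return true; // exactly 2.0.1
      }

      @Test
      public void testKafkaIOSDFResumesCorrectly() {
        // Skips (rather than fails) the test on unsupported clients.
        assumeTrue(
            "Kafka SDF requires client >= 2.0.1", isSdfSupported(KAFKA_CLIENT_VERSION));
        // ... test body ...
      }
    }
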
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index 7c674a6221a..28b3c4c7613 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -252,10 +252,7 @@ abstract class ReadFromKafkaDoFn<K, V>
public OffsetRange initialRestriction(@Element KafkaSourceDescriptor kafkaSourceDescriptor) {
Map<String, Object> updatedConsumerConfig =
overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
- try (Consumer<byte[], byte[]> offsetConsumer =
- consumerFactoryFn.apply(
- KafkaIOUtils.getOffsetConsumerConfig(
- "initialOffset", offsetConsumerConfig, updatedConsumerConfig))) {
+ try (Consumer<byte[], byte[]> offsetConsumer = consumerFactoryFn.apply(updatedConsumerConfig)) {
ConsumerSpEL.evaluateAssign(
offsetConsumer, ImmutableList.of(kafkaSourceDescriptor.getTopicPartition()));
long startOffset;
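
This three-line change is the substance of the fix. The removed code built the offset consumer from KafkaIOUtils.getOffsetConsumerConfig, which (among other overrides) substitutes a generated group.id into the config; under that generated group there are never committed offsets, so the consumer's position always fell back to auto.offset.reset, and a restarted reader could be reset to the beginning (or end) of the partition instead of resuming. Passing updatedConsumerConfig through unchanged lets the offset consumer see the user's group.id and that group's committed offsets. Below is a minimal sketch of how a plain Kafka consumer resolves its start offset under a given config — the bootstrap address, group, and topic names are hypothetical:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class StartOffsetSketch {
      public static void main(String[] args) {
        Properties config = new Properties();
        // Hypothetical values for illustration only.
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "resuming-group");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            ByteArrayDeserializer.class.getName());
        config.put(
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            ByteArrayDeserializer.class.getName());

        TopicPartition tp = new TopicPartition("my-topic", 0);
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(config)) {
          consumer.assign(Collections.singletonList(tp));
          // position() returns the committed offset for "resuming-group" if
          // one exists; only without one does it fall back to
          // auto.offset.reset. With a freshly generated group.id there is
          // never a committed offset, which is why the old code could be
          // reset to the start of the partition.
          System.out.println("start offset: " + consumer.position(tp));
        }
      }
    }

The testKafkaIOSDFResumesCorrectly test added below exercises this path end to end: with enable.auto.commit set and a fixed group.id, the second read pipeline must resume from the committed offset, and any re-read first-pass record would trip CrashOnExtra.
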
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
index d38560667f3..0d91504e012 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
@@ -24,7 +24,9 @@ import static org.junit.Assert.assertNull;
import com.google.cloud.Timestamp;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -47,6 +49,7 @@ import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
@@ -63,6 +66,7 @@ import org.apache.beam.sdk.transforms.Keys;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
@@ -87,6 +91,8 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
@@ -116,6 +122,8 @@ public class KafkaIOIT {
private static final String TIMESTAMP = Timestamp.now().toString();
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaIOIT.class);
+
private static String expectedHashcode;
private static SyntheticSourceOptions sourceOptions;
@@ -124,8 +132,12 @@ public class KafkaIOIT {
private static InfluxDBSettings settings;
+ @Rule public ExpectedLogs kafkaIOITExpectedLogs = ExpectedLogs.none(KafkaIOIT.class);
+
@Rule public TestPipeline writePipeline = TestPipeline.create();
+ @Rule public TestPipeline writePipeline2 = TestPipeline.create();
+
@Rule public TestPipeline readPipeline = TestPipeline.create();
private static ExperimentalOptions sdfPipelineOptions;
@@ -138,6 +150,7 @@ public class KafkaIOIT {
}
@Rule public TestPipeline sdfReadPipeline = TestPipeline.fromOptions(sdfPipelineOptions);
+ @Rule public TestPipeline sdfReadPipeline2 = TestPipeline.fromOptions(sdfPipelineOptions);
private static KafkaContainer kafkaContainer;
@@ -239,6 +252,94 @@ public class KafkaIOIT {
}
}
+ // Because of existing limitations in streaming testing, this is verified via a combination of
+ // DoFns. CrashOnExtra will throw an exception if we see any extra records beyond those we
+ // expect, and LogFn acts as a sink we can inspect using ExpectedLogs to verify that we got all
+ // those we expect.
+ @Test
+ public void testKafkaIOSDFResumesCorrectly() throws IOException {
+ roundtripElements("first-pass", 4, writePipeline, sdfReadPipeline);
+ roundtripElements("second-pass", 3, writePipeline2, sdfReadPipeline2);
+ }
+
+ private void roundtripElements(
+ String recordPrefix, Integer recordCount, TestPipeline wPipeline, TestPipeline rPipeline)
+ throws IOException {
+ AdminClient client =
+ AdminClient.create(
+ ImmutableMap.of("bootstrap.servers", options.getKafkaBootstrapServerAddresses()));
+ client.listTopics();
+ Map<Integer, String> records = new HashMap<>();
+ for (int i = 0; i < recordCount; i++) {
+ records.put(i, recordPrefix + "-" + i);
+ }
+
+ wPipeline
+ .apply("Generate Write Elements", Create.of(records))
+ .apply(
+ "Write to Kafka",
+ KafkaIO.<Integer, String>write()
+ .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+ .withTopic(options.getKafkaTopic() + "-resuming")
+ .withKeySerializer(IntegerSerializer.class)
+ .withValueSerializer(StringSerializer.class));
+
+ wPipeline.run().waitUntilFinish(Duration.standardSeconds(10));
+
+ rPipeline
+ .apply(
+ "Read from Kafka",
+ KafkaIO.<Integer, String>read()
+ .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
+ .withConsumerConfigUpdates(
+ ImmutableMap.of(
+ "group.id",
+ "resuming-group",
+ "auto.offset.reset",
+ "earliest",
+ "enable.auto.commit",
+ "true"))
+ .withTopic(options.getKafkaTopic() + "-resuming")
+ .withKeyDeserializer(IntegerDeserializer.class)
+ .withValueDeserializer(StringDeserializer.class)
+ .withoutMetadata())
+ .apply("Get Values", Values.create())
+ .apply(ParDo.of(new CrashOnExtra(records.values())))
+ .apply(ParDo.of(new LogFn()));
+
+ rPipeline.run().waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
+
+ for (String value : records.values()) {
+ kafkaIOITExpectedLogs.verifyError(value);
+ }
+ }
+
+ public static class CrashOnExtra extends DoFn<String, String> {
+ final Set<String> expected;
+
+ public CrashOnExtra(Collection<String> records) {
+ expected = new HashSet<>(records);
+ }
+
+ @ProcessElement
+ public void processElement(@Element String element, OutputReceiver<String> outputReceiver) {
+ if (!expected.contains(element)) {
+ throw new RuntimeException("Received unexpected element: " + element);
+ } else {
+ expected.remove(element);
+ outputReceiver.output(element);
+ }
+ }
+ }
+
+ public static class LogFn extends DoFn<String, String> {
+ @ProcessElement
+ public void processElement(@Element String element, OutputReceiver<String> outputReceiver) {
+ LOG.error(element);
+ outputReceiver.output(element);
+ }
+ }
+
// This test roundtrips a single KV<Null,Null> to verify that externalWithMetadata
// can handle null keys and values correctly.
@Test