Posted to commits@beam.apache.org by pa...@apache.org on 2022/09/10 01:08:06 UTC
[beam] branch master updated: Improvements to SchemaTransform implementations for BQ and Kafka (#23045)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1526ca8c4cc Improvements to SchemaTransform implementations for BQ and Kafka (#23045)
1526ca8c4cc is described below
commit 1526ca8c4cc6d58b3c28d816fc2597e51603d75f
Author: Pablo Estrada <pa...@users.noreply.github.com>
AuthorDate: Fri Sep 9 18:07:58 2022 -0700
Improvements to SchemaTransform implementations for BQ and Kafka (#23045)
* Improvements to SchemaTransform implementations for BQ and Kafka
* Fix kafka null checks
* fixup
* fixup?
* few improvements
* fixup
* adding test
* fix issue with hashcode
---
.../io/gcp/bigquery/BigQuerySchemaIOProvider.java | 20 ++-
.../KafkaSchemaTransformReadConfiguration.java | 170 ++++-----------------
.../io/kafka/KafkaSchemaTransformReadProvider.java | 167 ++++++++++++++++++++
.../KafkaSchemaTransformReadProviderTest.java | 124 +++++++++++++++
4 files changed, 337 insertions(+), 144 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java
index a1024a8ba82..59d798f8dc5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java
@@ -23,6 +23,9 @@ import java.io.Serializable;
import java.util.HashMap;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
+import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
+import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.io.SchemaIO;
@@ -34,6 +37,7 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.Row;
import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
/**
* An implementation of {@link SchemaIOProvider} for reading and writing to BigQuery with {@link
@@ -90,6 +94,7 @@ public class BigQuerySchemaIOProvider implements SchemaIOProvider {
.addNullableField("query", FieldType.STRING)
.addNullableField("queryLocation", FieldType.STRING)
.addNullableField("createDisposition", FieldType.STRING)
+ .addNullableField("useTestingBigQueryServices", FieldType.BOOLEAN)
.build();
}
@@ -194,7 +199,20 @@ public class BigQuerySchemaIOProvider implements SchemaIOProvider {
BigQueryIO.Write<Row> write =
BigQueryIO.<Row>write()
.useBeamSchema()
- .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API);
+ .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
+ .withTriggeringFrequency(Duration.standardSeconds(5))
+ .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
+ .withAutoSharding();
+
+ final Boolean useTestingBigQueryServices =
+ config.getBoolean("useTestingBigQueryServices");
+ if (useTestingBigQueryServices != null && useTestingBigQueryServices) {
+ FakeBigQueryServices fbqs =
+ new FakeBigQueryServices()
+ .withDatasetService(new FakeDatasetService())
+ .withJobService(new FakeJobService());
+ write = write.withTestServices(fbqs);
+ }
String table = config.getString("table");
if (table != null) {
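
For reference, the new useTestingBigQueryServices flag is read out of the provider's configuration Row at buildWriter time. Below is a minimal sketch of opting into the fake-services path; the table spec and class wrapper are illustrative, not taken from the commit:

import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaIOProvider;
import org.apache.beam.sdk.schemas.io.SchemaIO;
import org.apache.beam.sdk.values.Row;

public class FakeBigQueryServicesSketch {
  public static void main(String[] args) {
    BigQuerySchemaIOProvider provider = new BigQuerySchemaIOProvider();
    // Build a configuration Row against the provider's schema; the new nullable
    // BOOLEAN field routes writes through FakeDatasetService/FakeJobService.
    Row config =
        Row.withSchema(provider.configurationSchema())
            .withFieldValue("table", "my-project:my_dataset.my_table") // illustrative
            .withFieldValue("useTestingBigQueryServices", true)
            .build();
    SchemaIO io = provider.from("my-project:my_dataset.my_table", config, null);
    // io.buildWriter() now uses STORAGE_WRITE_API with a 5-second triggering
    // frequency, WRITE_APPEND, and auto-sharding, per the defaults above.
  }
}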
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadConfiguration.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadConfiguration.java
index 29bd3a23c23..9280de6c13f 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadConfiguration.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadConfiguration.java
@@ -18,13 +18,13 @@
package org.apache.beam.sdk.io.kafka;
import com.google.auto.value.AutoValue;
-import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
-import org.apache.kafka.common.TopicPartition;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
/**
* Configuration for reading from a Kafka topic.
@@ -38,70 +38,48 @@ import org.apache.kafka.common.TopicPartition;
@AutoValue
public abstract class KafkaSchemaTransformReadConfiguration {
+ public static final Set<String> VALID_START_OFFSET_VALUES = Sets.newHashSet("earliest", "latest");
+ public static final Set<String> VALID_DATA_FORMATS = Sets.newHashSet("AVRO", "JSON");
+
+ public void validate() {
+ final String startOffset = this.getAutoOffsetResetConfig();
+ assert startOffset == null || VALID_START_OFFSET_VALUES.contains(startOffset)
+ : "Valid Kafka Start offset values are " + VALID_START_OFFSET_VALUES;
+ final String dataFormat = this.getDataFormat();
+ assert dataFormat == null || VALID_DATA_FORMATS.contains(dataFormat)
+ : "Valid data formats are " + VALID_DATA_FORMATS;
+ }
+
/** Instantiates a {@link KafkaSchemaTransformReadConfiguration.Builder} instance. */
public static Builder builder() {
return new AutoValue_KafkaSchemaTransformReadConfiguration.Builder();
}
/** Sets the bootstrap servers for the Kafka consumer. */
- @Nullable
public abstract String getBootstrapServers();
- /** Flags whether finalized offsets are committed to Kafka. */
- @Nullable
- public abstract Boolean getCommitOffsetsInFinalize();
-
- /** Configuration updates for the backend main consumer. */
- @Nullable
- public abstract Map<String, Object> getConsumerConfigUpdates();
-
- /**
- * Sets the timestamps policy based on KafkaTimestampType.CREATE_TIME timestamp of the records.
- */
- @Nullable
- public abstract Long getCreateTimeMillisecondsMaximumDelay();
-
- /**
- * Configure the KafkaIO to use WatchKafkaTopicPartitionDoFn to detect and emit any new available
- * {@link TopicPartition} for ReadFromKafkaDoFn to consume during pipeline execution time.
- */
- @Nullable
- public abstract Long getDynamicReadMillisecondsDuration();
-
- /** Additional configuration for the backend offset consumer. */
@Nullable
- public abstract Map<String, Object> getOffsetConsumerConfiguration();
+ public abstract String getConfluentSchemaRegistryUrl();
- /** Specifies whether to include metadata when reading from Kafka topic. */
+ // TODO(pabloem): Make data format an ENUM
@Nullable
- public abstract Boolean getReadWithMetadata();
+ public abstract String getDataFormat();
- /** Sets "isolation_level" to "read_committed" in Kafka consumer configuration. */
@Nullable
- public abstract Boolean getReadCommitted();
+ public abstract String getConfluentSchemaRegistrySubject();
- /** Use timestamp to set up start offset. */
@Nullable
- public abstract Long getStartReadTimeMillisecondsEpoch();
+ public abstract String getAvroSchema();
- /** Use timestamp to set up stop offset. */
@Nullable
- public abstract Long getStopReadTimeMillisecondsEpoch();
+ public abstract String getAutoOffsetResetConfig();
- /**
- * A timestamp policy to assign event time for messages in a Kafka partition and watermark for it.
- */
@Nullable
- public abstract TimestampPolicyConfiguration getTimestampPolicy();
+ public abstract Map<String, String> getConsumerConfigUpdates();
/** Sets the topic from which to read. */
- @Nullable
public abstract String getTopic();
- /** Kafka partitions from which to read. */
- @Nullable
- public abstract List<TopicPartitionConfiguration> getTopicPartitions();
-
/** Builder for the {@link KafkaSchemaTransformReadConfiguration}. */
@AutoValue.Builder
public abstract static class Builder {
@@ -109,116 +87,22 @@ public abstract class KafkaSchemaTransformReadConfiguration {
/** Sets the bootstrap servers for the Kafka consumer. */
public abstract Builder setBootstrapServers(String value);
- /** Flags whether finalized offsets are committed to Kafka. */
- public abstract Builder setCommitOffsetsInFinalize(Boolean value);
-
- /** Configuration updates for the backend main consumer. */
- public abstract Builder setConsumerConfigUpdates(Map<String, Object> value);
-
- /**
- * Sets the timestamps policy based on KafkaTimestampType.CREATE_TIME timestamp of the records.
- */
- public abstract Builder setCreateTimeMillisecondsMaximumDelay(Long value);
-
- /**
- * Configure the KafkaIO to use WatchKafkaTopicPartitionDoFn to detect and emit any new
- * available {@link TopicPartition} for ReadFromKafkaDoFn to consume during pipeline execution
- * time.
- */
- public abstract Builder setDynamicReadMillisecondsDuration(Long value);
+ public abstract Builder setConfluentSchemaRegistryUrl(String schemaRegistry);
- /** Additional configuration for the backend offset consumer. */
- public abstract Builder setOffsetConsumerConfiguration(Map<String, Object> value);
+ public abstract Builder setConfluentSchemaRegistrySubject(String subject);
- /** Specifies whether to include metadata when reading from Kafka topic. */
- public abstract Builder setReadWithMetadata(Boolean value);
+ public abstract Builder setAvroSchema(String schema);
- /** Sets "isolation_level" to "read_committed" in Kafka consumer configuration. */
- public abstract Builder setReadCommitted(Boolean value);
+ public abstract Builder setDataFormat(String dataFormat);
- /** Use timestamp to set up start offset. */
- public abstract Builder setStartReadTimeMillisecondsEpoch(Long value);
+ public abstract Builder setAutoOffsetResetConfig(String startOffset);
- /** Use timestamp to set up stop offset. */
- public abstract Builder setStopReadTimeMillisecondsEpoch(Long value);
-
- /**
- * A timestamp policy to assign event time for messages in a Kafka partition and watermark for
- * it.
- */
- public abstract Builder setTimestampPolicy(TimestampPolicyConfiguration value);
+ public abstract Builder setConsumerConfigUpdates(Map<String, String> consumerConfigUpdates);
/** Sets the topic from which to read. */
public abstract Builder setTopic(String value);
- /** Kafka partitions from which to read. */
- public abstract Builder setTopicPartitions(List<TopicPartitionConfiguration> value);
-
/** Builds a {@link KafkaSchemaTransformReadConfiguration} instance. */
public abstract KafkaSchemaTransformReadConfiguration build();
}
-
- /**
- * A configuration for a {@link TopicPartition}.
- *
- * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
- * provide no backwards compatibility guarantees, and it should not be implemented outside the
- * Beam repository.
- */
- @Experimental
- @DefaultSchema(AutoValueSchema.class)
- @AutoValue
- public abstract static class TopicPartitionConfiguration {
-
- /** Instantiates a {@link TopicPartitionConfiguration.Builder} instance. */
- public static Builder builder() {
- return new AutoValue_KafkaSchemaTransformReadConfiguration_TopicPartitionConfiguration
- .Builder();
- }
-
- /** The name of the topic defining the partition. */
- public abstract String getTopic();
-
- /** The number of the topic partition. */
- public abstract Integer getPartition();
-
- /** Builder for the {@link TopicPartitionConfiguration}. */
- @AutoValue.Builder
- public abstract static class Builder {
-
- /** The name of the topic defining the partition. */
- public abstract Builder setTopic(String value);
-
- /** The number of the topic partition. */
- public abstract Builder setPartition(Integer value);
-
- /** Builds a {@link TopicPartitionConfiguration} instance. */
- public abstract TopicPartitionConfiguration build();
- }
- }
-
- /**
- * A timestamp policy to assign event time for messages in a Kafka partition and watermark for it.
- *
- * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
- * provide no backwards compatibility guarantees, and it should not be implemented outside the
- * Beam repository.
- */
- @Experimental
- public enum TimestampPolicyConfiguration {
-
- /**
- * Assigns Kafka's log append time (server side ingestion time) to each record. The watermark
- * for each Kafka partition is the timestamp of the last record read. If a partition is idle,
- * the watermark advances roughly to 'current time - 2 seconds'. See {@link
- * KafkaIO.Read#withLogAppendTime()} for longer description.
- */
- LOG_APPEND_TIME,
-
- /**
- * A simple policy that uses current time for event time and watermark. This should be used when
- * better timestamps like LogAppendTime are not available for a topic.
- */
- PROCESSING_TIME,
- }
}
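
As a rough usage sketch of the reshaped configuration (broker, topic, and schema values are placeholders; note that validate() relies on Java asserts):

import org.apache.beam.sdk.io.kafka.KafkaSchemaTransformReadConfiguration;

public class KafkaReadConfigSketch {
  public static void main(String[] args) {
    KafkaSchemaTransformReadConfiguration config =
        KafkaSchemaTransformReadConfiguration.builder()
            .setBootstrapServers("localhost:9092") // placeholder broker
            .setTopic("my-topic")                  // placeholder topic
            .setDataFormat("AVRO")                 // must be one of VALID_DATA_FORMATS
            .setAutoOffsetResetConfig("earliest")  // must be "earliest" or "latest"
            .setAvroSchema(
                "{\"type\":\"record\",\"name\":\"R\","
                    + "\"fields\":[{\"name\":\"f\",\"type\":\"string\"}]}")
            .build();
    // validate() uses assert statements, so run with -ea for the checks to fire.
    config.validate();
  }
}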
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadProvider.java
new file mode 100644
index 00000000000..a13c54c22aa
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadProvider.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import com.google.auto.service.AutoService;
+import java.util.List;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.Convert;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+@AutoService(SchemaTransformProvider.class)
+public class KafkaSchemaTransformReadProvider
+ extends TypedSchemaTransformProvider<KafkaSchemaTransformReadConfiguration> {
+
+ @Override
+ protected Class<KafkaSchemaTransformReadConfiguration> configurationClass() {
+ return KafkaSchemaTransformReadConfiguration.class;
+ }
+
+ @Override
+ protected SchemaTransform from(KafkaSchemaTransformReadConfiguration configuration) {
+ return new KafkaReadSchemaTransform(configuration);
+ }
+
+ @Override
+ public String identifier() {
+ return "kafka:read";
+ }
+
+ @Override
+ public List<String> inputCollectionNames() {
+ return Lists.newArrayList();
+ }
+
+ @Override
+ public List<String> outputCollectionNames() {
+ return Lists.newArrayList("OUTPUT");
+ }
+
+ private static class KafkaReadSchemaTransform implements SchemaTransform {
+ private final KafkaSchemaTransformReadConfiguration configuration;
+
+ KafkaReadSchemaTransform(KafkaSchemaTransformReadConfiguration configuration) {
+ configuration.validate();
+ this.configuration = configuration;
+ }
+
+ @Override
+ public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+ final String avroSchema = configuration.getAvroSchema();
+ final Integer groupId = configuration.hashCode() % Integer.MAX_VALUE;
+ final String autoOffsetReset =
+ configuration.getAutoOffsetResetConfig() == null
+ ? "latest"
+ : configuration.getAutoOffsetResetConfig();
+ if (avroSchema != null) {
+ assert configuration.getConfluentSchemaRegistryUrl() == null
+ : "To read from Kafka, a schema must be provided directly or though Confluent "
+ + "Schema Registry, but not both.";
+ final Schema beamSchema =
+ AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(avroSchema));
+ SerializableFunction<byte[], Row> valueMapper =
+ AvroUtils.getAvroBytesToRowFunction(beamSchema);
+ return new PTransform<PCollectionRowTuple, PCollectionRowTuple>() {
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ KafkaIO.Read<byte[], byte[]> kafkaRead =
+ KafkaIO.readBytes()
+ .withConsumerConfigUpdates(
+ ImmutableMap.of(
+ ConsumerConfig.GROUP_ID_CONFIG,
+ "kafka-read-provider-" + groupId,
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+ true,
+ ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
+ 100,
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+ autoOffsetReset))
+ .withTopic(configuration.getTopic())
+ .withBootstrapServers(configuration.getBootstrapServers());
+
+ return PCollectionRowTuple.of(
+ "OUTPUT",
+ input
+ .getPipeline()
+ .apply(kafkaRead.withoutMetadata())
+ .apply(Values.create())
+ .apply(MapElements.into(TypeDescriptors.rows()).via(valueMapper))
+ .setRowSchema(beamSchema));
+ }
+ };
+ } else {
+ return new PTransform<PCollectionRowTuple, PCollectionRowTuple>() {
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ final String confluentSchemaRegUrl = configuration.getConfluentSchemaRegistryUrl();
+ final String confluentSchemaRegSubject =
+ configuration.getConfluentSchemaRegistrySubject();
+ if (confluentSchemaRegUrl == null || confluentSchemaRegSubject == null) {
+ throw new IllegalArgumentException(
+ "To read from Kafka, a schema must be provided directly or though Confluent "
+ + "Schema Registry. Make sure you are providing one of these parameters.");
+ }
+ KafkaIO.Read<byte[], GenericRecord> kafkaRead =
+ KafkaIO.<byte[], GenericRecord>read()
+ .withTopic(configuration.getTopic())
+ .withBootstrapServers(configuration.getBootstrapServers())
+ .withConsumerConfigUpdates(
+ ImmutableMap.of(
+ ConsumerConfig.GROUP_ID_CONFIG,
+ "kafka-read-provider-" + groupId,
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+ true,
+ ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
+ 100,
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+ autoOffsetReset))
+ .withKeyDeserializer(ByteArrayDeserializer.class)
+ .withValueDeserializer(
+ ConfluentSchemaRegistryDeserializerProvider.of(
+ confluentSchemaRegUrl, confluentSchemaRegSubject));
+
+ PCollection<GenericRecord> kafkaValues =
+ input.getPipeline().apply(kafkaRead.withoutMetadata()).apply(Values.create());
+
+ assert kafkaValues.getCoder().getClass() == AvroCoder.class;
+ AvroCoder<GenericRecord> coder = (AvroCoder<GenericRecord>) kafkaValues.getCoder();
+ kafkaValues = kafkaValues.setCoder(AvroUtils.schemaCoder(coder.getSchema()));
+ return PCollectionRowTuple.of("OUTPUT", kafkaValues.apply(Convert.toRows()));
+ }
+ };
+ }
+ }
+ };
+}
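
A hedged sketch of applying this provider's transform end to end through its public Row-based API (topic, servers, and wrapper class are placeholders; the configuration field names match the ones asserted in the test below):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaSchemaTransformReadProvider;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;

public class KafkaReadProviderSketch {
  // Minimal record schema; any valid Avro schema string works here.
  private static final String AVRO_SCHEMA =
      "{\"type\":\"record\",\"name\":\"R\","
          + "\"fields\":[{\"name\":\"f\",\"type\":\"string\"}]}";

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    KafkaSchemaTransformReadProvider provider = new KafkaSchemaTransformReadProvider();
    // Configure through the provider's configuration schema rather than the
    // protected from(ConfigT) hook, which is only reachable in-package.
    Row configRow =
        Row.withSchema(provider.configurationSchema())
            .withFieldValue("topic", "my-topic")                  // placeholder
            .withFieldValue("bootstrapServers", "localhost:9092") // placeholder
            .withFieldValue("avroSchema", AVRO_SCHEMA)
            .build();
    SchemaTransform transform = provider.from(configRow);
    PCollection<Row> rows =
        PCollectionRowTuple.empty(p)
            .apply(transform.buildTransform())
            .get("OUTPUT"); // the single output name declared by the provider
    // p.run() would start consuming from Kafka; omitted in this sketch.
  }
}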
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadProviderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadProviderTest.java
new file mode 100644
index 00000000000..c8f76ecf3cc
--- /dev/null
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadProviderTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.util.List;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link KafkaSchemaTransformReadProvider}. */
+@RunWith(JUnit4.class)
+public class KafkaSchemaTransformReadProviderTest {
+ private static final String AVRO_SCHEMA =
+ "{\"type\":\"record\",\"namespace\":\"com.example\","
+ + "\"name\":\"FullName\",\"fields\":[{\"name\":\"first\",\"type\":\"string\"},"
+ + "{\"name\":\"last\",\"type\":\"string\"}]}";
+
+ @Test
+ public void testValidConfigurations() {
+ assertThrows(
+ AssertionError.class,
+ () -> {
+ KafkaSchemaTransformReadConfiguration.builder()
+ .setDataFormat("UNUSUAL_FORMAT")
+ .setTopic("a_valid_topic")
+ .setBootstrapServers("a_valid_server")
+ .build()
+ .validate();
+ });
+
+ assertThrows(
+ IllegalStateException.class,
+ () -> {
+ KafkaSchemaTransformReadConfiguration.builder()
+ .setDataFormat("UNUSUAL_FORMAT")
+ // .setTopic("a_valid_topic") // Topic is mandatory
+ .setBootstrapServers("a_valid_server")
+ .build()
+ .validate();
+ });
+
+ assertThrows(
+ IllegalStateException.class,
+ () -> {
+ KafkaSchemaTransformReadConfiguration.builder()
+ .setDataFormat("UNUSUAL_FORMAT")
+ .setTopic("a_valid_topic")
+ // .setBootstrapServers("a_valid_server") // Bootstrap server is mandatory
+ .build()
+ .validate();
+ });
+ }
+
+ @Test
+ public void testFindTransformAndMakeItWork() {
+ ServiceLoader<SchemaTransformProvider> serviceLoader =
+ ServiceLoader.load(SchemaTransformProvider.class);
+ List<SchemaTransformProvider> providers =
+ StreamSupport.stream(serviceLoader.spliterator(), false)
+ .filter(provider -> provider.getClass() == KafkaSchemaTransformReadProvider.class)
+ .collect(Collectors.toList());
+ SchemaTransformProvider kafkaProvider = providers.get(0);
+ assertEquals(kafkaProvider.outputCollectionNames(), Lists.newArrayList("OUTPUT"));
+ assertEquals(kafkaProvider.inputCollectionNames(), Lists.newArrayList());
+
+ assertEquals(
+ Sets.newHashSet(
+ "bootstrapServers",
+ "topic",
+ "avroSchema",
+ "autoOffsetResetConfig",
+ "consumerConfigUpdates",
+ "dataFormat",
+ "confluentSchemaRegistrySubject",
+ "confluentSchemaRegistryUrl"),
+ kafkaProvider.configurationSchema().getFields().stream()
+ .map(field -> field.getName())
+ .collect(Collectors.toSet()));
+ }
+
+ @Test
+ public void testBuildTransformWithAvroSchema() {
+ ServiceLoader<SchemaTransformProvider> serviceLoader =
+ ServiceLoader.load(SchemaTransformProvider.class);
+ List<SchemaTransformProvider> providers =
+ StreamSupport.stream(serviceLoader.spliterator(), false)
+ .filter(provider -> provider.getClass() == KafkaSchemaTransformReadProvider.class)
+ .collect(Collectors.toList());
+ KafkaSchemaTransformReadProvider kafkaProvider =
+ (KafkaSchemaTransformReadProvider) providers.get(0);
+ kafkaProvider
+ .from(
+ KafkaSchemaTransformReadConfiguration.builder()
+ .setTopic("anytopic")
+ .setBootstrapServers("anybootstrap")
+ .setAvroSchema(AVRO_SCHEMA)
+ .build())
+ .buildTransform();
+ }
+}
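
The test above only exercises the direct Avro-schema path. The Confluent Schema Registry branch is configured analogously, as in this hedged sketch (registry URL and subject are placeholders):

import org.apache.beam.sdk.io.kafka.KafkaSchemaTransformReadConfiguration;

public class RegistryConfigSketch {
  public static void main(String[] args) {
    // avroSchema is left unset: the provider's else-branch then requires both
    // the registry URL and subject, and resolves the value schema at expansion.
    KafkaSchemaTransformReadConfiguration config =
        KafkaSchemaTransformReadConfiguration.builder()
            .setTopic("my-topic")                                   // placeholder
            .setBootstrapServers("localhost:9092")                  // placeholder
            .setConfluentSchemaRegistryUrl("http://localhost:8081") // placeholder
            .setConfluentSchemaRegistrySubject("my-topic-value")    // placeholder
            .build();
    config.validate();
  }
}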