You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/09/10 01:08:06 UTC

[beam] branch master updated: Improvements to SchemaTransform implementations for BQ and Kafka (#23045)

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1526ca8c4cc Improvements to SchemaTransform implementations for BQ and Kafka (#23045)
1526ca8c4cc is described below

commit 1526ca8c4cc6d58b3c28d816fc2597e51603d75f
Author: Pablo Estrada <pa...@users.noreply.github.com>
AuthorDate: Fri Sep 9 18:07:58 2022 -0700

    Improvements to SchemaTransform implementations for BQ and Kafka (#23045)
    
    * Improvements to SchemaTransform implementations for BQ and Kafka
    
    * Fix kafka null checs
    
    * fixup
    
    * fixup?
    
    * few improvements
    
    * fixup
    
    * adding test
    
    * fix issue with hashcode
---
 .../io/gcp/bigquery/BigQuerySchemaIOProvider.java  |  20 ++-
 .../KafkaSchemaTransformReadConfiguration.java     | 170 ++++-----------------
 .../io/kafka/KafkaSchemaTransformReadProvider.java | 167 ++++++++++++++++++++
 .../KafkaSchemaTransformReadProviderTest.java      | 124 +++++++++++++++
 4 files changed, 337 insertions(+), 144 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java
index a1024a8ba82..59d798f8dc5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java
@@ -23,6 +23,9 @@ import java.io.Serializable;
 import java.util.HashMap;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
+import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
+import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.io.SchemaIO;
@@ -34,6 +37,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.Row;
 import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
 
 /**
  * An implementation of {@link SchemaIOProvider} for reading and writing to BigQuery with {@link
@@ -90,6 +94,7 @@ public class BigQuerySchemaIOProvider implements SchemaIOProvider {
         .addNullableField("query", FieldType.STRING)
         .addNullableField("queryLocation", FieldType.STRING)
         .addNullableField("createDisposition", FieldType.STRING)
+        .addNullableField("useTestingBigQueryServices", FieldType.BOOLEAN)
         .build();
   }
 
@@ -194,7 +199,20 @@ public class BigQuerySchemaIOProvider implements SchemaIOProvider {
           BigQueryIO.Write<Row> write =
               BigQueryIO.<Row>write()
                   .useBeamSchema()
-                  .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API);
+                  .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
+                  .withTriggeringFrequency(Duration.standardSeconds(5))
+                  .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
+                  .withAutoSharding();
+
+          final Boolean useTestingBigQueryServices =
+              config.getBoolean("useTestingBigQueryServices");
+          if (useTestingBigQueryServices != null && useTestingBigQueryServices) {
+            FakeBigQueryServices fbqs =
+                new FakeBigQueryServices()
+                    .withDatasetService(new FakeDatasetService())
+                    .withJobService(new FakeJobService());
+            write = write.withTestServices(fbqs);
+          }
 
           String table = config.getString("table");
           if (table != null) {
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadConfiguration.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadConfiguration.java
index 29bd3a23c23..9280de6c13f 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadConfiguration.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadConfiguration.java
@@ -18,13 +18,13 @@
 package org.apache.beam.sdk.io.kafka;
 
 import com.google.auto.value.AutoValue;
-import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
-import org.apache.kafka.common.TopicPartition;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
 
 /**
  * Configuration for reading from a Kafka topic.
@@ -38,70 +38,48 @@ import org.apache.kafka.common.TopicPartition;
 @AutoValue
 public abstract class KafkaSchemaTransformReadConfiguration {
 
+  public static final Set<String> VALID_START_OFFSET_VALUES = Sets.newHashSet("earliest", "latest");
+  public static final Set<String> VALID_DATA_FORMATS = Sets.newHashSet("AVRO", "JSON");
+
+  public void validate() {
+    final String startOffset = this.getAutoOffsetResetConfig();
+    assert startOffset == null || VALID_START_OFFSET_VALUES.contains(startOffset)
+        : "Valid Kafka Start offset values are " + VALID_START_OFFSET_VALUES;
+    final String dataFormat = this.getDataFormat();
+    assert dataFormat == null || VALID_DATA_FORMATS.contains(dataFormat)
+        : "Valid data formats are " + VALID_DATA_FORMATS;
+  }
+
   /** Instantiates a {@link KafkaSchemaTransformReadConfiguration.Builder} instance. */
   public static Builder builder() {
     return new AutoValue_KafkaSchemaTransformReadConfiguration.Builder();
   }
 
   /** Sets the bootstrap servers for the Kafka consumer. */
-  @Nullable
   public abstract String getBootstrapServers();
 
-  /** Flags whether finalized offsets are committed to Kafka. */
-  @Nullable
-  public abstract Boolean getCommitOffsetsInFinalize();
-
-  /** Configuration updates for the backend main consumer. */
-  @Nullable
-  public abstract Map<String, Object> getConsumerConfigUpdates();
-
-  /**
-   * Sets the timestamps policy based on KafkaTimestampType.CREATE_TIME timestamp of the records.
-   */
-  @Nullable
-  public abstract Long getCreateTimeMillisecondsMaximumDelay();
-
-  /**
-   * Configure the KafkaIO to use WatchKafkaTopicPartitionDoFn to detect and emit any new available
-   * {@link TopicPartition} for ReadFromKafkaDoFn to consume during pipeline execution time.
-   */
-  @Nullable
-  public abstract Long getDynamicReadMillisecondsDuration();
-
-  /** Additional configuration for the backend offset consumer. */
   @Nullable
-  public abstract Map<String, Object> getOffsetConsumerConfiguration();
+  public abstract String getConfluentSchemaRegistryUrl();
 
-  /** Specifies whether to include metadata when reading from Kafka topic. */
+  // TODO(pabloem): Make data format an ENUM
   @Nullable
-  public abstract Boolean getReadWithMetadata();
+  public abstract String getDataFormat();
 
-  /** Sets "isolation_level" to "read_committed" in Kafka consumer configuration. */
   @Nullable
-  public abstract Boolean getReadCommitted();
+  public abstract String getConfluentSchemaRegistrySubject();
 
-  /** Use timestamp to set up start offset. */
   @Nullable
-  public abstract Long getStartReadTimeMillisecondsEpoch();
+  public abstract String getAvroSchema();
 
-  /** Use timestamp to set up stop offset. */
   @Nullable
-  public abstract Long getStopReadTimeMillisecondsEpoch();
+  public abstract String getAutoOffsetResetConfig();
 
-  /**
-   * A timestamp policy to assign event time for messages in a Kafka partition and watermark for it.
-   */
   @Nullable
-  public abstract TimestampPolicyConfiguration getTimestampPolicy();
+  public abstract Map<String, String> getConsumerConfigUpdates();
 
   /** Sets the topic from which to read. */
-  @Nullable
   public abstract String getTopic();
 
-  /** Kafka partitions from which to read. */
-  @Nullable
-  public abstract List<TopicPartitionConfiguration> getTopicPartitions();
-
   /** Builder for the {@link KafkaSchemaTransformReadConfiguration}. */
   @AutoValue.Builder
   public abstract static class Builder {
@@ -109,116 +87,22 @@ public abstract class KafkaSchemaTransformReadConfiguration {
     /** Sets the bootstrap servers for the Kafka consumer. */
     public abstract Builder setBootstrapServers(String value);
 
-    /** Flags whether finalized offsets are committed to Kafka. */
-    public abstract Builder setCommitOffsetsInFinalize(Boolean value);
-
-    /** Configuration updates for the backend main consumer. */
-    public abstract Builder setConsumerConfigUpdates(Map<String, Object> value);
-
-    /**
-     * Sets the timestamps policy based on KafkaTimestampType.CREATE_TIME timestamp of the records.
-     */
-    public abstract Builder setCreateTimeMillisecondsMaximumDelay(Long value);
-
-    /**
-     * Configure the KafkaIO to use WatchKafkaTopicPartitionDoFn to detect and emit any new
-     * available {@link TopicPartition} for ReadFromKafkaDoFn to consume during pipeline execution
-     * time.
-     */
-    public abstract Builder setDynamicReadMillisecondsDuration(Long value);
+    public abstract Builder setConfluentSchemaRegistryUrl(String schemaRegistry);
 
-    /** Additional configuration for the backend offset consumer. */
-    public abstract Builder setOffsetConsumerConfiguration(Map<String, Object> value);
+    public abstract Builder setConfluentSchemaRegistrySubject(String subject);
 
-    /** Specifies whether to include metadata when reading from Kafka topic. */
-    public abstract Builder setReadWithMetadata(Boolean value);
+    public abstract Builder setAvroSchema(String schema);
 
-    /** Sets "isolation_level" to "read_committed" in Kafka consumer configuration. */
-    public abstract Builder setReadCommitted(Boolean value);
+    public abstract Builder setDataFormat(String dataFormat);
 
-    /** Use timestamp to set up start offset. */
-    public abstract Builder setStartReadTimeMillisecondsEpoch(Long value);
+    public abstract Builder setAutoOffsetResetConfig(String startOffset);
 
-    /** Use timestamp to set up stop offset. */
-    public abstract Builder setStopReadTimeMillisecondsEpoch(Long value);
-
-    /**
-     * A timestamp policy to assign event time for messages in a Kafka partition and watermark for
-     * it.
-     */
-    public abstract Builder setTimestampPolicy(TimestampPolicyConfiguration value);
+    public abstract Builder setConsumerConfigUpdates(Map<String, String> consumerConfigUpdates);
 
     /** Sets the topic from which to read. */
     public abstract Builder setTopic(String value);
 
-    /** Kafka partitions from which to read. */
-    public abstract Builder setTopicPartitions(List<TopicPartitionConfiguration> value);
-
     /** Builds a {@link KafkaSchemaTransformReadConfiguration} instance. */
     public abstract KafkaSchemaTransformReadConfiguration build();
   }
-
-  /**
-   * A configuration for a {@link TopicPartition}.
-   *
-   * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
-   * provide no backwards compatibility guarantees, and it should not be implemented outside the
-   * Beam repository.
-   */
-  @Experimental
-  @DefaultSchema(AutoValueSchema.class)
-  @AutoValue
-  public abstract static class TopicPartitionConfiguration {
-
-    /** Instantiates a {@link TopicPartitionConfiguration.Builder} instance. */
-    public static Builder builder() {
-      return new AutoValue_KafkaSchemaTransformReadConfiguration_TopicPartitionConfiguration
-          .Builder();
-    }
-
-    /** The name of the topic defining the partition. */
-    public abstract String getTopic();
-
-    /** The number of the topic partition. */
-    public abstract Integer getPartition();
-
-    /** Builder for the {@link TopicPartitionConfiguration}. */
-    @AutoValue.Builder
-    public abstract static class Builder {
-
-      /** The name of the topic defining the partition. */
-      public abstract Builder setTopic(String value);
-
-      /** The number of the topic partition. */
-      public abstract Builder setPartition(Integer value);
-
-      /** Builds a {@link TopicPartitionConfiguration} instance. */
-      public abstract TopicPartitionConfiguration build();
-    }
-  }
-
-  /**
-   * A timestamp policy to assign event time for messages in a Kafka partition and watermark for it.
-   *
-   * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
-   * provide no backwards compatibility guarantees, and it should not be implemented outside the
-   * Beam repository.
-   */
-  @Experimental
-  public enum TimestampPolicyConfiguration {
-
-    /**
-     * Assigns Kafka's log append time (server side ingestion time) to each record. The watermark
-     * for each Kafka partition is the timestamp of the last record read. If a partition is idle,
-     * the watermark advances roughly to 'current time - 2 seconds'. See {@link
-     * KafkaIO.Read#withLogAppendTime()} for longer description.
-     */
-    LOG_APPEND_TIME,
-
-    /**
-     * A simple policy that uses current time for event time and watermark. This should be used when
-     * better timestamps like LogAppendTime are not available for a topic.
-     */
-    PROCESSING_TIME,
-  }
 }
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadProvider.java
new file mode 100644
index 00000000000..a13c54c22aa
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadProvider.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import com.google.auto.service.AutoService;
+import java.util.List;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.Convert;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+@AutoService(SchemaTransformProvider.class)
+public class KafkaSchemaTransformReadProvider
+    extends TypedSchemaTransformProvider<KafkaSchemaTransformReadConfiguration> {
+
+  @Override
+  protected Class<KafkaSchemaTransformReadConfiguration> configurationClass() {
+    return KafkaSchemaTransformReadConfiguration.class;
+  }
+
+  @Override
+  protected SchemaTransform from(KafkaSchemaTransformReadConfiguration configuration) {
+    return new KafkaReadSchemaTransform(configuration);
+  }
+
+  @Override
+  public String identifier() {
+    return "kafka:read";
+  }
+
+  @Override
+  public List<String> inputCollectionNames() {
+    return Lists.newArrayList();
+  }
+
+  @Override
+  public List<String> outputCollectionNames() {
+    return Lists.newArrayList("OUTPUT");
+  }
+
+  private static class KafkaReadSchemaTransform implements SchemaTransform {
+    private final KafkaSchemaTransformReadConfiguration configuration;
+
+    KafkaReadSchemaTransform(KafkaSchemaTransformReadConfiguration configuration) {
+      configuration.validate();
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+      final String avroSchema = configuration.getAvroSchema();
+      final Integer groupId = configuration.hashCode() % Integer.MAX_VALUE;
+      final String autoOffsetReset =
+          configuration.getAutoOffsetResetConfig() == null
+              ? "latest"
+              : configuration.getAutoOffsetResetConfig();
+      if (avroSchema != null) {
+        assert configuration.getConfluentSchemaRegistryUrl() == null
+            : "To read from Kafka, a schema must be provided directly or though Confluent "
+                + "Schema Registry, but not both.";
+        final Schema beamSchema =
+            AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(avroSchema));
+        SerializableFunction<byte[], Row> valueMapper =
+            AvroUtils.getAvroBytesToRowFunction(beamSchema);
+        return new PTransform<PCollectionRowTuple, PCollectionRowTuple>() {
+          @Override
+          public PCollectionRowTuple expand(PCollectionRowTuple input) {
+            KafkaIO.Read<byte[], byte[]> kafkaRead =
+                KafkaIO.readBytes()
+                    .withConsumerConfigUpdates(
+                        ImmutableMap.of(
+                            ConsumerConfig.GROUP_ID_CONFIG,
+                            "kafka-read-provider-" + groupId,
+                            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+                            true,
+                            ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
+                            100,
+                            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                            autoOffsetReset))
+                    .withTopic(configuration.getTopic())
+                    .withBootstrapServers(configuration.getBootstrapServers());
+
+            return PCollectionRowTuple.of(
+                "OUTPUT",
+                input
+                    .getPipeline()
+                    .apply(kafkaRead.withoutMetadata())
+                    .apply(Values.create())
+                    .apply(MapElements.into(TypeDescriptors.rows()).via(valueMapper))
+                    .setRowSchema(beamSchema));
+          }
+        };
+      } else {
+        return new PTransform<PCollectionRowTuple, PCollectionRowTuple>() {
+          @Override
+          public PCollectionRowTuple expand(PCollectionRowTuple input) {
+            final String confluentSchemaRegUrl = configuration.getConfluentSchemaRegistryUrl();
+            final String confluentSchemaRegSubject =
+                configuration.getConfluentSchemaRegistrySubject();
+            if (confluentSchemaRegUrl == null || confluentSchemaRegSubject == null) {
+              throw new IllegalArgumentException(
+                  "To read from Kafka, a schema must be provided directly or though Confluent "
+                      + "Schema Registry. Make sure you are providing one of these parameters.");
+            }
+            KafkaIO.Read<byte[], GenericRecord> kafkaRead =
+                KafkaIO.<byte[], GenericRecord>read()
+                    .withTopic(configuration.getTopic())
+                    .withBootstrapServers(configuration.getBootstrapServers())
+                    .withConsumerConfigUpdates(
+                        ImmutableMap.of(
+                            ConsumerConfig.GROUP_ID_CONFIG,
+                            "kafka-read-provider-" + groupId,
+                            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+                            true,
+                            ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
+                            100,
+                            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                            autoOffsetReset))
+                    .withKeyDeserializer(ByteArrayDeserializer.class)
+                    .withValueDeserializer(
+                        ConfluentSchemaRegistryDeserializerProvider.of(
+                            confluentSchemaRegUrl, confluentSchemaRegSubject));
+
+            PCollection<GenericRecord> kafkaValues =
+                input.getPipeline().apply(kafkaRead.withoutMetadata()).apply(Values.create());
+
+            assert kafkaValues.getCoder().getClass() == AvroCoder.class;
+            AvroCoder<GenericRecord> coder = (AvroCoder<GenericRecord>) kafkaValues.getCoder();
+            kafkaValues = kafkaValues.setCoder(AvroUtils.schemaCoder(coder.getSchema()));
+            return PCollectionRowTuple.of("OUTPUT", kafkaValues.apply(Convert.toRows()));
+          }
+        };
+      }
+    }
+  };
+}
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadProviderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadProviderTest.java
new file mode 100644
index 00000000000..c8f76ecf3cc
--- /dev/null
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadProviderTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.util.List;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link KafkaSchemaTransformReadProvider}. */
+@RunWith(JUnit4.class)
+public class KafkaSchemaTransformReadProviderTest {
+  private static final String AVRO_SCHEMA =
+      "{\"type\":\"record\",\"namespace\":\"com.example\","
+          + "\"name\":\"FullName\",\"fields\":[{\"name\":\"first\",\"type\":\"string\"},"
+          + "{\"name\":\"last\",\"type\":\"string\"}]}";
+
+  @Test
+  public void testValidConfigurations() {
+    assertThrows(
+        AssertionError.class,
+        () -> {
+          KafkaSchemaTransformReadConfiguration.builder()
+              .setDataFormat("UNUSUAL_FORMAT")
+              .setTopic("a_valid_topic")
+              .setBootstrapServers("a_valid_server")
+              .build()
+              .validate();
+        });
+
+    assertThrows(
+        IllegalStateException.class,
+        () -> {
+          KafkaSchemaTransformReadConfiguration.builder()
+              .setDataFormat("UNUSUAL_FORMAT")
+              // .setTopic("a_valid_topic")  // Topic is mandatory
+              .setBootstrapServers("a_valid_server")
+              .build()
+              .validate();
+        });
+
+    assertThrows(
+        IllegalStateException.class,
+        () -> {
+          KafkaSchemaTransformReadConfiguration.builder()
+              .setDataFormat("UNUSUAL_FORMAT")
+              .setTopic("a_valid_topic")
+              // .setBootstrapServers("a_valid_server")  // Bootstrap server is mandatory
+              .build()
+              .validate();
+        });
+  }
+
+  @Test
+  public void testFindTransformAndMakeItWork() {
+    ServiceLoader<SchemaTransformProvider> serviceLoader =
+        ServiceLoader.load(SchemaTransformProvider.class);
+    List<SchemaTransformProvider> providers =
+        StreamSupport.stream(serviceLoader.spliterator(), false)
+            .filter(provider -> provider.getClass() == KafkaSchemaTransformReadProvider.class)
+            .collect(Collectors.toList());
+    SchemaTransformProvider kafkaProvider = providers.get(0);
+    assertEquals(kafkaProvider.outputCollectionNames(), Lists.newArrayList("OUTPUT"));
+    assertEquals(kafkaProvider.inputCollectionNames(), Lists.newArrayList());
+
+    assertEquals(
+        Sets.newHashSet(
+            "bootstrapServers",
+            "topic",
+            "avroSchema",
+            "autoOffsetResetConfig",
+            "consumerConfigUpdates",
+            "dataFormat",
+            "confluentSchemaRegistrySubject",
+            "confluentSchemaRegistryUrl"),
+        kafkaProvider.configurationSchema().getFields().stream()
+            .map(field -> field.getName())
+            .collect(Collectors.toSet()));
+  }
+
+  @Test
+  public void testBuildTransformWithAvroSchema() {
+    ServiceLoader<SchemaTransformProvider> serviceLoader =
+        ServiceLoader.load(SchemaTransformProvider.class);
+    List<SchemaTransformProvider> providers =
+        StreamSupport.stream(serviceLoader.spliterator(), false)
+            .filter(provider -> provider.getClass() == KafkaSchemaTransformReadProvider.class)
+            .collect(Collectors.toList());
+    KafkaSchemaTransformReadProvider kafkaProvider =
+        (KafkaSchemaTransformReadProvider) providers.get(0);
+    kafkaProvider
+        .from(
+            KafkaSchemaTransformReadConfiguration.builder()
+                .setTopic("anytopic")
+                .setBootstrapServers("anybootstrap")
+                .setAvroSchema(AVRO_SCHEMA)
+                .build())
+        .buildTransform();
+  }
+}