Posted to commits@beam.apache.org by pa...@apache.org on 2022/12/05 18:22:37 UTC

[beam] branch master updated: Support for JsonSchema in Kafka Read Schema Transform (#24272)

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new dbb584997c7 Support for JsonSchema in Kafka Read Schema Transform (#24272)
dbb584997c7 is described below

commit dbb584997c74766c10f8d9ad15d960bedf82e6f2
Author: Pablo Estrada <pa...@users.noreply.github.com>
AuthorDate: Mon Dec 5 10:22:28 2022 -0800

    Support for JsonSchema in Kafka Read Schema Transform (#24272)
    
    * Implement KafkaReadSchemaTransform support for JSON
    
    * fixup
    
    * add dependencies
    
    * fixup
---
 sdks/java/io/kafka/build.gradle                    |  3 ++
 ... => KafkaReadSchemaTransformConfiguration.java} | 16 +++----
 ....java => KafkaReadSchemaTransformProvider.java} | 30 +++++++-----
 ...a => KafkaReadSchemaTransformProviderTest.java} | 55 +++++++++++++++++-----
 4 files changed, 72 insertions(+), 32 deletions(-)
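
In short: the configuration's avroSchema field is generalized to schema, the
dataFormat field now accepts "JSON" alongside "AVRO", and the provider
identifier moves to the standard Beam URN form. As a rough sketch of what the
new surface looks like to a caller (the topic, bootstrap servers, and schema
string below are hypothetical placeholders, not taken from this commit):

    KafkaReadSchemaTransformConfiguration config =
        KafkaReadSchemaTransformConfiguration.builder()
            .setTopic("my_topic")                   // hypothetical topic
            .setBootstrapServers("localhost:9092")  // hypothetical broker
            .setDataFormat("JSON")                  // new in this commit
            .setSchema(jsonSchemaString)            // was setAvroSchema(...)
            .build();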

diff --git a/sdks/java/io/kafka/build.gradle b/sdks/java/io/kafka/build.gradle
index 68ce449394b..5e3680a74ea 100644
--- a/sdks/java/io/kafka/build.gradle
+++ b/sdks/java/io/kafka/build.gradle
@@ -85,6 +85,9 @@ dependencies {
     // "kafka-clients" has to be provided since user can use its own version.
     exclude group: "org.apache.kafka", module: "kafka-clients"
   }
+  // everit_json is needed for Kafka Read SchemaTransform tests that rely on JSON-schema translation.
+  permitUnusedDeclared library.java.everit_json_schema
+  provided library.java.everit_json_schema
   testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
   testImplementation project(":sdks:java:io:synthetic")
   testImplementation project(path: ":sdks:java:io:common", configuration: "testRuntimeMigration")
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadConfiguration.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
similarity index 88%
rename from sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadConfiguration.java
rename to sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
index 9280de6c13f..b930e7baa46 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadConfiguration.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
@@ -36,7 +36,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
 @Experimental
 @DefaultSchema(AutoValueSchema.class)
 @AutoValue
-public abstract class KafkaSchemaTransformReadConfiguration {
+public abstract class KafkaReadSchemaTransformConfiguration {
 
   public static final Set<String> VALID_START_OFFSET_VALUES = Sets.newHashSet("earliest", "latest");
   public static final Set<String> VALID_DATA_FORMATS = Sets.newHashSet("AVRO", "JSON");
@@ -50,9 +50,9 @@ public abstract class KafkaSchemaTransformReadConfiguration {
         : "Valid data formats are " + VALID_DATA_FORMATS;
   }
 
-  /** Instantiates a {@link KafkaSchemaTransformReadConfiguration.Builder} instance. */
+  /** Instantiates a {@link KafkaReadSchemaTransformConfiguration.Builder} instance. */
   public static Builder builder() {
-    return new AutoValue_KafkaSchemaTransformReadConfiguration.Builder();
+    return new AutoValue_KafkaReadSchemaTransformConfiguration.Builder();
   }
 
   /** Sets the bootstrap servers for the Kafka consumer. */
@@ -69,7 +69,7 @@ public abstract class KafkaSchemaTransformReadConfiguration {
   public abstract String getConfluentSchemaRegistrySubject();
 
   @Nullable
-  public abstract String getAvroSchema();
+  public abstract String getSchema();
 
   @Nullable
   public abstract String getAutoOffsetResetConfig();
@@ -80,7 +80,7 @@ public abstract class KafkaSchemaTransformReadConfiguration {
   /** Sets the topic from which to read. */
   public abstract String getTopic();
 
-  /** Builder for the {@link KafkaSchemaTransformReadConfiguration}. */
+  /** Builder for the {@link KafkaReadSchemaTransformConfiguration}. */
   @AutoValue.Builder
   public abstract static class Builder {
 
@@ -91,7 +91,7 @@ public abstract class KafkaSchemaTransformReadConfiguration {
 
     public abstract Builder setConfluentSchemaRegistrySubject(String subject);
 
-    public abstract Builder setAvroSchema(String schema);
+    public abstract Builder setSchema(String schema);
 
     public abstract Builder setDataFormat(String dataFormat);
 
@@ -102,7 +102,7 @@ public abstract class KafkaSchemaTransformReadConfiguration {
     /** Sets the topic from which to read. */
     public abstract Builder setTopic(String value);
 
-    /** Builds a {@link KafkaSchemaTransformReadConfiguration} instance. */
-    public abstract KafkaSchemaTransformReadConfiguration build();
+    /** Builds a {@link KafkaReadSchemaTransformConfiguration} instance. */
+    public abstract KafkaReadSchemaTransformConfiguration build();
   }
 }
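
The renamed configuration keeps topic and bootstrapServers as required
AutoValue fields, while schema, dataFormat, and the offset settings stay
@Nullable; validate() additionally restricts dataFormat to AVRO or JSON and is
invoked from the transform's constructor, as the provider diff below shows. A
hedged sketch of the failure modes this implies (exception types match the
assertions in the test file further down):

    // Required field missing: AutoValue's build() throws IllegalStateException.
    KafkaReadSchemaTransformConfiguration.builder()
        .setDataFormat("AVRO")
        .setBootstrapServers("a_valid_server")
        .build();  // fails: topic was never set

    // Unknown format: validate() trips its assert, surfacing as AssertionError.
    KafkaReadSchemaTransformConfiguration.builder()
        .setDataFormat("UNUSUAL_FORMAT")
        .setTopic("a_valid_topic")
        .setBootstrapServers("a_valid_server")
        .build()
        .validate();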
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
similarity index 86%
rename from sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadProvider.java
rename to sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
index a13c54c22aa..5c84922db20 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadProvider.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.kafka;
 
 import com.google.auto.service.AutoService;
 import java.util.List;
+import java.util.Objects;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.schemas.Schema;
@@ -27,6 +28,7 @@ import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
 import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.schemas.utils.JsonUtils;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -41,22 +43,22 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 
 @AutoService(SchemaTransformProvider.class)
-public class KafkaSchemaTransformReadProvider
-    extends TypedSchemaTransformProvider<KafkaSchemaTransformReadConfiguration> {
+public class KafkaReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<KafkaReadSchemaTransformConfiguration> {
 
   @Override
-  protected Class<KafkaSchemaTransformReadConfiguration> configurationClass() {
-    return KafkaSchemaTransformReadConfiguration.class;
+  protected Class<KafkaReadSchemaTransformConfiguration> configurationClass() {
+    return KafkaReadSchemaTransformConfiguration.class;
   }
 
   @Override
-  protected SchemaTransform from(KafkaSchemaTransformReadConfiguration configuration) {
+  protected SchemaTransform from(KafkaReadSchemaTransformConfiguration configuration) {
     return new KafkaReadSchemaTransform(configuration);
   }
 
   @Override
   public String identifier() {
-    return "kafka:read";
+    return "beam:schematransform:org.apache.beam:kafka_read:v1";
   }
 
   @Override
@@ -70,29 +72,33 @@ public class KafkaSchemaTransformReadProvider
   }
 
   private static class KafkaReadSchemaTransform implements SchemaTransform {
-    private final KafkaSchemaTransformReadConfiguration configuration;
+    private final KafkaReadSchemaTransformConfiguration configuration;
 
-    KafkaReadSchemaTransform(KafkaSchemaTransformReadConfiguration configuration) {
+    KafkaReadSchemaTransform(KafkaReadSchemaTransformConfiguration configuration) {
       configuration.validate();
       this.configuration = configuration;
     }
 
     @Override
     public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
-      final String avroSchema = configuration.getAvroSchema();
+      final String inputSchema = configuration.getSchema();
       final Integer groupId = configuration.hashCode() % Integer.MAX_VALUE;
       final String autoOffsetReset =
           configuration.getAutoOffsetResetConfig() == null
               ? "latest"
               : configuration.getAutoOffsetResetConfig();
-      if (avroSchema != null) {
+      if (inputSchema != null) {
         assert configuration.getConfluentSchemaRegistryUrl() == null
             : "To read from Kafka, a schema must be provided directly or though Confluent "
                 + "Schema Registry, but not both.";
         final Schema beamSchema =
-            AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(avroSchema));
+            Objects.equals(configuration.getDataFormat(), "JSON")
+                ? JsonUtils.beamSchemaFromJsonSchema(inputSchema)
+                : AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(inputSchema));
         SerializableFunction<byte[], Row> valueMapper =
-            AvroUtils.getAvroBytesToRowFunction(beamSchema);
+            Objects.equals(configuration.getDataFormat(), "JSON")
+                ? JsonUtils.getJsonBytesToRowFunction(beamSchema)
+                : AvroUtils.getAvroBytesToRowFunction(beamSchema);
         return new PTransform<PCollectionRowTuple, PCollectionRowTuple>() {
           @Override
           public PCollectionRowTuple expand(PCollectionRowTuple input) {
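
The branch above is the core of the change: when dataFormat is "JSON", the
schema string is interpreted as a JSON schema and converted through JsonUtils;
otherwise it is parsed as an Avro schema exactly as before. A minimal
standalone sketch of the two JsonUtils calls (the schema and payload strings
here are hypothetical, not from this commit):

    import java.nio.charset.StandardCharsets;
    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.schemas.utils.JsonUtils;
    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.beam.sdk.values.Row;

    String jsonSchema =
        "{\"type\": \"object\", \"properties\": {"
            + "\"first\": {\"type\": \"string\"}, \"last\": {\"type\": \"string\"}}}";
    // Translate the JSON schema into a Beam Schema...
    Schema beamSchema = JsonUtils.beamSchemaFromJsonSchema(jsonSchema);
    // ...and build the byte[] -> Row mapper that Kafka record values
    // are fed through.
    SerializableFunction<byte[], Row> toRow =
        JsonUtils.getJsonBytesToRowFunction(beamSchema);
    Row row =
        toRow.apply(
            "{\"first\": \"Ada\", \"last\": \"Lovelace\"}"
                .getBytes(StandardCharsets.UTF_8));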
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadProviderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
similarity index 66%
rename from sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadProviderTest.java
rename to sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
index c8f76ecf3cc..0ac42c38462 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadProviderTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
@@ -20,20 +20,24 @@ package org.apache.beam.sdk.io.kafka;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
 
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.Objects;
 import java.util.ServiceLoader;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-/** Tests for {@link KafkaSchemaTransformReadProvider}. */
+/** Tests for {@link KafkaReadSchemaTransformProvider}. */
 @RunWith(JUnit4.class)
-public class KafkaSchemaTransformReadProviderTest {
+public class KafkaReadSchemaTransformProviderTest {
   private static final String AVRO_SCHEMA =
       "{\"type\":\"record\",\"namespace\":\"com.example\","
           + "\"name\":\"FullName\",\"fields\":[{\"name\":\"first\",\"type\":\"string\"},"
@@ -44,7 +48,7 @@ public class KafkaSchemaTransformReadProviderTest {
     assertThrows(
         AssertionError.class,
         () -> {
-          KafkaSchemaTransformReadConfiguration.builder()
+          KafkaReadSchemaTransformConfiguration.builder()
               .setDataFormat("UNUSUAL_FORMAT")
               .setTopic("a_valid_topic")
               .setBootstrapServers("a_valid_server")
@@ -55,7 +59,7 @@ public class KafkaSchemaTransformReadProviderTest {
     assertThrows(
         IllegalStateException.class,
         () -> {
-          KafkaSchemaTransformReadConfiguration.builder()
+          KafkaReadSchemaTransformConfiguration.builder()
               .setDataFormat("UNUSUAL_FORMAT")
               // .setTopic("a_valid_topic")  // Topic is mandatory
               .setBootstrapServers("a_valid_server")
@@ -66,7 +70,7 @@ public class KafkaSchemaTransformReadProviderTest {
     assertThrows(
         IllegalStateException.class,
         () -> {
-          KafkaSchemaTransformReadConfiguration.builder()
+          KafkaReadSchemaTransformConfiguration.builder()
               .setDataFormat("UNUSUAL_FORMAT")
               .setTopic("a_valid_topic")
               // .setBootstrapServers("a_valid_server")  // Bootstrap server is mandatory
@@ -81,7 +85,7 @@ public class KafkaSchemaTransformReadProviderTest {
         ServiceLoader.load(SchemaTransformProvider.class);
     List<SchemaTransformProvider> providers =
         StreamSupport.stream(serviceLoader.spliterator(), false)
-            .filter(provider -> provider.getClass() == KafkaSchemaTransformReadProvider.class)
+            .filter(provider -> provider.getClass() == KafkaReadSchemaTransformProvider.class)
             .collect(Collectors.toList());
     SchemaTransformProvider kafkaProvider = providers.get(0);
     assertEquals(kafkaProvider.outputCollectionNames(), Lists.newArrayList("OUTPUT"));
@@ -91,7 +95,7 @@ public class KafkaSchemaTransformReadProviderTest {
         Sets.newHashSet(
             "bootstrapServers",
             "topic",
-            "avroSchema",
+            "schema",
             "autoOffsetResetConfig",
             "consumerConfigUpdates",
             "dataFormat",
@@ -108,16 +112,43 @@ public class KafkaSchemaTransformReadProviderTest {
         ServiceLoader.load(SchemaTransformProvider.class);
     List<SchemaTransformProvider> providers =
         StreamSupport.stream(serviceLoader.spliterator(), false)
-            .filter(provider -> provider.getClass() == KafkaSchemaTransformReadProvider.class)
+            .filter(provider -> provider.getClass() == KafkaReadSchemaTransformProvider.class)
             .collect(Collectors.toList());
-    KafkaSchemaTransformReadProvider kafkaProvider =
-        (KafkaSchemaTransformReadProvider) providers.get(0);
+    KafkaReadSchemaTransformProvider kafkaProvider =
+        (KafkaReadSchemaTransformProvider) providers.get(0);
     kafkaProvider
         .from(
-            KafkaSchemaTransformReadConfiguration.builder()
+            KafkaReadSchemaTransformConfiguration.builder()
                 .setTopic("anytopic")
                 .setBootstrapServers("anybootstrap")
-                .setAvroSchema(AVRO_SCHEMA)
+                .setSchema(AVRO_SCHEMA)
+                .build())
+        .buildTransform();
+  }
+
+  @Test
+  public void testBuildTransformWithJsonSchema() throws IOException {
+    ServiceLoader<SchemaTransformProvider> serviceLoader =
+        ServiceLoader.load(SchemaTransformProvider.class);
+    List<SchemaTransformProvider> providers =
+        StreamSupport.stream(serviceLoader.spliterator(), false)
+            .filter(provider -> provider.getClass() == KafkaReadSchemaTransformProvider.class)
+            .collect(Collectors.toList());
+    KafkaReadSchemaTransformProvider kafkaProvider =
+        (KafkaReadSchemaTransformProvider) providers.get(0);
+    kafkaProvider
+        .from(
+            KafkaReadSchemaTransformConfiguration.builder()
+                .setTopic("anytopic")
+                .setBootstrapServers("anybootstrap")
+                .setDataFormat("JSON")
+                .setSchema(
+                    new String(
+                        ByteStreams.toByteArray(
+                            Objects.requireNonNull(
+                                getClass()
+                                    .getResourceAsStream("/json-schema/basic_json_schema.json"))),
+                        StandardCharsets.UTF_8))
                 .build())
         .buildTransform();
   }
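
For completeness, a hedged sketch of how the built transform might be applied
in a pipeline (names are illustrative; "OUTPUT" is the output collection name
the provider advertises, per the assertion earlier in this test file):

    Pipeline pipeline = Pipeline.create();
    PTransform<PCollectionRowTuple, PCollectionRowTuple> transform =
        kafkaProvider.from(config).buildTransform();
    PCollection<Row> rows =
        PCollectionRowTuple.empty(pipeline).apply(transform).get("OUTPUT");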