You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/12/05 18:22:37 UTC
[beam] branch master updated: Support for JsonSchema in Kafka Read Schema Transform (#24272)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new dbb584997c7 Support for JsonSchema in Kafka Read Schema Transform (#24272)
dbb584997c7 is described below
commit dbb584997c74766c10f8d9ad15d960bedf82e6f2
Author: Pablo Estrada <pa...@users.noreply.github.com>
AuthorDate: Mon Dec 5 10:22:28 2022 -0800
Support for JsonSchema in Kafka Read Schema Transform (#24272)
* Implement KafkaReadSchemaTransform support for JSON
* fixup
* add dependencies
* fixup
---
sdks/java/io/kafka/build.gradle | 3 ++
... => KafkaReadSchemaTransformConfiguration.java} | 16 +++----
....java => KafkaReadSchemaTransformProvider.java} | 30 +++++++-----
...a => KafkaReadSchemaTransformProviderTest.java} | 55 +++++++++++++++++-----
4 files changed, 72 insertions(+), 32 deletions(-)
diff --git a/sdks/java/io/kafka/build.gradle b/sdks/java/io/kafka/build.gradle
index 68ce449394b..5e3680a74ea 100644
--- a/sdks/java/io/kafka/build.gradle
+++ b/sdks/java/io/kafka/build.gradle
@@ -85,6 +85,9 @@ dependencies {
// "kafka-clients" has to be provided since user can use its own version.
exclude group: "org.apache.kafka", module: "kafka-clients"
}
+ // everit_json is needed for Kafka Read SchemaTransform tests that rely on JSON-schema translation.
+ permitUnusedDeclared library.java.everit_json_schema
+ provided library.java.everit_json_schema
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
testImplementation project(":sdks:java:io:synthetic")
testImplementation project(path: ":sdks:java:io:common", configuration: "testRuntimeMigration")
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadConfiguration.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
similarity index 88%
rename from sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadConfiguration.java
rename to sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
index 9280de6c13f..b930e7baa46 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadConfiguration.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
@@ -36,7 +36,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
@Experimental
@DefaultSchema(AutoValueSchema.class)
@AutoValue
-public abstract class KafkaSchemaTransformReadConfiguration {
+public abstract class KafkaReadSchemaTransformConfiguration {
public static final Set<String> VALID_START_OFFSET_VALUES = Sets.newHashSet("earliest", "latest");
public static final Set<String> VALID_DATA_FORMATS = Sets.newHashSet("AVRO", "JSON");
@@ -50,9 +50,9 @@ public abstract class KafkaSchemaTransformReadConfiguration {
: "Valid data formats are " + VALID_DATA_FORMATS;
}
- /** Instantiates a {@link KafkaSchemaTransformReadConfiguration.Builder} instance. */
+ /** Instantiates a {@link KafkaReadSchemaTransformConfiguration.Builder} instance. */
public static Builder builder() {
- return new AutoValue_KafkaSchemaTransformReadConfiguration.Builder();
+ return new AutoValue_KafkaReadSchemaTransformConfiguration.Builder();
}
/** Sets the bootstrap servers for the Kafka consumer. */
@@ -69,7 +69,7 @@ public abstract class KafkaSchemaTransformReadConfiguration {
public abstract String getConfluentSchemaRegistrySubject();
@Nullable
- public abstract String getAvroSchema();
+ public abstract String getSchema();
@Nullable
public abstract String getAutoOffsetResetConfig();
@@ -80,7 +80,7 @@ public abstract class KafkaSchemaTransformReadConfiguration {
/** Gets the topic from which to read. */
public abstract String getTopic();
- /** Builder for the {@link KafkaSchemaTransformReadConfiguration}. */
+ /** Builder for the {@link KafkaReadSchemaTransformConfiguration}. */
@AutoValue.Builder
public abstract static class Builder {
@@ -91,7 +91,7 @@ public abstract class KafkaSchemaTransformReadConfiguration {
public abstract Builder setConfluentSchemaRegistrySubject(String subject);
- public abstract Builder setAvroSchema(String schema);
+ public abstract Builder setSchema(String schema);
public abstract Builder setDataFormat(String dataFormat);
@@ -102,7 +102,7 @@ public abstract class KafkaSchemaTransformReadConfiguration {
/** Sets the topic from which to read. */
public abstract Builder setTopic(String value);
- /** Builds a {@link KafkaSchemaTransformReadConfiguration} instance. */
- public abstract KafkaSchemaTransformReadConfiguration build();
+ /** Builds a {@link KafkaReadSchemaTransformConfiguration} instance. */
+ public abstract KafkaReadSchemaTransformConfiguration build();
}
}
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
similarity index 86%
rename from sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadProvider.java
rename to sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
index a13c54c22aa..5c84922db20 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadProvider.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.kafka;
import com.google.auto.service.AutoService;
import java.util.List;
+import java.util.Objects;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.schemas.Schema;
@@ -27,6 +28,7 @@ import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.schemas.utils.JsonUtils;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -41,22 +43,22 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@AutoService(SchemaTransformProvider.class)
-public class KafkaSchemaTransformReadProvider
- extends TypedSchemaTransformProvider<KafkaSchemaTransformReadConfiguration> {
+public class KafkaReadSchemaTransformProvider
+ extends TypedSchemaTransformProvider<KafkaReadSchemaTransformConfiguration> {
@Override
- protected Class<KafkaSchemaTransformReadConfiguration> configurationClass() {
- return KafkaSchemaTransformReadConfiguration.class;
+ protected Class<KafkaReadSchemaTransformConfiguration> configurationClass() {
+ return KafkaReadSchemaTransformConfiguration.class;
}
@Override
- protected SchemaTransform from(KafkaSchemaTransformReadConfiguration configuration) {
+ protected SchemaTransform from(KafkaReadSchemaTransformConfiguration configuration) {
return new KafkaReadSchemaTransform(configuration);
}
@Override
public String identifier() {
- return "kafka:read";
+ return "beam:schematransform:org.apache.beam:kafka_read:v1";
}
@Override
@@ -70,29 +72,33 @@ public class KafkaSchemaTransformReadProvider
}
private static class KafkaReadSchemaTransform implements SchemaTransform {
- private final KafkaSchemaTransformReadConfiguration configuration;
+ private final KafkaReadSchemaTransformConfiguration configuration;
- KafkaReadSchemaTransform(KafkaSchemaTransformReadConfiguration configuration) {
+ KafkaReadSchemaTransform(KafkaReadSchemaTransformConfiguration configuration) {
configuration.validate();
this.configuration = configuration;
}
@Override
public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
- final String avroSchema = configuration.getAvroSchema();
+ final String inputSchema = configuration.getSchema();
final Integer groupId = configuration.hashCode() % Integer.MAX_VALUE;
final String autoOffsetReset =
configuration.getAutoOffsetResetConfig() == null
? "latest"
: configuration.getAutoOffsetResetConfig();
- if (avroSchema != null) {
+ if (inputSchema != null) {
assert configuration.getConfluentSchemaRegistryUrl() == null
: "To read from Kafka, a schema must be provided directly or though Confluent "
+ "Schema Registry, but not both.";
final Schema beamSchema =
- AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(avroSchema));
+ Objects.equals(configuration.getDataFormat(), "JSON")
+ ? JsonUtils.beamSchemaFromJsonSchema(inputSchema)
+ : AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(inputSchema));
SerializableFunction<byte[], Row> valueMapper =
- AvroUtils.getAvroBytesToRowFunction(beamSchema);
+ Objects.equals(configuration.getDataFormat(), "JSON")
+ ? JsonUtils.getJsonBytesToRowFunction(beamSchema)
+ : AvroUtils.getAvroBytesToRowFunction(beamSchema);
return new PTransform<PCollectionRowTuple, PCollectionRowTuple>() {
@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadProviderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
similarity index 66%
rename from sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadProviderTest.java
rename to sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
index c8f76ecf3cc..0ac42c38462 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadProviderTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
@@ -20,20 +20,24 @@ package org.apache.beam.sdk.io.kafka;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.List;
+import java.util.Objects;
import java.util.ServiceLoader;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-/** Tests for {@link KafkaSchemaTransformReadProvider}. */
+/** Tests for {@link KafkaReadSchemaTransformProvider}. */
@RunWith(JUnit4.class)
-public class KafkaSchemaTransformReadProviderTest {
+public class KafkaReadSchemaTransformProviderTest {
private static final String AVRO_SCHEMA =
"{\"type\":\"record\",\"namespace\":\"com.example\","
+ "\"name\":\"FullName\",\"fields\":[{\"name\":\"first\",\"type\":\"string\"},"
@@ -44,7 +48,7 @@ public class KafkaSchemaTransformReadProviderTest {
assertThrows(
AssertionError.class,
() -> {
- KafkaSchemaTransformReadConfiguration.builder()
+ KafkaReadSchemaTransformConfiguration.builder()
.setDataFormat("UNUSUAL_FORMAT")
.setTopic("a_valid_topic")
.setBootstrapServers("a_valid_server")
@@ -55,7 +59,7 @@ public class KafkaSchemaTransformReadProviderTest {
assertThrows(
IllegalStateException.class,
() -> {
- KafkaSchemaTransformReadConfiguration.builder()
+ KafkaReadSchemaTransformConfiguration.builder()
.setDataFormat("UNUSUAL_FORMAT")
// .setTopic("a_valid_topic") // Topic is mandatory
.setBootstrapServers("a_valid_server")
@@ -66,7 +70,7 @@ public class KafkaSchemaTransformReadProviderTest {
assertThrows(
IllegalStateException.class,
() -> {
- KafkaSchemaTransformReadConfiguration.builder()
+ KafkaReadSchemaTransformConfiguration.builder()
.setDataFormat("UNUSUAL_FORMAT")
.setTopic("a_valid_topic")
// .setBootstrapServers("a_valid_server") // Bootstrap server is mandatory
@@ -81,7 +85,7 @@ public class KafkaSchemaTransformReadProviderTest {
ServiceLoader.load(SchemaTransformProvider.class);
List<SchemaTransformProvider> providers =
StreamSupport.stream(serviceLoader.spliterator(), false)
- .filter(provider -> provider.getClass() == KafkaSchemaTransformReadProvider.class)
+ .filter(provider -> provider.getClass() == KafkaReadSchemaTransformProvider.class)
.collect(Collectors.toList());
SchemaTransformProvider kafkaProvider = providers.get(0);
assertEquals(kafkaProvider.outputCollectionNames(), Lists.newArrayList("OUTPUT"));
@@ -91,7 +95,7 @@ public class KafkaSchemaTransformReadProviderTest {
Sets.newHashSet(
"bootstrapServers",
"topic",
- "avroSchema",
+ "schema",
"autoOffsetResetConfig",
"consumerConfigUpdates",
"dataFormat",
@@ -108,16 +112,43 @@ public class KafkaSchemaTransformReadProviderTest {
ServiceLoader.load(SchemaTransformProvider.class);
List<SchemaTransformProvider> providers =
StreamSupport.stream(serviceLoader.spliterator(), false)
- .filter(provider -> provider.getClass() == KafkaSchemaTransformReadProvider.class)
+ .filter(provider -> provider.getClass() == KafkaReadSchemaTransformProvider.class)
.collect(Collectors.toList());
- KafkaSchemaTransformReadProvider kafkaProvider =
- (KafkaSchemaTransformReadProvider) providers.get(0);
+ KafkaReadSchemaTransformProvider kafkaProvider =
+ (KafkaReadSchemaTransformProvider) providers.get(0);
kafkaProvider
.from(
- KafkaSchemaTransformReadConfiguration.builder()
+ KafkaReadSchemaTransformConfiguration.builder()
.setTopic("anytopic")
.setBootstrapServers("anybootstrap")
- .setAvroSchema(AVRO_SCHEMA)
+ .setSchema(AVRO_SCHEMA)
+ .build())
+ .buildTransform();
+ }
+
+ @Test
+ public void testBuildTransformWithJsonSchema() throws IOException {
+ ServiceLoader<SchemaTransformProvider> serviceLoader =
+ ServiceLoader.load(SchemaTransformProvider.class);
+ List<SchemaTransformProvider> providers =
+ StreamSupport.stream(serviceLoader.spliterator(), false)
+ .filter(provider -> provider.getClass() == KafkaReadSchemaTransformProvider.class)
+ .collect(Collectors.toList());
+ KafkaReadSchemaTransformProvider kafkaProvider =
+ (KafkaReadSchemaTransformProvider) providers.get(0);
+ kafkaProvider
+ .from(
+ KafkaReadSchemaTransformConfiguration.builder()
+ .setTopic("anytopic")
+ .setBootstrapServers("anybootstrap")
+ .setDataFormat("JSON")
+ .setSchema(
+ new String(
+ ByteStreams.toByteArray(
+ Objects.requireNonNull(
+ getClass()
+ .getResourceAsStream("/json-schema/basic_json_schema.json"))),
+ StandardCharsets.UTF_8))
.build())
.buildTransform();
}