You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2023/02/24 01:48:18 UTC
[beam] 01/01: Prioritise Avro providers from "extensions/core" (#25611)
This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch users/damccorm/avro-cp
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 5f870679918f6daea8c59aa3aced5626181717f2
Author: Alexey Romanenko <33...@users.noreply.github.com>
AuthorDate: Fri Feb 24 02:46:27 2023 +0100
Prioritise Avro providers from "extensions/core" (#25611)
---
.../org/apache/beam/sdk/schemas/io/Providers.java | 11 ++--
.../ExternalSchemaIOTransformRegistrar.java | 60 ++++++++++++++++++----
2 files changed, 53 insertions(+), 18 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/Providers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/Providers.java
index 39a23685eb2..c4a4902b9d6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/Providers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/Providers.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.annotations.Internal;
@Internal
@Experimental(Kind.SCHEMAS)
public final class Providers {
+
public interface Identifyable {
/**
* Returns an id that uniquely represents this among others implementing its derived interface.
@@ -44,16 +45,12 @@ public final class Providers {
for (T provider : ServiceLoader.load(klass)) {
// Avro provider is treated as a special case since two Avro providers may want to be loaded -
// from "core" (deprecated) and from "extensions/avro" (actual) - but only one must succeed.
- // TODO: this check should be removed once once AvroPayloadSerializerProvider from "core" is
+ // TODO: we won't need this check once all Avro providers from "core" are
// removed
if (provider.identifier().equals("avro")) {
// Avro provider from "extensions/avro" must have a priority.
- if (provider
- .getClass()
- .getName()
- .equals(
- "org.apache.beam.sdk.extensions.avro.schemas.io.payloads.AvroPayloadSerializerProvider")) {
- // Use AvroPayloadSerializerProvider from extensions/avro by any case.
+ if (provider.getClass().getName().startsWith("org.apache.beam.sdk.extensions.avro")) {
+ // Load Avro provider from "extensions/avro" in any case.
providers.put(provider.identifier(), provider);
} else {
// Load Avro provider from "core" if it was not loaded from Avro extension before.
diff --git a/sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java b/sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java
index f61a34532cb..d8f797e8168 100644
--- a/sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java
+++ b/sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java
@@ -17,10 +17,13 @@
*/
package org.apache.beam.sdk.extensions.schemaio.expansion;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
import com.google.auto.service.AutoService;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
import javax.annotation.Nullable;
@@ -50,22 +53,57 @@ public class ExternalSchemaIOTransformRegistrar implements ExternalTransformRegi
@Override
public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
- ImmutableMap.Builder builder = ImmutableMap.<String, ExternalTransformRegistrar>builder();
+ Map<String, ExternalTransformBuilder<?, ?, ?>> providers = new HashMap<>();
try {
- for (SchemaIOProvider schemaIOProvider : ServiceLoader.load(SchemaIOProvider.class)) {
- builder.put(
- "beam:transform:org.apache.beam:schemaio_" + schemaIOProvider.identifier() + "_read:v1",
- new ReaderBuilder(schemaIOProvider));
- builder.put(
- "beam:transform:org.apache.beam:schemaio_"
- + schemaIOProvider.identifier()
- + "_write:v1",
- new WriterBuilder(schemaIOProvider));
+ for (SchemaIOProvider provider : ServiceLoader.load(SchemaIOProvider.class)) {
+ // Avro provider is treated as a special case since two Avro providers may want to be loaded
+ // from "core" (deprecated) and from "extensions/avro" (actual) - but only one must succeed.
+ // TODO: we won't need this check once all Avro providers from "core" are
+ // removed
+ if (provider.identifier().equals("avro")) {
+ // Avro provider from "extensions/avro" must have a priority.
+ if (provider.getClass().getName().startsWith("org.apache.beam.sdk.extensions.avro")) {
+ // Load Avro provider from "extensions/avro" in any case.
+ registerProvider(providers, provider);
+ } else {
+ // Load Avro provider from "core" if it was not loaded from Avro extension before.
+ registerProviderOptionally(providers, provider);
+ }
+ } else {
+ final String identifier =
+ "beam:transform:org.apache.beam:schemaio_" + provider.identifier() + "_read:v1";
+ checkState(
+ !providers.containsKey(identifier),
+ "Duplicate providers exist with identifier `%s` for class %s.",
+ identifier,
+ SchemaIOProvider.class);
+ registerProvider(providers, provider);
+ }
}
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
- return builder.build();
+ return ImmutableMap.copyOf(providers);
+ }
+
+ private void registerProvider(
+ Map<String, ExternalTransformBuilder<?, ?, ?>> providers, SchemaIOProvider provider) {
+ providers.put(
+ "beam:transform:org.apache.beam:schemaio_" + provider.identifier() + "_read:v1",
+ new ReaderBuilder(provider));
+ providers.put(
+ "beam:transform:org.apache.beam:schemaio_" + provider.identifier() + "_write:v1",
+ new WriterBuilder(provider));
+ }
+
+ private void registerProviderOptionally(
+ Map<String, ExternalTransformBuilder<?, ?, ?>> providers, SchemaIOProvider provider) {
+ providers.putIfAbsent(
+ "beam:transform:org.apache.beam:schemaio_" + provider.identifier() + "_read:v1",
+ new ReaderBuilder(provider));
+ providers.putIfAbsent(
+ "beam:transform:org.apache.beam:schemaio_" + provider.identifier() + "_write:v1",
+ new WriterBuilder(provider));
}
public static class Configuration {