You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2023/02/24 01:46:40 UTC

[beam] branch master updated: Prioritise Avro providers from "extensions/core" (#25611)

This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new dcfadddf52a Prioritise Avro providers from "extensions/core" (#25611)
dcfadddf52a is described below

commit dcfadddf52ae68a8d3d605e3a072c5d611b1fe14
Author: Alexey Romanenko <33...@users.noreply.github.com>
AuthorDate: Fri Feb 24 02:46:27 2023 +0100

    Prioritise Avro providers from "extensions/core" (#25611)
---
 .../org/apache/beam/sdk/schemas/io/Providers.java  | 11 ++--
 .../ExternalSchemaIOTransformRegistrar.java        | 60 ++++++++++++++++++----
 2 files changed, 53 insertions(+), 18 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/Providers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/Providers.java
index 39a23685eb2..c4a4902b9d6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/Providers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/Providers.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.annotations.Internal;
 @Internal
 @Experimental(Kind.SCHEMAS)
 public final class Providers {
+
   public interface Identifyable {
     /**
      * Returns an id that uniquely represents this among others implementing its derived interface.
@@ -44,16 +45,12 @@ public final class Providers {
     for (T provider : ServiceLoader.load(klass)) {
       // Avro provider is treated as a special case since two Avro providers may want to be loaded -
       // from "core" (deprecated) and from "extensions/avro" (actual) - but only one must succeed.
-      // TODO: this check should be removed once once AvroPayloadSerializerProvider from "core" is
+      // TODO: we won't need this check once all Avro providers from "core" are
       // removed
       if (provider.identifier().equals("avro")) {
         // Avro provider from "extensions/avro" must have a priority.
-        if (provider
-            .getClass()
-            .getName()
-            .equals(
-                "org.apache.beam.sdk.extensions.avro.schemas.io.payloads.AvroPayloadSerializerProvider")) {
-          // Use AvroPayloadSerializerProvider from extensions/avro by any case.
+        if (provider.getClass().getName().startsWith("org.apache.beam.sdk.extensions.avro")) {
+          // Load Avro provider from "extensions/avro" in any case.
           providers.put(provider.identifier(), provider);
         } else {
           // Load Avro provider from "core" if it was not loaded from Avro extension before.
diff --git a/sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java b/sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java
index f61a34532cb..d8f797e8168 100644
--- a/sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java
+++ b/sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java
@@ -17,10 +17,13 @@
  */
 package org.apache.beam.sdk.extensions.schemaio.expansion;
 
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
 import com.google.auto.service.AutoService;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.ServiceLoader;
 import javax.annotation.Nullable;
@@ -50,22 +53,57 @@ public class ExternalSchemaIOTransformRegistrar implements ExternalTransformRegi
 
   @Override
   public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
-    ImmutableMap.Builder builder = ImmutableMap.<String, ExternalTransformRegistrar>builder();
+    Map<String, ExternalTransformBuilder<?, ?, ?>> providers = new HashMap<>();
     try {
-      for (SchemaIOProvider schemaIOProvider : ServiceLoader.load(SchemaIOProvider.class)) {
-        builder.put(
-            "beam:transform:org.apache.beam:schemaio_" + schemaIOProvider.identifier() + "_read:v1",
-            new ReaderBuilder(schemaIOProvider));
-        builder.put(
-            "beam:transform:org.apache.beam:schemaio_"
-                + schemaIOProvider.identifier()
-                + "_write:v1",
-            new WriterBuilder(schemaIOProvider));
+      for (SchemaIOProvider provider : ServiceLoader.load(SchemaIOProvider.class)) {
+        // Avro provider is treated as a special case since two Avro providers may want to be loaded
+        // from "core" (deprecated) and from "extensions/avro" (actual) - but only one must succeed.
+        // TODO: we won't need this check once all Avro providers from "core" are
+        // removed
+        if (provider.identifier().equals("avro")) {
+          // Avro provider from "extensions/avro" must have a priority.
+          if (provider.getClass().getName().startsWith("org.apache.beam.sdk.extensions.avro")) {
+            // Load Avro provider from "extensions/avro" in any case.
+            registerProvider(providers, provider);
+          } else {
+            // Load Avro provider from "core" if it was not loaded from Avro extension before.
+            registerProviderOptionally(providers, provider);
+          }
+        } else {
+          final String identifier =
+              "beam:transform:org.apache.beam:schemaio_" + provider.identifier() + "_read:v1";
+          checkState(
+              !providers.containsKey(identifier),
+              "Duplicate providers exist with identifier `%s` for class %s.",
+              identifier,
+              SchemaIOProvider.class);
+          registerProvider(providers, provider);
+        }
       }
     } catch (Exception e) {
       throw new RuntimeException(e.getMessage());
     }
-    return builder.build();
+    return ImmutableMap.copyOf(providers);
+  }
+
+  private void registerProvider(
+      Map<String, ExternalTransformBuilder<?, ?, ?>> providers, SchemaIOProvider provider) {
+    providers.put(
+        "beam:transform:org.apache.beam:schemaio_" + provider.identifier() + "_read:v1",
+        new ReaderBuilder(provider));
+    providers.put(
+        "beam:transform:org.apache.beam:schemaio_" + provider.identifier() + "_write:v1",
+        new WriterBuilder(provider));
+  }
+
+  private void registerProviderOptionally(
+      Map<String, ExternalTransformBuilder<?, ?, ?>> providers, SchemaIOProvider provider) {
+    providers.putIfAbsent(
+        "beam:transform:org.apache.beam:schemaio_" + provider.identifier() + "_read:v1",
+        new ReaderBuilder(provider));
+    providers.putIfAbsent(
+        "beam:transform:org.apache.beam:schemaio_" + provider.identifier() + "_write:v1",
+        new WriterBuilder(provider));
   }
 
   public static class Configuration {