You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "aromanenko-dev (via GitHub)" <gi...@apache.org> on 2023/02/23 17:14:25 UTC

[GitHub] [beam] aromanenko-dev commented on a diff in pull request #25611: [AVRO] Prioritise Avro providers from "extensions/core"

aromanenko-dev commented on code in PR #25611:
URL: https://github.com/apache/beam/pull/25611#discussion_r1116007241


##########
sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java:
##########
@@ -50,22 +51,50 @@ public class ExternalSchemaIOTransformRegistrar implements ExternalTransformRegi
 
   @Override
   public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
-    ImmutableMap.Builder builder = ImmutableMap.<String, ExternalTransformRegistrar>builder();
+    Map<String, ExternalTransformBuilder<?, ?, ?>> providers = new HashMap<>();
     try {
-      for (SchemaIOProvider schemaIOProvider : ServiceLoader.load(SchemaIOProvider.class)) {
-        builder.put(
-            "beam:transform:org.apache.beam:schemaio_" + schemaIOProvider.identifier() + "_read:v1",
-            new ReaderBuilder(schemaIOProvider));
-        builder.put(
-            "beam:transform:org.apache.beam:schemaio_"
-                + schemaIOProvider.identifier()
-                + "_write:v1",
-            new WriterBuilder(schemaIOProvider));
+      for (SchemaIOProvider provider : ServiceLoader.load(SchemaIOProvider.class)) {
+        // The Avro provider is treated as a special case, since two Avro providers may be found:
+        // one from "core" (deprecated) and one from "extensions/avro" (current) - but only one
+        // of them must be loaded.
+        // TODO: this check won't be needed once all Avro providers have been removed from "core".
+        if (provider.identifier().equals("avro")) {
+          // Avro provider from "extensions/avro" must have a priority.
+          if (provider.getClass().getName().startsWith("org.apache.beam.sdk.extensions.avro")) {
+            // Load the Avro provider from "extensions/avro" in any case.
+            loadProvider(providers, provider);
+          } else {
+            // Load Avro provider from "core" if it was not loaded from Avro extension before.
+            loadProviderOptinal(providers, provider);
+          }
+        } else {
+          loadProvider(providers, provider);
+        }
       }
     } catch (Exception e) {
       throw new RuntimeException(e.getMessage());
     }
-    return builder.build();
+    return ImmutableMap.copyOf(providers);
+  }
+
+  private void loadProvider(
+      Map<String, ExternalTransformBuilder<?, ?, ?>> providers, SchemaIOProvider provider) {
+    providers.put(
+        String.format("beam:transform:org.apache.beam:schemaio_%s_read:v1", provider.identifier()),

Review Comment:
   Done



##########
sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java:
##########
@@ -50,22 +51,50 @@ public class ExternalSchemaIOTransformRegistrar implements ExternalTransformRegi
 
   @Override
   public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
-    ImmutableMap.Builder builder = ImmutableMap.<String, ExternalTransformRegistrar>builder();
+    Map<String, ExternalTransformBuilder<?, ?, ?>> providers = new HashMap<>();
     try {
-      for (SchemaIOProvider schemaIOProvider : ServiceLoader.load(SchemaIOProvider.class)) {
-        builder.put(
-            "beam:transform:org.apache.beam:schemaio_" + schemaIOProvider.identifier() + "_read:v1",
-            new ReaderBuilder(schemaIOProvider));
-        builder.put(
-            "beam:transform:org.apache.beam:schemaio_"
-                + schemaIOProvider.identifier()
-                + "_write:v1",
-            new WriterBuilder(schemaIOProvider));
+      for (SchemaIOProvider provider : ServiceLoader.load(SchemaIOProvider.class)) {
+        // The Avro provider is treated as a special case, since two Avro providers may be found:
+        // one from "core" (deprecated) and one from "extensions/avro" (current) - but only one
+        // of them must be loaded.
+        // TODO: this check won't be needed once all Avro providers have been removed from "core".
+        if (provider.identifier().equals("avro")) {
+          // Avro provider from "extensions/avro" must have a priority.
+          if (provider.getClass().getName().startsWith("org.apache.beam.sdk.extensions.avro")) {
+            // Load the Avro provider from "extensions/avro" in any case.
+            loadProvider(providers, provider);
+          } else {
+            // Load Avro provider from "core" if it was not loaded from Avro extension before.
+            loadProviderOptinal(providers, provider);
+          }
+        } else {

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org