Posted to github@beam.apache.org by "mosche (via GitHub)" <gi...@apache.org> on 2023/02/23 17:01:33 UTC

[GitHub] [beam] mosche commented on a diff in pull request #25611: [AVRO] Prioritise Avro providers from "extensions/core"

mosche commented on code in PR #25611:
URL: https://github.com/apache/beam/pull/25611#discussion_r1115987867


##########
sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java:
##########
@@ -50,22 +51,50 @@ public class ExternalSchemaIOTransformRegistrar implements ExternalTransformRegi
 
   @Override
   public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
-    ImmutableMap.Builder builder = ImmutableMap.<String, ExternalTransformRegistrar>builder();
+    Map<String, ExternalTransformBuilder<?, ?, ?>> providers = new HashMap<>();
     try {
-      for (SchemaIOProvider schemaIOProvider : ServiceLoader.load(SchemaIOProvider.class)) {
-        builder.put(
-            "beam:transform:org.apache.beam:schemaio_" + schemaIOProvider.identifier() + "_read:v1",
-            new ReaderBuilder(schemaIOProvider));
-        builder.put(
-            "beam:transform:org.apache.beam:schemaio_"
-                + schemaIOProvider.identifier()
-                + "_write:v1",
-            new WriterBuilder(schemaIOProvider));
+      for (SchemaIOProvider provider : ServiceLoader.load(SchemaIOProvider.class)) {
+        // Avro provider is treated as a special case since two Avro providers may want to be loaded
+        // from "core" (deprecated) and from "extensions/avro" (actual) - but only one must succeed.
+        // TODO: we won't need this check once all Avro providers from "core" will be
+        // removed
+        if (provider.identifier().equals("avro")) {
+          // Avro provider from "extensions/avro" must have a priority.
+          if (provider.getClass().getName().startsWith("org.apache.beam.sdk.extensions.avro")) {
+            // Load Avro provider from "extensions/avro" by any case.
+            loadProvider(providers, provider);
+          } else {
+            // Load Avro provider from "core" if it was not loaded from Avro extension before.
+            loadProviderOptinal(providers, provider);
+          }
+        } else {
+          loadProvider(providers, provider);
+        }
       }
     } catch (Exception e) {
       throw new RuntimeException(e.getMessage());
     }
-    return builder.build();
+    return ImmutableMap.copyOf(providers);
+  }
+
+  private void loadProvider(
+      Map<String, ExternalTransformBuilder<?, ?, ?>> providers, SchemaIOProvider provider) {
+    providers.put(
+        String.format("beam:transform:org.apache.beam:schemaio_%s_read:v1", provider.identifier()),

Review Comment:
   Honestly, it would be better to keep this as it was before: `"beam:transform:org.apache.beam:schemaio_" + provider.identifier() + "_read:v1"`.
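Both spellings produce the same key, so reverting to concatenation changes only the style, not the registered URNs. Here is a minimal, self-contained check. The class name `SchemaIOUrnSpelling` and the `avro` identifier are illustrative only.

```java
// Illustrative check: both spellings of the read URN yield the same key.
// "avro" is only an example identifier.
public final class SchemaIOUrnSpelling {
  private SchemaIOUrnSpelling() {}

  // Spelling used before the PR and suggested in the review: plain concatenation.
  public static String concatenated(String identifier) {
    return "beam:transform:org.apache.beam:schemaio_" + identifier + "_read:v1";
  }

  // Spelling introduced in the PR: String.format with a %s placeholder.
  public static String formatted(String identifier) {
    return String.format("beam:transform:org.apache.beam:schemaio_%s_read:v1", identifier);
  }

  public static void main(String[] args) {
    // Prints "beam:transform:org.apache.beam:schemaio_avro_read:v1".
    System.out.println(concatenated("avro"));
    // Prints "true".
    System.out.println(concatenated("avro").equals(formatted("avro")));
  }
}
```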



##########
sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java:
##########
@@ -50,22 +51,50 @@ public class ExternalSchemaIOTransformRegistrar implements ExternalTransformRegi
 
   @Override
   public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
-    ImmutableMap.Builder builder = ImmutableMap.<String, ExternalTransformRegistrar>builder();
+    Map<String, ExternalTransformBuilder<?, ?, ?>> providers = new HashMap<>();
     try {
-      for (SchemaIOProvider schemaIOProvider : ServiceLoader.load(SchemaIOProvider.class)) {
-        builder.put(
-            "beam:transform:org.apache.beam:schemaio_" + schemaIOProvider.identifier() + "_read:v1",
-            new ReaderBuilder(schemaIOProvider));
-        builder.put(
-            "beam:transform:org.apache.beam:schemaio_"
-                + schemaIOProvider.identifier()
-                + "_write:v1",
-            new WriterBuilder(schemaIOProvider));
+      for (SchemaIOProvider provider : ServiceLoader.load(SchemaIOProvider.class)) {
+        // Avro provider is treated as a special case since two Avro providers may want to be loaded
+        // from "core" (deprecated) and from "extensions/avro" (actual) - but only one must succeed.
+        // TODO: we won't need this check once all Avro providers from "core" will be
+        // removed
+        if (provider.identifier().equals("avro")) {
+          // Avro provider from "extensions/avro" must have a priority.
+          if (provider.getClass().getName().startsWith("org.apache.beam.sdk.extensions.avro")) {
+            // Load Avro provider from "extensions/avro" by any case.
+            loadProvider(providers, provider);
+          } else {
+            // Load Avro provider from "core" if it was not loaded from Avro extension before.
+            loadProviderOptinal(providers, provider);
+          }
+        } else {

Review Comment:
   This silently overrides duplicates. I think the previous behavior should be kept: fail on a duplicate key (as `ImmutableMap.Builder#build()` did), except for the Avro case.
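Here is a minimal sketch of the behavior this comment asks for. It assumes a simplified model: `StrictProviderRegistry`, `Provider` and `registerProviders` are hypothetical stand-ins for the registrar and `SchemaIOProvider`, and each URN maps to the provider's class name rather than to a `ReaderBuilder`/`WriterBuilder`. The only tolerated duplicate is a pair of "avro" providers where exactly one comes from `org.apache.beam.sdk.extensions.avro`. In that case the extension's provider wins regardless of discovery order. Every other duplicate throws.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the registrar loop: a provider is reduced to its identifier
// and implementing class name, and each URN maps to that class name instead of to a
// ReaderBuilder/WriterBuilder.
public final class StrictProviderRegistry {
  public static final class Provider {
    final String identifier;
    final String className;

    public Provider(String identifier, String className) {
      this.identifier = identifier;
      this.className = className;
    }
  }

  private static final String AVRO_EXTENSION_PACKAGE = "org.apache.beam.sdk.extensions.avro";

  private StrictProviderRegistry() {}

  private static boolean isFromAvroExtension(Provider provider) {
    return provider.className.startsWith(AVRO_EXTENSION_PACKAGE);
  }

  private static String urn(String identifier, String kind) {
    return "beam:transform:org.apache.beam:schemaio_" + identifier + "_" + kind + ":v1";
  }

  public static Map<String, String> registerProviders(List<Provider> discovered) {
    // Keep exactly one provider per identifier; anything ambiguous is an error.
    Map<String, Provider> byIdentifier = new LinkedHashMap<>();
    for (Provider provider : discovered) {
      Provider existing = byIdentifier.get(provider.identifier);
      if (existing == null) {
        byIdentifier.put(provider.identifier, provider);
      } else if (provider.identifier.equals("avro")
          && isFromAvroExtension(provider) != isFromAvroExtension(existing)) {
        // One "avro" provider is from core and the other from extensions/avro:
        // keep the extension's provider, whichever one was discovered first.
        if (isFromAvroExtension(provider)) {
          byIdentifier.put(provider.identifier, provider);
        }
      } else {
        // Every other duplicate fails, like ImmutableMap.Builder#build() did before the PR.
        throw new IllegalStateException(
            "Duplicate SchemaIOProvider identifier: " + provider.identifier);
      }
    }
    Map<String, String> urns = new LinkedHashMap<>();
    for (Provider provider : byIdentifier.values()) {
      urns.put(urn(provider.identifier, "read"), provider.className);
      urns.put(urn(provider.identifier, "write"), provider.className);
    }
    return Collections.unmodifiableMap(urns);
  }

  public static void main(String[] args) {
    // Placeholder class names; only the package prefix matters to this sketch.
    Map<String, String> urns =
        registerProviders(
            List.of(
                new Provider("avro", "org.apache.beam.sdk.io.ExampleAvroProvider"),
                new Provider("avro", "org.apache.beam.sdk.extensions.avro.io.ExampleAvroProvider"),
                new Provider("jdbc", "org.example.ExampleJdbcProvider")));
    urns.forEach((key, className) -> System.out.println(key + " -> " + className));
  }
}
```

The sketch checks for collisions on the identifier rather than on the URN. Both URNs are derived from the identifier, so this catches the same duplicates the old `ImmutableMap.Builder` would have rejected.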



##########
sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java:
##########
@@ -50,22 +51,50 @@ public class ExternalSchemaIOTransformRegistrar implements ExternalTransformRegi
 
   @Override
   public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
-    ImmutableMap.Builder builder = ImmutableMap.<String, ExternalTransformRegistrar>builder();
+    Map<String, ExternalTransformBuilder<?, ?, ?>> providers = new HashMap<>();
     try {
-      for (SchemaIOProvider schemaIOProvider : ServiceLoader.load(SchemaIOProvider.class)) {
-        builder.put(
-            "beam:transform:org.apache.beam:schemaio_" + schemaIOProvider.identifier() + "_read:v1",
-            new ReaderBuilder(schemaIOProvider));
-        builder.put(
-            "beam:transform:org.apache.beam:schemaio_"
-                + schemaIOProvider.identifier()
-                + "_write:v1",
-            new WriterBuilder(schemaIOProvider));
+      for (SchemaIOProvider provider : ServiceLoader.load(SchemaIOProvider.class)) {
+        // Avro provider is treated as a special case since two Avro providers may want to be loaded
+        // from "core" (deprecated) and from "extensions/avro" (actual) - but only one must succeed.
+        // TODO: we won't need this check once all Avro providers from "core" will be
+        // removed
+        if (provider.identifier().equals("avro")) {
+          // Avro provider from "extensions/avro" must have a priority.
+          if (provider.getClass().getName().startsWith("org.apache.beam.sdk.extensions.avro")) {
+            // Load Avro provider from "extensions/avro" by any case.
+            loadProvider(providers, provider);

Review Comment:
   Nit: how about naming it `registerProvider`?


