You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "aromanenko-dev (via GitHub)" <gi...@apache.org> on 2023/02/23 16:39:08 UTC

[GitHub] [beam] aromanenko-dev opened a new pull request, #25611: [AVRO] Prioritise Avro providers from "extensions/core"

aromanenko-dev opened a new pull request, #25611:
URL: https://github.com/apache/beam/pull/25611

   Closes #25601
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/get-started-contributing/#make-the-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Abacn commented on pull request #25611: [AVRO] Prioritise Avro providers from "extensions/core"

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #25611:
URL: https://github.com/apache/beam/pull/25611#issuecomment-1442378684

   the affected test passed (seen from log of Python3.10 postcommit):
   ```
   14:30:45 PASSED                                                                   [ 75%]
   14:30:45 apache_beam/io/external/xlang_jdbcio_it_test.py::CrossLanguageJdbcIOTest::test_xlang_jdbc_write_read_1_mysql 
   14:30:45 -------------------------------- live log call ---------------------------------
   14:30:45 INFO     testcontainers.core.container:container.py:53 Pulling image mysql:latest
   14:30:47 INFO     testcontainers.core.container:container.py:64 Container started: 22da3809393d
   14:30:47 INFO     testcontainers.core.waiting_utils:waiting_utils.py:46 Waiting to be ready...
   14:30:47 INFO     testcontainers.core.waiting_utils:waiting_utils.py:46 Waiting to be ready...
   14:30:48 INFO     testcontainers.core.waiting_utils:waiting_utils.py:46 Waiting to be ready...
   14:30:49 INFO     testcontainers.core.waiting_utils:waiting_utils.py:46 Waiting to be ready...
   14:30:50 INFO     testcontainers.core.waiting_utils:waiting_utils.py:46 Waiting to be ready...
   14:30:51 INFO     testcontainers.core.waiting_utils:waiting_utils.py:46 Waiting to be ready...
   14:30:52 INFO     testcontainers.core.waiting_utils:waiting_utils.py:46 Waiting to be ready...
   14:30:53 INFO     testcontainers.core.waiting_utils:waiting_utils.py:46 Waiting to be ready...
   14:30:54 INFO     testcontainers.core.waiting_utils:waiting_utils.py:46 Waiting to be ready...
   14:30:55 INFO     testcontainers.core.waiting_utils:waiting_utils.py:46 Waiting to be ready...
   14:30:56 INFO     testcontainers.core.waiting_utils:waiting_utils.py:46 Waiting to be ready...
   14:30:57 INFO     testcontainers.core.waiting_utils:waiting_utils.py:46 Waiting to be ready...
   14:30:58 
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #25611: [AVRO] Prioritise Avro providers from "extensions/core"

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #25611:
URL: https://github.com/apache/beam/pull/25611#issuecomment-1442128269

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @kennknowles for label java.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] mosche commented on a diff in pull request #25611: [AVRO] Prioritise Avro providers from "extensions/core"

Posted by "mosche (via GitHub)" <gi...@apache.org>.
mosche commented on code in PR #25611:
URL: https://github.com/apache/beam/pull/25611#discussion_r1115987867


##########
sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java:
##########
@@ -50,22 +51,50 @@ public class ExternalSchemaIOTransformRegistrar implements ExternalTransformRegi
 
   @Override
   public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
-    ImmutableMap.Builder builder = ImmutableMap.<String, ExternalTransformRegistrar>builder();
+    Map<String, ExternalTransformBuilder<?, ?, ?>> providers = new HashMap<>();
     try {
-      for (SchemaIOProvider schemaIOProvider : ServiceLoader.load(SchemaIOProvider.class)) {
-        builder.put(
-            "beam:transform:org.apache.beam:schemaio_" + schemaIOProvider.identifier() + "_read:v1",
-            new ReaderBuilder(schemaIOProvider));
-        builder.put(
-            "beam:transform:org.apache.beam:schemaio_"
-                + schemaIOProvider.identifier()
-                + "_write:v1",
-            new WriterBuilder(schemaIOProvider));
+      for (SchemaIOProvider provider : ServiceLoader.load(SchemaIOProvider.class)) {
+        // Avro provider is treated as a special case since two Avro providers may want to be loaded
+        // from "core" (deprecated) and from "extensions/avro" (actual) - but only one must succeed.
+        // TODO: we won't need this check once all Avro providers from "core" will be
+        // removed
+        if (provider.identifier().equals("avro")) {
+          // Avro provider from "extensions/avro" must have a priority.
+          if (provider.getClass().getName().startsWith("org.apache.beam.sdk.extensions.avro")) {
+            // Load Avro provider from "extensions/avro" by any case.
+            loadProvider(providers, provider);
+          } else {
+            // Load Avro provider from "core" if it was not loaded from Avro extension before.
+            loadProviderOptinal(providers, provider);
+          }
+        } else {
+          loadProvider(providers, provider);
+        }
       }
     } catch (Exception e) {
       throw new RuntimeException(e.getMessage());
     }
-    return builder.build();
+    return ImmutableMap.copyOf(providers);
+  }
+
+  private void loadProvider(
+      Map<String, ExternalTransformBuilder<?, ?, ?>> providers, SchemaIOProvider provider) {
+    providers.put(
+        String.format("beam:transform:org.apache.beam:schemaio_%s_read:v1", provider.identifier()),

Review Comment:
   Honestly, better to keep this as it was before: `"beam:transform:org.apache.beam:schemaio_" + provider.identifier() + "_read:v1"`. 



##########
sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java:
##########
@@ -50,22 +51,50 @@ public class ExternalSchemaIOTransformRegistrar implements ExternalTransformRegi
 
   @Override
   public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
-    ImmutableMap.Builder builder = ImmutableMap.<String, ExternalTransformRegistrar>builder();
+    Map<String, ExternalTransformBuilder<?, ?, ?>> providers = new HashMap<>();
     try {
-      for (SchemaIOProvider schemaIOProvider : ServiceLoader.load(SchemaIOProvider.class)) {
-        builder.put(
-            "beam:transform:org.apache.beam:schemaio_" + schemaIOProvider.identifier() + "_read:v1",
-            new ReaderBuilder(schemaIOProvider));
-        builder.put(
-            "beam:transform:org.apache.beam:schemaio_"
-                + schemaIOProvider.identifier()
-                + "_write:v1",
-            new WriterBuilder(schemaIOProvider));
+      for (SchemaIOProvider provider : ServiceLoader.load(SchemaIOProvider.class)) {
+        // Avro provider is treated as a special case since two Avro providers may want to be loaded
+        // from "core" (deprecated) and from "extensions/avro" (actual) - but only one must succeed.
+        // TODO: we won't need this check once all Avro providers from "core" will be
+        // removed
+        if (provider.identifier().equals("avro")) {
+          // Avro provider from "extensions/avro" must have a priority.
+          if (provider.getClass().getName().startsWith("org.apache.beam.sdk.extensions.avro")) {
+            // Load Avro provider from "extensions/avro" by any case.
+            loadProvider(providers, provider);
+          } else {
+            // Load Avro provider from "core" if it was not loaded from Avro extension before.
+            loadProviderOptinal(providers, provider);
+          }
+        } else {

Review Comment:
   This silently overrides duplicates. I think the previous behavior should be kept and instead fail in such a case (except for the avro case.)



##########
sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java:
##########
@@ -50,22 +51,50 @@ public class ExternalSchemaIOTransformRegistrar implements ExternalTransformRegi
 
   @Override
   public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
-    ImmutableMap.Builder builder = ImmutableMap.<String, ExternalTransformRegistrar>builder();
+    Map<String, ExternalTransformBuilder<?, ?, ?>> providers = new HashMap<>();
     try {
-      for (SchemaIOProvider schemaIOProvider : ServiceLoader.load(SchemaIOProvider.class)) {
-        builder.put(
-            "beam:transform:org.apache.beam:schemaio_" + schemaIOProvider.identifier() + "_read:v1",
-            new ReaderBuilder(schemaIOProvider));
-        builder.put(
-            "beam:transform:org.apache.beam:schemaio_"
-                + schemaIOProvider.identifier()
-                + "_write:v1",
-            new WriterBuilder(schemaIOProvider));
+      for (SchemaIOProvider provider : ServiceLoader.load(SchemaIOProvider.class)) {
+        // Avro provider is treated as a special case since two Avro providers may want to be loaded
+        // from "core" (deprecated) and from "extensions/avro" (actual) - but only one must succeed.
+        // TODO: we won't need this check once all Avro providers from "core" will be
+        // removed
+        if (provider.identifier().equals("avro")) {
+          // Avro provider from "extensions/avro" must have a priority.
+          if (provider.getClass().getName().startsWith("org.apache.beam.sdk.extensions.avro")) {
+            // Load Avro provider from "extensions/avro" by any case.
+            loadProvider(providers, provider);

Review Comment:
   nit, how about `registerProvider`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] damccorm commented on pull request #25611: [AVRO] Prioritise Avro providers from "extensions/core"

Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm commented on PR #25611:
URL: https://github.com/apache/beam/pull/25611#issuecomment-1442375212

   Run Java_GCP_IO_Direct PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] aromanenko-dev commented on a diff in pull request #25611: [AVRO] Prioritise Avro providers from "extensions/core"

Posted by "aromanenko-dev (via GitHub)" <gi...@apache.org>.
aromanenko-dev commented on code in PR #25611:
URL: https://github.com/apache/beam/pull/25611#discussion_r1116007241


##########
sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java:
##########
@@ -50,22 +51,50 @@ public class ExternalSchemaIOTransformRegistrar implements ExternalTransformRegi
 
   @Override
   public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
-    ImmutableMap.Builder builder = ImmutableMap.<String, ExternalTransformRegistrar>builder();
+    Map<String, ExternalTransformBuilder<?, ?, ?>> providers = new HashMap<>();
     try {
-      for (SchemaIOProvider schemaIOProvider : ServiceLoader.load(SchemaIOProvider.class)) {
-        builder.put(
-            "beam:transform:org.apache.beam:schemaio_" + schemaIOProvider.identifier() + "_read:v1",
-            new ReaderBuilder(schemaIOProvider));
-        builder.put(
-            "beam:transform:org.apache.beam:schemaio_"
-                + schemaIOProvider.identifier()
-                + "_write:v1",
-            new WriterBuilder(schemaIOProvider));
+      for (SchemaIOProvider provider : ServiceLoader.load(SchemaIOProvider.class)) {
+        // Avro provider is treated as a special case since two Avro providers may want to be loaded
+        // from "core" (deprecated) and from "extensions/avro" (actual) - but only one must succeed.
+        // TODO: we won't need this check once all Avro providers from "core" will be
+        // removed
+        if (provider.identifier().equals("avro")) {
+          // Avro provider from "extensions/avro" must have a priority.
+          if (provider.getClass().getName().startsWith("org.apache.beam.sdk.extensions.avro")) {
+            // Load Avro provider from "extensions/avro" by any case.
+            loadProvider(providers, provider);
+          } else {
+            // Load Avro provider from "core" if it was not loaded from Avro extension before.
+            loadProviderOptinal(providers, provider);
+          }
+        } else {
+          loadProvider(providers, provider);
+        }
       }
     } catch (Exception e) {
       throw new RuntimeException(e.getMessage());
     }
-    return builder.build();
+    return ImmutableMap.copyOf(providers);
+  }
+
+  private void loadProvider(
+      Map<String, ExternalTransformBuilder<?, ?, ?>> providers, SchemaIOProvider provider) {
+    providers.put(
+        String.format("beam:transform:org.apache.beam:schemaio_%s_read:v1", provider.identifier()),

Review Comment:
   Done



##########
sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java:
##########
@@ -50,22 +51,50 @@ public class ExternalSchemaIOTransformRegistrar implements ExternalTransformRegi
 
   @Override
   public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
-    ImmutableMap.Builder builder = ImmutableMap.<String, ExternalTransformRegistrar>builder();
+    Map<String, ExternalTransformBuilder<?, ?, ?>> providers = new HashMap<>();
     try {
-      for (SchemaIOProvider schemaIOProvider : ServiceLoader.load(SchemaIOProvider.class)) {
-        builder.put(
-            "beam:transform:org.apache.beam:schemaio_" + schemaIOProvider.identifier() + "_read:v1",
-            new ReaderBuilder(schemaIOProvider));
-        builder.put(
-            "beam:transform:org.apache.beam:schemaio_"
-                + schemaIOProvider.identifier()
-                + "_write:v1",
-            new WriterBuilder(schemaIOProvider));
+      for (SchemaIOProvider provider : ServiceLoader.load(SchemaIOProvider.class)) {
+        // Avro provider is treated as a special case since two Avro providers may want to be loaded
+        // from "core" (deprecated) and from "extensions/avro" (actual) - but only one must succeed.
+        // TODO: we won't need this check once all Avro providers from "core" will be
+        // removed
+        if (provider.identifier().equals("avro")) {
+          // Avro provider from "extensions/avro" must have a priority.
+          if (provider.getClass().getName().startsWith("org.apache.beam.sdk.extensions.avro")) {
+            // Load Avro provider from "extensions/avro" by any case.
+            loadProvider(providers, provider);
+          } else {
+            // Load Avro provider from "core" if it was not loaded from Avro extension before.
+            loadProviderOptinal(providers, provider);
+          }
+        } else {

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Abacn commented on pull request #25611: [AVRO] Prioritise Avro providers from "extensions/core"

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #25611:
URL: https://github.com/apache/beam/pull/25611#issuecomment-1442089534

   Run Python3.10 PostCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] aromanenko-dev commented on pull request #25611: [AVRO] Prioritise Avro providers from "extensions/core"

Posted by "aromanenko-dev (via GitHub)" <gi...@apache.org>.
aromanenko-dev commented on PR #25611:
URL: https://github.com/apache/beam/pull/25611#issuecomment-1442187413

   retest this please


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] damccorm commented on pull request #25611: [AVRO] Prioritise Avro providers from "extensions/core"

Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm commented on PR #25611:
URL: https://github.com/apache/beam/pull/25611#issuecomment-1442328534

   Run Java_Examples_Dataflow PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] aromanenko-dev commented on pull request #25611: [AVRO] Prioritise Avro providers from "extensions/core"

Posted by "aromanenko-dev (via GitHub)" <gi...@apache.org>.
aromanenko-dev commented on PR #25611:
URL: https://github.com/apache/beam/pull/25611#issuecomment-1442188562

   retest this please


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] mosche commented on pull request #25611: [AVRO] Prioritise Avro providers from "extensions/core"

Posted by "mosche (via GitHub)" <gi...@apache.org>.
mosche commented on PR #25611:
URL: https://github.com/apache/beam/pull/25611#issuecomment-1442306664

   Run Python 3.10 PostCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] damccorm commented on pull request #25611: [AVRO] Prioritise Avro providers from "extensions/core"

Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm commented on PR #25611:
URL: https://github.com/apache/beam/pull/25611#issuecomment-1442328889

   Run Java_Hadoop_IO_Direct PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] aromanenko-dev commented on a diff in pull request #25611: [AVRO] Prioritise Avro providers from "extensions/core"

Posted by "aromanenko-dev (via GitHub)" <gi...@apache.org>.
aromanenko-dev commented on code in PR #25611:
URL: https://github.com/apache/beam/pull/25611#discussion_r1116009123


##########
sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java:
##########
@@ -50,22 +53,55 @@ public class ExternalSchemaIOTransformRegistrar implements ExternalTransformRegi
 
   @Override
   public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
-    ImmutableMap.Builder builder = ImmutableMap.<String, ExternalTransformRegistrar>builder();
+    Map<String, ExternalTransformBuilder<?, ?, ?>> providers = new HashMap<>();
     try {
-      for (SchemaIOProvider schemaIOProvider : ServiceLoader.load(SchemaIOProvider.class)) {
-        builder.put(
-            "beam:transform:org.apache.beam:schemaio_" + schemaIOProvider.identifier() + "_read:v1",
-            new ReaderBuilder(schemaIOProvider));
-        builder.put(
-            "beam:transform:org.apache.beam:schemaio_"
-                + schemaIOProvider.identifier()
-                + "_write:v1",
-            new WriterBuilder(schemaIOProvider));
+      for (SchemaIOProvider provider : ServiceLoader.load(SchemaIOProvider.class)) {
+        // Avro provider is treated as a special case since two Avro providers may want to be loaded
+        // from "core" (deprecated) and from "extensions/avro" (actual) - but only one must succeed.
+        // TODO: we won't need this check once all Avro providers from "core" will be
+        // removed
+        if (provider.identifier().equals("avro")) {
+          // Avro provider from "extensions/avro" must have a priority.
+          if (provider.getClass().getName().startsWith("org.apache.beam.sdk.extensions.avro")) {
+            // Load Avro provider from "extensions/avro" by any case.
+            registerProvider(providers, provider);
+          } else {
+            // Load Avro provider from "core" if it was not loaded from Avro extension before.
+            registerProviderOptionally(providers, provider);
+          }
+        } else {
+          checkState(
+              !providers.containsKey(provider.identifier()),

Review Comment:
   Right-right, copy-paste, sorry...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] damccorm commented on pull request #25611: [AVRO] Prioritise Avro providers from "extensions/core"

Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm commented on PR #25611:
URL: https://github.com/apache/beam/pull/25611#issuecomment-1442374994

   Run Java_Examples_Dataflow PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] aromanenko-dev commented on a diff in pull request #25611: [AVRO] Prioritise Avro providers from "extensions/core"

Posted by "aromanenko-dev (via GitHub)" <gi...@apache.org>.
aromanenko-dev commented on code in PR #25611:
URL: https://github.com/apache/beam/pull/25611#discussion_r1116007589


##########
sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java:
##########
@@ -50,22 +51,50 @@ public class ExternalSchemaIOTransformRegistrar implements ExternalTransformRegi
 
   @Override
   public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
-    ImmutableMap.Builder builder = ImmutableMap.<String, ExternalTransformRegistrar>builder();
+    Map<String, ExternalTransformBuilder<?, ?, ?>> providers = new HashMap<>();
     try {
-      for (SchemaIOProvider schemaIOProvider : ServiceLoader.load(SchemaIOProvider.class)) {
-        builder.put(
-            "beam:transform:org.apache.beam:schemaio_" + schemaIOProvider.identifier() + "_read:v1",
-            new ReaderBuilder(schemaIOProvider));
-        builder.put(
-            "beam:transform:org.apache.beam:schemaio_"
-                + schemaIOProvider.identifier()
-                + "_write:v1",
-            new WriterBuilder(schemaIOProvider));
+      for (SchemaIOProvider provider : ServiceLoader.load(SchemaIOProvider.class)) {
+        // Avro provider is treated as a special case since two Avro providers may want to be loaded
+        // from "core" (deprecated) and from "extensions/avro" (actual) - but only one must succeed.
+        // TODO: we won't need this check once all Avro providers from "core" will be
+        // removed
+        if (provider.identifier().equals("avro")) {
+          // Avro provider from "extensions/avro" must have a priority.
+          if (provider.getClass().getName().startsWith("org.apache.beam.sdk.extensions.avro")) {
+            // Load Avro provider from "extensions/avro" by any case.
+            loadProvider(providers, provider);

Review Comment:
   Changed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] aromanenko-dev commented on pull request #25611: [AVRO] Prioritise Avro providers from "extensions/core"

Posted by "aromanenko-dev (via GitHub)" <gi...@apache.org>.
aromanenko-dev commented on PR #25611:
URL: https://github.com/apache/beam/pull/25611#issuecomment-1442093925

   CC: @mosche @Abacn @sclukas77 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] aromanenko-dev commented on a diff in pull request #25611: [AVRO] Prioritise Avro providers from "extensions/core"

Posted by "aromanenko-dev (via GitHub)" <gi...@apache.org>.
aromanenko-dev commented on code in PR #25611:
URL: https://github.com/apache/beam/pull/25611#discussion_r1116009123


##########
sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java:
##########
@@ -50,22 +53,55 @@ public class ExternalSchemaIOTransformRegistrar implements ExternalTransformRegi
 
   @Override
   public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
-    ImmutableMap.Builder builder = ImmutableMap.<String, ExternalTransformRegistrar>builder();
+    Map<String, ExternalTransformBuilder<?, ?, ?>> providers = new HashMap<>();
     try {
-      for (SchemaIOProvider schemaIOProvider : ServiceLoader.load(SchemaIOProvider.class)) {
-        builder.put(
-            "beam:transform:org.apache.beam:schemaio_" + schemaIOProvider.identifier() + "_read:v1",
-            new ReaderBuilder(schemaIOProvider));
-        builder.put(
-            "beam:transform:org.apache.beam:schemaio_"
-                + schemaIOProvider.identifier()
-                + "_write:v1",
-            new WriterBuilder(schemaIOProvider));
+      for (SchemaIOProvider provider : ServiceLoader.load(SchemaIOProvider.class)) {
+        // Avro provider is treated as a special case since two Avro providers may want to be loaded
+        // from "core" (deprecated) and from "extensions/avro" (actual) - but only one must succeed.
+        // TODO: we won't need this check once all Avro providers from "core" will be
+        // removed
+        if (provider.identifier().equals("avro")) {
+          // Avro provider from "extensions/avro" must have a priority.
+          if (provider.getClass().getName().startsWith("org.apache.beam.sdk.extensions.avro")) {
+            // Load Avro provider from "extensions/avro" by any case.
+            registerProvider(providers, provider);
+          } else {
+            // Load Avro provider from "core" if it was not loaded from Avro extension before.
+            registerProviderOptionally(providers, provider);
+          }
+        } else {
+          checkState(
+              !providers.containsKey(provider.identifier()),

Review Comment:
   Right-right, copy-paste, sorry... actually, it didn't exist before



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] aromanenko-dev commented on pull request #25611: [AVRO] Prioritise Avro providers from "extensions/core"

Posted by "aromanenko-dev (via GitHub)" <gi...@apache.org>.
aromanenko-dev commented on PR #25611:
URL: https://github.com/apache/beam/pull/25611#issuecomment-1442162824

   Run Python 3.10 PostCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] mosche commented on a diff in pull request #25611: [AVRO] Prioritise Avro providers from "extensions/core"

Posted by "mosche (via GitHub)" <gi...@apache.org>.
mosche commented on code in PR #25611:
URL: https://github.com/apache/beam/pull/25611#discussion_r1116057485


##########
sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java:
##########
@@ -40,32 +43,75 @@
 import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @Experimental(Experimental.Kind.PORTABILITY)
 @AutoService(ExternalTransformRegistrar.class)
 @SuppressWarnings({
   "rawtypes" // TODO(https://github.com/apache/beam/issues/20447)
 })
 public class ExternalSchemaIOTransformRegistrar implements ExternalTransformRegistrar {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ExternalSchemaIOTransformRegistrar.class);
 
   @Override
   public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
-    ImmutableMap.Builder builder = ImmutableMap.<String, ExternalTransformRegistrar>builder();
+    Map<String, ExternalTransformBuilder<?, ?, ?>> providers = new HashMap<>();
     try {
-      for (SchemaIOProvider schemaIOProvider : ServiceLoader.load(SchemaIOProvider.class)) {
-        builder.put(
-            "beam:transform:org.apache.beam:schemaio_" + schemaIOProvider.identifier() + "_read:v1",
-            new ReaderBuilder(schemaIOProvider));
-        builder.put(
-            "beam:transform:org.apache.beam:schemaio_"
-                + schemaIOProvider.identifier()
-                + "_write:v1",
-            new WriterBuilder(schemaIOProvider));
+      for (SchemaIOProvider provider : ServiceLoader.load(SchemaIOProvider.class)) {
+        // Avro provider is treated as a special case since two Avro providers may want to be loaded
+        // from "core" (deprecated) and from "extensions/avro" (actual) - but only one must succeed.
+        // TODO: we won't need this check once all Avro providers from "core" will be
+        // removed
+        if (provider.identifier().equals("avro")) {
+          // Avro provider from "extensions/avro" must have a priority.
+          if (provider.getClass().getName().startsWith("org.apache.beam.sdk.extensions.avro")) {
+            // Load Avro provider from "extensions/avro" by any case.
+            registerProvider(providers, provider);
+          } else {
+            // Load Avro provider from "core" if it was not loaded from Avro extension before.
+            LOG.warn(

Review Comment:
   Same as above regarding logs



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] mosche commented on a diff in pull request #25611: [AVRO] Prioritise Avro providers from "extensions/core"

Posted by "mosche (via GitHub)" <gi...@apache.org>.
mosche commented on code in PR #25611:
URL: https://github.com/apache/beam/pull/25611#discussion_r1116008188


##########
sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java:
##########
@@ -50,22 +53,55 @@ public class ExternalSchemaIOTransformRegistrar implements ExternalTransformRegi
 
   @Override
   public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
-    ImmutableMap.Builder builder = ImmutableMap.<String, ExternalTransformRegistrar>builder();
+    Map<String, ExternalTransformBuilder<?, ?, ?>> providers = new HashMap<>();
     try {
-      for (SchemaIOProvider schemaIOProvider : ServiceLoader.load(SchemaIOProvider.class)) {
-        builder.put(
-            "beam:transform:org.apache.beam:schemaio_" + schemaIOProvider.identifier() + "_read:v1",
-            new ReaderBuilder(schemaIOProvider));
-        builder.put(
-            "beam:transform:org.apache.beam:schemaio_"
-                + schemaIOProvider.identifier()
-                + "_write:v1",
-            new WriterBuilder(schemaIOProvider));
+      for (SchemaIOProvider provider : ServiceLoader.load(SchemaIOProvider.class)) {
+        // Avro provider is treated as a special case since two Avro providers may want to be loaded
+        // from "core" (deprecated) and from "extensions/avro" (actual) - but only one must succeed.
+        // TODO: we won't need this check once all Avro providers from "core" will be
+        // removed
+        if (provider.identifier().equals("avro")) {
+          // Avro provider from "extensions/avro" must have a priority.
+          if (provider.getClass().getName().startsWith("org.apache.beam.sdk.extensions.avro")) {
+            // Load Avro provider from "extensions/avro" by any case.
+            registerProvider(providers, provider);
+          } else {
+            // Load Avro provider from "core" if it was not loaded from Avro extension before.
+            registerProviderOptionally(providers, provider);
+          }
+        } else {
+          checkState(
+              !providers.containsKey(provider.identifier()),

Review Comment:
   @aromanenko-dev you have to check for `"beam:transform:org.apache.beam:schemaio_" + provider.identifier() + "_read:v1"` or `"beam:transform:org.apache.beam:schemaio_" + provider.identifier() + "_write:v1"`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] damccorm commented on pull request #25611: [AVRO] Prioritise Avro providers from "extensions/core"

Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm commented on PR #25611:
URL: https://github.com/apache/beam/pull/25611#issuecomment-1442329527

   Rerunning flakes impacted by jenkins flakiness (underlying errors were Jenkins errors, not test issues)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] damccorm merged pull request #25611: [AVRO] Prioritise Avro providers from "extensions/core"

Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm merged PR #25611:
URL: https://github.com/apache/beam/pull/25611


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] aromanenko-dev commented on pull request #25611: [AVRO] Prioritise Avro providers from "extensions/core"

Posted by "aromanenko-dev (via GitHub)" <gi...@apache.org>.
aromanenko-dev commented on PR #25611:
URL: https://github.com/apache/beam/pull/25611#issuecomment-1442185528

   Run Python 3.10 PostCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] mosche commented on a diff in pull request #25611: [AVRO] Prioritise Avro providers from "extensions/core"

Posted by "mosche (via GitHub)" <gi...@apache.org>.
mosche commented on code in PR #25611:
URL: https://github.com/apache/beam/pull/25611#discussion_r1116057032


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/Providers.java:
##########
@@ -44,19 +48,19 @@ public static <T extends Identifyable> Map<String, T> loadProviders(Class<T> kla
     for (T provider : ServiceLoader.load(klass)) {
       // Avro provider is treated as a special case since two Avro providers may want to be loaded -
       // from "core" (deprecated) and from "extensions/avro" (actual) - but only one must succeed.
-      // TODO: this check should be removed once once AvroPayloadSerializerProvider from "core" is
+      // TODO: we won't need this check once all Avro providers from "core" will be
       // removed
       if (provider.identifier().equals("avro")) {
         // Avro provider from "extensions/avro" must have a priority.
-        if (provider
-            .getClass()
-            .getName()
-            .equals(
-                "org.apache.beam.sdk.extensions.avro.schemas.io.payloads.AvroPayloadSerializerProvider")) {
-          // Use AvroPayloadSerializerProvider from extensions/avro by any case.
+        if (provider.getClass().getName().startsWith("org.apache.beam.sdk.extensions.avro")) {
+          // Load Avro provider from "extensions/avro" by any case.
           providers.put(provider.identifier(), provider);
         } else {
           // Load Avro provider from "core" if it was not loaded from Avro extension before.
+          LOG.warn(

Review Comment:
   Unfortunately this is generating noise and might warn users even if the Avro extension is on the classpath depending on the order :/
   I think it's better to keep this as a separate task @aromanenko-dev. Adding warning logs isn't urgent at all.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] aromanenko-dev commented on pull request #25611: [AVRO] Prioritise Avro providers from "extensions/core"

Posted by "aromanenko-dev (via GitHub)" <gi...@apache.org>.
aromanenko-dev commented on PR #25611:
URL: https://github.com/apache/beam/pull/25611#issuecomment-1442138272

   Run Python 3.10 PostCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] aromanenko-dev commented on pull request #25611: [AVRO] Prioritise Avro providers from "extensions/core"

Posted by "aromanenko-dev (via GitHub)" <gi...@apache.org>.
aromanenko-dev commented on PR #25611:
URL: https://github.com/apache/beam/pull/25611#issuecomment-1442259198

   Run Python 3.10 PostCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org