Posted to github@beam.apache.org by "RustedBones (via GitHub)" <gi...@apache.org> on 2023/04/18 10:14:52 UTC

[GitHub] [beam] RustedBones opened a new pull request, #26320: Select dedicated avro datum reader and writer

RustedBones opened a new pull request, #26320:
URL: https://github.com/apache/beam/pull/26320

   Follow-up for https://github.com/apache/beam/pull/16271
   
   * [AVRO-1891](https://issues.apache.org/jira/browse/AVRO-1891) is fixed by adding the conversion at compile time in the generated class' model. Beam does not currently read the `SpecificData` from the class.
   * [BEAM-8329](https://issues.apache.org/jira/browse/BEAM-8329) `ReflectData` is not supposed to use the logical conversions by default. Users should either use a `CustomEncoding` with the `AvroEncode` annotation (see the sketch after this list) or manually add the conversion to their own `ReflectData`, which is not possible everywhere in beam.
   * [BEAM-9144](https://issues.apache.org/jira/browse/BEAM-9144) is a 'snowflake'. All other joda logical time conversions are still failing (`date`, `time-millis`, `time-micros`, `timestamp-micros`). This should not be in the framework but in the user code.
   
   This PR aims to
   * Use proper logical type conversions when using avro 1.9 specific data (`useReflectApi` must be disabled)
   * Give users the possibility to provide their own avro data model through the `DatumWriterFactory` and `DatumReaderFactory` used by beam internals
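   
   A minimal sketch of the `CustomEncoding` + `AvroEncode` approach mentioned above, using only the plain Avro reflect API (`JodaEncodingExample`, `JodaTimestampEncoding` and `Event` are made-up names for illustration, not code from this PR):
   
   ```java
   import java.io.IOException;
   import org.apache.avro.Schema;
   import org.apache.avro.io.Decoder;
   import org.apache.avro.io.Encoder;
   import org.apache.avro.reflect.AvroEncode;
   import org.apache.avro.reflect.CustomEncoding;
   import org.joda.time.DateTime;
   import org.joda.time.DateTimeZone;
   
   public class JodaEncodingExample {
   
     /** Stores a joda DateTime as epoch millis (Avro long). */
     public static class JodaTimestampEncoding extends CustomEncoding<DateTime> {
       {
         // Schema used for the annotated field.
         schema = Schema.create(Schema.Type.LONG);
       }
   
       @Override
       protected void write(Object datum, Encoder out) throws IOException {
         out.writeLong(((DateTime) datum).getMillis());
       }
   
       @Override
       protected DateTime read(Object reuse, Decoder in) throws IOException {
         return new DateTime(in.readLong(), DateTimeZone.UTC);
       }
     }
   
     /** Reflect-based POJO; ReflectData picks up the custom encoding from the annotation. */
     public static class Event {
       @AvroEncode(using = JodaTimestampEncoding.class)
       DateTime timestamp;
     }
   }
   ```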




[GitHub] [beam] RustedBones commented on a diff in pull request #26320: Select dedicated avro datum reader and writer

Posted by "RustedBones (via GitHub)" <gi...@apache.org>.
RustedBones commented on code in PR #26320:
URL: https://github.com/apache/beam/pull/26320#discussion_r1210071216


##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/coders/AvroReflectCoder.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.avro.coders;
+
+import org.apache.avro.Schema;
+import org.apache.avro.reflect.Nullable;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.Union;
+import org.apache.beam.sdk.extensions.avro.io.AvroDatumFactory;
+
+/**
+ * AvroCoder specialisation for avro classes using Java reflection.
+ *
+ * <p>Only concrete classes with a no-argument constructor can be mapped to Avro records. All
+ * inherited fields that are not static or transient are included. Fields are not permitted to be
+ * null unless annotated by {@link Nullable} or a {@link Union} schema containing {@code "null"}.
+ */
+public class AvroReflectCoder<T> extends AvroCoder<T> {

Review Comment:
   Did the same as the `SchemaCoderCloudObjectTranslator` and serialized the `datumFactory` as bytes.
   Removed the subclasses.
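   
   For context, a hedged sketch of what the matching deserialization could look like in the Dataflow `AvroCoderCloudObjectTranslator` (the `SCHEMA_FIELD`/`DATUM_FACTORY_FIELD` constants appear in the translator diff quoted further down; the `Structs.getString`/`StringUtils.jsonStringToByteArray` helpers and the `AvroCoder.of(AvroDatumFactory, Schema)` factory are assumptions, not the exact PR code):
   
   ```java
   @Override
   public AvroCoder<?> fromCloudObject(CloudObject cloudObject) {
     // Parse the Avro schema back from its JSON representation.
     Schema schema = new Schema.Parser().parse(Structs.getString(cloudObject, SCHEMA_FIELD));
     // Recover the datum factory that was stored as a JSON-encoded byte array.
     byte[] serializedDatumFactory =
         StringUtils.jsonStringToByteArray(Structs.getString(cloudObject, DATUM_FACTORY_FIELD));
     AvroDatumFactory<?> datumFactory =
         (AvroDatumFactory<?>)
             SerializableUtils.deserializeFromByteArray(serializedDatumFactory, "datumFactory");
     return AvroCoder.of((AvroDatumFactory) datumFactory, schema);
   }
   ```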



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/ParquetWriteSchemaTransformFormatProvider.java:
##########
@@ -23,7 +23,7 @@
 import com.google.auto.service.AutoService;
 import java.util.Optional;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.beam.sdk.coders.AvroGenericCoder;

Review Comment:
   This one was probably forgotten after migration to the extension module





[GitHub] [beam] reuvenlax commented on a diff in pull request #26320: Select dedicated avro datum reader and writer

Posted by "reuvenlax (via GitHub)" <gi...@apache.org>.
reuvenlax commented on code in PR #26320:
URL: https://github.com/apache/beam/pull/26320#discussion_r1234703649


##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java:
##########
@@ -20,38 +20,41 @@
 import org.apache.avro.Schema;
 import org.apache.beam.runners.core.construction.SdkComponents;
 import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
+import org.apache.beam.sdk.extensions.avro.io.AvroDatumFactory;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.StringUtils;
 
 /** A {@link CloudObjectTranslator} for {@link AvroCoder}. */
 @SuppressWarnings({
   "rawtypes" // TODO(https://github.com/apache/beam/issues/20447)
 })
 class AvroCoderCloudObjectTranslator implements CloudObjectTranslator<AvroCoder> {
-  private static final String TYPE_FIELD = "type";
+  private static final String DATUM_FACTORY_FIELD = "datum_factory";
   private static final String SCHEMA_FIELD = "schema";
-  private static final String REFLECT_API_FIELD = "reflect_api";
 
   @Override
   public CloudObject toCloudObject(AvroCoder target, SdkComponents sdkComponents) {
     CloudObject base = CloudObject.forClass(AvroCoder.class);
+    byte[] serializedDatumFactory =
+        SerializableUtils.serializeToByteArray(target.getDatumFactory());
+    Structs.addString(
+        base, DATUM_FACTORY_FIELD, StringUtils.byteArrayToJsonString(serializedDatumFactory));
     Structs.addString(base, SCHEMA_FIELD, target.getSchema().toString());
-    Structs.addString(base, TYPE_FIELD, target.getType().getName());
-    Structs.addBoolean(base, REFLECT_API_FIELD, target.useReflectApi());
     return base;
   }
 
   @Override
   public AvroCoder<?> fromCloudObject(CloudObject cloudObject) {

Review Comment:
   Yes, I think this is problematic. Any pipeline that uses AvroCoder will now fail to update, which is something we generally avoid.





[GitHub] [beam] aromanenko-dev commented on pull request #26320: Select dedicated avro datum reader and writer

Posted by "aromanenko-dev (via GitHub)" <gi...@apache.org>.
aromanenko-dev commented on PR #26320:
URL: https://github.com/apache/beam/pull/26320#issuecomment-1579002484

   CC: @robertwb @reuvenlax @pabloem 
   Could some of you guys take a look at the related Dataflow changes?




[GitHub] [beam] RustedBones commented on a diff in pull request #26320: Select dedicated avro datum reader and writer

Posted by "RustedBones (via GitHub)" <gi...@apache.org>.
RustedBones commented on code in PR #26320:
URL: https://github.com/apache/beam/pull/26320#discussion_r1211884816


##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroJodaTimeConversions.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.avro.schemas.utils;
+
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
+import org.apache.avro.Schema;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Days;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+
+/**
+ * Avro 1.8 & 1.9 ship joda time conversions.
+ *
+ * <p>Since avro 1.10, only java time conversions are included. As beam is still joda time based,
+ * and users may work with avro joda-time generated classes, provide joda time logical conversions.
+ *
+ * <p>This code is copied from avro 1.8.2 TimeConversions.
+ */
+public class AvroJodaTimeConversions {
+
+  public static class DateConversion extends Conversion<LocalDate> {
+    private static final LocalDate EPOCH_DATE = new LocalDate(1970, 1, 1);
+
+    @Override
+    public Class<LocalDate> getConvertedType() {
+      return LocalDate.class;
+    }
+
+    @Override
+    public String getLogicalTypeName() {
+      return "date";
+    }
+
+    @Override
+    public LocalDate fromInt(Integer daysFromEpoch, Schema schema, LogicalType type) {
+      return EPOCH_DATE.plusDays(daysFromEpoch);
+    }
+
+    @Override
+    public Integer toInt(LocalDate date, Schema schema, LogicalType type) {
+      return Days.daysBetween(EPOCH_DATE, date).getDays();
+    }
+  }
+
+  public static class TimeConversion extends Conversion<LocalTime> {
+    @Override
+    public Class<LocalTime> getConvertedType() {
+      return LocalTime.class;
+    }
+
+    @Override
+    public String getLogicalTypeName() {
+      return "time-millis";
+    }
+
+    @Override
+    public LocalTime fromInt(Integer millisFromMidnight, Schema schema, LogicalType type) {
+      return LocalTime.fromMillisOfDay(millisFromMidnight);
+    }
+
+    @Override
+    public Integer toInt(LocalTime time, Schema schema, LogicalType type) {
+      return time.millisOfDay().get();
+    }
+  }
+
+  public static class TimeMicrosConversion extends Conversion<LocalTime> {
+    @Override
+    public Class<LocalTime> getConvertedType() {
+      return LocalTime.class;
+    }
+
+    @Override
+    public String getLogicalTypeName() {
+      return "time-micros";
+    }
+
+    @Override
+    public LocalTime fromLong(Long microsFromMidnight, Schema schema, LogicalType type) {
+      return LocalTime.fromMillisOfDay(microsFromMidnight / 1000);
+    }
+  }
+
+  public static class LossyTimeMicrosConversion extends TimeMicrosConversion {
+    @Override
+    public Long toLong(LocalTime time, Schema schema, LogicalType type) {
+      return 1000 * (long) time.millisOfDay().get();
+    }
+  }
+
+  public static class TimestampConversion extends Conversion<DateTime> {
+    @Override
+    public Class<DateTime> getConvertedType() {
+      return DateTime.class;
+    }
+
+    @Override
+    public String getLogicalTypeName() {
+      return "timestamp-millis";
+    }
+
+    @Override
+    public DateTime fromLong(Long millisFromEpoch, Schema schema, LogicalType type) {
+      return new DateTime(millisFromEpoch, DateTimeZone.UTC);
+    }
+
+    @Override
+    public Long toLong(DateTime timestamp, Schema schema, LogicalType type) {
+      return timestamp.getMillis();
+    }
+  }
+
+  public static class TimestampMicrosConversion extends Conversion<DateTime> {

Review Comment:
   It looks like joda [DateTime](https://www.joda.org/joda-time/apidocs/org/joda/time/DateTime.html) is precise to the millisecond only:
   > it represents an exact point on the time-line, but limited to the precision of milliseconds.
   
   The `toLong` is encapsulated in the `LossyTimestampMicrosConversion`.
   It is not added by default by `AvroUtils.addLogicalTypeConversions` to the `ReflectData`. Writing a logical `timestamp-micros` field with an `org.joda.time.DateTime` will fail; reading will truncate to millisecond precision.
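   
   For reference, the lossy variant mentioned above could look like the following sketch, mirroring the `LossyTimeMicrosConversion` shown earlier (not necessarily the exact PR code):
   
   ```java
   public static class LossyTimestampMicrosConversion extends TimestampMicrosConversion {
     @Override
     public Long toLong(DateTime timestamp, Schema schema, LogicalType type) {
       // Micros from epoch; joda DateTime only carries millis, so no sub-millisecond precision exists.
       return 1000 * timestamp.getMillis();
     }
   }
   ```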





[GitHub] [beam] RustedBones commented on a diff in pull request #26320: Select dedicated avro datum reader and writer

Posted by "RustedBones (via GitHub)" <gi...@apache.org>.
RustedBones commented on code in PR #26320:
URL: https://github.com/apache/beam/pull/26320#discussion_r1214471057


##########
sdks/java/extensions/avro/build.gradle:
##########
@@ -63,67 +74,91 @@ dependencies {
   testImplementation library.java.junit
   testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
   testRuntimeOnly library.java.slf4j_jdk14
-  avroVersions.each {
-    "avroVersion$it.key" "org.apache.avro:avro:$it.value"
-    "avroVersion$it.key" "org.apache.avro:avro-tools:$it.value"
+  avroVersions.each { k,v ->
+    "avroVersion$k"(project(path: ":sdks:java:core", configuration: "shadowTest")) {
+      // Exclude Avro dependencies from "core" since Avro support moved to this extension
+      exclude group: "org.apache.avro", module: "avro"
+    }
+    "avroVersion$k" library.java.junit
+    "avroVersion$k" project(path: ":runners:direct-java", configuration: "shadow")
+    "avroVersion$k" library.java.slf4j_jdk14
+    "avroVersion$k" "org.tukaani:xz:1.9" // makes as provided since 1.9
+    "avroVersion$k" library.java.zstd_jni // makes as provided since 1.9

Review Comment:
   Avro `1.9` and above mark `xz` as optional; we can't run the compression tests without it.
   Also include `zstd` and run its compression tests for avro `1.9` and above.



##########
sdks/java/extensions/avro/build.gradle:
##########
@@ -35,7 +41,12 @@ def avroVersions = [
     '1111': "1.11.1",
 ]
 
-avroVersions.each { k, v -> configurations.create("avroVersion$k") }
+avroVersions.each { k, v ->
+  configurations {
+    create("avroVersion$k").extendsFrom(implementation)

Review Comment:
   Extends the `implementation` configuration.
   `avroVersion$k` should not extend the test configuration, as we do not want to include code generated by the `1.8.2` version used in the tests.



##########
sdks/java/extensions/avro/build.gradle:
##########
@@ -63,67 +74,91 @@ dependencies {
   testImplementation library.java.junit
   testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
   testRuntimeOnly library.java.slf4j_jdk14
-  avroVersions.each {
-    "avroVersion$it.key" "org.apache.avro:avro:$it.value"
-    "avroVersion$it.key" "org.apache.avro:avro-tools:$it.value"
+  avroVersions.each { k,v ->
+    "avroVersion$k"(project(path: ":sdks:java:core", configuration: "shadowTest")) {
+      // Exclude Avro dependencies from "core" since Avro support moved to this extension
+      exclude group: "org.apache.avro", module: "avro"
+    }
+    "avroVersion$k" library.java.junit
+    "avroVersion$k" project(path: ":runners:direct-java", configuration: "shadow")
+    "avroVersion$k" library.java.slf4j_jdk14
+    "avroVersion$k" "org.tukaani:xz:1.9" // makes as provided since 1.9
+    "avroVersion$k" library.java.zstd_jni // makes as provided since 1.9
+    "avroVersion${k}Generate" "org.apache.avro:avro-tools:$v"
   }
 }
 
-avroVersions.each { kv ->
-  configurations."avroVersion$kv.key" {
-    resolutionStrategy {
-      force "org.apache.avro:avro:$kv.value"
+task printSourceSetInformation(){
+
+  doLast{
+    sourceSets.each { srcSet ->
+      println "["+srcSet.name+"]"
+      print "-->Source directories: "+srcSet.allJava.srcDirs+"\n"
+      print "-->Output directories: "+srcSet.output.classesDirs.files+"\n"
+      print "-->Compile classpath:\n"
+      srcSet.runtimeClasspath.files.each {
+        print "  "+it.path+"\n"
+      }
+      println ""
     }
   }
+}

Review Comment:
   This was a debug leftover; will clean it up.



##########
sdks/java/extensions/avro/build.gradle:
##########
@@ -63,67 +74,91 @@ dependencies {
   testImplementation library.java.junit
   testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
   testRuntimeOnly library.java.slf4j_jdk14
-  avroVersions.each {
-    "avroVersion$it.key" "org.apache.avro:avro:$it.value"
-    "avroVersion$it.key" "org.apache.avro:avro-tools:$it.value"
+  avroVersions.each { k,v ->
+    "avroVersion$k"(project(path: ":sdks:java:core", configuration: "shadowTest")) {
+      // Exclude Avro dependencies from "core" since Avro support moved to this extension
+      exclude group: "org.apache.avro", module: "avro"
+    }
+    "avroVersion$k" library.java.junit
+    "avroVersion$k" project(path: ":runners:direct-java", configuration: "shadow")
+    "avroVersion$k" library.java.slf4j_jdk14
+    "avroVersion$k" "org.tukaani:xz:1.9" // makes as provided since 1.9
+    "avroVersion$k" library.java.zstd_jni // makes as provided since 1.9
+    "avroVersion${k}Generate" "org.apache.avro:avro-tools:$v"
   }
 }
 
-avroVersions.each { kv ->
-  configurations."avroVersion$kv.key" {
-    resolutionStrategy {
-      force "org.apache.avro:avro:$kv.value"
+task printSourceSetInformation(){
+
+  doLast{
+    sourceSets.each { srcSet ->
+      println "["+srcSet.name+"]"
+      print "-->Source directories: "+srcSet.allJava.srcDirs+"\n"
+      print "-->Output directories: "+srcSet.output.classesDirs.files+"\n"
+      print "-->Compile classpath:\n"
+      srcSet.runtimeClasspath.files.each {
+        print "  "+it.path+"\n"
+      }
+      println ""
     }
   }
+}
+
+avroVersions.each { k, v ->
+  configurations."avroVersion$k" {
+    resolutionStrategy.force "org.apache.avro:avro:$v", "org.apache.avro:avro:$v:tests", library.java.joda_time

Review Comment:
   Appending `joda_time` too, as `force` is transitive and otherwise downgrades the lib when using `1.9`.



##########
sdks/java/extensions/avro/build.gradle:
##########
@@ -63,67 +74,91 @@ dependencies {
   testImplementation library.java.junit
   testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
   testRuntimeOnly library.java.slf4j_jdk14
-  avroVersions.each {
-    "avroVersion$it.key" "org.apache.avro:avro:$it.value"
-    "avroVersion$it.key" "org.apache.avro:avro-tools:$it.value"
+  avroVersions.each { k,v ->
+    "avroVersion$k"(project(path: ":sdks:java:core", configuration: "shadowTest")) {
+      // Exclude Avro dependencies from "core" since Avro support moved to this extension
+      exclude group: "org.apache.avro", module: "avro"
+    }
+    "avroVersion$k" library.java.junit
+    "avroVersion$k" project(path: ":runners:direct-java", configuration: "shadow")
+    "avroVersion$k" library.java.slf4j_jdk14
+    "avroVersion$k" "org.tukaani:xz:1.9" // makes as provided since 1.9
+    "avroVersion$k" library.java.zstd_jni // makes as provided since 1.9
+    "avroVersion${k}Generate" "org.apache.avro:avro-tools:$v"
   }
 }
 
-avroVersions.each { kv ->
-  configurations."avroVersion$kv.key" {
-    resolutionStrategy {
-      force "org.apache.avro:avro:$kv.value"
+task printSourceSetInformation(){
+
+  doLast{
+    sourceSets.each { srcSet ->
+      println "["+srcSet.name+"]"
+      print "-->Source directories: "+srcSet.allJava.srcDirs+"\n"
+      print "-->Output directories: "+srcSet.output.classesDirs.files+"\n"
+      print "-->Compile classpath:\n"
+      srcSet.runtimeClasspath.files.each {
+        print "  "+it.path+"\n"
+      }
+      println ""
     }
   }
+}
+
+avroVersions.each { k, v ->
+  configurations."avroVersion$k" {
+    resolutionStrategy.force "org.apache.avro:avro:$v", "org.apache.avro:avro:$v:tests", library.java.joda_time
+  }
 
   sourceSets {
-    "avro${kv.key}" {
+    "avro$k" {
       java {
-        srcDirs "build/generated/sources/avro${kv.key}/test/java"
+        srcDirs "src/test/java", "build/generated/sources/avro$k/test/java"

Review Comment:
   Recompile the tests against the generated classes.



##########
sdks/java/extensions/avro/build.gradle:
##########
@@ -35,7 +41,12 @@ def avroVersions = [
     '1111': "1.11.1",
 ]
 
-avroVersions.each { k, v -> configurations.create("avroVersion$k") }
+avroVersions.each { k, v ->

Review Comment:
   Related to this comment: https://github.com/apache/beam/pull/26320#discussion_r1214111539. Will put more detailed comments.



##########
sdks/java/extensions/avro/build.gradle:
##########
@@ -63,67 +74,91 @@ dependencies {
   testImplementation library.java.junit
   testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
   testRuntimeOnly library.java.slf4j_jdk14
-  avroVersions.each {
-    "avroVersion$it.key" "org.apache.avro:avro:$it.value"
-    "avroVersion$it.key" "org.apache.avro:avro-tools:$it.value"
+  avroVersions.each { k,v ->
+    "avroVersion$k"(project(path: ":sdks:java:core", configuration: "shadowTest")) {
+      // Exclude Avro dependencies from "core" since Avro support moved to this extension
+      exclude group: "org.apache.avro", module: "avro"
+    }
+    "avroVersion$k" library.java.junit
+    "avroVersion$k" project(path: ":runners:direct-java", configuration: "shadow")
+    "avroVersion$k" library.java.slf4j_jdk14

Review Comment:
   we have to re-declare test deps for the `avroVersion$k` configuration



##########
sdks/java/extensions/avro/build.gradle:
##########
@@ -35,7 +41,12 @@ def avroVersions = [
     '1111': "1.11.1",
 ]
 
-avroVersions.each { k, v -> configurations.create("avroVersion$k") }
+avroVersions.each { k, v ->
+  configurations {
+    create("avroVersion$k").extendsFrom(implementation)
+    create("avroVersion${k}Generate")

Review Comment:
   Create a configuration for the `generateAvroClasses$k` task (`avro-tools` and its dependencies like `avro-compiler` are not required for the tests).





[GitHub] [beam] github-actions[bot] commented on pull request #26320: Select dedicated avro datum reader and writer

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26320:
URL: https://github.com/apache/beam/pull/26320#issuecomment-1559169317

   Reminder, please take a look at this pr: @robertwb @pabloem 




[GitHub] [beam] aromanenko-dev commented on pull request #26320: Select dedicated avro datum reader and writer

Posted by "aromanenko-dev (via GitHub)" <gi...@apache.org>.
aromanenko-dev commented on PR #26320:
URL: https://github.com/apache/beam/pull/26320#issuecomment-1559507341

   I'm looking on this one




[GitHub] [beam] aromanenko-dev commented on pull request #26320: Select dedicated avro datum reader and writer

Posted by "aromanenko-dev (via GitHub)" <gi...@apache.org>.
aromanenko-dev commented on PR #26320:
URL: https://github.com/apache/beam/pull/26320#issuecomment-1579086985

   @RustedBones Many thanks for addressing the comments, it's almost LGTM. I'd just prefer to have feedback from Google people about the Dataflow changes before merging.
   
   Also, it'd be helpful to update CHANGES.md with these changes to make them more visible in the Beam release announcement.




[GitHub] [beam] github-actions[bot] commented on pull request #26320: Select dedicated avro datum reader and writer

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26320:
URL: https://github.com/apache/beam/pull/26320#issuecomment-1536170181

   Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @robertwb for label java.
   R: @pabloem for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)




[GitHub] [beam] RustedBones commented on a diff in pull request #26320: Select dedicated avro datum reader and writer

Posted by "RustedBones (via GitHub)" <gi...@apache.org>.
RustedBones commented on code in PR #26320:
URL: https://github.com/apache/beam/pull/26320#discussion_r1211894972


##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoder.java:
##########
@@ -306,41 +357,34 @@ public ReflectData get() {
   private final EmptyOnDeserializationThreadLocal<DatumWriter<T>> writer;
   private final EmptyOnDeserializationThreadLocal<DatumReader<T>> reader;
 
-  // Lazily re-instantiated after deserialization
-  private final Supplier<ReflectData> reflectData;
-
   protected AvroCoder(Class<T> type, Schema schema) {
-    this(type, schema, false);
+    this(type, schema, true);

Review Comment:
   Good catch. I did not realize this one wasn't in sync with the `of` factories. Will keep previous behavior for the constructor.





[GitHub] [beam] reuvenlax commented on pull request #26320: Select dedicated avro datum reader and writer

Posted by "reuvenlax (via GitHub)" <gi...@apache.org>.
reuvenlax commented on PR #26320:
URL: https://github.com/apache/beam/pull/26320#issuecomment-1599216766

   Can we test to see whether this breaks update compatibility for Dataflow?
   A lot of jobs use AvroCoder, so we should not be breaking compatibility here
   unless there is a very strong reason.
   
   On Tue, Jun 20, 2023 at 12:30 AM Michel Davit ***@***.***> wrote:
   
   > I know the issue:
   > The `Conversions.DecimalConversion` must be removed from the `ReflectData`
   > used in the `ReflectDatumFactory`.
   > This conversion implies having scale and precision set in the schema,
   > which can't be inferred by reflection.
   > [ReflectData](https://avro.apache.org/docs/1.11.1/api/java/) by default
   > supports `BigDecimal`, converting it to `String` with the `java-class` property:
   >
   > {"type": "string", "java-class": "java.math.BigDecimal"}
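   
   As an aside, the default mapping mentioned in the quoted reply can be observed with plain Avro (a small illustration, not code from the PR):
   
   ```java
   import java.math.BigDecimal;
   import org.apache.avro.Schema;
   import org.apache.avro.reflect.ReflectData;
   
   public class ReflectBigDecimalSchema {
     public static void main(String[] args) {
       // ReflectData treats BigDecimal as a "stringable" type by default.
       Schema schema = ReflectData.get().getSchema(BigDecimal.class);
       // Expected to print something like:
       // {"type":"string","java-class":"java.math.BigDecimal"}
       System.out.println(schema);
     }
   }
   ```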
   




[GitHub] [beam] aromanenko-dev commented on pull request #26320: Select dedicated avro datum reader and writer

Posted by "aromanenko-dev (via GitHub)" <gi...@apache.org>.
aromanenko-dev commented on PR #26320:
URL: https://github.com/apache/beam/pull/26320#issuecomment-1563020535

   CC: @mosche 




[GitHub] [beam] aromanenko-dev commented on pull request #26320: Select dedicated avro datum reader and writer

Posted by "aromanenko-dev (via GitHub)" <gi...@apache.org>.
aromanenko-dev commented on PR #26320:
URL: https://github.com/apache/beam/pull/26320#issuecomment-1579087724

   Run PostCommit_Java_Avro_Versions




[GitHub] [beam] github-actions[bot] commented on pull request #26320: Select dedicated avro datum reader and writer

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26320:
URL: https://github.com/apache/beam/pull/26320#issuecomment-1529645397

   Reminder, please take a look at this pr: @kennknowles @chamikaramj 




[GitHub] [beam] mosche commented on pull request #26320: Select dedicated avro datum reader and writer

Posted by "mosche (via GitHub)" <gi...@apache.org>.
mosche commented on PR #26320:
URL: https://github.com/apache/beam/pull/26320#issuecomment-1563117367

   Thanks a lot for this @RustedBones!
   And thanks for the ping @aromanenko-dev, totally missed this.
   
   As @aromanenko-dev already pointed out, backwards compatibility is a concern here. Most of my initial comments are on that. The thing I'm mostly worried about is that this seems to change the behavior by removing the default logical conversion for Joda `DateTime`. With Beam still relying heavily on Joda, this will break users badly unless I misunderstand.
   
   Also note, looks like some tests are failing:
   ```
   gradle :sdks:java:extensions:avro:avroVersionsTest
   ```
   
   Also, make sure to rebase onto master; `@Experimental` was recently removed from Beam...




[GitHub] [beam] github-actions[bot] commented on pull request #26320: Select dedicated avro datum reader and writer

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26320:
URL: https://github.com/apache/beam/pull/26320#issuecomment-1546636422

   Reminder, please take a look at this pr: @robertwb @pabloem 




[GitHub] [beam] RustedBones commented on a diff in pull request #26320: Select dedicated avro datum reader and writer

Posted by "RustedBones (via GitHub)" <gi...@apache.org>.
RustedBones commented on code in PR #26320:
URL: https://github.com/apache/beam/pull/26320#discussion_r1207010786


##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoder.java:
##########
@@ -768,53 +695,22 @@ private static Field getField(Class<?> originalClazz, String name) {
     }
   }
 
+  /**
+   * @return {@code true} if the two {@link AvroCoder} instances have the same class, type and
+   *     schema.
+   */
   @Override
   public boolean equals(@Nullable Object other) {
-    if (other == this) {
-      return true;
-    }
-    if (!(other instanceof AvroCoder)) {
+    if (other == null || this.getClass() != other.getClass()) {
       return false;
     }
     AvroCoder<?> that = (AvroCoder<?>) other;
     return Objects.equals(this.schemaSupplier.get(), that.schemaSupplier.get())
-        && Objects.equals(this.typeDescriptor, that.typeDescriptor)
-        && this.useReflectApi == that.useReflectApi;
+        && Objects.equals(this.typeDescriptor, that.typeDescriptor);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(schemaSupplier.get(), typeDescriptor, useReflectApi);
-  }
-
-  /**
-   * Conversion for DateTime.
-   *
-   * <p>This is a copy from Avro 1.8's TimestampConversion, which is renamed in Avro 1.9. Defining
-   * own copy gives flexibility for Beam Java SDK to work with Avro 1.8 and 1.9 at runtime.
-   *
-   * @see <a href="https://issues.apache.org/jira/browse/BEAM-9144">BEAM-9144: Beam's own Avro
-   *     TimeConversion class in beam-sdk-java-core</a>
-   */
-  public static class JodaTimestampConversion extends Conversion<DateTime> {

Review Comment:
   I added all logical type conversions to the `ReflectDatumFactory`
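   
   For illustration, registering those conversions on a `ReflectData` model with the plain Avro API looks roughly like this (a sketch; the actual wiring inside `ReflectDatumFactory` is assumed, and `JodaConversionsExample` is a made-up name):
   
   ```java
   import org.apache.avro.reflect.ReflectData;
   import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroJodaTimeConversions;
   
   class JodaConversionsExample {
     /** Builds a ReflectData model with the joda-time conversions from this PR registered. */
     static ReflectData jodaAwareReflectData(Class<?> pojoClass) {
       ReflectData reflectData = new ReflectData(pojoClass.getClassLoader());
       reflectData.addLogicalTypeConversion(new AvroJodaTimeConversions.DateConversion());
       reflectData.addLogicalTypeConversion(new AvroJodaTimeConversions.TimeConversion());
       reflectData.addLogicalTypeConversion(new AvroJodaTimeConversions.TimestampConversion());
       return reflectData;
     }
   }
   ```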





[GitHub] [beam] aromanenko-dev commented on pull request #26320: Select dedicated avro datum reader and writer

Posted by "aromanenko-dev (via GitHub)" <gi...@apache.org>.
aromanenko-dev commented on PR #26320:
URL: https://github.com/apache/beam/pull/26320#issuecomment-1548172327

   @RustedBones Sorry for the delay, just got back from vacation.
   I'll try to take a look asap.
   Thanks!




[GitHub] [beam] aromanenko-dev commented on pull request #26320: Select dedicated avro datum reader and writer

Posted by "aromanenko-dev (via GitHub)" <gi...@apache.org>.
aromanenko-dev commented on PR #26320:
URL: https://github.com/apache/beam/pull/26320#issuecomment-1563135823

   Run PostCommit_Java_Avro_Versions




[GitHub] [beam] github-actions[bot] commented on pull request #26320: Select dedicated avro datum reader and writer

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26320:
URL: https://github.com/apache/beam/pull/26320#issuecomment-1512884822

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @kennknowles for label java.
   R: @chamikaramj for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).




[GitHub] [beam] mosche commented on a diff in pull request #26320: Select dedicated avro datum reader and writer

Posted by "mosche (via GitHub)" <gi...@apache.org>.
mosche commented on code in PR #26320:
URL: https://github.com/apache/beam/pull/26320#discussion_r1219719604


##########
sdks/java/extensions/avro/build.gradle:
##########
@@ -63,67 +74,91 @@ dependencies {
   testImplementation library.java.junit
   testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
   testRuntimeOnly library.java.slf4j_jdk14
-  avroVersions.each {
-    "avroVersion$it.key" "org.apache.avro:avro:$it.value"
-    "avroVersion$it.key" "org.apache.avro:avro-tools:$it.value"
+  avroVersions.each { k,v ->
+    "avroVersion$k"(project(path: ":sdks:java:core", configuration: "shadowTest")) {
+      // Exclude Avro dependencies from "core" since Avro support moved to this extension
+      exclude group: "org.apache.avro", module: "avro"
+    }
+    "avroVersion$k" library.java.junit
+    "avroVersion$k" project(path: ":runners:direct-java", configuration: "shadow")
+    "avroVersion$k" library.java.slf4j_jdk14
+    "avroVersion$k" "org.tukaani:xz:1.9" // makes as provided since 1.9
+    "avroVersion$k" library.java.zstd_jni // makes as provided since 1.9
+    "avroVersion${k}Generate" "org.apache.avro:avro-tools:$v"
   }
 }
 
-avroVersions.each { kv ->
-  configurations."avroVersion$kv.key" {
-    resolutionStrategy {
-      force "org.apache.avro:avro:$kv.value"
+task printSourceSetInformation(){
+
+  doLast{
+    sourceSets.each { srcSet ->
+      println "["+srcSet.name+"]"
+      print "-->Source directories: "+srcSet.allJava.srcDirs+"\n"
+      print "-->Output directories: "+srcSet.output.classesDirs.files+"\n"
+      print "-->Compile classpath:\n"
+      srcSet.runtimeClasspath.files.each {
+        print "  "+it.path+"\n"
+      }
+      println ""
     }
   }
+}
+
+avroVersions.each { k, v ->
+  configurations."avroVersion$k" {
+    resolutionStrategy.force "org.apache.avro:avro:$v", "org.apache.avro:avro:$v:tests", library.java.joda_time
+  }
 
   sourceSets {
-    "avro${kv.key}" {
+    "avro$k" {
       java {
-        srcDirs "build/generated/sources/avro${kv.key}/test/java"
+        srcDirs "src/test/java", "build/generated/sources/avro$k/test/java"
       }
 
-      compileClasspath = configurations."avroVersion$kv.key" + sourceSets.test.output + sourceSets.test.compileClasspath
-      runtimeClasspath += compileClasspath + sourceSets.test.runtimeClasspath

Review Comment:
   Thanks a lot, that's a great improvement 👍 





[GitHub] [beam] aromanenko-dev commented on pull request #26320: Select dedicated avro datum reader and writer

Posted by "aromanenko-dev (via GitHub)" <gi...@apache.org>.
aromanenko-dev commented on PR #26320:
URL: https://github.com/apache/beam/pull/26320#issuecomment-1587355614

   Run PostCommit_Java_Avro_Versions




[GitHub] [beam] aromanenko-dev commented on a diff in pull request #26320: Select dedicated avro datum reader and writer

Posted by "aromanenko-dev (via GitHub)" <gi...@apache.org>.
aromanenko-dev commented on code in PR #26320:
URL: https://github.com/apache/beam/pull/26320#discussion_r1209384283


##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoder.java:
##########
@@ -116,56 +106,40 @@
 @SuppressWarnings({
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
-public class AvroCoder<T> extends CustomCoder<T> {
+public abstract class AvroCoder<T> extends CustomCoder<T> {

Review Comment:
   Yes, but making it `abstract` is a breaking change. Also, if I'm not mistaken, it doesn't introduce any abstract methods, so I don't see a reason to make it abstract.





[GitHub] [beam] RustedBones commented on pull request #26320: Select dedicated avro datum reader and writer

Posted by "RustedBones (via GitHub)" <gi...@apache.org>.
RustedBones commented on PR #26320:
URL: https://github.com/apache/beam/pull/26320#issuecomment-1569740477

   @aromanenko-dev & @mosche thanks for all the review comments and sorry for all the back and forth. I've not been very mindful of the breaking changes.
   The PR is now in a better state. CI indicates a failure on `windows-latest` but I think this is a flaky one.
   




[GitHub] [beam] RustedBones commented on a diff in pull request #26320: Select dedicated avro datum reader and writer

Posted by "RustedBones (via GitHub)" <gi...@apache.org>.
RustedBones commented on code in PR #26320:
URL: https://github.com/apache/beam/pull/26320#discussion_r1207014663


##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/coders/AvroReflectCoder.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.avro.coders;
+
+import org.apache.avro.Schema;
+import org.apache.avro.reflect.Nullable;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.Union;
+import org.apache.beam.sdk.extensions.avro.io.AvroDatumFactory;
+
+/**
+ * AvroCoder specialisation for avro classes using Java reflection.
+ *
+ * <p>Only concrete classes with a no-argument constructor can be mapped to Avro records. All
+ * inherited fields that are not static or transient are included. Fields are not permitted to be
+ * null unless annotated by {@link Nullable} or a {@link Union} schema containing {@code "null"}.
+ */
+public class AvroReflectCoder<T> extends AvroCoder<T> {

Review Comment:
   I don't know how this works with the translation to `CloudObject` then. A coder created with a custom factory would be deserialized with the default one.





[GitHub] [beam] RustedBones commented on pull request #26320: Select dedicated avro datum reader and writer

Posted by "RustedBones (via GitHub)" <gi...@apache.org>.
RustedBones commented on PR #26320:
URL: https://github.com/apache/beam/pull/26320#issuecomment-1580248473

   Fix #18874




[GitHub] [beam] aromanenko-dev commented on pull request #26320: Select dedicated avro datum reader and writer

Posted by "aromanenko-dev (via GitHub)" <gi...@apache.org>.
aromanenko-dev commented on PR #26320:
URL: https://github.com/apache/beam/pull/26320#issuecomment-1584296762

   @RustedBones Please use `git rebase` instead of `merge` to bring your feature branch up to date with master in the future. It should not be a problem for this PR since we have to squash all commits before merging anyway.




[GitHub] [beam] RustedBones commented on a diff in pull request #26320: Select dedicated avro datum reader and writer

Posted by "RustedBones (via GitHub)" <gi...@apache.org>.
RustedBones commented on code in PR #26320:
URL: https://github.com/apache/beam/pull/26320#discussion_r1214107843


##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java:
##########
@@ -702,6 +739,16 @@ public Read<T> withBeamSchemas(boolean withBeamSchemas) {
       return toBuilder().setInferBeamSchema(withBeamSchemas).build();
     }
 
+    /** Sets a coder for the result of the read function. */
+    public Read<T> withCoder(Coder<T> coder) {

Review Comment:
   As a quick win, I've updated the code to leverage the case when an `AvroDatumFactory` is passed as the reader factory.
   I think since the other `AvroIO` transforms already have a `withCoder`, adding one here just increases consistency, even if its use is discouraged.
   





[GitHub] [beam] mosche commented on pull request #26320: Select dedicated avro datum reader and writer

Posted by "mosche (via GitHub)" <gi...@apache.org>.
mosche commented on PR #26320:
URL: https://github.com/apache/beam/pull/26320#issuecomment-1563148767

   A couple more points for clarification / my own understanding:
   
   > [AVRO-1891](https://issues.apache.org/jira/browse/AVRO-1891) is fixed by adding the conversion at compile time in the generated class' model. Beam does not currently read the SpecificData form the class.
   
   This isn't addressed by this PR, right?
   
   > [BEAM-8329](https://issues.apache.org/jira/browse/BEAM-8329) ReflectData is not supposed to use the logical conversions by default. Users should either use a CustomEncoding with the AvroEncode annotation or manually add the conversion to their own ReflectData which is not possible everywhere in beam.
   
   As commented above, I don't see a way to remove the default logical conversion again without putting users in a bad spot.
   I think we have to keep it as is.




[GitHub] [beam] mosche commented on a diff in pull request #26320: Select dedicated avro datum reader and writer

Posted by "mosche (via GitHub)" <gi...@apache.org>.
mosche commented on code in PR #26320:
URL: https://github.com/apache/beam/pull/26320#discussion_r1205583401


##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoder.java:
##########
@@ -116,56 +106,40 @@
 @SuppressWarnings({
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
-public class AvroCoder<T> extends CustomCoder<T> {
+public abstract class AvroCoder<T> extends CustomCoder<T> {
 
   /**
    * Returns an {@code AvroCoder} instance for the provided element type.
    *
    * @param <T> the element type
    */
   public static <T> AvroCoder<T> of(TypeDescriptor<T> type) {
-    return of(type, true);
-  }
-
-  /**
-   * Returns an {@code AvroCoder} instance for the provided element type, respecting whether to use
-   * Avro's Reflect* or Specific* suite for encoding and decoding.
-   *
-   * @param <T> the element type
-   */
-  public static <T> AvroCoder<T> of(TypeDescriptor<T> type, boolean useReflectApi) {
     @SuppressWarnings("unchecked")
     Class<T> clazz = (Class<T>) type.getRawType();
-    return of(clazz, useReflectApi);
-  }
-
-  /**
-   * Returns an {@code AvroCoder} instance for the provided element class.
-   *
-   * @param <T> the element type
-   */
-  public static <T> AvroCoder<T> of(Class<T> clazz) {
-    return of(clazz, true);
+    return of(clazz);
   }
 
   /**
    * Returns an {@code AvroGenericCoder} instance for the Avro schema. The implicit type is
    * GenericRecord.
    */
-  public static AvroGenericCoder of(Schema schema) {
+  public static AvroCoder<GenericRecord> of(Schema schema) {
     return AvroGenericCoder.of(schema);
   }
 
   /**
-   * Returns an {@code AvroCoder} instance for the given class, respecting whether to use Avro's
-   * Reflect* or Specific* suite for encoding and decoding.
+   * Returns an {@code AvroCoder} instance for the provided element class.
    *
    * @param <T> the element type
    */
-  public static <T> AvroCoder<T> of(Class<T> type, boolean useReflectApi) {

Review Comment:
   Same as above, please keep this for backwards compatibility.



##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoder.java:
##########
@@ -116,56 +106,40 @@
 @SuppressWarnings({
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
-public class AvroCoder<T> extends CustomCoder<T> {
+public abstract class AvroCoder<T> extends CustomCoder<T> {
 
   /**
    * Returns an {@code AvroCoder} instance for the provided element type.
    *
    * @param <T> the element type
    */
   public static <T> AvroCoder<T> of(TypeDescriptor<T> type) {
-    return of(type, true);
-  }
-
-  /**
-   * Returns an {@code AvroCoder} instance for the provided element type, respecting whether to use
-   * Avro's Reflect* or Specific* suite for encoding and decoding.
-   *
-   * @param <T> the element type
-   */
-  public static <T> AvroCoder<T> of(TypeDescriptor<T> type, boolean useReflectApi) {

Review Comment:
   Please keep the API backwards compatible. If there are reasons not to use this factory method any longer, please deprecate it and explain what to use instead.



##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoder.java:
##########
@@ -116,56 +106,40 @@
 @SuppressWarnings({
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
-public class AvroCoder<T> extends CustomCoder<T> {
+public abstract class AvroCoder<T> extends CustomCoder<T> {
 
   /**
    * Returns an {@code AvroCoder} instance for the provided element type.
    *
    * @param <T> the element type
    */
   public static <T> AvroCoder<T> of(TypeDescriptor<T> type) {
-    return of(type, true);
-  }
-
-  /**
-   * Returns an {@code AvroCoder} instance for the provided element type, respecting whether to use
-   * Avro's Reflect* or Specific* suite for encoding and decoding.
-   *
-   * @param <T> the element type
-   */
-  public static <T> AvroCoder<T> of(TypeDescriptor<T> type, boolean useReflectApi) {
     @SuppressWarnings("unchecked")
     Class<T> clazz = (Class<T>) type.getRawType();
-    return of(clazz, useReflectApi);
-  }
-
-  /**
-   * Returns an {@code AvroCoder} instance for the provided element class.
-   *
-   * @param <T> the element type
-   */
-  public static <T> AvroCoder<T> of(Class<T> clazz) {
-    return of(clazz, true);
+    return of(clazz);
   }
 
   /**
    * Returns an {@code AvroGenericCoder} instance for the Avro schema. The implicit type is
    * GenericRecord.
    */
-  public static AvroGenericCoder of(Schema schema) {
+  public static AvroCoder<GenericRecord> of(Schema schema) {

Review Comment:
   Please keep the return type unchanged to maintain backwards compatibility.



##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoder.java:
##########
@@ -177,17 +151,13 @@ public static <T> AvroCoder<T> of(Class<T> type, boolean useReflectApi) {
    * @param <T> the element type
    */
   public static <T> AvroCoder<T> of(Class<T> type, Schema schema) {
-    return of(type, schema, true);
-  }
-
-  /**
-   * Returns an {@code AvroCoder} instance for the given class and schema, respecting whether to use
-   * Avro's Reflect* or Specific* suite for encoding and decoding.
-   *
-   * @param <T> the element type
-   */
-  public static <T> AvroCoder<T> of(Class<T> type, Schema schema, boolean useReflectApi) {

Review Comment:
   Same as above, please keep this for backwards compatibility.



##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoder.java:
##########
@@ -365,10 +296,6 @@ public Class<T> getType() {
     return type;
   }
 
-  public boolean useReflectApi() {

Review Comment:
   Please deprecate instead
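   
   One possible way to keep it, as a hedged sketch (assumes the coder now holds an `AvroDatumFactory` and exposes `getDatumFactory()`; not necessarily how the PR resolves this):
   
   ```java
   /** @deprecated inspect {@link #getDatumFactory()} instead. */
   @Deprecated
   public boolean useReflectApi() {
     // Reflect-based coders are the ones built from a ReflectDatumFactory.
     return datumFactory instanceof AvroDatumFactory.ReflectDatumFactory;
   }
   ```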



##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoder.java:
##########
@@ -768,53 +695,22 @@ private static Field getField(Class<?> originalClazz, String name) {
     }
   }
 
+  /**
+   * @return {@code true} if the two {@link AvroCoder} instances have the same class, type and
+   *     schema.
+   */
   @Override
   public boolean equals(@Nullable Object other) {

Review Comment:
   Please also compare `datumFactory` here (and implement its equals/hashCode accordingly); the same applies to hashCode, of course.



##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoder.java:
##########
@@ -768,53 +695,22 @@ private static Field getField(Class<?> originalClazz, String name) {
     }
   }
 
+  /**
+   * @return {@code true} if the two {@link AvroCoder} instances have the same class, type and
+   *     schema.
+   */
   @Override
   public boolean equals(@Nullable Object other) {
-    if (other == this) {
-      return true;
-    }
-    if (!(other instanceof AvroCoder)) {
+    if (other == null || this.getClass() != other.getClass()) {
       return false;
     }
     AvroCoder<?> that = (AvroCoder<?>) other;
     return Objects.equals(this.schemaSupplier.get(), that.schemaSupplier.get())
-        && Objects.equals(this.typeDescriptor, that.typeDescriptor)
-        && this.useReflectApi == that.useReflectApi;
+        && Objects.equals(this.typeDescriptor, that.typeDescriptor);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(schemaSupplier.get(), typeDescriptor, useReflectApi);
-  }
-
-  /**
-   * Conversion for DateTime.
-   *
-   * <p>This is a copy from Avro 1.8's TimestampConversion, which is renamed in Avro 1.9. Defining
-   * own copy gives flexibility for Beam Java SDK to work with Avro 1.8 and 1.9 at runtime.
-   *
-   * @see <a href="https://issues.apache.org/jira/browse/BEAM-9144">BEAM-9144: Beam's own Avro
-   *     TimeConversion class in beam-sdk-java-core</a>
-   */
-  public static class JodaTimestampConversion extends Conversion<DateTime> {

Review Comment:
   This is critical as it breaks users who have Joda types in their POJOs. To keep the behavior consistent, this logical conversion has to be added to the reflect coder, if not to all three Avro coder / datum factory types (still wondering about that myself).



##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoder.java:
##########
@@ -306,41 +256,27 @@ public ReflectData get() {
   private final EmptyOnDeserializationThreadLocal<DatumWriter<T>> writer;
   private final EmptyOnDeserializationThreadLocal<DatumReader<T>> reader;
 
-  // Lazily re-instantiated after deserialization
-  private final Supplier<ReflectData> reflectData;
-
-  protected AvroCoder(Class<T> type, Schema schema) {
-    this(type, schema, false);
-  }
-
-  protected AvroCoder(Class<T> type, Schema schema, boolean useReflectApi) {
+  protected AvroCoder(Class<T> type, AvroDatumFactory<T> datumFactory, Schema schema) {

Review Comment:
   The old constructor was accessible by subclasses, so it would be better not to remove it:
   ```java
     protected AvroCoder(Class<T> type, Schema schema, boolean useReflectApi) {
       this(type, useReflectApi ? AvroDatumFactory.ReflectDatumFactory.of(type) : AvroDatumFactory.SpecificDatumFactory.of(type), schema);
     }
     ```



##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/coders/AvroReflectCoder.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.avro.coders;
+
+import org.apache.avro.Schema;
+import org.apache.avro.reflect.Nullable;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.Union;
+import org.apache.beam.sdk.extensions.avro.io.AvroDatumFactory;
+
+/**
+ * AvroCoder specialisation for avro classes using Java reflection.
+ *
+ * <p>Only concrete classes with a no-argument constructor can be mapped to Avro records. All
+ * inherited fields that are not static or transient are included. Fields are not permitted to be
+ * null unless annotated by {@link Nullable} or a {@link Union} schema containing {@code "null"}.
+ */
+public class AvroReflectCoder<T> extends AvroCoder<T> {

Review Comment:
   Why subclass AvroCoder if the only difference is the datum factory? I'd suggest keeping the old API (using the `useReflectApi` flag) instead, so the public surface doesn't grow for no obvious gain. Same for AvroSpecificCoder...





[GitHub] [beam] mosche commented on a diff in pull request #26320: Select dedicated avro datum reader and writer

Posted by "mosche (via GitHub)" <gi...@apache.org>.
mosche commented on code in PR #26320:
URL: https://github.com/apache/beam/pull/26320#discussion_r1211629393


##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java:
##########
@@ -20,38 +20,41 @@
 import org.apache.avro.Schema;
 import org.apache.beam.runners.core.construction.SdkComponents;
 import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
+import org.apache.beam.sdk.extensions.avro.io.AvroDatumFactory;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.StringUtils;
 
 /** A {@link CloudObjectTranslator} for {@link AvroCoder}. */
 @SuppressWarnings({
   "rawtypes" // TODO(https://github.com/apache/beam/issues/20447)
 })
 class AvroCoderCloudObjectTranslator implements CloudObjectTranslator<AvroCoder> {
-  private static final String TYPE_FIELD = "type";
+  private static final String DATUM_FACTORY_FIELD = "datum_factory";
   private static final String SCHEMA_FIELD = "schema";
-  private static final String REFLECT_API_FIELD = "reflect_api";
 
   @Override
   public CloudObject toCloudObject(AvroCoder target, SdkComponents sdkComponents) {
     CloudObject base = CloudObject.forClass(AvroCoder.class);
+    byte[] serializedDatumFactory =
+        SerializableUtils.serializeToByteArray(target.getDatumFactory());
+    Structs.addString(
+        base, DATUM_FACTORY_FIELD, StringUtils.byteArrayToJsonString(serializedDatumFactory));
     Structs.addString(base, SCHEMA_FIELD, target.getSchema().toString());
-    Structs.addString(base, TYPE_FIELD, target.getType().getName());
-    Structs.addBoolean(base, REFLECT_API_FIELD, target.useReflectApi());
     return base;
   }
 
   @Override
   public AvroCoder<?> fromCloudObject(CloudObject cloudObject) {

Review Comment:
   Not sure how much of an issue this really is; I'm not too familiar with Dataflow. I suspect changing the CloudObject payload will be problematic for Dataflow streaming pipelines when upgrading Beam if this payload is persisted somewhere.
   It might be necessary to support constructing the AvroCoder from a CloudObject created with a previous version of Beam.
   @kennknowles could you answer this or ping someone who's in a position to answer?
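   One possible shape for a backwards-compatible translator (a sketch, not the PR's code: it assumes the legacy `type`/`reflect_api` fields written by older Beam versions, the existing `Structs`/`SerializableUtils`/`StringUtils` helpers, and a factory-based `AvroCoder.of` overload):
   ```java
   @Override
   public AvroCoder<?> fromCloudObject(CloudObject cloudObject) {
     Schema schema = new Schema.Parser().parse(Structs.getString(cloudObject, SCHEMA_FIELD));
     String encodedFactory = Structs.getString(cloudObject, DATUM_FACTORY_FIELD, null);
     if (encodedFactory != null) {
       // payload written by a Beam version that serializes the datum factory
       AvroDatumFactory<?> datumFactory =
           (AvroDatumFactory<?>)
               SerializableUtils.deserializeFromByteArray(
                   StringUtils.jsonStringToByteArray(encodedFactory), "AvroDatumFactory");
       return AvroCoder.of(datumFactory, schema);
     }
     // legacy payload written by an older Beam version
     try {
       Class<?> type = Class.forName(Structs.getString(cloudObject, "type"));
       boolean useReflectApi = Structs.getBoolean(cloudObject, "reflect_api");
       return AvroCoder.of(type, schema, useReflectApi);
     } catch (ClassNotFoundException e) {
       throw new IllegalArgumentException(e);
     }
   }
   ```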



##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java:
##########
@@ -702,6 +739,16 @@ public Read<T> withBeamSchemas(boolean withBeamSchemas) {
       return toBuilder().setInferBeamSchema(withBeamSchemas).build();
     }
 
+    /** Sets a coder for the result of the read function. */
+    public Read<T> withCoder(Coder<T> coder) {

Review Comment:
   Just FYI, usage of `withCoder` is [discouraged](https://beam.apache.org/documentation/io/io-standards/#classes--methods--properties) based on some fairly recent discussions on IO standards.
   
   > Strongly Discouraged
   >
   > Sets the coder to use to encode/decode the element type of the output / input PCollection of this connector. In general, it is recommended that sources will:
   >
   > 1. Return Row objects with a schema that is automatically inferred.
   > 2. Automatically set the necessary coder by having fixed output/input types, or inferring their output/input types.
   >
   > If neither 1 and 2 are possible, then a withCoder(...) method can be added.
   
   I suppose you'd like to have a way of creating a custom coder from your own AvroDatumFactory? What if Read could be configured with an AvroDatumFactory instead of the DatumReaderFactory?
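   For example, something along these lines would avoid `withCoder` entirely (a sketch: `MyJodaDatumFactory` is hypothetical, and it relies on `AvroDatumFactory` implementing `AvroSource.DatumReaderFactory`):
   ```java
   PCollection<TestAvro> records =
       pipeline.apply(
           AvroIO.read(TestAvro.class)
               .from("/path/to/input-*.avro")
               .withDatumReaderFactory(new MyJodaDatumFactory<>(TestAvro.class)));
   ```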



##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoder.java:
##########
@@ -768,53 +695,22 @@ private static Field getField(Class<?> originalClazz, String name) {
     }
   }
 
+  /**
+   * @return {@code true} if the two {@link AvroCoder} instances have the same class, type and
+   *     schema.
+   */
   @Override
   public boolean equals(@Nullable Object other) {
-    if (other == this) {
-      return true;
-    }
-    if (!(other instanceof AvroCoder)) {
+    if (other == null || this.getClass() != other.getClass()) {
       return false;
     }
     AvroCoder<?> that = (AvroCoder<?>) other;
     return Objects.equals(this.schemaSupplier.get(), that.schemaSupplier.get())
-        && Objects.equals(this.typeDescriptor, that.typeDescriptor)
-        && this.useReflectApi == that.useReflectApi;
+        && Objects.equals(this.typeDescriptor, that.typeDescriptor);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(schemaSupplier.get(), typeDescriptor, useReflectApi);
-  }
-
-  /**
-   * Conversion for DateTime.
-   *
-   * <p>This is a copy from Avro 1.8's TimestampConversion, which is renamed in Avro 1.9. Defining
-   * own copy gives flexibility for Beam Java SDK to work with Avro 1.8 and 1.9 at runtime.
-   *
-   * @see <a href="https://issues.apache.org/jira/browse/BEAM-9144">BEAM-9144: Beam's own Avro
-   *     TimeConversion class in beam-sdk-java-core</a>
-   */
-  public static class JodaTimestampConversion extends Conversion<DateTime> {

Review Comment:
   Thanks, this looks great!



##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoder.java:
##########
@@ -306,41 +357,34 @@ public ReflectData get() {
   private final EmptyOnDeserializationThreadLocal<DatumWriter<T>> writer;
   private final EmptyOnDeserializationThreadLocal<DatumReader<T>> reader;
 
-  // Lazily re-instantiated after deserialization
-  private final Supplier<ReflectData> reflectData;
-
   protected AvroCoder(Class<T> type, Schema schema) {
-    this(type, schema, false);
+    this(type, schema, true);

Review Comment:
   Is this swapped intentionally? This could silently change the datum factory for SpecificRecord, right?



##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java:
##########
@@ -161,6 +152,25 @@ public class AvroUtils {
       new ForLoadedType(ReadableInstant.class);
   private static final ForLoadedType JODA_INSTANT = new ForLoadedType(Instant.class);
 
+  public static void addLogicalTypeConversions(final GenericData data) {

Review Comment:
   Wondering, should these conversions all be singletons?



##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroDatumFactory.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.avro.io;
+
+import java.util.Objects;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Create {@link DatumReader} and {@link DatumWriter} for given schemas. */
+public abstract class AvroDatumFactory<T>
+    implements AvroSource.DatumReaderFactory<T>, AvroSink.DatumWriterFactory<T> {
+
+  protected final Class<T> type;
+
+  public AvroDatumFactory(Class<T> type) {
+    this.type = type;
+  }
+
+  @Override
+  public boolean equals(@Nullable Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (other == null || getClass() != other.getClass()) {
+      return false;
+    }
+    AvroDatumFactory<?> that = (AvroDatumFactory<?>) other;
+    return Objects.equals(type, that.type);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(getClass(), type);
+  }
+
+  /** Specialized {@link AvroDatumFactory} for {@link GenericRecord}. */
+  public static class GenericDatumFactory extends AvroDatumFactory<GenericRecord> {
+
+    public static final GenericDatumFactory INSTANCE = new GenericDatumFactory();
+
+    public GenericDatumFactory() {
+      super(GenericRecord.class);
+    }
+
+    @Override
+    public DatumReader<GenericRecord> apply(Schema writer, Schema reader) {
+      return new GenericDatumReader<>(writer, reader);
+    }
+
+    @Override
+    public DatumWriter<GenericRecord> apply(Schema writer) {
+      return new GenericDatumWriter<>(writer);
+    }
+  }
+
+  /** Specialized {@link AvroDatumFactory} for {@link org.apache.avro.specific.SpecificRecord}. */
+  public static class SpecificDatumFactory<T> extends AvroDatumFactory<T> {
+    SpecificDatumFactory(Class<T> type) {
+      super(type);
+    }
+
+    @Override
+    public DatumReader<T> apply(Schema writer, Schema reader) {
+      // create the datum writer using the Class<T> api.
+      // avro will load the proper class loader and when using avro 1.9
+      // the proper data with conversions (SpecificData.getForClass)
+      SpecificDatumReader<T> datumReader = new SpecificDatumReader<>(this.type);
+      datumReader.setExpected(reader);
+      datumReader.setSchema(writer);

Review Comment:
   Thanks for the clarification @RustedBones 👍 Would it make sense to add a test case to verify the behavior?



##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java:
##########
@@ -839,21 +894,29 @@ public ReadFiles<T> withBeamSchemas(boolean withBeamSchemas) {
       return toBuilder().setInferBeamSchema(withBeamSchemas).build();
     }
 
+    /** Sets a coder for the result of the read function. */
+    public ReadFiles<T> withCoder(Coder<T> coder) {

Review Comment:
   Same as above, wondering if there are alternatives



##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroJodaTimeConversions.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.avro.schemas.utils;
+
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
+import org.apache.avro.Schema;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Days;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+
+/**
+ * Avro 1.8 & 1.9 ship joda time conversions.
+ *
+ * <p>Since avro 1.10, only java time conversions are included. As beam is still joda time based,
+ * and users may work with avro joda time generated classes, provide joda time logical conversions.
+ *
+ * <p>This code is copied from avro 1.8.2 TimeConversions.
+ */
+public class AvroJodaTimeConversions {
+
+  public static class DateConversion extends Conversion<LocalDate> {
+    private static final LocalDate EPOCH_DATE = new LocalDate(1970, 1, 1);
+
+    @Override
+    public Class<LocalDate> getConvertedType() {
+      return LocalDate.class;
+    }
+
+    @Override
+    public String getLogicalTypeName() {
+      return "date";
+    }
+
+    @Override
+    public LocalDate fromInt(Integer daysFromEpoch, Schema schema, LogicalType type) {
+      return EPOCH_DATE.plusDays(daysFromEpoch);
+    }
+
+    @Override
+    public Integer toInt(LocalDate date, Schema schema, LogicalType type) {
+      return Days.daysBetween(EPOCH_DATE, date).getDays();
+    }
+  }
+
+  public static class TimeConversion extends Conversion<LocalTime> {
+    @Override
+    public Class<LocalTime> getConvertedType() {
+      return LocalTime.class;
+    }
+
+    @Override
+    public String getLogicalTypeName() {
+      return "time-millis";
+    }
+
+    @Override
+    public LocalTime fromInt(Integer millisFromMidnight, Schema schema, LogicalType type) {
+      return LocalTime.fromMillisOfDay(millisFromMidnight);
+    }
+
+    @Override
+    public Integer toInt(LocalTime time, Schema schema, LogicalType type) {
+      return time.millisOfDay().get();
+    }
+  }
+
+  public static class TimeMicrosConversion extends Conversion<LocalTime> {
+    @Override
+    public Class<LocalTime> getConvertedType() {
+      return LocalTime.class;
+    }
+
+    @Override
+    public String getLogicalTypeName() {
+      return "time-micros";
+    }
+
+    @Override
+    public LocalTime fromLong(Long microsFromMidnight, Schema schema, LogicalType type) {
+      return LocalTime.fromMillisOfDay(microsFromMidnight / 1000);
+    }
+  }
+
+  public static class LossyTimeMicrosConversion extends TimeMicrosConversion {
+    @Override
+    public Long toLong(LocalTime time, Schema schema, LogicalType type) {
+      return 1000 * (long) time.millisOfDay().get();
+    }
+  }
+
+  public static class TimestampConversion extends Conversion<DateTime> {
+    @Override
+    public Class<DateTime> getConvertedType() {
+      return DateTime.class;
+    }
+
+    @Override
+    public String getLogicalTypeName() {
+      return "timestamp-millis";
+    }
+
+    @Override
+    public DateTime fromLong(Long millisFromEpoch, Schema schema, LogicalType type) {
+      return new DateTime(millisFromEpoch, DateTimeZone.UTC);
+    }
+
+    @Override
+    public Long toLong(DateTime timestamp, Schema schema, LogicalType type) {
+      return timestamp.getMillis();
+    }
+  }
+
+  public static class TimestampMicrosConversion extends Conversion<DateTime> {

Review Comment:
   I know this matches the conversions in Avro 1.8.2, but it seems strange that fromLong takes micros while toLong produces millis... Are you aware of any particular reason for this?





[GitHub] [beam] RustedBones commented on a diff in pull request #26320: Select dedicated avro datum reader and writer

Posted by "RustedBones (via GitHub)" <gi...@apache.org>.
RustedBones commented on code in PR #26320:
URL: https://github.com/apache/beam/pull/26320#discussion_r1207010786


##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoder.java:
##########
@@ -768,53 +695,22 @@ private static Field getField(Class<?> originalClazz, String name) {
     }
   }
 
+  /**
+   * @return {@code true} if the two {@link AvroCoder} instances have the same class, type and
+   *     schema.
+   */
   @Override
   public boolean equals(@Nullable Object other) {
-    if (other == this) {
-      return true;
-    }
-    if (!(other instanceof AvroCoder)) {
+    if (other == null || this.getClass() != other.getClass()) {
       return false;
     }
     AvroCoder<?> that = (AvroCoder<?>) other;
     return Objects.equals(this.schemaSupplier.get(), that.schemaSupplier.get())
-        && Objects.equals(this.typeDescriptor, that.typeDescriptor)
-        && this.useReflectApi == that.useReflectApi;
+        && Objects.equals(this.typeDescriptor, that.typeDescriptor);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(schemaSupplier.get(), typeDescriptor, useReflectApi);
-  }
-
-  /**
-   * Conversion for DateTime.
-   *
-   * <p>This is a copy from Avro 1.8's TimestampConversion, which is renamed in Avro 1.9. Defining
-   * own copy gives flexibility for Beam Java SDK to work with Avro 1.8 and 1.9 at runtime.
-   *
-   * @see <a href="https://issues.apache.org/jira/browse/BEAM-9144">BEAM-9144: Beam's own Avro
-   *     TimeConversion class in beam-sdk-java-core</a>
-   */
-  public static class JodaTimestampConversion extends Conversion<DateTime> {

Review Comment:
   I added all logical type conversions to the `ReflectDatumFactory`.
   I think we should not add the conversions to the `SpecificData`: Avro 1.9+ adds the conversions by default in the generated model.
   In the special case where users are working with classes generated with avro 1.9 while using avro 1.10+ at runtime, they should define a custom `AvroDatumFactory` with overridden joda conversions (since the joda ones won't be on the classpath).
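   For that special case, a custom factory could look roughly like this (a sketch extending the public `AvroDatumFactory` base class; names are illustrative):
   ```java
   // reflect factory whose ReflectData also knows the joda timestamp conversion
   class JodaReflectDatumFactory<T> extends AvroDatumFactory<T> {
     JodaReflectDatumFactory(Class<T> type) {
       super(type);
     }

     private ReflectData reflectData() {
       ReflectData data = new ReflectData(type.getClassLoader());
       data.addLogicalTypeConversion(new AvroJodaTimeConversions.TimestampConversion());
       return data;
     }

     @Override
     public DatumReader<T> apply(Schema writer, Schema reader) {
       return new ReflectDatumReader<>(writer, reader, reflectData());
     }

     @Override
     public DatumWriter<T> apply(Schema writer) {
       return new ReflectDatumWriter<>(writer, reflectData());
     }
   }
   ```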





[GitHub] [beam] aromanenko-dev commented on pull request #26320: Select dedicated avro datum reader and writer

Posted by "aromanenko-dev (via GitHub)" <gi...@apache.org>.
aromanenko-dev commented on PR #26320:
URL: https://github.com/apache/beam/pull/26320#issuecomment-1588113366

   @Abacn Could you take a look at the Dataflow runner changes in this PR, please?




[GitHub] [beam] aromanenko-dev commented on pull request #26320: Select dedicated avro datum reader and writer

Posted by "aromanenko-dev (via GitHub)" <gi...@apache.org>.
aromanenko-dev commented on PR #26320:
URL: https://github.com/apache/beam/pull/26320#issuecomment-1567235784

   Run PostCommit_Java_Avro_Versions




[GitHub] [beam] RustedBones commented on a diff in pull request #26320: Select dedicated avro datum reader and writer

Posted by "RustedBones (via GitHub)" <gi...@apache.org>.
RustedBones commented on code in PR #26320:
URL: https://github.com/apache/beam/pull/26320#discussion_r1207017453


##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroDatumFactory.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.avro.io;
+
+import java.util.Objects;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Create {@link DatumReader} and {@link DatumWriter} for given schemas. */
+public abstract class AvroDatumFactory<T>
+    implements AvroSource.DatumReaderFactory<T>, AvroSink.DatumWriterFactory<T> {
+
+  protected final Class<T> type;
+
+  public AvroDatumFactory(Class<T> type) {
+    this.type = type;
+  }
+
+  @Override
+  public boolean equals(@Nullable Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (other == null || getClass() != other.getClass()) {
+      return false;
+    }
+    AvroDatumFactory<?> that = (AvroDatumFactory<?>) other;
+    return Objects.equals(type, that.type);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(getClass(), type);
+  }
+
+  /** Specialized {@link AvroDatumFactory} for {@link GenericRecord}. */
+  public static class GenericDatumFactory extends AvroDatumFactory<GenericRecord> {
+
+    public static final GenericDatumFactory INSTANCE = new GenericDatumFactory();
+
+    public GenericDatumFactory() {
+      super(GenericRecord.class);
+    }
+
+    @Override
+    public DatumReader<GenericRecord> apply(Schema writer, Schema reader) {
+      return new GenericDatumReader<>(writer, reader);
+    }
+
+    @Override
+    public DatumWriter<GenericRecord> apply(Schema writer) {
+      return new GenericDatumWriter<>(writer);
+    }
+  }
+
+  /** Specialized {@link AvroDatumFactory} for {@link org.apache.avro.specific.SpecificRecord}. */
+  public static class SpecificDatumFactory<T> extends AvroDatumFactory<T> {
+    SpecificDatumFactory(Class<T> type) {
+      super(type);
+    }
+
+    @Override
+    public DatumReader<T> apply(Schema writer, Schema reader) {
+      // create the datum writer using the Class<T> api.
+      // avro will load the proper class loader and when using avro 1.9
+      // the proper data with conversions (SpecificData.getForClass)
+      SpecificDatumReader<T> datumReader = new SpecificDatumReader<>(this.type);
+      datumReader.setExpected(reader);
+      datumReader.setSchema(writer);

Review Comment:
   @mosche this is the fix for [AVRO-1891](https://issues.apache.org/jira/browse/AVRO-1891)





[GitHub] [beam] Abacn commented on pull request #26320: Select dedicated avro datum reader and writer

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #26320:
URL: https://github.com/apache/beam/pull/26320#issuecomment-1598065475

   Hi, this change has possibly caused a Java Examples PostCommit failure:
   
   https://ci-beam.apache.org/view/PostCommit/job/beam_PostCommit_Java_Examples_Direct/1809/
   
   `org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQuerySamplesIT.testTableIO`
   
   Error Message
   org.apache.beam.sdk.Pipeline$PipelineExecutionException: org.apache.avro.AvroRuntimeException: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.UnsupportedOperationException: No recommended schema for decimal (scale is required)
   
   ```
   org.apache.beam.sdk.Pipeline$PipelineExecutionException: org.apache.avro.AvroRuntimeException: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.UnsupportedOperationException: No recommended schema for decimal (scale is required)
   	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:374)
   ...
   Caused by: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.UnsupportedOperationException: No recommended schema for decimal (scale is required)
   	at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2234)
   	at avro.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3965)
   	at avro.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3969)
   	at avro.shaded.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4829)
   	at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:225)
   	at org.apache.avro.reflect.ReflectDatumWriter.<init>(ReflectDatumWriter.java:47)
   	at org.apache.avro.reflect.ReflectDatumWriter.<init>(ReflectDatumWriter.java:43)
   	at org.apache.beam.sdk.extensions.avro.io.AvroDatumFactory$ReflectDatumFactory.apply(AvroDatumFactory.java:185)
   	at org.apache.beam.sdk.extensions.avro.coders.AvroCoder$2.initialValue(AvroCoder.java:397)
   	at org.apache.beam.sdk.extensions.avro.coders.AvroCoder$2.initialValue(AvroCoder.java:392)
   at java.lang.ThreadLocal.setInitialValue(ThreadLocal.java:195)
   	at java.lang.ThreadLocal.get(ThreadLocal.java:172)
   	at org.apache.beam.sdk.extensions.avro.coders.AvroCoder.encode(AvroCoder.java:428)
   	at org.apache.beam.sdk.coders.Coder.encode(Coder.java:132)
   	at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:86)
   	at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:70)
   	at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:55)
   	at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:168)
   	at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:118)
   	at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:49)
   ```




[GitHub] [beam] aromanenko-dev commented on a diff in pull request #26320: Select dedicated avro datum reader and writer

Posted by "aromanenko-dev (via GitHub)" <gi...@apache.org>.
aromanenko-dev commented on code in PR #26320:
URL: https://github.com/apache/beam/pull/26320#discussion_r1202493419


##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoder.java:
##########
@@ -116,56 +106,40 @@
 @SuppressWarnings({
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
-public class AvroCoder<T> extends CustomCoder<T> {
+public abstract class AvroCoder<T> extends CustomCoder<T> {

Review Comment:
   Why this class became `abstract`?



##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoder.java:
##########
@@ -116,56 +106,40 @@
 @SuppressWarnings({
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
-public class AvroCoder<T> extends CustomCoder<T> {
+public abstract class AvroCoder<T> extends CustomCoder<T> {
 
   /**
    * Returns an {@code AvroCoder} instance for the provided element type.
    *
    * @param <T> the element type
    */
   public static <T> AvroCoder<T> of(TypeDescriptor<T> type) {
-    return of(type, true);
-  }
-
-  /**
-   * Returns an {@code AvroCoder} instance for the provided element type, respecting whether to use
-   * Avro's Reflect* or Specific* suite for encoding and decoding.
-   *
-   * @param <T> the element type
-   */
-  public static <T> AvroCoder<T> of(TypeDescriptor<T> type, boolean useReflectApi) {

Review Comment:
   Here and in other places, I'd recommend avoiding such breaking changes in the public API (deleting public methods/classes); keep and deprecate them instead.





[GitHub] [beam] aromanenko-dev commented on a diff in pull request #26320: Select dedicated avro datum reader and writer

Posted by "aromanenko-dev (via GitHub)" <gi...@apache.org>.
aromanenko-dev commented on code in PR #26320:
URL: https://github.com/apache/beam/pull/26320#discussion_r1214339336


##########
sdks/java/extensions/avro/build.gradle:
##########
@@ -35,7 +41,12 @@ def avroVersions = [
     '1111': "1.11.1",
 ]
 
-avroVersions.each { k, v -> configurations.create("avroVersion$k") }
+avroVersions.each { k, v ->

Review Comment:
   Could you elaborate on the changes to the Avro multi-version gradle configurations/tasks here? Why are they needed?





[GitHub] [beam] RustedBones commented on a diff in pull request #26320: Select dedicated avro datum reader and writer

Posted by "RustedBones (via GitHub)" <gi...@apache.org>.
RustedBones commented on code in PR #26320:
URL: https://github.com/apache/beam/pull/26320#discussion_r1214104454


##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java:
##########
@@ -161,6 +152,25 @@ public class AvroUtils {
       new ForLoadedType(ReadableInstant.class);
   private static final ForLoadedType JODA_INSTANT = new ForLoadedType(Instant.class);
 
+  public static void addLogicalTypeConversions(final GenericData data) {

Review Comment:
   This looks to be the default usage in the avro libs.
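   For illustration, the per-call style looks roughly like this (a sketch; the real `AvroUtils` method may register more conversions):
   ```java
   public static void addLogicalTypeConversions(final GenericData data) {
     data.addLogicalTypeConversion(new AvroJodaTimeConversions.DateConversion());
     data.addLogicalTypeConversion(new AvroJodaTimeConversions.TimeConversion());
     data.addLogicalTypeConversion(new AvroJodaTimeConversions.TimestampConversion());
   }
   ```
   Since these conversion classes are stateless, singleton instances would work just as well; new instances per call simply mirror the upstream avro code.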





[GitHub] [beam] RustedBones commented on a diff in pull request #26320: Select dedicated avro datum reader and writer

Posted by "RustedBones (via GitHub)" <gi...@apache.org>.
RustedBones commented on code in PR #26320:
URL: https://github.com/apache/beam/pull/26320#discussion_r1214111539


##########
sdks/java/extensions/avro/build.gradle:
##########
@@ -63,67 +74,91 @@ dependencies {
   testImplementation library.java.junit
   testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
   testRuntimeOnly library.java.slf4j_jdk14
-  avroVersions.each {
-    "avroVersion$it.key" "org.apache.avro:avro:$it.value"
-    "avroVersion$it.key" "org.apache.avro:avro-tools:$it.value"
+  avroVersions.each { k,v ->
+    "avroVersion$k"(project(path: ":sdks:java:core", configuration: "shadowTest")) {
+      // Exclude Avro dependencies from "core" since Avro support moved to this extension
+      exclude group: "org.apache.avro", module: "avro"
+    }
+    "avroVersion$k" library.java.junit
+    "avroVersion$k" project(path: ":runners:direct-java", configuration: "shadow")
+    "avroVersion$k" library.java.slf4j_jdk14
+    "avroVersion$k" "org.tukaani:xz:1.9" // makes as provided since 1.9
+    "avroVersion$k" library.java.zstd_jni // makes as provided since 1.9
+    "avroVersion${k}Generate" "org.apache.avro:avro-tools:$v"
   }
 }
 
-avroVersions.each { kv ->
-  configurations."avroVersion$kv.key" {
-    resolutionStrategy {
-      force "org.apache.avro:avro:$kv.value"
+task printSourceSetInformation(){
+
+  doLast{
+    sourceSets.each { srcSet ->
+      println "["+srcSet.name+"]"
+      print "-->Source directories: "+srcSet.allJava.srcDirs+"\n"
+      print "-->Output directories: "+srcSet.output.classesDirs.files+"\n"
+      print "-->Compile classpath:\n"
+      srcSet.runtimeClasspath.files.each {
+        print "  "+it.path+"\n"
+      }
+      println ""
     }
   }
+}
+
+avroVersions.each { k, v ->
+  configurations."avroVersion$k" {
+    resolutionStrategy.force "org.apache.avro:avro:$v", "org.apache.avro:avro:$v:tests", library.java.joda_time
+  }
 
   sourceSets {
-    "avro${kv.key}" {
+    "avro$k" {
       java {
-        srcDirs "build/generated/sources/avro${kv.key}/test/java"
+        srcDirs "src/test/java", "build/generated/sources/avro$k/test/java"
       }
 
-      compileClasspath = configurations."avroVersion$kv.key" + sourceSets.test.output + sourceSets.test.compileClasspath
-      runtimeClasspath += compileClasspath + sourceSets.test.runtimeClasspath

Review Comment:
   I wasn't confident in this classpath. All avro test versions would contain the `1.8.2` generated model, and when testing with the `Class<T>` API there is no guarantee that the proper class gets loaded.
   Here I changed the settings so that the tests are recompiled for each avro version, against only the classes generated by that version.



##########
sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoderTest.java:
##########
@@ -323,35 +314,71 @@ public void testKryoSerialization() throws Exception {
   @Test
   public void testPojoEncoding() throws Exception {
     Pojo value = new Pojo("Hello", 42, DATETIME_A);
-    AvroCoder<Pojo> coder = AvroCoder.of(Pojo.class);
+    AvroCoder<Pojo> coder = AvroCoder.reflect(Pojo.class);
 
     CoderProperties.coderDecodeEncodeEqual(coder, value);
   }
 
   @Test
   public void testSpecificRecordEncoding() throws Exception {
-    if (isBrokenMapComparison()) {
-      // Don't compare the map values because of AVRO-2943
-      AVRO_SPECIFIC_RECORD.setMap(ImmutableMap.of());
-    }
-    AvroCoder<TestAvro> coder =
-        AvroCoder.of(TestAvro.class, AVRO_SPECIFIC_RECORD.getSchema(), false);
+    // Don't compare the map values because of AVRO-2943
+    AVRO_SPECIFIC_RECORD.setMap(ImmutableMap.of());
+
+    AvroCoder<TestAvro> coder = AvroCoder.specific(TestAvro.class);
+    AvroCoder<TestAvro> coderWithSchema =
+        AvroCoder.specific(TestAvro.class, AVRO_SPECIFIC_RECORD.getSchema());
 
     assertTrue(SpecificRecord.class.isAssignableFrom(coder.getType()));
+    assertTrue(SpecificRecord.class.isAssignableFrom(coderWithSchema.getType()));
+
     CoderProperties.coderDecodeEncodeEqual(coder, AVRO_SPECIFIC_RECORD);
+    CoderProperties.coderDecodeEncodeEqual(coderWithSchema, AVRO_SPECIFIC_RECORD);
   }
 
-  private boolean isBrokenMapComparison() {
-    return VERSION_AVRO.equals("1.9.2")
-        || VERSION_AVRO.equals("1.10.2")
-        || VERSION_AVRO.equals("1.11.1");

Review Comment:
   All avro generated classes use `CharSequence` now.



##########
sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java:
##########
@@ -78,13 +77,37 @@ public class AvroUtilsTest {
   private static final org.apache.avro.Schema NULL_SCHEMA =
       org.apache.avro.Schema.create(Type.NULL);
 
+  private static final String VERSION_AVRO =
+      org.apache.avro.Schema.class.getPackage().getImplementationVersion();
+
+  private Iterable<?> randomData(org.apache.avro.Schema schema, int maxLength) throws Exception {
+    Iterable<?> data;
+    if (VERSION_AVRO.equals("1.8.2")) {
+      data =
+          (Iterable<?>)
+              Class.forName("org.apache.avro.RandomData")
+                  .getDeclaredConstructor(org.apache.avro.Schema.class, Integer.TYPE)
+                  .newInstance(schema, maxLength);
+    } else {
+      data =
+          (Iterable<?>)
+              Class.forName("org.apache.avro.util.RandomData")
+                  .getDeclaredConstructor(org.apache.avro.Schema.class, Integer.TYPE, Boolean.TYPE)
+                  // force Utf8 in random data to match with String type used in AvroUtils
+                  .newInstance(schema, maxLength, true);

Review Comment:
   In avro 1.9+, `RandomData` has moved from the `tests` classifier artifact into the `util` package of the main library, and the default behavior has changed to `String` instead of `Utf8`.



##########
sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/io/AvroSourceTest.java:
##########
@@ -86,6 +89,17 @@ private enum SyncBehavior {
 
   private static final int DEFAULT_RECORD_COUNT = 1000;
 
+  private Iterable<String> avroSupportedCodec() {
+    List<String> codecs = new ArrayList<>();
+    codecs.add(DataFileConstants.NULL_CODEC);
+    codecs.add(DataFileConstants.BZIP2_CODEC);
+    codecs.add(DataFileConstants.DEFLATE_CODEC);
+    codecs.add(DataFileConstants.SNAPPY_CODEC);
+    codecs.add(DataFileConstants.XZ_CODEC);
+    if (!VERSION_AVRO.equals("1.8.2")) codecs.add("zstandard");

Review Comment:
   Added `zstandard` in tests for avro 1.9+



##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroSource.java:
##########
@@ -270,11 +265,19 @@ public AvroSource<T> withEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreat
   /** Reads files containing records that conform to the given schema. */
   public AvroSource<GenericRecord> withSchema(String schema) {
     checkArgument(schema != null, "schema can not be null");
+    if (getMode() == SINGLE_FILE_OR_SUBRANGE) {
+      return new AvroSource<>(
+          getSingleFileMetadata(),
+          getMinBundleSize(),
+          getStartOffset(),
+          getEndOffset(),
+          mode.withSchema(schema));
+    }

Review Comment:
   I think this case was missed. It was possible to have `SINGLE_FILE_OR_SUBRANGE` for typed avro but not for generic.



##########
sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoderTest.java:
##########
@@ -323,35 +314,71 @@ public void testKryoSerialization() throws Exception {
   @Test
   public void testPojoEncoding() throws Exception {
     Pojo value = new Pojo("Hello", 42, DATETIME_A);
-    AvroCoder<Pojo> coder = AvroCoder.of(Pojo.class);
+    AvroCoder<Pojo> coder = AvroCoder.reflect(Pojo.class);
 
     CoderProperties.coderDecodeEncodeEqual(coder, value);
   }
 
   @Test
   public void testSpecificRecordEncoding() throws Exception {
-    if (isBrokenMapComparison()) {
-      // Don't compare the map values because of AVRO-2943
-      AVRO_SPECIFIC_RECORD.setMap(ImmutableMap.of());
-    }
-    AvroCoder<TestAvro> coder =
-        AvroCoder.of(TestAvro.class, AVRO_SPECIFIC_RECORD.getSchema(), false);
+    // Don't compare the map values because of AVRO-2943
+    AVRO_SPECIFIC_RECORD.setMap(ImmutableMap.of());
+
+    AvroCoder<TestAvro> coder = AvroCoder.specific(TestAvro.class);
+    AvroCoder<TestAvro> coderWithSchema =
+        AvroCoder.specific(TestAvro.class, AVRO_SPECIFIC_RECORD.getSchema());
 
     assertTrue(SpecificRecord.class.isAssignableFrom(coder.getType()));
+    assertTrue(SpecificRecord.class.isAssignableFrom(coderWithSchema.getType()));
+
     CoderProperties.coderDecodeEncodeEqual(coder, AVRO_SPECIFIC_RECORD);
+    CoderProperties.coderDecodeEncodeEqual(coderWithSchema, AVRO_SPECIFIC_RECORD);
   }
 
-  private boolean isBrokenMapComparison() {
-    return VERSION_AVRO.equals("1.9.2")
-        || VERSION_AVRO.equals("1.10.2")
-        || VERSION_AVRO.equals("1.11.1");
+  // example to overcome AVRO-2943 limitation with custom datum factory
+  // force usage of String instead of Utf8 when avro type is set to CharSequence
+  static class CustomSpecificDatumFactory<T> extends AvroDatumFactory.SpecificDatumFactory<T> {

Review Comment:
   @mosche here is an example of a custom `AvroDatumFactory`.
   In this case, we change the default model creation by using `String` as the underlying `CharSequence` implementation instead of `Utf8`.





[GitHub] [beam] RustedBones commented on a diff in pull request #26320: Select dedicated avro datum reader and writer

Posted by "RustedBones (via GitHub)" <gi...@apache.org>.
RustedBones commented on code in PR #26320:
URL: https://github.com/apache/beam/pull/26320#discussion_r1211884816


##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroJodaTimeConversions.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.avro.schemas.utils;
+
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
+import org.apache.avro.Schema;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Days;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+
+/**
+ * Avro 1.8 & 1.9 ship joda time conversions.
+ *
+ * <p>Since avro 1.10, only java time conversions are included. As beam is still joda time based,
+ * and users may work with avro joda time generated classes, provide joda time logical conversions.
+ *
+ * <p>This code is copied from avro 1.8.2 TimeConversions.
+ */
+public class AvroJodaTimeConversions {
+
+  public static class DateConversion extends Conversion<LocalDate> {
+    private static final LocalDate EPOCH_DATE = new LocalDate(1970, 1, 1);
+
+    @Override
+    public Class<LocalDate> getConvertedType() {
+      return LocalDate.class;
+    }
+
+    @Override
+    public String getLogicalTypeName() {
+      return "date";
+    }
+
+    @Override
+    public LocalDate fromInt(Integer daysFromEpoch, Schema schema, LogicalType type) {
+      return EPOCH_DATE.plusDays(daysFromEpoch);
+    }
+
+    @Override
+    public Integer toInt(LocalDate date, Schema schema, LogicalType type) {
+      return Days.daysBetween(EPOCH_DATE, date).getDays();
+    }
+  }
+
+  public static class TimeConversion extends Conversion<LocalTime> {
+    @Override
+    public Class<LocalTime> getConvertedType() {
+      return LocalTime.class;
+    }
+
+    @Override
+    public String getLogicalTypeName() {
+      return "time-millis";
+    }
+
+    @Override
+    public LocalTime fromInt(Integer millisFromMidnight, Schema schema, LogicalType type) {
+      return LocalTime.fromMillisOfDay(millisFromMidnight);
+    }
+
+    @Override
+    public Integer toInt(LocalTime time, Schema schema, LogicalType type) {
+      return time.millisOfDay().get();
+    }
+  }
+
+  public static class TimeMicrosConversion extends Conversion<LocalTime> {
+    @Override
+    public Class<LocalTime> getConvertedType() {
+      return LocalTime.class;
+    }
+
+    @Override
+    public String getLogicalTypeName() {
+      return "time-micros";
+    }
+
+    @Override
+    public LocalTime fromLong(Long microsFromMidnight, Schema schema, LogicalType type) {
+      return LocalTime.fromMillisOfDay(microsFromMidnight / 1000);
+    }
+  }
+
+  public static class LossyTimeMicrosConversion extends TimeMicrosConversion {
+    @Override
+    public Long toLong(LocalTime time, Schema schema, LogicalType type) {
+      return 1000 * (long) time.millisOfDay().get();
+    }
+  }
+
+  public static class TimestampConversion extends Conversion<DateTime> {
+    @Override
+    public Class<DateTime> getConvertedType() {
+      return DateTime.class;
+    }
+
+    @Override
+    public String getLogicalTypeName() {
+      return "timestamp-millis";
+    }
+
+    @Override
+    public DateTime fromLong(Long millisFromEpoch, Schema schema, LogicalType type) {
+      return new DateTime(millisFromEpoch, DateTimeZone.UTC);
+    }
+
+    @Override
+    public Long toLong(DateTime timestamp, Schema schema, LogicalType type) {
+      return timestamp.getMillis();
+    }
+  }
+
+  public static class TimestampMicrosConversion extends Conversion<DateTime> {

Review Comment:
   It looks like joda [DateTime](https://www.joda.org/joda-time/apidocs/org/joda/time/DateTime.html) is only precise to the millisecond:
   > it represents an exact point on the time-line, but limited to the precision of milliseconds.
   
   The `toLong` is encapsulated in `LossyTimestampMicrosConversion`, which `AvroUtils.addLogicalTypeConversions` does not add to the `ReflectData` by default. Writing a logical `timestamp-micros` field from an `org.joda.time.DateTime` will therefore fail.
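   For completeness, the pair looks roughly like this (a sketch following the same pattern as the copied 1.8.2 conversions above):
   ```java
   public static class TimestampMicrosConversion extends Conversion<DateTime> {
     @Override
     public Class<DateTime> getConvertedType() {
       return DateTime.class;
     }

     @Override
     public String getLogicalTypeName() {
       return "timestamp-micros";
     }

     @Override
     public DateTime fromLong(Long microsFromEpoch, Schema schema, LogicalType type) {
       return new DateTime(microsFromEpoch / 1000, DateTimeZone.UTC);
     }
   }

   // opt-in only: truncates to millisecond precision on write
   public static class LossyTimestampMicrosConversion extends TimestampMicrosConversion {
     @Override
     public Long toLong(DateTime timestamp, Schema schema, LogicalType type) {
       return 1000 * timestamp.getMillis();
     }
   }
   ```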





[GitHub] [beam] mosche commented on a diff in pull request #26320: Select dedicated avro datum reader and writer

Posted by "mosche (via GitHub)" <gi...@apache.org>.
mosche commented on code in PR #26320:
URL: https://github.com/apache/beam/pull/26320#discussion_r1219741664


##########
sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoderTest.java:
##########
@@ -323,35 +314,71 @@ public void testKryoSerialization() throws Exception {
   @Test
   public void testPojoEncoding() throws Exception {
     Pojo value = new Pojo("Hello", 42, DATETIME_A);
-    AvroCoder<Pojo> coder = AvroCoder.of(Pojo.class);
+    AvroCoder<Pojo> coder = AvroCoder.reflect(Pojo.class);
 
     CoderProperties.coderDecodeEncodeEqual(coder, value);
   }
 
   @Test
   public void testSpecificRecordEncoding() throws Exception {
-    if (isBrokenMapComparison()) {
-      // Don't compare the map values because of AVRO-2943
-      AVRO_SPECIFIC_RECORD.setMap(ImmutableMap.of());
-    }
-    AvroCoder<TestAvro> coder =
-        AvroCoder.of(TestAvro.class, AVRO_SPECIFIC_RECORD.getSchema(), false);
+    // Don't compare the map values because of AVRO-2943
+    AVRO_SPECIFIC_RECORD.setMap(ImmutableMap.of());
+
+    AvroCoder<TestAvro> coder = AvroCoder.specific(TestAvro.class);
+    AvroCoder<TestAvro> coderWithSchema =
+        AvroCoder.specific(TestAvro.class, AVRO_SPECIFIC_RECORD.getSchema());
 
     assertTrue(SpecificRecord.class.isAssignableFrom(coder.getType()));
+    assertTrue(SpecificRecord.class.isAssignableFrom(coderWithSchema.getType()));
+
     CoderProperties.coderDecodeEncodeEqual(coder, AVRO_SPECIFIC_RECORD);
+    CoderProperties.coderDecodeEncodeEqual(coderWithSchema, AVRO_SPECIFIC_RECORD);
   }
 
-  private boolean isBrokenMapComparison() {
-    return VERSION_AVRO.equals("1.9.2")
-        || VERSION_AVRO.equals("1.10.2")
-        || VERSION_AVRO.equals("1.11.1");
+  // example to overcome AVRO-2943 limitation with custom datum factory
+  // force usage of String instead of Utf8 when avro type is set to CharSequence
+  static class CustomSpecificDatumFactory<T> extends AvroDatumFactory.SpecificDatumFactory<T> {

Review Comment:
   Thanks 💯 





[GitHub] [beam] RustedBones commented on pull request #26320: Select dedicated avro datum reader and writer

Posted by "RustedBones (via GitHub)" <gi...@apache.org>.
RustedBones commented on PR #26320:
URL: https://github.com/apache/beam/pull/26320#issuecomment-1531429003

   @aromanenko-dev since you followed the previous PRs.




[GitHub] [beam] aromanenko-dev commented on a diff in pull request #26320: Select dedicated avro datum reader and writer

Posted by "aromanenko-dev (via GitHub)" <gi...@apache.org>.
aromanenko-dev commented on code in PR #26320:
URL: https://github.com/apache/beam/pull/26320#discussion_r1202495223


##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoder.java:
##########
@@ -116,56 +106,40 @@
 @SuppressWarnings({
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
-public class AvroCoder<T> extends CustomCoder<T> {
+public abstract class AvroCoder<T> extends CustomCoder<T> {
 
   /**
    * Returns an {@code AvroCoder} instance for the provided element type.
    *
    * @param <T> the element type
    */
   public static <T> AvroCoder<T> of(TypeDescriptor<T> type) {
-    return of(type, true);
-  }
-
-  /**
-   * Returns an {@code AvroCoder} instance for the provided element type, respecting whether to use
-   * Avro's Reflect* or Specific* suite for encoding and decoding.
-   *
-   * @param <T> the element type
-   */
-  public static <T> AvroCoder<T> of(TypeDescriptor<T> type, boolean useReflectApi) {

Review Comment:
   Here and in several other places below, I'd recommend avoiding such breaking changes in the public API (deleting public methods/classes); keep and deprecate them instead. Any objections to this?





[GitHub] [beam] RustedBones commented on a diff in pull request #26320: Select dedicated avro datum reader and writer

Posted by "RustedBones (via GitHub)" <gi...@apache.org>.
RustedBones commented on code in PR #26320:
URL: https://github.com/apache/beam/pull/26320#discussion_r1207008565


##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoder.java:
##########
@@ -116,56 +106,40 @@
 @SuppressWarnings({
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
-public class AvroCoder<T> extends CustomCoder<T> {
+public abstract class AvroCoder<T> extends CustomCoder<T> {

Review Comment:
   Constructors are protected and we did not create any instance of `AvroCoder`.
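
   In other words, typical user code is unaffected because instances are only obtained through the static factories, e.g. (with a hypothetical `MyAvroRecord` class):

   ```
   // Still compiles with AvroCoder abstract: no public constructor is ever called.
   AvroCoder<MyAvroRecord> coder = AvroCoder.of(MyAvroRecord.class);
   ```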





[GitHub] [beam] aromanenko-dev merged pull request #26320: Select dedicated avro datum reader and writer

Posted by "aromanenko-dev (via GitHub)" <gi...@apache.org>.
aromanenko-dev merged PR #26320:
URL: https://github.com/apache/beam/pull/26320




[GitHub] [beam] RustedBones commented on pull request #26320: Select dedicated avro datum reader and writer

Posted by "RustedBones (via GitHub)" <gi...@apache.org>.
RustedBones commented on PR #26320:
URL: https://github.com/apache/beam/pull/26320#issuecomment-1598263156

   I know the issue:
   The `Conversions.DecimalConversion` must be removed from the `ReflectData` used in the `ReflectDatumFactory`.
   This conversion requires `scale` and `precision` to be set in the schema, which cannot be inferred by reflection.
   [ReflectData](https://avro.apache.org/docs/1.11.1/api/java/) supports `BigDecimal` by default, converting it to a `string` with the `java-class` property:
   ```
   {"type": "string", "java-class": "java.math.BigDecimal"}
   ```
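
   A quick way to observe that mapping (the `Amount` holder class is a made-up example; default `ReflectData`, i.e. without `Conversions.DecimalConversion` registered):

   ```
   import java.math.BigDecimal;
   import org.apache.avro.Schema;
   import org.apache.avro.reflect.ReflectData;

   public class ReflectBigDecimalSchema {
     static class Amount {
       BigDecimal value;
     }

     public static void main(String[] args) {
       Schema schema = ReflectData.get().getSchema(Amount.class);
       // Prints a string schema carrying the java-class property rather than a
       // bytes/decimal logical type, so no scale or precision is required.
       System.out.println(schema.getField("value").schema());
     }
   }
   ```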




[GitHub] [beam] aromanenko-dev commented on pull request #26320: Select dedicated avro datum reader and writer

Posted by "aromanenko-dev (via GitHub)" <gi...@apache.org>.
aromanenko-dev commented on PR #26320:
URL: https://github.com/apache/beam/pull/26320#issuecomment-1587354629

   Run Java_GCP_IO_Direct PreCommit




[GitHub] [beam] mosche commented on pull request #26320: Select dedicated avro datum reader and writer

Posted by "mosche (via GitHub)" <gi...@apache.org>.
mosche commented on PR #26320:
URL: https://github.com/apache/beam/pull/26320#issuecomment-1569746656

   Thanks a lot for addressing our concerns @RustedBones :) I'm planning to do another review later this afternoon.




[GitHub] [beam] mosche commented on a diff in pull request #26320: Select dedicated avro datum reader and writer

Posted by "mosche (via GitHub)" <gi...@apache.org>.
mosche commented on code in PR #26320:
URL: https://github.com/apache/beam/pull/26320#discussion_r1219748838


##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroIO.java:
##########
@@ -702,6 +739,16 @@ public Read<T> withBeamSchemas(boolean withBeamSchemas) {
       return toBuilder().setInferBeamSchema(withBeamSchemas).build();
     }
 
+    /** Sets a coder for the result of the read function. */
+    public Read<T> withCoder(Coder<T> coder) {

Review Comment:
   👍 
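
   A hypothetical usage fragment for the new option (the record class, file pattern, and coder choice are placeholders, and the surrounding pipeline setup is omitted):

   ```
   PCollection<MyRecord> records =
       pipeline.apply(
           AvroIO.read(MyRecord.class)
               .from("gs://my-bucket/records/*.avro")
               // Explicitly choose the output coder instead of relying on the inferred one.
               .withCoder(AvroCoder.of(MyRecord.class)));
   ```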


