Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/02/01 07:30:29 UTC

[GitHub] [spark] Zhen-hao opened a new pull request #35379: fix bugs in AvroSerializer

Zhen-hao opened a new pull request #35379:
URL: https://github.com/apache/spark/pull/35379


   **What's the problem to fix?**
   
   `AvroSerializer`'s implementation, at least in `newConverter`, does not rely solely on the `InternalRow` and `SpecializedGetters` interfaces. It assumes many implementation details of the classes behind those interfaces.
   
   For example, in 
   
   ```scala
         case (TimestampType, LONG) => avroType.getLogicalType match {
             // For backward compatibility, if the Avro type is Long and it is not logical type
             // (the `null` case), output the timestamp value as with millisecond precision.
             case null | _: TimestampMillis => (getter, ordinal) =>
               DateTimeUtils.microsToMillis(timestampRebaseFunc(getter.getLong(ordinal)))
             case _: TimestampMicros => (getter, ordinal) =>
               timestampRebaseFunc(getter.getLong(ordinal))
             case other => throw new IncompatibleSchemaException(errorPrefix +
               s"SQL type ${TimestampType.sql} cannot be converted to Avro logical type $other")
           }
   ```
   
   it assumes the `InternalRow` instance encodes `TimestampType` as `java.lang.Long`. That's true for `UnsafeRow` but not for `GenericInternalRow`.
   
   Hence the above code throws runtime exceptions when used on an instance of `GenericInternalRow`, which is the case most of the time for Python.
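   
   The failure mode can be illustrated without Spark. The sketch below uses a hypothetical `ArrayRow` class as a stand-in for a row backed by an `Array[Any]`, on the assumption that a typed getter on such a row is essentially a cast. It is not Spark's actual implementation.
   
   ```scala
   import java.sql.Timestamp
   import scala.util.Try

   // Hypothetical stand-in for a row backed by Array[Any]; not Spark's real class.
   final class ArrayRow(values: Array[Any]) {
     // A typed getter that simply casts whatever object is stored in the slot.
     def getLong(ordinal: Int): Long = values(ordinal).asInstanceOf[Long]
     // An untyped getter that returns the stored object as-is.
     def get(ordinal: Int): Any = values(ordinal)
   }

   object GetterDemo {
     val micros = 1643121231000000L
     // Row holding the timestamp already encoded as microseconds.
     val encoded = new ArrayRow(Array[Any](micros))
     // Row holding the same moment as an external java.sql.Timestamp object.
     val external = new ArrayRow(Array[Any](new Timestamp(micros / 1000)))

     val okRead = Try(encoded.getLong(0))    // succeeds
     val badRead = Try(external.getLong(0))  // fails with a ClassCastException
   }
   ```
   
   Matching on the result of the untyped `get` instead of calling `getLong` turns that cast failure into an explicit, handled case.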
   
   This PR may not be complete as I don't have much free time to work on it.
   But it should be a good improvement for now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] gengliangwang edited a comment on pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
gengliangwang edited a comment on pull request #35379:
URL: https://github.com/apache/spark/pull/35379#issuecomment-1043721088


   > I totally agree that this may not be the right direction.
   > But any fix should pass my added unit tests or produce compiling errors instead of runtime errors.
   > This is just my "easy" fix for some issue that we had with a client. We'd like to contribute back and/or make others aware of the issue.
   > If I were to have unlimited free time, I would remove all the OO designs and rewrite the whole Spark with typeclasses as it has been painful to work with type casts and interfaces containing Any, AnyRef, or Object.
   > I'll address your inline comments later.
   
   Actually, could you provide an end-to-end Python UDF example?  It can be in the PR description instead of code changes.




[GitHub] [spark] HyukjinKwon commented on pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on pull request #35379:
URL: https://github.com/apache/spark/pull/35379#issuecomment-1033262392


   `./dev/scalafmt` is sort of a tool for easier dev ... but I don't think it follows our style 100%. For the time being, we will have to manually make some changes to follow https://github.com/databricks/scala-style-guide.




[GitHub] [spark] Zhen-hao commented on pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
Zhen-hao commented on pull request #35379:
URL: https://github.com/apache/spark/pull/35379#issuecomment-1033689749


   Interesting.
   Sounds like something that @lihaoyi can easily fix.
   
   Meanwhile, I will do my best to revert the formatting changes. 




[GitHub] [spark] Zhen-hao commented on a change in pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
Zhen-hao commented on a change in pull request #35379:
URL: https://github.com/apache/spark/pull/35379#discussion_r798911092



##########
File path: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
##########
@@ -166,35 +204,91 @@ private[sql] class AvroSerializer(
         (getter, ordinal) => ByteBuffer.wrap(getter.getBinary(ordinal))
 
       case (DateType, INT) =>
-        (getter, ordinal) => dateRebaseFunc(getter.getInt(ordinal))
+        (getter, ordinal) =>
+          getter.get(ordinal, DateType) match {
+            case epochDays: java.lang.Integer => dateRebaseFunc(epochDays)
+            case date: java.sql.Date => dateRebaseFunc(date.toLocalDate().toEpochDay().toInt)
+            case localDate: java.time.LocalDate => dateRebaseFunc(localDate.toEpochDay().toInt)
+            case other =>
+              throw new IncompatibleSchemaException(s"""
+                  |Expected java.lang.Integer, java.sql.Date, or java.time.LocalDate,
+                  | but found ${other.getClass}""".stripMargin)
+          }
 
-      case (TimestampType, LONG) => avroType.getLogicalType match {
+      case (TimestampType, LONG) =>
+        avroType.getLogicalType match {
           // For backward compatibility, if the Avro type is Long and it is not logical type
           // (the `null` case), output the timestamp value as with millisecond precision.
-          case null | _: TimestampMillis => (getter, ordinal) =>
-            DateTimeUtils.microsToMillis(timestampRebaseFunc(getter.getLong(ordinal)))
-          case _: TimestampMicros => (getter, ordinal) =>
-            timestampRebaseFunc(getter.getLong(ordinal))
-          case other => throw new IncompatibleSchemaException(errorPrefix +
-            s"SQL type ${TimestampType.sql} cannot be converted to Avro logical type $other")
+          case null | _: TimestampMillis =>
+            (getter, ordinal) =>
+              getter.get(ordinal, TimestampType) match {
+                case micros: java.lang.Long =>
+                  DateTimeUtils.microsToMillis(timestampRebaseFunc(micros))
+                case javaTimestamp: java.sql.Timestamp => javaTimestamp.getTime
+                case instant: java.time.Instant => instant.toEpochMilli
+                case other =>

Review comment:
   I realized that this is not as easy as the other case.
   We can refactor out an `extractMillis` or `extractMicros` method, but it will introduce some computational overhead: there would always be a case of unnecessary back-and-forth conversion.
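   
   To make the trade-off concrete, here is a minimal sketch of what a shared `extractMicros` helper could look like. The helper names come from the comment above. The bodies, the object name `TimestampExtract`, and the use of `Math.floorDiv` in place of `DateTimeUtils.microsToMillis` are assumptions, and the rebase step is left out.
   
   ```scala
   import java.sql.Timestamp
   import java.time.Instant
   import java.time.temporal.ChronoUnit

   object TimestampExtract {
     // Normalise every supported representation to microseconds since the epoch.
     def extractMicros(value: Any): Long = value match {
       case micros: java.lang.Long => micros.longValue
       case ts: Timestamp =>
         // getTime is in milliseconds and already includes the millisecond part of
         // getNanos, so only the sub-millisecond remainder is added.
         Math.addExact(Math.multiplyExact(ts.getTime, 1000L), (ts.getNanos / 1000L) % 1000L)
       case instant: Instant => ChronoUnit.MICROS.between(Instant.EPOCH, instant)
       case other =>
         throw new IllegalArgumentException(s"Unsupported timestamp value: ${other.getClass}")
     }

     // Millisecond output built on top of extractMicros: a java.sql.Timestamp is
     // scaled up to microseconds and then immediately scaled back down.
     def extractMillis(value: Any): Long = Math.floorDiv(extractMicros(value), 1000L)
   }
   ```
   
   In this sketch the `java.sql.Timestamp` and `java.time.Instant` inputs take the round trip through microseconds on the millisecond path, which is the extra conversion mentioned above.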






[GitHub] [spark] gengliangwang commented on pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on pull request #35379:
URL: https://github.com/apache/spark/pull/35379#issuecomment-1033255310


   > do you know if we encourage/discourage formatting-only PRs to make code format match the expectation from ./dev/scalafmt
   
   @xkrogen @Zhen-hao In general we don't do that. 
   Instead, people follow https://github.com/databricks/scala-style-guide and create clean patches.
   
   




[GitHub] [spark] gengliangwang commented on a change in pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on a change in pull request #35379:
URL: https://github.com/apache/spark/pull/35379#discussion_r809611407



##########
File path: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSerdeSuite.scala
##########
@@ -49,6 +56,194 @@ class AvroSerdeSuite extends SparkFunSuite {
     }
   }
 
+  test("Serialize DecimalType to Avro FIXED with logical type decimal") {
+    withFieldMatchType { fieldMatch =>
+      val structType = StructType(
+        Seq(
+          StructField("javaBigDecimal", DecimalType(6, 2), nullable = false),
+          StructField("sparkDecimal", DecimalType(6, 2), nullable = false)))
+
+      val fixedSchema = Schema.createFixed("fixed_name", "doc", "namespace", 32)
+      val logicalType = LogicalTypes.decimal(6, 2)
+      val fieldSchema = logicalType.addToSchema(fixedSchema)
+
+      val avroSchema = Schema.createRecord(
+        "name",
+        "doc",
+        "space",
+        true,
+        Seq(
+          new Schema.Field("javaBigDecimal", fieldSchema, "", null.asInstanceOf[AnyVal]),
+          new Schema.Field("sparkDecimal", fieldSchema, "", null.asInstanceOf[AnyVal])).asJava)
+
+      val serializer = Serializer.create(structType, avroSchema, fieldMatch)
+
+      val input = InternalRow(new java.math.BigDecimal("1000.12"), Decimal("1000.12"))
+
+      val grec = serializer.serialize(input).asInstanceOf[GenericRecord]
+      val javaDecimal = grec.get("javaBigDecimal").asInstanceOf[GenericFixed]
+      val sparkDecimal = grec.get("sparkDecimal").asInstanceOf[GenericFixed]
+
+      assert(javaDecimal === sparkDecimal)
+      assert(
+        new DecimalConversion().fromFixed(sparkDecimal, fixedSchema, logicalType) ===
+          new java.math.BigDecimal("1000.12"))
+    }
+  }
+
+  test("Serialize DecimalType to Avro BYTES with logical type decimal") {
+    withFieldMatchType { fieldMatch =>
+      val structType = StructType(
+        Seq(
+          StructField("javaBigDecimal", DecimalType(6, 2), nullable = false),
+          StructField("sparkDecimal", DecimalType(6, 2), nullable = false)))
+
+      val bytesSchema = Schema.create(BYTES)
+      val logicalType = LogicalTypes.decimal(6, 2)
+      val fieldSchema = logicalType.addToSchema(bytesSchema)
+
+      val avroSchema = Schema.createRecord(
+        "name",
+        "doc",
+        "space",
+        true,
+        Seq(
+          new Schema.Field("javaBigDecimal", fieldSchema, "", null.asInstanceOf[AnyVal]),
+          new Schema.Field("sparkDecimal", fieldSchema, "", null.asInstanceOf[AnyVal])).asJava)
+
+      val serializer = Serializer.create(structType, avroSchema, fieldMatch)
+
+      val input = InternalRow(new java.math.BigDecimal("1000.12"), Decimal("1000.12"))
+
+      val grec = serializer.serialize(input).asInstanceOf[GenericRecord]
+      val javaDecimal = grec.get("javaBigDecimal").asInstanceOf[ByteBuffer]
+      val sparkDecimal = grec.get("sparkDecimal").asInstanceOf[ByteBuffer]
+
+      assert(javaDecimal === sparkDecimal)
+      assert(
+        new DecimalConversion().fromBytes(sparkDecimal, bytesSchema, logicalType) ===
+          new java.math.BigDecimal("1000.12"))
+    }
+  }
+
+  test("Serialize DateType to Avro INT") {
+    withFieldMatchType { fieldMatch =>
+      val structType = StructType(
+        Seq(
+          StructField("javaSqlDate", DateType, nullable = false),
+          StructField("java8TimeDate", DateType, nullable = false)))
+
+      val dateSchema = Schema.create(INT)
+
+      val avroSchema = Schema.createRecord(
+        "name",
+        "doc",
+        "space",
+        true,
+        Seq(
+          new Schema.Field("javaSqlDate", dateSchema, "", null.asInstanceOf[AnyVal]),
+          new Schema.Field("java8TimeDate", dateSchema, "", null.asInstanceOf[AnyVal])).asJava)
+
+      val serializer = Serializer.create(structType, avroSchema, fieldMatch)
+
+      val input = InternalRow(
+        new java.sql.Date(1643121231000L),
+        Instant.ofEpochMilli(1643121231000L).atZone(ZoneId.of("UTC")).toLocalDate())
+
+      val grec = serializer.serialize(input).asInstanceOf[GenericRecord]
+      val javaSqlDate = grec.get("javaSqlDate").asInstanceOf[Int]
+      val java8TimeDate = grec.get("java8TimeDate").asInstanceOf[Int]
+
+      assert(javaSqlDate === java8TimeDate)
+      assert(javaSqlDate === 19017) // 19017 is 25 January 2022
+    }
+  }
+
+  test(s"""
+        |Serialize TimestampType to Avro LONG with logical type
+        | timestamp-micros and timestamp-millis
+        """.stripMargin) {
+    withFieldMatchType { fieldMatch =>
+      val structType = StructType(
+        Seq(
+          StructField("javaSqlTimeMicro", TimestampType, nullable = false),
+          StructField("java8TimeInstantMicro", TimestampType, nullable = false),
+          StructField("javaSqlTimeMillis", TimestampType, nullable = false),
+          StructField("java8TimeInstantMillis", TimestampType, nullable = false)))
+
+      val microSchema = LogicalTypes.timestampMicros().addToSchema(Schema.create(LONG))
+
+      val millisSchema = LogicalTypes.timestampMillis().addToSchema(Schema.create(LONG))
+
+      val avroSchema = Schema.createRecord("name", "doc", "space", true,
+          Seq(
+            new Schema.Field("javaSqlTimeMicro", microSchema, "", null.asInstanceOf[AnyVal]),
+            new Schema.Field("java8TimeInstantMicro", microSchema, "", null.asInstanceOf[AnyVal]),
+            new Schema.Field("javaSqlTimeMillis", millisSchema, "", null.asInstanceOf[AnyVal]),
+            new Schema.Field(
+              "java8TimeInstantMillis",
+              millisSchema,
+              "",
+              null.asInstanceOf[AnyVal])).asJava)
+
+      val serializer = Serializer.create(structType, avroSchema, fieldMatch)
+
+      val epoch = 1643121231000L
+      val epochMicro = 1000 * 1643121231000L
+
+      val input = InternalRow(
+        new java.sql.Timestamp(epoch),
+        Instant.ofEpochMilli(epoch),
+        new java.sql.Timestamp(epoch),
+        Instant.ofEpochMilli(epoch))
+
+      val grec = serializer.serialize(input).asInstanceOf[GenericRecord]
+
+      assert(grec.get("javaSqlTimeMicro").asInstanceOf[Long] === epochMicro)
+      assert(grec.get("java8TimeInstantMicro").asInstanceOf[Long] === epochMicro)
+      assert(grec.get("javaSqlTimeMillis").asInstanceOf[Long] === epoch)
+      assert(grec.get("java8TimeInstantMillis").asInstanceOf[Long] === epoch)
+    }
+  }
+
+  test(s"""
+        |Serialize TimestampNTZType to Avro LONG with logical type
+        | timestamp-micros and timestamp-millis
+        """.stripMargin) {
+    withFieldMatchType { fieldMatch =>
+      val structType = StructType(
+        Seq(
+          StructField("java8LocalDateTimeMicro", TimestampNTZType, nullable = false),
+          StructField("java8LocalDateTimeMillis", TimestampNTZType, nullable = false)))
+
+      val microSchema = LogicalTypes.timestampMicros().addToSchema(Schema.create(LONG))
+
+      val millisSchema = LogicalTypes.timestampMillis().addToSchema(Schema.create(LONG))

Review comment:
       ```suggestion
         val microSchema = LogicalTypes.localTimestampMicros().addToSchema(Schema.create(LONG))
   
         val millisSchema = LogicalTypes.localTimestampMillis().addToSchema(Schema.create(LONG))
   ```
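   
   The distinction behind this suggestion is that Avro's `timestamp-*` logical types describe an instant, while the `local-timestamp-*` types describe a wall-clock value with no time zone, which is what `TimestampNTZType` holds. Below is a minimal sketch of the wall-clock encoding using only `java.time`. The `localDateTimeToMicros` helper is written for illustration and reflects an assumption about the intended semantics, not code from the PR.
   
   ```scala
   import java.time.{LocalDateTime, ZoneOffset}
   import java.time.temporal.ChronoUnit

   object LocalTimestampDemo {
     private val epoch = LocalDateTime.of(1970, 1, 1, 0, 0, 0)

     // Microseconds between 1970-01-01T00:00 and the given wall-clock value.
     // No time zone takes part in the calculation.
     def localDateTimeToMicros(ldt: LocalDateTime): Long =
       ChronoUnit.MICROS.between(epoch, ldt)

     val wallClock = LocalDateTime.of(2022, 1, 25, 14, 33, 51)
     val localMicros = localDateTimeToMicros(wallClock)

     // The same wall-clock value read as an instant at UTC+01:00 is one hour earlier
     // on the epoch timeline, so instant semantics depend on the zone.
     val instantMillisAtPlusOne = wallClock.toInstant(ZoneOffset.ofHours(1)).toEpochMilli
   }
   ```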






[GitHub] [spark] Zhen-hao commented on pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
Zhen-hao commented on pull request #35379:
URL: https://github.com/apache/spark/pull/35379#issuecomment-1028670635


   > @Zhen-hao , this PR contains many whitespace/formatting changes, making it very hard to tell where your actual changes are. Can you please revert such changes to make it more clear what logic you are changing?
   
   Sorry for that. I ran `./dev/scalafmt` hoping to fix the linting errors. (I was surprised that it didn't fix all linting errors.) 
   Is it possible that we make formatting an automatic step in compiling and make sure the linting rules are consistent with the formatting rules?
   
   Anyways, that should be a separate PR. 
   




[GitHub] [spark] zero323 commented on pull request #35379: fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
zero323 commented on pull request #35379:
URL: https://github.com/apache/spark/pull/35379#issuecomment-1026751151


   Hi @Zhen-hao. Thanks for your proposal.
   
   Could you please open a JIRA ticket on our [issue tracker](https://issues.apache.org/jira/projects/SPARK/issues) (a reproducible example would be welcome) and edit the title of this PR accordingly (i.e. `[SPARK-xxxxx][SQL] Here goes description`)?
   
   Also, please enable GitHub actions on your fork (you can follow the instructions on the [failed run](https://github.com/apache/spark/pull/35379/checks?check_run_id=5017677272)).
   
   Thanks in advance.




[GitHub] [spark] Zhen-hao commented on pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
Zhen-hao commented on pull request #35379:
URL: https://github.com/apache/spark/pull/35379#issuecomment-1044223469


   > > I totally agree that this may not be the right direction.
   > > But any fix should pass my added unit tests or produce compiling errors instead of runtime errors.
   > > This is just my "easy" fix for some issue that we had with a client. We'd like to contribute back and/or make others aware of the issue.
   > > If I were to have unlimited free time, I would remove all the OO designs and rewrite the whole Spark with typeclasses as it has been painful to work with type casts and interfaces containing Any, AnyRef, or Object.
   > > I'll address your inline comments later.
   > 
   > Actually, could you provide an end-to-end Python UDF example? It can be in the PR description instead of code changes.
   
   Good question. We didn't find the issue with the public Spark API. 
   We had a copy of the `org.apache.spark.sql.avro` package in our code base because it is private to Spark and we wanted to build a layer on top of it to make a serialization/deserialization library. We didn't see the problem until we offered our Python users an Avro serialization UDF. We first solved the issue by having Python users call the Scala API via `sc._jvm` instead of the UDF. 
   
   I don't know how to reproduce the issue with the public API...
   
   Again, the nature of the change in this PR is more about making the Scala code hermetic (or mitigating some leaky abstractions).
   This PR doesn't need to be merged if there are better ways to improve.




[GitHub] [spark] gengliangwang edited a comment on pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
gengliangwang edited a comment on pull request #35379:
URL: https://github.com/apache/spark/pull/35379#issuecomment-1042730534


   ~~LGTM except for two minor comments. Thanks for the fix @Zhen-hao!~~
   
   After second thought, I doubt whether there is a good reason to merge this one.




[GitHub] [spark] Zhen-hao commented on pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
Zhen-hao commented on pull request #35379:
URL: https://github.com/apache/spark/pull/35379#issuecomment-1032975616


   Hi @[gengliangwang](https://github.com/gengliangwang),
   Thank you for reviewing this PR. 
   
   Those unnecessary code changes you mentioned were the results of running `./dev/scalafmt`.
   
   That means the existing code on `master` was merged without running that command. 
   
   One way to fix it is to first merge a PR for formatting only, as hinted by @[xkrogen](https://github.com/xkrogen)




[GitHub] [spark] Zhen-hao commented on pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
Zhen-hao commented on pull request #35379:
URL: https://github.com/apache/spark/pull/35379#issuecomment-1035457794


   Just rebased on master and reverted the formatting changes. 
   It should be much easier to review now.




[GitHub] [spark] gengliangwang edited a comment on pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
gengliangwang edited a comment on pull request #35379:
URL: https://github.com/apache/spark/pull/35379#issuecomment-1042730534


    LGTM except two minor comments. Thanks for the fix @Zhen-hao!




[GitHub] [spark] Zhen-hao commented on a change in pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
Zhen-hao commented on a change in pull request #35379:
URL: https://github.com/apache/spark/pull/35379#discussion_r798625000



##########
File path: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
##########
@@ -124,26 +136,51 @@ private[sql] class AvroSerializer(
       case (DoubleType, DOUBLE) =>
         (getter, ordinal) => getter.getDouble(ordinal)
       case (d: DecimalType, FIXED)
-        if avroType.getLogicalType == LogicalTypes.decimal(d.precision, d.scale) =>
+          if avroType.getLogicalType == LogicalTypes.decimal(d.precision, d.scale) =>
         (getter, ordinal) =>
-          val decimal = getter.getDecimal(ordinal, d.precision, d.scale)
-          decimalConversions.toFixed(decimal.toJavaBigDecimal, avroType,
-            LogicalTypes.decimal(d.precision, d.scale))
+          getter.get(ordinal, d) match {
+            case bigDecimal: java.math.BigDecimal =>
+              decimalConversions.toFixed(
+                bigDecimal,
+                avroType,
+                LogicalTypes.decimal(bigDecimal.precision, bigDecimal.scale))

Review comment:
       You are right. Even though `bigDecimal.precision` should have the same value as `d.precision`, that's more of an implementation detail. I'll make the change.
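   
   A quick check with plain `java.math.BigDecimal` suggests the two need not agree in general: a value reports the precision of its own digits, not the precision declared for the column. The `(6, 2)` declaration below mirrors the one used in the PR's tests. The object name is only for illustration.
   
   ```scala
   object DecimalPrecisionDemo {
     // Precision and scale as declared on the column, e.g. DecimalType(6, 2).
     val declaredPrecision = 6
     val declaredScale = 2

     // A value that happens to use all six declared digits.
     val full = new java.math.BigDecimal("1000.12")
     // A smaller value that is still valid for a (6, 2) column.
     val small = new java.math.BigDecimal("1.5")
     // Rescaling to the declared scale still leaves the value's own precision below 6.
     val smallRescaled = small.setScale(declaredScale)
   }
   ```
   
   Here `full` reports precision 6 and scale 2, while `small` reports precision 2 and scale 1, and `smallRescaled` reports precision 3 and scale 2. Building the logical type from `d.precision` and `d.scale` avoids depending on the individual value.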






[GitHub] [spark] Zhen-hao commented on pull request #35379: fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
Zhen-hao commented on pull request #35379:
URL: https://github.com/apache/spark/pull/35379#issuecomment-1027684179


   thanks, guys. I'll find some time this week, if not today.




[GitHub] [spark] gengliangwang commented on a change in pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on a change in pull request #35379:
URL: https://github.com/apache/spark/pull/35379#discussion_r808825964



##########
File path: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSerdeSuite.scala
##########
@@ -49,6 +56,160 @@ class AvroSerdeSuite extends SparkFunSuite {
     }
   }
 
+  test("Serialize DecimalType to Avro FIXED with logical type decimal") {
+    withFieldMatchType { fieldMatch =>
+      val structType = StructType(
+        Seq(
+          StructField("javaBigDecimal", DecimalType(6, 2), nullable = false),
+          StructField("sparkDecimal", DecimalType(6, 2), nullable = false)))
+
+      val fixedSchema = Schema.createFixed("fixed_name", "doc", "namespace", 32)
+      val logicalType = LogicalTypes.decimal(6, 2)
+      val fieldSchema = logicalType.addToSchema(fixedSchema)
+
+      val avroSchema = Schema.createRecord(
+        "name",
+        "doc",
+        "space",
+        true,
+        Seq(
+          new Schema.Field("javaBigDecimal", fieldSchema, "", null.asInstanceOf[AnyVal]),
+          new Schema.Field("sparkDecimal", fieldSchema, "", null.asInstanceOf[AnyVal])).asJava)
+
+      val serializer = Serializer.create(structType, avroSchema, fieldMatch)
+
+      val input = InternalRow(new java.math.BigDecimal("1000.12"), Decimal("1000.12"))
+
+      val grec = serializer.serialize(input).asInstanceOf[GenericRecord]
+      val javaDecimal = grec.get("javaBigDecimal").asInstanceOf[GenericFixed]
+      val sparkDecimal = grec.get("sparkDecimal").asInstanceOf[GenericFixed]
+
+      assert(javaDecimal === sparkDecimal)
+      assert(
+        new DecimalConversion().fromFixed(sparkDecimal, fixedSchema, logicalType) ===
+          new java.math.BigDecimal("1000.12"))
+    }
+  }
+
+  test("Serialize DecimalType to Avro BYTES with logical type decimal") {
+    withFieldMatchType { fieldMatch =>
+      val structType = StructType(
+        Seq(
+          StructField("javaBigDecimal", DecimalType(6, 2), nullable = false),
+          StructField("sparkDecimal", DecimalType(6, 2), nullable = false)))
+
+      val bytesSchema = Schema.create(BYTES)
+      val logicalType = LogicalTypes.decimal(6, 2)
+      val fieldSchema = logicalType.addToSchema(bytesSchema)
+
+      val avroSchema = Schema.createRecord(
+        "name",
+        "doc",
+        "space",
+        true,
+        Seq(
+          new Schema.Field("javaBigDecimal", fieldSchema, "", null.asInstanceOf[AnyVal]),
+          new Schema.Field("sparkDecimal", fieldSchema, "", null.asInstanceOf[AnyVal])).asJava)
+
+      val serializer = Serializer.create(structType, avroSchema, fieldMatch)
+
+      val input = InternalRow(new java.math.BigDecimal("1000.12"), Decimal("1000.12"))
+
+      val grec = serializer.serialize(input).asInstanceOf[GenericRecord]
+      val javaDecimal = grec.get("javaBigDecimal").asInstanceOf[ByteBuffer]
+      val sparkDecimal = grec.get("sparkDecimal").asInstanceOf[ByteBuffer]
+
+      assert(javaDecimal === sparkDecimal)
+      assert(
+        new DecimalConversion().fromBytes(sparkDecimal, bytesSchema, logicalType) ===
+          new java.math.BigDecimal("1000.12"))
+    }
+  }
+
+  test("Serialize DateType to Avro INT") {
+    withFieldMatchType { fieldMatch =>
+      val structType = StructType(
+        Seq(
+          StructField("javaSqlDate", DateType, nullable = false),
+          StructField("java8TimeDate", DateType, nullable = false)))
+
+      val dateSchema = Schema.create(INT)
+
+      val avroSchema = Schema.createRecord(
+        "name",
+        "doc",
+        "space",
+        true,
+        Seq(
+          new Schema.Field("javaSqlDate", dateSchema, "", null.asInstanceOf[AnyVal]),
+          new Schema.Field("java8TimeDate", dateSchema, "", null.asInstanceOf[AnyVal])).asJava)
+
+      val serializer = Serializer.create(structType, avroSchema, fieldMatch)
+
+      val input = InternalRow(
+        new java.sql.Date(1643121231000L),
+        Instant.ofEpochMilli(1643121231000L).atZone(ZoneId.of("UTC")).toLocalDate())
+
+      val grec = serializer.serialize(input).asInstanceOf[GenericRecord]
+      val javaSqlDate = grec.get("javaSqlDate").asInstanceOf[Int]
+      val java8TimeDate = grec.get("java8TimeDate").asInstanceOf[Int]
+
+      assert(javaSqlDate === java8TimeDate)
+      assert(javaSqlDate === 19017) // 19017 is 25 January 2022
+    }
+  }
+
+  test(s"""

Review comment:
       We need to test TimestampNTZ as well




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] gengliangwang commented on a change in pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on a change in pull request #35379:
URL: https://github.com/apache/spark/pull/35379#discussion_r808827105



##########
File path: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
##########
@@ -166,30 +183,84 @@ private[sql] class AvroSerializer(
         (getter, ordinal) => ByteBuffer.wrap(getter.getBinary(ordinal))
 
       case (DateType, INT) =>
-        (getter, ordinal) => dateRebaseFunc(getter.getInt(ordinal))
+        (getter, ordinal) =>
+          getter.get(ordinal, DateType) match {

Review comment:
       Nit: we can save the Int result to a variable and then call `dateRebaseFunc(..)` at the end to reduce duplicated code.
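
The nit above could look roughly like the following sketch. It is plain Scala with no Spark dependency, so `rebase` is only a hypothetical stand-in for the serializer's `dateRebaseFunc`, and `toEpochDays` / `convert` are illustrative names that do not appear in the PR.

```scala
import java.time.LocalDate

object DateConversionSketch {
  // Hypothetical stand-in for the serializer's dateRebaseFunc (identity here).
  val rebase: Int => Int = identity

  // Step 1: normalize each supported representation to epoch days.
  def toEpochDays(value: Any): Int = value match {
    case epochDays: java.lang.Integer => epochDays.intValue
    case date: java.sql.Date => date.toLocalDate().toEpochDay().toInt
    case localDate: LocalDate => localDate.toEpochDay().toInt
    case other =>
      throw new IllegalArgumentException(
        s"Expected java.lang.Integer, java.sql.Date, or java.time.LocalDate, but found $other")
  }

  // Step 2: apply the rebase function exactly once, at the end.
  def convert(value: Any): Int = rebase(toEpochDays(value))

  def main(args: Array[String]): Unit = {
    println(convert(LocalDate.of(2022, 1, 25)))
    println(convert(java.sql.Date.valueOf("2022-01-25")))
    println(convert(19017))
  }
}
```

With this shape the rebase call appears in one place only, so adding another accepted input type means adding a single `case` to `toEpochDays`.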






[GitHub] [spark] gengliangwang commented on pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on pull request #35379:
URL: https://github.com/apache/spark/pull/35379#issuecomment-1044624375


   > I don't know how to reproduce the issue with the public API...
   
   At first, I thought there was such a case with the public API. It's my fault for providing code suggestions without confirming this. 
   Now, on second thought, I doubt whether there is a good reason to merge this one... 
   
   




[GitHub] [spark] gengliangwang commented on a change in pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on a change in pull request #35379:
URL: https://github.com/apache/spark/pull/35379#discussion_r809611761



##########
File path: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSerdeSuite.scala
##########
@@ -49,6 +56,160 @@ class AvroSerdeSuite extends SparkFunSuite {
     }
   }
 
+  test("Serialize DecimalType to Avro FIXED with logical type decimal") {
+    withFieldMatchType { fieldMatch =>
+      val structType = StructType(
+        Seq(
+          StructField("javaBigDecimal", DecimalType(6, 2), nullable = false),
+          StructField("sparkDecimal", DecimalType(6, 2), nullable = false)))
+
+      val fixedSchema = Schema.createFixed("fixed_name", "doc", "namespace", 32)
+      val logicalType = LogicalTypes.decimal(6, 2)
+      val fieldSchema = logicalType.addToSchema(fixedSchema)
+
+      val avroSchema = Schema.createRecord(
+        "name",
+        "doc",
+        "space",
+        true,
+        Seq(
+          new Schema.Field("javaBigDecimal", fieldSchema, "", null.asInstanceOf[AnyVal]),
+          new Schema.Field("sparkDecimal", fieldSchema, "", null.asInstanceOf[AnyVal])).asJava)
+
+      val serializer = Serializer.create(structType, avroSchema, fieldMatch)
+
+      val input = InternalRow(new java.math.BigDecimal("1000.12"), Decimal("1000.12"))
+
+      val grec = serializer.serialize(input).asInstanceOf[GenericRecord]
+      val javaDecimal = grec.get("javaBigDecimal").asInstanceOf[GenericFixed]
+      val sparkDecimal = grec.get("sparkDecimal").asInstanceOf[GenericFixed]
+
+      assert(javaDecimal === sparkDecimal)
+      assert(
+        new DecimalConversion().fromFixed(sparkDecimal, fixedSchema, logicalType) ===
+          new java.math.BigDecimal("1000.12"))
+    }
+  }
+
+  test("Serialize DecimalType to Avro BYTES with logical type decimal") {
+    withFieldMatchType { fieldMatch =>
+      val structType = StructType(
+        Seq(
+          StructField("javaBigDecimal", DecimalType(6, 2), nullable = false),
+          StructField("sparkDecimal", DecimalType(6, 2), nullable = false)))
+
+      val bytesSchema = Schema.create(BYTES)
+      val logicalType = LogicalTypes.decimal(6, 2)
+      val fieldSchema = logicalType.addToSchema(bytesSchema)
+
+      val avroSchema = Schema.createRecord(
+        "name",
+        "doc",
+        "space",
+        true,
+        Seq(
+          new Schema.Field("javaBigDecimal", fieldSchema, "", null.asInstanceOf[AnyVal]),
+          new Schema.Field("sparkDecimal", fieldSchema, "", null.asInstanceOf[AnyVal])).asJava)
+
+      val serializer = Serializer.create(structType, avroSchema, fieldMatch)
+
+      val input = InternalRow(new java.math.BigDecimal("1000.12"), Decimal("1000.12"))
+
+      val grec = serializer.serialize(input).asInstanceOf[GenericRecord]
+      val javaDecimal = grec.get("javaBigDecimal").asInstanceOf[ByteBuffer]
+      val sparkDecimal = grec.get("sparkDecimal").asInstanceOf[ByteBuffer]
+
+      assert(javaDecimal === sparkDecimal)
+      assert(
+        new DecimalConversion().fromBytes(sparkDecimal, bytesSchema, logicalType) ===
+          new java.math.BigDecimal("1000.12"))
+    }
+  }
+
+  test("Serialize DateType to Avro INT") {
+    withFieldMatchType { fieldMatch =>
+      val structType = StructType(
+        Seq(
+          StructField("javaSqlDate", DateType, nullable = false),
+          StructField("java8TimeDate", DateType, nullable = false)))
+
+      val dateSchema = Schema.create(INT)
+
+      val avroSchema = Schema.createRecord(
+        "name",
+        "doc",
+        "space",
+        true,
+        Seq(
+          new Schema.Field("javaSqlDate", dateSchema, "", null.asInstanceOf[AnyVal]),
+          new Schema.Field("java8TimeDate", dateSchema, "", null.asInstanceOf[AnyVal])).asJava)
+
+      val serializer = Serializer.create(structType, avroSchema, fieldMatch)
+
+      val input = InternalRow(
+        new java.sql.Date(1643121231000L),
+        Instant.ofEpochMilli(1643121231000L).atZone(ZoneId.of("UTC")).toLocalDate())
+
+      val grec = serializer.serialize(input).asInstanceOf[GenericRecord]
+      val javaSqlDate = grec.get("javaSqlDate").asInstanceOf[Int]
+      val java8TimeDate = grec.get("java8TimeDate").asInstanceOf[Int]
+
+      assert(javaSqlDate === java8TimeDate)
+      assert(javaSqlDate === 19017) // 19017 is 25 January 2022
+    }
+  }
+
+  test(s"""

Review comment:
       There is a bug in the test case






[GitHub] [spark] Zhen-hao commented on pull request #35379: fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
Zhen-hao commented on pull request #35379:
URL: https://github.com/apache/spark/pull/35379#issuecomment-1027843407


   hi, can someone tell me what's wrong with my GitHub Action run? 
   
   https://github.com/Zhen-hao/spark/runs/5034902203?check_suite_focus=true




[GitHub] [spark] gengliangwang edited a comment on pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
gengliangwang edited a comment on pull request #35379:
URL: https://github.com/apache/spark/pull/35379#issuecomment-1033255310


   > do you know if we encourage/discourage formatting-only PRs to make code format match the expectation from ./dev/scalafmt
   
   @xkrogen @Zhen-hao In general we don't do that. 
   Instead, people follow https://github.com/databricks/scala-style-guide and create clean patches.
   
   ============= update after I try ./dev/scalafmt ===========
   @Zhen-hao I tried ./dev/scalafmt and the tool only formatted my changed lines, which is expected. 
   In the PR, it seems that the whole file is reformatted. Could you double check it?




[GitHub] [spark] Zhen-hao commented on pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
Zhen-hao commented on pull request #35379:
URL: https://github.com/apache/spark/pull/35379#issuecomment-1028676684


   > Left a few inline comments about specific implementation details, but I have a higher-level concern. I'm not confident that this is the right direction to be moving in. It seems that the problem is whoever created the `GenericInternalRow` put the wrong data type into the field. `GenericInternalRow` implements the `SpecializedGetters` interface, so `getDecimal` should work properly on it, right? If the Python UDF logic is putting a `BigDecimal` there, that seems wrong to me, because the implementation of `BaseGenericInternalRow#getDecimal` (which is used by `GenericInternalRow`) assumes that the field contains a `Decimal`. The Python UDF logic should either wrap the `BigDecimal` inside of a `Decimal` when storing the object, or use a subclass of `GenericInternalRow` which overrides `getDecimal` to wrap it on an as-needed basis in the getter.
   > 
   > LMK what you think.
   
   I totally agree that this may not be the right direction. 
   But any fix should pass my added unit tests or produce compile-time errors instead of runtime errors.
   
   This is just my "easy" fix for some issue that we had with a client. We'd like to contribute back and/or make others aware of the issue.
   
   If I had unlimited free time, I would remove all the OO designs and rewrite the whole of Spark with typeclasses, as it has been painful to work with type casts and interfaces containing `Any`, `AnyRef`, or `Object`.
   
   I'll address your inline comments later.
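
For readers following the quoted suggestion, here is a minimal sketch of the "wrap on an as-needed basis in the getter" idea. It deliberately avoids Spark classes: `SketchDecimal`, `SketchRow`, and `WrappingRow` are hypothetical stand-ins for `Decimal`, a generic row with a cast-only getter, and the proposed subclass. None of these names come from Spark or from the PR.

```scala
object DecimalWrappingSketch {
  // Hypothetical stand-in for Spark's Decimal: it only wraps a java.math.BigDecimal.
  final case class SketchDecimal(underlying: java.math.BigDecimal)

  // Hypothetical stand-in for a generic row that stores untyped values.
  class SketchRow(protected val values: Array[Any]) {
    // Cast-only getter: fails at runtime if the slot holds anything but a SketchDecimal.
    def getDecimal(ordinal: Int): SketchDecimal =
      values(ordinal).asInstanceOf[SketchDecimal]
  }

  // Subclass that wraps a raw java.math.BigDecimal on demand.
  class WrappingRow(vs: Array[Any]) extends SketchRow(vs) {
    override def getDecimal(ordinal: Int): SketchDecimal = values(ordinal) match {
      case d: SketchDecimal => d
      case bd: java.math.BigDecimal => SketchDecimal(bd)
      case other =>
        throw new IllegalArgumentException(s"Expected a decimal value, but found $other")
    }
  }

  def main(args: Array[String]): Unit = {
    val raw = new java.math.BigDecimal("1000.12")
    val row = new WrappingRow(Array[Any](raw, SketchDecimal(raw)))
    // Both slots come back as the same wrapped value.
    println(row.getDecimal(0) == row.getDecimal(1))
  }
}
```

Under this approach the serializer could keep calling the typed getter, and the knowledge of alternative representations would live with whoever builds the row.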




[GitHub] [spark] HyukjinKwon commented on pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on pull request #35379:
URL: https://github.com/apache/spark/pull/35379#issuecomment-1028525514


   cc @gengliangwang too FYI




[GitHub] [spark] xkrogen commented on pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
xkrogen commented on pull request #35379:
URL: https://github.com/apache/spark/pull/35379#issuecomment-1029211170


   > Sorry for that. I ran ./dev/scalafmt hoping to fix the linting errors. (I was surprised that it didn't fix all linting errors.)
   > Is it possible that we make formatting an automatic step in compiling and make sure the linting rules are consistent with the formatting rules?
   
   No problem ... Totally agreed that the formatter and linting rules should be in sync. I'm the one who wrote most of the code that had the whitespace updated here and I usually assume that the linter will catch any issues :) I can put up a PR addressing the formatting issues.




[GitHub] [spark] gengliangwang commented on a change in pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on a change in pull request #35379:
URL: https://github.com/apache/spark/pull/35379#discussion_r800299739



##########
File path: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
##########
@@ -49,47 +54,54 @@ private[sql] class AvroSerializer(
     rootAvroType: Schema,
     nullable: Boolean,
     positionalFieldMatch: Boolean,
-    datetimeRebaseMode: LegacyBehaviorPolicy.Value) extends Logging {
+    datetimeRebaseMode: LegacyBehaviorPolicy.Value)
+    extends Logging {
 
   def this(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean) = {
-    this(rootCatalystType, rootAvroType, nullable, positionalFieldMatch = false,
+    this(
+      rootCatalystType,
+      rootAvroType,
+      nullable,
+      positionalFieldMatch = false,
       LegacyBehaviorPolicy.withName(SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_WRITE)))
   }
 
   def serialize(catalystData: Any): Any = {
     converter.apply(catalystData)
   }
 
-  private val dateRebaseFunc = DataSourceUtils.createDateRebaseFuncInWrite(
-    datetimeRebaseMode, "Avro")
+  private val dateRebaseFunc =
+    DataSourceUtils.createDateRebaseFuncInWrite(datetimeRebaseMode, "Avro")
 
-  private val timestampRebaseFunc = DataSourceUtils.createTimestampRebaseFuncInWrite(
-    datetimeRebaseMode, "Avro")
+  private val timestampRebaseFunc =
+    DataSourceUtils.createTimestampRebaseFuncInWrite(datetimeRebaseMode, "Avro")
 
   private val converter: Any => Any = {
     val actualAvroType = resolveNullableType(rootAvroType, nullable)
-    val baseConverter = try {
-      rootCatalystType match {
-        case st: StructType =>
-          newStructConverter(st, actualAvroType, Nil, Nil).asInstanceOf[Any => Any]
-        case _ =>
-          val tmpRow = new SpecificInternalRow(Seq(rootCatalystType))
-          val converter = newConverter(rootCatalystType, actualAvroType, Nil, Nil)
-          (data: Any) =>
-            tmpRow.update(0, data)
-            converter.apply(tmpRow, 0)
-      }
-    } catch {
-      case ise: IncompatibleSchemaException => throw new IncompatibleSchemaException(
-        s"Cannot convert SQL type ${rootCatalystType.sql} to Avro type $rootAvroType.", ise)
-    }
-    if (nullable) {
-      (data: Any) =>
-        if (data == null) {
-          null
-        } else {
-          baseConverter.apply(data)
+    val baseConverter =

Review comment:
       unnecessary code change






[GitHub] [spark] gengliangwang commented on a change in pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on a change in pull request #35379:
URL: https://github.com/apache/spark/pull/35379#discussion_r800299605



##########
File path: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
##########
@@ -49,47 +54,54 @@ private[sql] class AvroSerializer(
     rootAvroType: Schema,
     nullable: Boolean,
     positionalFieldMatch: Boolean,
-    datetimeRebaseMode: LegacyBehaviorPolicy.Value) extends Logging {
+    datetimeRebaseMode: LegacyBehaviorPolicy.Value)
+    extends Logging {
 
   def this(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean) = {
-    this(rootCatalystType, rootAvroType, nullable, positionalFieldMatch = false,
+    this(

Review comment:
       unnecessary code change






[GitHub] [spark] gengliangwang commented on pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on pull request #35379:
URL: https://github.com/apache/spark/pull/35379#issuecomment-1031046214


   @Zhen-hao Thanks for the first-time contribution. I find this PR contains a lot of unnecessary code changes. Can you clean them up?
   
   Also, please follow the PR template 
   
   ### What changes were proposed in this pull request?
   <!--
   Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue.
   If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
     1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
     2. If you fix some SQL features, you can provide some references of other DBMSes.
     3. If there is design documentation, please add the link.
     4. If there is a discussion in the mailing list, please add the link.
   -->
   
   
   ### Why are the changes needed?
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   
   
   ### Does this PR introduce _any_ user-facing change?
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released Spark versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   -->
   




[GitHub] [spark] gengliangwang commented on a change in pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on a change in pull request #35379:
URL: https://github.com/apache/spark/pull/35379#discussion_r800299651



##########
File path: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
##########
@@ -49,47 +54,54 @@ private[sql] class AvroSerializer(
     rootAvroType: Schema,
     nullable: Boolean,
     positionalFieldMatch: Boolean,
-    datetimeRebaseMode: LegacyBehaviorPolicy.Value) extends Logging {
+    datetimeRebaseMode: LegacyBehaviorPolicy.Value)
+    extends Logging {
 
   def this(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean) = {
-    this(rootCatalystType, rootAvroType, nullable, positionalFieldMatch = false,
+    this(
+      rootCatalystType,
+      rootAvroType,
+      nullable,
+      positionalFieldMatch = false,
       LegacyBehaviorPolicy.withName(SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_WRITE)))
   }
 
   def serialize(catalystData: Any): Any = {
     converter.apply(catalystData)
   }
 
-  private val dateRebaseFunc = DataSourceUtils.createDateRebaseFuncInWrite(
-    datetimeRebaseMode, "Avro")
+  private val dateRebaseFunc =
+    DataSourceUtils.createDateRebaseFuncInWrite(datetimeRebaseMode, "Avro")

Review comment:
       unnecessary code change






[GitHub] [spark] gengliangwang edited a comment on pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
gengliangwang edited a comment on pull request #35379:
URL: https://github.com/apache/spark/pull/35379#issuecomment-1033255310


   > do you know if we encourage/discourage formatting-only PRs to make code format match the expectation from ./dev/scalafmt
   
   @xkrogen @Zhen-hao In general we don't do that. 
   Instead, people follow https://github.com/databricks/scala-style-guide and create clean patches.
   
   ============= update after I try ./dev/scalafmt ===========
   It seems that the tool scalafmt always reformats the whole changed file: https://github.com/scalameta/scalafmt/issues/882#issuecomment-389797398




[GitHub] [spark] Zhen-hao edited a comment on pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
Zhen-hao edited a comment on pull request #35379:
URL: https://github.com/apache/spark/pull/35379#issuecomment-1032975616


   hi @gengliangwang,
   Thank you for reviewing this PR. 
   
   Those unnecessary code changes you mentioned were the results of running `./dev/scalafmt`.
   
   That means the existing code on `master` was merged without running that command. 
   
   One way to fix it is to first merge a PR for formatting only, as hinted by @xkrogen




[GitHub] [spark] xkrogen commented on a change in pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
xkrogen commented on a change in pull request #35379:
URL: https://github.com/apache/spark/pull/35379#discussion_r798007663



##########
File path: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
##########
@@ -166,35 +204,91 @@ private[sql] class AvroSerializer(
         (getter, ordinal) => ByteBuffer.wrap(getter.getBinary(ordinal))
 
       case (DateType, INT) =>
-        (getter, ordinal) => dateRebaseFunc(getter.getInt(ordinal))
+        (getter, ordinal) =>
+          getter.get(ordinal, DateType) match {
+            case epochDays: java.lang.Integer => dateRebaseFunc(epochDays)
+            case date: java.sql.Date => dateRebaseFunc(date.toLocalDate().toEpochDay().toInt)
+            case localDate: java.time.LocalDate => dateRebaseFunc(localDate.toEpochDay().toInt)
+            case other =>
+              throw new IncompatibleSchemaException(s"""
+                  |Expected java.lang.Integer, java.sql.Date, or java.time.LocalDate,
+                  | but found ${other.getClass}""".stripMargin)
+          }
 
-      case (TimestampType, LONG) => avroType.getLogicalType match {
+      case (TimestampType, LONG) =>
+        avroType.getLogicalType match {
           // For backward compatibility, if the Avro type is Long and it is not logical type
           // (the `null` case), output the timestamp value as with millisecond precision.
-          case null | _: TimestampMillis => (getter, ordinal) =>
-            DateTimeUtils.microsToMillis(timestampRebaseFunc(getter.getLong(ordinal)))
-          case _: TimestampMicros => (getter, ordinal) =>
-            timestampRebaseFunc(getter.getLong(ordinal))
-          case other => throw new IncompatibleSchemaException(errorPrefix +
-            s"SQL type ${TimestampType.sql} cannot be converted to Avro logical type $other")
+          case null | _: TimestampMillis =>
+            (getter, ordinal) =>
+              getter.get(ordinal, TimestampType) match {
+                case micros: java.lang.Long =>
+                  DateTimeUtils.microsToMillis(timestampRebaseFunc(micros))
+                case javaTimestamp: java.sql.Timestamp => javaTimestamp.getTime
+                case instant: java.time.Instant => instant.toEpochMilli
+                case other =>

Review comment:
       Similar to my comment above about BigDecimal, can we share more of this conversion logic?
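
One way to share the timestamp conversion logic, sketched in plain Scala without Spark on the classpath. `rebase` is a hypothetical stand-in for `timestampRebaseFunc`, the helper names are illustrative, and the floor division used for micros-to-millis is an assumption about the desired rounding rather than a claim about `DateTimeUtils.microsToMillis`.

```scala
import java.time.Instant

object TimestampConversionSketch {
  // Hypothetical stand-in for the serializer's timestampRebaseFunc (identity here).
  val rebase: Long => Long = identity

  // Normalize every supported representation to microseconds since the epoch.
  def toMicros(value: Any): Long = value match {
    case micros: java.lang.Long => micros.longValue
    case ts: java.sql.Timestamp =>
      // getTime is in milliseconds and getNanos is the full fractional second,
      // so rebuild from whole seconds plus nanos to keep microsecond precision.
      Math.floorDiv(ts.getTime, 1000L) * 1000000L + ts.getNanos / 1000L
    case instant: Instant =>
      Math.addExact(
        Math.multiplyExact(instant.getEpochSecond, 1000000L),
        instant.getNano / 1000L)
    case other =>
      throw new IllegalArgumentException(
        s"Expected java.lang.Long, java.sql.Timestamp, or java.time.Instant, but found $other")
  }

  // Branch for the TimestampMicros logical type.
  def toAvroMicros(value: Any): Long = rebase(toMicros(value))

  // Branch for TimestampMillis (and the no-logical-type case).
  def toAvroMillis(value: Any): Long = Math.floorDiv(rebase(toMicros(value)), 1000L)

  def main(args: Array[String]): Unit = {
    println(toAvroMillis(Instant.ofEpochMilli(1643121231000L)))
    println(toAvroMicros(new java.sql.Timestamp(1643121231000L)))
  }
}
```

A side effect of normalizing first is that every input type passes through the same rebase step, whereas the `java.sql.Timestamp` and `java.time.Instant` branches in the diff above return `getTime` and `toEpochMilli` directly.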

##########
File path: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
##########
@@ -124,26 +136,51 @@ private[sql] class AvroSerializer(
       case (DoubleType, DOUBLE) =>
         (getter, ordinal) => getter.getDouble(ordinal)
       case (d: DecimalType, FIXED)
-        if avroType.getLogicalType == LogicalTypes.decimal(d.precision, d.scale) =>
+          if avroType.getLogicalType == LogicalTypes.decimal(d.precision, d.scale) =>
         (getter, ordinal) =>
-          val decimal = getter.getDecimal(ordinal, d.precision, d.scale)
-          decimalConversions.toFixed(decimal.toJavaBigDecimal, avroType,
-            LogicalTypes.decimal(d.precision, d.scale))
+          getter.get(ordinal, d) match {
+            case bigDecimal: java.math.BigDecimal =>
+              decimalConversions.toFixed(
+                bigDecimal,
+                avroType,
+                LogicalTypes.decimal(bigDecimal.precision, bigDecimal.scale))
+            case decimal: Decimal =>
+              decimalConversions.toFixed(
+                decimal.toJavaBigDecimal,
+                avroType,
+                LogicalTypes.decimal(d.precision, d.scale))
+            case other =>
+              throw new IncompatibleSchemaException(
+                s"Expected java.math.BigDecimal or Decimal, found ${other.getClass}")
+          }
 
       case (d: DecimalType, BYTES)
-        if avroType.getLogicalType == LogicalTypes.decimal(d.precision, d.scale) =>
+          if avroType.getLogicalType == LogicalTypes.decimal(d.precision, d.scale) =>
         (getter, ordinal) =>
-          val decimal = getter.getDecimal(ordinal, d.precision, d.scale)
-          decimalConversions.toBytes(decimal.toJavaBigDecimal, avroType,
-            LogicalTypes.decimal(d.precision, d.scale))
+          getter.get(ordinal, d) match {
+            case bigDecimal: java.math.BigDecimal =>
+              decimalConversions.toBytes(
+                bigDecimal,
+                avroType,
+                LogicalTypes.decimal(bigDecimal.precision, bigDecimal.scale))
+            case decimal: Decimal =>
+              decimalConversions.toBytes(
+                decimal.toJavaBigDecimal,
+                avroType,
+                LogicalTypes.decimal(d.precision, d.scale))

Review comment:
       Assuming that we pull the precision/scale from `d` as I suggest above, we can simplify this to just:
   ```
   val decimal = getter.get(ordinal, d) match {
     case bd: java.math.BigDecimal => bd
     case dec: Decimal => dec.toJavaBigDecimal
   }
   decimalConversions.toBytes(decimal, avroType, LogicalTypes.decimal(d.precision, d.scale))
   ```
   
   It would also be good to share this impl with the `(DecimalType, FIXED)` branch. Maybe a method like
   ```scala
   def extractDecimal(getter: xxx, ordinal: Int, d: DecimalType): java.math.BigDecimal = getter.get(ordinal, d) match {
     case bd: java.math.BigDecimal => bd
     case dec: Decimal => dec.toJavaBigDecimal
   }
   ```

##########
File path: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
##########
@@ -124,26 +136,51 @@ private[sql] class AvroSerializer(
       case (DoubleType, DOUBLE) =>
         (getter, ordinal) => getter.getDouble(ordinal)
       case (d: DecimalType, FIXED)
-        if avroType.getLogicalType == LogicalTypes.decimal(d.precision, d.scale) =>
+          if avroType.getLogicalType == LogicalTypes.decimal(d.precision, d.scale) =>
         (getter, ordinal) =>
-          val decimal = getter.getDecimal(ordinal, d.precision, d.scale)
-          decimalConversions.toFixed(decimal.toJavaBigDecimal, avroType,
-            LogicalTypes.decimal(d.precision, d.scale))
+          getter.get(ordinal, d) match {
+            case bigDecimal: java.math.BigDecimal =>
+              decimalConversions.toFixed(
+                bigDecimal,
+                avroType,
+                LogicalTypes.decimal(bigDecimal.precision, bigDecimal.scale))

Review comment:
       Is it correct for us to use the precision/scale from `bigDecimal`? I think we should still be using `d.precision` and `d.scale`, respecting the values from the Catalyst schema, right?
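
To illustrate the concern with a small sketch that uses only `java.math.BigDecimal`: a value's own precision and scale can differ from the ones declared in the schema. `DecimalPrecisionDemo`, `schemaPrecision`, `schemaScale` and `fitsSchema` are hypothetical names standing in for `d.precision` and `d.scale` of a `DecimalType(6, 2)` column.

```scala
import java.math.BigDecimal

// Hypothetical demo object; the two constants stand in for DecimalType(6, 2).
object DecimalPrecisionDemo {
  val schemaPrecision = 6
  val schemaScale = 2

  // A perfectly valid value for a DECIMAL(6, 2) column ...
  val value = new BigDecimal("1.5")
  // ... whose own precision is 2 and scale is 1, not 6 and 2.

  // Rescaling to the schema's scale. setScale without a rounding mode throws
  // ArithmeticException if digits would have to be dropped.
  val rescaled: BigDecimal = value.setScale(schemaScale)

  // True when the value already has the declared scale and fits the precision.
  def fitsSchema(v: BigDecimal): Boolean =
    v.scale == schemaScale && v.precision <= schemaPrecision
}
```

A logical type built from `value.precision` and `value.scale` would be `decimal(2, 1)` here, which does not equal the `decimal(6, 2)` that the branch guard checked against.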




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] gengliangwang commented on a change in pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on a change in pull request #35379:
URL: https://github.com/apache/spark/pull/35379#discussion_r809611693



##########
File path: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
##########
@@ -166,30 +183,85 @@ private[sql] class AvroSerializer(
         (getter, ordinal) => ByteBuffer.wrap(getter.getBinary(ordinal))
 
       case (DateType, INT) =>
-        (getter, ordinal) => dateRebaseFunc(getter.getInt(ordinal))
+        (getter, ordinal) =>
+          val epochDays: Int = getter.get(ordinal, DateType) match {
+            case epochDays: java.lang.Integer => epochDays
+            case date: java.sql.Date => date.toLocalDate().toEpochDay().toInt
+            case localDate: java.time.LocalDate => localDate.toEpochDay().toInt
+            case other =>
+              throw new IncompatibleSchemaException(s"""
+                  |Expected java.lang.Integer, java.sql.Date, or java.time.LocalDate,
+                  | but found ${other.getClass}""".stripMargin)
+          }
+          dateRebaseFunc(epochDays)
 
-      case (TimestampType, LONG) => avroType.getLogicalType match {
+      case (TimestampType, LONG) =>
+        avroType.getLogicalType match {
           // For backward compatibility, if the Avro type is Long and it is not logical type
           // (the `null` case), output the timestamp value as with millisecond precision.
-          case null | _: TimestampMillis => (getter, ordinal) =>
-            DateTimeUtils.microsToMillis(timestampRebaseFunc(getter.getLong(ordinal)))
-          case _: TimestampMicros => (getter, ordinal) =>
-            timestampRebaseFunc(getter.getLong(ordinal))
-          case other => throw new IncompatibleSchemaException(errorPrefix +
-            s"SQL type ${TimestampType.sql} cannot be converted to Avro logical type $other")
+          case null | _: TimestampMillis =>
+            (getter, ordinal) =>
+              getter.get(ordinal, TimestampType) match {
+                case micros: java.lang.Long =>
+                  DateTimeUtils.microsToMillis(timestampRebaseFunc(micros))
+                case javaTimestamp: java.sql.Timestamp => javaTimestamp.getTime
+                case instant: java.time.Instant => instant.toEpochMilli
+                case other =>
+                  throw new IncompatibleSchemaException(s"""
+                       |Expected java.lang.Long, java.sql.Timestamp, or java.time.Instant,
+                       | but found ${other.getClass}""".stripMargin)
+              }
+          case _: TimestampMicros =>
+            (getter, ordinal) =>
+              getter.get(ordinal, TimestampType) match {
+                case micros: java.lang.Long => timestampRebaseFunc(micros)
+                case javaTimestamp: java.sql.Timestamp =>
+                  timestampRebaseFunc(DateTimeUtils.millisToMicros(javaTimestamp.getTime))
+                case instant: java.time.Instant =>
+                  timestampRebaseFunc(DateTimeUtils.millisToMicros(instant.toEpochMilli))
+                case other =>
+                  throw new IncompatibleSchemaException(s"""
+                       |Expected java.lang.Long, java.sql.Timestamp, or java.time.Instant,
+                       | but found ${other.getClass}""".stripMargin)
+              }
+          case other =>
+            throw new IncompatibleSchemaException(
+              errorPrefix +
+                s"SQL type ${TimestampType.sql} cannot be converted to Avro logical type $other")
         }
 
-      case (TimestampNTZType, LONG) => avroType.getLogicalType match {
-        // To keep consistent with TimestampType, if the Avro type is Long and it is not
-        // logical type (the `null` case), output the TimestampNTZ as long value
-        // in millisecond precision.
-        case null | _: LocalTimestampMillis => (getter, ordinal) =>
-          DateTimeUtils.microsToMillis(getter.getLong(ordinal))
-        case _: LocalTimestampMicros => (getter, ordinal) =>
-          getter.getLong(ordinal)
-        case other => throw new IncompatibleSchemaException(errorPrefix +
-          s"SQL type ${TimestampNTZType.sql} cannot be converted to Avro logical type $other")
-      }
+      case (TimestampNTZType, LONG) =>
+        avroType.getLogicalType match {
+          // To keep consistent with TimestampType, if the Avro type is Long and it is not
+          // logical type (the `null` case), output the TimestampNTZ as long value
+          // in millisecond precision.
+          case null | _: LocalTimestampMillis | _: TimestampMillis =>
+            (getter, ordinal) =>
+              getter.get(ordinal, TimestampNTZType) match {
+                case micros: java.lang.Long => DateTimeUtils.microsToMillis(micros)
+                case localDateTime: java.time.LocalDateTime =>
+                  localDateTime.atZone(java.time.ZoneId.of("UTC")).toInstant().toEpochMilli()
+                case other =>
+                  throw new IncompatibleSchemaException(s"""
+                       |Expected java.lang.Long or java.time.LocalDateTime,
+                       | but found ${other.getClass}""".stripMargin)
+              }
+          case _: LocalTimestampMicros | _: TimestampMicros =>

Review comment:
       ```suggestion
             case _: LocalTimestampMicros =>
   ```
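
For context on why the `Timestamp*` logical types arguably do not belong in the `TimestampNTZType` branch: in the Avro specification, `timestamp-millis`/`timestamp-micros` denote an instant on the global timeline, while `local-timestamp-millis`/`local-timestamp-micros` denote a wall-clock reading with no zone attached. The sketch below uses only `java.time`, and `LocalVsInstant` and its fields are illustrative names. It shows that one wall-clock value maps to different instants depending on the zone it is read in.

```scala
import java.time.{LocalDateTime, ZoneId, ZoneOffset}

// Illustrative only; not code from the PR.
object LocalVsInstant {
  // A wall-clock reading with no time zone attached.
  val wallClock: LocalDateTime = LocalDateTime.of(2022, 1, 25, 14, 33, 51)

  // The same reading interpreted in two different zones gives two instants.
  val readAsUtc: Long = wallClock.toInstant(ZoneOffset.UTC).toEpochMilli
  val readAsTokyo: Long =
    wallClock.atZone(ZoneId.of("Asia/Tokyo")).toInstant.toEpochMilli
}
```

The two results differ by nine hours, so writing a `TimestampNTZType` value under an instant-based logical type would attach a zone interpretation that the data does not carry.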






[GitHub] [spark] gengliangwang commented on a change in pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on a change in pull request #35379:
URL: https://github.com/apache/spark/pull/35379#discussion_r809611580



##########
File path: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
##########
@@ -166,30 +183,85 @@ private[sql] class AvroSerializer(
         (getter, ordinal) => ByteBuffer.wrap(getter.getBinary(ordinal))
 
       case (DateType, INT) =>
-        (getter, ordinal) => dateRebaseFunc(getter.getInt(ordinal))
+        (getter, ordinal) =>
+          val epochDays: Int = getter.get(ordinal, DateType) match {
+            case epochDays: java.lang.Integer => epochDays
+            case date: java.sql.Date => date.toLocalDate().toEpochDay().toInt
+            case localDate: java.time.LocalDate => localDate.toEpochDay().toInt
+            case other =>
+              throw new IncompatibleSchemaException(s"""
+                  |Expected java.lang.Integer, java.sql.Date, or java.time.LocalDate,
+                  | but found ${other.getClass}""".stripMargin)
+          }
+          dateRebaseFunc(epochDays)
 
-      case (TimestampType, LONG) => avroType.getLogicalType match {
+      case (TimestampType, LONG) =>
+        avroType.getLogicalType match {
           // For backward compatibility, if the Avro type is Long and it is not logical type
           // (the `null` case), output the timestamp value as with millisecond precision.
-          case null | _: TimestampMillis => (getter, ordinal) =>
-            DateTimeUtils.microsToMillis(timestampRebaseFunc(getter.getLong(ordinal)))
-          case _: TimestampMicros => (getter, ordinal) =>
-            timestampRebaseFunc(getter.getLong(ordinal))
-          case other => throw new IncompatibleSchemaException(errorPrefix +
-            s"SQL type ${TimestampType.sql} cannot be converted to Avro logical type $other")
+          case null | _: TimestampMillis =>
+            (getter, ordinal) =>
+              getter.get(ordinal, TimestampType) match {
+                case micros: java.lang.Long =>
+                  DateTimeUtils.microsToMillis(timestampRebaseFunc(micros))
+                case javaTimestamp: java.sql.Timestamp => javaTimestamp.getTime
+                case instant: java.time.Instant => instant.toEpochMilli
+                case other =>
+                  throw new IncompatibleSchemaException(s"""
+                       |Expected java.lang.Long, java.sql.Timestamp, or java.time.Instant,
+                       | but found ${other.getClass}""".stripMargin)
+              }
+          case _: TimestampMicros =>
+            (getter, ordinal) =>
+              getter.get(ordinal, TimestampType) match {
+                case micros: java.lang.Long => timestampRebaseFunc(micros)
+                case javaTimestamp: java.sql.Timestamp =>
+                  timestampRebaseFunc(DateTimeUtils.millisToMicros(javaTimestamp.getTime))
+                case instant: java.time.Instant =>
+                  timestampRebaseFunc(DateTimeUtils.millisToMicros(instant.toEpochMilli))
+                case other =>
+                  throw new IncompatibleSchemaException(s"""
+                       |Expected java.lang.Long, java.sql.Timestamp, or java.time.Instant,
+                       | but found ${other.getClass}""".stripMargin)
+              }
+          case other =>
+            throw new IncompatibleSchemaException(
+              errorPrefix +
+                s"SQL type ${TimestampType.sql} cannot be converted to Avro logical type $other")
         }
 
-      case (TimestampNTZType, LONG) => avroType.getLogicalType match {
-        // To keep consistent with TimestampType, if the Avro type is Long and it is not
-        // logical type (the `null` case), output the TimestampNTZ as long value
-        // in millisecond precision.
-        case null | _: LocalTimestampMillis => (getter, ordinal) =>
-          DateTimeUtils.microsToMillis(getter.getLong(ordinal))
-        case _: LocalTimestampMicros => (getter, ordinal) =>
-          getter.getLong(ordinal)
-        case other => throw new IncompatibleSchemaException(errorPrefix +
-          s"SQL type ${TimestampNTZType.sql} cannot be converted to Avro logical type $other")
-      }
+      case (TimestampNTZType, LONG) =>
+        avroType.getLogicalType match {
+          // To keep consistent with TimestampType, if the Avro type is Long and it is not
+          // logical type (the `null` case), output the TimestampNTZ as long value
+          // in millisecond precision.
+          case null | _: LocalTimestampMillis | _: TimestampMillis =>

Review comment:
       ```suggestion
             case null | _: LocalTimestampMillis =>
   ```






[GitHub] [spark] Zhen-hao commented on a change in pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
Zhen-hao commented on a change in pull request #35379:
URL: https://github.com/apache/spark/pull/35379#discussion_r809501365



##########
File path: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSerdeSuite.scala
##########
@@ -49,6 +56,160 @@ class AvroSerdeSuite extends SparkFunSuite {
     }
   }
 
+  test("Serialize DecimalType to Avro FIXED with logical type decimal") {
+    withFieldMatchType { fieldMatch =>
+      val structType = StructType(
+        Seq(
+          StructField("javaBigDecimal", DecimalType(6, 2), nullable = false),
+          StructField("sparkDecimal", DecimalType(6, 2), nullable = false)))
+
+      val fixedSchema = Schema.createFixed("fixed_name", "doc", "namespace", 32)
+      val logicalType = LogicalTypes.decimal(6, 2)
+      val fieldSchema = logicalType.addToSchema(fixedSchema)
+
+      val avroSchema = Schema.createRecord(
+        "name",
+        "doc",
+        "space",
+        true,
+        Seq(
+          new Schema.Field("javaBigDecimal", fieldSchema, "", null.asInstanceOf[AnyVal]),
+          new Schema.Field("sparkDecimal", fieldSchema, "", null.asInstanceOf[AnyVal])).asJava)
+
+      val serializer = Serializer.create(structType, avroSchema, fieldMatch)
+
+      val input = InternalRow(new java.math.BigDecimal("1000.12"), Decimal("1000.12"))
+
+      val grec = serializer.serialize(input).asInstanceOf[GenericRecord]
+      val javaDecimal = grec.get("javaBigDecimal").asInstanceOf[GenericFixed]
+      val sparkDecimal = grec.get("sparkDecimal").asInstanceOf[GenericFixed]
+
+      assert(javaDecimal === sparkDecimal)
+      assert(
+        new DecimalConversion().fromFixed(sparkDecimal, fixedSchema, logicalType) ===
+          new java.math.BigDecimal("1000.12"))
+    }
+  }
+
+  test("Serialize DecimalType to Avro BYTES with logical type decimal") {
+    withFieldMatchType { fieldMatch =>
+      val structType = StructType(
+        Seq(
+          StructField("javaBigDecimal", DecimalType(6, 2), nullable = false),
+          StructField("sparkDecimal", DecimalType(6, 2), nullable = false)))
+
+      val bytesSchema = Schema.create(BYTES)
+      val logicalType = LogicalTypes.decimal(6, 2)
+      val fieldSchema = logicalType.addToSchema(bytesSchema)
+
+      val avroSchema = Schema.createRecord(
+        "name",
+        "doc",
+        "space",
+        true,
+        Seq(
+          new Schema.Field("javaBigDecimal", fieldSchema, "", null.asInstanceOf[AnyVal]),
+          new Schema.Field("sparkDecimal", fieldSchema, "", null.asInstanceOf[AnyVal])).asJava)
+
+      val serializer = Serializer.create(structType, avroSchema, fieldMatch)
+
+      val input = InternalRow(new java.math.BigDecimal("1000.12"), Decimal("1000.12"))
+
+      val grec = serializer.serialize(input).asInstanceOf[GenericRecord]
+      val javaDecimal = grec.get("javaBigDecimal").asInstanceOf[ByteBuffer]
+      val sparkDecimal = grec.get("sparkDecimal").asInstanceOf[ByteBuffer]
+
+      assert(javaDecimal === sparkDecimal)
+      assert(
+        new DecimalConversion().fromBytes(sparkDecimal, bytesSchema, logicalType) ===
+          new java.math.BigDecimal("1000.12"))
+    }
+  }
+
+  test("Serialize DateType to Avro INT") {
+    withFieldMatchType { fieldMatch =>
+      val structType = StructType(
+        Seq(
+          StructField("javaSqlDate", DateType, nullable = false),
+          StructField("java8TimeDate", DateType, nullable = false)))
+
+      val dateSchema = Schema.create(INT)
+
+      val avroSchema = Schema.createRecord(
+        "name",
+        "doc",
+        "space",
+        true,
+        Seq(
+          new Schema.Field("javaSqlDate", dateSchema, "", null.asInstanceOf[AnyVal]),
+          new Schema.Field("java8TimeDate", dateSchema, "", null.asInstanceOf[AnyVal])).asJava)
+
+      val serializer = Serializer.create(structType, avroSchema, fieldMatch)
+
+      val input = InternalRow(
+        new java.sql.Date(1643121231000L),
+        Instant.ofEpochMilli(1643121231000L).atZone(ZoneId.of("UTC")).toLocalDate())
+
+      val grec = serializer.serialize(input).asInstanceOf[GenericRecord]
+      val javaSqlDate = grec.get("javaSqlDate").asInstanceOf[Int]
+      val java8TimeDate = grec.get("java8TimeDate").asInstanceOf[Int]
+
+      assert(javaSqlDate === java8TimeDate)
+      assert(javaSqlDate === 19017) // 19017 is 25 January 2022
+    }
+  }
+
+  test(s"""

Review comment:
       good one. found a bug with the added test.






[GitHub] [spark] gengliangwang commented on pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on pull request #35379:
URL: https://github.com/apache/spark/pull/35379#issuecomment-1043721088


   > I totally agree that this may not be the right direction.
   > But any fix should pass my added unit tests or produce compiling errors instead of runtime errors.
   > This is just my "easy" fix for some issue that we had with a client. We'd like to contribute back and/or make others aware of the issue.
   > If I were to have unlimited free time, I would remove all the OO designs and rewrite the whole Spark with typeclasses as it has been painful to work with type casts and interfaces containing Any, AnyRef, or Object.
   > I'll address your inline comments later.
   
   Actually, could you provide an end-to-end Python UDF case?  It can be in the PR description instead of code changes.




[GitHub] [spark] Zhen-hao commented on pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
Zhen-hao commented on pull request #35379:
URL: https://github.com/apache/spark/pull/35379#issuecomment-1044799814


   > > I don't know how to reproduce the issue with the public API...
   > 
   > At first, I thought there was such a case with the public API. It's my fault for providing code suggestions without confirming this. Now, on second thought, I doubt whether there is a good reason to merge this one...
   
   I would say it is up to the reviewers. 
   I don't want to spend more time checking whether it is an issue for the public API, and it is fine with me if the reviewers decide to close this PR. 
   
   I'll be happy as long as there is a fix to this in the future. 




[GitHub] [spark] gengliangwang commented on pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on pull request #35379:
URL: https://github.com/apache/spark/pull/35379#issuecomment-1042730534


   @Zhen-hao LGTM except two minor comments. Thanks for the fix!




[GitHub] [spark] Zhen-hao commented on a change in pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
Zhen-hao commented on a change in pull request #35379:
URL: https://github.com/apache/spark/pull/35379#discussion_r809810109



##########
File path: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSerdeSuite.scala
##########
@@ -49,6 +56,194 @@ class AvroSerdeSuite extends SparkFunSuite {
     }
   }
 
+  test("Serialize DecimalType to Avro FIXED with logical type decimal") {
+    withFieldMatchType { fieldMatch =>
+      val structType = StructType(
+        Seq(
+          StructField("javaBigDecimal", DecimalType(6, 2), nullable = false),
+          StructField("sparkDecimal", DecimalType(6, 2), nullable = false)))
+
+      val fixedSchema = Schema.createFixed("fixed_name", "doc", "namespace", 32)
+      val logicalType = LogicalTypes.decimal(6, 2)
+      val fieldSchema = logicalType.addToSchema(fixedSchema)
+
+      val avroSchema = Schema.createRecord(
+        "name",
+        "doc",
+        "space",
+        true,
+        Seq(
+          new Schema.Field("javaBigDecimal", fieldSchema, "", null.asInstanceOf[AnyVal]),
+          new Schema.Field("sparkDecimal", fieldSchema, "", null.asInstanceOf[AnyVal])).asJava)
+
+      val serializer = Serializer.create(structType, avroSchema, fieldMatch)
+
+      val input = InternalRow(new java.math.BigDecimal("1000.12"), Decimal("1000.12"))
+
+      val grec = serializer.serialize(input).asInstanceOf[GenericRecord]
+      val javaDecimal = grec.get("javaBigDecimal").asInstanceOf[GenericFixed]
+      val sparkDecimal = grec.get("sparkDecimal").asInstanceOf[GenericFixed]
+
+      assert(javaDecimal === sparkDecimal)
+      assert(
+        new DecimalConversion().fromFixed(sparkDecimal, fixedSchema, logicalType) ===
+          new java.math.BigDecimal("1000.12"))
+    }
+  }
+
+  test("Serialize DecimalType to Avro BYTES with logical type decimal") {
+    withFieldMatchType { fieldMatch =>
+      val structType = StructType(
+        Seq(
+          StructField("javaBigDecimal", DecimalType(6, 2), nullable = false),
+          StructField("sparkDecimal", DecimalType(6, 2), nullable = false)))
+
+      val bytesSchema = Schema.create(BYTES)
+      val logicalType = LogicalTypes.decimal(6, 2)
+      val fieldSchema = logicalType.addToSchema(bytesSchema)
+
+      val avroSchema = Schema.createRecord(
+        "name",
+        "doc",
+        "space",
+        true,
+        Seq(
+          new Schema.Field("javaBigDecimal", fieldSchema, "", null.asInstanceOf[AnyVal]),
+          new Schema.Field("sparkDecimal", fieldSchema, "", null.asInstanceOf[AnyVal])).asJava)
+
+      val serializer = Serializer.create(structType, avroSchema, fieldMatch)
+
+      val input = InternalRow(new java.math.BigDecimal("1000.12"), Decimal("1000.12"))
+
+      val grec = serializer.serialize(input).asInstanceOf[GenericRecord]
+      val javaDecimal = grec.get("javaBigDecimal").asInstanceOf[ByteBuffer]
+      val sparkDecimal = grec.get("sparkDecimal").asInstanceOf[ByteBuffer]
+
+      assert(javaDecimal === sparkDecimal)
+      assert(
+        new DecimalConversion().fromBytes(sparkDecimal, bytesSchema, logicalType) ===
+          new java.math.BigDecimal("1000.12"))
+    }
+  }
+
+  test("Serialize DateType to Avro INT") {
+    withFieldMatchType { fieldMatch =>
+      val structType = StructType(
+        Seq(
+          StructField("javaSqlDate", DateType, nullable = false),
+          StructField("java8TimeDate", DateType, nullable = false)))
+
+      val dateSchema = Schema.create(INT)
+
+      val avroSchema = Schema.createRecord(
+        "name",
+        "doc",
+        "space",
+        true,
+        Seq(
+          new Schema.Field("javaSqlDate", dateSchema, "", null.asInstanceOf[AnyVal]),
+          new Schema.Field("java8TimeDate", dateSchema, "", null.asInstanceOf[AnyVal])).asJava)
+
+      val serializer = Serializer.create(structType, avroSchema, fieldMatch)
+
+      val input = InternalRow(
+        new java.sql.Date(1643121231000L),
+        Instant.ofEpochMilli(1643121231000L).atZone(ZoneId.of("UTC")).toLocalDate())
+
+      val grec = serializer.serialize(input).asInstanceOf[GenericRecord]
+      val javaSqlDate = grec.get("javaSqlDate").asInstanceOf[Int]
+      val java8TimeDate = grec.get("java8TimeDate").asInstanceOf[Int]
+
+      assert(javaSqlDate === java8TimeDate)
+      assert(javaSqlDate === 19017) // 19017 is 25 January 2022
+    }
+  }
+
+  test(s"""
+        |Serialize TimestampType to Avro LONG with logical type
+        | timestamp-micros and timestamp-millis
+        """.stripMargin) {
+    withFieldMatchType { fieldMatch =>
+      val structType = StructType(
+        Seq(
+          StructField("javaSqlTimeMicro", TimestampType, nullable = false),
+          StructField("java8TimeInstantMicro", TimestampType, nullable = false),
+          StructField("javaSqlTimeMillis", TimestampType, nullable = false),
+          StructField("java8TimeInstantMillis", TimestampType, nullable = false)))
+
+      val microSchema = LogicalTypes.timestampMicros().addToSchema(Schema.create(LONG))
+
+      val millisSchema = LogicalTypes.timestampMillis().addToSchema(Schema.create(LONG))
+
+      val avroSchema = Schema.createRecord("name", "doc", "space", true,
+          Seq(
+            new Schema.Field("javaSqlTimeMicro", microSchema, "", null.asInstanceOf[AnyVal]),
+            new Schema.Field("java8TimeInstantMicro", microSchema, "", null.asInstanceOf[AnyVal]),
+            new Schema.Field("javaSqlTimeMillis", millisSchema, "", null.asInstanceOf[AnyVal]),
+            new Schema.Field(
+              "java8TimeInstantMillis",
+              millisSchema,
+              "",
+              null.asInstanceOf[AnyVal])).asJava)
+
+      val serializer = Serializer.create(structType, avroSchema, fieldMatch)
+
+      val epoch = 1643121231000L
+      val epochMicro = 1000 * 1643121231000L
+
+      val input = InternalRow(
+        new java.sql.Timestamp(epoch),
+        Instant.ofEpochMilli(epoch),
+        new java.sql.Timestamp(epoch),
+        Instant.ofEpochMilli(epoch))
+
+      val grec = serializer.serialize(input).asInstanceOf[GenericRecord]
+
+      assert(grec.get("javaSqlTimeMicro").asInstanceOf[Long] === epochMicro)
+      assert(grec.get("java8TimeInstantMicro").asInstanceOf[Long] === epochMicro)
+      assert(grec.get("javaSqlTimeMillis").asInstanceOf[Long] === epoch)
+      assert(grec.get("java8TimeInstantMillis").asInstanceOf[Long] === epoch)
+    }
+  }
+
+  test(s"""
+        |Serialize TimestampNTZType to Avro LONG with logical type
+        | timestamp-micros and timestamp-millis
+        """.stripMargin) {
+    withFieldMatchType { fieldMatch =>
+      val structType = StructType(
+        Seq(
+          StructField("java8LocalDateTimeMicro", TimestampNTZType, nullable = false),
+          StructField("java8LocalDateTimeMillis", TimestampNTZType, nullable = false)))
+
+      val microSchema = LogicalTypes.timestampMicros().addToSchema(Schema.create(LONG))
+
+      val millisSchema = LogicalTypes.timestampMillis().addToSchema(Schema.create(LONG))

Review comment:
       good catch






[GitHub] [spark] xkrogen commented on pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
xkrogen commented on pull request #35379:
URL: https://github.com/apache/spark/pull/35379#issuecomment-1029277037


   Actually, I'm not sure how we typically treat formatting-only PRs. @HyukjinKwon or @dongjoon-hyun, do you know whether we encourage or discourage formatting-only PRs that make the code format match what `./dev/scalafmt` expects? I vaguely remember something like that being shot down in the past, in favor of keeping the Git history more readable.




[GitHub] [spark] AmplabJenkins commented on pull request #35379: fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #35379:
URL: https://github.com/apache/spark/pull/35379#issuecomment-1026690153


   Can one of the admins verify this patch?




[GitHub] [spark] gengliangwang commented on pull request #35379: [SPARK-38091][SQL] fix bugs in AvroSerializer

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on pull request #35379:
URL: https://github.com/apache/spark/pull/35379#issuecomment-1033712748


   lol @lihaoyi WDYT?

