You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vb...@apache.org on 2020/08/29 08:16:56 UTC

[hudi] branch master updated: [HUDI-1225] Fix: Avro Date logical type not handled correctly when converting to Spark Row (#2047)

This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b417d1  [HUDI-1225] Fix: Avro Date logical type not handled correctly when converting to Spark Row (#2047)
6b417d1 is described below

commit 6b417d1a867c51dad174fce7c68dd7a3146931b5
Author: Thinking Chen <74...@qq.com>
AuthorDate: Sat Aug 29 16:16:42 2020 +0800

    [HUDI-1225] Fix: Avro Date logical type not handled correctly when converting to Spark Row (#2047)
---
 .../org/apache/hudi/AvroConversionHelper.scala     |  8 +--
 .../org/apache/hudi/TestAvroConversionHelper.scala | 59 ++++++++++++++++++++++
 2 files changed, 63 insertions(+), 4 deletions(-)

diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
index 259f51f..c701e70 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
@@ -30,6 +30,7 @@ import org.apache.avro.generic.{GenericData, GenericFixed, GenericRecord}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.avro.{IncompatibleSchemaException, SchemaConverters}
 import org.apache.spark.sql.catalyst.expressions.GenericRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 
 import scala.collection.JavaConverters._
@@ -110,10 +111,9 @@ object AvroConversionHelper {
             if (item == null) {
               null
             } else {
-              if (item.isInstanceOf[Integer]) {
-                new Date(item.asInstanceOf[Integer].longValue())
-              } else {
-                new Date(item.asInstanceOf[Long])
+              item match {
+                case integer: Integer => DateTimeUtils.toJavaDate(integer)
+                case _ => new Date(item.asInstanceOf[Long])
               }
             }
         case (TimestampType, LONG) =>
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionHelper.scala b/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionHelper.scala
new file mode 100644
index 0000000..902359d
--- /dev/null
+++ b/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionHelper.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import java.time.LocalDate
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericData
+import org.apache.hudi.AvroConversionUtils.convertAvroSchemaToStructType
+import org.apache.spark.sql.catalyst.expressions.GenericRow
+import org.scalatest.{FunSuite, Matchers}
+
+class TestAvroConversionHelper extends FunSuite with Matchers {
+
+  // Avro record schema with a single int field annotated with the "date" logical type.
+  val dateSchema: String = """
+      {
+        "namespace": "logical",
+        "type": "record",
+        "name": "test",
+        "fields": [
+          {"name": "date", "type": {"type": "int", "logicalType": "date"}}
+        ]
+      }
+    """
+
+  // Raw int values written to the "date" field; the assertions treat them as epoch-day counts.
+  val dateInputData: Seq[Int] = Seq(7, 365, 0)
+
+  test("Logical type: date") {
+    val schema = new Schema.Parser().parse(dateSchema)
+    val converter = AvroConversionHelper.createConverterToRow(schema, convertAvroSchemaToStructType(schema))
+
+    // Check every input on its own so a failure names the offending value.
+    dateInputData.foreach { days =>
+      val record = new GenericData.Record(schema)
+      record.put("date", days)
+      val converted = converter(record).asInstanceOf[GenericRow].get(0)
+      // The converted value must render as the ISO date `days` days after 1970-01-01.
+      assert(converted.toString === LocalDate.ofEpochDay(days).toString, s"for input $days")
+    }
+  }
+}