Posted to commits@hudi.apache.org by si...@apache.org on 2020/05/28 00:19:17 UTC

[hudi] 34/40: [HUDI-863] get decimal properties from derived spark DataType (#1596)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 4192cc65d02640f53db78922a1783103e7a46cfb
Author: rolandjohann <rm...@gmail.com>
AuthorDate: Mon May 18 13:28:27 2020 +0200

    [HUDI-863] get decimal properties from derived spark DataType (#1596)
---
 .../org/apache/hudi/AvroConversionHelper.scala     | 22 ++++++++++------------
 .../org/apache/hudi/AvroConversionUtils.scala      |  4 +---
 2 files changed, 11 insertions(+), 15 deletions(-)
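
The old converter resolved a decimal field's Avro schema by looking it up in
the caller-supplied top-level schema (avroSchema.getField(structName)
.schema().getTypes.get(0)), which assumes the decimal is a top-level field
wrapped in a union. The patch instead derives the fixed decimal schema
directly from the Spark DecimalType, which is why the avroSchema parameter
can be dropped everywhere. A minimal sketch of the derivation, assuming
Spark's spark-avro module is on the classpath (the precision/scale and the
"price"/"hoodie.record" names are illustrative only):

    import org.apache.avro.Schema
    import org.apache.spark.sql.avro.SchemaConverters
    import org.apache.spark.sql.types.DecimalType

    val dec = DecimalType(20, 4)
    // Fixed-type Avro schema carrying the decimal logical type, built from
    // the Spark DataType alone -- no source Avro schema needed.
    val schema: Schema =
      SchemaConverters.toAvroType(dec, nullable = false, "price", "hoodie.record")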

diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
index 43225bc..69e6376 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
@@ -268,8 +268,7 @@ object AvroConversionHelper {
     createConverter(sourceAvroSchema, targetSqlType, List.empty[String])
   }
 
-  def createConverterToAvro(avroSchema: Schema,
-                            dataType: DataType,
+  def createConverterToAvro(dataType: DataType,
                             structName: String,
                             recordNamespace: String): Any => Any = {
     dataType match {
@@ -284,13 +283,15 @@ object AvroConversionHelper {
         if (item == null) null else item.asInstanceOf[Byte].intValue
       case ShortType => (item: Any) =>
         if (item == null) null else item.asInstanceOf[Short].intValue
-      case dec: DecimalType => (item: Any) =>
-        Option(item).map { _ =>
-          val bigDecimalValue = item.asInstanceOf[java.math.BigDecimal]
-          val decimalConversions = new DecimalConversion()
-          decimalConversions.toFixed(bigDecimalValue, avroSchema.getField(structName).schema().getTypes.get(0),
-            LogicalTypes.decimal(dec.precision, dec.scale))
-        }.orNull
+      case dec: DecimalType =>
+        val schema = SchemaConverters.toAvroType(dec, nullable = false, structName, recordNamespace)
+        (item: Any) => {
+          Option(item).map { _ =>
+            val bigDecimalValue = item.asInstanceOf[java.math.BigDecimal]
+            val decimalConversions = new DecimalConversion()
+            decimalConversions.toFixed(bigDecimalValue, schema, LogicalTypes.decimal(dec.precision, dec.scale))
+          }.orNull
+        }
       case TimestampType => (item: Any) =>
         // Convert time to microseconds since spark-avro by default converts TimestampType to
         // Avro Logical TimestampMicros
@@ -299,7 +300,6 @@ object AvroConversionHelper {
         Option(item).map(_.asInstanceOf[Date].toLocalDate.toEpochDay.toInt).orNull
       case ArrayType(elementType, _) =>
         val elementConverter = createConverterToAvro(
-          avroSchema,
           elementType,
           structName,
           recordNamespace)
@@ -320,7 +320,6 @@ object AvroConversionHelper {
         }
       case MapType(StringType, valueType, _) =>
         val valueConverter = createConverterToAvro(
-          avroSchema,
           valueType,
           structName,
           recordNamespace)
@@ -340,7 +339,6 @@ object AvroConversionHelper {
         val childNameSpace = if (recordNamespace != "") s"$recordNamespace.$structName" else structName
         val fieldConverters = structType.fields.map(field =>
           createConverterToAvro(
-            avroSchema,
             field.dataType,
             field.name,
             childNameSpace))
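
Given that derived schema, the per-value conversion in the hunk above uses
Avro's own DecimalConversion to encode a java.math.BigDecimal as a
fixed-width value. A hedged sketch of that step in isolation, with dec and
schema as in the previous sketch and an illustrative literal:

    import org.apache.avro.Conversions.DecimalConversion
    import org.apache.avro.LogicalTypes
    import org.apache.avro.generic.GenericFixed

    val decimalConversions = new DecimalConversion()
    // toFixed checks the value against the logical type's precision and
    // scale, then packs the unscaled bytes into the fixed-size buffer.
    val fixed: GenericFixed = decimalConversions.toFixed(
      new java.math.BigDecimal("1234.5600"),   // scale 4, matching dec
      schema,
      LogicalTypes.decimal(dec.precision, dec.scale))
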
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index 04de1c7..bdb8955 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -38,14 +38,12 @@ object AvroConversionUtils {
   : RDD[GenericRecord] = {
     // Use the Avro schema to derive the StructType which has the correct nullability information
     val dataType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
-    val avroSchemaAsJsonString = avroSchema.toString
     val encoder = RowEncoder.apply(dataType).resolveAndBind()
     df.queryExecution.toRdd.map(encoder.fromRow)
       .mapPartitions { records =>
         if (records.isEmpty) Iterator.empty
         else {
-          val avroSchema = new Schema.Parser().parse(avroSchemaAsJsonString)
-          val convertor = AvroConversionHelper.createConverterToAvro(avroSchema, dataType, structName, recordNamespace)
+          val convertor = AvroConversionHelper.createConverterToAvro(dataType, structName, recordNamespace)
           records.map { x => convertor(x).asInstanceOf[GenericRecord] }
         }
       }
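
The lines dropped in this last hunk were the usual workaround for shipping an
Avro Schema into Spark tasks: older Avro Schema classes are not
java.io.Serializable, hence the toString/re-parse round trip per partition.
Since the converter now needs only the serializable Spark StructType, nothing
Avro-shaped has to cross the task boundary. A hedged sketch of the resulting
call path (avroSchema, row, structName and recordNamespace are assumed
inputs):

    import org.apache.avro.generic.GenericRecord
    import org.apache.spark.sql.avro.SchemaConverters
    import org.apache.spark.sql.types.StructType

    val dataType =
      SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
    // One converter per partition, built from the DataType alone.
    val convertor =
      AvroConversionHelper.createConverterToAvro(dataType, structName, recordNamespace)
    val record = convertor(row).asInstanceOf[GenericRecord]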