Posted to commits@hudi.apache.org by vi...@apache.org on 2020/05/18 11:28:39 UTC
[incubator-hudi] branch master updated: [HUDI-863] get decimal properties from derived spark DataType (#1596)
This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 459356e [HUDI-863] get decimal properties from derived spark DataType (#1596)
459356e is described below
commit 459356e292ea869ffe5f39235646dc474da76ea5
Author: rolandjohann <rm...@gmail.com>
AuthorDate: Mon May 18 13:28:27 2020 +0200
[HUDI-863] get decimal properties from derived spark DataType (#1596)
---
.../org/apache/hudi/AvroConversionHelper.scala | 22 ++++++++++------------
.../org/apache/hudi/AvroConversionUtils.scala | 4 +---
2 files changed, 11 insertions(+), 15 deletions(-)
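The change teaches the DecimalType branch to derive its Avro fixed(decimal) schema from the Spark type itself, via SchemaConverters.toAvroType, instead of resolving it from a caller-supplied Avro record schema; that is why the avroSchema parameter can be dropped from createConverterToAvro. A minimal standalone sketch of the derivation, assuming Spark 2.4's spark-avro SchemaConverters and Avro's Conversions.DecimalConversion (the names "price" and "hoodie.example" are illustrative, not from the patch):

    import org.apache.avro.{Conversions, LogicalTypes}
    import org.apache.spark.sql.avro.SchemaConverters
    import org.apache.spark.sql.types.DecimalType

    object DerivedDecimalSchema {
      def main(args: Array[String]): Unit = {
        val dec = DecimalType(10, 2)
        // Derive the Avro schema from the Spark type alone; no enclosing
        // record schema is needed. For DecimalType this yields a fixed
        // schema carrying the decimal logical type.
        val schema = SchemaConverters.toAvroType(dec, nullable = false, "price", "hoodie.example")
        val fixed = new Conversions.DecimalConversion().toFixed(
          new java.math.BigDecimal("1234.56").setScale(dec.scale), // scale must match the logical type
          schema,
          LogicalTypes.decimal(dec.precision, dec.scale))
        println(s"$schema -> $fixed")
      }
    }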
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
index 43225bc..69e6376 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
@@ -268,8 +268,7 @@ object AvroConversionHelper {
createConverter(sourceAvroSchema, targetSqlType, List.empty[String])
}
- def createConverterToAvro(avroSchema: Schema,
- dataType: DataType,
+ def createConverterToAvro(dataType: DataType,
structName: String,
recordNamespace: String): Any => Any = {
dataType match {
@@ -284,13 +283,15 @@ object AvroConversionHelper {
if (item == null) null else item.asInstanceOf[Byte].intValue
case ShortType => (item: Any) =>
if (item == null) null else item.asInstanceOf[Short].intValue
- case dec: DecimalType => (item: Any) =>
- Option(item).map { _ =>
- val bigDecimalValue = item.asInstanceOf[java.math.BigDecimal]
- val decimalConversions = new DecimalConversion()
- decimalConversions.toFixed(bigDecimalValue, avroSchema.getField(structName).schema().getTypes.get(0),
- LogicalTypes.decimal(dec.precision, dec.scale))
- }.orNull
+ case dec: DecimalType =>
+ val schema = SchemaConverters.toAvroType(dec, nullable = false, structName, recordNamespace)
+ (item: Any) => {
+ Option(item).map { _ =>
+ val bigDecimalValue = item.asInstanceOf[java.math.BigDecimal]
+ val decimalConversions = new DecimalConversion()
+ decimalConversions.toFixed(bigDecimalValue, schema, LogicalTypes.decimal(dec.precision, dec.scale))
+ }.orNull
+ }
case TimestampType => (item: Any) =>
// Convert time to microseconds since spark-avro by default converts TimestampType to
// Avro Logical TimestampMicros
@@ -299,7 +300,6 @@ object AvroConversionHelper {
Option(item).map(_.asInstanceOf[Date].toLocalDate.toEpochDay.toInt).orNull
case ArrayType(elementType, _) =>
val elementConverter = createConverterToAvro(
- avroSchema,
elementType,
structName,
recordNamespace)
@@ -320,7 +320,6 @@ object AvroConversionHelper {
}
case MapType(StringType, valueType, _) =>
val valueConverter = createConverterToAvro(
- avroSchema,
valueType,
structName,
recordNamespace)
@@ -340,7 +339,6 @@ object AvroConversionHelper {
val childNameSpace = if (recordNamespace != "") s"$recordNamespace.$structName" else structName
val fieldConverters = structType.fields.map(field =>
createConverterToAvro(
- avroSchema,
field.dataType,
field.name,
childNameSpace))
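As the removed lines show, the old converter resolved the decimal's schema with avroSchema.getField(structName).schema().getTypes.get(0), i.e. a field lookup on the top-level record plus an assumed union, which cannot resolve decimals nested inside structs, arrays, or maps. With the schema now derived per DecimalType, nested decimals convert as well. A hypothetical round trip through the new converter (assuming hudi-spark is on the classpath; the field names are illustrative):

    import org.apache.avro.generic.GenericRecord
    import org.apache.hudi.AvroConversionHelper
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._

    object NestedDecimalExample {
      def main(args: Array[String]): Unit = {
        // A decimal one level down -- the case the old top-level
        // getField(structName) lookup could not handle.
        val schema = StructType(Seq(
          StructField("order", StructType(Seq(
            StructField("amount", DecimalType(10, 2), nullable = false)
          )), nullable = false)
        ))
        val toAvro = AvroConversionHelper.createConverterToAvro(schema, "record", "hoodie.example")
        val record = toAvro(Row(Row(new java.math.BigDecimal("42.00")))).asInstanceOf[GenericRecord]
        println(record)
      }
    }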
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index 04de1c7..bdb8955 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -38,14 +38,12 @@ object AvroConversionUtils {
: RDD[GenericRecord] = {
// Use the Avro schema to derive the StructType which has the correct nullability information
val dataType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
- val avroSchemaAsJsonString = avroSchema.toString
val encoder = RowEncoder.apply(dataType).resolveAndBind()
df.queryExecution.toRdd.map(encoder.fromRow)
.mapPartitions { records =>
if (records.isEmpty) Iterator.empty
else {
- val avroSchema = new Schema.Parser().parse(avroSchemaAsJsonString)
- val convertor = AvroConversionHelper.createConverterToAvro(avroSchema, dataType, structName, recordNamespace)
+ val convertor = AvroConversionHelper.createConverterToAvro(dataType, structName, recordNamespace)
records.map { x => convertor(x).asInstanceOf[GenericRecord] }
}
}
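The second hunk drops the JSON round-trip that existed only to get the Avro schema into the executor closure: org.apache.avro.Schema was not java-serializable in the Avro versions in use, so it was shipped as a string and re-parsed once per partition. Since createConverterToAvro now works from the Spark DataType alone, none of that is needed. A sketch of the general pattern the removed lines implemented (method and value names are illustrative):

    import org.apache.avro.Schema
    import org.apache.spark.rdd.RDD

    object SchemaShipping {
      // Ship a non-serializable Avro Schema to executors as JSON and
      // re-parse it once per partition.
      def withParsedSchema[T](rdd: RDD[T], schema: Schema): RDD[String] = {
        val schemaJson = schema.toString // driver side
        rdd.mapPartitions { rows =>
          val parsed = new Schema.Parser().parse(schemaJson) // executor side, once per partition
          rows.map(_ => parsed.getFullName)
        }
      }
    }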