Posted to commits@hudi.apache.org by vb...@apache.org on 2020/01/12 23:03:21 UTC
[incubator-hudi] branch master updated: [HUDI-91][HUDI-12] Migrate to spark 2.4.4, migrate to spark-avro library instead of databricks-avro, add support for Decimal/Date types
This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ad50008 [HUDI-91][HUDI-12] Migrate to spark 2.4.4, migrate to spark-avro library instead of databricks-avro, add support for Decimal/Date types
ad50008 is described below
commit ad50008a5946f15ee4ba4dd69ac514f11fed2034
Author: Udit Mehrotra <um...@illinois.edu>
AuthorDate: Sun Jan 12 15:03:11 2020 -0800
[HUDI-91][HUDI-12] Migrate to spark 2.4.4, migrate to spark-avro library instead of databricks-avro, add support for Decimal/Date types
- Upgrade Spark to 2.4.4, Parquet to 1.10.1, Avro to 1.8.2
- Remove spark-avro from hudi-spark-bundle. Users need to provide --packages org.apache.spark:spark-avro_2.11:2.4.4 when running spark-shell or spark-submit (see the example below)
- Replace com.databricks:spark-avro with org.apache.spark:spark-avro
- Shade Avro in hudi-hadoop-mr-bundle to make sure it does not conflict with Hive's Avro version.
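
For example, a write through the Hudi datasource exercising the newly supported types now looks like the sketch below (hedged: the bundle jar name, table name, columns, and path are illustrative and not part of this commit):

    // Launch the shell with spark-avro supplied externally, since it is no
    // longer bundled in hudi-spark-bundle:
    //   spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.4 \
    //     --jars hudi-spark-bundle.jar
    import org.apache.spark.sql.SaveMode

    // A toy DataFrame with the newly supported Decimal and Date types.
    val df = spark.sql(
      """select cast(1 as long) as id,
                cast('123.45' as decimal(10, 2)) as price,
                current_date as dt""")

    df.write.format("org.apache.hudi").
      option("hoodie.table.name", "demo_table").
      option("hoodie.datasource.write.recordkey.field", "id").
      option("hoodie.datasource.write.precombine.field", "id").
      option("hoodie.datasource.write.partitionpath.field", "dt").
      mode(SaveMode.Overwrite).
      save("file:///tmp/hudi/demo_table")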
---
LICENSE | 22 ++---
.../java/org/apache/hudi/hive/util/SchemaUtil.java | 2 +
.../java/org/apache/hudi/integ/ITTestBase.java | 2 +-
hudi-spark/pom.xml | 17 +++-
.../org/apache/hudi/AvroConversionHelper.scala | 103 +++++++++++++++++----
.../org/apache/hudi/AvroConversionUtils.scala | 18 ++--
hudi-spark/src/test/java/HoodieJavaApp.java | 1 +
hudi-utilities/pom.xml | 2 +-
packaging/hudi-hadoop-mr-bundle/pom.xml | 14 +--
packaging/hudi-spark-bundle/pom.xml | 6 --
pom.xml | 20 ++--
11 files changed, 128 insertions(+), 79 deletions(-)
diff --git a/LICENSE b/LICENSE
index 325ba65..341988e 100644
--- a/LICENSE
+++ b/LICENSE
@@ -241,27 +241,17 @@ This product includes code from https://github.com/twitter/commons/blob/master/s
limitations under the License.
=================================================================================================
-This product includes code from Databricks spark-avro with the below license
+This product includes code from Apache Spark
-* org.apache.hudi.AvroConversionHelper copied from classes in com/databricks/spark/avro package
+* org.apache.hudi.AvroConversionHelper copied from classes in org/apache/spark/sql/avro package
- Copyright 2014 Databricks
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+Copyright: 2014 and onwards The Apache Software Foundation
+Home page: http://spark.apache.org/
+License: http://www.apache.org/licenses/LICENSE-2.0
--------------------------------------------------------------------------------
-This product includes code from https://github.com/big-data-europe/README
+This product includes code from https://github.com/big-data-europe/README
* docker/hoodie/hadoop/base/entrypoint.sh copied from https://github.com/big-data-europe/docker-hadoop/blob/master/base/entrypoint.sh
diff --git a/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java b/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java
index 2e08158..16d9aae 100644
--- a/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java
+++ b/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java
@@ -174,6 +174,8 @@ public class SchemaUtil {
final DecimalMetadata decimalMetadata = parquetType.asPrimitiveType().getDecimalMetadata();
return field.append("DECIMAL(").append(decimalMetadata.getPrecision()).append(" , ")
.append(decimalMetadata.getScale()).append(")").toString();
+ } else if (originalType == OriginalType.DATE) {
+ return field.append("DATE").toString();
}
// TODO - fix the method naming here
return parquetPrimitiveTypeName.convert(new PrimitiveType.PrimitiveTypeNameConverter<String, RuntimeException>() {
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
index 5e6f90e..39ac694 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
@@ -107,7 +107,7 @@ public abstract class ITTestBase {
.append(" --master local[2] --driver-class-path ").append(HADOOP_CONF_DIR)
.append(
" --conf spark.sql.hive.convertMetastoreParquet=false --deploy-mode client --driver-memory 1G --executor-memory 1G --num-executors 1 ")
- .append(" --packages com.databricks:spark-avro_2.11:4.0.0 ").append(" -i ").append(commandFile).toString();
+ .append(" --packages org.apache.spark:spark-avro_2.11:2.4.4 ").append(" -i ").append(commandFile).toString();
}
static String getPrestoConsoleCommand(String commandFile) {
diff --git a/hudi-spark/pom.xml b/hudi-spark/pom.xml
index b490dd7..d69954a 100644
--- a/hudi-spark/pom.xml
+++ b/hudi-spark/pom.xml
@@ -213,9 +213,9 @@
<!-- Spark (Packages) -->
<dependency>
- <groupId>com.databricks</groupId>
+ <groupId>org.apache.spark</groupId>
<artifactId>spark-avro_2.11</artifactId>
- <version>4.0.0</version>
+ <scope>provided</scope>
</dependency>
<!-- Hadoop -->
@@ -239,8 +239,19 @@
<!-- Hive -->
<dependency>
<groupId>${hive.groupid}</groupId>
- <artifactId>hive-service</artifactId>
+ <artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
+ <classifier>${hive.exec.classifier}</classifier>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.mail</groupId>
+ <artifactId>mail</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty.aggregate</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>${hive.groupid}</groupId>
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
index 97f89f9..8244fc3 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
@@ -1,13 +1,12 @@
/*
- * This code is copied from com.databricks:spark-avro with following license
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * Copyright 2014 Databricks
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,14 +21,15 @@ import java.nio.ByteBuffer
import java.sql.{Date, Timestamp}
import java.util
-import com.databricks.spark.avro.SchemaConverters
-import com.databricks.spark.avro.SchemaConverters.IncompatibleSchemaException
-import org.apache.avro.{Schema, SchemaBuilder}
+import org.apache.avro.Conversions.DecimalConversion
+import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis}
+import org.apache.avro.{LogicalTypes, Schema}
import org.apache.avro.Schema.Type._
import org.apache.avro.generic.GenericData.{Fixed, Record}
-import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.generic.{GenericData, GenericFixed, GenericRecord}
import org.apache.hudi.AvroConversionUtils.getNewRecordNamespace
import org.apache.spark.sql.Row
+import org.apache.spark.sql.avro.{IncompatibleSchemaException, SchemaConverters}
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.types._
@@ -37,6 +37,16 @@ import scala.collection.JavaConverters._
object AvroConversionHelper {
+ private def createDecimal(decimal: java.math.BigDecimal, precision: Int, scale: Int): Decimal = {
+ if (precision <= Decimal.MAX_LONG_DIGITS) {
+ // Constructs a `Decimal` with an unscaled `Long` value if possible.
+ Decimal(decimal.unscaledValue().longValue(), precision, scale)
+ } else {
+ // Otherwise, resorts to an unscaled `BigInteger` instead.
+ Decimal(decimal, precision, scale)
+ }
+ }
+
/**
*
* Returns a converter function to convert row in avro format to GenericRow of catalyst.
@@ -76,7 +86,50 @@ object AvroConversionHelper {
byteBuffer.get(bytes)
bytes
}
-
+ case (d: DecimalType, FIXED) =>
+ (item: AnyRef) =>
+ if (item == null) {
+ null
+ } else {
+ val decimalConversion = new DecimalConversion
+ val bigDecimal = decimalConversion.fromFixed(item.asInstanceOf[GenericFixed], avroSchema,
+ LogicalTypes.decimal(d.precision, d.scale))
+ createDecimal(bigDecimal, d.precision, d.scale)
+ }
+ case (d: DecimalType, BYTES) =>
+ (item: AnyRef) =>
+ if (item == null) {
+ null
+ } else {
+ val decimalConversion = new DecimalConversion
+ val bigDecimal = decimalConversion.fromBytes(item.asInstanceOf[ByteBuffer], avroSchema,
+ LogicalTypes.decimal(d.precision, d.scale))
+ createDecimal(bigDecimal, d.precision, d.scale)
+ }
+ case (DateType, INT) =>
+ (item: AnyRef) =>
+ if (item == null) {
+ null
+ } else {
+ new Date(item.asInstanceOf[Long])
+ }
+ case (TimestampType, LONG) =>
+ (item: AnyRef) =>
+ if (item == null) {
+ null
+ } else {
+ avroSchema.getLogicalType match {
+ case _: TimestampMillis =>
+ new Timestamp(item.asInstanceOf[Long])
+ case _: TimestampMicros =>
+ new Timestamp(item.asInstanceOf[Long] / 1000)
+ case null =>
+ new Timestamp(item.asInstanceOf[Long])
+ case other =>
+ throw new IncompatibleSchemaException(
+ s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.")
+ }
+ }
case (struct: StructType, RECORD) =>
val length = struct.fields.length
val converters = new Array[AnyRef => AnyRef](length)
@@ -216,7 +269,8 @@ object AvroConversionHelper {
createConverter(sourceAvroSchema, targetSqlType, List.empty[String])
}
- def createConverterToAvro(dataType: DataType,
+ def createConverterToAvro(avroSchema: Schema,
+ dataType: DataType,
structName: String,
recordNamespace: String): Any => Any = {
dataType match {
@@ -231,13 +285,22 @@ object AvroConversionHelper {
if (item == null) null else item.asInstanceOf[Byte].intValue
case ShortType => (item: Any) =>
if (item == null) null else item.asInstanceOf[Short].intValue
- case _: DecimalType => (item: Any) => if (item == null) null else item.toString
+ case dec: DecimalType => (item: Any) =>
+ Option(item).map { i =>
+ val bigDecimalValue = item.asInstanceOf[java.math.BigDecimal]
+ val decimalConversions = new DecimalConversion()
+ decimalConversions.toFixed(bigDecimalValue, avroSchema.getField(structName).schema().getTypes.get(0),
+ LogicalTypes.decimal(dec.precision, dec.scale))
+ }.orNull
case TimestampType => (item: Any) =>
- if (item == null) null else item.asInstanceOf[Timestamp].getTime
+ // Convert time to microseconds since spark-avro by default converts TimestampType to
+ // Avro Logical TimestampMicros
+ Option(item).map(_.asInstanceOf[Timestamp].getTime * 1000).orNull
case DateType => (item: Any) =>
- if (item == null) null else item.asInstanceOf[Date].getTime
+ Option(item).map(_.asInstanceOf[Date].toLocalDate.toEpochDay.toInt).orNull
case ArrayType(elementType, _) =>
val elementConverter = createConverterToAvro(
+ avroSchema,
elementType,
structName,
getNewRecordNamespace(elementType, recordNamespace, structName))
@@ -258,6 +321,7 @@ object AvroConversionHelper {
}
case MapType(StringType, valueType, _) =>
val valueConverter = createConverterToAvro(
+ avroSchema,
valueType,
structName,
getNewRecordNamespace(valueType, recordNamespace, structName))
@@ -273,11 +337,10 @@ object AvroConversionHelper {
}
}
case structType: StructType =>
- val builder = SchemaBuilder.record(structName).namespace(recordNamespace)
- val schema: Schema = SchemaConverters.convertStructToAvro(
- structType, builder, recordNamespace)
+ val schema: Schema = SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace)
val fieldConverters = structType.fields.map(field =>
createConverterToAvro(
+ avroSchema,
field.dataType,
field.name,
getNewRecordNamespace(field.dataType, recordNamespace, field.name)))
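
The conversions added above can be exercised standalone. A hedged sketch under Avro 1.8.2 follows (values and schemas are illustrative; the writer in this commit uses toFixed where this sketch uses the bytes variant, which the reader above also handles):

    import java.sql.Timestamp
    import org.apache.avro.Conversions.DecimalConversion
    import org.apache.avro.{LogicalTypes, SchemaBuilder}

    // Decimals now round-trip through Avro's DecimalConversion (fixed/bytes)
    // instead of being converted to strings.
    val decimalType = LogicalTypes.decimal(10, 2)
    val decimalSchema = decimalType.addToSchema(SchemaBuilder.builder().bytesType())
    val conv = new DecimalConversion
    val buf = conv.toBytes(new java.math.BigDecimal("123.45"), decimalSchema, decimalType)
    assert(conv.fromBytes(buf, decimalSchema, decimalType) == new java.math.BigDecimal("123.45"))

    // spark-avro defaults TimestampType to Avro timestamp-micros, so the
    // writer multiplies epoch millis by 1000 and the reader divides by 1000.
    val ts = Timestamp.valueOf("2020-01-12 15:03:11")
    val micros = ts.getTime * 1000
    assert(new Timestamp(micros / 1000) == ts)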
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index 372b44a..a27d0ee 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -17,11 +17,11 @@
package org.apache.hudi
-import com.databricks.spark.avro.SchemaConverters
import org.apache.avro.generic.GenericRecord
-import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.hudi.common.model.HoodieKey
+import org.apache.avro.Schema
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
@@ -30,13 +30,20 @@ import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object AvroConversionUtils {
def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = {
+ val avroSchema = convertStructTypeToAvroSchema(df.schema, structName, recordNamespace)
+ createRdd(df, avroSchema.toString, structName, recordNamespace)
+ }
+
+ def createRdd(df: DataFrame, avroSchemaAsJsonString: String, structName: String, recordNamespace: String)
+ : RDD[GenericRecord] = {
val dataType = df.schema
val encoder = RowEncoder.apply(dataType).resolveAndBind()
df.queryExecution.toRdd.map(encoder.fromRow)
.mapPartitions { records =>
if (records.isEmpty) Iterator.empty
else {
- val convertor = AvroConversionHelper.createConverterToAvro(dataType, structName, recordNamespace)
+ val avroSchema = new Schema.Parser().parse(avroSchemaAsJsonString)
+ val convertor = AvroConversionHelper.createConverterToAvro(avroSchema, dataType, structName, recordNamespace)
records.map { x => convertor(x).asInstanceOf[GenericRecord] }
}
}
@@ -75,11 +82,10 @@ object AvroConversionUtils {
def convertStructTypeToAvroSchema(structType: StructType,
structName: String,
recordNamespace: String): Schema = {
- val builder = SchemaBuilder.record(structName).namespace(recordNamespace)
- SchemaConverters.convertStructToAvro(structType, builder, recordNamespace)
+ SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace)
}
def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
- SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType];
+ SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
}
}
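
The new createRdd overload lets a caller derive the Avro schema once on the driver and ship only its JSON form into the partitions. A minimal usage sketch, reusing the df from the earlier example (struct name and namespace are illustrative):

    import org.apache.hudi.AvroConversionUtils

    // Compute the schema once, then pass its JSON representation so each
    // partition re-parses it instead of re-deriving it from the StructType.
    val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(
      df.schema, "demo_record", "hoodie.demo_table")
    val records = AvroConversionUtils.createRdd(
      df, avroSchema.toString, "demo_record", "hoodie.demo_table")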
diff --git a/hudi-spark/src/test/java/HoodieJavaApp.java b/hudi-spark/src/test/java/HoodieJavaApp.java
index 158d2e4..70324f9 100644
--- a/hudi-spark/src/test/java/HoodieJavaApp.java
+++ b/hudi-spark/src/test/java/HoodieJavaApp.java
@@ -104,6 +104,7 @@ public class HoodieJavaApp {
SparkSession spark = SparkSession.builder().appName("Hoodie Spark APP")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[1]").getOrCreate();
JavaSparkContext jssc = new JavaSparkContext(spark.sparkContext());
+ spark.sparkContext().setLogLevel("WARN");
FileSystem fs = FileSystem.get(jssc.hadoopConfiguration());
// Generator of some records to be loaded in.
diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index 4c88eb0..53a3a1a 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -161,7 +161,7 @@
</dependency>
<dependency>
- <groupId>com.databricks</groupId>
+ <groupId>org.apache.spark</groupId>
<artifactId>spark-avro_2.11</artifactId>
<scope>provided</scope>
</dependency>
diff --git a/packaging/hudi-hadoop-mr-bundle/pom.xml b/packaging/hudi-hadoop-mr-bundle/pom.xml
index 5237d22..3c40c69 100644
--- a/packaging/hudi-hadoop-mr-bundle/pom.xml
+++ b/packaging/hudi-hadoop-mr-bundle/pom.xml
@@ -89,7 +89,7 @@
</relocation>
<relocation>
<pattern>org.apache.avro.</pattern>
- <shadedPattern>${mr.bundle.avro.shade.prefix}org.apache.avro.</shadedPattern>
+ <shadedPattern>org.apache.hudi.org.apache.avro.</shadedPattern>
</relocation>
</relocations>
<createDependencyReducedPom>false</createDependencyReducedPom>
@@ -143,17 +143,7 @@
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
- <scope>${mr.bundle.avro.scope}</scope>
+ <scope>compile</scope>
</dependency>
</dependencies>
-
- <profiles>
- <profile>
- <id>mr-bundle-shade-avro</id>
- <properties>
- <mr.bundle.avro.scope>compile</mr.bundle.avro.scope>
- <mr.bundle.avro.shade.prefix>org.apache.hudi.</mr.bundle.avro.shade.prefix>
- </properties>
- </profile>
- </profiles>
</project>
diff --git a/packaging/hudi-spark-bundle/pom.xml b/packaging/hudi-spark-bundle/pom.xml
index 55d07fa..c56c789 100644
--- a/packaging/hudi-spark-bundle/pom.xml
+++ b/packaging/hudi-spark-bundle/pom.xml
@@ -94,8 +94,6 @@
<include>org.apache.hive:hive-service-rpc</include>
<include>org.apache.hive:hive-metastore</include>
<include>org.apache.hive:hive-jdbc</include>
-
- <include>com.databricks:spark-avro_2.11</include>
</includes>
</artifactSet>
<relocations>
@@ -139,10 +137,6 @@
<pattern>org.apache.commons.codec.</pattern>
<shadedPattern>org.apache.hudi.org.apache.commons.codec.</shadedPattern>
</relocation>
- <relocation>
- <pattern>com.databricks.</pattern>
- <shadedPattern>org.apache.hudi.com.databricks.</shadedPattern>
- </relocation>
<!-- TODO: Revisit GH ISSUE #533 & PR#633-->
</relocations>
<filters>
diff --git a/pom.xml b/pom.xml
index f3f51c2..d4d47c0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -76,7 +76,7 @@
<java.version>1.8</java.version>
<fasterxml.version>2.6.7</fasterxml.version>
<glassfish.version>2.17</glassfish.version>
- <parquet.version>1.8.1</parquet.version>
+ <parquet.version>1.10.1</parquet.version>
<junit.version>4.11</junit.version>
<junit-dep.version>4.10</junit-dep.version>
<mockito.version>1.10.19</mockito.version>
@@ -88,8 +88,8 @@
<hive.version>2.3.1</hive.version>
<hive.exec.classifier>core</hive.exec.classifier>
<metrics.version>4.1.1</metrics.version>
- <spark.version>2.1.0</spark.version>
- <avro.version>1.7.7</avro.version>
+ <spark.version>2.4.4</spark.version>
+ <avro.version>1.8.2</avro.version>
<scala.version>2.11.8</scala.version>
<scala.libversion>2.11</scala.libversion>
<apache-rat-plugin.version>0.12</apache-rat-plugin.version>
@@ -105,8 +105,6 @@
<skipUTs>${skipTests}</skipUTs>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<main.basedir>${project.basedir}</main.basedir>
- <mr.bundle.avro.scope>provided</mr.bundle.avro.scope>
- <mr.bundle.avro.shade.prefix></mr.bundle.avro.shade.prefix>
<spark.bundle.hive.scope>provided</spark.bundle.hive.scope>
<spark.bundle.hive.shade.prefix></spark.bundle.hive.shade.prefix>
<utilities.bundle.hive.scope>provided</utilities.bundle.hive.scope>
@@ -485,9 +483,10 @@
<!-- Spark (Packages) -->
<dependency>
- <groupId>com.databricks</groupId>
+ <groupId>org.apache.spark</groupId>
<artifactId>spark-avro_2.11</artifactId>
- <version>4.0.0</version>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
</dependency>
<!-- Dropwizard Metrics -->
@@ -935,13 +934,6 @@
</properties>
</profile>
<profile>
- <id>aws-emr-profile</id>
- <properties>
- <mr.bundle.avro.scope>compile</mr.bundle.avro.scope>
- <mr.bundle.avro.shade.prefix>org.apache.hudi.</mr.bundle.avro.shade.prefix>
- </properties>
- </profile>
- <profile>
<id>javadocs</id>
<build>
<plugins>