Posted to commits@hudi.apache.org by vb...@apache.org on 2020/01/12 23:03:21 UTC

[incubator-hudi] branch master updated: [HUDI-91][HUDI-12]Migrate to spark 2.4.4, migrate to spark-avro library instead of databricks-avro, add support for Decimal/Date types

This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ad50008  [HUDI-91][HUDI-12]Migrate to spark 2.4.4, migrate to spark-avro library instead of databricks-avro, add support for Decimal/Date types
ad50008 is described below

commit ad50008a5946f15ee4ba4dd69ac514f11fed2034
Author: Udit Mehrotra <um...@illinois.edu>
AuthorDate: Sun Jan 12 15:03:11 2020 -0800

    [HUDI-91][HUDI-12]Migrate to spark 2.4.4, migrate to spark-avro library instead of databricks-avro, add support for Decimal/Date types
    
    - Upgrade Spark to 2.4.4, Parquet to 1.10.1, Avro to 1.8.2
    - Remove spark-avro from hudi-spark-bundle. Users need to provide --packages org.apache.spark:spark-avro:2.4.4 when running spark-shell or spark-submit (see the example after this list)
    - Replace com.databricks:spark-avro with org.apache.spark:spark-avro
    - Shade avro in hudi-hadoop-mr-bundle to make sure it does not conflict with hive's avro version.
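    
    Since spark-avro is no longer bundled, a spark-shell session against the
    bundle would be launched along these lines (a sketch; the bundle jar path
    depends on the local build, and note the _2.11 Scala suffix on the
    package coordinate, matching scala.libversion in the root pom):
    
        spark-shell \
          --packages org.apache.spark:spark-avro_2.11:2.4.4 \
          --jars packaging/hudi-spark-bundle/target/hudi-spark-bundle-*.jar \
          --conf spark.serializer=org.apache.spark.serializer.KryoSerializer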
---
 LICENSE                                            |  22 ++---
 .../java/org/apache/hudi/hive/util/SchemaUtil.java |   2 +
 .../java/org/apache/hudi/integ/ITTestBase.java     |   2 +-
 hudi-spark/pom.xml                                 |  17 +++-
 .../org/apache/hudi/AvroConversionHelper.scala     | 103 +++++++++++++++++----
 .../org/apache/hudi/AvroConversionUtils.scala      |  18 ++--
 hudi-spark/src/test/java/HoodieJavaApp.java        |   1 +
 hudi-utilities/pom.xml                             |   2 +-
 packaging/hudi-hadoop-mr-bundle/pom.xml            |  14 +--
 packaging/hudi-spark-bundle/pom.xml                |   6 --
 pom.xml                                            |  20 ++--
 11 files changed, 128 insertions(+), 79 deletions(-)

diff --git a/LICENSE b/LICENSE
index 325ba65..341988e 100644
--- a/LICENSE
+++ b/LICENSE
@@ -241,27 +241,17 @@ This product includes code from https://github.com/twitter/commons/blob/master/s
  limitations under the License.
 =================================================================================================
 
-This product includes code from Databricks spark-avro with the below license
+This product includes code from Apache Spark
 
-* org.apache.hudi.AvroConversionHelper copied from classes in com/databricks/spark/avro package
+* org.apache.hudi.AvroConversionHelper copied from classes in org/apache/spark/sql/avro package
 
- Copyright 2014 Databricks
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+Copyright: 2014 and onwards The Apache Software Foundation
+Home page: http://spark.apache.org/
+License: http://www.apache.org/licenses/LICENSE-2.0
 
 --------------------------------------------------------------------------------
 
-This product includes code from https://github.com/big-data-europe/README 
+This product includes code from https://github.com/big-data-europe/README
 
 * docker/hoodie/hadoop/base/entrypoint.sh copied from https://github.com/big-data-europe/docker-hadoop/blob/master/base/entrypoint.sh 
 
diff --git a/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java b/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java
index 2e08158..16d9aae 100644
--- a/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java
+++ b/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java
@@ -174,6 +174,8 @@ public class SchemaUtil {
         final DecimalMetadata decimalMetadata = parquetType.asPrimitiveType().getDecimalMetadata();
         return field.append("DECIMAL(").append(decimalMetadata.getPrecision()).append(" , ")
             .append(decimalMetadata.getScale()).append(")").toString();
+      } else if (originalType == OriginalType.DATE) {
+        return field.append("DATE").toString();
       }
       // TODO - fix the method naming here
       return parquetPrimitiveTypeName.convert(new PrimitiveType.PrimitiveTypeNameConverter<String, RuntimeException>() {
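
The DECIMAL/DATE branches added above boil down to the following mapping (a
standalone sketch, not the actual SchemaUtil code; hiveTypeFor is a
hypothetical helper, written against parquet-column's OriginalType enum):

    import org.apache.parquet.schema.OriginalType

    // Condensed view of the logical-type branches in SchemaUtil.
    def hiveTypeFor(originalType: OriginalType, precision: Int, scale: Int): Option[String] =
      originalType match {
        case OriginalType.DECIMAL => Some(s"DECIMAL($precision , $scale)") // e.g. DECIMAL(10 , 2)
        case OriginalType.DATE    => Some("DATE")                          // new in this commit
        case _                    => None // falls through to the primitive-type converter
      }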
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
index 5e6f90e..39ac694 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
@@ -107,7 +107,7 @@ public abstract class ITTestBase {
         .append(" --master local[2] --driver-class-path ").append(HADOOP_CONF_DIR)
         .append(
             " --conf spark.sql.hive.convertMetastoreParquet=false --deploy-mode client  --driver-memory 1G --executor-memory 1G --num-executors 1 ")
-        .append(" --packages com.databricks:spark-avro_2.11:4.0.0 ").append(" -i ").append(commandFile).toString();
+        .append(" --packages org.apache.spark:spark-avro_2.11:2.4.4 ").append(" -i ").append(commandFile).toString();
   }
 
   static String getPrestoConsoleCommand(String commandFile) {
diff --git a/hudi-spark/pom.xml b/hudi-spark/pom.xml
index b490dd7..d69954a 100644
--- a/hudi-spark/pom.xml
+++ b/hudi-spark/pom.xml
@@ -213,9 +213,9 @@
 
     <!-- Spark (Packages) -->
     <dependency>
-      <groupId>com.databricks</groupId>
+      <groupId>org.apache.spark</groupId>
       <artifactId>spark-avro_2.11</artifactId>
-      <version>4.0.0</version>
+      <scope>provided</scope>
     </dependency>
 
     <!-- Hadoop -->
@@ -239,8 +239,19 @@
     <!-- Hive -->
     <dependency>
       <groupId>${hive.groupid}</groupId>
-      <artifactId>hive-service</artifactId>
+      <artifactId>hive-exec</artifactId>
       <version>${hive.version}</version>
+      <classifier>${hive.exec.classifier}</classifier>
+      <exclusions>
+        <exclusion>
+          <groupId>javax.mail</groupId>
+          <artifactId>mail</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.eclipse.jetty.aggregate</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>${hive.groupid}</groupId>
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
index 97f89f9..8244fc3 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
@@ -1,13 +1,12 @@
 /*
- * This code is copied from com.databricks:spark-avro with following license
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * Copyright 2014 Databricks
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,14 +21,15 @@ import java.nio.ByteBuffer
 import java.sql.{Date, Timestamp}
 import java.util
 
-import com.databricks.spark.avro.SchemaConverters
-import com.databricks.spark.avro.SchemaConverters.IncompatibleSchemaException
-import org.apache.avro.{Schema, SchemaBuilder}
+import org.apache.avro.Conversions.DecimalConversion
+import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis}
+import org.apache.avro.{LogicalTypes, Schema}
 import org.apache.avro.Schema.Type._
 import org.apache.avro.generic.GenericData.{Fixed, Record}
-import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.generic.{GenericData, GenericFixed, GenericRecord}
 import org.apache.hudi.AvroConversionUtils.getNewRecordNamespace
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.avro.{IncompatibleSchemaException, SchemaConverters}
 import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.apache.spark.sql.types._
 
@@ -37,6 +37,16 @@ import scala.collection.JavaConverters._
 
 object AvroConversionHelper {
 
+  private def createDecimal(decimal: java.math.BigDecimal, precision: Int, scale: Int): Decimal = {
+    if (precision <= Decimal.MAX_LONG_DIGITS) {
+      // Constructs a `Decimal` with an unscaled `Long` value if possible.
+      Decimal(decimal.unscaledValue().longValue(), precision, scale)
+    } else {
+      // Otherwise, resorts to an unscaled `BigInteger` instead.
+      Decimal(decimal, precision, scale)
+    }
+  }
+
   /**
     *
     * Returns a converter function to convert row in avro format to GenericRow of catalyst.
@@ -76,7 +86,50 @@ object AvroConversionHelper {
               byteBuffer.get(bytes)
               bytes
             }
-
+        case (d: DecimalType, FIXED) =>
+          (item: AnyRef) =>
+            if (item == null) {
+              null
+            } else {
+              val decimalConversion = new DecimalConversion
+              val bigDecimal = decimalConversion.fromFixed(item.asInstanceOf[GenericFixed], avroSchema,
+                LogicalTypes.decimal(d.precision, d.scale))
+              createDecimal(bigDecimal, d.precision, d.scale)
+            }
+        case (d: DecimalType, BYTES) =>
+          (item: AnyRef) =>
+            if (item == null) {
+              null
+            } else {
+              val decimalConversion = new DecimalConversion
+              val bigDecimal = decimalConversion.fromBytes(item.asInstanceOf[ByteBuffer], avroSchema,
+                LogicalTypes.decimal(d.precision, d.scale))
+              createDecimal(bigDecimal, d.precision, d.scale)
+            }
+        case (DateType, INT) =>
+          (item: AnyRef) =>
+            if (item == null) {
+              null
+            } else {
+              new Date(item.asInstanceOf[Long])
+            }
+        case (TimestampType, LONG) =>
+          (item: AnyRef) =>
+            if (item == null) {
+              null
+            } else {
+              avroSchema.getLogicalType match {
+                case _: TimestampMillis =>
+                  new Timestamp(item.asInstanceOf[Long])
+                case _: TimestampMicros =>
+                  new Timestamp(item.asInstanceOf[Long] / 1000)
+                case null =>
+                  new Timestamp(item.asInstanceOf[Long])
+                case other =>
+                  throw new IncompatibleSchemaException(
+                    s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.")
+              }
+            }
         case (struct: StructType, RECORD) =>
           val length = struct.fields.length
           val converters = new Array[AnyRef => AnyRef](length)
@@ -216,7 +269,8 @@ object AvroConversionHelper {
     createConverter(sourceAvroSchema, targetSqlType, List.empty[String])
   }
 
-  def createConverterToAvro(dataType: DataType,
+  def createConverterToAvro(avroSchema: Schema,
+                            dataType: DataType,
                             structName: String,
                             recordNamespace: String): Any => Any = {
     dataType match {
@@ -231,13 +285,22 @@ object AvroConversionHelper {
         if (item == null) null else item.asInstanceOf[Byte].intValue
       case ShortType => (item: Any) =>
         if (item == null) null else item.asInstanceOf[Short].intValue
-      case _: DecimalType => (item: Any) => if (item == null) null else item.toString
+      case dec: DecimalType => (item: Any) =>
+        Option(item).map { i =>
+          val bigDecimalValue = item.asInstanceOf[java.math.BigDecimal]
+          val decimalConversions = new DecimalConversion()
+          decimalConversions.toFixed(bigDecimalValue, avroSchema.getField(structName).schema().getTypes.get(0),
+            LogicalTypes.decimal(dec.precision, dec.scale))
+        }.orNull
       case TimestampType => (item: Any) =>
-        if (item == null) null else item.asInstanceOf[Timestamp].getTime
+        // Convert time to microseconds since spark-avro by default converts TimestampType to
+        // Avro Logical TimestampMicros
+        Option(item).map(_.asInstanceOf[Timestamp].getTime * 1000).orNull
       case DateType => (item: Any) =>
-        if (item == null) null else item.asInstanceOf[Date].getTime
+        Option(item).map(_.asInstanceOf[Date].toLocalDate.toEpochDay.toInt).orNull
       case ArrayType(elementType, _) =>
         val elementConverter = createConverterToAvro(
+          avroSchema,
           elementType,
           structName,
           getNewRecordNamespace(elementType, recordNamespace, structName))
@@ -258,6 +321,7 @@ object AvroConversionHelper {
         }
       case MapType(StringType, valueType, _) =>
         val valueConverter = createConverterToAvro(
+          avroSchema,
           valueType,
           structName,
           getNewRecordNamespace(valueType, recordNamespace, structName))
@@ -273,11 +337,10 @@ object AvroConversionHelper {
           }
         }
       case structType: StructType =>
-        val builder = SchemaBuilder.record(structName).namespace(recordNamespace)
-        val schema: Schema = SchemaConverters.convertStructToAvro(
-          structType, builder, recordNamespace)
+        val schema: Schema = SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace)
         val fieldConverters = structType.fields.map(field =>
           createConverterToAvro(
+            avroSchema,
             field.dataType,
             field.name,
             getNewRecordNamespace(field.dataType, recordNamespace, field.name)))
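
The timestamp branches above encode a unit convention: spark-avro 2.4 maps
TimestampType to Avro timestamp-micros, so the write path multiplies
java.sql.Timestamp.getTime (milliseconds) by 1000, and the read path divides
timestamp-micros values by 1000. A minimal round-trip sketch of that
convention (not Hudi code):

    import java.sql.Timestamp

    val ts = new Timestamp(1578870191000L)      // an instant, in milliseconds
    val micros = ts.getTime * 1000              // what the write path stores for TimestampMicros
    val readBack = new Timestamp(micros / 1000) // what the read path reconstructs
    assert(readBack == ts)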
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index 372b44a..a27d0ee 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -17,11 +17,11 @@
 
 package org.apache.hudi
 
-import com.databricks.spark.avro.SchemaConverters
 import org.apache.avro.generic.GenericRecord
-import org.apache.avro.{Schema, SchemaBuilder}
 import org.apache.hudi.common.model.HoodieKey
+import org.apache.avro.Schema
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
@@ -30,13 +30,20 @@ import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 object AvroConversionUtils {
 
   def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = {
+    val avroSchema = convertStructTypeToAvroSchema(df.schema, structName, recordNamespace)
+    createRdd(df, avroSchema.toString, structName, recordNamespace)
+  }
+
+  def createRdd(df: DataFrame, avroSchemaAsJsonString: String, structName: String, recordNamespace: String)
+  : RDD[GenericRecord] = {
     val dataType = df.schema
     val encoder = RowEncoder.apply(dataType).resolveAndBind()
     df.queryExecution.toRdd.map(encoder.fromRow)
       .mapPartitions { records =>
         if (records.isEmpty) Iterator.empty
         else {
-          val convertor = AvroConversionHelper.createConverterToAvro(dataType, structName, recordNamespace)
+          val avroSchema = new Schema.Parser().parse(avroSchemaAsJsonString)
+          val convertor = AvroConversionHelper.createConverterToAvro(avroSchema, dataType, structName, recordNamespace)
           records.map { x => convertor(x).asInstanceOf[GenericRecord] }
         }
       }
@@ -75,11 +82,10 @@ object AvroConversionUtils {
   def convertStructTypeToAvroSchema(structType: StructType,
                                     structName: String,
                                     recordNamespace: String): Schema = {
-    val builder = SchemaBuilder.record(structName).namespace(recordNamespace)
-    SchemaConverters.convertStructToAvro(structType, builder, recordNamespace)
+    SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace)
   }
 
   def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
-    SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType];
+    SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
   }
 }
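
With the new overload, callers that already hold the target Avro schema can
pass it through as JSON instead of re-deriving it per call. A usage sketch,
assuming an existing SparkSession named spark; the input path, struct name,
and namespace below are placeholders:

    import org.apache.hudi.AvroConversionUtils

    val df = spark.read.parquet("/tmp/some_input") // hypothetical source
    // Derive the Avro schema once from the DataFrame schema...
    val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, "record", "org.example.ns")
    // ...then reuse it when converting rows to GenericRecords.
    val records = AvroConversionUtils.createRdd(df, avroSchema.toString, "record", "org.example.ns")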
diff --git a/hudi-spark/src/test/java/HoodieJavaApp.java b/hudi-spark/src/test/java/HoodieJavaApp.java
index 158d2e4..70324f9 100644
--- a/hudi-spark/src/test/java/HoodieJavaApp.java
+++ b/hudi-spark/src/test/java/HoodieJavaApp.java
@@ -104,6 +104,7 @@ public class HoodieJavaApp {
     SparkSession spark = SparkSession.builder().appName("Hoodie Spark APP")
         .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[1]").getOrCreate();
     JavaSparkContext jssc = new JavaSparkContext(spark.sparkContext());
+    spark.sparkContext().setLogLevel("WARN");
     FileSystem fs = FileSystem.get(jssc.hadoopConfiguration());
 
     // Generator of some records to be loaded in.
diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index 4c88eb0..53a3a1a 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -161,7 +161,7 @@
     </dependency>
 
     <dependency>
-      <groupId>com.databricks</groupId>
+      <groupId>org.apache.spark</groupId>
       <artifactId>spark-avro_2.11</artifactId>
       <scope>provided</scope>
     </dependency>
diff --git a/packaging/hudi-hadoop-mr-bundle/pom.xml b/packaging/hudi-hadoop-mr-bundle/pom.xml
index 5237d22..3c40c69 100644
--- a/packaging/hudi-hadoop-mr-bundle/pom.xml
+++ b/packaging/hudi-hadoop-mr-bundle/pom.xml
@@ -89,7 +89,7 @@
                 </relocation>
                 <relocation>
                   <pattern>org.apache.avro.</pattern>
-                  <shadedPattern>${mr.bundle.avro.shade.prefix}org.apache.avro.</shadedPattern>
+                  <shadedPattern>org.apache.hudi.org.apache.avro.</shadedPattern>
                 </relocation>
               </relocations>
               <createDependencyReducedPom>false</createDependencyReducedPom>
@@ -143,17 +143,7 @@
     <dependency>
       <groupId>org.apache.avro</groupId>
       <artifactId>avro</artifactId>
-      <scope>${mr.bundle.avro.scope}</scope>
+      <scope>compile</scope>
     </dependency>
   </dependencies>
-
-  <profiles>
-    <profile>
-      <id>mr-bundle-shade-avro</id>
-      <properties>
-        <mr.bundle.avro.scope>compile</mr.bundle.avro.scope>
-        <mr.bundle.avro.shade.prefix>org.apache.hudi.</mr.bundle.avro.shade.prefix>
-      </properties>
-    </profile>
-  </profiles>
 </project>
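
Dropping the mr-bundle-shade-avro profile makes the Avro relocation
unconditional, so the bundle's Avro classes can no longer clash with the Avro
version Hive ships. A built bundle can be spot-checked for the relocated
classes (the jar name below is illustrative):

    jar tf packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-*.jar \
      | grep 'org/apache/hudi/org/apache/avro/Schema.class'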
diff --git a/packaging/hudi-spark-bundle/pom.xml b/packaging/hudi-spark-bundle/pom.xml
index 55d07fa..c56c789 100644
--- a/packaging/hudi-spark-bundle/pom.xml
+++ b/packaging/hudi-spark-bundle/pom.xml
@@ -94,8 +94,6 @@
                   <include>org.apache.hive:hive-service-rpc</include>
                   <include>org.apache.hive:hive-metastore</include>
                   <include>org.apache.hive:hive-jdbc</include>
-
-                  <include>com.databricks:spark-avro_2.11</include>
                 </includes>
               </artifactSet>
               <relocations>
@@ -139,10 +137,6 @@
                   <pattern>org.apache.commons.codec.</pattern>
                   <shadedPattern>org.apache.hudi.org.apache.commons.codec.</shadedPattern>
                 </relocation>
-                <relocation>
-                  <pattern>com.databricks.</pattern>
-                  <shadedPattern>org.apache.hudi.com.databricks.</shadedPattern>
-                </relocation>
                 <!-- TODO: Revisit GH ISSUE #533 & PR#633-->
               </relocations>
              <filters>
diff --git a/pom.xml b/pom.xml
index f3f51c2..d4d47c0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -76,7 +76,7 @@
     <java.version>1.8</java.version>
     <fasterxml.version>2.6.7</fasterxml.version>
     <glassfish.version>2.17</glassfish.version>
-    <parquet.version>1.8.1</parquet.version>
+    <parquet.version>1.10.1</parquet.version>
     <junit.version>4.11</junit.version>
     <junit-dep.version>4.10</junit-dep.version>
     <mockito.version>1.10.19</mockito.version>
@@ -88,8 +88,8 @@
     <hive.version>2.3.1</hive.version>
     <hive.exec.classifier>core</hive.exec.classifier>
     <metrics.version>4.1.1</metrics.version>
-    <spark.version>2.1.0</spark.version>
-    <avro.version>1.7.7</avro.version>
+    <spark.version>2.4.4</spark.version>
+    <avro.version>1.8.2</avro.version>
     <scala.version>2.11.8</scala.version>
     <scala.libversion>2.11</scala.libversion>
     <apache-rat-plugin.version>0.12</apache-rat-plugin.version>
@@ -105,8 +105,6 @@
     <skipUTs>${skipTests}</skipUTs>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <main.basedir>${project.basedir}</main.basedir>
-    <mr.bundle.avro.scope>provided</mr.bundle.avro.scope>
-    <mr.bundle.avro.shade.prefix></mr.bundle.avro.shade.prefix>
     <spark.bundle.hive.scope>provided</spark.bundle.hive.scope>
     <spark.bundle.hive.shade.prefix></spark.bundle.hive.shade.prefix>
     <utilities.bundle.hive.scope>provided</utilities.bundle.hive.scope>
@@ -485,9 +483,10 @@
 
       <!-- Spark (Packages) -->
       <dependency>
-        <groupId>com.databricks</groupId>
+        <groupId>org.apache.spark</groupId>
         <artifactId>spark-avro_2.11</artifactId>
-        <version>4.0.0</version>
+        <version>${spark.version}</version>
+        <scope>provided</scope>
       </dependency>
 
       <!-- Dropwizard Metrics -->
@@ -935,13 +934,6 @@
       </properties>
     </profile>
     <profile>
-      <id>aws-emr-profile</id>
-      <properties>
-        <mr.bundle.avro.scope>compile</mr.bundle.avro.scope>
-        <mr.bundle.avro.shade.prefix>org.apache.hudi.</mr.bundle.avro.shade.prefix>
-      </properties>
-    </profile>
-    <profile>
       <id>javadocs</id>
       <build>
         <plugins>