Posted to commits@hudi.apache.org by si...@apache.org on 2021/08/09 04:18:44 UTC
[hudi] branch master updated: [HUDI-2208] Support Bulk Insert For Spark Sql (#3328)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 41a9986 [HUDI-2208] Support Bulk Insert For Spark Sql (#3328)
41a9986 is described below
commit 41a9986a7641f3232b1edd2a737fd4b7aa430dbf
Author: pengzhiwei <pe...@icloud.com>
AuthorDate: Mon Aug 9 12:18:31 2021 +0800
[HUDI-2208] Support Bulk Insert For Spark Sql (#3328)
---
.../scala/org/apache/hudi/DataSourceOptions.scala | 16 ++
.../main/java/org/apache/hudi/sql/InsertMode.java | 66 ++++++
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 6 +-
.../apache/spark/sql/hudi/HoodieOptionConfig.scala | 2 +-
.../org/apache/spark/sql/hudi/HoodieSqlUtils.scala | 2 +-
.../command/CreateHoodieTableAsSelectCommand.scala | 27 ++-
.../command/InsertIntoHoodieTableCommand.scala | 72 +++++--
.../spark/sql/hudi/command/SqlKeyGenerator.scala | 35 +++-
.../spark/sql/hudi/command/UuidKeyGenerator.scala | 4 +-
.../apache/spark/sql/hudi/TestHoodieSqlBase.scala | 3 +
.../apache/spark/sql/hudi/TestInsertTable.scala | 222 ++++++++++++++++++++-
11 files changed, 406 insertions(+), 49 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 5c67a68..46aee67 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -256,6 +256,22 @@ object DataSourceWriteOptions {
.withDocumentation("When set to true, will perform write operations directly using the spark native " +
"`Row` representation, avoiding any additional conversion costs.")
+ /**
+ * Enable bulk insert for the sql insert statement.
+ */
+ val SQL_ENABLE_BULK_INSERT: ConfigProperty[String] = ConfigProperty
+ .key("hoodie.sql.bulk.insert.enable")
+ .defaultValue("false")
+ .withDocumentation("When set to true, the sql insert statement will use bulk insert.")
+
+ val SQL_INSERT_MODE: ConfigProperty[String] = ConfigProperty
+ .key("hoodie.sql.insert.mode")
+ .defaultValue("upsert")
+ .withDocumentation("Insert mode for inserting data into a pk-table. The optional modes are: upsert, strict and non-strict. " +
+ "In upsert mode, the insert statement performs an upsert on the pk-table, updating any duplicate record. " +
+ "In strict mode, the insert statement keeps the primary key uniqueness constraint and does not allow duplicate records. " +
+ "In non-strict mode, hudi just does an insert on the pk-table.")
+
val COMMIT_METADATA_KEYPREFIX: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.write.commitmeta.key.prefix")
.defaultValue("_")
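Taken together, the two new options are ordinary Spark session configs. A minimal sketch of driving them from a SparkSession, in the same style the new tests below use (the table name is illustrative):

    // Hypothetical session: enable bulk insert and pick an insert mode
    // before running an INSERT statement against a Hudi table.
    spark.sql("set hoodie.sql.bulk.insert.enable = true")
    spark.sql("set hoodie.sql.insert.mode = non-strict")
    spark.sql("insert into my_hudi_table values(1, 'a1', 10, 1000)")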
diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/sql/InsertMode.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/sql/InsertMode.java
new file mode 100644
index 0000000..4b44ae4
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/sql/InsertMode.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sql;
+
+import java.util.Locale;
+
+/**
+ * Insert mode for insert into a pk-table.
+ */
+public enum InsertMode {
+ /**
+ * In upsert mode for insert into, a duplicate record on the primary key
+ * will be updated. This is the default insert mode for a pk-table.
+ */
+ UPSERT("upsert"),
+ /**
+ * In strict mode for insert into, the primary key uniqueness constraint
+ * is guaranteed for a COW pk-table. A MOR pk-table behaves the same
+ * as in "upsert" mode.
+ */
+ STRICT("strict"),
+ /**
+ * In non-strict mode for insert into, the insert operation is used
+ * to write data, which allows duplicate records.
+ */
+ NON_STRICT("non-strict")
+ ;
+
+ private final String value;
+
+ InsertMode(String value) {
+ this.value = value;
+ }
+
+ public String value() {
+ return value;
+ }
+
+ public static InsertMode of(String value) {
+ switch (value.toLowerCase(Locale.ROOT)) {
+ case "upsert":
+ return UPSERT;
+ case "strict":
+ return STRICT;
+ case "non-strict":
+ return NON_STRICT;
+ default:
+ throw new AssertionError("Unsupported insert mode: " + value);
+ }
+ }
+}
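Because of() lowercases its input with Locale.ROOT before matching, the lookup is case-insensitive, and unknown values fail fast. A small sketch of the expected behavior:

    import org.apache.hudi.sql.InsertMode

    // Case-insensitive parsing of the mode string.
    assert(InsertMode.of("Strict") == InsertMode.STRICT)
    assert(InsertMode.of("NON-STRICT") == InsertMode.NON_STRICT)
    // InsertMode.of("append") throws AssertionError("Unsupported insert mode: append")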
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 6213ab8..6a54931 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -164,7 +164,11 @@ object HoodieSparkSqlWriter {
// Convert to RDD[HoodieRecord]
val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, schema, structName, nameSpace)
- val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean || operation.equals(WriteOperationType.UPSERT);
+
+ val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
+ operation.equals(WriteOperationType.UPSERT) ||
+ parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key(),
+ HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.defaultValue()).toBoolean
val hoodieAllIncomingRecords = genericRecords.map(gr => {
val hoodieRecord = if (shouldCombine) {
val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, hoodieConfig.getString(PRECOMBINE_FIELD), false)
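The extra clause means an explicit combine-before-insert setting now also triggers the precombine path for plain inserts, not just upserts. Conceptually, combining keeps, per record key, the record with the greatest ordering (precombine) value; a toy sketch of that idea, independent of Hudi types:

    // Toy model of combine-before-insert: dedupe by key, keep the max ts.
    case class Rec(id: Int, price: Double, ts: Long)
    val incoming = Seq(Rec(1, 10.0, 1000), Rec(1, 11.0, 1001))
    val combined = incoming.groupBy(_.id).values.map(_.maxBy(_.ts))
    // combined contains only Rec(1, 11.0, 1001), matching the
    // "Test combine before insert" case added further down.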
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
index 2f28c66..5120fe7 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
@@ -114,7 +114,7 @@ object HoodieOptionConfig {
/**
* Mapping the sql options to the hoodie table config which used to store to the hoodie
- * .properites when create the table.
+ * .properties when create the table.
* @param options
* @return
*/
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
index da6c482..b0a8b52 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
@@ -218,7 +218,7 @@ object HoodieSqlUtils extends SparkAdapterSupport {
* Append the spark config and table options to the baseConfig.
*/
def withSparkConf(spark: SparkSession, options: Map[String, String])
- (baseConfig: Map[String, String]): Map[String, String] = {
+ (baseConfig: Map[String, String] = Map.empty): Map[String, String] = {
baseConfig ++ // Table options has the highest priority
(spark.sessionState.conf.getAllConfs ++ HoodieOptionConfig.mappingSqlOptionToHoodieParam(options))
.filterKeys(_.startsWith("hoodie."))
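The new default argument lets call sites write withSparkConf(spark, options)() with no explicit base config. The merge order matters: later maps win, so table options override session confs, which in turn override the base config, and only "hoodie."-prefixed keys survive the filter. A self-contained sketch of the same semantics using plain maps (function name illustrative):

    // Plain-map model of withSparkConf's precedence, for illustration only.
    def merge(base: Map[String, String], sessionConfs: Map[String, String],
              tableOpts: Map[String, String]): Map[String, String] =
      base ++ (sessionConfs ++ tableOpts).filter { case (k, _) => k.startsWith("hoodie.") }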
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
index a156d9f..38c7e29 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.hive.util.ConfigUtils
+import org.apache.hudi.sql.InsertMode
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
@@ -62,8 +63,8 @@ case class CreateHoodieTableAsSelectCommand(
}
}
val tablePath = getTableLocation(table, sparkSession)
- val conf = sparkSession.sessionState.newHadoopConf()
- assert(CreateHoodieTableCommand.isEmptyPath(tablePath, conf),
+ val hadoopConf = sparkSession.sessionState.newHadoopConf()
+ assert(CreateHoodieTableCommand.isEmptyPath(tablePath, hadoopConf),
s"Path '$tablePath' should be empty for CTAS")
// ReOrder the query which move the partition columns to the last of the project list
@@ -72,17 +73,15 @@ case class CreateHoodieTableAsSelectCommand(
// Execute the insert query
try {
- // Set if sync as a managed table.
- sparkSession.sessionState.conf.setConfString(DataSourceWriteOptions.HIVE_CREATE_MANAGED_TABLE.key,
- (table.tableType == CatalogTableType.MANAGED).toString)
- // Sync the options to hive serde properties
- sparkSession.sessionState.conf.setConfString(DataSourceWriteOptions.HIVE_TABLE_SERDE_PROPERTIES.key,
- ConfigUtils.configToString(table.storage.properties.asJava))
- // Sync the table properties to hive
- sparkSession.sessionState.conf.setConfString(DataSourceWriteOptions.HIVE_TABLE_PROPERTIES.key,
- ConfigUtils.configToString(table.properties.asJava))
+ val options = Map(
+ DataSourceWriteOptions.HIVE_CREATE_MANAGED_TABLE.key -> (table.tableType == CatalogTableType.MANAGED).toString,
+ DataSourceWriteOptions.HIVE_TABLE_SERDE_PROPERTIES.key -> ConfigUtils.configToString(table.storage.properties.asJava),
+ DataSourceWriteOptions.HIVE_TABLE_PROPERTIES.key -> ConfigUtils.configToString(table.properties.asJava),
+ DataSourceWriteOptions.SQL_INSERT_MODE.key -> InsertMode.NON_STRICT.value(),
+ DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key -> "true"
+ )
val success = InsertIntoHoodieTableCommand.run(sparkSession, tableWithSchema, reOrderedQuery, Map.empty,
- mode == SaveMode.Overwrite, refreshTable = false)
+ mode == SaveMode.Overwrite, refreshTable = false, extraOptions = options)
if (success) {
// If write success, create the table in catalog if it has not synced to the
// catalog by the meta sync.
@@ -92,11 +91,11 @@ case class CreateHoodieTableAsSelectCommand(
createTableCommand.createTableInCatalog(sparkSession, checkPathForManagedTable = false)
}
} else { // failed to insert data, clear table path
- clearTablePath(tablePath, conf)
+ clearTablePath(tablePath, hadoopConf)
}
} catch {
case e: Throwable => // failed to insert data, clear table path
- clearTablePath(tablePath, conf)
+ clearTablePath(tablePath, hadoopConf)
throw e
}
Seq.empty[Row]
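With the session-conf mutation replaced by an explicit options map, CTAS now deterministically runs as a non-strict bulk insert. A sketch of the statement shape this enables, mirroring the CTAS case in the tests below (table name and path are illustrative):

    spark.sql(
      s"""
         |create table ctas_target using hudi
         | options (primaryKey = 'id')
         | location '/tmp/ctas_target'
         | as select 1 as id, 'a1' as name, 10.0 as price
         |""".stripMargin)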
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index cfd6609..02e7427 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -28,10 +28,12 @@ import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.hive.ddl.HiveSyncMode
-import org.apache.hudi.{HoodieSparkSqlWriter, HoodieWriterUtils}
+import org.apache.hudi.sql.InsertMode
+import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils}
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
-import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -58,7 +60,7 @@ case class InsertIntoHoodieTableCommand(
}
}
-object InsertIntoHoodieTableCommand {
+object InsertIntoHoodieTableCommand extends Logging {
/**
* Run the insert query. We support both dynamic partition insert and static partition insert.
* @param sparkSession The spark session.
@@ -71,12 +73,14 @@ object InsertIntoHoodieTableCommand {
* , it is None in the map.
* @param overwrite Whether to overwrite the table.
* @param refreshTable Whether to refresh the table after insert finished.
+ * @param extraOptions Extra options for insert.
*/
def run(sparkSession: SparkSession, table: CatalogTable, query: LogicalPlan,
insertPartitions: Map[String, Option[String]],
- overwrite: Boolean, refreshTable: Boolean = true): Boolean = {
+ overwrite: Boolean, refreshTable: Boolean = true,
+ extraOptions: Map[String, String] = Map.empty): Boolean = {
- val config = buildHoodieInsertConfig(table, sparkSession, overwrite, insertPartitions)
+ val config = buildHoodieInsertConfig(table, sparkSession, overwrite, insertPartitions, extraOptions)
val mode = if (overwrite && table.partitionColumnNames.isEmpty) {
// insert overwrite non-partition table
@@ -165,7 +169,7 @@ object InsertIntoHoodieTableCommand {
Alias(castAttr, f.name)()
})
}
- // Remove the hoodie meta fileds from the projects as we do not need these to write
+ // Remove the hoodie meta fields from the projects as we do not need these to write
val withoutMetaFieldDataProjects = dataProjects.filter(c => !HoodieSqlUtils.isMetaField(c.name))
val alignedProjects = withoutMetaFieldDataProjects ++ partitionProjects
Project(alignedProjects, query)
@@ -179,7 +183,8 @@ object InsertIntoHoodieTableCommand {
table: CatalogTable,
sparkSession: SparkSession,
isOverwrite: Boolean,
- insertPartitions: Map[String, Option[String]] = Map.empty): Map[String, String] = {
+ insertPartitions: Map[String, Option[String]] = Map.empty,
+ extraOptions: Map[String, String]): Map[String, String] = {
if (insertPartitions.nonEmpty &&
(insertPartitions.keys.toSet != table.partitionColumnNames.toSet)) {
@@ -187,7 +192,7 @@ object InsertIntoHoodieTableCommand {
s"[${insertPartitions.keys.mkString(" " )}]" +
s" not equal to the defined partition in table[${table.partitionColumnNames.mkString(",")}]")
}
- val parameters = HoodieOptionConfig.mappingSqlOptionToHoodieParam(table.storage.properties)
+ val parameters = withSparkConf(sparkSession, table.storage.properties)() ++ extraOptions
val tableType = parameters.getOrElse(TABLE_TYPE.key, TABLE_TYPE.defaultValue)
@@ -209,28 +214,49 @@ object InsertIntoHoodieTableCommand {
.getOrElse(INSERT_DROP_DUPS.defaultValue)
.toBoolean
- val operation = if (isOverwrite) {
- if (table.partitionColumnNames.nonEmpty) {
- INSERT_OVERWRITE_OPERATION_OPT_VAL // overwrite partition
- } else {
- INSERT_OPERATION_OPT_VAL
- }
- } else {
- if (primaryColumns.nonEmpty && !dropDuplicate) {
- UPSERT_OPERATION_OPT_VAL
- } else {
- INSERT_OPERATION_OPT_VAL
+ val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+ DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+ val isPartitionedTable = table.partitionColumnNames.nonEmpty
+ val isPrimaryKeyTable = primaryColumns.nonEmpty
+ val insertMode = InsertMode.of(parameters.getOrElse(DataSourceWriteOptions.SQL_INSERT_MODE.key,
+ DataSourceWriteOptions.SQL_INSERT_MODE.defaultValue()))
+ val isNonStrictMode = insertMode == InsertMode.NON_STRICT
+
+ val operation =
+ (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+ case (true, true, _, _) if !isNonStrictMode =>
+ throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert in ${insertMode.value()} mode.")
+ case (_, true, true, _) if isPartitionedTable =>
+ throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.")
+ case (_, true, _, true) =>
+ throw new IllegalArgumentException(s"Bulk insert cannot support drop duplication." +
+ s" Please disable $INSERT_DROP_DUPS and try again.")
+ // if enableBulkInsert is true, use bulk insert for insert overwrite of a non-partitioned table.
+ case (_, true, true, _) if !isPartitionedTable => BULK_INSERT_OPERATION_OPT_VAL
+ // insert overwrite partition
+ case (_, _, true, _) if isPartitionedTable => INSERT_OVERWRITE_OPERATION_OPT_VAL
+ // insert overwrite table
+ case (_, _, true, _) if !isPartitionedTable => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
+ // if it is a pk-table and dropDuplicate is disabled, use the upsert operation in strict and upsert modes.
+ case (true, false, false, false) if !isNonStrictMode => UPSERT_OPERATION_OPT_VAL
+ // if enableBulkInsert is true and the table has no primary key, use the bulk insert operation
+ case (false, true, _, _) => BULK_INSERT_OPERATION_OPT_VAL
+ // if the table is a pk-table and bulk insert is enabled, use bulk insert in non-strict mode.
+ case (true, true, _, _) if isNonStrictMode => BULK_INSERT_OPERATION_OPT_VAL
+ // for the remaining cases, use the insert operation
+ case (_, _, _, _) => INSERT_OPERATION_OPT_VAL
}
- }
- val payloadClassName = if (primaryColumns.nonEmpty && !dropDuplicate &&
- tableType == COW_TABLE_TYPE_OPT_VAL) {
+ val payloadClassName = if (operation == UPSERT_OPERATION_OPT_VAL &&
+ tableType == COW_TABLE_TYPE_OPT_VAL && insertMode == InsertMode.STRICT) {
// Only validate duplicate key for COW, for MOR it will do the merge with the DefaultHoodieRecordPayload
// on reading.
classOf[ValidateDuplicateKeyPayload].getCanonicalName
} else {
classOf[DefaultHoodieRecordPayload].getCanonicalName
}
+ logInfo(s"insert statement uses write operation type: $operation, payloadClass: $payloadClassName")
+
val enableHive = isEnableHive(sparkSession)
withSparkConf(sparkSession, options) {
Map(
@@ -243,6 +269,8 @@ object InsertIntoHoodieTableCommand {
RECORDKEY_FIELD.key -> primaryColumns.mkString(","),
PARTITIONPATH_FIELD.key -> partitionFields,
PAYLOAD_CLASS.key -> payloadClassName,
+ ENABLE_ROW_WRITER.key -> enableBulkInsert.toString,
+ HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key -> isPrimaryKeyTable.toString,
META_SYNC_ENABLED.key -> enableHive.toString,
HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
HIVE_USE_JDBC.key -> "false",
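The match above is dense, so here is a condensed, behavior-equivalent sketch of the precedence as a pure function (the string results stand in for the *_OPERATION_OPT_VAL constants; the real code throws IllegalArgumentException on the first three cases):

    // Condensed model of buildHoodieInsertConfig's operation selection.
    def selectOperation(pk: Boolean, bulk: Boolean, overwrite: Boolean,
                        dropDups: Boolean, nonStrict: Boolean, partitioned: Boolean): String =
      (pk, bulk, overwrite, dropDups) match {
        case (true, true, _, _) if !nonStrict  => sys.error("pk-table bulk insert requires non-strict mode")
        case (_, true, true, _) if partitioned => sys.error("insert overwrite partition cannot bulk insert")
        case (_, true, _, true)                => sys.error("bulk insert cannot drop duplicates")
        case (_, true, true, _)                => "bulk_insert"            // overwrite a non-partitioned table
        case (_, _, true, _) if partitioned    => "insert_overwrite"       // overwrite partition
        case (_, _, true, _)                   => "insert_overwrite_table" // overwrite the whole table
        case (true, false, false, false) if !nonStrict => "upsert"
        case (_, true, _, _)                   => "bulk_insert"            // non-pk, or pk in non-strict mode
        case _                                 => "insert"
      }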
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
index 3b5b33c..2ae0821 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
@@ -22,9 +22,10 @@ import org.apache.avro.generic.GenericRecord
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.util.PartitionPathEncodeUtils
import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.keygen.{ComplexKeyGenerator, KeyGenUtils}
+import org.apache.hudi.keygen.{BaseKeyGenerator, ComplexKeyGenerator, KeyGenUtils, SparkKeyGeneratorInterface}
+import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, TimestampType}
-import org.joda.time.format.DateTimeFormat
+import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}
/**
* A complex key generator for sql command which do some process for the
@@ -62,8 +63,15 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
}
}
- override def getPartitionPath(record: GenericRecord) = {
- val partitionPath = super.getPartitionPath(record)
+ override def getRecordKey(row: Row): String = {
+ if (originKeyGen.isDefined && originKeyGen.get.isInstanceOf[SparkKeyGeneratorInterface]) {
+ originKeyGen.get.asInstanceOf[SparkKeyGeneratorInterface].getRecordKey(row)
+ } else {
+ super.getRecordKey(row)
+ }
+ }
+
+ private def convertPartitionPathToSqlType(partitionPath: String, rowType: Boolean): String = {
if (partitionSchema.isDefined) {
// we can split the partitionPath here because we enable the URL_ENCODE_PARTITIONING_OPT
// by default for sql.
@@ -82,9 +90,13 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
partitionField.dataType match {
case TimestampType =>
- val timeMs = MILLISECONDS.convert(_partitionValue.toLong, MICROSECONDS)
+ val timeMs = if (rowType) { // in row-writer mode, the partition path value is a formatted time string; convert it to millis
+ SqlKeyGenerator.sqlTimestampFormat.parseMillis(_partitionValue)
+ } else {
+ MILLISECONDS.convert(_partitionValue.toLong, MICROSECONDS)
+ }
val timestampFormat = PartitionPathEncodeUtils.escapePathName(
- SqlKeyGenerator.timestampTimeFormat.print(timeMs))
+ SqlKeyGenerator.timestampTimeFormat.print(timeMs))
if (isHiveStyle) s"$hiveStylePrefix$timestampFormat" else timestampFormat
case _ => partitionValue
}
@@ -92,10 +104,21 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
}
} else partitionPath
}
+
+ override def getPartitionPath(record: GenericRecord) = {
+ val partitionPath = super.getPartitionPath(record)
+ convertPartitionPathToSqlType(partitionPath, false)
+ }
+
+ override def getPartitionPath(row: Row): String = {
+ val partitionPath = super.getPartitionPath(row)
+ convertPartitionPathToSqlType(partitionPath, true)
+ }
}
object SqlKeyGenerator {
val PARTITION_SCHEMA = "hoodie.sql.partition.schema"
val ORIGIN_KEYGEN_CLASS = "hoodie.sql.origin.keygen.class"
private val timestampTimeFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
+ private val sqlTimestampFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.S")
}
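The two formatters differ only in the fractional-second suffix: in the row-writer path the partition value arrives as an already-formatted timestamp string rather than as microseconds. A standalone joda-time sketch of the conversion (no Hudi types involved; escaping elided):

    import org.joda.time.format.DateTimeFormat

    val rowValueFormat      = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.S")
    val partitionPathFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
    // Row path: parse the formatted string to epoch millis ...
    val millis = rowValueFormat.parseMillis("2021-07-18 12:00:00.0")
    // ... then re-print it in the partition-path layout.
    println(partitionPathFormat.print(millis)) // 2021-07-18 12:00:00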
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UuidKeyGenerator.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UuidKeyGenerator.scala
index 15dcdd2..14a0074 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UuidKeyGenerator.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UuidKeyGenerator.scala
@@ -18,9 +18,9 @@
package org.apache.spark.sql.hudi.command
import java.util.UUID
-
import org.apache.avro.generic.GenericRecord
import org.apache.hudi.common.config.TypedProperties
+import org.apache.spark.sql.Row
/**
* A KeyGenerator which use the uuid as the record key.
@@ -28,4 +28,6 @@ import org.apache.hudi.common.config.TypedProperties
class UuidKeyGenerator(props: TypedProperties) extends SqlKeyGenerator(props) {
override def getRecordKey(record: GenericRecord): String = UUID.randomUUID.toString
+
+ override def getRecordKey(row: Row): String = UUID.randomUUID.toString
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala
index 67b5275..11390fb 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala
@@ -79,12 +79,15 @@ class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll {
}
protected def checkException(sql: String)(errorMsg: String): Unit = {
+ var hasException = false
try {
spark.sql(sql)
} catch {
case e: Throwable =>
assertResult(errorMsg)(e.getMessage)
+ hasException = true
}
+ assertResult(true)(hasException)
}
protected def removeQuotes(value: Any): Any = {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index 0cda181..337317b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -64,6 +64,7 @@ class TestInsertTable extends TestHoodieSqlBase {
test("Test Insert Into None Partitioned Table") {
withTempDir { tmp =>
val tableName = generateTableName
+ spark.sql(s"set hoodie.sql.insert.mode=strict")
// Create none partitioned cow table
spark.sql(
s"""
@@ -80,7 +81,6 @@ class TestInsertTable extends TestHoodieSqlBase {
| preCombineField = 'ts'
| )
""".stripMargin)
-
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
checkAnswer(s"select id, name, price, ts from $tableName")(
Seq(1, "a1", 10.0, 1000)
@@ -127,6 +127,9 @@ class TestInsertTable extends TestHoodieSqlBase {
checkAnswer(s"select id, name, price, ts from $tableName2")(
Seq(1, "a1", 10.0, 1000)
)
+ // disable these configs to avoid affecting other tests in this class.
+ spark.sql("set hoodie.datasource.write.insert.drop.duplicates = false")
+ spark.sql(s"set hoodie.sql.insert.mode=upsert")
}
}
@@ -146,7 +149,6 @@ class TestInsertTable extends TestHoodieSqlBase {
| partitioned by (dt)
| location '${tmp.getCanonicalPath}/$tableName'
""".stripMargin)
-
// Insert overwrite dynamic partition
spark.sql(
s"""
@@ -246,7 +248,6 @@ class TestInsertTable extends TestHoodieSqlBase {
| partitioned by (dt)
| location '${tmp.getCanonicalPath}/$tableName'
""".stripMargin)
-
spark.sql(s"insert into $tableName partition(dt = $partitionValue) select 1, 'a1', 10")
spark.sql(s"insert into $tableName select 2, 'a2', 10, $partitionValue")
checkAnswer(s"select id, name, price, cast(dt as string) from $tableName order by id")(
@@ -303,5 +304,220 @@ class TestInsertTable extends TestHoodieSqlBase {
"assertion failed: Required select columns count: 4, Current select columns(including static partition column)" +
" count: 3,columns: (1,a1,10)"
)
+ spark.sql("set hoodie.sql.bulk.insert.enable = true")
+ spark.sql("set hoodie.sql.insert.mode= strict")
+
+ val tableName2 = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName2 (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | options (
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ """.stripMargin)
+ checkException(s"insert into $tableName2 values(1, 'a1', 10, 1000)")(
+ "Table with primaryKey can not use bulk insert in strict mode."
+ )
+
+ val tableName3 = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName3 (
+ | id int,
+ | name string,
+ | price double,
+ | dt string
+ |) using hudi
+ | partitioned by (dt)
+ """.stripMargin)
+ checkException(s"insert overwrite table $tableName3 values(1, 'a1', 10, '2021-07-18')")(
+ "Insert Overwrite Partition can not use bulk insert."
+ )
+ spark.sql("set hoodie.sql.bulk.insert.enable = false")
+ spark.sql("set hoodie.sql.insert.mode= upsert")
+ }
+
+ test("Test bulk insert") {
+ withTempDir { tmp =>
+ Seq("cow", "mor").foreach {tableType =>
+ // Test bulk insert for single partition
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | dt string
+ |) using hudi
+ | options (
+ | type = '$tableType'
+ | )
+ | partitioned by (dt)
+ | location '${tmp.getCanonicalPath}/$tableName'
+ """.stripMargin)
+ spark.sql("set hoodie.datasource.write.insert.drop.duplicates = false")
+
+ // Enable the bulk insert
+ spark.sql("set hoodie.sql.bulk.insert.enable = true")
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, '2021-07-18')")
+
+ checkAnswer(s"select id, name, price, dt from $tableName")(
+ Seq(1, "a1", 10.0, "2021-07-18")
+ )
+ // Disable the bulk insert
+ spark.sql("set hoodie.sql.bulk.insert.enable = false")
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, '2021-07-18')")
+
+ checkAnswer(s"select id, name, price, dt from $tableName order by id")(
+ Seq(1, "a1", 10.0, "2021-07-18"),
+ Seq(2, "a2", 10.0, "2021-07-18")
+ )
+
+ // Test bulk insert for multi-level partition
+ val tableMultiPartition = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableMultiPartition (
+ | id int,
+ | name string,
+ | price double,
+ | dt string,
+ | hh string
+ |) using hudi
+ | options (
+ | type = '$tableType'
+ | )
+ | partitioned by (dt, hh)
+ | location '${tmp.getCanonicalPath}/$tableMultiPartition'
+ """.stripMargin)
+
+ // Enable the bulk insert
+ spark.sql("set hoodie.sql.bulk.insert.enable = true")
+ spark.sql(s"insert into $tableMultiPartition values(1, 'a1', 10, '2021-07-18', '12')")
+
+ checkAnswer(s"select id, name, price, dt, hh from $tableMultiPartition")(
+ Seq(1, "a1", 10.0, "2021-07-18", "12")
+ )
+ // Disable the bulk insert
+ spark.sql("set hoodie.sql.bulk.insert.enable = false")
+ spark.sql(s"insert into $tableMultiPartition " +
+ s"values(2, 'a2', 10, '2021-07-18','12')")
+
+ checkAnswer(s"select id, name, price, dt, hh from $tableMultiPartition order by id")(
+ Seq(1, "a1", 10.0, "2021-07-18", "12"),
+ Seq(2, "a2", 10.0, "2021-07-18", "12")
+ )
+ // Test bulk insert for non-partitioned table
+ val nonPartitionedTable = generateTableName
+ spark.sql(
+ s"""
+ |create table $nonPartitionedTable (
+ | id int,
+ | name string,
+ | price double
+ |) using hudi
+ | options (
+ | type = '$tableType'
+ | )
+ | location '${tmp.getCanonicalPath}/$nonPartitionedTable'
+ """.stripMargin)
+ spark.sql("set hoodie.sql.bulk.insert.enable = true")
+ spark.sql(s"insert into $nonPartitionedTable values(1, 'a1', 10)")
+ checkAnswer(s"select id, name, price from $nonPartitionedTable")(
+ Seq(1, "a1", 10.0)
+ )
+ spark.sql(s"insert overwrite table $nonPartitionedTable values(2, 'a2', 10)")
+ checkAnswer(s"select id, name, price from $nonPartitionedTable")(
+ Seq(2, "a2", 10.0)
+ )
+ spark.sql("set hoodie.sql.bulk.insert.enable = false")
+
+ // Test CTAS for bulk insert
+ val tableName2 = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName2
+ |using hudi
+ |options(
+ | type = '$tableType',
+ | primaryKey = 'id'
+ |)
+ | location '${tmp.getCanonicalPath}/$tableName2'
+ | as
+ | select * from $tableName
+ |""".stripMargin)
+ checkAnswer(s"select id, name, price, dt from $tableName2 order by id")(
+ Seq(1, "a1", 10.0, "2021-07-18"),
+ Seq(2, "a2", 10.0, "2021-07-18")
+ )
+ }
+ }
+ }
+
+ test("Test combine before insert") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName'
+ | options (
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ """.stripMargin)
+ spark.sql(
+ s"""
+ |insert overwrite table $tableName
+ |select * from (
+ | select 1 as id, 'a1' as name, 10 as price, 1000 as ts
+ | union all
+ | select 1 as id, 'a1' as name, 11 as price, 1001 as ts
+ | )
+ |""".stripMargin
+ )
+ checkAnswer(s"select id, name, price, ts from $tableName")(
+ Seq(1, "a1", 11.0, 1001)
+ )
+ }
}
+
+ test("Test insert pk-table") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName'
+ | options (
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ """.stripMargin)
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+ spark.sql(s"insert into $tableName values(1, 'a1', 11, 1000)")
+ checkAnswer(s"select id, name, price, ts from $tableName")(
+ Seq(1, "a1", 11.0, 1000)
+ )
+ }
+ }
+
}