Posted to commits@hudi.apache.org by si...@apache.org on 2021/08/09 04:18:44 UTC

[hudi] branch master updated: [HUDI-2208] Support Bulk Insert For Spark Sql (#3328)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 41a9986  [HUDI-2208] Support Bulk Insert For Spark Sql (#3328)
41a9986 is described below

commit 41a9986a7641f3232b1edd2a737fd4b7aa430dbf
Author: pengzhiwei <pe...@icloud.com>
AuthorDate: Mon Aug 9 12:18:31 2021 +0800

    [HUDI-2208] Support Bulk Insert For Spark Sql (#3328)
---
 .../scala/org/apache/hudi/DataSourceOptions.scala  |  16 ++
 .../main/java/org/apache/hudi/sql/InsertMode.java  |  66 ++++++
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |   6 +-
 .../apache/spark/sql/hudi/HoodieOptionConfig.scala |   2 +-
 .../org/apache/spark/sql/hudi/HoodieSqlUtils.scala |   2 +-
 .../command/CreateHoodieTableAsSelectCommand.scala |  27 ++-
 .../command/InsertIntoHoodieTableCommand.scala     |  72 +++++--
 .../spark/sql/hudi/command/SqlKeyGenerator.scala   |  35 +++-
 .../spark/sql/hudi/command/UuidKeyGenerator.scala  |   4 +-
 .../apache/spark/sql/hudi/TestHoodieSqlBase.scala  |   3 +
 .../apache/spark/sql/hudi/TestInsertTable.scala    | 222 ++++++++++++++++++++-
 11 files changed, 406 insertions(+), 49 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 5c67a68..46aee67 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -256,6 +256,22 @@ object DataSourceWriteOptions {
     .withDocumentation("When set to true, will perform write operations directly using the spark native " +
       "`Row` representation, avoiding any additional conversion costs.")
 
+  /**
+   * Enable bulk insert for the sql insert statement.
+   */
+  val SQL_ENABLE_BULK_INSERT: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.sql.bulk.insert.enable")
+    .defaultValue("false")
+    .withDocumentation("When set to true, the sql insert statement will use bulk insert.")
+
+  val SQL_INSERT_MODE: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.sql.insert.mode")
+    .defaultValue("upsert")
+    .withDocumentation("Insert mode when insert data to pk-table. The optional modes are: upsert, strict and non-strict." +
+      "For upsert mode, insert statement do the upsert operation for the pk-table which will update the duplicate record." +
+      "For strict mode, insert statement will keep the primary key uniqueness constraint which do not allow duplicate record." +
+      "While for non-strict mode, hudi just do the insert operation for the pk-table.")
+
   val COMMIT_METADATA_KEYPREFIX: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.write.commitmeta.key.prefix")
     .defaultValue("_")
diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/sql/InsertMode.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/sql/InsertMode.java
new file mode 100644
index 0000000..4b44ae4
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/sql/InsertMode.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sql;
+
+import java.util.Locale;
+
+/**
+ * Insert mode for insert into a pk-table.
+ */
+public enum InsertMode {
+  /**
+   * In upsert mode for insert into, a record with a duplicate primary key
+   * will be updated. This is the default insert mode for a pk-table.
+   */
+  UPSERT("upsert"),
+  /**
+   * In strict mode for insert into, the primary key uniqueness constraint
+   * is guaranteed for a COW pk-table.
+   * For a MOR pk-table, the behavior is the same as "upsert" mode.
+   */
+  STRICT("strict"),
+  /**
+   * In non-strict mode for insert into, the insert operation is used
+   * to write data, which allows duplicate records.
+   */
+  NON_STRICT("non-strict")
+  ;
+
+  private String value;
+
+  InsertMode(String value) {
+    this.value = value;
+  }
+
+  public String value() {
+    return value;
+  }
+
+  public static InsertMode of(String value) {
+    switch (value.toLowerCase(Locale.ROOT)) {
+      case "upsert":
+        return UPSERT;
+      case "strict":
+        return STRICT;
+      case "non-strict":
+        return NON_STRICT;
+      default:
+        throw new AssertionError("UnSupport Insert Mode: " + value);
+    }
+  }
+}
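
For reference, the lookup is case-insensitive and unknown values fail fast; a small sketch of how of() behaves (illustrative only, not part of the diff):

    import org.apache.hudi.sql.InsertMode

    val mode = InsertMode.of("Non-Strict") // case-insensitive, via Locale.ROOT lower-casing
    assert(mode == InsertMode.NON_STRICT)
    // InsertMode.of("append") would throw an AssertionError for an unsupported mode.
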
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 6213ab8..6a54931 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -164,7 +164,11 @@ object HoodieSparkSqlWriter {
 
           // Convert to RDD[HoodieRecord]
           val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, schema, structName, nameSpace)
-          val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean || operation.equals(WriteOperationType.UPSERT);
+
+          val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
+            operation.equals(WriteOperationType.UPSERT) ||
+            parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key(),
+              HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.defaultValue()).toBoolean
           val hoodieAllIncomingRecords = genericRecords.map(gr => {
             val hoodieRecord = if (shouldCombine) {
               val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, hoodieConfig.getString(PRECOMBINE_FIELD), false)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
index 2f28c66..5120fe7 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
@@ -114,7 +114,7 @@ object HoodieOptionConfig {
 
   /**
    * Mapping the sql options to the hoodie table config which used to store to the hoodie
-   * .properites when create the table.
+   * .properties when create the table.
    * @param options
    * @return
    */
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
index da6c482..b0a8b52 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
@@ -218,7 +218,7 @@ object HoodieSqlUtils extends SparkAdapterSupport {
    * Append the spark config and table options to the baseConfig.
    */
   def withSparkConf(spark: SparkSession, options: Map[String, String])
-                   (baseConfig: Map[String, String]): Map[String, String] = {
+                   (baseConfig: Map[String, String] = Map.empty): Map[String, String] = {
     baseConfig ++ // Table options has the highest priority
       (spark.sessionState.conf.getAllConfs ++ HoodieOptionConfig.mappingSqlOptionToHoodieParam(options))
         .filterKeys(_.startsWith("hoodie."))
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
index a156d9f..38c7e29 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.hive.util.ConfigUtils
+import org.apache.hudi.sql.InsertMode
 import org.apache.spark.sql.{Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
@@ -62,8 +63,8 @@ case class CreateHoodieTableAsSelectCommand(
       }
     }
     val tablePath = getTableLocation(table, sparkSession)
-    val conf = sparkSession.sessionState.newHadoopConf()
-    assert(CreateHoodieTableCommand.isEmptyPath(tablePath, conf),
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    assert(CreateHoodieTableCommand.isEmptyPath(tablePath, hadoopConf),
       s"Path '$tablePath' should be empty for CTAS")
 
     // ReOrder the query which move the partition columns to the last of the project list
@@ -72,17 +73,15 @@ case class CreateHoodieTableAsSelectCommand(
 
     // Execute the insert query
     try {
-      // Set if sync as a managed table.
-      sparkSession.sessionState.conf.setConfString(DataSourceWriteOptions.HIVE_CREATE_MANAGED_TABLE.key,
-        (table.tableType == CatalogTableType.MANAGED).toString)
-      // Sync the options to hive serde properties
-      sparkSession.sessionState.conf.setConfString(DataSourceWriteOptions.HIVE_TABLE_SERDE_PROPERTIES.key,
-        ConfigUtils.configToString(table.storage.properties.asJava))
-      // Sync the table properties to hive
-      sparkSession.sessionState.conf.setConfString(DataSourceWriteOptions.HIVE_TABLE_PROPERTIES.key,
-        ConfigUtils.configToString(table.properties.asJava))
+      val options = Map(
+        DataSourceWriteOptions.HIVE_CREATE_MANAGED_TABLE.key -> (table.tableType == CatalogTableType.MANAGED).toString,
+        DataSourceWriteOptions.HIVE_TABLE_SERDE_PROPERTIES.key -> ConfigUtils.configToString(table.storage.properties.asJava),
+        DataSourceWriteOptions.HIVE_TABLE_PROPERTIES.key -> ConfigUtils.configToString(table.properties.asJava),
+        DataSourceWriteOptions.SQL_INSERT_MODE.key -> InsertMode.NON_STRICT.value(),
+        DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key -> "true"
+      )
       val success = InsertIntoHoodieTableCommand.run(sparkSession, tableWithSchema, reOrderedQuery, Map.empty,
-        mode == SaveMode.Overwrite, refreshTable = false)
+        mode == SaveMode.Overwrite, refreshTable = false, extraOptions = options)
       if (success) {
         // If write success, create the table in catalog if it has not synced to the
         // catalog by the meta sync.
@@ -92,11 +91,11 @@ case class CreateHoodieTableAsSelectCommand(
           createTableCommand.createTableInCatalog(sparkSession, checkPathForManagedTable = false)
         }
       } else { // failed to insert data, clear table path
-        clearTablePath(tablePath, conf)
+        clearTablePath(tablePath, hadoopConf)
       }
     } catch {
       case e: Throwable => // failed to insert data, clear table path
-        clearTablePath(tablePath, conf)
+        clearTablePath(tablePath, hadoopConf)
         throw e
     }
     Seq.empty[Row]
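
With bulk insert and non-strict mode passed as extra options here, a plain CTAS now goes through the bulk insert path without any session flags. A sketch of the statement shape (table name and location are placeholders):

    spark.sql(
      """
        |create table ctas_demo using hudi
        | options (primaryKey = 'id')
        | location '/tmp/ctas_demo'
        | as select 1 as id, 'a1' as name, 10.0 as price
        |""".stripMargin)
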
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index cfd6609..02e7427 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -28,10 +28,12 @@ import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
 import org.apache.hudi.hive.MultiPartKeysValueExtractor
 import org.apache.hudi.hive.ddl.HiveSyncMode
-import org.apache.hudi.{HoodieSparkSqlWriter, HoodieWriterUtils}
+import org.apache.hudi.sql.InsertMode
+import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils}
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
-import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -58,7 +60,7 @@ case class InsertIntoHoodieTableCommand(
   }
 }
 
-object InsertIntoHoodieTableCommand {
+object InsertIntoHoodieTableCommand extends Logging {
   /**
    * Run the insert query. We support both dynamic partition insert and static partition insert.
    * @param sparkSession The spark session.
@@ -71,12 +73,14 @@ object InsertIntoHoodieTableCommand {
    *                         , it is None in the map.
    * @param overwrite Whether to overwrite the table.
    * @param refreshTable Whether to refresh the table after insert finished.
+   * @param extraOptions Extra options for insert.
    */
   def run(sparkSession: SparkSession, table: CatalogTable, query: LogicalPlan,
           insertPartitions: Map[String, Option[String]],
-          overwrite: Boolean, refreshTable: Boolean = true): Boolean = {
+          overwrite: Boolean, refreshTable: Boolean = true,
+          extraOptions: Map[String, String] = Map.empty): Boolean = {
 
-    val config = buildHoodieInsertConfig(table, sparkSession, overwrite, insertPartitions)
+    val config = buildHoodieInsertConfig(table, sparkSession, overwrite, insertPartitions, extraOptions)
 
     val mode = if (overwrite && table.partitionColumnNames.isEmpty) {
       // insert overwrite non-partition table
@@ -165,7 +169,7 @@ object InsertIntoHoodieTableCommand {
         Alias(castAttr, f.name)()
       })
     }
-    // Remove the hoodie meta fileds from the projects as we do not need these to write
+    // Remove the hoodie meta fields from the projects as we do not need these to write
     val withoutMetaFieldDataProjects = dataProjects.filter(c => !HoodieSqlUtils.isMetaField(c.name))
     val alignedProjects = withoutMetaFieldDataProjects ++ partitionProjects
     Project(alignedProjects, query)
@@ -179,7 +183,8 @@ object InsertIntoHoodieTableCommand {
       table: CatalogTable,
       sparkSession: SparkSession,
       isOverwrite: Boolean,
-      insertPartitions: Map[String, Option[String]] = Map.empty): Map[String, String] = {
+      insertPartitions: Map[String, Option[String]] = Map.empty,
+      extraOptions: Map[String, String]): Map[String, String] = {
 
     if (insertPartitions.nonEmpty &&
       (insertPartitions.keys.toSet != table.partitionColumnNames.toSet)) {
@@ -187,7 +192,7 @@ object InsertIntoHoodieTableCommand {
         s"[${insertPartitions.keys.mkString(" " )}]" +
         s" not equal to the defined partition in table[${table.partitionColumnNames.mkString(",")}]")
     }
-    val parameters = HoodieOptionConfig.mappingSqlOptionToHoodieParam(table.storage.properties)
+    val parameters = withSparkConf(sparkSession, table.storage.properties)() ++ extraOptions
 
     val tableType = parameters.getOrElse(TABLE_TYPE.key, TABLE_TYPE.defaultValue)
 
@@ -209,28 +214,49 @@ object InsertIntoHoodieTableCommand {
       .getOrElse(INSERT_DROP_DUPS.defaultValue)
       .toBoolean
 
-    val operation = if (isOverwrite) {
-      if (table.partitionColumnNames.nonEmpty) {
-        INSERT_OVERWRITE_OPERATION_OPT_VAL  // overwrite partition
-      } else {
-        INSERT_OPERATION_OPT_VAL
-      }
-    } else {
-      if (primaryColumns.nonEmpty && !dropDuplicate) {
-        UPSERT_OPERATION_OPT_VAL
-      } else {
-        INSERT_OPERATION_OPT_VAL
+    val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+      DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+    val isPartitionedTable = table.partitionColumnNames.nonEmpty
+    val isPrimaryKeyTable = primaryColumns.nonEmpty
+    val insertMode = InsertMode.of(parameters.getOrElse(DataSourceWriteOptions.SQL_INSERT_MODE.key,
+      DataSourceWriteOptions.SQL_INSERT_MODE.defaultValue()))
+    val isNonStrictMode = insertMode == InsertMode.NON_STRICT
+
+    val operation =
+      (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+        case (true, true, _, _) if !isNonStrictMode =>
+          throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert in ${insertMode.value()} mode.")
+        case (_, true, true, _) if isPartitionedTable =>
+          throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.")
+        case (_, true, _, true) =>
+          throw new IllegalArgumentException(s"Bulk insert cannot support drop duplication." +
+            s" Please disable $INSERT_DROP_DUPS and try again.")
+        // if bulk insert is enabled, use it for insert overwrite of a non-partitioned table.
+        case (_, true, true, _) if !isPartitionedTable => BULK_INSERT_OPERATION_OPT_VAL
+        // insert overwrite partition
+        case (_, _, true, _) if isPartitionedTable => INSERT_OVERWRITE_OPERATION_OPT_VAL
+        // insert overwrite table
+        case (_, _, true, _) if !isPartitionedTable => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
+        // if it is a pk-table and dropDuplicate is disabled, use the upsert operation for strict and upsert mode.
+        case (true, false, false, false) if !isNonStrictMode => UPSERT_OPERATION_OPT_VAL
+        // if bulk insert is enabled and the table has no primary key, use the bulk insert operation
+        case (false, true, _, _) => BULK_INSERT_OPERATION_OPT_VAL
+        // if the table is a pk-table and bulk insert is enabled, use bulk insert in non-strict mode.
+        case (true, true, _, _) if isNonStrictMode => BULK_INSERT_OPERATION_OPT_VAL
+        // for the remaining cases, use the insert operation
+        case (_, _, _, _) => INSERT_OPERATION_OPT_VAL
       }
-    }
 
-    val payloadClassName = if (primaryColumns.nonEmpty && !dropDuplicate &&
-      tableType == COW_TABLE_TYPE_OPT_VAL) {
+    val payloadClassName = if (operation == UPSERT_OPERATION_OPT_VAL &&
+      tableType == COW_TABLE_TYPE_OPT_VAL && insertMode == InsertMode.STRICT) {
       // Only validate duplicate key for COW, for MOR it will do the merge with the DefaultHoodieRecordPayload
       // on reading.
       classOf[ValidateDuplicateKeyPayload].getCanonicalName
     } else {
       classOf[DefaultHoodieRecordPayload].getCanonicalName
     }
+    logInfo(s"insert statement use write operation type: $operation, payloadClass: $payloadClassName")
+
     val enableHive = isEnableHive(sparkSession)
     withSparkConf(sparkSession, options) {
       Map(
@@ -243,6 +269,8 @@ object InsertIntoHoodieTableCommand {
         RECORDKEY_FIELD.key -> primaryColumns.mkString(","),
         PARTITIONPATH_FIELD.key -> partitionFields,
         PAYLOAD_CLASS.key -> payloadClassName,
+        ENABLE_ROW_WRITER.key -> enableBulkInsert.toString,
+        HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key -> isPrimaryKeyTable.toString,
         META_SYNC_ENABLED.key -> enableHive.toString,
         HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
         HIVE_USE_JDBC.key -> "false",
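
The net effect of the operation matrix above for a table with a primaryKey: bulk insert is rejected in upsert and strict mode and only applies in non-strict mode. An illustrative session (pk_tbl is a placeholder pk-table):

    spark.sql("set hoodie.sql.bulk.insert.enable = true")
    spark.sql("set hoodie.sql.insert.mode = strict")
    // insert into pk_tbl ...  => IllegalArgumentException:
    //   "Table with primaryKey can not use bulk insert in strict mode."
    spark.sql("set hoodie.sql.insert.mode = non-strict")
    // insert into pk_tbl ...  => succeeds, using the bulk_insert write operation.
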
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
index 3b5b33c..2ae0821 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
@@ -22,9 +22,10 @@ import org.apache.avro.generic.GenericRecord
 import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.util.PartitionPathEncodeUtils
 import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.keygen.{ComplexKeyGenerator, KeyGenUtils}
+import org.apache.hudi.keygen.{BaseKeyGenerator, ComplexKeyGenerator, KeyGenUtils, SparkKeyGeneratorInterface}
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{StructType, TimestampType}
-import org.joda.time.format.DateTimeFormat
+import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}
 
 /**
  * A complex key generator for sql command which do some process for the
@@ -62,8 +63,15 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
     }
   }
 
-  override def getPartitionPath(record: GenericRecord) = {
-    val partitionPath = super.getPartitionPath(record)
+  override def getRecordKey(row: Row): String = {
+    if (originKeyGen.isDefined && originKeyGen.get.isInstanceOf[SparkKeyGeneratorInterface]) {
+      originKeyGen.get.asInstanceOf[SparkKeyGeneratorInterface].getRecordKey(row)
+    } else {
+      super.getRecordKey(row)
+    }
+  }
+
+  private def convertPartitionPathToSqlType(partitionPath: String, rowType: Boolean): String = {
     if (partitionSchema.isDefined) {
       // we can split the partitionPath here because we enable the URL_ENCODE_PARTITIONING_OPT
       // by default for sql.
@@ -82,9 +90,13 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
 
             partitionField.dataType match {
               case TimestampType =>
-                val timeMs = MILLISECONDS.convert(_partitionValue.toLong, MICROSECONDS)
+                val timeMs = if (rowType) { // In row type, the partitionPathValue is a formatted time string; convert it to millis
+                  SqlKeyGenerator.sqlTimestampFormat.parseMillis(_partitionValue)
+                } else {
+                  MILLISECONDS.convert(_partitionValue.toLong, MICROSECONDS)
+                }
                 val timestampFormat = PartitionPathEncodeUtils.escapePathName(
-                  SqlKeyGenerator.timestampTimeFormat.print(timeMs))
+                    SqlKeyGenerator.timestampTimeFormat.print(timeMs))
                 if (isHiveStyle) s"$hiveStylePrefix$timestampFormat" else timestampFormat
               case _ => partitionValue
             }
@@ -92,10 +104,21 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
       }
     } else partitionPath
   }
+
+  override def getPartitionPath(record: GenericRecord) = {
+    val partitionPath = super.getPartitionPath(record)
+    convertPartitionPathToSqlType(partitionPath, false)
+  }
+
+  override def getPartitionPath(row: Row): String = {
+    val partitionPath = super.getPartitionPath(row)
+    convertPartitionPathToSqlType(partitionPath, true)
+  }
 }
 
 object SqlKeyGenerator {
   val PARTITION_SCHEMA = "hoodie.sql.partition.schema"
   val ORIGIN_KEYGEN_CLASS = "hoodie.sql.origin.keygen.class"
   private val timestampTimeFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
+  private val sqlTimestampFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.S")
 }
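
The two timestamp shapes handled by convertPartitionPathToSqlType can be reproduced with a standalone joda-time sketch (values are made up; the printed result assumes a UTC default time zone):

    import java.util.concurrent.TimeUnit.{MICROSECONDS, MILLISECONDS}
    import org.joda.time.format.DateTimeFormat

    val sqlTimestampFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.S")
    val timestampTimeFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")

    // Avro record path: the partition value arrives as microseconds since the epoch.
    val fromAvro = MILLISECONDS.convert(1626609600000000L, MICROSECONDS)
    // Row-writer path: the partition value arrives as an already formatted timestamp string.
    val fromRow = sqlTimestampFormat.parseMillis("2021-07-18 12:00:00.0")

    // Both end up rendered with the same sql-style partition format.
    println(timestampTimeFormat.print(fromAvro)) // 2021-07-18 12:00:00 with a UTC JVM time zone
    println(timestampTimeFormat.print(fromRow))  // 2021-07-18 12:00:00
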
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UuidKeyGenerator.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UuidKeyGenerator.scala
index 15dcdd2..14a0074 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UuidKeyGenerator.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UuidKeyGenerator.scala
@@ -18,9 +18,9 @@
 package org.apache.spark.sql.hudi.command
 
 import java.util.UUID
-
 import org.apache.avro.generic.GenericRecord
 import org.apache.hudi.common.config.TypedProperties
+import org.apache.spark.sql.Row
 
 /**
  * A KeyGenerator which use the uuid as the record key.
@@ -28,4 +28,6 @@ import org.apache.hudi.common.config.TypedProperties
 class UuidKeyGenerator(props: TypedProperties) extends SqlKeyGenerator(props) {
 
   override def getRecordKey(record: GenericRecord): String = UUID.randomUUID.toString
+
+  override def getRecordKey(row: Row): String = UUID.randomUUID.toString
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala
index 67b5275..11390fb 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala
@@ -79,12 +79,15 @@ class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll {
   }
 
   protected def checkException(sql: String)(errorMsg: String): Unit = {
+    var hasException = false
     try {
       spark.sql(sql)
     } catch {
       case e: Throwable =>
         assertResult(errorMsg)(e.getMessage)
+        hasException = true
     }
+    assertResult(true)(hasException)
   }
 
   protected def removeQuotes(value: Any): Any = {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index 0cda181..337317b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -64,6 +64,7 @@ class TestInsertTable extends TestHoodieSqlBase {
   test("Test Insert Into None Partitioned Table") {
     withTempDir { tmp =>
       val tableName = generateTableName
+      spark.sql(s"set hoodie.sql.insert.mode=strict")
       // Create none partitioned cow table
       spark.sql(
         s"""
@@ -80,7 +81,6 @@ class TestInsertTable extends TestHoodieSqlBase {
            |  preCombineField = 'ts'
            | )
        """.stripMargin)
-
       spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
       checkAnswer(s"select id, name, price, ts from $tableName")(
         Seq(1, "a1", 10.0, 1000)
@@ -127,6 +127,9 @@ class TestInsertTable extends TestHoodieSqlBase {
       checkAnswer(s"select id, name, price, ts from $tableName2")(
         Seq(1, "a1", 10.0, 1000)
       )
+      // disable this config to avoid affecting other tests in this class.
+      spark.sql("set hoodie.datasource.write.insert.drop.duplicates = false")
+      spark.sql(s"set hoodie.sql.insert.mode=upsert")
     }
   }
 
@@ -146,7 +149,6 @@ class TestInsertTable extends TestHoodieSqlBase {
            | partitioned by (dt)
            | location '${tmp.getCanonicalPath}/$tableName'
        """.stripMargin)
-
       //  Insert overwrite dynamic partition
       spark.sql(
         s"""
@@ -246,7 +248,6 @@ class TestInsertTable extends TestHoodieSqlBase {
              | partitioned by (dt)
              | location '${tmp.getCanonicalPath}/$tableName'
        """.stripMargin)
-
         spark.sql(s"insert into $tableName partition(dt = $partitionValue) select 1, 'a1', 10")
         spark.sql(s"insert into $tableName select 2, 'a2', 10, $partitionValue")
         checkAnswer(s"select id, name, price, cast(dt as string) from $tableName order by id")(
@@ -303,5 +304,220 @@ class TestInsertTable extends TestHoodieSqlBase {
       "assertion failed: Required select columns count: 4, Current select columns(including static partition column)" +
         " count: 3,columns: (1,a1,10)"
     )
+    spark.sql("set hoodie.sql.bulk.insert.enable = true")
+    spark.sql("set hoodie.sql.insert.mode= strict")
+
+    val tableName2 = generateTableName
+    spark.sql(
+      s"""
+         |create table $tableName2 (
+         |  id int,
+         |  name string,
+         |  price double,
+         |  ts long
+         |) using hudi
+         | options (
+         |   primaryKey = 'id',
+         |   preCombineField = 'ts'
+         | )
+       """.stripMargin)
+    checkException(s"insert into $tableName2 values(1, 'a1', 10, 1000)")(
+      "Table with primaryKey can not use bulk insert in strict mode."
+    )
+
+    val tableName3 = generateTableName
+    spark.sql(
+      s"""
+         |create table $tableName3 (
+         |  id int,
+         |  name string,
+         |  price double,
+         |  dt string
+         |) using hudi
+         | partitioned by (dt)
+       """.stripMargin)
+    checkException(s"insert overwrite table $tableName3 values(1, 'a1', 10, '2021-07-18')")(
+      "Insert Overwrite Partition can not use bulk insert."
+    )
+    spark.sql("set hoodie.sql.bulk.insert.enable = false")
+    spark.sql("set hoodie.sql.insert.mode= upsert")
+  }
+
+  test("Test bulk insert") {
+    withTempDir { tmp =>
+      Seq("cow", "mor").foreach {tableType =>
+        // Test bulk insert for single partition
+        val tableName = generateTableName
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  dt string
+             |) using hudi
+             | options (
+             |  type = '$tableType'
+             | )
+             | partitioned by (dt)
+             | location '${tmp.getCanonicalPath}/$tableName'
+       """.stripMargin)
+        spark.sql("set hoodie.datasource.write.insert.drop.duplicates = false")
+
+        // Enable the bulk insert
+        spark.sql("set hoodie.sql.bulk.insert.enable = true")
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, '2021-07-18')")
+
+        checkAnswer(s"select id, name, price, dt from $tableName")(
+          Seq(1, "a1", 10.0, "2021-07-18")
+        )
+        // Disable the bulk insert
+        spark.sql("set hoodie.sql.bulk.insert.enable = false")
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, '2021-07-18')")
+
+        checkAnswer(s"select id, name, price, dt from $tableName order by id")(
+          Seq(1, "a1", 10.0, "2021-07-18"),
+          Seq(2, "a2", 10.0, "2021-07-18")
+        )
+
+        // Test bulk insert for multi-level partition
+        val tableMultiPartition = generateTableName
+        spark.sql(
+          s"""
+             |create table $tableMultiPartition (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  dt string,
+             |  hh string
+             |) using hudi
+             | options (
+             |  type = '$tableType'
+             | )
+             | partitioned by (dt, hh)
+             | location '${tmp.getCanonicalPath}/$tableMultiPartition'
+       """.stripMargin)
+
+        // Enable the bulk insert
+        spark.sql("set hoodie.sql.bulk.insert.enable = true")
+        spark.sql(s"insert into $tableMultiPartition values(1, 'a1', 10, '2021-07-18', '12')")
+
+        checkAnswer(s"select id, name, price, dt, hh from $tableMultiPartition")(
+          Seq(1, "a1", 10.0, "2021-07-18", "12")
+        )
+        // Disable the bulk insert
+        spark.sql("set hoodie.sql.bulk.insert.enable = false")
+        spark.sql(s"insert into $tableMultiPartition " +
+          s"values(2, 'a2', 10, '2021-07-18','12')")
+
+        checkAnswer(s"select id, name, price, dt, hh from $tableMultiPartition order by id")(
+          Seq(1, "a1", 10.0, "2021-07-18", "12"),
+          Seq(2, "a2", 10.0, "2021-07-18", "12")
+        )
+        // Test bulk insert for non-partitioned table
+        val nonPartitionedTable = generateTableName
+        spark.sql(
+          s"""
+             |create table $nonPartitionedTable (
+             |  id int,
+             |  name string,
+             |  price double
+             |) using hudi
+             | options (
+             |  type = '$tableType'
+             | )
+             | location '${tmp.getCanonicalPath}/$nonPartitionedTable'
+       """.stripMargin)
+        spark.sql("set hoodie.sql.bulk.insert.enable = true")
+        spark.sql(s"insert into $nonPartitionedTable values(1, 'a1', 10)")
+        checkAnswer(s"select id, name, price from $nonPartitionedTable")(
+          Seq(1, "a1", 10.0)
+        )
+        spark.sql(s"insert overwrite table $nonPartitionedTable values(2, 'a2', 10)")
+        checkAnswer(s"select id, name, price from $nonPartitionedTable")(
+          Seq(2, "a2", 10.0)
+        )
+        spark.sql("set hoodie.sql.bulk.insert.enable = false")
+
+        // Test CTAS for bulk insert
+        val tableName2 = generateTableName
+        spark.sql(
+          s"""
+             |create table $tableName2
+             |using hudi
+             |options(
+             | type = '$tableType',
+             | primaryKey = 'id'
+             |)
+             | location '${tmp.getCanonicalPath}/$tableName2'
+             | as
+             | select * from $tableName
+             |""".stripMargin)
+        checkAnswer(s"select id, name, price, dt from $tableName2 order by id")(
+          Seq(1, "a1", 10.0, "2021-07-18"),
+          Seq(2, "a2", 10.0, "2021-07-18")
+        )
+      }
+    }
+  }
+
+  test("Test combine before insert") {
+    withTempDir{tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | options (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      spark.sql(
+        s"""
+           |insert overwrite table $tableName
+           |select * from (
+           | select 1 as id, 'a1' as name, 10 as price, 1000 as ts
+           | union all
+           | select 1 as id, 'a1' as name, 11 as price, 1001 as ts
+           | )
+           |""".stripMargin
+      )
+      checkAnswer(s"select id, name, price, ts from $tableName")(
+        Seq(1, "a1", 11.0, 1001)
+      )
+    }
   }
+
+  test("Test insert pk-table") {
+    withTempDir{tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | options (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      spark.sql(s"insert into $tableName values(1, 'a1', 11, 1000)")
+      checkAnswer(s"select id, name, price, ts from $tableName")(
+        Seq(1, "a1", 11.0, 1000)
+      )
+
+    }
+  }
+
 }