You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2021/08/07 11:49:39 UTC

[hudi] branch master updated: [HUDI-1842] Spark Sql Support For pre-existing Hoodie Table (#3393)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 55d2e78  [HUDI-1842] Spark Sql Support For pre-existing Hoodie Table (#3393)
55d2e78 is described below

commit 55d2e786dbae584d2c4917a15d599ab7bb1877fb
Author: pengzhiwei <pe...@icloud.com>
AuthorDate: Sat Aug 7 19:49:26 2021 +0800

    [HUDI-1842] Spark Sql Support For pre-existing Hoodie Table (#3393)
---
 .../main/scala/org/apache/hudi/DefaultSource.scala |   1 -
 .../apache/spark/sql/hudi/HoodieOptionConfig.scala |   5 +-
 .../org/apache/spark/sql/hudi/HoodieSqlUtils.scala |  27 ++-
 .../spark/sql/hudi/analysis/HoodieAnalysis.scala   |  26 ++-
 .../AlterHoodieTableAddColumnsCommand.scala        |   1 -
 .../AlterHoodieTableChangeColumnCommand.scala      |   1 -
 .../command/AlterHoodieTableRenameCommand.scala    |   1 -
 .../command/CompactionHoodieTableCommand.scala     |   1 -
 .../command/CompactionShowHoodieTableCommand.scala |   1 -
 .../command/CreateHoodieTableAsSelectCommand.scala |   1 -
 .../hudi/command/CreateHoodieTableCommand.scala    | 172 ++++++++++-----
 .../hudi/command/DeleteHoodieTableCommand.scala    |   1 -
 .../command/InsertIntoHoodieTableCommand.scala     |   1 -
 .../hudi/command/MergeIntoHoodieTableCommand.scala |   1 -
 .../spark/sql/hudi/command/SqlKeyGenerator.scala   |  75 ++++---
 .../hudi/command/TruncateHoodieTableCommand.scala  |   1 -
 .../hudi/command/UpdateHoodieTableCommand.scala    |   1 -
 .../apache/spark/sql/hudi/TestCreateTable.scala    | 233 +++++++++++++++++++++
 .../sql/hudi/TestPartialUpdateForMergeInto.scala   |   4 +-
 19 files changed, 453 insertions(+), 101 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
index a5098d6..00133ab 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -26,7 +26,6 @@ import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_REA
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hadoop.HoodieROTablePathFilter
-import org.apache.hudi.hive.util.ConfigUtils
 import org.apache.log4j.LogManager
 import org.apache.spark.sql.avro.SchemaConverters
 import org.apache.spark.sql.execution.datasources.{DataSource, FileStatusCache, HadoopFsRelation}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
index c49ffbd..2f28c66 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
@@ -126,12 +126,9 @@ object HoodieOptionConfig {
 
   /**
    * Mapping the table config (loaded from the hoodie.properties) to the sql options.
-   * @param options
-   * @return
    */
   def mappingTableConfigToSqlOption(options: Map[String, String]): Map[String, String] = {
-    options.filterKeys(k => tableConfigKeyToSqlKey.contains(k))
-      .map(kv => tableConfigKeyToSqlKey(kv._1) -> reverseValueMapping.getOrElse(kv._2, kv._2))
+    options.map(kv => tableConfigKeyToSqlKey.getOrElse(kv._1, kv._1) -> reverseValueMapping.getOrElse(kv._2, kv._2))
   }
 
   private lazy val defaultTableConfig: Map[String, String] = {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
index 99ae226..e2c622e 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
@@ -24,7 +24,9 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.SparkAdapterSupport
 import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.spark.SPARK_VERSION
+import org.apache.spark.sql.avro.SchemaConverters
 import org.apache.spark.sql.{Column, DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
@@ -64,6 +66,16 @@ object HoodieSqlUtils extends SparkAdapterSupport {
     }
   }
 
+  def getTableSqlSchema(metaClient: HoodieTableMetaClient): Option[StructType] = {
+    val schemaResolver = new TableSchemaResolver(metaClient)
+    val avroSchema = try Some(schemaResolver.getTableAvroSchema(false))
+    catch {
+      case _: Throwable => None
+    }
+    avroSchema.map(SchemaConverters.toSqlType(_).dataType
+      .asInstanceOf[StructType])
+  }
+
   private def tripAlias(plan: LogicalPlan): LogicalPlan = {
     plan match {
       case SubqueryAlias(_, relation: LogicalPlan) =>
@@ -122,12 +134,12 @@ object HoodieSqlUtils extends SparkAdapterSupport {
    * @param spark
    * @return
    */
-  def getTableLocation(tableId: TableIdentifier, spark: SparkSession): Option[String] = {
+  def getTableLocation(tableId: TableIdentifier, spark: SparkSession): String = {
     val table = spark.sessionState.catalog.getTableMetadata(tableId)
     getTableLocation(table, spark)
   }
 
-  def getTableLocation(table: CatalogTable, sparkSession: SparkSession): Option[String] = {
+  def getTableLocation(table: CatalogTable, sparkSession: SparkSession): String = {
     val uri = if (table.tableType == CatalogTableType.MANAGED && isHoodieTable(table)) {
       Some(sparkSession.sessionState.catalog.defaultTablePath(table.identifier))
     } else {
@@ -136,6 +148,7 @@ object HoodieSqlUtils extends SparkAdapterSupport {
     val conf = sparkSession.sessionState.newHadoopConf()
     uri.map(makePathQualified(_, conf))
       .map(removePlaceHolder)
+      .getOrElse(throw new IllegalArgumentException(s"Missing location for ${table.identifier}"))
   }
 
   private def removePlaceHolder(path: String): String = {
@@ -154,6 +167,16 @@ object HoodieSqlUtils extends SparkAdapterSupport {
     fs.makeQualified(hadoopPath).toUri.toString
   }
 
+  /**
+   * Check if the hoodie meta folder (HoodieTableMetaClient.METAFOLDER_NAME) exists under the table path.
+   */
+  def tableExistsInPath(tablePath: String, conf: Configuration): Boolean = {
+    val basePath = new Path(tablePath)
+    val fs = basePath.getFileSystem(conf)
+    val metaPath = new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME)
+    fs.exists(metaPath)
+  }
+
   def castIfNeeded(child: Expression, dataType: DataType, conf: SQLConf): Expression = {
     child match {
       case Literal(nul, NullType) => Literal(nul, dataType)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index 8ab8e8b..4c15670 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -21,7 +21,8 @@ import org.apache.hudi.DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL
 import org.apache.hudi.SparkAdapterSupport
 
 import scala.collection.JavaConverters._
-import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedStar
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
@@ -286,7 +287,28 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
       } else {
         l
       }
-
+    // Fill in the schema for a Create Table statement that does not specify schema info
+    case c @ CreateTable(tableDesc, _, _)
+      if isHoodieTable(tableDesc) =>
+        val tablePath = getTableLocation(c.tableDesc, sparkSession)
+        if (tableExistsInPath(tablePath, sparkSession.sessionState.newHadoopConf())) {
+          val metaClient = HoodieTableMetaClient.builder()
+            .setBasePath(tablePath)
+            .setConf(sparkSession.sessionState.newHadoopConf())
+            .build()
+          val tableSchema = HoodieSqlUtils.getTableSqlSchema(metaClient).map(HoodieSqlUtils.addMetaFields)
+          if (tableSchema.isDefined && tableDesc.schema.isEmpty) {
+            // Fill the schema with the schema from the table
+            c.copy(tableDesc.copy(schema = tableSchema.get))
+          } else if (tableSchema.isDefined && tableDesc.schema != tableSchema.get) {
+            throw new AnalysisException(s"Specified schema in create table statement is not equal to the table schema." +
+              s"You should not specify the schema for an exist table: ${tableDesc.identifier} ")
+          } else {
+            c
+          }
+        } else {
+          c
+        }
     case p => p
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
index c5a6f84..4123ea9 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
@@ -103,7 +103,6 @@ object AlterHoodieTableAddColumnsCommand {
    */
   def commitWithSchema(schema: Schema, table: CatalogTable, sparkSession: SparkSession): Unit = {
     val path = getTableLocation(table, sparkSession)
-      .getOrElse(s"missing location for ${table.identifier}")
 
     val jsc = new JavaSparkContext(sparkSession.sparkContext)
     val client = DataSourceUtils.createHoodieClient(jsc, schema.toString,
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala
index d38b98c..d9569ce 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala
@@ -68,7 +68,6 @@ case class AlterHoodieTableChangeColumnCommand(
     val newSchema = AvroConversionUtils.convertStructTypeToAvroSchema(newSqlSchema, structName, nameSpace)
 
     val path = getTableLocation(table, sparkSession)
-      .getOrElse(s"missing location for ${table.identifier}")
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
     val metaClient = HoodieTableMetaClient.builder().setBasePath(path)
       .setConf(hadoopConf).build()
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableRenameCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableRenameCommand.scala
index e473299..2df9ec8 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableRenameCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableRenameCommand.scala
@@ -37,7 +37,6 @@ class AlterHoodieTableRenameCommand(
       val catalog = sparkSession.sessionState.catalog
       val table = catalog.getTableMetadata(oldName)
       val path = getTableLocation(table, sparkSession)
-        .getOrElse(s"missing location for ${table.identifier}")
 
       val hadoopConf = sparkSession.sessionState.newHadoopConf()
       val metaClient = HoodieTableMetaClient.builder().setBasePath(path)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodieTableCommand.scala
index 5fdfefb..631504d 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodieTableCommand.scala
@@ -31,7 +31,6 @@ case class CompactionHoodieTableCommand(table: CatalogTable,
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val basePath = getTableLocation(table, sparkSession)
-      .getOrElse(s"missing location for ${table.identifier}")
     CompactionHoodiePathCommand(basePath, operation, instantTimestamp).run(sparkSession)
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodieTableCommand.scala
index e06bc1f..0702e6b 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodieTableCommand.scala
@@ -29,7 +29,6 @@ case class CompactionShowHoodieTableCommand(table: CatalogTable, limit: Int)
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val basePath = getTableLocation(table, sparkSession)
-      .getOrElse(s"missing location for ${table.identifier}")
     CompactionShowHoodiePathCommand(basePath, limit).run(sparkSession)
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
index dce34b3..a156d9f 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
@@ -62,7 +62,6 @@ case class CreateHoodieTableAsSelectCommand(
       }
     }
     val tablePath = getTableLocation(table, sparkSession)
-      .getOrElse(s"Missing path for table ${table.identifier}")
     val conf = sparkSession.sessionState.newHadoopConf()
     assert(CreateHoodieTableCommand.isEmptyPath(tablePath, conf),
       s"Path '$tablePath' should be empty for CTAS")
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
index 462dfcf..12a67bb 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
@@ -19,16 +19,19 @@ package org.apache.spark.sql.hudi.command
 
 import scala.collection.JavaConverters._
 import java.util.{Locale, Properties}
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
 import org.apache.hudi.common.model.HoodieFileFormat
-import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.util.ValidationUtils
 import org.apache.hudi.hadoop.HoodieParquetInputFormat
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils
+import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.{SPARK_VERSION, SparkConf}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.avro.SchemaConverters
@@ -40,8 +43,7 @@ import org.apache.spark.sql.hive.HiveClientUtils
 import org.apache.spark.sql.hive.HiveExternalCatalog._
 import org.apache.spark.sql.hudi.HoodieSqlUtils._
 import org.apache.spark.sql.hudi.HoodieOptionConfig
-import org.apache.spark.sql.hudi.command.CreateHoodieTableCommand.{initTableIfNeed, tableExistsInPath, isEmptyPath}
-import org.apache.spark.sql.internal.StaticSQLConf
+import org.apache.spark.sql.hudi.command.CreateHoodieTableCommand.{initTableIfNeed, isEmptyPath}
 import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
@@ -50,8 +52,6 @@ import scala.collection.mutable
 
 /**
  * Command for create hoodie table.
- * @param table
- * @param ignoreIfExists
  */
 case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean)
   extends RunnableCommand with SparkAdapterSupport {
@@ -83,7 +83,6 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
     val sessionState = sparkSession.sessionState
     val tableName = table.identifier.unquotedString
     val path = getTableLocation(table, sparkSession)
-      .getOrElse(s"Missing path for table ${table.identifier}")
     val conf = sparkSession.sessionState.newHadoopConf()
     val isTableExists = tableExistsInPath(path, conf)
     // Get the schema & table options
@@ -95,26 +94,44 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
         .setBasePath(path)
         .setConf(conf)
         .build()
-      val schemaResolver = new TableSchemaResolver(metaClient)
-      val avroSchema = try Some(schemaResolver.getTableAvroSchema(false))
-      catch {
-        case _: Throwable => None
-      }
-      val tableSchema = avroSchema.map(SchemaConverters.toSqlType(_).dataType
-        .asInstanceOf[StructType])
-
-      // Get options from the external table and append with the options in ddl.
-      val options = HoodieOptionConfig.mappingTableConfigToSqlOption(
-        metaClient.getTableConfig.getProps.asScala.toMap) ++ table.storage.properties
-
-      val userSpecifiedSchema = table.schema
-      if (userSpecifiedSchema.isEmpty && tableSchema.isDefined) {
+     val tableSchema = getTableSqlSchema(metaClient)
+
+     // Get options from the external table and append with the options in ddl.
+     val originTableConfig =  HoodieOptionConfig.mappingTableConfigToSqlOption(
+       metaClient.getTableConfig.getProps.asScala.toMap)
+
+     val allPartitionPaths = getAllPartitionPaths(sparkSession, table)
+     var upgrateConfig = Map.empty[String, String]
+     // If this is a non-hive-styled partition table, disable the hive style config.
+     // (By default this config is enabled for spark sql)
+     upgrateConfig = if (isNotHiveStyledPartitionTable(allPartitionPaths, table)) {
+        upgrateConfig + (DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "false")
+     } else {
+        upgrateConfig
+     }
+     upgrateConfig = if (isUrlEncodeDisable(allPartitionPaths, table)) {
+       upgrateConfig + (DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key -> "false")
+     } else {
+       upgrateConfig
+     }
+
+      // Use the original key generator to generate the record key, so that record keys stay consistent with the old table for spark sql.
+      // See SqlKeyGenerator#getRecordKey for detail.
+     upgrateConfig = if (originTableConfig.contains(HoodieTableConfig.HOODIE_TABLE_KEY_GENERATOR_CLASS.key)) {
+        upgrateConfig + (SqlKeyGenerator.ORIGIN_KEYGEN_CLASS -> originTableConfig(HoodieTableConfig.HOODIE_TABLE_KEY_GENERATOR_CLASS.key))
+     } else {
+        upgrateConfig
+     }
+     val options = originTableConfig ++ upgrateConfig ++ table.storage.properties
+
+     val userSpecifiedSchema = table.schema
+     if (userSpecifiedSchema.isEmpty && tableSchema.isDefined) {
         (addMetaFields(tableSchema.get), options)
-      } else if (userSpecifiedSchema.nonEmpty) {
+     }  else if (userSpecifiedSchema.nonEmpty) {
         (addMetaFields(userSpecifiedSchema), options)
-      } else {
+    } else {
         throw new IllegalArgumentException(s"Missing schema for Create Table: $tableName")
-      }
+     }
     } else {
       assert(table.schema.nonEmpty, s"Missing schema for Create Table: $tableName")
       // SPARK-19724: the default location of a managed table should be non-existent or empty.
@@ -301,47 +318,102 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
           s"'${HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_MOR}'")
     }
   }
+
+  private def getAllPartitionPaths(spark: SparkSession, table: CatalogTable): Seq[String] = {
+    val sparkEngine = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
+    val metadataConfig = {
+      val properties = new Properties()
+      properties.putAll((spark.sessionState.conf.getAllConfs ++ table.storage.properties).asJava)
+      HoodieMetadataConfig.newBuilder.fromProperties(properties).build()
+    }
+    FSUtils.getAllPartitionPaths(sparkEngine, metadataConfig, getTableLocation(table, spark)).asScala
+  }
+
+  /**
+   * This method keeps spark sql compatible with old non-hive-styled partition tables.
+   * By default we enable "hoodie.datasource.write.hive_style_partitioning"
+   * when writing data to a hudi table through spark sql.
+   * If the existing table is a non-hive-styled partitioned table, we should
+   * disable "hoodie.datasource.write.hive_style_partitioning" when
+   * merging into or updating the table. Otherwise, we will get an incorrect merge result
+   * because the partition paths do not match.
+   */
+  private def isNotHiveStyledPartitionTable(partitionPaths: Seq[String], table: CatalogTable): Boolean = {
+    if (table.partitionColumnNames.nonEmpty) {
+      val isHiveStylePartitionPath = (path: String) => {
+        val fragments = path.split("/")
+        if (fragments.size != table.partitionColumnNames.size) {
+          false
+        } else {
+          fragments.zip(table.partitionColumnNames).forall {
+            case (pathFragment, partitionColumn) => pathFragment.startsWith(s"$partitionColumn=")
+          }
+        }
+      }
+      !partitionPaths.forall(isHiveStylePartitionPath)
+    } else {
+      false
+    }
+  }
+
+  /**
+   * If this table has url encoding disabled, spark sql should also disable it when writing to the table.
+   */
+  private def isUrlEncodeDisable(partitionPaths: Seq[String], table: CatalogTable): Boolean = {
+    if (table.partitionColumnNames.nonEmpty) {
+      !partitionPaths.forall(partitionPath => partitionPath.split("/").length == table.partitionColumnNames.size)
+    } else {
+      false
+    }
+  }
+
 }
 
 object CreateHoodieTableCommand extends Logging {
 
   /**
-    * Init the table if it is not exists.
-    * @param sparkSession
-    * @param table
-    * @return
+    * Init the hoodie.properties.
     */
   def initTableIfNeed(sparkSession: SparkSession, table: CatalogTable): Unit = {
-    val location = getTableLocation(table, sparkSession).getOrElse(
-      throw new IllegalArgumentException(s"Missing location for ${table.identifier}"))
+    val location = getTableLocation(table, sparkSession)
 
     val conf = sparkSession.sessionState.newHadoopConf()
     // Init the hoodie table
-    if (!tableExistsInPath(location, conf)) {
-      val tableName = table.identifier.table
-      logInfo(s"Table $tableName is not exists, start to create the hudi table")
+    val originTableConfig = if (tableExistsInPath(location, conf)) {
+      val metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(location)
+        .setConf(conf)
+        .build()
+      metaClient.getTableConfig.getProps.asScala.toMap
+    } else {
+      Map.empty[String, String]
+    }
 
-      // Save all the table config to the hoodie.properties.
-      val parameters = HoodieOptionConfig.mappingSqlOptionToTableConfig(table.storage.properties)
-      val properties = new Properties()
+    val tableName = table.identifier.table
+    logInfo(s"Init hoodie.properties for $tableName")
+    val tableOptions = HoodieOptionConfig.mappingSqlOptionToTableConfig(table.storage.properties)
+    checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD_PROP.key)
+    checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.HOODIE_TABLE_PARTITION_FIELDS_PROP.key)
+    checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.HOODIE_TABLE_RECORDKEY_FIELDS.key)
+    // Save all the table config to the hoodie.properties.
+    val parameters = originTableConfig ++ tableOptions
+    val properties = new Properties()
       properties.putAll(parameters.asJava)
       HoodieTableMetaClient.withPropertyBuilder()
-          .fromProperties(properties)
-          .setTableName(tableName)
-          .setTableCreateSchema(SchemaConverters.toAvroType(table.schema).toString())
-          .setPartitionFields(table.partitionColumnNames.mkString(","))
-          .initTable(conf, location)
-    }
+        .fromProperties(properties)
+        .setTableName(tableName)
+        .setTableCreateSchema(SchemaConverters.toAvroType(table.schema).toString())
+        .setPartitionFields(table.partitionColumnNames.mkString(","))
+        .initTable(conf, location)
   }
 
-  /**
-   * Check if the hoodie.properties exists in the table path.
-   */
-  def tableExistsInPath(tablePath: String, conf: Configuration): Boolean = {
-    val basePath = new Path(tablePath)
-    val fs = basePath.getFileSystem(conf)
-    val metaPath = new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME)
-    fs.exists(metaPath)
+  def checkTableConfigEqual(originTableConfig: Map[String, String],
+    newTableConfig: Map[String, String], configKey: String): Unit = {
+    if (originTableConfig.contains(configKey) && newTableConfig.contains(configKey)) {
+      assert(originTableConfig(configKey) == newTableConfig(configKey),
+        s"Table config: $configKey in the create table is: ${newTableConfig(configKey)}, is not the same with the value in " +
+        s"hoodie.properties, which is:  ${originTableConfig(configKey)}. Please keep the same.")
+    }
   }
 
   /**
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
index cfebe1e..cca9e15 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
@@ -59,7 +59,6 @@ case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends Runnab
     val targetTable = sparkSession.sessionState.catalog
       .getTableMetadata(tableId)
     val path = getTableLocation(targetTable, sparkSession)
-      .getOrElse(s"missing location for $tableId")
 
     val primaryColumns = HoodieOptionConfig.getPrimaryColumns(targetTable.storage.properties)
 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index 9782ec5..cfd6609 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -193,7 +193,6 @@ object InsertIntoHoodieTableCommand {
 
     val partitionFields = table.partitionColumnNames.mkString(",")
     val path = getTableLocation(table, sparkSession)
-      .getOrElse(s"Missing location for table ${table.identifier}")
 
     val tableSchema = table.schema
     val options = table.storage.properties
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 5146868..70b1a60 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -416,7 +416,6 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
     val targetTableDb = targetTableIdentify.database.getOrElse("default")
     val targetTableName = targetTableIdentify.identifier
     val path = getTableLocation(targetTable, sparkSession)
-      .getOrElse(s"missing location for $targetTableIdentify")
 
     val options = targetTable.storage.properties
     val definedPk = HoodieOptionConfig.getPrimaryColumns(options)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
index fe0a7e1..3b5b33c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
@@ -18,10 +18,10 @@
 package org.apache.spark.sql.hudi.command
 
 import java.util.concurrent.TimeUnit.{MICROSECONDS, MILLISECONDS}
-
 import org.apache.avro.generic.GenericRecord
 import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.util.PartitionPathEncodeUtils
+import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.keygen.{ComplexKeyGenerator, KeyGenUtils}
 import org.apache.spark.sql.types.{StructType, TimestampType}
 import org.joda.time.format.DateTimeFormat
@@ -40,45 +40,62 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
       None
     }
   }
+  // The table's original key generator: created when ORIGIN_KEYGEN_CLASS is set in props, otherwise None.
+  private lazy val originKeyGen = {
+    val beforeKeyGenClassName = props.getString(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS, null)
+    if (beforeKeyGenClassName != null) {
+      val keyGenProps = new TypedProperties()
+      keyGenProps.putAll(props)
+      keyGenProps.remove(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS)
+      keyGenProps.put(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP.key, beforeKeyGenClassName)
+      Some(KeyGenUtils.createKeyGeneratorByClassName(keyGenProps))
+    } else {
+      None
+    }
+  }
+
+  override def getRecordKey(record: GenericRecord): String = {
+    if (originKeyGen.isDefined) {
+      originKeyGen.get.getKey(record).getRecordKey
+    } else {
+      super.getRecordKey(record)
+    }
+  }
 
-  override def getPartitionPath(record: GenericRecord): String = {
+  override def getPartitionPath(record: GenericRecord) = {
     val partitionPath = super.getPartitionPath(record)
     if (partitionSchema.isDefined) {
       // we can split the partitionPath here because we enable the URL_ENCODE_PARTITIONING_OPT
       // by default for sql.
       val partitionFragments = partitionPath.split(KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR)
-      assert(partitionFragments.size == partitionSchema.get.size)
+      // If the table was not previously written by Spark SQL and URL encoding has been disabled,
+      // the number of partition path levels may differ from the partition schema size. In that
+      // case, just return the partitionPath unchanged.
+      if (partitionFragments.size != partitionSchema.get.size) {
+        partitionPath
+      } else {
+        partitionFragments.zip(partitionSchema.get.fields).map {
+          case (partitionValue, partitionField) =>
+            val hiveStylePrefix = s"${partitionField.name}="
+            val isHiveStyle = partitionValue.startsWith(hiveStylePrefix)
+            val _partitionValue = if (isHiveStyle) partitionValue.substring(hiveStylePrefix.length) else partitionValue
 
-      partitionFragments.zip(partitionSchema.get.fields).map {
-        case (partitionValue, partitionField) =>
-          val hiveStylePrefix = s"${partitionField.name}="
-          val isHiveStyle = partitionValue.startsWith(hiveStylePrefix)
-          val _partitionValue = if (isHiveStyle) {
-            partitionValue.substring(hiveStylePrefix.length)
-          } else {
-            partitionValue
-          }
-
-          partitionField.dataType match {
-            case TimestampType =>
-              val timeMs = MILLISECONDS.convert(_partitionValue.toLong, MICROSECONDS)
-              val timestampFormat = PartitionPathEncodeUtils.escapePathName(
-                SqlKeyGenerator.timestampTimeFormat.print(timeMs))
-              if (isHiveStyle) {
-                s"$hiveStylePrefix$timestampFormat"
-              } else {
-                timestampFormat
-              }
-            case _=> partitionValue
-          }
-      }.mkString(KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR)
-    } else {
-      partitionPath
-    }
+            partitionField.dataType match {
+              case TimestampType =>
+                val timeMs = MILLISECONDS.convert(_partitionValue.toLong, MICROSECONDS)
+                val timestampFormat = PartitionPathEncodeUtils.escapePathName(
+                  SqlKeyGenerator.timestampTimeFormat.print(timeMs))
+                if (isHiveStyle) s"$hiveStylePrefix$timestampFormat" else timestampFormat
+              case _ => partitionValue
+            }
+        }.mkString(KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR)
+      }
+    } else partitionPath
   }
 }
 
 object SqlKeyGenerator {
   val PARTITION_SCHEMA = "hoodie.sql.partition.schema"
+  val ORIGIN_KEYGEN_CLASS = "hoodie.sql.origin.keygen.class"
   private val timestampTimeFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala
index 02624fa..339f4b5 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala
@@ -35,7 +35,6 @@ class TruncateHoodieTableCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val table = sparkSession.sessionState.catalog.getTableMetadata(tableName)
     val path = getTableLocation(table, sparkSession)
-      .getOrElse(s"missing location for ${table.identifier}")
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
     // If we have not specified the partition, truncate will delete all the
     // data in the table path include the hoodi.properties. In this case we
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
index a11f21f..c0b0d78 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
@@ -85,7 +85,6 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends RunnableCo
     val targetTable = sparkSession.sessionState.catalog
       .getTableMetadata(tableId)
     val path = getTableLocation(targetTable, sparkSession)
-      .getOrElse(s"missing location for $tableId")
 
     val primaryColumns = HoodieOptionConfig.getPrimaryColumns(targetTable.storage.properties)
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
index 51bf8d4..f75af61 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
@@ -17,9 +17,15 @@
 
 package org.apache.spark.sql.hudi
 
+import org.apache.hudi.DataSourceWriteOptions._
+
 import scala.collection.JavaConverters._
 import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
+import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}
+import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField}
@@ -272,4 +278,231 @@ class TestCreateTable extends TestHoodieSqlBase {
       )
     }
   }
+
+  test("Test Create Table From Exist Hoodie Table") {
+    withTempDir { tmp =>
+      Seq("2021-08-02", "2021/08/02").foreach { partitionValue =>
+        val tableName = generateTableName
+        val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+        import spark.implicits._
+        val df = Seq((1, "a1", 10, 1000, partitionValue)).toDF("id", "name", "value", "ts", "dt")
+        // Write a table with the Spark DataFrame API.
+        df.write.format("hudi")
+          .option(HoodieWriteConfig.TABLE_NAME.key, tableName)
+          .option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL)
+          .option(RECORDKEY_FIELD.key, "id")
+          .option(PRECOMBINE_FIELD.key, "ts")
+          .option(PARTITIONPATH_FIELD.key, "dt")
+          .option(KEYGENERATOR_CLASS.key, classOf[SimpleKeyGenerator].getName)
+          .option(HoodieWriteConfig.INSERT_PARALLELISM.key, "1")
+          .option(HoodieWriteConfig.UPSERT_PARALLELISM.key, "1")
+          .mode(SaveMode.Overwrite)
+          .save(tablePath)
+
+        // Create a table over the existing table.
+        spark.sql(
+          s"""
+             |create table $tableName using hudi
+             | options (
+             | primaryKey = 'id',
+             | preCombineField = 'ts'
+             |)
+             |partitioned by (dt)
+             |location '$tablePath'
+             |""".stripMargin)
+        checkAnswer(s"select id, name, value, ts, dt from $tableName")(
+          Seq(1, "a1", 10, 1000, partitionValue)
+        )
+        // Check that the properties Spark SQL needs are now present in the table config.
+        val metaClient = HoodieTableMetaClient.builder()
+          .setBasePath(tablePath)
+          .setConf(spark.sessionState.newHadoopConf())
+          .build()
+        val properties = metaClient.getTableConfig.getProps.asScala.toMap
+        assertResult(true)(properties.contains(HoodieTableConfig.HOODIE_TABLE_CREATE_SCHEMA.key))
+        assertResult("dt")(properties(HoodieTableConfig.HOODIE_TABLE_PARTITION_FIELDS_PROP.key))
+        assertResult("ts")(properties(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD_PROP.key))
+
+        // Test insert into
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, '$partitionValue')")
+        checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, dt from $tableName order by id")(
+          Seq("1", partitionValue, 1, "a1", 10, 1000, partitionValue),
+          Seq("2", partitionValue, 2, "a2", 10, 1000, partitionValue)
+        )
+        // Test merge into
+        spark.sql(
+          s"""
+             |merge into $tableName h0
+             |using (select 1 as id, 'a1' as name, 11 as value, 1001 as ts, '$partitionValue' as dt) s0
+             |on h0.id = s0.id
+             |when matched then update set *
+             |""".stripMargin)
+        checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, dt from $tableName order by id")(
+          Seq("1", partitionValue, 1, "a1", 11, 1001, partitionValue),
+          Seq("2", partitionValue, 2, "a2", 10, 1000, partitionValue)
+        )
+        // Test update
+        spark.sql(s"update $tableName set value = value + 1 where id = 2")
+        checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, dt from $tableName order by id")(
+          Seq("1", partitionValue, 1, "a1", 11, 1001, partitionValue),
+          Seq("2", partitionValue, 2, "a2", 11, 1000, partitionValue)
+        )
+        // Test delete
+        spark.sql(s"delete from $tableName where id = 1")
+        checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, dt from $tableName order by id")(
+          Seq("2", partitionValue, 2, "a2", 11, 1000, partitionValue)
+        )
+      }
+    }
+  }
+
+  test("Test Create Table From Exist Hoodie Table For Multi-Level Partitioned Table") {
+    withTempDir { tmp =>
+      Seq("2021-08-02", "2021/08/02").foreach { day =>
+        val tableName = generateTableName
+        val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+        import spark.implicits._
+        val df = Seq((1, "a1", 10, 1000, day, 12)).toDF("id", "name", "value", "ts", "day", "hh")
+        // Write a table with the Spark DataFrame API.
+        df.write.format("hudi")
+          .option(HoodieWriteConfig.TABLE_NAME.key, tableName)
+          .option(TABLE_TYPE.key, MOR_TABLE_TYPE_OPT_VAL)
+          .option(RECORDKEY_FIELD.key, "id")
+          .option(PRECOMBINE_FIELD.key, "ts")
+          .option(PARTITIONPATH_FIELD.key, "day,hh")
+          .option(KEYGENERATOR_CLASS.key, classOf[ComplexKeyGenerator].getName)
+          .option(HoodieWriteConfig.INSERT_PARALLELISM.key, "1")
+          .option(HoodieWriteConfig.UPSERT_PARALLELISM.key, "1")
+          .mode(SaveMode.Overwrite)
+          .save(tablePath)
+
+        // Create a table over the existing table.
+        spark.sql(
+          s"""
+             |create table $tableName using hudi
+             | options (
+             | primaryKey = 'id',
+             | preCombineField = 'ts'
+             |)
+             |partitioned by (day, hh)
+             |location '$tablePath'
+             |""".stripMargin)
+        checkAnswer(s"select id, name, value, ts, day, hh from $tableName")(
+          Seq(1, "a1", 10, 1000, day, 12)
+        )
+        // Check that the properties Spark SQL needs are now present in the table config.
+        val metaClient = HoodieTableMetaClient.builder()
+          .setBasePath(tablePath)
+          .setConf(spark.sessionState.newHadoopConf())
+          .build()
+        val properties = metaClient.getTableConfig.getProps.asScala.toMap
+        assertResult(true)(properties.contains(HoodieTableConfig.HOODIE_TABLE_CREATE_SCHEMA.key))
+        assertResult("day,hh")(properties(HoodieTableConfig.HOODIE_TABLE_PARTITION_FIELDS_PROP.key))
+        assertResult("ts")(properties(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD_PROP.key))
+
+        // Test insert into
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, '$day', 12)")
+        checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, day, hh from $tableName order by id")(
+          Seq("id:1", s"$day/12", 1, "a1", 10, 1000, day, 12),
+          Seq("id:2", s"$day/12", 2, "a2", 10, 1000, day, 12)
+        )
+        // Test merge into
+        spark.sql(
+          s"""
+             |merge into $tableName h0
+             |using (select 1 as id, 'a1' as name, 11 as value, 1001 as ts, '$day' as day, 12 as hh) s0
+             |on h0.id = s0.id
+             |when matched then update set *
+             |""".stripMargin)
+        checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, day, hh from $tableName order by id")(
+          Seq("id:1", s"$day/12", 1, "a1", 11, 1001, day, 12),
+          Seq("id:2", s"$day/12", 2, "a2", 10, 1000, day, 12)
+        )
+        // Test update
+        spark.sql(s"update $tableName set value = value + 1 where id = 2")
+        checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, day, hh from $tableName order by id")(
+          Seq("id:1", s"$day/12", 1, "a1", 11, 1001, day, 12),
+          Seq("id:2", s"$day/12", 2, "a2", 11, 1000, day, 12)
+        )
+        // Test delete
+        spark.sql(s"delete from $tableName where id = 1")
+        checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, day, hh from $tableName order by id")(
+          Seq("id:2", s"$day/12", 2, "a2", 11, 1000, day, 12)
+        )
+      }
+    }
+  }
+
+  test("Test Create Table From Exist Hoodie Table For None Partitioned Table") {
+    withTempDir{tmp =>
+      // Write a table with the Spark DataFrame API.
+      val tableName = generateTableName
+      import spark.implicits._
+      val df = Seq((1, "a1", 10, 1000)).toDF("id", "name", "value", "ts")
+      df.write.format("hudi")
+        .option(HoodieWriteConfig.TABLE_NAME.key, tableName)
+        .option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL)
+        .option(RECORDKEY_FIELD.key, "id")
+        .option(PRECOMBINE_FIELD.key, "ts")
+        .option(PARTITIONPATH_FIELD.key, "")
+        .option(KEYGENERATOR_CLASS.key, classOf[NonpartitionedKeyGenerator].getName)
+        .option(HoodieWriteConfig.INSERT_PARALLELISM.key, "1")
+        .option(HoodieWriteConfig.UPSERT_PARALLELISM.key, "1")
+        .mode(SaveMode.Overwrite)
+        .save(tmp.getCanonicalPath)
+
+      // Create a table over the existing table.
+      spark.sql(
+        s"""
+           |create table $tableName using hudi
+           | options (
+           | primaryKey = 'id',
+           | preCombineField = 'ts'
+           |)
+           |location '${tmp.getCanonicalPath}'
+           |""".stripMargin)
+      checkAnswer(s"select id, name, value, ts from $tableName")(
+        Seq(1, "a1", 10, 1000)
+      )
+      // Check that the properties Spark SQL needs are now present in the table config.
+      val metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(tmp.getCanonicalPath)
+        .setConf(spark.sessionState.newHadoopConf())
+        .build()
+      val properties = metaClient.getTableConfig.getProps.asScala.toMap
+      assertResult(true)(properties.contains(HoodieTableConfig.HOODIE_TABLE_CREATE_SCHEMA.key))
+      assertResult("ts")(properties(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD_PROP.key))
+
+      // Test insert into
+      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
+      checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts from $tableName order by id")(
+        Seq("1", "", 1, "a1", 10, 1000),
+        Seq("2", "", 2, "a2", 10, 1000)
+      )
+      // Test merge into
+      spark.sql(
+        s"""
+           |merge into $tableName h0
+           |using (select 1 as id, 'a1' as name, 11 as value, 1001 as ts) s0
+           |on h0.id = s0.id
+           |when matched then update set *
+           |""".stripMargin)
+      checkAnswer(s"select id, name, value, ts from $tableName order by id")(
+        Seq(1, "a1", 11, 1001),
+        Seq(2, "a2", 10, 1000)
+      )
+      // Test update
+      spark.sql(s"update $tableName set value = value + 1 where id = 2")
+      checkAnswer(s"select id, name, value, ts from $tableName order by id")(
+        Seq(1, "a1", 11, 1001),
+        Seq(2, "a2", 11, 1000)
+      )
+      // Test delete
+      spark.sql(s"delete from $tableName where id = 1")
+      checkAnswer(s"select id, name, value, ts from $tableName order by id")(
+        Seq(2, "a2", 11, 1000)
+      )
+    }
+  }
+
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
index 0def049..0dbb074 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
@@ -126,9 +126,9 @@ class TestPartialUpdateForMergeInto extends TestHoodieSqlBase {
     checkException(
       s"""
          |merge into $tableName2 t0
-         |using ( select 1 as id, 'a1' as name, 12 as price) s0
+         |using ( select 1 as id, 'a1' as name, 12 as price, 1000 as ts) s0
          |on t0.id = s0.id
-         |when matched then update set price = s0.price
+         |when matched then update set price = s0.price, _ts = s0.ts
         """.stripMargin)(
       "Missing specify the value for target field: 'id' in merge into update action for MOR table. " +
         "Currently we cannot support partial update for MOR, please complete all the target fields " +