You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by le...@apache.org on 2021/11/03 02:04:38 UTC

[hudi] branch master updated: [HUDI-2538] persist some configs to hoodie.properties when the first write (#3823)

This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6351e5f  [HUDI-2538] persist some configs to hoodie.properties when the first write (#3823)
6351e5f is described below

commit 6351e5f4d042b14cd0f6715c36f23d75fcc8e091
Author: Yann Byron <bi...@gmail.com>
AuthorDate: Wed Nov 3 10:04:23 2021 +0800

    [HUDI-2538] persist some configs to hoodie.properties when the first write (#3823)
---
 .../table/upgrade/TwoToThreeUpgradeHandler.java    |   9 +-
 .../factory/HoodieSparkKeyGeneratorFactory.java    |  98 +++++++++------
 .../client/functional/TestHoodieMetadataBase.java  |   8 +-
 .../apache/hudi/keygen/TestCustomKeyGenerator.java |  13 +-
 .../hudi/testutils/HoodieClientTestHarness.java    |  13 +-
 .../apache/hudi/common/config/HoodieConfig.java    |   6 +-
 .../hudi/common/table/HoodieTableConfig.java       |  16 +++
 .../hudi/common/table/HoodieTableMetaClient.java   |  24 ++++
 .../common/testutils/HoodieCommonTestHarness.java  |   5 +
 .../main/scala/org/apache/hudi/DefaultSource.scala |  12 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 118 +++++++++++++-----
 .../org/apache/hudi/HoodieStreamingSink.scala      |   9 +-
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  |   5 +-
 .../apache/spark/sql/hudi/HoodieOptionConfig.scala |   9 +-
 .../hudi/command/CreateHoodieTableCommand.scala    |  83 ++++++++-----
 .../hudi/command/DeleteHoodieTableCommand.scala    |  13 +-
 .../command/InsertIntoHoodieTableCommand.scala     |  52 +++++---
 .../hudi/command/MergeIntoHoodieTableCommand.scala |  60 ++++-----
 .../spark/sql/hudi/command/SqlKeyGenerator.scala   |  18 ++-
 .../hudi/command/UpdateHoodieTableCommand.scala    |  14 ++-
 .../apache/hudi/HoodieSparkSqlWriterSuite.scala    | 137 ++++++++++++++++-----
 .../org/apache/hudi/TestHoodieFileIndex.scala      |   9 ++
 .../functional/TestDataSourceForBootstrap.scala    |   8 +-
 .../apache/hudi/functional/TestMORDataSource.scala |   3 +
 .../hudi/functional/TestTimeTravelQuery.scala      |   1 +
 25 files changed, 540 insertions(+), 203 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToThreeUpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToThreeUpgradeHandler.java
index 6a825e1..e1dbfbb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToThreeUpgradeHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToThreeUpgradeHandler.java
@@ -21,10 +21,11 @@ package org.apache.hudi.table.upgrade;
 
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -39,6 +40,10 @@ public class TwoToThreeUpgradeHandler implements UpgradeHandler {
       // table has been updated and is not backward compatible.
       HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), context);
     }
-    return Collections.emptyMap();
+    Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
+    tablePropsToAdd.put(HoodieTableConfig.URL_ENCODE_PARTITIONING, config.getStringOrDefault(HoodieTableConfig.URL_ENCODE_PARTITIONING));
+    tablePropsToAdd.put(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE, config.getStringOrDefault(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE));
+    tablePropsToAdd.put(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME, config.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME));
+    return tablePropsToAdd;
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
index d4e99f7..165b27d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
@@ -19,14 +19,13 @@
 package org.apache.hudi.keygen.factory;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieKeyGeneratorException;
-import org.apache.hudi.keygen.BuiltinKeyGenerator;
 import org.apache.hudi.keygen.ComplexKeyGenerator;
 import org.apache.hudi.keygen.CustomKeyGenerator;
 import org.apache.hudi.keygen.GlobalDeleteKeyGenerator;
-import org.apache.hudi.keygen.KeyGenUtils;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
@@ -37,8 +36,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Locale;
-import java.util.Objects;
+import java.util.Map;
 
 /**
  * Factory help to create {@link org.apache.hudi.keygen.KeyGenerator}.
@@ -50,45 +50,73 @@ public class HoodieSparkKeyGeneratorFactory {
 
   private static final Logger LOG = LoggerFactory.getLogger(HoodieSparkKeyGeneratorFactory.class);
 
-  public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOException {
-    // keyGenerator class name has higher priority
-    KeyGenerator keyGenerator = KeyGenUtils.createKeyGeneratorByClassName(props);
+  private static final Map<String, String> COMMON_TO_SPARK_KEYGENERATOR = new HashMap<>();
+  static {
+    COMMON_TO_SPARK_KEYGENERATOR.put("org.apache.hudi.keygen.ComplexAvroKeyGenerator",
+        "org.apache.hudi.keygen.ComplexKeyGenerator");
+    COMMON_TO_SPARK_KEYGENERATOR.put("org.apache.hudi.keygen.CustomAvroKeyGenerator",
+        "org.apache.hudi.keygen.CustomKeyGenerator");
+    COMMON_TO_SPARK_KEYGENERATOR.put("org.apache.hudi.keygen.GlobalAvroDeleteKeyGenerator",
+        "org.apache.hudi.keygen.GlobalDeleteKeyGenerator");
+    COMMON_TO_SPARK_KEYGENERATOR.put("org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator",
+        "org.apache.hudi.keygen.NonpartitionedKeyGenerator");
+    COMMON_TO_SPARK_KEYGENERATOR.put("org.apache.hudi.keygen.SimpleAvroKeyGenerator",
+        "org.apache.hudi.keygen.SimpleKeyGenerator");
+    COMMON_TO_SPARK_KEYGENERATOR.put("org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator",
+        "org.apache.hudi.keygen.TimestampBasedKeyGenerator");
+  }
 
-    return Objects.isNull(keyGenerator) ? createKeyGeneratorByType(props) : keyGenerator;
+  public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOException {
+    String keyGeneratorClass = getKeyGeneratorClassName(props);
+    try {
+      return (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, props);
+    } catch (Throwable e) {
+      throw new IOException("Could not load key generator class " + keyGeneratorClass, e);
+    }
   }
 
-  private static BuiltinKeyGenerator createKeyGeneratorByType(TypedProperties props) throws IOException {
-    // Use KeyGeneratorType.SIMPLE as default keyGeneratorType
-    String keyGeneratorType =
-        props.getString(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), null);
+  public static String getKeyGeneratorClassName(TypedProperties props) {
+    String keyGeneratorClass = props.getString(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), null);
 
-    if (StringUtils.isNullOrEmpty(keyGeneratorType)) {
+    if (StringUtils.isNullOrEmpty(keyGeneratorClass)) {
+      String keyGeneratorType = props.getString(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), KeyGeneratorType.SIMPLE.name());
       LOG.info("The value of {} is empty, use SIMPLE", HoodieWriteConfig.KEYGENERATOR_TYPE.key());
-      keyGeneratorType = KeyGeneratorType.SIMPLE.name();
-    }
-
-    KeyGeneratorType keyGeneratorTypeEnum;
-    try {
-      keyGeneratorTypeEnum = KeyGeneratorType.valueOf(keyGeneratorType.toUpperCase(Locale.ROOT));
-    } catch (IllegalArgumentException e) {
-      throw new HoodieKeyGeneratorException("Unsupported keyGenerator Type " + keyGeneratorType);
-    }
-    switch (keyGeneratorTypeEnum) {
-      case SIMPLE:
-        return new SimpleKeyGenerator(props);
-      case COMPLEX:
-        return new ComplexKeyGenerator(props);
-      case TIMESTAMP:
-        return new TimestampBasedKeyGenerator(props);
-      case CUSTOM:
-        return new CustomKeyGenerator(props);
-      case NON_PARTITION:
-        return new NonpartitionedKeyGenerator(props);
-      case GLOBAL_DELETE:
-        return new GlobalDeleteKeyGenerator(props);
-      default:
+      KeyGeneratorType keyGeneratorTypeEnum;
+      try {
+        keyGeneratorTypeEnum = KeyGeneratorType.valueOf(keyGeneratorType.toUpperCase(Locale.ROOT));
+      } catch (IllegalArgumentException e) {
         throw new HoodieKeyGeneratorException("Unsupported keyGenerator Type " + keyGeneratorType);
+      }
+      switch (keyGeneratorTypeEnum) {
+        case SIMPLE:
+          keyGeneratorClass = SimpleKeyGenerator.class.getName();
+          break;
+        case COMPLEX:
+          keyGeneratorClass = ComplexKeyGenerator.class.getName();
+          break;
+        case TIMESTAMP:
+          keyGeneratorClass = TimestampBasedKeyGenerator.class.getName();
+          break;
+        case CUSTOM:
+          keyGeneratorClass = CustomKeyGenerator.class.getName();
+          break;
+        case NON_PARTITION:
+          keyGeneratorClass = NonpartitionedKeyGenerator.class.getName();
+          break;
+        case GLOBAL_DELETE:
+          keyGeneratorClass = GlobalDeleteKeyGenerator.class.getName();
+          break;
+        default:
+          throw new HoodieKeyGeneratorException("Unsupported keyGenerator Type " + keyGeneratorType);
+      }
     }
+    return keyGeneratorClass;
   }
 
+  /**
+   * Convert a hoodie-common KeyGenerator class name to its SparkKeyGeneratorInterface implementation.
+   */
+  public static String convertToSparkKeyGenerator(String keyGeneratorClassName) {
+    return COMMON_TO_SPARK_KEYGENERATOR.getOrDefault(keyGeneratorClassName, keyGeneratorClassName);
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
index 7a49daf..cf261cc 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
@@ -33,6 +34,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.keygen.SimpleKeyGenerator;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
@@ -50,6 +52,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Properties;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.emptyList;
@@ -268,6 +271,8 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
 
   protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata,
                                                             boolean enableMetrics, boolean enableFullScan) {
+    Properties properties = new Properties();
+    properties.put(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), SimpleKeyGenerator.class.getName());
     return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA)
         .withParallelism(2, 2).withDeleteParallelism(2).withRollbackParallelism(2).withFinalizeWriteParallelism(2)
         .withAutoCommit(autoCommit)
@@ -287,7 +292,8 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
         .withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics)
             .withExecutorMetrics(true).build())
         .withMetricsGraphiteConfig(HoodieMetricsGraphiteConfig.newBuilder()
-            .usePrefix("unit-test").build());
+            .usePrefix("unit-test").build())
+        .withProperties(properties);
   }
 
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
index 4bfc71f..4b590d9 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
@@ -33,6 +33,8 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
 
 public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
 
@@ -122,6 +124,13 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
     return properties;
   }
 
+  private String stackTraceToString(Throwable e) {
+    StringWriter sw = new StringWriter();
+    PrintWriter pw = new PrintWriter(sw);
+    e.printStackTrace(pw);
+    return sw.toString();
+  }
+
   @Test
   public void testSimpleKeyGeneratorWithKeyGeneratorClass() throws IOException {
     testSimpleKeyGenerator(getPropertiesForSimpleKeyGen(true));
@@ -259,7 +268,7 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
             .getMessage()
             .contains("Property hoodie.datasource.write.recordkey.field not found"));
       } else {
-        Assertions.assertTrue(e.getMessage().contains("Property hoodie.datasource.write.recordkey.field not found"));
+        Assertions.assertTrue(stackTraceToString(e).contains("Property hoodie.datasource.write.recordkey.field not found"));
       }
 
     }
@@ -282,7 +291,7 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
             .getMessage()
             .contains("Property hoodie.datasource.write.recordkey.field not found"));
       } else {
-        Assertions.assertTrue(e.getMessage().contains("Property hoodie.datasource.write.recordkey.field not found"));
+        Assertions.assertTrue(stackTraceToString(e).contains("Property hoodie.datasource.write.recordkey.field not found"));
       }
     }
   }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index 8c0a3bd..9ed98b1 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -249,7 +249,15 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
     initMetaClient(getTableType());
   }
 
+  protected void initMetaClient(Properties properties) throws IOException {
+    initMetaClient(getTableType(), properties);
+  }
+
   protected void initMetaClient(HoodieTableType tableType) throws IOException {
+    initMetaClient(tableType, new Properties());
+  }
+
+  protected void initMetaClient(HoodieTableType tableType, Properties properties) throws IOException {
     if (basePath == null) {
       throw new IllegalStateException("The base path has not been initialized.");
     }
@@ -258,7 +266,10 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
       throw new IllegalStateException("The Spark context has not been initialized.");
     }
 
-    metaClient = HoodieTestUtils.init(hadoopConf, basePath, tableType);
+    if (tableName != null && !tableName.isEmpty()) {
+      properties.put(HoodieTableConfig.NAME.key(), tableName);
+    }
+    metaClient = HoodieTestUtils.init(hadoopConf, basePath, tableType, properties);
   }
 
   protected Properties getPropertiesForKeyGen() {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
index 1f646aa..ed2b90e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
@@ -74,6 +74,10 @@ public class HoodieConfig implements Serializable {
     }
   }
 
+  public Boolean contains(String key) {
+    return props.containsKey(key);
+  }
+
   public <T> boolean contains(ConfigProperty<T> configProperty) {
     if (props.containsKey(configProperty.key())) {
       return true;
@@ -135,7 +139,7 @@ public class HoodieConfig implements Serializable {
   public <T> boolean getBooleanOrDefault(ConfigProperty<T> configProperty) {
     Option<Object> rawValue = getRawValue(configProperty);
     return rawValue.map(v -> Boolean.parseBoolean(v.toString()))
-            .orElse((Boolean) configProperty.defaultValue());
+            .orElse(Boolean.parseBoolean(configProperty.defaultValue().toString()));
   }
 
   public <T> Long getLong(ConfigProperty<T> configProperty) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 129bcce..dc57fd1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -161,6 +162,9 @@ public class HoodieTableConfig extends HoodieConfig {
       .noDefaultValue()
       .withDocumentation("Key Generator class property for the hoodie table");
 
+  public static final ConfigProperty<String> URL_ENCODE_PARTITIONING = KeyGeneratorOptions.URL_ENCODE_PARTITIONING;
+  public static final ConfigProperty<String> HIVE_STYLE_PARTITIONING_ENABLE = KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE;
+
   public static final String NO_OP_BOOTSTRAP_INDEX_CLASS = NoOpBootstrapIndex.class.getName();
 
   public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName) {
@@ -363,6 +367,18 @@ public class HoodieTableConfig extends HoodieConfig {
     return getString(RECORDKEY_FIELDS);
   }
 
+  public String getKeyGeneratorClassName() {
+    return getString(KEY_GENERATOR_CLASS_NAME);
+  }
+
+  public String getHiveStylePartitioningEnable() {
+    return getString(HIVE_STYLE_PARTITIONING_ENABLE);
+  }
+
+  public String getUrlEncodePartitoning() {
+    return getString(URL_ENCODE_PARTITIONING);
+  }
+
   public Map<String, String> propsMap() {
     return props.entrySet().stream()
         .collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue())));
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 340a99e..450a3cc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -637,6 +637,8 @@ public class HoodieTableMetaClient implements Serializable {
     private Boolean bootstrapIndexEnable;
     private Boolean populateMetaFields;
     private String keyGeneratorClassProp;
+    private Boolean hiveStylePartitioningEnable;
+    private Boolean urlEncodePartitioning;
 
     private PropertyBuilder() {
 
@@ -725,6 +727,16 @@ public class HoodieTableMetaClient implements Serializable {
       return this;
     }
 
+    public PropertyBuilder setHiveStylePartitioningEnable(Boolean hiveStylePartitioningEnable) {
+      this.hiveStylePartitioningEnable = hiveStylePartitioningEnable;
+      return this;
+    }
+
+    public PropertyBuilder setUrlEncodePartitioning(Boolean urlEncodePartitioning) {
+      this.urlEncodePartitioning = urlEncodePartitioning;
+      return this;
+    }
+
     public PropertyBuilder fromMetaClient(HoodieTableMetaClient metaClient) {
       return setTableType(metaClient.getTableType())
         .setTableName(metaClient.getTableConfig().getTableName())
@@ -786,6 +798,12 @@ public class HoodieTableMetaClient implements Serializable {
       if (hoodieConfig.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)) {
         setKeyGeneratorClassProp(hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME));
       }
+      if (hoodieConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE)) {
+        setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE));
+      }
+      if (hoodieConfig.contains(HoodieTableConfig.URL_ENCODE_PARTITIONING)) {
+        setUrlEncodePartitioning(hoodieConfig.getBoolean(HoodieTableConfig.URL_ENCODE_PARTITIONING));
+      }
       return this;
     }
 
@@ -849,6 +867,12 @@ public class HoodieTableMetaClient implements Serializable {
       if (null != keyGeneratorClassProp) {
         tableConfig.setValue(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME, keyGeneratorClassProp);
       }
+      if (null != hiveStylePartitioningEnable) {
+        tableConfig.setValue(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE, Boolean.toString(hiveStylePartitioningEnable));
+      }
+      if (null != urlEncodePartitioning) {
+        tableConfig.setValue(HoodieTableConfig.URL_ENCODE_PARTITIONING, Boolean.toString(urlEncodePartitioning));
+      }
       return tableConfig.getProps();
     }
 
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
index 9738816..311c131 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
@@ -34,12 +34,17 @@ import java.io.IOException;
  */
 public class HoodieCommonTestHarness {
 
+  protected String tableName = null;
   protected String basePath = null;
   protected transient HoodieTestDataGenerator dataGen = null;
   protected transient HoodieTableMetaClient metaClient;
   @TempDir
   public java.nio.file.Path tempDir;
 
+  protected void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+
   /**
    * Initializes basePath.
    */
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
index 00133ab..a9d85af 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -154,14 +154,12 @@ class DefaultSource extends RelationProvider
                               mode: SaveMode,
                               optParams: Map[String, String],
                               df: DataFrame): BaseRelation = {
-    val parameters = HoodieWriterUtils.parametersWithWriteDefaults(optParams)
-    val translatedOptions = DataSourceWriteOptions.translateSqlOptions(parameters)
     val dfWithoutMetaCols = df.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala:_*)
 
-    if (translatedOptions(OPERATION.key).equals(BOOTSTRAP_OPERATION_OPT_VAL)) {
-      HoodieSparkSqlWriter.bootstrap(sqlContext, mode, translatedOptions, dfWithoutMetaCols)
+    if (optParams.get(OPERATION.key).contains(BOOTSTRAP_OPERATION_OPT_VAL)) {
+      HoodieSparkSqlWriter.bootstrap(sqlContext, mode, optParams, dfWithoutMetaCols)
     } else {
-      HoodieSparkSqlWriter.write(sqlContext, mode, translatedOptions, dfWithoutMetaCols)
+      HoodieSparkSqlWriter.write(sqlContext, mode, optParams, dfWithoutMetaCols)
     }
     new HoodieEmptyRelation(sqlContext, dfWithoutMetaCols.schema)
   }
@@ -170,11 +168,9 @@ class DefaultSource extends RelationProvider
                           optParams: Map[String, String],
                           partitionColumns: Seq[String],
                           outputMode: OutputMode): Sink = {
-    val parameters = HoodieWriterUtils.parametersWithWriteDefaults(optParams)
-    val translatedOptions = DataSourceWriteOptions.translateSqlOptions(parameters)
     new HoodieStreamingSink(
       sqlContext,
-      translatedOptions,
+      optParams,
       partitionColumns,
       outputMode)
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index e5c1a7a..1d0e8af 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -17,13 +17,13 @@
 
 package org.apache.hudi
 
-
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.DataSourceOptionsHelper.{allAlternatives, translateConfigurations}
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
 import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig, TypedProperties}
@@ -31,12 +31,13 @@ import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType}
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
-import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils}
+import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils, StringUtils}
 import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME}
 import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.execution.bulkinsert.{BulkInsertInternalPartitionerWithRowsFactory, NonSortPartitionerWithRows}
 import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
+import org.apache.hudi.index.SparkHoodieIndexFactory
 import org.apache.hudi.internal.DataSourceInternalWriterHelper
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.hudi.sync.common.AbstractSyncTool
@@ -51,9 +52,9 @@ import org.apache.spark.{SPARK_VERSION, SparkContext}
 import java.util
 import java.util.Properties
 
-import org.apache.hudi.index.SparkHoodieIndexFactory
-
 import scala.collection.JavaConversions._
+import scala.collection.mutable
+import scala.collection.mutable.StringBuilder
 import scala.collection.mutable.ListBuffer
 
 object HoodieSparkSqlWriter {
@@ -65,7 +66,7 @@ object HoodieSparkSqlWriter {
 
   def write(sqlContext: SQLContext,
             mode: SaveMode,
-            parameters: Map[String, String],
+            optParams: Map[String, String],
             df: DataFrame,
             hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
             hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty,
@@ -75,16 +76,23 @@ object HoodieSparkSqlWriter {
   : (Boolean, common.util.Option[String], common.util.Option[String], common.util.Option[String],
     SparkRDDWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = {
 
+    assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
+    val path = optParams("path")
+    val basePath = new Path(path)
     val sparkContext = sqlContext.sparkContext
-    val path = parameters.get("path")
-    val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(parameters)
-    val tblNameOp = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME, s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")
+    val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
+    tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
+    var tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
+    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig)
+
+    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig)
+    val tblName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME,
+      s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.").trim
+    assert(!StringUtils.isNullOrEmpty(hoodieConfig.getString(HoodieWriteConfig.TBL_NAME)),
+      s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")
+
     asyncCompactionTriggerFnDefined = asyncCompactionTriggerFn.isDefined
     asyncClusteringTriggerFnDefined = asyncClusteringTriggerFn.isDefined
-    if (path.isEmpty) {
-      throw new HoodieException(s"'path' must be set.")
-    }
-    val tblName = tblNameOp.trim
     sparkContext.getConf.getOption("spark.serializer") match {
       case Some(ser) if ser.equals("org.apache.spark.serializer.KryoSerializer") =>
       case _ => throw new HoodieException("hoodie only support org.apache.spark.serializer.KryoSerializer as spark.serializer")
@@ -105,12 +113,8 @@ object HoodieSparkSqlWriter {
     }
 
     val jsc = new JavaSparkContext(sparkContext)
-    val basePath = new Path(path.get)
     val instantTime = HoodieActiveTimeline.createNewInstantTime()
-    val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
-    tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
-    var tableConfig = getHoodieTableConfig(sparkContext, path.get, hoodieTableConfigOpt)
-    val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(toProperties(parameters))
+    val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(hoodieConfig.getProps))
 
     if (mode == SaveMode.Ignore && tableExists) {
       log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
@@ -124,7 +128,7 @@ object HoodieSparkSqlWriter {
         val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT)
         val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
         val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
-        val populateMetaFields = parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(), HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()).toBoolean
+        val populateMetaFields = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS)
 
         val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder()
           .setTableType(tableType)
@@ -138,7 +142,9 @@ object HoodieSparkSqlWriter {
           .setPopulateMetaFields(populateMetaFields)
           .setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD))
           .setKeyGeneratorClassProp(hoodieConfig.getString(KEYGENERATOR_CLASS_NAME))
-          .initTable(sparkContext.hadoopConfiguration, path.get)
+          .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
+          .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
+          .initTable(sparkContext.hadoopConfiguration, path)
         tableConfig = tableMetaClient.getTableConfig
       }
 
@@ -169,7 +175,7 @@ object HoodieSparkSqlWriter {
 
             // Create a HoodieWriteClient & issue the delete.
             val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
-              null, path.get, tblName,
+              null, path, tblName,
               mapAsJavaMap(parameters - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
             .asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
 
@@ -200,7 +206,7 @@ object HoodieSparkSqlWriter {
             }
             // Create a HoodieWriteClient & issue the delete.
             val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
-              null, path.get, tblName,
+              null, path, tblName,
               mapAsJavaMap(parameters - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
               .asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
             // Issue delete partitions
@@ -244,7 +250,7 @@ object HoodieSparkSqlWriter {
 
             val writeSchema = if (dropPartitionColumns) generateSchemaWithoutPartitionColumns(partitionColumns, schema) else schema
             // Create a HoodieWriteClient & issue the write.
-            val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, writeSchema.toString, path.get,
+            val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, writeSchema.toString, path,
               tblName, mapAsJavaMap(parameters - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)
             )).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
 
@@ -326,14 +332,21 @@ object HoodieSparkSqlWriter {
 
   def bootstrap(sqlContext: SQLContext,
                 mode: SaveMode,
-                parameters: Map[String, String],
+                optParams: Map[String, String],
                 df: DataFrame,
                 hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
                 hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty): Boolean = {
 
+    assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
+    val path = optParams("path")
+    val basePath = new Path(path)
     val sparkContext = sqlContext.sparkContext
-    val path = parameters.getOrElse("path", throw new HoodieException("'path' must be set."))
-    val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(parameters)
+    val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
+    tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
+    var tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
+    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig)
+
+    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig)
     val tableName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME, s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")
     val tableType = hoodieConfig.getStringOrDefault(TABLE_TYPE)
     val bootstrapBasePath = hoodieConfig.getStringOrThrow(BASE_PATH,
@@ -349,10 +362,6 @@ object HoodieSparkSqlWriter {
       schema = HoodieAvroUtils.getNullSchema.toString
     }
 
-    val basePath = new Path(path)
-    val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
-    tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
-    val tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
 
     // Handle various save modes
     if (mode == SaveMode.Ignore && tableExists) {
@@ -381,6 +390,8 @@ object HoodieSparkSqlWriter {
         .setPartitionFields(partitionColumns)
         .setPopulateMetaFields(populateMetaFields)
         .setKeyGeneratorClassProp(keyGenProp)
+        .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
+        .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
         .initTable(sparkContext.hadoopConfiguration, path)
     }
 
@@ -401,7 +412,7 @@ object HoodieSparkSqlWriter {
                       df: DataFrame,
                       tblName: String,
                       basePath: Path,
-                      path: Option[String],
+                      path: String,
                       instantTime: String,
                       partitionColumns: String): (Boolean, common.util.Option[String]) = {
     val sparkContext = sqlContext.sparkContext
@@ -424,7 +435,7 @@ object HoodieSparkSqlWriter {
       throw new HoodieException("Dropping duplicates with bulk_insert in row writer path is not supported yet")
     }
     val params = parameters.updated(HoodieWriteConfig.AVRO_SCHEMA_STRING.key, schema.toString)
-    val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, path.get, tblName, mapAsJavaMap(params))
+    val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, path, tblName, mapAsJavaMap(params))
     val bulkInsertPartitionerRows : BulkInsertPartitioner[Dataset[Row]] = if (populateMetaFields) {
       val userDefinedBulkInsertPartitionerOpt = DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig)
       if (userDefinedBulkInsertPartitionerOpt.isPresent)  {
@@ -699,4 +710,49 @@ object HoodieSparkSqlWriter {
       null
     }
   }
+
+  /** Rejects the write when a supplied option disagrees with the value already stored in the table
+   *  config; options with no stored counterpart (or a null table config) are never reported. */
+  private def validateTableConfig(spark: SparkSession, params: Map[String, String],
+      tableConfig: HoodieTableConfig): Unit = {
+    val resolver = spark.sessionState.conf.resolver
+    val conflicts = params.toSeq.flatMap { case (key, value) =>
+      // Report the value that was compared (it may be stored under an alternative key), not getString(key).
+      Option(getStringFromTableConfigWithAlternatives(tableConfig, key)).filterNot(resolver(_, value))
+        .map(existingValue => s"$key:\t$value\t$existingValue")
+    }
+    if (conflicts.nonEmpty) {
+      val header = "Config conflict(key\tcurrent value\texisting value):\n"
+      throw new HoodieException(conflicts.mkString(header, "\n", "").trim)
+    }
+  }
+
+  /** Builds the effective write parameters from optParams (write defaults + SQL option translation);
+   *  entries of a non-null tableConfig then override them. Returns the map and its HoodieConfig. */
+  private def mergeParamsAndGetHoodieConfig(optParams: Map[String, String],
+      tableConfig: HoodieTableConfig): (Map[String, String], HoodieConfig) = {
+    val withDefaults = HoodieWriterUtils.parametersWithWriteDefaults(optParams)
+    val merged = mutable.Map.empty ++ DataSourceWriteOptions.translateSqlOptions(withDefaults)
+    val tableKeyGenKey = HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key
+    // Mirror the writer key generator class under the table-config key unless that key is already set.
+    merged.get(KEYGENERATOR_CLASS_NAME.key).filterNot(_ => merged.contains(tableKeyGenKey))
+      .foreach(keyGen => merged(tableKeyGenKey) = keyGen)
+    if (null != tableConfig) {
+      tableConfig.getProps.foreach { case (key, value) => merged(key) = value }
+    }
+    val params = merged.toMap
+    (params, HoodieWriterUtils.convertMapToHoodieConfig(params))
+  }
+
+  /** Reads key from tableConfig, consulting the key mapped in allAlternatives when one exists;
+   *  yields null when tableConfig itself is null. */
+  private def getStringFromTableConfigWithAlternatives(tableConfig: HoodieTableConfig, key: String): String = {
+    if (null == tableConfig) {
+      null
+    } else {
+      // Alternative keys are resolved to their mapped key before the lookup.
+      val lookupKey = if (allAlternatives.contains(key)) allAlternatives(key) else key
+      tableConfig.getString(lookupKey)
+    }
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
index b1f8eb5..6e736d2 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
@@ -48,9 +48,12 @@ class HoodieStreamingSink(sqlContext: SQLContext,
 
   private val log = LogManager.getLogger(classOf[HoodieStreamingSink])
 
-  private val retryCnt = options(DataSourceWriteOptions.STREAMING_RETRY_CNT.key).toInt
-  private val retryIntervalMs = options(DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.key).toLong
-  private val ignoreFailedBatch = options(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key).toBoolean
+  private val retryCnt = options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_CNT.key,
+    DataSourceWriteOptions.STREAMING_RETRY_CNT.defaultValue).toInt
+  private val retryIntervalMs = options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.key,
+    DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.defaultValue).toLong
+  private val ignoreFailedBatch = options.getOrDefault(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key,
+    DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.defaultValue).toBoolean
 
   private var isAsyncCompactorServiceShutdownAbnormally = false
   private var isAsyncClusteringServiceShutdownAbnormally = false
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index bdb2afb..0e3ede1 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -92,10 +92,9 @@ object HoodieWriterUtils {
    * @return
    */
   def getPartitionColumns(parameters: Map[String, String]): String = {
-    val props = new TypedProperties()
+    val props = new Properties()
     props.putAll(parameters.asJava)
-    val keyGen = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props)
-    HoodieSparkUtils.getPartitionColumns(keyGen, props)
+    HoodieSparkUtils.getPartitionColumns(props) // resolved from the properties alone; no key generator is built here
   }
 
   def convertMapToHoodieConfig(parameters: Map[String, String]): HoodieConfig = {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
index 25d3026..963035c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
@@ -120,8 +120,13 @@ object HoodieOptionConfig {
    */
   def mappingSqlOptionToTableConfig(options: Map[String, String]): Map[String, String] = {
     defaultTableConfig ++
-      options.filterKeys(k => keyTableConfigMapping.contains(k))
-        .map(kv => keyTableConfigMapping(kv._1) -> valueMapping.getOrElse(kv._2, kv._2))
+      options.map { case (k, v) =>
+        if (keyTableConfigMapping.contains(k)) {
+          keyTableConfigMapping(k) -> valueMapping.getOrElse(v, v) // mapped key; value translated only when valueMapping has it
+        } else {
+          k -> v // options without a key mapping are kept as-is instead of being dropped
+        }
+      }
   }
 
   /**
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
index ec1f746..8ac6312 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
@@ -41,8 +41,12 @@ import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOL
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.{SPARK_VERSION, SparkConf}
-
 import java.util.{Locale, Properties}
+
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.keygen.ComplexKeyGenerator
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
@@ -90,35 +94,13 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
         .setBasePath(path)
         .setConf(conf)
         .build()
-     val tableSchema = getTableSqlSchema(metaClient)
-
-     // Get options from the external table and append with the options in ddl.
-     val originTableConfig =  HoodieOptionConfig.mappingTableConfigToSqlOption(
-       metaClient.getTableConfig.getProps.asScala.toMap)
-
-     val allPartitionPaths = getAllPartitionPaths(sparkSession, table)
-     var upgrateConfig = Map.empty[String, String]
-     // If this is a non-hive-styled partition table, disable the hive style config.
-     // (By default this config is enable for spark sql)
-     upgrateConfig = if (!isHiveStyledPartitioning(allPartitionPaths, table)) {
-        upgrateConfig + (DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "false")
-     } else {
-       upgrateConfig
-     }
-      upgrateConfig = if (!isUrlEncodeEnabled(allPartitionPaths, table)) {
-        upgrateConfig + (DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key -> "false")
-      } else {
-        upgrateConfig
-      }
+      val tableSchema = getTableSqlSchema(metaClient)
 
-      // Use the origin keygen to generate record key to keep the rowkey consistent with the old table for spark sql.
-      // See SqlKeyGenerator#getRecordKey for detail.
-      upgrateConfig = if (originTableConfig.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)) {
-        upgrateConfig + (SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> originTableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key))
-      } else {
-        upgrateConfig
-      }
-      val options = originTableConfig ++ upgrateConfig ++ table.storage.properties
+      // Get options from the external table and append with the options in ddl.
+      val originTableConfig =  HoodieOptionConfig.mappingTableConfigToSqlOption(
+        metaClient.getTableConfig.getProps.asScala.toMap)
+      val extraConfig = extraTableConfig(sparkSession, isTableExists, originTableConfig)
+      val options = originTableConfig ++ table.storage.properties ++ extraConfig
 
       val userSpecifiedSchema = table.schema
       if (userSpecifiedSchema.isEmpty && tableSchema.isDefined) {
@@ -137,7 +119,8 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
           s". The associated location('$path') already exists.")
       }
       // Add the meta fields to the schema if this is a managed table or an empty external table.
-      (addMetaFields(table.schema), table.storage.properties)
+      val options = table.storage.properties ++ extraTableConfig(sparkSession, false)
+      (addMetaFields(table.schema), options)
     }
 
     val tableType = HoodieOptionConfig.getTableType(table.storage.properties)
@@ -314,6 +297,43 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
           s"'${HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_MOR}'")
     }
   }
+
+  /**
+   * Derives the hive-style partitioning, URL-encode partitioning and key generator class
+   * entries to store with the table.
+   *
+   * For an existing table the two partitioning flags come from `originTableConfig` when present,
+   * otherwise from the results of `isHiveStyledPartitioning` / `isUrlEncodeEnabled`.
+   * For a new table hive-style partitioning is "true" and URL encoding takes its configured default.
+   */
+  def extraTableConfig(sparkSession: SparkSession, isTableExists: Boolean,
+      originTableConfig: Map[String, String] = Map.empty): Map[String, String] = {
+    val hiveStyleKey = HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key
+    val urlEncodeKey = HoodieTableConfig.URL_ENCODE_PARTITIONING.key
+    val keyGenKey = HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key
+
+    val partitioningConfig: Map[String, String] = if (isTableExists) {
+      val allPartitionPaths = getAllPartitionPaths(sparkSession, table)
+      Map(
+        hiveStyleKey -> originTableConfig.getOrElse(hiveStyleKey,
+          String.valueOf(isHiveStyledPartitioning(allPartitionPaths, table))),
+        urlEncodeKey -> originTableConfig.getOrElse(urlEncodeKey,
+          String.valueOf(isUrlEncodeEnabled(allPartitionPaths, table))))
+    } else {
+      Map(hiveStyleKey -> "true",
+        urlEncodeKey -> HoodieTableConfig.URL_ENCODE_PARTITIONING.defaultValue())
+    }
+
+    val primaryColumns = HoodieOptionConfig.getPrimaryColumns(originTableConfig ++ table.storage.properties)
+    val keyGenClass = if (primaryColumns.isEmpty) {
+      classOf[UuidKeyGenerator].getCanonicalName
+    } else {
+      originTableConfig.get(keyGenKey)
+        .map(cls => HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(cls))
+        .getOrElse(classOf[ComplexKeyGenerator].getCanonicalName)
+    }
+    partitioningConfig + (keyGenKey -> keyGenClass)
+  }
 }
 
 object CreateHoodieTableCommand extends Logging {
@@ -342,6 +362,9 @@ object CreateHoodieTableCommand extends Logging {
     checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.PRECOMBINE_FIELD.key)
     checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.PARTITION_FIELDS.key)
     checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.RECORDKEY_FIELDS.key)
+    checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)
+    checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.URL_ENCODE_PARTITIONING.key)
+    checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key)
     // Save all the table config to the hoodie.properties.
     val parameters = originTableConfig ++ tableOptions
     val properties = new Properties()
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
index 4d6d0a2..987ce0e 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hudi.command
 
 import org.apache.hudi.DataSourceWriteOptions.{OPERATION, _}
+import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
 import org.apache.hudi.hive.ddl.HiveSyncMode
@@ -58,7 +59,12 @@ case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends Runnab
     val targetTable = sparkSession.sessionState.catalog
       .getTableMetadata(tableId)
     val path = getTableLocation(targetTable, sparkSession)
-
+    val conf = sparkSession.sessionState.newHadoopConf()
+    val metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(path)
+      .setConf(conf)
+      .build()
+    val tableConfig = metaClient.getTableConfig
     val primaryColumns = HoodieOptionConfig.getPrimaryColumns(targetTable.storage.properties)
 
     assert(primaryColumns.nonEmpty,
@@ -66,13 +72,14 @@ case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends Runnab
     withSparkConf(sparkSession, targetTable.storage.properties) {
       Map(
         "path" -> path,
-        KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
         TBL_NAME.key -> tableId.table,
+        HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
+        URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitoning,
+        KEYGENERATOR_CLASS_NAME.key -> tableConfig.getKeyGeneratorClassName,
         OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
         PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","),
         HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
         HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
-        HIVE_STYLE_PARTITIONING.key -> "true",
         HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> "200",
         SqlKeyGenerator.PARTITION_SCHEMA -> targetTable.partitionSchema.toDDL
       )
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index e1c61ed..2b88373 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -21,12 +21,14 @@ import org.apache.avro.Schema
 import org.apache.avro.generic.{GenericRecord, IndexedRecord}
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.util.{Option => HOption}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
 import org.apache.hudi.exception.HoodieDuplicateKeyException
 import org.apache.hudi.hive.MultiPartKeysValueExtractor
 import org.apache.hudi.hive.ddl.HiveSyncMode
+import org.apache.hudi.keygen.ComplexKeyGenerator
 import org.apache.hudi.sql.InsertMode
 import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils}
 import org.apache.spark.internal.Logging
@@ -90,7 +92,6 @@ object InsertIntoHoodieTableCommand extends Logging {
       // for insert into or insert overwrite partition we use append mode.
       SaveMode.Append
     }
-    val parameters = HoodieWriterUtils.parametersWithWriteDefaults(config)
     val conf = sparkSession.sessionState.conf
     val alignedQuery = alignOutputFields(query, table, insertPartitions, conf)
     // If we create dataframe using the Dataset.ofRows(sparkSession, alignedQuery),
@@ -100,7 +101,7 @@ object InsertIntoHoodieTableCommand extends Logging {
     val inputDF = sparkSession.createDataFrame(
       Dataset.ofRows(sparkSession, alignedQuery).rdd, alignedQuery.schema)
     val success =
-      HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, parameters, inputDF)._1
+      HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, inputDF)._1
     if (success) {
       if (refreshTable) {
         sparkSession.catalog.refreshTable(table.identifier.unquotedString)
@@ -197,19 +198,42 @@ object InsertIntoHoodieTableCommand extends Logging {
     val parameters = withSparkConf(sparkSession, options)()
 
     val tableType = parameters.getOrElse(TABLE_TYPE.key, TABLE_TYPE.defaultValue)
-
-    val partitionFields = table.partitionColumnNames.mkString(",")
-    val path = getTableLocation(table, sparkSession)
-
-    val tableSchema = table.schema
-
     val primaryColumns = HoodieOptionConfig.getPrimaryColumns(options)
+    val partitionFields = table.partitionColumnNames.mkString(",")
 
-    val keyGenClass = if (primaryColumns.nonEmpty) {
-      classOf[SqlKeyGenerator].getCanonicalName
+    val path = getTableLocation(table, sparkSession)
+    val conf = sparkSession.sessionState.newHadoopConf()
+    val isTableExists = tableExistsInPath(path, conf)
+    val tableConfig = if (isTableExists) {
+      HoodieTableMetaClient.builder()
+        .setBasePath(path)
+        .setConf(conf)
+        .build()
+        .getTableConfig
     } else {
-      classOf[UuidKeyGenerator].getName
+      null
     }
+    val hiveStylePartitioningEnable = if (null == tableConfig || null == tableConfig.getHiveStylePartitioningEnable) {
+      "true"
+    } else {
+      tableConfig.getHiveStylePartitioningEnable
+    }
+    val urlEncodePartitioning = if (null == tableConfig || null == tableConfig.getUrlEncodePartitoning) {
+      "false"
+    } else {
+      tableConfig.getUrlEncodePartitoning
+    }
+    val keyGeneratorClassName = if (null == tableConfig || null == tableConfig.getKeyGeneratorClassName) {
+      if (primaryColumns.nonEmpty) {
+        classOf[ComplexKeyGenerator].getCanonicalName
+      } else {
+        classOf[UuidKeyGenerator].getCanonicalName
+      }
+    } else {
+      tableConfig.getKeyGeneratorClassName
+    }
+
+    val tableSchema = table.schema
 
     val dropDuplicate = sparkSession.conf
       .getOption(INSERT_DROP_DUPS.key)
@@ -267,7 +291,9 @@ object InsertIntoHoodieTableCommand extends Logging {
         TBL_NAME.key -> table.identifier.table,
         PRECOMBINE_FIELD.key -> tableSchema.fields.last.name,
         OPERATION.key -> operation,
-        KEYGENERATOR_CLASS_NAME.key -> keyGenClass,
+        HIVE_STYLE_PARTITIONING.key -> hiveStylePartitioningEnable,
+        URL_ENCODE_PARTITIONING.key -> urlEncodePartitioning,
+        KEYGENERATOR_CLASS_NAME.key -> keyGeneratorClassName,
         RECORDKEY_FIELD.key -> primaryColumns.mkString(","),
         PARTITIONPATH_FIELD.key -> partitionFields,
         PAYLOAD_CLASS_NAME.key -> payloadClassName,
@@ -279,10 +305,8 @@ object InsertIntoHoodieTableCommand extends Logging {
         HIVE_DATABASE.key -> table.identifier.database.getOrElse("default"),
         HIVE_TABLE.key -> table.identifier.table,
         HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
-        HIVE_STYLE_PARTITIONING.key -> "true",
         HIVE_PARTITION_FIELDS.key -> partitionFields,
         HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
-        URL_ENCODE_PARTITIONING.key -> "true",
         HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "200",
         HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
         SqlKeyGenerator.PARTITION_SCHEMA -> table.partitionSchema.toDDL
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index b22c607..dd1be20 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hudi.command
 
 import org.apache.avro.Schema
 import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
 import org.apache.hudi.hive.MultiPartKeysValueExtractor
@@ -34,7 +35,6 @@ import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
 import org.apache.spark.sql.hudi.{HoodieOptionConfig, SerDeUtils}
 import org.apache.spark.sql.types.{BooleanType, StructType}
-
 import java.util.Base64
 
 /**
@@ -419,7 +419,12 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
     val targetTableDb = targetTableIdentify.database.getOrElse("default")
     val targetTableName = targetTableIdentify.identifier
     val path = getTableLocation(targetTable, sparkSession)
-
+    val conf = sparkSession.sessionState.newHadoopConf()
+    val metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(path)
+      .setConf(conf)
+      .build()
+    val tableConfig = metaClient.getTableConfig
     val options = targetTable.storage.properties
     val definedPk = HoodieOptionConfig.getPrimaryColumns(options)
     // TODO Currently the mergeEqualConditionKeys must be the same the primary key.
@@ -429,31 +434,30 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
     }
     // Enable the hive sync by default if spark have enable the hive metastore.
     val enableHive = isEnableHive(sparkSession)
-    HoodieWriterUtils.parametersWithWriteDefaults(
-      withSparkConf(sparkSession, options) {
-        Map(
-          "path" -> path,
-          RECORDKEY_FIELD.key -> targetKey2SourceExpression.keySet.mkString(","),
-          KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
-          PRECOMBINE_FIELD.key -> targetKey2SourceExpression.keySet.head, // set a default preCombine field
-          TBL_NAME.key -> targetTableName,
-          PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","),
-          PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName,
-          META_SYNC_ENABLED.key -> enableHive.toString,
-          HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
-          HIVE_USE_JDBC.key -> "false",
-          HIVE_DATABASE.key -> targetTableDb,
-          HIVE_TABLE.key -> targetTableName,
-          HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
-          HIVE_STYLE_PARTITIONING.key -> "true",
-          HIVE_PARTITION_FIELDS.key -> targetTable.partitionColumnNames.mkString(","),
-          HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
-          URL_ENCODE_PARTITIONING.key -> "true", // enable the url decode for sql.
-          HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "200", // set the default parallelism to 200 for sql
-          HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
-          HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> "200",
-          SqlKeyGenerator.PARTITION_SCHEMA -> targetTable.partitionSchema.toDDL
-        )
-      })
+    withSparkConf(sparkSession, options) {
+      Map(
+        "path" -> path,
+        RECORDKEY_FIELD.key -> targetKey2SourceExpression.keySet.mkString(","),
+        PRECOMBINE_FIELD.key -> targetKey2SourceExpression.keySet.head, // set a default preCombine field
+        TBL_NAME.key -> targetTableName,
+        PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","),
+        PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName,
+        HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
+        URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitoning,
+        KEYGENERATOR_CLASS_NAME.key -> tableConfig.getKeyGeneratorClassName,
+        META_SYNC_ENABLED.key -> enableHive.toString,
+        HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
+        HIVE_USE_JDBC.key -> "false",
+        HIVE_DATABASE.key -> targetTableDb,
+        HIVE_TABLE.key -> targetTableName,
+        HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
+        HIVE_PARTITION_FIELDS.key -> targetTable.partitionColumnNames.mkString(","),
+        HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
+        HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "200", // set the default parallelism to 200 for sql
+        HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
+        HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> "200",
+        SqlKeyGenerator.PARTITION_SCHEMA -> targetTable.partitionSchema.toDDL
+      )
+    }
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
index b59984a..e069df9 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
@@ -18,11 +18,13 @@
 package org.apache.spark.sql.hudi.command
 
 import java.util.concurrent.TimeUnit.{MICROSECONDS, MILLISECONDS}
+
 import org.apache.avro.generic.GenericRecord
 import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.util.PartitionPathEncodeUtils
 import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.keygen.{BaseKeyGenerator, ComplexKeyGenerator, KeyGenUtils, SparkKeyGeneratorInterface}
+import org.apache.hudi.keygen._
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{StructType, TimestampType}
 import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}
@@ -48,7 +50,8 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
       val keyGenProps = new TypedProperties()
       keyGenProps.putAll(props)
       keyGenProps.remove(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME)
-      keyGenProps.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, beforeKeyGenClassName)
+      val convertedKeyGenClassName = SqlKeyGenerator.getRealKeyGenClassName(props)
+      keyGenProps.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, convertedKeyGenClassName)
       Some(KeyGenUtils.createKeyGeneratorByClassName(keyGenProps))
     } else {
       None
@@ -64,7 +67,7 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
   }
 
   override def getRecordKey(row: Row): String = {
-    if (originKeyGen.isDefined && originKeyGen.get.isInstanceOf[SparkKeyGeneratorInterface]) {
+    if (originKeyGen.isDefined) {
       originKeyGen.get.asInstanceOf[SparkKeyGeneratorInterface].getRecordKey(row)
     } else {
       super.getRecordKey(row)
@@ -121,4 +124,13 @@ object SqlKeyGenerator {
   val ORIGIN_KEYGEN_CLASS_NAME = "hoodie.sql.origin.keygen.class"
   private val timestampTimeFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
   private val sqlTimestampFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.S")
+
+  /** Returns the key generator class name derived from the origin key generator stored in `props`. */
+  def getRealKeyGenClassName(props: TypedProperties): String = {
+    // getString falls back to null when ORIGIN_KEYGEN_CLASS_NAME is absent, so Option(...) becomes None.
+    val originKeyGenClassName = Option(props.getString(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME, null))
+    originKeyGenClassName
+      .map(className => HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(className))
+      .getOrElse(classOf[ComplexKeyGenerator].getCanonicalName)
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
index 20a8274..b1c8a04 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hudi.command
 
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
 import org.apache.hudi.hive.MultiPartKeysValueExtractor
@@ -85,7 +86,12 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends RunnableCo
     val targetTable = sparkSession.sessionState.catalog
       .getTableMetadata(tableId)
     val path = getTableLocation(targetTable, sparkSession)
-
+    val conf = sparkSession.sessionState.newHadoopConf()
+    val metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(path)
+      .setConf(conf)
+      .build()
+    val tableConfig = metaClient.getTableConfig
     val primaryColumns = HoodieOptionConfig.getPrimaryColumns(targetTable.storage.properties)
 
     assert(primaryColumns.nonEmpty,
@@ -95,9 +101,11 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends RunnableCo
       Map(
         "path" -> path,
         RECORDKEY_FIELD.key -> primaryColumns.mkString(","),
-        KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
         PRECOMBINE_FIELD.key -> primaryColumns.head, //set the default preCombine field.
         TBL_NAME.key -> tableId.table,
+        HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
+        URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitoning,
+        KEYGENERATOR_CLASS_NAME.key -> tableConfig.getKeyGeneratorClassName,
         OPERATION.key -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
         PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","),
         META_SYNC_ENABLED.key -> enableHive.toString,
@@ -107,9 +115,7 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends RunnableCo
         HIVE_TABLE.key -> tableId.table,
         HIVE_PARTITION_FIELDS.key -> targetTable.partitionColumnNames.mkString(","),
         HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
-        URL_ENCODE_PARTITIONING.key -> "true",
         HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
-        HIVE_STYLE_PARTITIONING.key -> "true",
         HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
         SqlKeyGenerator.PARTITION_SCHEMA -> targetTable.partitionSchema.toDDL
       )
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala
index ff95e87..96fb18d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala
@@ -30,7 +30,7 @@ import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode
 import org.apache.hudi.functional.TestBootstrap
 import org.apache.hudi.hive.HiveSyncConfig
-import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
+import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}
 import org.apache.hudi.testutils.DataSourceTestUtils
 import org.apache.spark.SparkContext
 import org.apache.spark.api.java.JavaSparkContext
@@ -48,8 +48,10 @@ import org.scalatest.Matchers.{assertResult, be, convertToAnyShouldWrapper, inte
 import java.time.Instant
 import java.util
 import java.util.{Collections, Date, UUID}
+
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters
+import scala.util.control.NonFatal
 
 /**
  * Test suite for SparkSqlWriter class.
@@ -161,7 +163,6 @@ class HoodieSparkSqlWriterSuite {
       .updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
       .updated(HoodieTableConfig.POPULATE_META_FIELDS.key(), String.valueOf(populateMetaFields))
       .updated(HoodieWriteConfig.BULK_INSERT_SORT_MODE.key(), sortMode.name())
-    val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
 
     // generate the inserts
     val schema = DataSourceTestUtils.getStructTypeExampleSchema
@@ -175,7 +176,7 @@ class HoodieSparkSqlWriterSuite {
     val recordsSeq = convertRowListToSeq(records)
     val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
     // write to Hudi
-    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df)
+    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df)
 
     // collect all partition paths to issue read of parquet files
     val partitions = Seq(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH,
@@ -242,21 +243,19 @@ class HoodieSparkSqlWriterSuite {
     //create a new table
     val fooTableModifier = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
       "hoodie.insert.shuffle.parallelism" -> "4", "hoodie.upsert.shuffle.parallelism" -> "4")
-    val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
     val dataFrame = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime)))
-    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, dataFrame)
+    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, dataFrame)
 
     //on same path try append with different("hoodie_bar_tbl") table name which should throw an exception
     val barTableModifier = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> "hoodie_bar_tbl",
       "hoodie.insert.shuffle.parallelism" -> "4", "hoodie.upsert.shuffle.parallelism" -> "4")
-    val barTableParams = HoodieWriterUtils.parametersWithWriteDefaults(barTableModifier)
     val dataFrame2 = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime)))
-    val tableAlreadyExistException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, barTableParams, dataFrame2))
+    val tableAlreadyExistException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, barTableModifier, dataFrame2))
     assert(tableAlreadyExistException.getMessage.contains("hoodie table with name " + hoodieFooTableName + " already exist"))
 
     //on same path try append with delete operation and different("hoodie_bar_tbl") table name which should throw an exception
-    val deleteTableParams = barTableParams ++ Map(OPERATION.key -> "delete")
-    val deleteCmdException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, deleteTableParams, dataFrame2))
+    val deleteTableModifier = barTableModifier ++ Map(OPERATION.key -> "delete")
+    val deleteCmdException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, deleteTableModifier, dataFrame2))
     assert(deleteCmdException.getMessage.contains("hoodie table with name " + hoodieFooTableName + " already exist"))
   }
 
@@ -295,7 +294,6 @@ class HoodieSparkSqlWriterSuite {
         .updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
         .updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
         .updated(HoodieWriteConfig.BULK_INSERT_SORT_MODE.key(), BulkInsertSortMode.NONE.name())
-      val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
 
       // generate the inserts
       val schema = DataSourceTestUtils.getStructTypeExampleSchema
@@ -304,7 +302,7 @@ class HoodieSparkSqlWriterSuite {
       val df = spark.createDataFrame(sc.parallelize(inserts), structType)
       try {
         // write to Hudi
-        HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df)
+        HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df)
         Assertions.fail("Should have thrown exception")
       } catch {
         case e: HoodieException => assertTrue(e.getMessage.contains("hoodie.populate.meta.fields already disabled for the table. Can't be re-enabled back"))
@@ -323,7 +321,6 @@ class HoodieSparkSqlWriterSuite {
         .updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
         .updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
         .updated(INSERT_DROP_DUPS.key, "true")
-      val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
 
       // generate the inserts
       val schema = DataSourceTestUtils.getStructTypeExampleSchema
@@ -332,7 +329,7 @@ class HoodieSparkSqlWriterSuite {
       val recordsSeq = convertRowListToSeq(records)
       val df = spark.createDataFrame(spark.sparkContext.parallelize(recordsSeq), structType)
       // write to Hudi
-      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df)
+      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df)
       fail("Drop duplicates with bulk insert in row writing should have thrown exception")
     } catch {
       case e: HoodieException => assertTrue(e.getMessage.contains("Dropping duplicates with bulk_insert in row writer path is not supported yet"))
@@ -348,7 +345,6 @@ class HoodieSparkSqlWriterSuite {
     //create a new table
     val fooTableModifier = commonTableModifier.updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
       .updated(DataSourceWriteOptions.INSERT_DROP_DUPS.key, "false")
-    val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
 
     // generate the inserts
     val schema = DataSourceTestUtils.getStructTypeExampleSchema
@@ -357,7 +353,7 @@ class HoodieSparkSqlWriterSuite {
     val recordsSeq = convertRowListToSeq(records)
     val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
     // write to Hudi
-    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams - DataSourceWriteOptions.PRECOMBINE_FIELD.key, df)
+    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier - DataSourceWriteOptions.PRECOMBINE_FIELD.key, df)
 
     // collect all partition paths to issue read of parquet files
     val partitions = Seq(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH,
@@ -384,7 +380,6 @@ class HoodieSparkSqlWriterSuite {
     val fooTableModifier = commonTableModifier.updated("hoodie.bulkinsert.shuffle.parallelism", "4")
       .updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
       .updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
-    val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
     val partitions = Seq(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH,
       HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH)
     val fullPartitionPaths = new Array[String](3)
@@ -400,7 +395,7 @@ class HoodieSparkSqlWriterSuite {
       val recordsSeq = convertRowListToSeq(records)
       val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
       // write to Hudi
-      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df)
+      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df)
       // Fetch records from entire dataset
       val actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
       // remove metadata columns so that expected and actual DFs can be compared as is
@@ -450,7 +445,7 @@ class HoodieSparkSqlWriterSuite {
       new JavaSparkContext(sc), modifiedSchema.toString, tempBasePath, hoodieFooTableName,
       mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]])
 
-    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df, Option.empty, Option(client))
+    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df, Option.empty, Option(client))
     // Verify that asynchronous compaction is not scheduled
     verify(client, times(0)).scheduleCompaction(any())
     // Verify that HoodieWriteClient is closed correctly
@@ -504,14 +499,14 @@ class HoodieSparkSqlWriterSuite {
       val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
       initializeMetaClientForBootstrap(fooTableParams, tableType, true)
 
-         val client = spy(DataSourceUtils.createHoodieClient(
+      val client = spy(DataSourceUtils.createHoodieClient(
         new JavaSparkContext(sc),
         null,
         tempBasePath,
         hoodieFooTableName,
         mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]])
 
-      HoodieSparkSqlWriter.bootstrap(sqlContext, SaveMode.Append, fooTableParams, spark.emptyDataFrame, Option.empty,
+      HoodieSparkSqlWriter.bootstrap(sqlContext, SaveMode.Append, fooTableModifier, spark.emptyDataFrame, Option.empty,
         Option(client))
 
       // Verify that HoodieWriteClient is closed correctly
@@ -556,7 +551,6 @@ class HoodieSparkSqlWriterSuite {
     //create a new table
     val fooTableModifier = getCommonParams(tempPath, hoodieFooTableName, tableType)
       .updated(DataSourceWriteOptions.RECONCILE_SCHEMA.key, "true")
-    val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
 
     // generate the inserts
     val schema = DataSourceTestUtils.getStructTypeExampleSchema
@@ -564,7 +558,7 @@ class HoodieSparkSqlWriterSuite {
     var records = DataSourceTestUtils.generateRandomRows(10)
     var recordsSeq = convertRowListToSeq(records)
     val df1 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
-    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableParams, df1)
+    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableModifier, df1)
 
     val snapshotDF1 = spark.read.format("org.apache.hudi")
       .load(tempBasePath + "/*/*/*/*")
@@ -577,7 +571,7 @@ class HoodieSparkSqlWriterSuite {
     // issue updates so that log files are created for MOR table
     val updatesSeq = convertRowListToSeq(DataSourceTestUtils.generateUpdates(records, 5))
     val updatesDf = spark.createDataFrame(sc.parallelize(updatesSeq), structType)
-    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, updatesDf)
+    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, updatesDf)
 
     val snapshotDF2 = spark.read.format("org.apache.hudi")
       .load(tempBasePath + "/*/*/*/*")
@@ -595,7 +589,7 @@ class HoodieSparkSqlWriterSuite {
     recordsSeq = convertRowListToSeq(records)
     val df3 = spark.createDataFrame(sc.parallelize(recordsSeq), evolStructType)
     // write to Hudi with new column
-    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df3)
+    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df3)
 
     val snapshotDF3 = spark.read.format("org.apache.hudi")
       .load(tempBasePath + "/*/*/*/*")
@@ -610,7 +604,7 @@ class HoodieSparkSqlWriterSuite {
     records = DataSourceTestUtils.generateRandomRows(10)
     recordsSeq = convertRowListToSeq(records)
     val df4 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
-    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df4)
+    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df4)
 
     val snapshotDF4 = spark.read.format("org.apache.hudi")
       .load(tempBasePath + "/*/*/*/*")
@@ -743,14 +737,13 @@ class HoodieSparkSqlWriterSuite {
   @ValueSource(booleans = Array(true, false))
   def testDeletePartitionsV2(usePartitionsToDeleteConfig: Boolean): Unit = {
     val fooTableModifier = getCommonParams(tempPath, hoodieFooTableName, HoodieTableType.COPY_ON_WRITE.name())
-    val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
     val schema = DataSourceTestUtils.getStructTypeExampleSchema
     val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
     val records = DataSourceTestUtils.generateRandomRows(10)
     val recordsSeq = convertRowListToSeq(records)
     val df1 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
     // write to Hudi
-    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableParams, df1)
+    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableModifier, df1)
     val snapshotDF1 = spark.read.format("org.apache.hudi")
       .load(tempBasePath + "/*/*/*/*")
     assertEquals(10, snapshotDF1.count())
@@ -761,7 +754,7 @@ class HoodieSparkSqlWriterSuite {
     val updatesSeq = convertRowListToSeq(DataSourceTestUtils.generateUpdates(records, 5))
     val updatesDf = spark.createDataFrame(sc.parallelize(updatesSeq), structType)
     // write updates to Hudi
-    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, updatesDf)
+    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, updatesDf)
     val snapshotDF2 = spark.read.format("org.apache.hudi")
       .load(tempBasePath + "/*/*/*/*")
     assertEquals(10, snapshotDF2.count())
@@ -770,7 +763,7 @@ class HoodieSparkSqlWriterSuite {
     // ensure 2nd batch of updates matches.
     assert(updatesDf.intersect(trimmedDf2).except(updatesDf).count() == 0)
     if (usePartitionsToDeleteConfig) {
-      fooTableParams.updated(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key(), HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
+      fooTableModifier.updated(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key(), HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
     }
     // delete partitions contains the primary key
     val recordsToDelete = df1.filter(entry => {
@@ -778,7 +771,7 @@ class HoodieSparkSqlWriterSuite {
       partitionPath.equals(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) ||
         partitionPath.equals(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)
     })
-    val updatedParams = fooTableParams.updated(DataSourceWriteOptions.OPERATION.key(), WriteOperationType.DELETE_PARTITION.name())
+    val updatedParams = fooTableModifier.updated(DataSourceWriteOptions.OPERATION.key(), WriteOperationType.DELETE_PARTITION.name())
     HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, updatedParams, recordsToDelete)
     val snapshotDF3 = spark.read.format("org.apache.hudi")
       .load(tempBasePath + "/*/*/*/*")
@@ -819,4 +812,88 @@ class HoodieSparkSqlWriterSuite {
       assert(spark.read.format("hudi").load(tempBasePath).where("age >= 2000").count() == 10)
     }
   }
+
+  /**
+   * Verifies that hiveStylePartitioning/urlEncodePartitioning/KeyGenerator only have to be specified on the first write, because they are kept in HoodieTableConfig afterwards
+   */
+  @Test
+  def testToWriteWithoutParametersIncludedInHoodieTableConfig(): Unit = {
+    val _spark = spark
+    import _spark.implicits._
+    val df = Seq((1, "a1", 10, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
+    val options = Map(
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
+      DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt"
+    )
+
+    // case 1: table created by sql, then written through the dataframe API
+    val (tableName1, tablePath1) = ("hoodie_test_params_1", s"$tempBasePath" + "_1")
+    spark.sql(
+      s"""
+         | create table $tableName1 (
+         |   id int,
+         |   name string,
+         |   price double,
+         |   ts long,
+         |   dt string
+         | ) using hudi
+         | partitioned by (dt)
+         | options (
+         |  primaryKey = 'id'
+         | )
+         | location '$tablePath1'
+       """.stripMargin)
+    val tableConfig1 = HoodieTableMetaClient.builder()
+      .setConf(spark.sparkContext.hadoopConfiguration)
+      .setBasePath(tablePath1).build().getTableConfig
+    assert(tableConfig1.getHiveStylePartitioningEnable == "true")
+    assert(tableConfig1.getUrlEncodePartitoning == "false")
+    assert(tableConfig1.getKeyGeneratorClassName == classOf[ComplexKeyGenerator].getName)
+    df.write.format("hudi")
+      .options(options)
+      .option(HoodieWriteConfig.TBL_NAME.key, tableName1)
+      .mode(SaveMode.Append).save(tablePath1)
+    assert(spark.read.format("hudi").load(tablePath1 + "/*").count() == 1)
+
+    // case 2: table created by the first dataframe write
+    val (tableName2, tablePath2) = ("hoodie_test_params_2", s"$tempBasePath" + "_2")
+    // the first write needs to specify the params
+    df.write.format("hudi")
+      .options(options)
+      .option(HoodieWriteConfig.TBL_NAME.key, tableName2)
+      .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key, "true")
+      .option(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, classOf[SimpleKeyGenerator].getName)
+      .mode(SaveMode.Overwrite).save(tablePath2)
+    val tableConfig2 = HoodieTableMetaClient.builder()
+      .setConf(spark.sparkContext.hadoopConfiguration)
+      .setBasePath(tablePath2).build().getTableConfig
+    assert(tableConfig2.getHiveStylePartitioningEnable == "false")
+    assert(tableConfig2.getUrlEncodePartitoning == "true")
+    assert(tableConfig2.getKeyGeneratorClassName == classOf[SimpleKeyGenerator].getName)
+
+    val df2 = Seq((2, "a2", 20, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
+    // params that differ from HoodieTableConfig must be rejected; intercept (unlike try/catch) fails the test when nothing is thrown
+    val configConflictException = intercept[HoodieException] {
+      df2.write.format("hudi")
+        .options(options)
+        .option(HoodieWriteConfig.TBL_NAME.key, tableName2)
+        .option(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, classOf[ComplexKeyGenerator].getName)
+        .mode(SaveMode.Append).save(tablePath2)
+    }
+    val conflictMessage = configConflictException.getMessage
+    assert(conflictMessage.contains("Config conflict"))
+    assert(conflictMessage.contains(
+      s"${HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key}\t${classOf[ComplexKeyGenerator].getName}\t${classOf[SimpleKeyGenerator].getName}"))
+
+    // do not need to specify hiveStylePartitioning/urlEncodePartitioning/KeyGenerator params
+    df2.write.format("hudi")
+      .options(options)
+      .option(HoodieWriteConfig.TBL_NAME.key, tableName2)
+      .mode(SaveMode.Append).save(tablePath2)
+    val data = spark.read.format("hudi").load(tablePath2 + "/*")
+    assert(data.count() == 2)
+    // NOTE(review): tableConfig2 reports hive-style partitioning as "false" above, yet a "dt=" prefixed path is expected here - confirm
+    assert(data.select("_hoodie_partition_path").map(_.getString(0)).distinct.collect.head == "dt=2021-10-16")
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
index 94e9620..7c58cc0 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -17,6 +17,8 @@
 
 package org.apache.hudi
 
+import java.util.Properties
+
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.table.HoodieTableMetaClient
@@ -58,6 +60,7 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
   )
 
   @BeforeEach override def setUp() {
+    setTableName("hoodie_test")
     initPath()
     initSparkContexts()
     spark = sqlContext.sparkSession
@@ -71,6 +74,9 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
   @ParameterizedTest
   @ValueSource(booleans = Array(true, false))
   def testPartitionSchema(partitionEncode: Boolean): Unit = {
+    val props = new Properties()
+    props.setProperty(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key, String.valueOf(partitionEncode))
+    initMetaClient(props)
     val records1 = dataGen.generateInsertsContainsAllPartitions("000", 100)
     val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2))
     inputDF1.write.format("hudi")
@@ -128,6 +134,9 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
   @ParameterizedTest
   @ValueSource(booleans = Array(true, false))
   def testPartitionPruneWithPartitionEncode(partitionEncode: Boolean): Unit = {
+    val props = new Properties()
+    props.setProperty(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key, String.valueOf(partitionEncode))
+    initMetaClient(props)
     val partitions = Array("2021/03/08", "2021/03/09", "2021/03/10", "2021/03/11", "2021/03/12")
     val newDataGen =  new HoodieTestDataGenerator(partitions)
     val records1 = newDataGen.generateInsertsContainsAllPartitions("000", 100)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
index 8fc6e7f..d6ae80d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
@@ -154,7 +154,9 @@ class TestDataSourceForBootstrap {
 
     // Perform bootstrap
     val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
-      DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, Some("datestr"))
+      DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
+      Some("datestr"),
+      Map(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true"))
 
     // Read bootstrapped table and verify count
     val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
@@ -473,11 +475,13 @@ class TestDataSourceForBootstrap {
   }
 
   def runMetadataBootstrapAndVerifyCommit(tableType: String,
-                                          partitionColumns: Option[String] = None): String = {
+                                          partitionColumns: Option[String] = None,
+                                          extraOpts: Map[String, String] = Map.empty): String = {
     val bootstrapDF = spark.emptyDataFrame
     bootstrapDF.write
       .format("hudi")
       .options(commonOpts)
+      .options(extraOpts)
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType)
       .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, partitionColumns.getOrElse(""))
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index ee914ae..eba2a3d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -17,6 +17,8 @@
 
 package org.apache.hudi.functional
 
+import java.util.Properties
+
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.config.HoodieMetadataConfig
@@ -61,6 +63,7 @@ class TestMORDataSource extends HoodieClientTestBase {
   val updatedVerificationVal: String = "driver_update"
 
   @BeforeEach override def setUp() {
+    setTableName("hoodie_test")
     initPath()
     initSparkContexts()
     spark = sqlContext.sparkSession
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
index bb102a4..9482ae3 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
@@ -46,6 +46,7 @@ class TestTimeTravelQuery extends HoodieClientTestBase {
   )
 
   @BeforeEach override def setUp() {
+    setTableName("hoodie_test")
     initPath()
     initSparkContexts()
     spark = sqlContext.sparkSession