Posted to commits@hudi.apache.org by xu...@apache.org on 2022/01/23 06:11:48 UTC

[hudi] branch master updated: [HUDI-2837] Add support for using database name in incremental query (#4083)

This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 56cd8ff  [HUDI-2837] Add support for using database name in incremental query (#4083)
56cd8ff is described below

commit 56cd8ffae0aa1dc92633ebe880a5188cb9e74f5c
Author: 董可伦 <do...@inspur.com>
AuthorDate: Sun Jan 23 14:11:27 2022 +0800

    [HUDI-2837] Add support for using database name in incremental query (#4083)
---
 .../hudi/common/table/HoodieTableConfig.java       |  13 +++
 .../hudi/common/table/HoodieTableMetaClient.java   |  12 +++
 .../hudi/common/testutils/HoodieTestUtils.java     |  22 +++++
 .../hudi/hadoop/HoodieFileInputFormatBase.java     |   8 +-
 .../org/apache/hudi/hadoop/InputPathHandler.java   |  30 +++---
 .../realtime/HoodieParquetRealtimeInputFormat.java |   5 +-
 .../apache/hudi/hadoop/utils/HoodieHiveUtils.java  |   6 ++
 .../hudi/hadoop/TestHoodieHFileInputFormat.java    | 102 +++++++++++++++++++--
 .../hudi/hadoop/TestHoodieParquetInputFormat.java  | 102 +++++++++++++++++++--
 .../hudi/hadoop/testutils/InputFormatTestUtil.java |  24 ++++-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |   2 +
 .../sql/catalyst/catalog/HoodieCatalogTable.scala  |   5 +-
 .../apache/spark/sql/hudi/HoodieOptionConfig.scala |  10 +-
 .../hudi/command/CreateHoodieTableCommand.scala    |   4 +-
 .../hudi/command/TruncateHoodieTableCommand.scala  |   2 +-
 .../hudi/command/payload/ExpressionPayload.scala   |   4 +-
 .../command/InsertIntoHoodieTableCommand.scala     |  10 +-
 .../hudi/command/MergeIntoHoodieTableCommand.scala |   2 +-
 .../apache/spark/sql/hudi/TestCreateTable.scala    |  30 ++++--
 19 files changed, 330 insertions(+), 63 deletions(-)
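
With this change, an incremental query can be keyed by "<database>.<table>" instead of just "<table>" once hoodie.incremental.use.database is enabled. As a rough, illustrative sketch (the "mydb"/"my_table" names and the start commit are hypothetical; the property names are the ones defined in HoodieHiveUtils and exercised by InputFormatTestUtil in the diff below):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hudi.hadoop.utils.HoodieHiveUtils;

    JobConf jobConf = new JobConf();
    // opt in to database-qualified incremental table names (the flag defaults to false)
    jobConf.setBoolean(HoodieHiveUtils.HOODIE_INCREMENTAL_USE_DATABASE, true);
    // request an incremental scan of mydb.my_table, starting after commit "100", pulling at most 1 commit
    jobConf.set(String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "mydb.my_table"),
        HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
    jobConf.set(String.format(HoodieHiveUtils.HOODIE_START_COMMIT_PATTERN, "mydb.my_table"), "100");
    jobConf.setInt(String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, "mydb.my_table"), 1);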

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 54724d5..624c027 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -76,6 +76,12 @@ public class HoodieTableConfig extends HoodieConfig {
   public static final String HOODIE_PROPERTIES_FILE = "hoodie.properties";
   public static final String HOODIE_PROPERTIES_FILE_BACKUP = "hoodie.properties.backup";
 
+  public static final ConfigProperty<String> DATABASE_NAME = ConfigProperty
+      .key("hoodie.database.name")
+      .noDefaultValue()
+      .withDocumentation("Database name that will be used for incremental query.If different databases have the same table name during incremental query, "
+          + "we can set it to limit the table name under a specific database");
+
   public static final ConfigProperty<String> NAME = ConfigProperty
       .key("hoodie.table.name")
       .noDefaultValue()
@@ -423,6 +429,13 @@ public class HoodieTableConfig extends HoodieConfig {
   }
 
   /**
+   * Read the database name.
+   */
+  public String getDatabaseName() {
+    return getString(DATABASE_NAME);
+  }
+
+  /**
    * Read the table name.
    */
   public String getTableName() {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index f44d28e..b9a3673 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -624,6 +624,7 @@ public class HoodieTableMetaClient implements Serializable {
   public static class PropertyBuilder {
 
     private HoodieTableType tableType;
+    private String databaseName;
     private String tableName;
     private String tableCreateSchema;
     private String recordKeyFields;
@@ -655,6 +656,11 @@ public class HoodieTableMetaClient implements Serializable {
       return setTableType(HoodieTableType.valueOf(tableType));
     }
 
+    public PropertyBuilder setDatabaseName(String databaseName) {
+      this.databaseName = databaseName;
+      return this;
+    }
+
     public PropertyBuilder setTableName(String tableName) {
       this.tableName = tableName;
       return this;
@@ -753,6 +759,9 @@ public class HoodieTableMetaClient implements Serializable {
 
     public PropertyBuilder fromProperties(Properties properties) {
       HoodieConfig hoodieConfig = new HoodieConfig(properties);
+      if (hoodieConfig.contains(HoodieTableConfig.DATABASE_NAME)) {
+        setDatabaseName(hoodieConfig.getString(HoodieTableConfig.DATABASE_NAME));
+      }
       if (hoodieConfig.contains(HoodieTableConfig.NAME)) {
         setTableName(hoodieConfig.getString(HoodieTableConfig.NAME));
       }
@@ -819,6 +828,9 @@ public class HoodieTableMetaClient implements Serializable {
       ValidationUtils.checkArgument(tableName != null, "tableName is null");
 
       HoodieTableConfig tableConfig = new HoodieTableConfig();
+      if (databaseName != null) {
+        tableConfig.setValue(HoodieTableConfig.DATABASE_NAME, databaseName);
+      }
       tableConfig.setValue(HoodieTableConfig.NAME, tableName);
       tableConfig.setValue(HoodieTableConfig.TYPE, tableType.name());
       tableConfig.setValue(HoodieTableConfig.VERSION,
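
On the write path, the database name is carried into hoodie.properties through the new PropertyBuilder#setDatabaseName. A minimal sketch of initializing a table with it, mirroring the helper added to HoodieTestUtils further down (the base path and the "warehouse_db"/"raw_trips" names are illustrative):

    import java.util.Properties;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.model.HoodieAvroPayload;
    import org.apache.hudi.common.model.HoodieTableType;
    import org.apache.hudi.common.table.HoodieTableMetaClient;

    Configuration hadoopConf = new Configuration();
    String basePath = "/tmp/warehouse_db/raw_trips";
    Properties props = HoodieTableMetaClient.withPropertyBuilder()
        .setDatabaseName("warehouse_db")              // persisted as hoodie.database.name
        .setTableName("raw_trips")
        .setTableType(HoodieTableType.COPY_ON_WRITE)
        .setPayloadClass(HoodieAvroPayload.class)
        .build();
    HoodieTableMetaClient metaClient =
        HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, props);
    // metaClient.getTableConfig().getDatabaseName() now returns "warehouse_db"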
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index d03dca0..c623c2f 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -46,6 +46,7 @@ import java.util.UUID;
  */
 public class HoodieTestUtils {
 
+  public static final String HOODIE_DATABASE = "test_incremental";
   public static final String RAW_TRIPS_TEST_NAME = "raw_trips";
   public static final String DEFAULT_WRITE_TOKEN = "1-0-1";
   public static final int DEFAULT_LOG_VERSION = 1;
@@ -92,6 +93,14 @@ public class HoodieTestUtils {
   }
 
   public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
+                                           HoodieFileFormat baseFileFormat, String databaseName)
+      throws IOException {
+    Properties properties = new Properties();
+    properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), baseFileFormat.toString());
+    return init(hadoopConf, basePath, tableType, properties, databaseName);
+  }
+
+  public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
                                            HoodieFileFormat baseFileFormat)
       throws IOException {
     Properties properties = new Properties();
@@ -111,6 +120,19 @@ public class HoodieTestUtils {
     return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, properties);
   }
 
+  public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
+                                           Properties properties, String databaseName)
+      throws IOException {
+    properties = HoodieTableMetaClient.withPropertyBuilder()
+      .setDatabaseName(databaseName)
+      .setTableName(RAW_TRIPS_TEST_NAME)
+      .setTableType(tableType)
+      .setPayloadClass(HoodieAvroPayload.class)
+      .fromProperties(properties)
+      .build();
+    return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, properties);
+  }
+
   public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType, String bootstrapBasePath, HoodieFileFormat baseFileFormat) throws IOException {
     Properties props = new Properties();
     props.setProperty(HoodieTableConfig.BOOTSTRAP_BASE_PATH.key(), bootstrapBasePath);
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
index 9597256..a35eb50 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
@@ -121,7 +121,7 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat<NullWrit
         continue;
       }
       List<Path> inputPaths = inputPathHandler.getGroupedIncrementalPaths().get(metaClient);
-      List<FileStatus> result = listStatusForIncrementalMode(job, metaClient, inputPaths);
+      List<FileStatus> result = listStatusForIncrementalMode(job, metaClient, inputPaths, table);
       if (result != null) {
         returns.addAll(result);
       }
@@ -229,14 +229,14 @@ public abstract class HoodieFileInputFormatBase extends FileInputFormat<NullWrit
    * partitions and then filtering based on the commits of interest, this logic first extracts the
    * partitions touched by the desired commits and then lists only those partitions.
    */
-  protected List<FileStatus> listStatusForIncrementalMode(JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
-    String tableName = tableMetaClient.getTableConfig().getTableName();
+  protected List<FileStatus> listStatusForIncrementalMode(JobConf job, HoodieTableMetaClient tableMetaClient,
+                                                          List<Path> inputPaths, String incrementalTable) throws IOException {
     Job jobContext = Job.getInstance(job);
     Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
     if (!timeline.isPresent()) {
       return null;
     }
-    Option<List<HoodieInstant>> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName, timeline.get());
+    Option<List<HoodieInstant>> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, incrementalTable, timeline.get());
     if (!commitsToCheck.isPresent()) {
       return null;
     }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java
index f7adf38..07bd82a 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java
@@ -19,11 +19,13 @@
 package org.apache.hudi.hadoop;
 
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.InvalidTableException;
 import org.apache.hudi.exception.TableNotFoundException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -53,11 +55,12 @@ public class InputPathHandler {
   public static final Logger LOG = LogManager.getLogger(InputPathHandler.class);
 
   private final Configuration conf;
-  // tablename to metadata mapping for all Hoodie tables(both incremental & snapshot)
+  // tableName to metadata mapping for all Hoodie tables(both incremental & snapshot)
   private final Map<String, HoodieTableMetaClient> tableMetaClientMap;
   private final Map<HoodieTableMetaClient, List<Path>> groupedIncrementalPaths;
   private final List<Path> snapshotPaths;
   private final List<Path> nonHoodieInputPaths;
+  private boolean isIncrementalUseDatabase;
 
   public InputPathHandler(Configuration conf, Path[] inputPaths, List<String> incrementalTables) throws IOException {
     this.conf = conf;
@@ -65,13 +68,14 @@ public class InputPathHandler {
     snapshotPaths = new ArrayList<>();
     nonHoodieInputPaths = new ArrayList<>();
     groupedIncrementalPaths = new HashMap<>();
+    this.isIncrementalUseDatabase = HoodieHiveUtils.isIncrementalUseDatabase(conf);
     parseInputPaths(inputPaths, incrementalTables);
   }
 
   /**
    * Takes in the original InputPaths and classifies each of them into incremental, snapshot and
    * non-hoodie InputPaths. The logic is as follows:
-   * 1. Check if an inputPath starts with the same basepath as any of the metadata basepaths we know
+   * 1. Check if an inputPath starts with the same basePath as any of the metadata basePaths we know
    *    1a. If yes, this belongs to a Hoodie table that we already know about. Simply classify this
    *        as incremental or snapshot - We can get the table name of this inputPath from the
    *        metadata. Then based on the list of incrementalTables, we can classify this inputPath.
@@ -95,19 +99,17 @@ public class InputPathHandler {
           // We already know the base path for this inputPath.
           basePathKnown = true;
           // Check if this is for a snapshot query
-          String tableName = metaClient.getTableConfig().getTableName();
-          tagAsIncrementalOrSnapshot(inputPath, tableName, metaClient, incrementalTables);
+          tagAsIncrementalOrSnapshot(inputPath, metaClient, incrementalTables);
           break;
         }
       }
       if (!basePathKnown) {
-        // This path is for a table that we dont know about yet.
+        // This path is for a table that we don't know about yet.
         HoodieTableMetaClient metaClient;
         try {
           metaClient = getTableMetaClientForBasePath(inputPath.getFileSystem(conf), inputPath);
-          String tableName = metaClient.getTableConfig().getTableName();
-          tableMetaClientMap.put(tableName, metaClient);
-          tagAsIncrementalOrSnapshot(inputPath, tableName, metaClient, incrementalTables);
+          tableMetaClientMap.put(getIncrementalTable(metaClient), metaClient);
+          tagAsIncrementalOrSnapshot(inputPath, metaClient, incrementalTables);
         } catch (TableNotFoundException | InvalidTableException e) {
           // This is a non Hoodie inputPath
           LOG.info("Handling a non-hoodie path " + inputPath);
@@ -117,9 +119,8 @@ public class InputPathHandler {
     }
   }
 
-  private void tagAsIncrementalOrSnapshot(Path inputPath, String tableName,
-      HoodieTableMetaClient metaClient, List<String> incrementalTables) {
-    if (!incrementalTables.contains(tableName)) {
+  private void tagAsIncrementalOrSnapshot(Path inputPath, HoodieTableMetaClient metaClient, List<String> incrementalTables) {
+    if (!incrementalTables.contains(getIncrementalTable(metaClient))) {
       snapshotPaths.add(inputPath);
     } else {
       // Group incremental Paths belonging to same table.
@@ -145,4 +146,11 @@ public class InputPathHandler {
   public List<Path> getNonHoodieInputPaths() {
     return nonHoodieInputPaths;
   }
+
+  private String getIncrementalTable(HoodieTableMetaClient metaClient) {
+    String databaseName = metaClient.getTableConfig().getDatabaseName();
+    String tableName = metaClient.getTableConfig().getTableName();
+    return isIncrementalUseDatabase && !StringUtils.isNullOrEmpty(databaseName)
+            ? databaseName + "." + tableName : tableName;
+  }
 }
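
Put differently, the key used to match an input path against the incrementalTables list (which getIncrementalTableNames builds from the consume-mode properties) is only database-qualified when the flag is on and the table config carries a database name. A worked illustration with hypothetical values:

    import org.apache.hudi.common.util.StringUtils;

    String databaseName = "mydb";        // hypothetical hoodie.database.name
    String tableName = "raw_trips";      // hypothetical hoodie.table.name
    boolean isIncrementalUseDatabase = true;
    String incrementalKey = isIncrementalUseDatabase && !StringUtils.isNullOrEmpty(databaseName)
        ? databaseName + "." + tableName  // "mydb.raw_trips"
        : tableName;                      // "raw_trips" when the flag is off or databaseName is null/empty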
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
index b6a7fe9..f3cf4ff 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
@@ -113,9 +113,8 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
    */
   @Override
   protected List<FileStatus> listStatusForIncrementalMode(
-      JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
+      JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths, String incrementalTable) throws IOException {
     List<FileStatus> result = new ArrayList<>();
-    String tableName = tableMetaClient.getTableConfig().getTableName();
     Job jobContext = Job.getInstance(job);
 
     // step1
@@ -123,7 +122,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
     if (!timeline.isPresent()) {
       return result;
     }
-    HoodieTimeline commitsTimelineToReturn = HoodieInputFormatUtils.getHoodieTimelineForIncrementalQuery(jobContext, tableName, timeline.get());
+    HoodieTimeline commitsTimelineToReturn = HoodieInputFormatUtils.getHoodieTimelineForIncrementalQuery(jobContext, incrementalTable, timeline.get());
     Option<List<HoodieInstant>> commitsToCheck = Option.of(commitsTimelineToReturn.getInstants().collect(Collectors.toList()));
     if (!commitsToCheck.isPresent()) {
       return result;
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java
index 8abf27e..b4f7e33 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.hadoop.utils;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CollectionUtils;
@@ -43,6 +44,7 @@ public class HoodieHiveUtils {
 
   public static final Logger LOG = LogManager.getLogger(HoodieHiveUtils.class);
 
+  public static final String HOODIE_INCREMENTAL_USE_DATABASE = "hoodie.incremental.use.database";
   public static final String HOODIE_CONSUME_MODE_PATTERN = "hoodie.%s.consume.mode";
   public static final String HOODIE_START_COMMIT_PATTERN = "hoodie.%s.consume.start.timestamp";
   public static final String HOODIE_MAX_COMMIT_PATTERN = "hoodie.%s.consume.max.commits";
@@ -178,4 +180,8 @@ public class HoodieHiveUtils {
     }
     return timeline.findInstantsBeforeOrEquals(maxCommit);
   }
+
+  public static boolean isIncrementalUseDatabase(Configuration conf) {
+    return conf.getBoolean(HOODIE_INCREMENTAL_USE_DATABASE, false);
+  }
 }
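
Note that the flag defaults to false, so existing incremental queries keyed by the bare table name are unaffected unless a job opts in. A small example of the new helper (a plain Hadoop Configuration is shown; a JobConf works the same way):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.hadoop.utils.HoodieHiveUtils;

    Configuration conf = new Configuration();
    boolean useDb = HoodieHiveUtils.isIncrementalUseDatabase(conf);    // false by default
    conf.setBoolean(HoodieHiveUtils.HOODIE_INCREMENTAL_USE_DATABASE, true);
    useDb = HoodieHiveUtils.isIncrementalUseDatabase(conf);            // true after opting in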
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHFileInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHFileInputFormat.java
index 2c34027..67d15f9 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHFileInputFormat.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHFileInputFormat.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -232,9 +233,90 @@ public class TestHoodieHFileInputFormat {
 
     InputFormatTestUtil.setupIncremental(jobConf, "100", 1);
 
+    HoodieTableMetaClient metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(),
+            HoodieTableType.COPY_ON_WRITE, baseFileFormat);
+    assertEquals(null, metaClient.getTableConfig().getDatabaseName(),
+        "When hoodie.database.name is not set, it should default to null");
+
+    FileStatus[] files = inputFormat.listStatus(jobConf);
+    assertEquals(0, files.length,
+        "We should exclude commit 100 when returning incremental pull with start commit time as 100");
+
+    InputFormatTestUtil.setupIncremental(jobConf, "100", 1, true);
+
+    files = inputFormat.listStatus(jobConf);
+    assertEquals(0, files.length,
+        "We should exclude commit 100 when returning incremental pull with start commit time as 100");
+
+    metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE,
+            baseFileFormat, HoodieTestUtils.HOODIE_DATABASE);
+    assertEquals(HoodieTestUtils.HOODIE_DATABASE, metaClient.getTableConfig().getDatabaseName(),
+        String.format("The hoodie.database.name should be %s ", HoodieTestUtils.HOODIE_DATABASE));
+
+    files = inputFormat.listStatus(jobConf);
+    assertEquals(10, files.length,
+        "When hoodie.incremental.use.database is true and hoodie.database.name is not null or empty"
+                + " and the incremental database name is not set, then the incremental query will not take effect");
+  }
+
+  @Test
+  public void testIncrementalWithDatabaseName() throws IOException {
+    // initial commit
+    File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100");
+    createCommitFile(basePath, "100", "2016/05/01");
+
+    // Add the paths
+    FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+
+    InputFormatTestUtil.setupIncremental(jobConf, "100", 1, HoodieTestUtils.HOODIE_DATABASE, true);
+
+    HoodieTableMetaClient metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(),
+            HoodieTableType.COPY_ON_WRITE, baseFileFormat);
+    assertEquals(null, metaClient.getTableConfig().getDatabaseName(),
+        "When hoodie.database.name is not set, it should default to null");
+
     FileStatus[] files = inputFormat.listStatus(jobConf);
+    assertEquals(10, files.length,
+        "When hoodie.database.name is null, then the incremental query will not take effect");
+
+    metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE,
+            baseFileFormat, "");
+    assertEquals("", metaClient.getTableConfig().getDatabaseName(),
+        "The hoodie.database.name should be empty");
+
+    files = inputFormat.listStatus(jobConf);
+    assertEquals(10, files.length,
+        "When hoodie.database.name is empty, then the incremental query will not take effect");
+
+    metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE,
+            baseFileFormat, HoodieTestUtils.HOODIE_DATABASE);
+    assertEquals(HoodieTestUtils.HOODIE_DATABASE, metaClient.getTableConfig().getDatabaseName(),
+        String.format("The hoodie.database.name should be %s ", HoodieTestUtils.HOODIE_DATABASE));
+
+    files = inputFormat.listStatus(jobConf);
     assertEquals(0, files.length,
         "We should exclude commit 100 when returning incremental pull with start commit time as 100");
+
+    InputFormatTestUtil.setupIncremental(jobConf, "100", 1, HoodieTestUtils.HOODIE_DATABASE, false);
+
+    files = inputFormat.listStatus(jobConf);
+    assertEquals(10, files.length,
+        "When hoodie.incremental.use.database is false and the incremental database name is set,"
+                + "then the incremental query will not take effect");
+
+    // The configuration with and without database name exists together
+    InputFormatTestUtil.setupIncremental(jobConf, "1", 1, true);
+
+    files = inputFormat.listStatus(jobConf);
+    assertEquals(0, files.length,
+        "When hoodie.incremental.use.database is true, "
+                + "We should exclude commit 100 because the returning incremental pull with start commit time is 100");
+
+    InputFormatTestUtil.setupIncremental(jobConf, "1", 1, false);
+    files = inputFormat.listStatus(jobConf);
+    assertEquals(10, files.length,
+        "When hoodie.incremental.use.database is false, "
+                + "We should include commit 100 because the returning incremental pull with start commit time is 1");
   }
 
   private void createCommitFile(java.nio.file.Path basePath, String commitNumber, String partitionPath)
@@ -316,7 +398,7 @@ public class TestHoodieHFileInputFormat {
     ensureFilesInCommit("Pulling all commits from 100, should get us the 1 files from 200 commit", files, "200", 1);
   }
 
-  // TODO enable this after enabling predicate pushdown
+  // TODO enable this after enabling predicate push down
   public void testPredicatePushDown() throws IOException {
     // initial commit
     Schema schema = getSchemaFromResource(TestHoodieHFileInputFormat.class, "/sample1.avsc");
@@ -337,7 +419,7 @@ public class TestHoodieHFileInputFormat {
     // check whether we have 2 records at this point
     ensureRecordsInCommit("We need to have 2 records that was modified at commit " + commit2 + " and no more", commit2,
         2, 2);
-    // Make sure we have the 10 records if we roll back the stattime
+    // Make sure we have the 10 records if we roll back the start time
     InputFormatTestUtil.setupIncremental(jobConf, "0", 2);
     ensureRecordsInCommit("We need to have 8 records that was modified at commit " + commit1 + " and no more", commit1,
         8, 10);
@@ -347,19 +429,19 @@ public class TestHoodieHFileInputFormat {
 
   @Test
   public void testGetIncrementalTableNames() throws IOException {
-    String[] expectedincrTables = {"db1.raw_trips", "db2.model_trips", "db3.model_trips"};
+    String[] expectedIncrTables = {"db1.raw_trips", "db2.model_trips", "db3.model_trips"};
     JobConf conf = new JobConf();
-    String incrementalMode1 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[0]);
+    String incrementalMode1 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedIncrTables[0]);
     conf.set(incrementalMode1, HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
-    String incrementalMode2 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[1]);
+    String incrementalMode2 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedIncrTables[1]);
     conf.set(incrementalMode2,HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
     String incrementalMode3 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "db3.model_trips");
     conf.set(incrementalMode3, HoodieHiveUtils.INCREMENTAL_SCAN_MODE.toLowerCase());
-    String defaultmode = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "db3.first_trips");
-    conf.set(defaultmode, HoodieHiveUtils.DEFAULT_SCAN_MODE);
-    List<String> actualincrTables = HoodieHiveUtils.getIncrementalTableNames(Job.getInstance(conf));
-    for (String expectedincrTable : expectedincrTables) {
-      assertTrue(actualincrTables.contains(expectedincrTable));
+    String defaultMode = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "db3.first_trips");
+    conf.set(defaultMode, HoodieHiveUtils.DEFAULT_SCAN_MODE);
+    List<String> actualIncrTables = HoodieHiveUtils.getIncrementalTableNames(Job.getInstance(conf));
+    for (String expectedIncrTable : expectedIncrTables) {
+      assertTrue(actualIncrTables.contains(expectedIncrTable));
     }
   }
 
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
index c45c614..5c7a1fd 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -286,9 +287,90 @@ public class TestHoodieParquetInputFormat {
 
     InputFormatTestUtil.setupIncremental(jobConf, "100", 1);
 
+    HoodieTableMetaClient metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(),
+            HoodieTableType.COPY_ON_WRITE, baseFileFormat);
+    assertEquals(null, metaClient.getTableConfig().getDatabaseName(),
+        "When hoodie.database.name is not set, it should default to null");
+
+    FileStatus[] files = inputFormat.listStatus(jobConf);
+    assertEquals(0, files.length,
+        "We should exclude commit 100 when returning incremental pull with start commit time as 100");
+
+    InputFormatTestUtil.setupIncremental(jobConf, "100", 1, true);
+
+    files = inputFormat.listStatus(jobConf);
+    assertEquals(0, files.length,
+        "We should exclude commit 100 when returning incremental pull with start commit time as 100");
+
+    metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE,
+            baseFileFormat, HoodieTestUtils.HOODIE_DATABASE);
+    assertEquals(HoodieTestUtils.HOODIE_DATABASE, metaClient.getTableConfig().getDatabaseName(),
+        String.format("The hoodie.database.name should be %s ", HoodieTestUtils.HOODIE_DATABASE));
+
+    files = inputFormat.listStatus(jobConf);
+    assertEquals(10, files.length,
+        "When hoodie.incremental.use.database is true and hoodie.database.name is not null or empty"
+                + " and the incremental database name is not set, then the incremental query will not take effect");
+  }
+
+  @Test
+  public void testIncrementalWithDatabaseName() throws IOException {
+    // initial commit
+    File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100");
+    createCommitFile(basePath, "100", "2016/05/01");
+
+    // Add the paths
+    FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+
+    InputFormatTestUtil.setupIncremental(jobConf, "100", 1, HoodieTestUtils.HOODIE_DATABASE, true);
+
+    HoodieTableMetaClient metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(),
+            HoodieTableType.COPY_ON_WRITE, baseFileFormat);
+    assertEquals(null, metaClient.getTableConfig().getDatabaseName(),
+        "When hoodie.database.name is not set, it should default to null");
+
     FileStatus[] files = inputFormat.listStatus(jobConf);
+    assertEquals(10, files.length,
+        "When hoodie.database.name is null, then the incremental query will not take effect");
+
+    metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE,
+            baseFileFormat, "");
+    assertEquals("", metaClient.getTableConfig().getDatabaseName(),
+        "The hoodie.database.name should be empty");
+
+    files = inputFormat.listStatus(jobConf);
+    assertEquals(10, files.length,
+        "When hoodie.database.name is empty, then the incremental query will not take effect");
+
+    metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE,
+            baseFileFormat, HoodieTestUtils.HOODIE_DATABASE);
+    assertEquals(HoodieTestUtils.HOODIE_DATABASE, metaClient.getTableConfig().getDatabaseName(),
+            String.format("The hoodie.database.name should be %s ", HoodieTestUtils.HOODIE_DATABASE));
+
+    files = inputFormat.listStatus(jobConf);
     assertEquals(0, files.length,
         "We should exclude commit 100 when returning incremental pull with start commit time as 100");
+
+    InputFormatTestUtil.setupIncremental(jobConf, "100", 1, HoodieTestUtils.HOODIE_DATABASE, false);
+
+    files = inputFormat.listStatus(jobConf);
+    assertEquals(10, files.length,
+        "When hoodie.incremental.use.database is false and the incremental database name is set, "
+                + "then the incremental query will not take effect");
+
+    // The configuration with and without database name exists together
+    InputFormatTestUtil.setupIncremental(jobConf, "1", 1, true);
+
+    files = inputFormat.listStatus(jobConf);
+    assertEquals(0, files.length,
+        "When hoodie.incremental.use.database is true, "
+                + "We should exclude commit 100 because the returning incremental pull with start commit time is 100");
+
+    InputFormatTestUtil.setupIncremental(jobConf, "1", 1, false);
+    files = inputFormat.listStatus(jobConf);
+    assertEquals(10, files.length,
+        "When hoodie.incremental.use.database is false, "
+                + "We should include commit 100 because the returning incremental pull with start commit time is 1");
   }
 
   @Test
@@ -429,7 +511,7 @@ public class TestHoodieParquetInputFormat {
     ensureFilesInCommit("Pulling all commits from 100, should get us the 1 files from 200 commit", files, "200", 1);
   }
 
-  @Disabled("enable this after enabling predicate pushdown")
+  @Disabled("enable this after enabling predicate push down")
   @Test
   public void testPredicatePushDown() throws IOException {
     // initial commit
@@ -451,7 +533,7 @@ public class TestHoodieParquetInputFormat {
     // check whether we have 2 records at this point
     ensureRecordsInCommit("We need to have 2 records that was modified at commit " + commit2 + " and no more", commit2,
         2, 2);
-    // Make sure we have the 10 records if we roll back the stattime
+    // Make sure we have the 10 records if we roll back the start time
     InputFormatTestUtil.setupIncremental(jobConf, "0", 2);
     ensureRecordsInCommit("We need to have 8 records that was modified at commit " + commit1 + " and no more", commit1,
         8, 10);
@@ -461,19 +543,19 @@ public class TestHoodieParquetInputFormat {
 
   @Test
   public void testGetIncrementalTableNames() throws IOException {
-    String[] expectedincrTables = {"db1.raw_trips", "db2.model_trips", "db3.model_trips"};
+    String[] expectedIncrTables = {"db1.raw_trips", "db2.model_trips", "db3.model_trips"};
     JobConf conf = new JobConf();
-    String incrementalMode1 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[0]);
+    String incrementalMode1 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedIncrTables[0]);
     conf.set(incrementalMode1, HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
-    String incrementalMode2 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[1]);
+    String incrementalMode2 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedIncrTables[1]);
     conf.set(incrementalMode2, HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
     String incrementalMode3 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "db3.model_trips");
     conf.set(incrementalMode3, HoodieHiveUtils.INCREMENTAL_SCAN_MODE.toLowerCase());
-    String defaultmode = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "db3.first_trips");
-    conf.set(defaultmode, HoodieHiveUtils.DEFAULT_SCAN_MODE);
-    List<String> actualincrTables = HoodieHiveUtils.getIncrementalTableNames(Job.getInstance(conf));
-    for (String expectedincrTable : expectedincrTables) {
-      assertTrue(actualincrTables.contains(expectedincrTable));
+    String defaultMode = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "db3.first_trips");
+    conf.set(defaultMode, HoodieHiveUtils.DEFAULT_SCAN_MODE);
+    List<String> actualIncrTables = HoodieHiveUtils.getIncrementalTableNames(Job.getInstance(conf));
+    for (String expectedIncrTable : expectedIncrTables) {
+      assertTrue(actualIncrTables.contains(expectedIncrTable));
     }
   }
 
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
index b71652b..8c19524 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
@@ -130,6 +130,10 @@ public class InputFormatTestUtil {
   }
 
   public static void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull) {
+    setupIncremental(jobConf, startCommit, numberOfCommitsToPull, false);
+  }
+
+  public static void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull, boolean isIncrementalUseDatabase) {
     String modePropertyName =
         String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
     jobConf.set(modePropertyName, HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
@@ -141,8 +145,26 @@ public class InputFormatTestUtil {
     String maxCommitPulls =
         String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
     jobConf.setInt(maxCommitPulls, numberOfCommitsToPull);
+
+    jobConf.setBoolean(HoodieHiveUtils.HOODIE_INCREMENTAL_USE_DATABASE, isIncrementalUseDatabase);
   }
-  
+
+  public static void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull, String databaseName, boolean isIncrementalUseDatabase) {
+    String modePropertyName =
+            String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, databaseName + "." + HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+    jobConf.set(modePropertyName, HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
+
+    String startCommitTimestampName =
+            String.format(HoodieHiveUtils.HOODIE_START_COMMIT_PATTERN, databaseName + "."  + HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+    jobConf.set(startCommitTimestampName, startCommit);
+
+    String maxCommitPulls =
+            String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, databaseName + "."  + HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+    jobConf.setInt(maxCommitPulls, numberOfCommitsToPull);
+
+    jobConf.setBoolean(HoodieHiveUtils.HOODIE_INCREMENTAL_USE_DATABASE, isIncrementalUseDatabase);
+  }
+
   public static void setupSnapshotIncludePendingCommits(JobConf jobConf, String instantTime) {
     setupSnapshotScanMode(jobConf, true);
     String validateTimestampName =
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 5dcf03d..1f2aae4 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -85,6 +85,7 @@ object HoodieSparkSqlWriter {
     validateTableConfig(sqlContext.sparkSession, optParams, tableConfig)
 
     val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig)
+    val databaseName = hoodieConfig.getStringOrDefault(HoodieTableConfig.DATABASE_NAME, "")
     val tblName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME,
       s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.").trim
     assert(!StringUtils.isNullOrEmpty(hoodieConfig.getString(HoodieWriteConfig.TBL_NAME)),
@@ -131,6 +132,7 @@ object HoodieSparkSqlWriter {
 
         val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder()
           .setTableType(tableType)
+          .setDatabaseName(databaseName)
           .setTableName(tblName)
           .setRecordKeyFields(recordKeyFields)
           .setBaseFileFormat(baseFileFormat)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index 377d4db..f14ccbe 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -21,8 +21,7 @@ import org.apache.hudi.AvroConversionUtils
 import org.apache.hudi.HoodieWriterUtils._
 import org.apache.hudi.common.config.DFSPropertiesConfiguration
 import org.apache.hudi.common.model.HoodieTableType
-import org.apache.hudi.common.table.HoodieTableConfig
-import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.util.ValidationUtils
 import org.apache.hudi.keygen.ComplexKeyGenerator
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
@@ -184,8 +183,10 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
     } else {
       val (recordName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(table.identifier.table)
       val schema = SchemaConverters.toAvroType(finalSchema, false, recordName, namespace)
+      val hoodieDatabaseName = formatName(spark, table.identifier.database.getOrElse(spark.sessionState.catalog.getCurrentDatabase))
       HoodieTableMetaClient.withPropertyBuilder()
         .fromProperties(properties)
+        .setDatabaseName(hoodieDatabaseName)
         .setTableName(table.identifier.table)
         .setTableCreateSchema(schema.toString())
         .setPartitionFields(table.partitionColumnNames.mkString(","))
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
index e3388e2..24c6e21 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
@@ -202,11 +202,11 @@ object HoodieOptionConfig {
         s"Can't find primaryKey `$primaryKey` in ${schema.treeString}.")
     }
 
-    // validate precombine key
-    val precombineKey = sqlOptions.get(SQL_KEY_PRECOMBINE_FIELD.sqlKeyName)
-    if (precombineKey.isDefined && precombineKey.get.nonEmpty) {
-      ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, precombineKey.get)),
-        s"Can't find preCombineKey `${precombineKey.get}` in ${schema.treeString}.")
+    // validate preCombine key
+    val preCombineKey = sqlOptions.get(SQL_KEY_PRECOMBINE_FIELD.sqlKeyName)
+    if (preCombineKey.isDefined && preCombineKey.get.nonEmpty) {
+      ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, preCombineKey.get)),
+        s"Can't find preCombineKey `${preCombineKey.get}` in ${schema.treeString}.")
     }
 
     // validate table type
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
index 8c9d902..da9fcb8 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
@@ -123,12 +123,12 @@ object CreateHoodieTableCommand {
       table.storage.compressed,
       storageProperties + ("path" -> path))
 
-    val tablName = HoodieSqlCommonUtils.formatName(sparkSession, table.identifier.table)
+    val tableName = HoodieSqlCommonUtils.formatName(sparkSession, table.identifier.table)
     val newDatabaseName = HoodieSqlCommonUtils.formatName(sparkSession, table.identifier.database
       .getOrElse(catalog.getCurrentDatabase))
 
     val newTableIdentifier = table.identifier
-      .copy(table = tablName, database = Some(newDatabaseName))
+      .copy(table = tableName, database = Some(newDatabaseName))
 
     val partitionColumnNames = hoodieCatalogTable.partitionSchema.map(_.name)
     // append pk, preCombineKey, type to the properties of table
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala
index 12ec224..4d2debb 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala
@@ -50,7 +50,7 @@ class TruncateHoodieTableCommand(
     }
 
     // If we have not specified the partition, truncate will delete all the data in the table path
-    // include the hoodi.properties. In this case we should reInit the table.
+    // include the hoodie.properties. In this case we should reInit the table.
     if (partitionSpec.isEmpty) {
       val hadoopConf = sparkSession.sessionState.newHadoopConf()
       // ReInit hoodie.properties
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
index e660fe8..0800d17 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
@@ -115,7 +115,7 @@ class ExpressionPayload(record: GenericRecord,
         if (targetRecord.isEmpty || needUpdatingPersistedRecord(targetRecord.get, resultRecord, properties)) {
           resultRecordOpt = HOption.of(resultRecord)
         } else {
-          // if the PreCombine field value of targetRecord is greate
+          // if the PreCombine field value of targetRecord is greater
           // than the new incoming record, just keep the old record value.
           resultRecordOpt = HOption.of(targetRecord.get)
         }
@@ -270,7 +270,7 @@ class ExpressionPayload(record: GenericRecord,
 object ExpressionPayload {
 
   /**
-   * Property for pass the merge-into delete clause condition expresssion.
+   * Property for pass the merge-into delete clause condition expression.
    */
   val PAYLOAD_DELETE_CONDITION = "hoodie.payload.delete.condition"
 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index 560c7e1..030d3e3 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -146,7 +146,7 @@ object InsertIntoHoodieTableCommand extends Logging {
       queryOutputWithoutMetaFields
     }
     // Align for the data fields of the query
-    val dataProjectsWithputMetaFields = queryDataFieldsWithoutMetaFields.zip(
+    val dataProjectsWithoutMetaFields = queryDataFieldsWithoutMetaFields.zip(
       hoodieCatalogTable.dataSchemaWithoutMetaFields.fields).map { case (dataAttr, targetField) =>
         val castAttr = castIfNeeded(dataAttr.withNullability(targetField.nullable),
           targetField.dataType, conf)
@@ -171,7 +171,7 @@ object InsertIntoHoodieTableCommand extends Logging {
         Alias(castAttr, f.name)()
       })
     }
-    val alignedProjects = dataProjectsWithputMetaFields ++ partitionProjects
+    val alignedProjects = dataProjectsWithoutMetaFields ++ partitionProjects
     Project(alignedProjects, query)
   }
 
@@ -217,7 +217,7 @@ object InsertIntoHoodieTableCommand extends Logging {
       DataSourceWriteOptions.SQL_INSERT_MODE.defaultValue()))
     val isNonStrictMode = insertMode == InsertMode.NON_STRICT
     val isPartitionedTable = hoodieCatalogTable.partitionFields.nonEmpty
-    val hasPrecombineColumn = preCombineColumn.nonEmpty
+    val hasPreCombineColumn = preCombineColumn.nonEmpty
     val operation =
       (enableBulkInsert, isOverwrite, dropDuplicate, isNonStrictMode, isPartitionedTable) match {
         case (true, _, _, false, _) =>
@@ -234,7 +234,7 @@ object InsertIntoHoodieTableCommand extends Logging {
         // insert overwrite partition
         case (_, true, _, _, true) => INSERT_OVERWRITE_OPERATION_OPT_VAL
         // disable dropDuplicate, and provide preCombineKey, use the upsert operation for strict and upsert mode.
-        case (false, false, false, false, _) if hasPrecombineColumn => UPSERT_OPERATION_OPT_VAL
+        case (false, false, false, false, _) if hasPreCombineColumn => UPSERT_OPERATION_OPT_VAL
         // if table is pk table and has enableBulkInsert use bulk insert for non-strict mode.
         case (true, _, _, true, _) => BULK_INSERT_OPERATION_OPT_VAL
         // for the rest case, use the insert operation
@@ -267,7 +267,7 @@ object InsertIntoHoodieTableCommand extends Logging {
         PARTITIONPATH_FIELD.key -> partitionFields,
         PAYLOAD_CLASS_NAME.key -> payloadClassName,
         ENABLE_ROW_WRITER.key -> enableBulkInsert.toString,
-        HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPrecombineColumn),
+        HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPreCombineColumn),
         META_SYNC_ENABLED.key -> enableHive.toString,
         HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
         HIVE_USE_JDBC.key -> "false",
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index b3ba034..2c76ad5 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -450,7 +450,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
         "path" -> path,
         RECORDKEY_FIELD.key -> tableConfig.getRecordKeyFieldProp,
         PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""),
-        TBL_NAME.key -> targetTableName,
+        TBL_NAME.key -> hoodieCatalogTable.tableName,
         PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
         PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName,
         HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
index f8a658a..8ebef19 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
@@ -33,6 +33,10 @@ import scala.collection.JavaConverters._
 class TestCreateTable extends TestHoodieSqlBase {
 
   test("Test Create Managed Hoodie Table") {
+    val databaseName = "hudi_database"
+    spark.sql(s"create database if not exists $databaseName")
+    spark.sql(s"use $databaseName")
+
     val tableName = generateTableName
     // Create a managed table
     spark.sql(
@@ -60,6 +64,14 @@ class TestCreateTable extends TestHoodieSqlBase {
         StructField("price", DoubleType),
         StructField("ts", LongType))
     )(table.schema.fields)
+
+    val tablePath = table.storage.properties("path")
+    val metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(tablePath)
+      .setConf(spark.sessionState.newHadoopConf())
+      .build()
+    val tableConfig = metaClient.getTableConfig
+    assertResult(databaseName)(tableConfig.getDatabaseName)
   }
 
   test("Test Create Hoodie Table With Options") {
@@ -88,7 +100,7 @@ class TestCreateTable extends TestHoodieSqlBase {
     assertResult(CatalogTableType.MANAGED)(table.tableType)
     assertResult(
       HoodieRecord.HOODIE_META_COLUMNS.asScala.map(StructField(_, StringType))
-      ++ Seq(
+        ++ Seq(
         StructField("id", IntegerType),
         StructField("name", StringType),
         StructField("price", DoubleType),
@@ -192,7 +204,7 @@ class TestCreateTable extends TestHoodieSqlBase {
   }
 
   test("Test Table Column Validate") {
-    withTempDir {tmp =>
+    withTempDir { tmp =>
       val tableName = generateTableName
       assertThrows[IllegalArgumentException] {
         spark.sql(
@@ -277,7 +289,7 @@ class TestCreateTable extends TestHoodieSqlBase {
            | select 1 as id, 'a1' as name, 10 as price, '2021-04-01' as dt
          """.stripMargin
       )
-      checkAnswer(s"select id, name, price, dt from $tableName2") (
+      checkAnswer(s"select id, name, price, dt from $tableName2")(
         Seq(1, "a1", 10, "2021-04-01")
       )
 
@@ -360,6 +372,10 @@ class TestCreateTable extends TestHoodieSqlBase {
 
   test("Test Create Table From Existing Hoodie Table") {
     withTempDir { tmp =>
+      val databaseName = "hudi_database"
+      spark.sql(s"create database if not exists $databaseName")
+      spark.sql(s"use $databaseName")
+
       Seq("2021-08-02", "2021/08/02").foreach { partitionValue =>
         val tableName = generateTableName
         val tablePath = s"${tmp.getCanonicalPath}/$tableName"
@@ -367,7 +383,7 @@ class TestCreateTable extends TestHoodieSqlBase {
         val df = Seq((1, "a1", 10, 1000, partitionValue)).toDF("id", "name", "value", "ts", "dt")
         // Write a table by spark dataframe.
         df.write.format("hudi")
-          .option(HoodieWriteConfig.TBL_NAME.key, tableName)
+          .option(HoodieWriteConfig.TBL_NAME.key, s"original_$tableName")
           .option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL)
           .option(RECORDKEY_FIELD.key, "id")
           .option(PRECOMBINE_FIELD.key, "ts")
@@ -386,7 +402,7 @@ class TestCreateTable extends TestHoodieSqlBase {
              |partitioned by (dt)
              |location '$tablePath'
              |""".stripMargin
-        ) ("It is not allowed to specify partition columns when the table schema is not defined.")
+        )("It is not allowed to specify partition columns when the table schema is not defined.")
 
         spark.sql(
           s"""
@@ -405,6 +421,8 @@ class TestCreateTable extends TestHoodieSqlBase {
         assertResult(true)(properties.contains(HoodieTableConfig.CREATE_SCHEMA.key))
         assertResult("dt")(properties(HoodieTableConfig.PARTITION_FIELDS.key))
         assertResult("ts")(properties(HoodieTableConfig.PRECOMBINE_FIELD.key))
+        assertResult("")(metaClient.getTableConfig.getDatabaseName)
+        assertResult(s"original_$tableName")(metaClient.getTableConfig.getTableName)
 
         // Test insert into
         spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, '$partitionValue')")
@@ -512,7 +530,7 @@ class TestCreateTable extends TestHoodieSqlBase {
   }
 
   test("Test Create Table From Existing Hoodie Table For None Partitioned Table") {
-    withTempDir{tmp =>
+    withTempDir { tmp =>
       // Write a table by spark dataframe.
       val tableName = generateTableName
       import spark.implicits._