Posted to commits@hudi.apache.org by da...@apache.org on 2022/07/19 07:38:53 UTC

[hudi] branch master updated: [HUDI-4416] Default database path for hoodie hive catalog (#6136)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c3578069e [HUDI-4416] Default database path for hoodie hive catalog (#6136)
6c3578069e is described below

commit 6c3578069e985a3ec2a115585bd0fa8753d50ff6
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Tue Jul 19 15:38:47 2022 +0800

    [HUDI-4416] Default database path for hoodie hive catalog (#6136)
---
 hudi-flink-datasource/hudi-flink/pom.xml           |  25 +++---
 .../apache/hudi/table/catalog/HiveSchemaUtils.java |  32 ++++---
 .../hudi/table/catalog/HoodieCatalogFactory.java   |   4 +-
 .../hudi/table/catalog/HoodieHiveCatalog.java      | 100 ++++++++++++---------
 .../table/catalog/TypeInfoLogicalTypeVisitor.java  |  74 ++++-----------
 .../java/org/apache/hudi/util/StreamerUtil.java    |  15 +++-
 .../apache/hudi/table/ITTestHoodieDataSource.java  |   1 -
 .../hudi/table/catalog/HoodieCatalogTestUtils.java |   1 +
 .../hudi/table/catalog/TestHoodieHiveCatalog.java  |   3 +-
 9 files changed, 123 insertions(+), 132 deletions(-)
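
For context, the change threads an optional catalog path through the Hive catalog and uses it
to infer default database locations. A minimal sketch of how the new four-argument constructor
(introduced in HoodieHiveCatalog.java below) might be invoked; the catalog name, path, and conf
dir are illustrative values, not taken from this commit:

    import org.apache.hudi.table.catalog.HoodieHiveCatalog;

    public class CatalogPathExample {
      public static void main(String[] args) {
        // Illustrative values; a null catalog path is expected to fall back to
        // hive.metastore.warehouse.dir (see the constructor change below).
        HoodieHiveCatalog catalog = new HoodieHiveCatalog(
            "hudi_catalog",       // catalog name
            "/warehouse/hudi",    // catalog path (CatalogOptions.CATALOG_PATH, optional)
            "default",            // default database
            "/etc/hive/conf");    // hive-conf-dir
        catalog.open();
      }
    }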

diff --git a/hudi-flink-datasource/hudi-flink/pom.xml b/hudi-flink-datasource/hudi-flink/pom.xml
index 77e8a77cf5..cd584304fe 100644
--- a/hudi-flink-datasource/hudi-flink/pom.xml
+++ b/hudi-flink-datasource/hudi-flink/pom.xml
@@ -269,18 +269,6 @@
                 </exclusion>
             </exclusions>
         </dependency>
-        <dependency>
-            <groupId>javax.transaction</groupId>
-            <artifactId>jta</artifactId>
-            <version>1.1</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>javax.transaction</groupId>
-            <artifactId>javax.transaction-api</artifactId>
-            <version>1.3</version>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>${hive.groupid}</groupId>
             <artifactId>hive-metastore</artifactId>
@@ -421,5 +409,18 @@
             <version>${flink.version}</version>
             <scope>test</scope>
         </dependency>
+        <!-- Hive dependencies -->
+        <dependency>
+            <groupId>javax.transaction</groupId>
+            <artifactId>jta</artifactId>
+            <version>1.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>javax.transaction</groupId>
+            <artifactId>javax.transaction-api</artifactId>
+            <version>1.3</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
index 36503c152c..c9590ff4a2 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
@@ -19,11 +19,13 @@
 package org.apache.hudi.table.catalog;
 
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.configuration.FlinkOptions;
 
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
@@ -37,7 +39,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -60,9 +61,8 @@ public class HiveSchemaUtils {
     allCols.addAll(hiveTable.getPartitionKeys());
 
     String pkConstraintName = hiveTable.getParameters().get(TableOptionProperties.PK_CONSTRAINT_NAME);
-    List<String> primaryColNames = StringUtils.isNullOrEmpty(pkConstraintName)
-        ? Collections.EMPTY_LIST
-        : StringUtils.split(hiveTable.getParameters().get(TableOptionProperties.PK_COLUMNS),",");
+    String pkColumnStr = hiveTable.getParameters().getOrDefault(FlinkOptions.RECORD_KEY_FIELD.key(), FlinkOptions.RECORD_KEY_FIELD.defaultValue());
+    List<String> pkColumns = StringUtils.split(pkColumnStr,",");
 
     String[] colNames = new String[allCols.size()];
     DataType[] colTypes = new DataType[allCols.size()];
@@ -73,14 +73,16 @@ public class HiveSchemaUtils {
       colNames[i] = fs.getName();
       colTypes[i] =
           toFlinkType(TypeInfoUtils.getTypeInfoFromTypeString(fs.getType()));
-      if (primaryColNames.contains(colNames[i])) {
+      if (pkColumns.contains(colNames[i])) {
         colTypes[i] = colTypes[i].notNull();
       }
     }
 
     org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder().fromFields(colNames, colTypes);
     if (!StringUtils.isNullOrEmpty(pkConstraintName)) {
-      builder.primaryKeyNamed(pkConstraintName, primaryColNames);
+      builder.primaryKeyNamed(pkConstraintName, pkColumns);
+    } else {
+      builder.primaryKey(pkColumns);
     }
 
     return builder.build();
@@ -152,7 +154,8 @@ public class HiveSchemaUtils {
       case DATE:
         return DataTypes.DATE();
       case TIMESTAMP:
-        return DataTypes.TIMESTAMP(9);
+        // see org.apache.hudi.hive.util.HiveSchemaUtil#convertField for details.
+        return DataTypes.TIMESTAMP(6);
       case BINARY:
         return DataTypes.BYTES();
       case DECIMAL:
@@ -168,8 +171,10 @@ public class HiveSchemaUtils {
 
   /** Create Hive columns from Flink TableSchema. */
   public static List<FieldSchema> createHiveColumns(TableSchema schema) {
-    String[] fieldNames = schema.getFieldNames();
-    DataType[] fieldTypes = schema.getFieldDataTypes();
+    final DataType dataType = schema.toPersistedRowDataType();
+    final RowType rowType = (RowType) dataType.getLogicalType();
+    final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
+    final DataType[] fieldTypes = dataType.getChildren().toArray(new DataType[0]);
 
     List<FieldSchema> columns = new ArrayList<>(fieldNames.length);
 
@@ -177,7 +182,7 @@ public class HiveSchemaUtils {
       columns.add(
           new FieldSchema(
               fieldNames[i],
-              toHiveTypeInfo(fieldTypes[i], true).getTypeName(),
+              toHiveTypeInfo(fieldTypes[i]).getTypeName(),
               null));
     }
 
@@ -191,13 +196,12 @@ public class HiveSchemaUtils {
    * checkPrecision is true.
    *
    * @param dataType a Flink DataType
-   * @param checkPrecision whether to fail the conversion if the precision of the DataType is not
-   *     supported by Hive
+   *
    * @return the corresponding Hive data type
    */
-  public static TypeInfo toHiveTypeInfo(DataType dataType, boolean checkPrecision) {
+  public static TypeInfo toHiveTypeInfo(DataType dataType) {
     checkNotNull(dataType, "type cannot be null");
     LogicalType logicalType = dataType.getLogicalType();
-    return logicalType.accept(new TypeInfoLogicalTypeVisitor(dataType, checkPrecision));
+    return logicalType.accept(new TypeInfoLogicalTypeVisitor(dataType));
   }
 }
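
With the change above, a Hive table that carries no explicit PK constraint name still gets a
primary key inferred from the record key field parameter. A hand-worked sketch (the real code
reads FlinkOptions.RECORD_KEY_FIELD, falling back to the option's default, and splits it with
Hudi's StringUtils; the parameter value "id,dt" here is illustrative):

    import java.util.Arrays;
    import java.util.List;

    public class PkInferenceExample {
      public static void main(String[] args) {
        // Table parameter value for the record key field; the option default is assumed if absent.
        String pkColumnStr = "id,dt";
        List<String> pkColumns = Arrays.asList(pkColumnStr.split(","));
        // These columns are marked NOT NULL and declared as the (unnamed) primary key.
        System.out.println(pkColumns);   // [id, dt]
      }
    }
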
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactory.java
index 4a63b7a26b..e9fbd95e8f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactory.java
@@ -34,7 +34,6 @@ import java.util.Locale;
 import java.util.Set;
 
 import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
-import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;
 
 /**
  * A catalog factory impl that creates {@link HoodieCatalog}.
@@ -59,6 +58,7 @@ public class HoodieCatalogFactory implements CatalogFactory {
       case "hms":
         return new HoodieHiveCatalog(
             context.getName(),
+            helper.getOptions().get(CatalogOptions.CATALOG_PATH),
             helper.getOptions().get(CatalogOptions.DEFAULT_DATABASE),
             helper.getOptions().get(CatalogOptions.HIVE_CONF_DIR));
       case "dfs":
@@ -82,7 +82,7 @@ public class HoodieCatalogFactory implements CatalogFactory {
     options.add(PROPERTY_VERSION);
     options.add(CatalogOptions.HIVE_CONF_DIR);
     options.add(CatalogOptions.MODE);
-    options.add(CATALOG_PATH);
+    options.add(CatalogOptions.CATALOG_PATH);
     return options;
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index ff80a7004b..1e877b133e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.table.catalog;
 
-import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -69,6 +68,7 @@ import org.apache.flink.table.expressions.Expression;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -92,7 +92,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 
 import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase.ALTER_DATABASE_OP;
 import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner.DATABASE_OWNER_NAME;
@@ -104,7 +103,6 @@ import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
 import static org.apache.hudi.configuration.FlinkOptions.PATH;
 import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DB;
 import static org.apache.hudi.table.catalog.TableOptionProperties.COMMENT;
-import static org.apache.hudi.table.catalog.TableOptionProperties.PK_COLUMNS;
 import static org.apache.hudi.table.catalog.TableOptionProperties.PK_CONSTRAINT_NAME;
 import static org.apache.hudi.table.catalog.TableOptionProperties.SPARK_SOURCE_PROVIDER;
 
@@ -117,12 +115,22 @@ public class HoodieHiveCatalog extends AbstractCatalog {
   private final HiveConf hiveConf;
   private IMetaStoreClient client;
 
-  public HoodieHiveCatalog(String catalogName, String defaultDatabase, String hiveConfDir) {
-    this(catalogName, defaultDatabase, HoodieCatalogUtil.createHiveConf(hiveConfDir), false);
+  // optional catalog base path: used for db/table path inference.
+  private final String catalogPath;
+
+  public HoodieHiveCatalog(String catalogName, String catalogPath, String defaultDatabase, String hiveConfDir) {
+    this(catalogName, catalogPath, defaultDatabase, HoodieCatalogUtil.createHiveConf(hiveConfDir), false);
   }
 
-  public HoodieHiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf, boolean allowEmbedded) {
+  public HoodieHiveCatalog(
+      String catalogName,
+      String catalogPath,
+      String defaultDatabase,
+      HiveConf hiveConf,
+      boolean allowEmbedded) {
     super(catalogName, defaultDatabase == null ? DEFAULT_DB : defaultDatabase);
+    // fallback to hive.metastore.warehouse.dir if catalog path is not specified
+    this.catalogPath = catalogPath == null ? hiveConf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE) : catalogPath;
     this.hiveConf = hiveConf;
     if (!allowEmbedded) {
       checkArgument(
@@ -145,7 +153,7 @@ public class HoodieHiveCatalog extends AbstractCatalog {
     }
     if (!databaseExists(getDefaultDatabase())) {
       LOG.info("{} does not exist, will be created.", getDefaultDatabase());
-      CatalogDatabase database = new CatalogDatabaseImpl(Collections.EMPTY_MAP, "default database");
+      CatalogDatabase database = new CatalogDatabaseImpl(Collections.emptyMap(), "default database");
       try {
         createDatabase(getDefaultDatabase(), database, true);
       } catch (DatabaseAlreadyExistException e) {
@@ -227,6 +235,10 @@ public class HoodieHiveCatalog extends AbstractCatalog {
     Map<String, String> properties = database.getProperties();
 
     String dbLocationUri = properties.remove(SqlCreateHiveDatabase.DATABASE_LOCATION_URI);
+    if (dbLocationUri == null && this.catalogPath != null) {
+      // infer default location uri
+      dbLocationUri = new Path(this.catalogPath, databaseName).toString();
+    }
 
     Database hiveDatabase =
         new Database(databaseName, database.getComment(), dbLocationUri, properties);
@@ -381,8 +393,7 @@ public class HoodieHiveCatalog extends AbstractCatalog {
   @Override
   public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
     checkNotNull(tablePath, "Table path cannot be null");
-    Table hiveTable = getHiveTable(tablePath);
-    hiveTable = translateSparkTable2Flink(tablePath, hiveTable);
+    Table hiveTable = translateSparkTable2Flink(tablePath, getHiveTable(tablePath));
     String path = hiveTable.getSd().getLocation();
     Map<String, String> parameters = hiveTable.getParameters();
     Schema latestTableSchema = StreamerUtil.getLatestTableSchema(path, hiveConf);
@@ -391,16 +402,21 @@ public class HoodieHiveCatalog extends AbstractCatalog {
       org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder()
           .fromRowDataType(AvroSchemaConverter.convertToDataType(latestTableSchema));
       String pkConstraintName = parameters.get(PK_CONSTRAINT_NAME);
+      String pkColumns = parameters.get(FlinkOptions.RECORD_KEY_FIELD.key());
       if (!StringUtils.isNullOrEmpty(pkConstraintName)) {
-        builder.primaryKeyNamed(pkConstraintName, StringUtils.split(parameters.get(PK_COLUMNS), ","));
+        // pkColumns is expected to be non-null in this case
+        builder.primaryKeyNamed(pkConstraintName, StringUtils.split(pkColumns, ","));
+      } else if (pkColumns != null) {
+        builder.primaryKey(StringUtils.split(pkColumns, ","));
       }
       schema = builder.build();
     } else {
       LOG.warn("{} does not have any hoodie schema, and use hive table schema to infer the table schema", tablePath);
       schema = HiveSchemaUtils.convertTableSchema(hiveTable);
     }
+    Map<String, String> options = supplementOptions(tablePath, parameters);
     return CatalogTable.of(schema, parameters.get(COMMENT),
-        HiveSchemaUtils.getFieldNames(hiveTable.getPartitionKeys()), parameters);
+        HiveSchemaUtils.getFieldNames(hiveTable.getPartitionKeys()), options);
   }
 
   @Override
@@ -439,8 +455,8 @@ public class HoodieHiveCatalog extends AbstractCatalog {
   }
 
   private void initTableIfNotExists(ObjectPath tablePath, CatalogTable catalogTable) {
-    Configuration flinkConf = Configuration.fromMap(applyOptionsHook(catalogTable.getOptions()));
-    final String avroSchema = AvroSchemaConverter.convertToSchema(catalogTable.getSchema().toPhysicalRowDataType().getLogicalType()).toString();
+    Configuration flinkConf = Configuration.fromMap(catalogTable.getOptions());
+    final String avroSchema = AvroSchemaConverter.convertToSchema(catalogTable.getSchema().toPersistedRowDataType().getLogicalType()).toString();
     flinkConf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, avroSchema);
 
     // stores two copies of options:
@@ -449,15 +465,13 @@ public class HoodieHiveCatalog extends AbstractCatalog {
     // because the HoodieTableMetaClient is a heavy impl, we try to avoid initializing it
     // when calling #getTable.
 
-    if (catalogTable.getUnresolvedSchema().getPrimaryKey().isPresent()) {
+    if (catalogTable.getUnresolvedSchema().getPrimaryKey().isPresent()
+        && !flinkConf.contains(FlinkOptions.RECORD_KEY_FIELD)) {
       final String pkColumns = String.join(",", catalogTable.getUnresolvedSchema().getPrimaryKey().get().getColumnNames());
-      String recordKey = flinkConf.get(FlinkOptions.RECORD_KEY_FIELD);
-      if (!Objects.equals(pkColumns, recordKey)) {
-        throw new HoodieCatalogException(String.format("%s and %s are the different", pkColumns, recordKey));
-      }
+      flinkConf.setString(FlinkOptions.RECORD_KEY_FIELD, pkColumns);
     }
 
-    if (catalogTable.isPartitioned()) {
+    if (catalogTable.isPartitioned() && !flinkConf.contains(FlinkOptions.PARTITION_PATH_FIELD)) {
       final String partitions = String.join(",", catalogTable.getPartitionKeys());
       flinkConf.setString(FlinkOptions.PARTITION_PATH_FIELD, partitions);
     }
@@ -468,7 +482,7 @@ public class HoodieHiveCatalog extends AbstractCatalog {
 
     flinkConf.setString(FlinkOptions.TABLE_NAME, tablePath.getObjectName());
     try {
-      StreamerUtil.initTableIfNotExists(flinkConf);
+      StreamerUtil.initTableIfNotExists(flinkConf, hiveConf);
     } catch (IOException e) {
       throw new HoodieCatalogException("Initialize table exception.", e);
     }
@@ -487,20 +501,6 @@ public class HoodieHiveCatalog extends AbstractCatalog {
     return location;
   }
 
-  private Map<String, String> applyOptionsHook(Map<String, String> options) {
-    Map<String, String> properties = new HashMap<>(options);
-    if (!options.containsKey(FlinkOptions.RECORD_KEY_FIELD.key())) {
-      properties.put(FlinkOptions.RECORD_KEY_FIELD.key(), FlinkOptions.RECORD_KEY_FIELD.defaultValue());
-    }
-    if (!options.containsKey(FlinkOptions.PRECOMBINE_FIELD.key())) {
-      properties.put(FlinkOptions.PRECOMBINE_FIELD.key(), FlinkOptions.PRECOMBINE_FIELD.defaultValue());
-    }
-    if (!options.containsKey(FlinkOptions.TABLE_TYPE.key())) {
-      properties.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE.defaultValue());
-    }
-    return properties;
-  }
-
   private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table, String location, boolean useRealTimeInputFormat) throws IOException {
     // let Hive set default parameters for us, e.g. serialization.format
     Table hiveTable =
@@ -510,7 +510,7 @@ public class HoodieHiveCatalog extends AbstractCatalog {
     hiveTable.setOwner(UserGroupInformation.getCurrentUser().getUserName());
     hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
 
-    Map<String, String> properties = applyOptionsHook(table.getOptions());
+    Map<String, String> properties = new HashMap<>(table.getOptions());
 
     if (Boolean.parseBoolean(table.getOptions().get(CatalogOptions.TABLE_EXTERNAL.key()))) {
       hiveTable.setTableType(TableType.EXTERNAL_TABLE.toString());
@@ -523,17 +523,11 @@ public class HoodieHiveCatalog extends AbstractCatalog {
     }
 
     //set pk
-    if (table.getUnresolvedSchema().getPrimaryKey().isPresent()) {
+    if (table.getUnresolvedSchema().getPrimaryKey().isPresent()
+        && !properties.containsKey(FlinkOptions.RECORD_KEY_FIELD.key())) {
       String pkColumns = String.join(",", table.getUnresolvedSchema().getPrimaryKey().get().getColumnNames());
-      String recordKey = properties.getOrDefault(FlinkOptions.RECORD_KEY_FIELD.key(), FlinkOptions.RECORD_KEY_FIELD.defaultValue());
-      if (!Objects.equals(pkColumns, recordKey)) {
-        throw new HoodieCatalogException(
-            String.format("Primary key [%s] and record key [%s] should be the the same.",
-                pkColumns,
-                recordKey));
-      }
       properties.put(PK_CONSTRAINT_NAME, table.getUnresolvedSchema().getPrimaryKey().get().getConstraintName());
-      properties.put(PK_COLUMNS, pkColumns);
+      properties.put(FlinkOptions.RECORD_KEY_FIELD.key(), pkColumns);
     }
 
     if (!properties.containsKey(FlinkOptions.PATH.key())) {
@@ -896,4 +890,22 @@ public class HoodieHiveCatalog extends AbstractCatalog {
       throws PartitionNotExistException, CatalogException {
     throw new HoodieCatalogException("Not supported.");
   }
+
+  private Map<String, String> supplementOptions(
+      ObjectPath tablePath,
+      Map<String, String> options) {
+    if (HoodieCatalogUtil.isEmbeddedMetastore(hiveConf)) {
+      return options;
+    } else {
+      Map<String, String> newOptions = new HashMap<>(options);
+      // set up hive sync options
+      newOptions.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true");
+      newOptions.put(FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(), hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS));
+      newOptions.put(FlinkOptions.HIVE_SYNC_MODE.key(), "hms");
+      newOptions.putIfAbsent(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP.key(), "true");
+      newOptions.computeIfAbsent(FlinkOptions.HIVE_SYNC_DB.key(), k -> tablePath.getDatabaseName());
+      newOptions.computeIfAbsent(FlinkOptions.HIVE_SYNC_TABLE.key(), k -> tablePath.getObjectName());
+      return newOptions;
+    }
+  }
 }
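
To make the inference concrete, a small sketch of the expected database location and the
supplemented table options under a remote metastore; the warehouse path and database name are
illustrative, not taken from this commit:

    import org.apache.hadoop.fs.Path;

    public class DbPathInferenceExample {
      public static void main(String[] args) {
        // createDatabase(): when no explicit location URI is supplied, the database
        // location is inferred as <catalog path>/<database name>.
        String catalogPath = "/warehouse/hudi";   // falls back to hive.metastore.warehouse.dir when unset
        String dbLocationUri = new Path(catalogPath, "sales").toString();
        System.out.println(dbLocationUri);        // prints /warehouse/hudi/sales

        // getTable(): with a non-embedded metastore, the returned options are additionally
        // supplemented per supplementOptions() above: HIVE_SYNC_ENABLED=true, HIVE_SYNC_MODE=hms,
        // the metastore URIs, and the sync database/table names.
      }
    }
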
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java
index d6cfe3ed72..e6b15788fe 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java
@@ -18,8 +18,6 @@
 
 package org.apache.hudi.table.catalog;
 
-import org.apache.hudi.exception.HoodieCatalogException;
-
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.BigIntType;
@@ -40,8 +38,6 @@ import org.apache.flink.table.types.logical.TinyIntType;
 import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
-import org.apache.hadoop.hive.common.type.HiveChar;
-import org.apache.hadoop.hive.common.type.HiveVarchar;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
@@ -53,64 +49,25 @@ import java.util.List;
  */
 public class TypeInfoLogicalTypeVisitor extends LogicalTypeDefaultVisitor<TypeInfo> {
   private final LogicalType type;
-  // whether to check type precision
-  private final boolean checkPrecision;
 
-  TypeInfoLogicalTypeVisitor(DataType dataType, boolean checkPrecision) {
-    this(dataType.getLogicalType(), checkPrecision);
+  TypeInfoLogicalTypeVisitor(DataType dataType) {
+    this(dataType.getLogicalType());
   }
 
-  TypeInfoLogicalTypeVisitor(LogicalType type, boolean checkPrecision) {
+  TypeInfoLogicalTypeVisitor(LogicalType type) {
     this.type = type;
-    this.checkPrecision = checkPrecision;
   }
 
   @Override
   public TypeInfo visit(CharType charType) {
-    // Flink and Hive have different length limit for CHAR. Promote it to STRING if it
-    // exceeds the limits of
-    // Hive and we're told not to check precision. This can be useful when calling Hive UDF
-    // to process data.
-    if (charType.getLength() > HiveChar.MAX_CHAR_LENGTH || charType.getLength() < 1) {
-      if (checkPrecision) {
-        throw new HoodieCatalogException(
-            String.format(
-                "HiveCatalog doesn't support char type with length of '%d'. "
-                    + "The supported length is [%d, %d]",
-                charType.getLength(), 1, HiveChar.MAX_CHAR_LENGTH));
-      } else {
-        return TypeInfoFactory.stringTypeInfo;
-      }
-    }
-    return TypeInfoFactory.getCharTypeInfo(charType.getLength());
+    // hoodie only supports avro compatible data type
+    return TypeInfoFactory.stringTypeInfo;
   }
 
   @Override
   public TypeInfo visit(VarCharType varCharType) {
-    // Flink's StringType is defined as VARCHAR(Integer.MAX_VALUE)
-    // We don't have more information in LogicalTypeRoot to distinguish StringType and a
-    // VARCHAR(Integer.MAX_VALUE) instance
-    // Thus always treat VARCHAR(Integer.MAX_VALUE) as StringType
-    if (varCharType.getLength() == Integer.MAX_VALUE) {
-      return TypeInfoFactory.stringTypeInfo;
-    }
-    // Flink and Hive have different length limit for VARCHAR. Promote it to STRING if it
-    // exceeds the limits of
-    // Hive and we're told not to check precision. This can be useful when calling Hive UDF
-    // to process data.
-    if (varCharType.getLength() > HiveVarchar.MAX_VARCHAR_LENGTH
-        || varCharType.getLength() < 1) {
-      if (checkPrecision) {
-        throw new HoodieCatalogException(
-            String.format(
-                "HiveCatalog doesn't support varchar type with length of '%d'. "
-                    + "The supported length is [%d, %d]",
-                varCharType.getLength(), 1, HiveVarchar.MAX_VARCHAR_LENGTH));
-      } else {
-        return TypeInfoFactory.stringTypeInfo;
-      }
-    }
-    return TypeInfoFactory.getVarcharTypeInfo(varCharType.getLength());
+    // hoodie only supports avro compatible data type
+    return TypeInfoFactory.stringTypeInfo;
   }
 
   @Override
@@ -140,12 +97,14 @@ public class TypeInfoLogicalTypeVisitor extends LogicalTypeDefaultVisitor<TypeIn
 
   @Override
   public TypeInfo visit(TinyIntType tinyIntType) {
-    return TypeInfoFactory.byteTypeInfo;
+    // hoodie only supports avro compatible data type
+    return TypeInfoFactory.intTypeInfo;
   }
 
   @Override
   public TypeInfo visit(SmallIntType smallIntType) {
-    return TypeInfoFactory.shortTypeInfo;
+    // hoodie only supports avro compatible data type
+    return TypeInfoFactory.intTypeInfo;
   }
 
   @Override
@@ -175,11 +134,14 @@ public class TypeInfoLogicalTypeVisitor extends LogicalTypeDefaultVisitor<TypeIn
 
   @Override
   public TypeInfo visit(TimestampType timestampType) {
-    if (checkPrecision && timestampType.getPrecision() == 9) {
-      throw new HoodieCatalogException(
-          "HoodieCatalog currently does not support timestamp of precision 9");
+    int precision = timestampType.getPrecision();
+    // see org.apache.hudi.hive.util.HiveSchemaUtil#convertField for details.
+    // only precision 6 (the default) maps to Hive timestamp; other precisions fall back to bigint
+    if (precision == 6) {
+      return TypeInfoFactory.timestampTypeInfo;
+    } else {
+      return TypeInfoFactory.longTypeInfo;
     }
-    return TypeInfoFactory.timestampTypeInfo;
   }
 
   @Override
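
The net effect of the visitor changes is a narrower, avro-compatible Flink-to-Hive type mapping.
A quick sketch of the expected results (illustrative, not a test from this commit):

    import org.apache.flink.table.api.DataTypes;
    import org.apache.hudi.table.catalog.HiveSchemaUtils;

    public class TypeMappingExample {
      public static void main(String[] args) {
        System.out.println(HiveSchemaUtils.toHiveTypeInfo(DataTypes.VARCHAR(20)).getTypeName());  // string
        System.out.println(HiveSchemaUtils.toHiveTypeInfo(DataTypes.TINYINT()).getTypeName());    // int
        System.out.println(HiveSchemaUtils.toHiveTypeInfo(DataTypes.SMALLINT()).getTypeName());   // int
        System.out.println(HiveSchemaUtils.toHiveTypeInfo(DataTypes.TIMESTAMP(6)).getTypeName()); // timestamp
        System.out.println(HiveSchemaUtils.toHiveTypeInfo(DataTypes.TIMESTAMP(3)).getTypeName()); // bigint
      }
    }
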
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index d817f537cb..5a34f2a178 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -273,8 +273,19 @@ public class StreamerUtil {
    * @throws IOException if errors happens when writing metadata
    */
   public static HoodieTableMetaClient initTableIfNotExists(Configuration conf) throws IOException {
+    return initTableIfNotExists(conf, HadoopConfigurations.getHadoopConf(conf));
+  }
+
+  /**
+   * Initialize the table if it does not exist.
+   *
+   * @param conf the configuration
+   * @throws IOException if errors happen when writing metadata
+   */
+  public static HoodieTableMetaClient initTableIfNotExists(
+      Configuration conf,
+      org.apache.hadoop.conf.Configuration hadoopConf) throws IOException {
     final String basePath = conf.getString(FlinkOptions.PATH);
-    final org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf(conf);
     if (!tableExists(basePath, hadoopConf)) {
       HoodieTableMetaClient metaClient = HoodieTableMetaClient.withPropertyBuilder()
           .setTableCreateSchema(conf.getString(FlinkOptions.SOURCE_AVRO_SCHEMA))
@@ -529,7 +540,7 @@ public class StreamerUtil {
     }
     return null;
   }
-    
+
   public static boolean fileExists(FileSystem fs, Path path) {
     try {
       return fs.exists(path);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 9fd7e2f912..44d300f555 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -1342,7 +1342,6 @@ public class ITTestHoodieDataSource extends AbstractTestBase {
         .field("f_par string")
         .pkField("f_int")
         .partitionField("f_par")
-        .option(FlinkOptions.PATH, tempFile.getAbsolutePath() + "/" + dbName + "/" + "t1")
         .option(FlinkOptions.RECORD_KEY_FIELD, "f_int")
         .option(FlinkOptions.PRECOMBINE_FIELD, "f_date")
         .end();
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/HoodieCatalogTestUtils.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/HoodieCatalogTestUtils.java
index 6a077ec7c4..8bcf7e7953 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/HoodieCatalogTestUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/HoodieCatalogTestUtils.java
@@ -41,6 +41,7 @@ public class HoodieCatalogTestUtils {
     return new HoodieHiveCatalog(
         name,
         null,
+        null,
         createHiveConf(),
         true);
   }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
index 4d9b4e0518..da6cde4e89 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
@@ -54,6 +54,7 @@ import java.util.Map;
 import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -106,7 +107,7 @@ public class TestHoodieHiveCatalog {
     assertEquals(table1.getOptions().get(CONNECTOR.key()), "hudi");
     assertEquals(table1.getOptions().get(FlinkOptions.TABLE_TYPE.key()), tableType.toString());
     assertEquals(table1.getOptions().get(FlinkOptions.RECORD_KEY_FIELD.key()), "uuid");
-    assertEquals(table1.getOptions().get(FlinkOptions.PRECOMBINE_FIELD.key()), "ts");
+    assertNull(table1.getOptions().get(FlinkOptions.PRECOMBINE_FIELD.key()), "preCombine key is not declared");
     assertEquals(table1.getUnresolvedSchema().getPrimaryKey().get().getColumnNames(), Collections.singletonList("uuid"));
     assertEquals(((CatalogTable)table1).getPartitionKeys(), Collections.singletonList("par1"));
   }