Posted to commits@flink.apache.org by lz...@apache.org on 2022/06/16 11:51:45 UTC

[flink-table-store] branch master updated: [FLINK-28079] Check Hive DDL against table store schema when creating table

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new a3d92b20 [FLINK-28079] Check Hive DDL against table store schema when creating table
a3d92b20 is described below

commit a3d92b20f21d265109b667291ffd62177d5e0a78
Author: tsreaper <ts...@gmail.com>
AuthorDate: Thu Jun 16 19:51:41 2022 +0800

    [FLINK-28079] Check Hive DDL against table store schema when creating table
    
    This closes #158
---
 .../flink/table/store/TableStoreJobConf.java       | 123 ------------
 .../apache/flink/table/store/hive/HiveSchema.java  | 139 +++++++++----
 .../flink/table/store/hive/HiveTypeUtils.java      | 173 +++++++++--------
 .../table/store/hive/TableStoreHiveMetaHook.java   |   1 -
 .../store/hive/TableStoreHiveStorageHandler.java   |   4 -
 .../flink/table/store/hive/TableStoreSerDe.java    |   2 +-
 .../TableStoreCharObjectInspector.java             |   8 +-
 .../TableStoreDecimalObjectInspector.java          |   6 +-
 .../TableStoreListObjectInspector.java             |  10 +-
 .../TableStoreMapObjectInspector.java              |  16 +-
 .../TableStoreRowDataObjectInspector.java          |  16 +-
 .../TableStoreVarcharObjectInspector.java          |   8 +-
 .../table/store/mapred/TableStoreInputFormat.java  |  11 +-
 .../flink/table/store/hive/HiveSchemaTest.java     | 215 +++++++++++++++++++++
 .../store/hive/RandomGenericRowDataGenerator.java  |  56 +++---
 .../hive/TableStoreHiveStorageHandlerITCase.java   |  68 +------
 .../table/store/hive/TableStoreSerDeTest.java      |  27 ++-
 .../TableStoreCharObjectInspectorTest.java         |  13 +-
 .../TableStoreDecimalObjectInspectorTest.java      |  13 +-
 .../TableStoreListObjectInspectorTest.java         |   6 +-
 .../TableStoreMapObjectInspectorTest.java          |   6 +-
 .../TableStoreRowDataObjectInspectorTest.java      |   8 +-
 .../TableStoreVarcharObjectInspectorTest.java      |  13 +-
 23 files changed, 531 insertions(+), 411 deletions(-)
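
In short, HiveSchema.extract now reads the table store schema from the table's location and validates any column definitions in the Hive DDL against it, field by field, at table creation time. A minimal sketch of the new behavior (the location path is illustrative; the property keys and the shape of the error follow the new HiveSchemaTest further down):

    import java.util.Properties;

    import org.apache.flink.table.store.hive.HiveSchema;

    public class SchemaCheckSketch {
        public static void main(String[] args) {
            // Assume a table store schema (a INT, b STRING) was already
            // committed under this (illustrative) location.
            Properties properties = new Properties();
            properties.setProperty("columns", "a,b");
            properties.setProperty("columns.types", "int:int"); // b should be string
            properties.setProperty("location", "/warehouse/default.db/t");

            try {
                HiveSchema.extract(properties);
            } catch (IllegalArgumentException e) {
                // "Hive DDL and table store schema mismatched! ...
                //  Field #1
                //  Hive DDL          : b int
                //  Table Store Schema: b string"
                System.out.println(e.getMessage());
            }
        }
    }

Leaving the column list empty in the DDL skips the check entirely, which is the usage the error message recommends: the external table can read its schema from the specified location.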

diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java
index 9af0648f..7d630b9d 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java
@@ -18,25 +18,11 @@
 
 package org.apache.flink.table.store;
 
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.store.file.FileStoreOptions;
-import org.apache.flink.table.store.hive.HiveTypeUtils;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
-import org.apache.flink.util.Preconditions;
-
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.mapred.JobConf;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Properties;
-import java.util.stream.Collectors;
 
 /**
  * Utility class to convert Hive table property keys and get file store specific configurations from
@@ -44,24 +30,7 @@ import java.util.stream.Collectors;
  */
 public class TableStoreJobConf {
 
-    private static final String TBLPROPERTIES_PREFIX = "table-store.";
-    private static final String TBLPROPERTIES_PRIMARY_KEYS = TBLPROPERTIES_PREFIX + "primary-keys";
-    private static final String INTERNAL_TBLPROPERTIES_PREFIX =
-            "table-store.internal.tblproperties.";
-
-    private static final String INTERNAL_DB_NAME = "table-store.internal.db-name";
-    private static final String INTERNAL_TABLE_NAME = "table-store.internal.table-name";
     private static final String INTERNAL_LOCATION = "table-store.internal.location";
-
-    private static final String INTERNAL_COLUMN_NAMES = "table-store.internal.column-names";
-
-    private static final String INTERNAL_COLUMN_TYPES = "table-store.internal.column-types";
-    private static final String COLUMN_TYPES_SEPARATOR = "\0";
-
-    private static final String INTERNAL_PARTITION_COLUMN_NAMES =
-            "table-store.internal.partition-column-names";
-    private static final String INTERNAL_PRIMARY_KEYS = "table-store.internal.primary-keys";
-
     private static final String INTERNAL_FILE_STORE_USER = "table-store.internal.file-store.user";
 
     private final JobConf jobConf;
@@ -71,107 +40,15 @@ public class TableStoreJobConf {
     }
 
     public static void configureInputJobProperties(Properties properties, Map<String, String> map) {
-        String tableNameString = properties.getProperty(hive_metastoreConstants.META_TABLE_NAME);
-        String[] tableName = tableNameString.split("\\.");
-        Preconditions.checkState(
-                tableName.length >= 2,
-                "There is no dot in META_TABLE_NAME " + tableNameString + ". This is unexpected.");
-
-        map.put(
-                INTERNAL_DB_NAME,
-                String.join(".", Arrays.copyOfRange(tableName, 0, tableName.length - 1)));
-
-        map.put(INTERNAL_TABLE_NAME, tableName[tableName.length - 1]);
-
         map.put(
                 INTERNAL_LOCATION,
                 properties.getProperty(hive_metastoreConstants.META_TABLE_LOCATION));
-
-        map.put(
-                INTERNAL_COLUMN_NAMES,
-                properties.getProperty(hive_metastoreConstants.META_TABLE_COLUMNS));
-
-        List<String> serializedLogicalTypes =
-                TypeInfoUtils.getTypeInfosFromTypeString(
-                                properties.getProperty(
-                                        hive_metastoreConstants.META_TABLE_COLUMN_TYPES))
-                        .stream()
-                        .map(
-                                t ->
-                                        HiveTypeUtils.typeInfoToDataType(t)
-                                                .getLogicalType()
-                                                .asSerializableString())
-                        .collect(Collectors.toList());
-        map.put(INTERNAL_COLUMN_TYPES, String.join(COLUMN_TYPES_SEPARATOR, serializedLogicalTypes));
-
-        if (properties.containsKey(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS)) {
-            map.put(
-                    INTERNAL_PARTITION_COLUMN_NAMES,
-                    properties.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS));
-        }
-
-        if (properties.containsKey(TBLPROPERTIES_PRIMARY_KEYS)) {
-            map.put(INTERNAL_PRIMARY_KEYS, properties.getProperty(TBLPROPERTIES_PRIMARY_KEYS));
-        }
-
-        for (ConfigOption<?> option : FileStoreOptions.allOptions()) {
-            if (properties.containsKey(TBLPROPERTIES_PREFIX + option.key())) {
-                map.put(
-                        INTERNAL_TBLPROPERTIES_PREFIX + option.key(),
-                        properties.getProperty(TBLPROPERTIES_PREFIX + option.key()));
-            }
-        }
-    }
-
-    public String getDbName() {
-        return jobConf.get(INTERNAL_DB_NAME);
-    }
-
-    public String getTableName() {
-        return jobConf.get(INTERNAL_TABLE_NAME);
     }
 
     public String getLocation() {
         return jobConf.get(INTERNAL_LOCATION);
     }
 
-    public void updateFileStoreOptions(Configuration fileStoreOptions) {
-        fileStoreOptions.set(FileStoreOptions.PATH, getLocation());
-        for (Map.Entry<String, String> entry :
-                jobConf.getPropsWithPrefix(INTERNAL_TBLPROPERTIES_PREFIX).entrySet()) {
-            fileStoreOptions.setString(entry.getKey(), entry.getValue());
-        }
-    }
-
-    public List<String> getColumnNames() {
-        // see MetastoreUtils#addCols for the exact separator
-        return Arrays.asList(jobConf.get(INTERNAL_COLUMN_NAMES).split(","));
-    }
-
-    public List<LogicalType> getColumnTypes() {
-        return Arrays.stream(jobConf.get(INTERNAL_COLUMN_TYPES).split(COLUMN_TYPES_SEPARATOR))
-                .map(LogicalTypeParser::parse)
-                .collect(Collectors.toList());
-    }
-
-    public List<String> getPartitionColumnNames() {
-        String partitionColumnNameString = jobConf.get(INTERNAL_PARTITION_COLUMN_NAMES);
-        // see MetastoreUtils#addCols for the exact separator
-        return partitionColumnNameString == null
-                ? Collections.emptyList()
-                : Arrays.asList(partitionColumnNameString.split("/"));
-    }
-
-    public Optional<List<String>> getPrimaryKeyNames() {
-        String primaryKeyNameString = jobConf.get(INTERNAL_PRIMARY_KEYS);
-        if (primaryKeyNameString == null) {
-            return Optional.empty();
-        } else {
-            // TODO add a constant for this separator?
-            return Optional.of(Arrays.asList(primaryKeyNameString.split(",")));
-        }
-    }
-
     public String getFileStoreUser() {
         return jobConf.get(INTERNAL_FILE_STORE_USER);
     }
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java
index 22c6ef3d..47b111f1 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java
@@ -18,66 +18,139 @@
 
 package org.apache.flink.table.store.hive;
 
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.schema.DataField;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.types.logical.LogicalType;
 
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 import java.util.Properties;
+import java.util.stream.Collectors;
 
 /** Column names, types and comments of a Hive table. */
 public class HiveSchema {
 
-    private final List<String> fieldNames;
-    private final List<TypeInfo> fieldTypeInfos;
-    private final List<String> fieldComments;
-
-    private HiveSchema(
-            List<String> fieldNames, List<TypeInfo> fieldTypeInfos, List<String> fieldComments) {
-        Preconditions.checkArgument(
-                fieldNames.size() == fieldTypeInfos.size()
-                        && fieldNames.size() == fieldComments.size(),
-                "Length of field names (%s), type infos (%s) and comments (%s) are different.",
-                fieldNames.size(),
-                fieldTypeInfos.size(),
-                fieldComments.size());
-        this.fieldNames = fieldNames;
-        this.fieldTypeInfos = fieldTypeInfos;
-        this.fieldComments = fieldComments;
+    private final Schema schema;
+
+    private HiveSchema(Schema schema) {
+        this.schema = schema;
     }
 
     public List<String> fieldNames() {
-        return fieldNames;
+        return schema.fieldNames();
     }
 
-    public List<TypeInfo> fieldTypeInfos() {
-        return fieldTypeInfos;
+    public List<LogicalType> fieldTypes() {
+        return schema.logicalRowType().getChildren();
     }
 
     public List<String> fieldComments() {
-        return fieldComments;
+        return schema.fields().stream().map(DataField::description).collect(Collectors.toList());
     }
 
     /** Extract {@link HiveSchema} from Hive serde properties. */
     public static HiveSchema extract(Properties properties) {
-        String columnNames = properties.getProperty(serdeConstants.LIST_COLUMNS);
-        String columnNameDelimiter =
-                properties.getProperty(
-                        serdeConstants.COLUMN_NAME_DELIMITER, String.valueOf(SerDeUtils.COMMA));
-        List<String> names = Arrays.asList(columnNames.split(columnNameDelimiter));
+        String location = properties.getProperty(hive_metastoreConstants.META_TABLE_LOCATION);
+        if (location == null) {
+            String tableName = properties.getProperty(hive_metastoreConstants.META_TABLE_NAME);
+            throw new UnsupportedOperationException(
+                    "Location property is missing for table "
+                            + tableName
+                            + ". Currently Flink table store only supports external table for Hive "
+                            + "so location property must be set.");
+        }
+        Schema schema =
+                new SchemaManager(new Path(location))
+                        .latest()
+                        .orElseThrow(
+                                () ->
+                                        new IllegalArgumentException(
+                                                "Schema file not found in location "
+                                                        + location
+                                                        + ". Please create table first."));
+
+        if (properties.containsKey(serdeConstants.LIST_COLUMNS)
+                && properties.containsKey(serdeConstants.LIST_COLUMN_TYPES)) {
+            String columnNames = properties.getProperty(serdeConstants.LIST_COLUMNS);
+            String columnNameDelimiter =
+                    properties.getProperty(
+                            serdeConstants.COLUMN_NAME_DELIMITER, String.valueOf(SerDeUtils.COMMA));
+            List<String> names = Arrays.asList(columnNames.split(columnNameDelimiter));
+
+            String columnTypes = properties.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+            List<TypeInfo> typeInfos = TypeInfoUtils.getTypeInfosFromTypeString(columnTypes);
+
+            if (names.size() > 0 && typeInfos.size() > 0) {
+                checkSchemaMatched(names, typeInfos, schema);
+            }
+        }
+
+        return new HiveSchema(schema);
+    }
+
+    private static void checkSchemaMatched(
+            List<String> names, List<TypeInfo> typeInfos, Schema schema) {
+        List<String> ddlNames = new ArrayList<>(names);
+        List<TypeInfo> ddlTypeInfos = new ArrayList<>(typeInfos);
+        List<String> schemaNames = schema.fieldNames();
+        List<TypeInfo> schemaTypeInfos =
+                schema.logicalRowType().getChildren().stream()
+                        .map(HiveTypeUtils::logicalTypeToTypeInfo)
+                        .collect(Collectors.toList());
 
-        String columnTypes = properties.getProperty(serdeConstants.LIST_COLUMN_TYPES);
-        List<TypeInfo> typeInfos = TypeInfoUtils.getTypeInfosFromTypeString(columnTypes);
+        // make the lengths of lists equal
+        while (ddlNames.size() < schemaNames.size()) {
+            ddlNames.add(null);
+        }
+        while (schemaNames.size() < ddlNames.size()) {
+            schemaNames.add(null);
+        }
+        while (ddlTypeInfos.size() < schemaTypeInfos.size()) {
+            ddlTypeInfos.add(null);
+        }
+        while (schemaTypeInfos.size() < ddlTypeInfos.size()) {
+            schemaTypeInfos.add(null);
+        }
 
-        // see MetastoreUtils#addCols for the exact property name and separator
-        String columnCommentsPropertyName = "columns.comments";
-        List<String> comments =
-                Arrays.asList(properties.getProperty(columnCommentsPropertyName).split("\0", -1));
+        // compare names and type infos
+        List<String> mismatched = new ArrayList<>();
+        for (int i = 0; i < ddlNames.size(); i++) {
+            if (!Objects.equals(ddlNames.get(i), schemaNames.get(i))
+                    || !Objects.equals(ddlTypeInfos.get(i), schemaTypeInfos.get(i))) {
+                String ddlField =
+                        ddlNames.get(i) == null
+                                ? "null"
+                                : ddlNames.get(i) + " " + ddlTypeInfos.get(i).getTypeName();
+                String schemaField =
+                        schemaNames.get(i) == null
+                                ? "null"
+                                : schemaNames.get(i) + " " + schemaTypeInfos.get(i).getTypeName();
+                mismatched.add(
+                        String.format(
+                                "Field #%d\n"
+                                        + "Hive DDL          : %s\n"
+                                        + "Table Store Schema: %s\n",
+                                i, ddlField, schemaField));
+            }
+        }
 
-        return new HiveSchema(names, typeInfos, comments);
+        if (mismatched.size() > 0) {
+            throw new IllegalArgumentException(
+                    "Hive DDL and table store schema mismatched! "
+                            + "It is recommended not to write any column definition "
+                            + "as Flink table store external table can read schema from the specified location.\n"
+                            + "Mismatched fields are:\n"
+                            + String.join("--------------------\n", mismatched));
+        }
     }
 }
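
The null-padding in checkSchemaMatched above is what lets a single positional loop report missing and surplus columns alike: once both sides are padded to equal length, a null on either side simply renders as "null" in the mismatch report. A condensed, self-contained illustration of the idea (toy data, not the actual classes):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Objects;

    public class PadAndCompare {
        public static void main(String[] args) {
            List<String> ddl = new ArrayList<>(Arrays.asList("a", "b"));
            List<String> schema = new ArrayList<>(Arrays.asList("a", "x", "c"));
            // Pad the shorter list with nulls so one index walks both lists;
            // a null then reads as "missing on that side".
            while (ddl.size() < schema.size()) {
                ddl.add(null);
            }
            while (schema.size() < ddl.size()) {
                schema.add(null);
            }
            for (int i = 0; i < ddl.size(); i++) {
                if (!Objects.equals(ddl.get(i), schema.get(i))) {
                    System.out.printf(
                            "Field #%d: DDL=%s, Schema=%s%n", i, ddl.get(i), schema.get(i));
                }
            }
        }
    }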
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveTypeUtils.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveTypeUtils.java
index 2009bd1f..d6061b12 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveTypeUtils.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveTypeUtils.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.store.hive;
 
-import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.store.hive.objectinspector.TableStoreCharObjectInspector;
 import org.apache.flink.table.store.hive.objectinspector.TableStoreDateObjectInspector;
 import org.apache.flink.table.store.hive.objectinspector.TableStoreDecimalObjectInspector;
@@ -27,111 +26,115 @@ import org.apache.flink.table.store.hive.objectinspector.TableStoreMapObjectInsp
 import org.apache.flink.table.store.hive.objectinspector.TableStoreStringObjectInspector;
 import org.apache.flink.table.store.hive.objectinspector.TableStoreTimestampObjectInspector;
 import org.apache.flink.table.store.hive.objectinspector.TableStoreVarcharObjectInspector;
-import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.VarCharType;
 
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
 /** Utils for converting types related classes between Flink and Hive. */
 public class HiveTypeUtils {
 
-    public static DataType typeInfoToDataType(TypeInfo typeInfo) {
-        ObjectInspector.Category category = typeInfo.getCategory();
-        switch (category) {
-            case PRIMITIVE:
-                PrimitiveObjectInspector.PrimitiveCategory primitiveCategory =
-                        ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory();
-                switch (primitiveCategory) {
-                    case BOOLEAN:
-                        return DataTypes.BOOLEAN();
-                    case BYTE:
-                        return DataTypes.TINYINT();
-                    case SHORT:
-                        return DataTypes.SMALLINT();
-                    case INT:
-                        return DataTypes.INT();
-                    case LONG:
-                        return DataTypes.BIGINT();
-                    case FLOAT:
-                        return DataTypes.FLOAT();
-                    case DOUBLE:
-                        return DataTypes.DOUBLE();
-                    case DECIMAL:
-                        DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
-                        return DataTypes.DECIMAL(
-                                decimalTypeInfo.getPrecision(), decimalTypeInfo.getScale());
-                    case CHAR:
-                        return DataTypes.CHAR(((CharTypeInfo) typeInfo).getLength());
-                    case VARCHAR:
-                        return DataTypes.VARCHAR(((VarcharTypeInfo) typeInfo).getLength());
-                    case STRING:
-                        return DataTypes.STRING();
-                    case BINARY:
-                        return DataTypes.BYTES();
-                    case DATE:
-                        return DataTypes.DATE();
-                    case TIMESTAMP:
-                        // TODO verify precision
-                        return DataTypes.TIMESTAMP(3);
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Unsupported primitive type info category "
-                                        + primitiveCategory.name());
+    public static TypeInfo logicalTypeToTypeInfo(LogicalType logicalType) {
+        switch (logicalType.getTypeRoot()) {
+            case BOOLEAN:
+                return TypeInfoFactory.booleanTypeInfo;
+            case TINYINT:
+                return TypeInfoFactory.byteTypeInfo;
+            case SMALLINT:
+                return TypeInfoFactory.shortTypeInfo;
+            case INTEGER:
+                return TypeInfoFactory.intTypeInfo;
+            case BIGINT:
+                return TypeInfoFactory.longTypeInfo;
+            case FLOAT:
+                return TypeInfoFactory.floatTypeInfo;
+            case DOUBLE:
+                return TypeInfoFactory.doubleTypeInfo;
+            case DECIMAL:
+                DecimalType decimalType = (DecimalType) logicalType;
+                return TypeInfoFactory.getDecimalTypeInfo(
+                        decimalType.getPrecision(), decimalType.getScale());
+            case CHAR:
+                CharType charType = (CharType) logicalType;
+                return TypeInfoFactory.getCharTypeInfo(charType.getLength());
+            case VARCHAR:
+                VarCharType varCharType = (VarCharType) logicalType;
+                if (varCharType.getLength() == VarCharType.MAX_LENGTH) {
+                    return TypeInfoFactory.stringTypeInfo;
+                } else {
+                    return TypeInfoFactory.getVarcharTypeInfo(varCharType.getLength());
                 }
-            case LIST:
-                ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
-                return DataTypes.ARRAY(typeInfoToDataType(listTypeInfo.getListElementTypeInfo()));
+            case BINARY:
+            case VARBINARY:
+                return TypeInfoFactory.binaryTypeInfo;
+            case DATE:
+                return TypeInfoFactory.dateTypeInfo;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return TypeInfoFactory.timestampTypeInfo;
+            case ARRAY:
+                ArrayType arrayType = (ArrayType) logicalType;
+                return TypeInfoFactory.getListTypeInfo(
+                        logicalTypeToTypeInfo(arrayType.getElementType()));
             case MAP:
-                MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
-                return DataTypes.MAP(
-                        typeInfoToDataType(mapTypeInfo.getMapKeyTypeInfo()),
-                        typeInfoToDataType(mapTypeInfo.getMapValueTypeInfo()));
+                MapType mapType = (MapType) logicalType;
+                return TypeInfoFactory.getMapTypeInfo(
+                        logicalTypeToTypeInfo(mapType.getKeyType()),
+                        logicalTypeToTypeInfo(mapType.getValueType()));
             default:
                 throw new UnsupportedOperationException(
-                        "Unsupported type info category " + category.name());
+                        "Unsupported logical type " + logicalType.asSummaryString());
         }
     }
 
-    public static ObjectInspector getObjectInspector(TypeInfo typeInfo) {
-        ObjectInspector.Category category = typeInfo.getCategory();
-        switch (category) {
-            case PRIMITIVE:
-                PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo;
-                switch (primitiveTypeInfo.getPrimitiveCategory()) {
-                    case DECIMAL:
-                        return new TableStoreDecimalObjectInspector((DecimalTypeInfo) typeInfo);
-                    case CHAR:
-                        return new TableStoreCharObjectInspector((CharTypeInfo) typeInfo);
-                    case VARCHAR:
-                        return new TableStoreVarcharObjectInspector((VarcharTypeInfo) typeInfo);
-                    case STRING:
-                        return new TableStoreStringObjectInspector();
-                    case DATE:
-                        return new TableStoreDateObjectInspector();
-                    case TIMESTAMP:
-                        return new TableStoreTimestampObjectInspector();
-                    default:
-                        return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
-                                primitiveTypeInfo);
+    public static ObjectInspector getObjectInspector(LogicalType logicalType) {
+        switch (logicalType.getTypeRoot()) {
+            case BOOLEAN:
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case BIGINT:
+            case FLOAT:
+            case DOUBLE:
+            case BINARY:
+            case VARBINARY:
+                return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
+                        (PrimitiveTypeInfo) logicalTypeToTypeInfo(logicalType));
+            case DECIMAL:
+                DecimalType decimalType = (DecimalType) logicalType;
+                return new TableStoreDecimalObjectInspector(
+                        decimalType.getPrecision(), decimalType.getScale());
+            case CHAR:
+                CharType charType = (CharType) logicalType;
+                return new TableStoreCharObjectInspector(charType.getLength());
+            case VARCHAR:
+                VarCharType varCharType = (VarCharType) logicalType;
+                if (varCharType.getLength() == VarCharType.MAX_LENGTH) {
+                    return new TableStoreStringObjectInspector();
+                } else {
+                    return new TableStoreVarcharObjectInspector(varCharType.getLength());
                 }
-            case LIST:
-                ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
-                return new TableStoreListObjectInspector(listTypeInfo.getListElementTypeInfo());
+            case DATE:
+                return new TableStoreDateObjectInspector();
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return new TableStoreTimestampObjectInspector();
+            case ARRAY:
+                ArrayType arrayType = (ArrayType) logicalType;
+                return new TableStoreListObjectInspector(arrayType.getElementType());
             case MAP:
-                MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
+                MapType mapType = (MapType) logicalType;
                 return new TableStoreMapObjectInspector(
-                        mapTypeInfo.getMapKeyTypeInfo(), mapTypeInfo.getMapValueTypeInfo());
+                        mapType.getKeyType(), mapType.getValueType());
             default:
                 throw new UnsupportedOperationException(
-                        "Unsupported type info category " + category.name());
+                        "Unsupported logical type " + logicalType.asSummaryString());
         }
     }
 }
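
One subtlety worth noting in the new logicalTypeToTypeInfo: Flink models STRING as VARCHAR(Integer.MAX_VALUE), so the conversion maps the maximum length to Hive's string type rather than an oversized varchar. A quick check of that branch, assuming the Flink and Hive serde classes above are on the classpath:

    import org.apache.flink.table.store.hive.HiveTypeUtils;
    import org.apache.flink.table.types.logical.VarCharType;

    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

    public class VarCharMappingSketch {
        public static void main(String[] args) {
            // Bounded VARCHAR keeps its length...
            TypeInfo bounded = HiveTypeUtils.logicalTypeToTypeInfo(new VarCharType(10));
            System.out.println(bounded.getTypeName()); // varchar(10)

            // ...but VARCHAR(MAX_LENGTH), i.e. Flink's STRING, becomes Hive string.
            TypeInfo unbounded =
                    HiveTypeUtils.logicalTypeToTypeInfo(
                            new VarCharType(VarCharType.MAX_LENGTH));
            System.out.println(unbounded == TypeInfoFactory.stringTypeInfo); // true
        }
    }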
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveMetaHook.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveMetaHook.java
index b3690dca..ef27cd37 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveMetaHook.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveMetaHook.java
@@ -34,7 +34,6 @@ public class TableStoreHiveMetaHook implements HiveMetaHook {
 
     @Override
     public void preCreateTable(Table table) throws MetaException {
-        // TODO support partitioned table after schema is recorded in table store files
         Preconditions.checkArgument(
                 !table.isSetPartitionKeys() || table.getPartitionKeys().isEmpty(),
                 "Flink Table Store currently does not support creating partitioned table "
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java
index 01203f6e..94788716 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java
@@ -36,8 +36,6 @@ import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.Properties;
@@ -47,8 +45,6 @@ import java.util.UUID;
 public class TableStoreHiveStorageHandler
         implements HiveStoragePredicateHandler, HiveStorageHandler {
 
-    private static final Logger LOG = LoggerFactory.getLogger(TableStoreHiveStorageHandler.class);
-
     private Configuration conf;
 
     @Override
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreSerDe.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreSerDe.java
index 59289f50..253ed4e2 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreSerDe.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreSerDe.java
@@ -47,7 +47,7 @@ public class TableStoreSerDe extends AbstractSerDe {
         HiveSchema schema = HiveSchema.extract(properties);
         inspector =
                 new TableStoreRowDataObjectInspector(
-                        schema.fieldNames(), schema.fieldTypeInfos(), schema.fieldComments());
+                        schema.fieldNames(), schema.fieldTypes(), schema.fieldComments());
     }
 
     @Override
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspector.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspector.java
index d0bddf94..b0486aa1 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspector.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspector.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveJavaObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveCharObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
 /** {@link AbstractPrimitiveJavaObjectInspector} for CHAR type. */
 public class TableStoreCharObjectInspector extends AbstractPrimitiveJavaObjectInspector
@@ -32,9 +32,9 @@ public class TableStoreCharObjectInspector extends AbstractPrimitiveJavaObjectIn
 
     private final int len;
 
-    public TableStoreCharObjectInspector(CharTypeInfo typeInfo) {
-        super(typeInfo);
-        this.len = typeInfo.getLength();
+    public TableStoreCharObjectInspector(int len) {
+        super(TypeInfoFactory.getCharTypeInfo(len));
+        this.len = len;
     }
 
     @Override
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspector.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspector.java
index 595ecc14..7f85731f 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspector.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspector.java
@@ -24,14 +24,14 @@ import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveJavaObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
 /** {@link AbstractPrimitiveJavaObjectInspector} for DECIMAL type. */
 public class TableStoreDecimalObjectInspector extends AbstractPrimitiveJavaObjectInspector
         implements HiveDecimalObjectInspector {
 
-    public TableStoreDecimalObjectInspector(DecimalTypeInfo typeInfo) {
-        super(typeInfo);
+    public TableStoreDecimalObjectInspector(int precision, int scale) {
+        super(TypeInfoFactory.getDecimalTypeInfo(precision, scale));
     }
 
     @Override
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspector.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspector.java
index a7c6bce4..02b0e724 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspector.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspector.java
@@ -20,11 +20,11 @@ package org.apache.flink.table.store.hive.objectinspector;
 
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.store.hive.HiveTypeUtils;
+import org.apache.flink.table.types.logical.LogicalType;
 
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -40,11 +40,9 @@ public class TableStoreListObjectInspector implements ListObjectInspector {
     private final ObjectInspector elementObjectInspector;
     private final ArrayData.ElementGetter elementGetter;
 
-    public TableStoreListObjectInspector(TypeInfo elementTypeInfo) {
-        this.elementObjectInspector = HiveTypeUtils.getObjectInspector(elementTypeInfo);
-        this.elementGetter =
-                ArrayData.createElementGetter(
-                        HiveTypeUtils.typeInfoToDataType(elementTypeInfo).getLogicalType());
+    public TableStoreListObjectInspector(LogicalType elementType) {
+        this.elementObjectInspector = HiveTypeUtils.getObjectInspector(elementType);
+        this.elementGetter = ArrayData.createElementGetter(elementType);
     }
 
     @Override
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspector.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspector.java
index 08cf09d3..4b4d606f 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspector.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspector.java
@@ -21,11 +21,11 @@ package org.apache.flink.table.store.hive.objectinspector;
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.MapData;
 import org.apache.flink.table.store.hive.HiveTypeUtils;
+import org.apache.flink.table.types.logical.LogicalType;
 
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -44,15 +44,11 @@ public class TableStoreMapObjectInspector implements MapObjectInspector {
     private final ArrayData.ElementGetter keyGetter;
     private final ArrayData.ElementGetter valueGetter;
 
-    public TableStoreMapObjectInspector(TypeInfo keyTypeInfo, TypeInfo valueTypeInfo) {
-        this.keyObjectInspector = HiveTypeUtils.getObjectInspector(keyTypeInfo);
-        this.valueObjectInspector = HiveTypeUtils.getObjectInspector(valueTypeInfo);
-        this.keyGetter =
-                ArrayData.createElementGetter(
-                        HiveTypeUtils.typeInfoToDataType(keyTypeInfo).getLogicalType());
-        this.valueGetter =
-                ArrayData.createElementGetter(
-                        HiveTypeUtils.typeInfoToDataType(valueTypeInfo).getLogicalType());
+    public TableStoreMapObjectInspector(LogicalType keyType, LogicalType valueType) {
+        this.keyObjectInspector = HiveTypeUtils.getObjectInspector(keyType);
+        this.valueObjectInspector = HiveTypeUtils.getObjectInspector(valueType);
+        this.keyGetter = ArrayData.createElementGetter(keyType);
+        this.valueGetter = ArrayData.createElementGetter(valueType);
     }
 
     @Override
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspector.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspector.java
index 2cbc30e3..9545ccdc 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspector.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspector.java
@@ -20,11 +20,11 @@ package org.apache.flink.table.store.hive.objectinspector;
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.hive.HiveTypeUtils;
+import org.apache.flink.table.types.logical.LogicalType;
 
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -40,21 +40,20 @@ public class TableStoreRowDataObjectInspector extends StructObjectInspector {
     private final String typeName;
 
     public TableStoreRowDataObjectInspector(
-            List<String> fieldNames, List<TypeInfo> typeInfos, List<String> fieldComments) {
+            List<String> fieldNames, List<LogicalType> fieldTypes, List<String> fieldComments) {
         this.structFields = new ArrayList<>();
         this.structFieldMap = new HashMap<>();
         StringBuilder typeNameBuilder = new StringBuilder("struct<");
 
         for (int i = 0; i < fieldNames.size(); i++) {
             String name = fieldNames.get(i);
-            TypeInfo typeInfo = typeInfos.get(i);
+            LogicalType logicalType = fieldTypes.get(i);
             TableStoreStructField structField =
                     new TableStoreStructField(
                             name,
-                            HiveTypeUtils.getObjectInspector(typeInfo),
+                            HiveTypeUtils.getObjectInspector(logicalType),
                             i,
-                            RowData.createFieldGetter(
-                                    HiveTypeUtils.typeInfoToDataType(typeInfo).getLogicalType(), i),
+                            RowData.createFieldGetter(logicalType, i),
                             fieldComments.get(i));
             structFields.add(structField);
             structFieldMap.put(name, structField);
@@ -62,7 +61,10 @@ public class TableStoreRowDataObjectInspector extends StructObjectInspector {
             if (i > 0) {
                 typeNameBuilder.append(",");
             }
-            typeNameBuilder.append(name).append(":").append(typeInfo.getTypeName());
+            typeNameBuilder
+                    .append(name)
+                    .append(":")
+                    .append(HiveTypeUtils.logicalTypeToTypeInfo(logicalType).getTypeName());
         }
 
         typeNameBuilder.append(">");
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspector.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspector.java
index 342eff1e..78c0dd70 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspector.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspector.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hive.common.type.HiveVarchar;
 import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveJavaObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
 /** {@link AbstractPrimitiveJavaObjectInspector} for VARCHAR type. */
 public class TableStoreVarcharObjectInspector extends AbstractPrimitiveJavaObjectInspector
@@ -32,9 +32,9 @@ public class TableStoreVarcharObjectInspector extends AbstractPrimitiveJavaObjec
 
     private final int len;
 
-    public TableStoreVarcharObjectInspector(VarcharTypeInfo typeInfo) {
-        super(typeInfo);
-        this.len = typeInfo.getLength();
+    public TableStoreVarcharObjectInspector(int len) {
+        super(TypeInfoFactory.getVarcharTypeInfo(len));
+        this.len = len;
     }
 
     @Override
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
index 8653682f..f1bdb3ce 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
@@ -22,7 +22,9 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.store.RowDataContainer;
 import org.apache.flink.table.store.SearchArgumentToPredicateConverter;
 import org.apache.flink.table.store.TableStoreJobConf;
+import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.schema.Schema;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.FileStoreTableFactory;
 import org.apache.flink.table.store.table.source.TableScan;
@@ -51,7 +53,7 @@ public class TableStoreInputFormat implements InputFormat<Void, RowDataContainer
     public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
         FileStoreTable table = createFileStoreTable(jobConf);
         TableScan scan = table.newScan();
-        createPredicate(jobConf).ifPresent(scan::withFilter);
+        createPredicate(table.schema(), jobConf).ifPresent(scan::withFilter);
         return scan.plan().splits.stream()
                 .map(TableStoreInputSplit::create)
                 .toArray(TableStoreInputSplit[]::new);
@@ -71,23 +73,22 @@ public class TableStoreInputFormat implements InputFormat<Void, RowDataContainer
     private FileStoreTable createFileStoreTable(JobConf jobConf) {
         TableStoreJobConf wrapper = new TableStoreJobConf(jobConf);
         Configuration conf = new Configuration();
-        wrapper.updateFileStoreOptions(conf);
+        conf.set(FileStoreOptions.PATH, wrapper.getLocation());
         return FileStoreTableFactory.create(conf, wrapper.getFileStoreUser());
     }
 
-    private Optional<Predicate> createPredicate(JobConf jobConf) {
+    private Optional<Predicate> createPredicate(Schema schema, JobConf jobConf) {
         String hiveFilter = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
         if (hiveFilter == null) {
             return Optional.empty();
         }
 
-        TableStoreJobConf wrapper = new TableStoreJobConf(jobConf);
         ExprNodeGenericFuncDesc exprNodeDesc =
                 SerializationUtilities.deserializeObject(hiveFilter, ExprNodeGenericFuncDesc.class);
         SearchArgument sarg = ConvertAstToSearchArg.create(jobConf, exprNodeDesc);
         SearchArgumentToPredicateConverter converter =
                 new SearchArgumentToPredicateConverter(
-                        sarg, wrapper.getColumnNames(), wrapper.getColumnTypes());
+                        sarg, schema.fieldNames(), schema.logicalRowType().getChildren());
         return converter.convert();
     }
 }
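
With the schema now persisted in the store, the input format no longer ferries column names and types through the JobConf: the table is reconstructed from the location alone, and the predicate converter reads field names and types from the table's own schema. A hedged sketch of the simplified construction path (mirroring createFileStoreTable above; the location and user values are whatever the job conf carries):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.store.file.FileStoreOptions;
    import org.apache.flink.table.store.table.FileStoreTable;
    import org.apache.flink.table.store.table.FileStoreTableFactory;

    public class CreateTableSketch {
        // Assuming `location` points at a table store directory and `user`
        // identifies the file store user, as in the diff above.
        static FileStoreTable create(String location, String user) {
            Configuration conf = new Configuration();
            conf.set(FileStoreOptions.PATH, location);
            return FileStoreTableFactory.create(conf, user);
        }
    }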
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/HiveSchemaTest.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/HiveSchemaTest.java
new file mode 100644
index 00000000..7df77e85
--- /dev/null
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/HiveSchemaTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.hive;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Properties;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** Tests for {@link HiveSchema}. */
+public class HiveSchemaTest {
+
+    private static final RowType ROW_TYPE =
+            new RowType(
+                    Arrays.asList(
+                            new RowType.RowField(
+                                    "a", DataTypes.INT().getLogicalType(), "first comment"),
+                            new RowType.RowField(
+                                    "b", DataTypes.STRING().getLogicalType(), "second comment"),
+                            new RowType.RowField(
+                                    "c",
+                                    DataTypes.DECIMAL(5, 3).getLogicalType(),
+                                    "last comment")));
+
+    @TempDir java.nio.file.Path tempDir;
+
+    @Test
+    public void testExtractSchema() throws Exception {
+        createSchema();
+
+        Properties properties = new Properties();
+        properties.setProperty("columns", "a,b,c");
+        properties.setProperty(
+                "columns.types",
+                String.join(
+                        ":",
+                        Arrays.asList(
+                                TypeInfoFactory.intTypeInfo.getTypeName(),
+                                TypeInfoFactory.stringTypeInfo.getTypeName(),
+                                TypeInfoFactory.getDecimalTypeInfo(5, 3).getTypeName())));
+        properties.setProperty("location", tempDir.toString());
+
+        HiveSchema schema = HiveSchema.extract(properties);
+        assertThat(schema.fieldNames()).isEqualTo(Arrays.asList("a", "b", "c"));
+        assertThat(schema.fieldTypes())
+                .isEqualTo(
+                        Arrays.asList(
+                                DataTypes.INT().getLogicalType(),
+                                DataTypes.STRING().getLogicalType(),
+                                DataTypes.DECIMAL(5, 3).getLogicalType()));
+        assertThat(schema.fieldComments())
+                .isEqualTo(Arrays.asList("first comment", "second comment", "last comment"));
+    }
+
+    @Test
+    public void testExtractSchemaWithEmptyDDL() throws Exception {
+        createSchema();
+
+        Properties properties = new Properties();
+        properties.setProperty("columns", "");
+        properties.setProperty("columns.types", "");
+        properties.setProperty("location", tempDir.toString());
+
+        HiveSchema schema = HiveSchema.extract(properties);
+        assertThat(schema.fieldNames()).isEqualTo(Arrays.asList("a", "b", "c"));
+        assertThat(schema.fieldTypes())
+                .isEqualTo(
+                        Arrays.asList(
+                                DataTypes.INT().getLogicalType(),
+                                DataTypes.STRING().getLogicalType(),
+                                DataTypes.DECIMAL(5, 3).getLogicalType()));
+        assertThat(schema.fieldComments())
+                .isEqualTo(Arrays.asList("first comment", "second comment", "last comment"));
+    }
+
+    @Test
+    public void testMismatchedColumnNameAndType() throws Exception {
+        createSchema();
+
+        Properties properties = new Properties();
+        properties.setProperty("columns", "a,mismatched,c");
+        properties.setProperty(
+                "columns.types",
+                String.join(
+                        ":",
+                        Arrays.asList(
+                                TypeInfoFactory.intTypeInfo.getTypeName(),
+                                TypeInfoFactory.stringTypeInfo.getTypeName(),
+                                TypeInfoFactory.getDecimalTypeInfo(6, 3).getTypeName())));
+        properties.setProperty("location", tempDir.toString());
+
+        String expected =
+                String.join(
+                        "\n",
+                        "Hive DDL and table store schema mismatched! "
+                                + "It is recommended not to write any column definition "
+                                + "as Flink table store external table can read schema from the specified location.",
+                        "Mismatched fields are:",
+                        "Field #1",
+                        "Hive DDL          : mismatched string",
+                        "Table Store Schema: b string",
+                        "--------------------",
+                        "Field #2",
+                        "Hive DDL          : c decimal(6,3)",
+                        "Table Store Schema: c decimal(5,3)");
+        IllegalArgumentException exception =
+                assertThrows(IllegalArgumentException.class, () -> HiveSchema.extract(properties));
+        assertThat(exception).hasMessageContaining(expected);
+    }
+
+    @Test
+    public void testTooFewColumns() throws Exception {
+        createSchema();
+
+        Properties properties = new Properties();
+        properties.setProperty("columns", "a");
+        properties.setProperty("columns.types", TypeInfoFactory.intTypeInfo.getTypeName());
+        properties.setProperty("location", tempDir.toString());
+
+        String expected =
+                String.join(
+                        "\n",
+                        "Hive DDL and table store schema mismatched! "
+                                + "It is recommended not to write any column definition "
+                                + "as Flink table store external table can read schema from the specified location.",
+                        "Mismatched fields are:",
+                        "Field #1",
+                        "Hive DDL          : null",
+                        "Table Store Schema: b string",
+                        "--------------------",
+                        "Field #2",
+                        "Hive DDL          : null",
+                        "Table Store Schema: c decimal(5,3)");
+        IllegalArgumentException exception =
+                assertThrows(IllegalArgumentException.class, () -> HiveSchema.extract(properties));
+        assertThat(exception).hasMessageContaining(expected);
+    }
+
+    @Test
+    public void testTooManyColumns() throws Exception {
+        createSchema();
+
+        Properties properties = new Properties();
+        properties.setProperty("columns", "a,b,c,d,e");
+        properties.setProperty(
+                "columns.types",
+                String.join(
+                        ":",
+                        Arrays.asList(
+                                TypeInfoFactory.intTypeInfo.getTypeName(),
+                                TypeInfoFactory.stringTypeInfo.getTypeName(),
+                                TypeInfoFactory.getDecimalTypeInfo(5, 3).getTypeName(),
+                                TypeInfoFactory.intTypeInfo.getTypeName(),
+                                TypeInfoFactory.stringTypeInfo.getTypeName())));
+        properties.setProperty("location", tempDir.toString());
+
+        String expected =
+                String.join(
+                        "\n",
+                        "Hive DDL and table store schema mismatched! "
+                                + "It is recommended not to write any column definition "
+                                + "as Flink table store external table can read schema from the specified location.",
+                        "Mismatched fields are:",
+                        "Field #3",
+                        "Hive DDL          : d int",
+                        "Table Store Schema: null",
+                        "--------------------",
+                        "Field #4",
+                        "Hive DDL          : e string",
+                        "Table Store Schema: null");
+        IllegalArgumentException exception =
+                assertThrows(IllegalArgumentException.class, () -> HiveSchema.extract(properties));
+        assertThat(exception).hasMessageContaining(expected);
+    }
+
+    private void createSchema() throws Exception {
+        new SchemaManager(new Path(tempDir.toString()))
+                .commitNewVersion(
+                        new UpdateSchema(
+                                ROW_TYPE,
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                new HashMap<>(),
+                                ""));
+    }
+}
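
For the recommended path, a minimal sketch (reusing the helpers from the test above; the behavior is assumed from the error message and matches what TableStoreSerDeTest below does): once a schema has been committed at the location, HiveSchema.extract can read it with no column definitions in the DDL properties at all.

    Properties properties = new Properties();
    properties.setProperty("location", tempDir.toString());
    // Assumed from the tests above: extraction succeeds and mirrors the
    // committed schema (a int, b string, c decimal(5,3)).
    HiveSchema schema = HiveSchema.extract(properties);
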
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/RandomGenericRowDataGenerator.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/RandomGenericRowDataGenerator.java
index a39c6670..4df95275 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/RandomGenericRowDataGenerator.java
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/RandomGenericRowDataGenerator.java
@@ -18,15 +18,15 @@
 
 package org.apache.flink.table.store.hive;
 
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.data.GenericMapData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
-
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
 
 import java.math.BigDecimal;
 import java.util.Arrays;
@@ -34,30 +34,31 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /** Util class for generating random {@link GenericRowData}. */
 public class RandomGenericRowDataGenerator {
 
-    public static final List<TypeInfo> TYPE_INFOS =
+    public static final List<LogicalType> LOGICAL_TYPES =
             Arrays.asList(
-                    TypeInfoFactory.booleanTypeInfo,
-                    TypeInfoFactory.byteTypeInfo,
-                    TypeInfoFactory.shortTypeInfo,
-                    TypeInfoFactory.intTypeInfo,
-                    TypeInfoFactory.longTypeInfo,
-                    TypeInfoFactory.floatTypeInfo,
-                    TypeInfoFactory.doubleTypeInfo,
-                    TypeInfoFactory.getDecimalTypeInfo(5, 3),
-                    TypeInfoFactory.getDecimalTypeInfo(28, 6),
-                    TypeInfoFactory.getCharTypeInfo(10),
-                    TypeInfoFactory.getVarcharTypeInfo(10),
-                    TypeInfoFactory.stringTypeInfo,
-                    TypeInfoFactory.binaryTypeInfo,
-                    TypeInfoFactory.dateTypeInfo,
-                    TypeInfoFactory.timestampTypeInfo,
-                    TypeInfoFactory.getListTypeInfo(TypeInfoFactory.longTypeInfo),
-                    TypeInfoFactory.getMapTypeInfo(
-                            TypeInfoFactory.stringTypeInfo, TypeInfoFactory.intTypeInfo));
+                    DataTypes.BOOLEAN().getLogicalType(),
+                    DataTypes.TINYINT().getLogicalType(),
+                    DataTypes.SMALLINT().getLogicalType(),
+                    DataTypes.INT().getLogicalType(),
+                    DataTypes.BIGINT().getLogicalType(),
+                    DataTypes.FLOAT().getLogicalType(),
+                    DataTypes.DOUBLE().getLogicalType(),
+                    DataTypes.DECIMAL(5, 3).getLogicalType(),
+                    DataTypes.DECIMAL(28, 6).getLogicalType(),
+                    DataTypes.CHAR(10).getLogicalType(),
+                    DataTypes.VARCHAR(10).getLogicalType(),
+                    DataTypes.STRING().getLogicalType(),
+                    DataTypes.VARBINARY(Integer.MAX_VALUE).getLogicalType(),
+                    DataTypes.DATE().getLogicalType(),
+                    DataTypes.TIMESTAMP(3).getLogicalType(),
+                    DataTypes.ARRAY(DataTypes.BIGINT()).getLogicalType(),
+                    DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()).getLogicalType());
 
     public static final List<String> TYPE_NAMES =
             Arrays.asList(
@@ -119,6 +120,17 @@ public class RandomGenericRowDataGenerator {
                     "comment_list_long",
                     "comment_map_string_int");
 
+    public static final RowType ROW_TYPE =
+            new RowType(
+                    IntStream.range(0, FIELD_NAMES.size())
+                            .mapToObj(
+                                    i ->
+                                            new RowType.RowField(
+                                                    FIELD_NAMES.get(i),
+                                                    LOGICAL_TYPES.get(i),
+                                                    FIELD_COMMENTS.get(i)))
+                            .collect(Collectors.toList()));
+
     public static GenericRowData generate() {
         ThreadLocalRandom random = ThreadLocalRandom.current();
         byte[] randomBytes = new byte[random.nextInt(20)];
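
With ROW_TYPE zipping FIELD_NAMES, LOGICAL_TYPES and FIELD_COMMENTS into a single RowType, callers no longer rebuild the row type from parallel lists. A sketch of the intended usage, assuming only the fields shown above:

    RowType rowType = RandomGenericRowDataGenerator.ROW_TYPE;
    GenericRowData row = RandomGenericRowDataGenerator.generate();
    // Names, types and comments stay aligned by construction,
    // so the generated row always has one value per declared field.
    assert rowType.getFieldCount() == row.getArity();
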
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
index d0a87ccf..5b518d50 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
@@ -113,11 +113,7 @@ public class TableStoreHiveStorageHandlerITCase {
                 String.join(
                         "\n",
                         Arrays.asList(
-                                "CREATE EXTERNAL TABLE test_table (",
-                                "  a INT,",
-                                "  b BIGINT,",
-                                "  c STRING",
-                                ")",
+                                "CREATE EXTERNAL TABLE test_table",
                                 "STORED BY '" + TableStoreHiveStorageHandler.class.getName() + "'",
                                 "LOCATION '" + path + "'")));
         List<String> actual = hiveShell.executeQuery("SELECT b, a, c FROM test_table ORDER BY b");
@@ -159,11 +155,7 @@ public class TableStoreHiveStorageHandlerITCase {
                 String.join(
                         "\n",
                         Arrays.asList(
-                                "CREATE EXTERNAL TABLE test_table (",
-                                "  a INT,",
-                                "  b BIGINT,",
-                                "  c STRING",
-                                ")",
+                                "CREATE EXTERNAL TABLE test_table",
                                 "STORED BY '" + TableStoreHiveStorageHandler.class.getName() + "'",
                                 "LOCATION '" + path + "'")));
         List<String> actual = hiveShell.executeQuery("SELECT b, a, c FROM test_table ORDER BY b");
@@ -181,14 +173,7 @@ public class TableStoreHiveStorageHandlerITCase {
         FileStoreTable table =
                 FileStoreTestUtils.createFileStoreTable(
                         conf,
-                        RowType.of(
-                                RandomGenericRowDataGenerator.TYPE_INFOS.stream()
-                                        .map(
-                                                t ->
-                                                        HiveTypeUtils.typeInfoToDataType(t)
-                                                                .getLogicalType())
-                                        .toArray(LogicalType[]::new),
-                                RandomGenericRowDataGenerator.FIELD_NAMES.toArray(new String[0])),
+                        RandomGenericRowDataGenerator.ROW_TYPE,
                         Collections.emptyList(),
                         Collections.singletonList("f_int"));
 
@@ -212,33 +197,13 @@ public class TableStoreHiveStorageHandlerITCase {
         table.newCommit().commit("0", write.prepareCommit());
         write.close();
 
-        StringBuilder ddl = new StringBuilder();
-        for (int i = 0; i < RandomGenericRowDataGenerator.FIELD_NAMES.size(); i++) {
-            if (i != 0) {
-                ddl.append(",\n");
-            }
-            ddl.append("  ")
-                    .append(RandomGenericRowDataGenerator.FIELD_NAMES.get(i))
-                    .append(" ")
-                    .append(RandomGenericRowDataGenerator.TYPE_NAMES.get(i))
-                    .append(" COMMENT '")
-                    .append(RandomGenericRowDataGenerator.FIELD_COMMENTS.get(i))
-                    .append("'");
-        }
         hiveShell.execute(
                 String.join(
                         "\n",
                         Arrays.asList(
-                                "CREATE EXTERNAL TABLE test_table (",
-                                ddl.toString(),
-                                ")",
+                                "CREATE EXTERNAL TABLE test_table",
                                 "STORED BY '" + TableStoreHiveStorageHandler.class.getName() + "'",
-                                "LOCATION '" + root + "'",
-                                "TBLPROPERTIES (",
-                                "  'table-store.catalog' = 'test_catalog',",
-                                "  'table-store.primary-keys' = 'f_int',",
-                                "  'table-store.file.format' = 'avro'",
-                                ")")));
+                                "LOCATION '" + root + "'")));
         List<Object[]> actual =
                 hiveShell.executeStatement("SELECT * FROM test_table WHERE f_int > 0");
 
@@ -261,7 +226,7 @@ public class TableStoreHiveStorageHandlerITCase {
                 }
                 ObjectInspector oi =
                         HiveTypeUtils.getObjectInspector(
-                                RandomGenericRowDataGenerator.TYPE_INFOS.get(i));
+                                RandomGenericRowDataGenerator.LOGICAL_TYPES.get(i));
                 switch (oi.getCategory()) {
                     case PRIMITIVE:
                         AbstractPrimitiveJavaObjectInspector primitiveOi =
@@ -350,15 +315,9 @@ public class TableStoreHiveStorageHandlerITCase {
                 String.join(
                         "\n",
                         Arrays.asList(
-                                "CREATE EXTERNAL TABLE test_table (",
-                                "  a INT",
-                                ")",
+                                "CREATE EXTERNAL TABLE test_table",
                                 "STORED BY '" + TableStoreHiveStorageHandler.class.getName() + "'",
-                                "LOCATION '" + path + "'",
-                                "TBLPROPERTIES (",
-                                "  'table-store.catalog' = 'test_catalog',",
-                                "  'table-store.file.format' = 'avro'",
-                                ")")));
+                                "LOCATION '" + path + "'")));
         Assert.assertEquals(
                 Arrays.asList("1", "5"),
                 hiveShell.executeQuery("SELECT * FROM test_table WHERE a = 1 OR a = 5"));
@@ -442,16 +401,9 @@ public class TableStoreHiveStorageHandlerITCase {
                 String.join(
                         "\n",
                         Arrays.asList(
-                                "CREATE EXTERNAL TABLE test_table (",
-                                "  dt DATE,",
-                                "  ts TIMESTAMP",
-                                ")",
+                                "CREATE EXTERNAL TABLE test_table",
                                 "STORED BY '" + TableStoreHiveStorageHandler.class.getName() + "'",
-                                "LOCATION '" + path + "'",
-                                "TBLPROPERTIES (",
-                                "  'table-store.catalog' = 'test_catalog',",
-                                "  'table-store.file.format' = 'avro'",
-                                ")")));
+                                "LOCATION '" + path + "'")));
         Assert.assertEquals(
                 Collections.singletonList("1971-01-11\t2022-05-17 17:29:20.0"),
                 hiveShell.executeQuery("SELECT * FROM test_table WHERE dt = '1971-01-11'"));
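
After this change the external table DDL in these tests reduces to the storage handler and the location; column definitions and TBLPROPERTIES are gone because the schema is read from the table store files. The resulting HiveQL looks like the following (the path is a placeholder):

    CREATE EXTERNAL TABLE test_table
    STORED BY 'org.apache.flink.table.store.hive.TableStoreHiveStorageHandler'
    LOCATION '/path/to/table-store';
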
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreSerDeTest.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreSerDeTest.java
index fad20dc5..44972f00 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreSerDeTest.java
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreSerDeTest.java
@@ -18,28 +18,34 @@
 
 package org.apache.flink.table.store.hive;
 
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.store.RowDataContainer;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
 import org.apache.flink.table.store.hive.objectinspector.TableStoreRowDataObjectInspector;
 
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Properties;
-import java.util.stream.Collectors;
 
 import static org.apache.flink.table.store.hive.RandomGenericRowDataGenerator.FIELD_COMMENTS;
 import static org.apache.flink.table.store.hive.RandomGenericRowDataGenerator.FIELD_NAMES;
-import static org.apache.flink.table.store.hive.RandomGenericRowDataGenerator.TYPE_INFOS;
+import static org.apache.flink.table.store.hive.RandomGenericRowDataGenerator.ROW_TYPE;
 import static org.apache.flink.table.store.hive.RandomGenericRowDataGenerator.generate;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link TableStoreSerDe}. */
 public class TableStoreSerDeTest {
 
+    @TempDir java.nio.file.Path tempDir;
+
     @Test
     public void testInitialize() throws Exception {
         TableStoreSerDe serDe = createInitializedSerDe();
@@ -66,12 +72,17 @@ public class TableStoreSerDeTest {
     }
 
     private TableStoreSerDe createInitializedSerDe() throws Exception {
+        new SchemaManager(new Path(tempDir.toString()))
+                .commitNewVersion(
+                        new UpdateSchema(
+                                ROW_TYPE,
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                new HashMap<>(),
+                                ""));
+
         Properties properties = new Properties();
-        properties.setProperty("columns", String.join(",", FIELD_NAMES));
-        properties.setProperty(
-                "columns.types",
-                TYPE_INFOS.stream().map(TypeInfo::getTypeName).collect(Collectors.joining(":")));
-        properties.setProperty("columns.comments", String.join("\0", FIELD_COMMENTS));
+        properties.setProperty("location", tempDir.toString());
 
         TableStoreSerDe serDe = new TableStoreSerDe();
         serDe.initialize(null, properties);
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspectorTest.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspectorTest.java
index 2a202fde..152962c7 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspectorTest.java
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspectorTest.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.junit.jupiter.api.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -34,8 +33,7 @@ public class TableStoreCharObjectInspectorTest {
 
     @Test
     public void testCategoryAndClass() {
-        TableStoreCharObjectInspector oi =
-                new TableStoreCharObjectInspector(TypeInfoFactory.getCharTypeInfo(10));
+        TableStoreCharObjectInspector oi = new TableStoreCharObjectInspector(10);
 
         assertThat(oi.getCategory()).isEqualTo(ObjectInspector.Category.PRIMITIVE);
         assertThat(oi.getPrimitiveCategory())
@@ -47,8 +45,7 @@ public class TableStoreCharObjectInspectorTest {
 
     @Test
     public void testGetPrimitiveJavaObject() {
-        TableStoreCharObjectInspector oi =
-                new TableStoreCharObjectInspector(TypeInfoFactory.getCharTypeInfo(10));
+        TableStoreCharObjectInspector oi = new TableStoreCharObjectInspector(10);
 
         StringData input1 = StringData.fromString("testString");
         HiveChar expected1 = new HiveChar("testString", 10);
@@ -61,8 +58,7 @@ public class TableStoreCharObjectInspectorTest {
 
     @Test
     public void testGetPrimitiveWritableObject() {
-        TableStoreCharObjectInspector oi =
-                new TableStoreCharObjectInspector(TypeInfoFactory.getCharTypeInfo(10));
+        TableStoreCharObjectInspector oi = new TableStoreCharObjectInspector(10);
 
         StringData input1 = StringData.fromString("testString");
         HiveCharWritable expected1 = new HiveCharWritable(new HiveChar("testString", 10));
@@ -75,8 +71,7 @@ public class TableStoreCharObjectInspectorTest {
 
     @Test
     public void testCopyObject() {
-        TableStoreCharObjectInspector oi =
-                new TableStoreCharObjectInspector(TypeInfoFactory.getCharTypeInfo(10));
+        TableStoreCharObjectInspector oi = new TableStoreCharObjectInspector(10);
 
         StringData input1 = StringData.fromString("testString");
         Object copy1 = oi.copyObject(input1);
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspectorTest.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspectorTest.java
index bbc9f055..575fba11 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspectorTest.java
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspectorTest.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.junit.jupiter.api.Test;
 
 import java.math.BigDecimal;
@@ -36,8 +35,7 @@ public class TableStoreDecimalObjectInspectorTest {
 
     @Test
     public void testCategoryAndClass() {
-        TableStoreDecimalObjectInspector oi =
-                new TableStoreDecimalObjectInspector(TypeInfoFactory.getDecimalTypeInfo(5, 3));
+        TableStoreDecimalObjectInspector oi = new TableStoreDecimalObjectInspector(5, 3);
 
         assertThat(oi.getCategory()).isEqualTo(ObjectInspector.Category.PRIMITIVE);
         assertThat(oi.getPrimitiveCategory())
@@ -49,8 +47,7 @@ public class TableStoreDecimalObjectInspectorTest {
 
     @Test
     public void testGetPrimitiveJavaObject() {
-        TableStoreDecimalObjectInspector oi =
-                new TableStoreDecimalObjectInspector(TypeInfoFactory.getDecimalTypeInfo(5, 3));
+        TableStoreDecimalObjectInspector oi = new TableStoreDecimalObjectInspector(5, 3);
 
         DecimalData input = DecimalData.fromBigDecimal(new BigDecimal("12.345"), 5, 3);
         HiveDecimal expected = HiveDecimal.create("12.345");
@@ -60,8 +57,7 @@ public class TableStoreDecimalObjectInspectorTest {
 
     @Test
     public void testGetPrimitiveWritableObject() {
-        TableStoreDecimalObjectInspector oi =
-                new TableStoreDecimalObjectInspector(TypeInfoFactory.getDecimalTypeInfo(5, 3));
+        TableStoreDecimalObjectInspector oi = new TableStoreDecimalObjectInspector(5, 3);
 
         DecimalData input = DecimalData.fromBigDecimal(new BigDecimal("12.345"), 5, 3);
         HiveDecimalWritable expected = new HiveDecimalWritable(HiveDecimal.create("12.345"));
@@ -71,8 +67,7 @@ public class TableStoreDecimalObjectInspectorTest {
 
     @Test
     public void testCopyObject() {
-        TableStoreDecimalObjectInspector oi =
-                new TableStoreDecimalObjectInspector(TypeInfoFactory.getDecimalTypeInfo(5, 3));
+        TableStoreDecimalObjectInspector oi = new TableStoreDecimalObjectInspector(5, 3);
 
         DecimalData input1 = DecimalData.fromBigDecimal(new BigDecimal("12.345"), 5, 3);
         Object copy1 = oi.copyObject(input1);
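
The char, varchar and decimal inspector tests now construct inspectors directly from length, precision and scale instead of Hive TypeInfos. The pattern, as exercised here and in the varchar test below:

    TableStoreCharObjectInspector charOi = new TableStoreCharObjectInspector(10);
    TableStoreVarcharObjectInspector varcharOi = new TableStoreVarcharObjectInspector(10);
    TableStoreDecimalObjectInspector decimalOi = new TableStoreDecimalObjectInspector(5, 3);
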
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspectorTest.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspectorTest.java
index 8d1f4372..e1d5593d 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspectorTest.java
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspectorTest.java
@@ -18,11 +18,11 @@
 
 package org.apache.flink.table.store.hive.objectinspector;
 
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.data.StringData;
 
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
@@ -35,7 +35,7 @@ public class TableStoreListObjectInspectorTest {
     @Test
     public void testCategoryAndTypeName() {
         TableStoreListObjectInspector oi =
-                new TableStoreListObjectInspector(TypeInfoFactory.stringTypeInfo);
+                new TableStoreListObjectInspector(DataTypes.STRING().getLogicalType());
 
         assertThat(oi.getCategory()).isEqualTo(ObjectInspector.Category.LIST);
         assertThat(oi.getTypeName()).isEqualTo("array<string>");
@@ -44,7 +44,7 @@ public class TableStoreListObjectInspectorTest {
     @Test
     public void testGetListAndElement() {
         TableStoreListObjectInspector oi =
-                new TableStoreListObjectInspector(TypeInfoFactory.stringTypeInfo);
+                new TableStoreListObjectInspector(DataTypes.STRING().getLogicalType());
 
         StringData[] stringDataArray =
                 new StringData[] {
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspectorTest.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspectorTest.java
index cad419ae..6b1da65f 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspectorTest.java
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspectorTest.java
@@ -18,11 +18,11 @@
 
 package org.apache.flink.table.store.hive.objectinspector;
 
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericMapData;
 import org.apache.flink.table.data.StringData;
 
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
@@ -37,7 +37,7 @@ public class TableStoreMapObjectInspectorTest {
     public void testCategoryAndTypeName() {
         TableStoreMapObjectInspector oi =
                 new TableStoreMapObjectInspector(
-                        TypeInfoFactory.stringTypeInfo, TypeInfoFactory.longTypeInfo);
+                        DataTypes.STRING().getLogicalType(), DataTypes.BIGINT().getLogicalType());
 
         assertThat(oi.getCategory()).isEqualTo(ObjectInspector.Category.MAP);
         assertThat(oi.getTypeName()).isEqualTo("map<string,bigint>");
@@ -47,7 +47,7 @@ public class TableStoreMapObjectInspectorTest {
     public void testGetMapAndValue() {
         TableStoreMapObjectInspector oi =
                 new TableStoreMapObjectInspector(
-                        TypeInfoFactory.stringTypeInfo, TypeInfoFactory.longTypeInfo);
+                        DataTypes.STRING().getLogicalType(), DataTypes.BIGINT().getLogicalType());
 
         StringData[] keyArray =
                 new StringData[] {
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspectorTest.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspectorTest.java
index 37d3df10..a0979a98 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspectorTest.java
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspectorTest.java
@@ -29,7 +29,7 @@ import java.util.List;
 
 import static org.apache.flink.table.store.hive.RandomGenericRowDataGenerator.FIELD_COMMENTS;
 import static org.apache.flink.table.store.hive.RandomGenericRowDataGenerator.FIELD_NAMES;
-import static org.apache.flink.table.store.hive.RandomGenericRowDataGenerator.TYPE_INFOS;
+import static org.apache.flink.table.store.hive.RandomGenericRowDataGenerator.LOGICAL_TYPES;
 import static org.apache.flink.table.store.hive.RandomGenericRowDataGenerator.TYPE_NAMES;
 import static org.apache.flink.table.store.hive.RandomGenericRowDataGenerator.generate;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -40,7 +40,7 @@ public class TableStoreRowDataObjectInspectorTest {
     @Test
     public void testGetStructFieldRef() {
         TableStoreRowDataObjectInspector oi =
-                new TableStoreRowDataObjectInspector(FIELD_NAMES, TYPE_INFOS, FIELD_COMMENTS);
+                new TableStoreRowDataObjectInspector(FIELD_NAMES, LOGICAL_TYPES, FIELD_COMMENTS);
         List<? extends StructField> structFields = oi.getAllStructFieldRefs();
         List<ObjectInspector.Category> expectedOiCategories =
                 Arrays.asList(
@@ -77,7 +77,7 @@ public class TableStoreRowDataObjectInspectorTest {
     @Test
     public void testGetTypeName() {
         TableStoreRowDataObjectInspector oi =
-                new TableStoreRowDataObjectInspector(FIELD_NAMES, TYPE_INFOS, FIELD_COMMENTS);
+                new TableStoreRowDataObjectInspector(FIELD_NAMES, LOGICAL_TYPES, FIELD_COMMENTS);
         String expected =
                 "struct<"
                         + String.join(
@@ -107,7 +107,7 @@ public class TableStoreRowDataObjectInspectorTest {
     @Test
     public void testGetStructFieldData() {
         TableStoreRowDataObjectInspector oi =
-                new TableStoreRowDataObjectInspector(FIELD_NAMES, TYPE_INFOS, FIELD_COMMENTS);
+                new TableStoreRowDataObjectInspector(FIELD_NAMES, LOGICAL_TYPES, FIELD_COMMENTS);
         GenericRowData rowData = generate();
         List<Object> structFieldsData = oi.getStructFieldsDataAsList(rowData);
         for (int i = 0; i < structFieldsData.size(); i++) {
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspectorTest.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspectorTest.java
index d9ce7dea..7c022a2e 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspectorTest.java
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspectorTest.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hive.common.type.HiveVarchar;
 import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.junit.jupiter.api.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -34,8 +33,7 @@ public class TableStoreVarcharObjectInspectorTest {
 
     @Test
     public void testCategoryAndClass() {
-        TableStoreVarcharObjectInspector oi =
-                new TableStoreVarcharObjectInspector(TypeInfoFactory.getVarcharTypeInfo(10));
+        TableStoreVarcharObjectInspector oi = new TableStoreVarcharObjectInspector(10);
 
         assertThat(oi.getCategory()).isEqualTo(ObjectInspector.Category.PRIMITIVE);
         assertThat(oi.getPrimitiveCategory())
@@ -47,8 +45,7 @@ public class TableStoreVarcharObjectInspectorTest {
 
     @Test
     public void testGetPrimitiveJavaObject() {
-        TableStoreVarcharObjectInspector oi =
-                new TableStoreVarcharObjectInspector(TypeInfoFactory.getVarcharTypeInfo(10));
+        TableStoreVarcharObjectInspector oi = new TableStoreVarcharObjectInspector(10);
 
         StringData input1 = StringData.fromString("testString");
         HiveVarchar expected1 = new HiveVarchar("testString", 10);
@@ -61,8 +58,7 @@ public class TableStoreVarcharObjectInspectorTest {
 
     @Test
     public void testGetPrimitiveWritableObject() {
-        TableStoreVarcharObjectInspector oi =
-                new TableStoreVarcharObjectInspector(TypeInfoFactory.getVarcharTypeInfo(10));
+        TableStoreVarcharObjectInspector oi = new TableStoreVarcharObjectInspector(10);
 
         StringData input1 = StringData.fromString("testString");
         HiveVarcharWritable expected1 = new HiveVarcharWritable(new HiveVarchar("testString", 10));
@@ -75,8 +71,7 @@ public class TableStoreVarcharObjectInspectorTest {
 
     @Test
     public void testCopyObject() {
-        TableStoreVarcharObjectInspector oi =
-                new TableStoreVarcharObjectInspector(TypeInfoFactory.getVarcharTypeInfo(10));
+        TableStoreVarcharObjectInspector oi = new TableStoreVarcharObjectInspector(10);
 
         StringData input1 = StringData.fromString("testString");
         Object copy1 = oi.copyObject(input1);