You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/06/16 11:51:45 UTC
[flink-table-store] branch master updated: [FLINK-28079] Check Hive DDL against table store schema when creating table
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new a3d92b20 [FLINK-28079] Check Hive DDL against table store schema when creating table
a3d92b20 is described below
commit a3d92b20f21d265109b667291ffd62177d5e0a78
Author: tsreaper <ts...@gmail.com>
AuthorDate: Thu Jun 16 19:51:41 2022 +0800
[FLINK-28079] Check Hive DDL against table store schema when creating table
This closes #158
---
.../flink/table/store/TableStoreJobConf.java | 123 ------------
.../apache/flink/table/store/hive/HiveSchema.java | 139 +++++++++----
.../flink/table/store/hive/HiveTypeUtils.java | 173 +++++++++--------
.../table/store/hive/TableStoreHiveMetaHook.java | 1 -
.../store/hive/TableStoreHiveStorageHandler.java | 4 -
.../flink/table/store/hive/TableStoreSerDe.java | 2 +-
.../TableStoreCharObjectInspector.java | 8 +-
.../TableStoreDecimalObjectInspector.java | 6 +-
.../TableStoreListObjectInspector.java | 10 +-
.../TableStoreMapObjectInspector.java | 16 +-
.../TableStoreRowDataObjectInspector.java | 16 +-
.../TableStoreVarcharObjectInspector.java | 8 +-
.../table/store/mapred/TableStoreInputFormat.java | 11 +-
.../flink/table/store/hive/HiveSchemaTest.java | 215 +++++++++++++++++++++
.../store/hive/RandomGenericRowDataGenerator.java | 56 +++---
.../hive/TableStoreHiveStorageHandlerITCase.java | 68 +------
.../table/store/hive/TableStoreSerDeTest.java | 27 ++-
.../TableStoreCharObjectInspectorTest.java | 13 +-
.../TableStoreDecimalObjectInspectorTest.java | 13 +-
.../TableStoreListObjectInspectorTest.java | 6 +-
.../TableStoreMapObjectInspectorTest.java | 6 +-
.../TableStoreRowDataObjectInspectorTest.java | 8 +-
.../TableStoreVarcharObjectInspectorTest.java | 13 +-
23 files changed, 531 insertions(+), 411 deletions(-)
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java
index 9af0648f..7d630b9d 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java
@@ -18,25 +18,11 @@
package org.apache.flink.table.store;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.store.file.FileStoreOptions;
-import org.apache.flink.table.store.hive.HiveTypeUtils;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
-import org.apache.flink.util.Preconditions;
-
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.mapred.JobConf;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Properties;
-import java.util.stream.Collectors;
/**
* Utility class to convert Hive table property keys and get file store specific configurations from
@@ -44,24 +30,7 @@ import java.util.stream.Collectors;
*/
public class TableStoreJobConf {
- private static final String TBLPROPERTIES_PREFIX = "table-store.";
- private static final String TBLPROPERTIES_PRIMARY_KEYS = TBLPROPERTIES_PREFIX + "primary-keys";
- private static final String INTERNAL_TBLPROPERTIES_PREFIX =
- "table-store.internal.tblproperties.";
-
- private static final String INTERNAL_DB_NAME = "table-store.internal.db-name";
- private static final String INTERNAL_TABLE_NAME = "table-store.internal.table-name";
private static final String INTERNAL_LOCATION = "table-store.internal.location";
-
- private static final String INTERNAL_COLUMN_NAMES = "table-store.internal.column-names";
-
- private static final String INTERNAL_COLUMN_TYPES = "table-store.internal.column-types";
- private static final String COLUMN_TYPES_SEPARATOR = "\0";
-
- private static final String INTERNAL_PARTITION_COLUMN_NAMES =
- "table-store.internal.partition-column-names";
- private static final String INTERNAL_PRIMARY_KEYS = "table-store.internal.primary-keys";
-
private static final String INTERNAL_FILE_STORE_USER = "table-store.internal.file-store.user";
private final JobConf jobConf;
@@ -71,107 +40,15 @@ public class TableStoreJobConf {
}
public static void configureInputJobProperties(Properties properties, Map<String, String> map) {
- String tableNameString = properties.getProperty(hive_metastoreConstants.META_TABLE_NAME);
- String[] tableName = tableNameString.split("\\.");
- Preconditions.checkState(
- tableName.length >= 2,
- "There is no dot in META_TABLE_NAME " + tableNameString + ". This is unexpected.");
-
- map.put(
- INTERNAL_DB_NAME,
- String.join(".", Arrays.copyOfRange(tableName, 0, tableName.length - 1)));
-
- map.put(INTERNAL_TABLE_NAME, tableName[tableName.length - 1]);
-
map.put(
INTERNAL_LOCATION,
properties.getProperty(hive_metastoreConstants.META_TABLE_LOCATION));
-
- map.put(
- INTERNAL_COLUMN_NAMES,
- properties.getProperty(hive_metastoreConstants.META_TABLE_COLUMNS));
-
- List<String> serializedLogicalTypes =
- TypeInfoUtils.getTypeInfosFromTypeString(
- properties.getProperty(
- hive_metastoreConstants.META_TABLE_COLUMN_TYPES))
- .stream()
- .map(
- t ->
- HiveTypeUtils.typeInfoToDataType(t)
- .getLogicalType()
- .asSerializableString())
- .collect(Collectors.toList());
- map.put(INTERNAL_COLUMN_TYPES, String.join(COLUMN_TYPES_SEPARATOR, serializedLogicalTypes));
-
- if (properties.containsKey(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS)) {
- map.put(
- INTERNAL_PARTITION_COLUMN_NAMES,
- properties.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS));
- }
-
- if (properties.containsKey(TBLPROPERTIES_PRIMARY_KEYS)) {
- map.put(INTERNAL_PRIMARY_KEYS, properties.getProperty(TBLPROPERTIES_PRIMARY_KEYS));
- }
-
- for (ConfigOption<?> option : FileStoreOptions.allOptions()) {
- if (properties.containsKey(TBLPROPERTIES_PREFIX + option.key())) {
- map.put(
- INTERNAL_TBLPROPERTIES_PREFIX + option.key(),
- properties.getProperty(TBLPROPERTIES_PREFIX + option.key()));
- }
- }
- }
-
- public String getDbName() {
- return jobConf.get(INTERNAL_DB_NAME);
- }
-
- public String getTableName() {
- return jobConf.get(INTERNAL_TABLE_NAME);
}
public String getLocation() {
return jobConf.get(INTERNAL_LOCATION);
}
- public void updateFileStoreOptions(Configuration fileStoreOptions) {
- fileStoreOptions.set(FileStoreOptions.PATH, getLocation());
- for (Map.Entry<String, String> entry :
- jobConf.getPropsWithPrefix(INTERNAL_TBLPROPERTIES_PREFIX).entrySet()) {
- fileStoreOptions.setString(entry.getKey(), entry.getValue());
- }
- }
-
- public List<String> getColumnNames() {
- // see MetastoreUtils#addCols for the exact separator
- return Arrays.asList(jobConf.get(INTERNAL_COLUMN_NAMES).split(","));
- }
-
- public List<LogicalType> getColumnTypes() {
- return Arrays.stream(jobConf.get(INTERNAL_COLUMN_TYPES).split(COLUMN_TYPES_SEPARATOR))
- .map(LogicalTypeParser::parse)
- .collect(Collectors.toList());
- }
-
- public List<String> getPartitionColumnNames() {
- String partitionColumnNameString = jobConf.get(INTERNAL_PARTITION_COLUMN_NAMES);
- // see MetastoreUtils#addCols for the exact separator
- return partitionColumnNameString == null
- ? Collections.emptyList()
- : Arrays.asList(partitionColumnNameString.split("/"));
- }
-
- public Optional<List<String>> getPrimaryKeyNames() {
- String primaryKeyNameString = jobConf.get(INTERNAL_PRIMARY_KEYS);
- if (primaryKeyNameString == null) {
- return Optional.empty();
- } else {
- // TODO add a constant for this separator?
- return Optional.of(Arrays.asList(primaryKeyNameString.split(",")));
- }
- }
-
public String getFileStoreUser() {
return jobConf.get(INTERNAL_FILE_STORE_USER);
}
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java
index 22c6ef3d..47b111f1 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java
@@ -18,66 +18,139 @@
package org.apache.flink.table.store.hive;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.schema.DataField;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Objects;
import java.util.Properties;
+import java.util.stream.Collectors;
/** Column names, types and comments of a Hive table. */
public class HiveSchema {
- private final List<String> fieldNames;
- private final List<TypeInfo> fieldTypeInfos;
- private final List<String> fieldComments;
-
- private HiveSchema(
- List<String> fieldNames, List<TypeInfo> fieldTypeInfos, List<String> fieldComments) {
- Preconditions.checkArgument(
- fieldNames.size() == fieldTypeInfos.size()
- && fieldNames.size() == fieldComments.size(),
- "Length of field names (%s), type infos (%s) and comments (%s) are different.",
- fieldNames.size(),
- fieldTypeInfos.size(),
- fieldComments.size());
- this.fieldNames = fieldNames;
- this.fieldTypeInfos = fieldTypeInfos;
- this.fieldComments = fieldComments;
+ private final Schema schema;
+
+ private HiveSchema(Schema schema) {
+ this.schema = schema;
}
public List<String> fieldNames() {
- return fieldNames;
+ return schema.fieldNames();
}
- public List<TypeInfo> fieldTypeInfos() {
- return fieldTypeInfos;
+ public List<LogicalType> fieldTypes() {
+ return schema.logicalRowType().getChildren();
}
public List<String> fieldComments() {
- return fieldComments;
+ return schema.fields().stream().map(DataField::description).collect(Collectors.toList());
}
/** Extract {@link HiveSchema} from Hive serde properties. */
public static HiveSchema extract(Properties properties) {
- String columnNames = properties.getProperty(serdeConstants.LIST_COLUMNS);
- String columnNameDelimiter =
- properties.getProperty(
- serdeConstants.COLUMN_NAME_DELIMITER, String.valueOf(SerDeUtils.COMMA));
- List<String> names = Arrays.asList(columnNames.split(columnNameDelimiter));
+ String location = properties.getProperty(hive_metastoreConstants.META_TABLE_LOCATION);
+ if (location == null) {
+ String tableName = properties.getProperty(hive_metastoreConstants.META_TABLE_NAME);
+ throw new UnsupportedOperationException(
+ "Location property is missing for table "
+ + tableName
+ + ". Currently Flink table store only supports external table for Hive "
+ + "so location property must be set.");
+ }
+ Schema schema =
+ new SchemaManager(new Path(location))
+ .latest()
+ .orElseThrow(
+ () ->
+ new IllegalArgumentException(
+ "Schema file not found in location "
+ + location
+ + ". Please create table first."));
+
+ if (properties.containsKey(serdeConstants.LIST_COLUMNS)
+ && properties.containsKey(serdeConstants.LIST_COLUMN_TYPES)) {
+ String columnNames = properties.getProperty(serdeConstants.LIST_COLUMNS);
+ String columnNameDelimiter =
+ properties.getProperty(
+ serdeConstants.COLUMN_NAME_DELIMITER, String.valueOf(SerDeUtils.COMMA));
+ List<String> names = Arrays.asList(columnNames.split(columnNameDelimiter));
+
+ String columnTypes = properties.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+ List<TypeInfo> typeInfos = TypeInfoUtils.getTypeInfosFromTypeString(columnTypes);
+
+ if (names.size() > 0 && typeInfos.size() > 0) {
+ checkSchemaMatched(names, typeInfos, schema);
+ }
+ }
+
+ return new HiveSchema(schema);
+ }
+
+ private static void checkSchemaMatched(
+ List<String> names, List<TypeInfo> typeInfos, Schema schema) {
+ List<String> ddlNames = new ArrayList<>(names);
+ List<TypeInfo> ddlTypeInfos = new ArrayList<>(typeInfos);
+ List<String> schemaNames = schema.fieldNames();
+ List<TypeInfo> schemaTypeInfos =
+ schema.logicalRowType().getChildren().stream()
+ .map(HiveTypeUtils::logicalTypeToTypeInfo)
+ .collect(Collectors.toList());
- String columnTypes = properties.getProperty(serdeConstants.LIST_COLUMN_TYPES);
- List<TypeInfo> typeInfos = TypeInfoUtils.getTypeInfosFromTypeString(columnTypes);
+ // make the lengths of lists equal
+ while (ddlNames.size() < schemaNames.size()) {
+ ddlNames.add(null);
+ }
+ while (schemaNames.size() < ddlNames.size()) {
+ schemaNames.add(null);
+ }
+ while (ddlTypeInfos.size() < schemaTypeInfos.size()) {
+ ddlTypeInfos.add(null);
+ }
+ while (schemaTypeInfos.size() < ddlTypeInfos.size()) {
+ schemaTypeInfos.add(null);
+ }
- // see MetastoreUtils#addCols for the exact property name and separator
- String columnCommentsPropertyName = "columns.comments";
- List<String> comments =
- Arrays.asList(properties.getProperty(columnCommentsPropertyName).split("\0", -1));
+ // compare names and type infos
+ List<String> mismatched = new ArrayList<>();
+ for (int i = 0; i < ddlNames.size(); i++) {
+ if (!Objects.equals(ddlNames.get(i), schemaNames.get(i))
+ || !Objects.equals(ddlTypeInfos.get(i), schemaTypeInfos.get(i))) {
+ String ddlField =
+ ddlNames.get(i) == null
+ ? "null"
+ : ddlNames.get(i) + " " + ddlTypeInfos.get(i).getTypeName();
+ String schemaField =
+ schemaNames.get(i) == null
+ ? "null"
+ : schemaNames.get(i) + " " + schemaTypeInfos.get(i).getTypeName();
+ mismatched.add(
+ String.format(
+ "Field #%d\n"
+ + "Hive DDL : %s\n"
+ + "Table Store Schema: %s\n",
+ i, ddlField, schemaField));
+ }
+ }
- return new HiveSchema(names, typeInfos, comments);
+ if (mismatched.size() > 0) {
+ throw new IllegalArgumentException(
+ "Hive DDL and table store schema mismatched! "
+ + "It is recommended not to write any column definition "
+ + "as Flink table store external table can read schema from the specified location.\n"
+ + "Mismatched fields are:\n"
+ + String.join("--------------------\n", mismatched));
+ }
}
}
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveTypeUtils.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveTypeUtils.java
index 2009bd1f..d6061b12 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveTypeUtils.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveTypeUtils.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.store.hive;
-import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.store.hive.objectinspector.TableStoreCharObjectInspector;
import org.apache.flink.table.store.hive.objectinspector.TableStoreDateObjectInspector;
import org.apache.flink.table.store.hive.objectinspector.TableStoreDecimalObjectInspector;
@@ -27,111 +26,115 @@ import org.apache.flink.table.store.hive.objectinspector.TableStoreMapObjectInsp
import org.apache.flink.table.store.hive.objectinspector.TableStoreStringObjectInspector;
import org.apache.flink.table.store.hive.objectinspector.TableStoreTimestampObjectInspector;
import org.apache.flink.table.store.hive.objectinspector.TableStoreVarcharObjectInspector;
-import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
/** Utils for converting types related classes between Flink and Hive. */
public class HiveTypeUtils {
- public static DataType typeInfoToDataType(TypeInfo typeInfo) {
- ObjectInspector.Category category = typeInfo.getCategory();
- switch (category) {
- case PRIMITIVE:
- PrimitiveObjectInspector.PrimitiveCategory primitiveCategory =
- ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory();
- switch (primitiveCategory) {
- case BOOLEAN:
- return DataTypes.BOOLEAN();
- case BYTE:
- return DataTypes.TINYINT();
- case SHORT:
- return DataTypes.SMALLINT();
- case INT:
- return DataTypes.INT();
- case LONG:
- return DataTypes.BIGINT();
- case FLOAT:
- return DataTypes.FLOAT();
- case DOUBLE:
- return DataTypes.DOUBLE();
- case DECIMAL:
- DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
- return DataTypes.DECIMAL(
- decimalTypeInfo.getPrecision(), decimalTypeInfo.getScale());
- case CHAR:
- return DataTypes.CHAR(((CharTypeInfo) typeInfo).getLength());
- case VARCHAR:
- return DataTypes.VARCHAR(((VarcharTypeInfo) typeInfo).getLength());
- case STRING:
- return DataTypes.STRING();
- case BINARY:
- return DataTypes.BYTES();
- case DATE:
- return DataTypes.DATE();
- case TIMESTAMP:
- // TODO verify precision
- return DataTypes.TIMESTAMP(3);
- default:
- throw new UnsupportedOperationException(
- "Unsupported primitive type info category "
- + primitiveCategory.name());
+ public static TypeInfo logicalTypeToTypeInfo(LogicalType logicalType) {
+ switch (logicalType.getTypeRoot()) {
+ case BOOLEAN:
+ return TypeInfoFactory.booleanTypeInfo;
+ case TINYINT:
+ return TypeInfoFactory.byteTypeInfo;
+ case SMALLINT:
+ return TypeInfoFactory.shortTypeInfo;
+ case INTEGER:
+ return TypeInfoFactory.intTypeInfo;
+ case BIGINT:
+ return TypeInfoFactory.longTypeInfo;
+ case FLOAT:
+ return TypeInfoFactory.floatTypeInfo;
+ case DOUBLE:
+ return TypeInfoFactory.doubleTypeInfo;
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) logicalType;
+ return TypeInfoFactory.getDecimalTypeInfo(
+ decimalType.getPrecision(), decimalType.getScale());
+ case CHAR:
+ CharType charType = (CharType) logicalType;
+ return TypeInfoFactory.getCharTypeInfo(charType.getLength());
+ case VARCHAR:
+ VarCharType varCharType = (VarCharType) logicalType;
+ if (varCharType.getLength() == VarCharType.MAX_LENGTH) {
+ return TypeInfoFactory.stringTypeInfo;
+ } else {
+ return TypeInfoFactory.getVarcharTypeInfo(varCharType.getLength());
}
- case LIST:
- ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
- return DataTypes.ARRAY(typeInfoToDataType(listTypeInfo.getListElementTypeInfo()));
+ case BINARY:
+ case VARBINARY:
+ return TypeInfoFactory.binaryTypeInfo;
+ case DATE:
+ return TypeInfoFactory.dateTypeInfo;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return TypeInfoFactory.timestampTypeInfo;
+ case ARRAY:
+ ArrayType arrayType = (ArrayType) logicalType;
+ return TypeInfoFactory.getListTypeInfo(
+ logicalTypeToTypeInfo(arrayType.getElementType()));
case MAP:
- MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
- return DataTypes.MAP(
- typeInfoToDataType(mapTypeInfo.getMapKeyTypeInfo()),
- typeInfoToDataType(mapTypeInfo.getMapValueTypeInfo()));
+ MapType mapType = (MapType) logicalType;
+ return TypeInfoFactory.getMapTypeInfo(
+ logicalTypeToTypeInfo(mapType.getKeyType()),
+ logicalTypeToTypeInfo(mapType.getValueType()));
default:
throw new UnsupportedOperationException(
- "Unsupported type info category " + category.name());
+ "Unsupported logical type " + logicalType.asSummaryString());
}
}
- public static ObjectInspector getObjectInspector(TypeInfo typeInfo) {
- ObjectInspector.Category category = typeInfo.getCategory();
- switch (category) {
- case PRIMITIVE:
- PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo;
- switch (primitiveTypeInfo.getPrimitiveCategory()) {
- case DECIMAL:
- return new TableStoreDecimalObjectInspector((DecimalTypeInfo) typeInfo);
- case CHAR:
- return new TableStoreCharObjectInspector((CharTypeInfo) typeInfo);
- case VARCHAR:
- return new TableStoreVarcharObjectInspector((VarcharTypeInfo) typeInfo);
- case STRING:
- return new TableStoreStringObjectInspector();
- case DATE:
- return new TableStoreDateObjectInspector();
- case TIMESTAMP:
- return new TableStoreTimestampObjectInspector();
- default:
- return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
- primitiveTypeInfo);
+ public static ObjectInspector getObjectInspector(LogicalType logicalType) {
+ switch (logicalType.getTypeRoot()) {
+ case BOOLEAN:
+ case TINYINT:
+ case SMALLINT:
+ case INTEGER:
+ case BIGINT:
+ case FLOAT:
+ case DOUBLE:
+ case BINARY:
+ case VARBINARY:
+ return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
+ (PrimitiveTypeInfo) logicalTypeToTypeInfo(logicalType));
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) logicalType;
+ return new TableStoreDecimalObjectInspector(
+ decimalType.getPrecision(), decimalType.getScale());
+ case CHAR:
+ CharType charType = (CharType) logicalType;
+ return new TableStoreCharObjectInspector(charType.getLength());
+ case VARCHAR:
+ VarCharType varCharType = (VarCharType) logicalType;
+ if (varCharType.getLength() == VarCharType.MAX_LENGTH) {
+ return new TableStoreStringObjectInspector();
+ } else {
+ return new TableStoreVarcharObjectInspector(varCharType.getLength());
}
- case LIST:
- ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
- return new TableStoreListObjectInspector(listTypeInfo.getListElementTypeInfo());
+ case DATE:
+ return new TableStoreDateObjectInspector();
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return new TableStoreTimestampObjectInspector();
+ case ARRAY:
+ ArrayType arrayType = (ArrayType) logicalType;
+ return new TableStoreListObjectInspector(arrayType.getElementType());
case MAP:
- MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
+ MapType mapType = (MapType) logicalType;
return new TableStoreMapObjectInspector(
- mapTypeInfo.getMapKeyTypeInfo(), mapTypeInfo.getMapValueTypeInfo());
+ mapType.getKeyType(), mapType.getValueType());
default:
throw new UnsupportedOperationException(
- "Unsupported type info category " + category.name());
+ "Unsupported logical type " + logicalType.asSummaryString());
}
}
}
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveMetaHook.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveMetaHook.java
index b3690dca..ef27cd37 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveMetaHook.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveMetaHook.java
@@ -34,7 +34,6 @@ public class TableStoreHiveMetaHook implements HiveMetaHook {
@Override
public void preCreateTable(Table table) throws MetaException {
- // TODO support partitioned table after schema is recorded in table store files
Preconditions.checkArgument(
!table.isSetPartitionKeys() || table.getPartitionKeys().isEmpty(),
"Flink Table Store currently does not support creating partitioned table "
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java
index 01203f6e..94788716 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java
@@ -36,8 +36,6 @@ import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Properties;
@@ -47,8 +45,6 @@ import java.util.UUID;
public class TableStoreHiveStorageHandler
implements HiveStoragePredicateHandler, HiveStorageHandler {
- private static final Logger LOG = LoggerFactory.getLogger(TableStoreHiveStorageHandler.class);
-
private Configuration conf;
@Override
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreSerDe.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreSerDe.java
index 59289f50..253ed4e2 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreSerDe.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreSerDe.java
@@ -47,7 +47,7 @@ public class TableStoreSerDe extends AbstractSerDe {
HiveSchema schema = HiveSchema.extract(properties);
inspector =
new TableStoreRowDataObjectInspector(
- schema.fieldNames(), schema.fieldTypeInfos(), schema.fieldComments());
+ schema.fieldNames(), schema.fieldTypes(), schema.fieldComments());
}
@Override
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspector.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspector.java
index d0bddf94..b0486aa1 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspector.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspector.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hive.common.type.HiveChar;
import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveJavaObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveCharObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
/** {@link AbstractPrimitiveJavaObjectInspector} for CHAR type. */
public class TableStoreCharObjectInspector extends AbstractPrimitiveJavaObjectInspector
@@ -32,9 +32,9 @@ public class TableStoreCharObjectInspector extends AbstractPrimitiveJavaObjectIn
private final int len;
- public TableStoreCharObjectInspector(CharTypeInfo typeInfo) {
- super(typeInfo);
- this.len = typeInfo.getLength();
+ public TableStoreCharObjectInspector(int len) {
+ super(TypeInfoFactory.getCharTypeInfo(len));
+ this.len = len;
}
@Override
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspector.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspector.java
index 595ecc14..7f85731f 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspector.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspector.java
@@ -24,14 +24,14 @@ import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveJavaObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
/** {@link AbstractPrimitiveJavaObjectInspector} for DECIMAL type. */
public class TableStoreDecimalObjectInspector extends AbstractPrimitiveJavaObjectInspector
implements HiveDecimalObjectInspector {
- public TableStoreDecimalObjectInspector(DecimalTypeInfo typeInfo) {
- super(typeInfo);
+ public TableStoreDecimalObjectInspector(int precision, int scale) {
+ super(TypeInfoFactory.getDecimalTypeInfo(precision, scale));
}
@Override
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspector.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspector.java
index a7c6bce4..02b0e724 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspector.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspector.java
@@ -20,11 +20,11 @@ package org.apache.flink.table.store.hive.objectinspector;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.store.hive.HiveTypeUtils;
+import org.apache.flink.table.types.logical.LogicalType;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import java.util.ArrayList;
import java.util.List;
@@ -40,11 +40,9 @@ public class TableStoreListObjectInspector implements ListObjectInspector {
private final ObjectInspector elementObjectInspector;
private final ArrayData.ElementGetter elementGetter;
- public TableStoreListObjectInspector(TypeInfo elementTypeInfo) {
- this.elementObjectInspector = HiveTypeUtils.getObjectInspector(elementTypeInfo);
- this.elementGetter =
- ArrayData.createElementGetter(
- HiveTypeUtils.typeInfoToDataType(elementTypeInfo).getLogicalType());
+ public TableStoreListObjectInspector(LogicalType elementType) {
+ this.elementObjectInspector = HiveTypeUtils.getObjectInspector(elementType);
+ this.elementGetter = ArrayData.createElementGetter(elementType);
}
@Override
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspector.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspector.java
index 08cf09d3..4b4d606f 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspector.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspector.java
@@ -21,11 +21,11 @@ package org.apache.flink.table.store.hive.objectinspector;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.store.hive.HiveTypeUtils;
+import org.apache.flink.table.types.logical.LogicalType;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import java.util.HashMap;
import java.util.Map;
@@ -44,15 +44,11 @@ public class TableStoreMapObjectInspector implements MapObjectInspector {
private final ArrayData.ElementGetter keyGetter;
private final ArrayData.ElementGetter valueGetter;
- public TableStoreMapObjectInspector(TypeInfo keyTypeInfo, TypeInfo valueTypeInfo) {
- this.keyObjectInspector = HiveTypeUtils.getObjectInspector(keyTypeInfo);
- this.valueObjectInspector = HiveTypeUtils.getObjectInspector(valueTypeInfo);
- this.keyGetter =
- ArrayData.createElementGetter(
- HiveTypeUtils.typeInfoToDataType(keyTypeInfo).getLogicalType());
- this.valueGetter =
- ArrayData.createElementGetter(
- HiveTypeUtils.typeInfoToDataType(valueTypeInfo).getLogicalType());
+ public TableStoreMapObjectInspector(LogicalType keyType, LogicalType valueType) {
+ this.keyObjectInspector = HiveTypeUtils.getObjectInspector(keyType);
+ this.valueObjectInspector = HiveTypeUtils.getObjectInspector(valueType);
+ this.keyGetter = ArrayData.createElementGetter(keyType);
+ this.valueGetter = ArrayData.createElementGetter(valueType);
}
@Override
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspector.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspector.java
index 2cbc30e3..9545ccdc 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspector.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspector.java
@@ -20,11 +20,11 @@ package org.apache.flink.table.store.hive.objectinspector;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.hive.HiveTypeUtils;
+import org.apache.flink.table.types.logical.LogicalType;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import java.util.ArrayList;
import java.util.HashMap;
@@ -40,21 +40,20 @@ public class TableStoreRowDataObjectInspector extends StructObjectInspector {
private final String typeName;
public TableStoreRowDataObjectInspector(
- List<String> fieldNames, List<TypeInfo> typeInfos, List<String> fieldComments) {
+ List<String> fieldNames, List<LogicalType> fieldTypes, List<String> fieldComments) {
this.structFields = new ArrayList<>();
this.structFieldMap = new HashMap<>();
StringBuilder typeNameBuilder = new StringBuilder("struct<");
for (int i = 0; i < fieldNames.size(); i++) {
String name = fieldNames.get(i);
- TypeInfo typeInfo = typeInfos.get(i);
+ LogicalType logicalType = fieldTypes.get(i);
TableStoreStructField structField =
new TableStoreStructField(
name,
- HiveTypeUtils.getObjectInspector(typeInfo),
+ HiveTypeUtils.getObjectInspector(logicalType),
i,
- RowData.createFieldGetter(
- HiveTypeUtils.typeInfoToDataType(typeInfo).getLogicalType(), i),
+ RowData.createFieldGetter(logicalType, i),
fieldComments.get(i));
structFields.add(structField);
structFieldMap.put(name, structField);
@@ -62,7 +61,10 @@ public class TableStoreRowDataObjectInspector extends StructObjectInspector {
if (i > 0) {
typeNameBuilder.append(",");
}
- typeNameBuilder.append(name).append(":").append(typeInfo.getTypeName());
+ typeNameBuilder
+ .append(name)
+ .append(":")
+ .append(HiveTypeUtils.logicalTypeToTypeInfo(logicalType).getTypeName());
}
typeNameBuilder.append(">");
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspector.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspector.java
index 342eff1e..78c0dd70 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspector.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspector.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hive.common.type.HiveVarchar;
import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveJavaObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
/** {@link AbstractPrimitiveJavaObjectInspector} for VARCHAR type. */
public class TableStoreVarcharObjectInspector extends AbstractPrimitiveJavaObjectInspector
@@ -32,9 +32,9 @@ public class TableStoreVarcharObjectInspector extends AbstractPrimitiveJavaObjec
private final int len;
- public TableStoreVarcharObjectInspector(VarcharTypeInfo typeInfo) {
- super(typeInfo);
- this.len = typeInfo.getLength();
+ public TableStoreVarcharObjectInspector(int len) {
+ super(TypeInfoFactory.getVarcharTypeInfo(len));
+ this.len = len;
}
@Override
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
index 8653682f..f1bdb3ce 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
+++ b/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
@@ -22,7 +22,9 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.store.RowDataContainer;
import org.apache.flink.table.store.SearchArgumentToPredicateConverter;
import org.apache.flink.table.store.TableStoreJobConf;
+import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.schema.Schema;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.FileStoreTableFactory;
import org.apache.flink.table.store.table.source.TableScan;
@@ -51,7 +53,7 @@ public class TableStoreInputFormat implements InputFormat<Void, RowDataContainer
public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
FileStoreTable table = createFileStoreTable(jobConf);
TableScan scan = table.newScan();
- createPredicate(jobConf).ifPresent(scan::withFilter);
+ createPredicate(table.schema(), jobConf).ifPresent(scan::withFilter);
return scan.plan().splits.stream()
.map(TableStoreInputSplit::create)
.toArray(TableStoreInputSplit[]::new);
@@ -71,23 +73,22 @@ public class TableStoreInputFormat implements InputFormat<Void, RowDataContainer
private FileStoreTable createFileStoreTable(JobConf jobConf) {
TableStoreJobConf wrapper = new TableStoreJobConf(jobConf);
Configuration conf = new Configuration();
- wrapper.updateFileStoreOptions(conf);
+ conf.set(FileStoreOptions.PATH, wrapper.getLocation());
return FileStoreTableFactory.create(conf, wrapper.getFileStoreUser());
}
- private Optional<Predicate> createPredicate(JobConf jobConf) {
+ private Optional<Predicate> createPredicate(Schema schema, JobConf jobConf) {
String hiveFilter = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
if (hiveFilter == null) {
return Optional.empty();
}
- TableStoreJobConf wrapper = new TableStoreJobConf(jobConf);
ExprNodeGenericFuncDesc exprNodeDesc =
SerializationUtilities.deserializeObject(hiveFilter, ExprNodeGenericFuncDesc.class);
SearchArgument sarg = ConvertAstToSearchArg.create(jobConf, exprNodeDesc);
SearchArgumentToPredicateConverter converter =
new SearchArgumentToPredicateConverter(
- sarg, wrapper.getColumnNames(), wrapper.getColumnTypes());
+ sarg, schema.fieldNames(), schema.logicalRowType().getChildren());
return converter.convert();
}
}
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/HiveSchemaTest.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/HiveSchemaTest.java
new file mode 100644
index 00000000..7df77e85
--- /dev/null
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/HiveSchemaTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.hive;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Properties;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** Tests for {@link HiveSchema}. */
+public class HiveSchemaTest {
+
+ private static final RowType ROW_TYPE =
+ new RowType(
+ Arrays.asList(
+ new RowType.RowField(
+ "a", DataTypes.INT().getLogicalType(), "first comment"),
+ new RowType.RowField(
+ "b", DataTypes.STRING().getLogicalType(), "second comment"),
+ new RowType.RowField(
+ "c",
+ DataTypes.DECIMAL(5, 3).getLogicalType(),
+ "last comment")));
+
+ @TempDir java.nio.file.Path tempDir;
+
+ @Test
+ public void testExtractSchema() throws Exception {
+ createSchema();
+
+ Properties properties = new Properties();
+ properties.setProperty("columns", "a,b,c");
+ properties.setProperty(
+ "columns.types",
+ String.join(
+ ":",
+ Arrays.asList(
+ TypeInfoFactory.intTypeInfo.getTypeName(),
+ TypeInfoFactory.stringTypeInfo.getTypeName(),
+ TypeInfoFactory.getDecimalTypeInfo(5, 3).getTypeName())));
+ properties.setProperty("location", tempDir.toString());
+
+ HiveSchema schema = HiveSchema.extract(properties);
+ assertThat(schema.fieldNames()).isEqualTo(Arrays.asList("a", "b", "c"));
+ assertThat(schema.fieldTypes())
+ .isEqualTo(
+ Arrays.asList(
+ DataTypes.INT().getLogicalType(),
+ DataTypes.STRING().getLogicalType(),
+ DataTypes.DECIMAL(5, 3).getLogicalType()));
+ assertThat(schema.fieldComments())
+ .isEqualTo(Arrays.asList("first comment", "second comment", "last comment"));
+ }
+
+ @Test
+ public void testExtractSchemaWithEmptyDDL() throws Exception {
+ createSchema();
+
+ Properties properties = new Properties();
+ properties.setProperty("columns", "");
+ properties.setProperty("columns.types", "");
+ properties.setProperty("location", tempDir.toString());
+
+ HiveSchema schema = HiveSchema.extract(properties);
+ assertThat(schema.fieldNames()).isEqualTo(Arrays.asList("a", "b", "c"));
+ assertThat(schema.fieldTypes())
+ .isEqualTo(
+ Arrays.asList(
+ DataTypes.INT().getLogicalType(),
+ DataTypes.STRING().getLogicalType(),
+ DataTypes.DECIMAL(5, 3).getLogicalType()));
+ assertThat(schema.fieldComments())
+ .isEqualTo(Arrays.asList("first comment", "second comment", "last comment"));
+ }
+
+ @Test
+ public void testMismatchedColumnNameAndType() throws Exception {
+ createSchema();
+
+ Properties properties = new Properties();
+ properties.setProperty("columns", "a,mismatched,c");
+ properties.setProperty(
+ "columns.types",
+ String.join(
+ ":",
+ Arrays.asList(
+ TypeInfoFactory.intTypeInfo.getTypeName(),
+ TypeInfoFactory.stringTypeInfo.getTypeName(),
+ TypeInfoFactory.getDecimalTypeInfo(6, 3).getTypeName())));
+ properties.setProperty("location", tempDir.toString());
+
+ String expected =
+ String.join(
+ "\n",
+ "Hive DDL and table store schema mismatched! "
+ + "It is recommended not to write any column definition "
+ + "as Flink table store external table can read schema from the specified location.",
+ "Mismatched fields are:",
+ "Field #1",
+ "Hive DDL : mismatched string",
+ "Table Store Schema: b string",
+ "--------------------",
+ "Field #2",
+ "Hive DDL : c decimal(6,3)",
+ "Table Store Schema: c decimal(5,3)");
+ IllegalArgumentException exception =
+ assertThrows(IllegalArgumentException.class, () -> HiveSchema.extract(properties));
+ assertThat(exception).hasMessageContaining(expected);
+ }
+
+ @Test
+ public void testTooFewColumns() throws Exception {
+ createSchema();
+
+ Properties properties = new Properties();
+ properties.setProperty("columns", "a");
+ properties.setProperty("columns.types", TypeInfoFactory.intTypeInfo.getTypeName());
+ properties.setProperty("location", tempDir.toString());
+
+ String expected =
+ String.join(
+ "\n",
+ "Hive DDL and table store schema mismatched! "
+ + "It is recommended not to write any column definition "
+ + "as Flink table store external table can read schema from the specified location.",
+ "Mismatched fields are:",
+ "Field #1",
+ "Hive DDL : null",
+ "Table Store Schema: b string",
+ "--------------------",
+ "Field #2",
+ "Hive DDL : null",
+ "Table Store Schema: c decimal(5,3)");
+ IllegalArgumentException exception =
+ assertThrows(IllegalArgumentException.class, () -> HiveSchema.extract(properties));
+ assertThat(exception).hasMessageContaining(expected);
+ }
+
+ @Test
+ public void testTooManyColumns() throws Exception {
+ createSchema();
+
+ Properties properties = new Properties();
+ properties.setProperty("columns", "a,b,c,d,e");
+ properties.setProperty(
+ "columns.types",
+ String.join(
+ ":",
+ Arrays.asList(
+ TypeInfoFactory.intTypeInfo.getTypeName(),
+ TypeInfoFactory.stringTypeInfo.getTypeName(),
+ TypeInfoFactory.getDecimalTypeInfo(5, 3).getTypeName(),
+ TypeInfoFactory.intTypeInfo.getTypeName(),
+ TypeInfoFactory.stringTypeInfo.getTypeName())));
+ properties.setProperty("location", tempDir.toString());
+
+ String expected =
+ String.join(
+ "\n",
+ "Hive DDL and table store schema mismatched! "
+ + "It is recommended not to write any column definition "
+ + "as Flink table store external table can read schema from the specified location.",
+ "Mismatched fields are:",
+ "Field #3",
+ "Hive DDL : d int",
+ "Table Store Schema: null",
+ "--------------------",
+ "Field #4",
+ "Hive DDL : e string",
+ "Table Store Schema: null");
+ IllegalArgumentException exception =
+ assertThrows(IllegalArgumentException.class, () -> HiveSchema.extract(properties));
+ assertThat(exception).hasMessageContaining(expected);
+ }
+
+ private void createSchema() throws Exception {
+ new SchemaManager(new Path(tempDir.toString()))
+ .commitNewVersion(
+ new UpdateSchema(
+ ROW_TYPE,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ new HashMap<>(),
+ ""));
+ }
+}
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/RandomGenericRowDataGenerator.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/RandomGenericRowDataGenerator.java
index a39c6670..4df95275 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/RandomGenericRowDataGenerator.java
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/RandomGenericRowDataGenerator.java
@@ -18,15 +18,15 @@
package org.apache.flink.table.store.hive;
+import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
-
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
import java.math.BigDecimal;
import java.util.Arrays;
@@ -34,30 +34,31 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
/** Util class for generating random {@link GenericRowData}. */
public class RandomGenericRowDataGenerator {
- public static final List<TypeInfo> TYPE_INFOS =
+ public static final List<LogicalType> LOGICAL_TYPES =
Arrays.asList(
- TypeInfoFactory.booleanTypeInfo,
- TypeInfoFactory.byteTypeInfo,
- TypeInfoFactory.shortTypeInfo,
- TypeInfoFactory.intTypeInfo,
- TypeInfoFactory.longTypeInfo,
- TypeInfoFactory.floatTypeInfo,
- TypeInfoFactory.doubleTypeInfo,
- TypeInfoFactory.getDecimalTypeInfo(5, 3),
- TypeInfoFactory.getDecimalTypeInfo(28, 6),
- TypeInfoFactory.getCharTypeInfo(10),
- TypeInfoFactory.getVarcharTypeInfo(10),
- TypeInfoFactory.stringTypeInfo,
- TypeInfoFactory.binaryTypeInfo,
- TypeInfoFactory.dateTypeInfo,
- TypeInfoFactory.timestampTypeInfo,
- TypeInfoFactory.getListTypeInfo(TypeInfoFactory.longTypeInfo),
- TypeInfoFactory.getMapTypeInfo(
- TypeInfoFactory.stringTypeInfo, TypeInfoFactory.intTypeInfo));
+ DataTypes.BOOLEAN().getLogicalType(),
+ DataTypes.TINYINT().getLogicalType(),
+ DataTypes.SMALLINT().getLogicalType(),
+ DataTypes.INT().getLogicalType(),
+ DataTypes.BIGINT().getLogicalType(),
+ DataTypes.FLOAT().getLogicalType(),
+ DataTypes.DOUBLE().getLogicalType(),
+ DataTypes.DECIMAL(5, 3).getLogicalType(),
+ DataTypes.DECIMAL(28, 6).getLogicalType(),
+ DataTypes.CHAR(10).getLogicalType(),
+ DataTypes.VARCHAR(10).getLogicalType(),
+ DataTypes.STRING().getLogicalType(),
+ DataTypes.VARBINARY(Integer.MAX_VALUE).getLogicalType(),
+ DataTypes.DATE().getLogicalType(),
+ DataTypes.TIMESTAMP(3).getLogicalType(),
+ DataTypes.ARRAY(DataTypes.BIGINT()).getLogicalType(),
+ DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()).getLogicalType());
public static final List<String> TYPE_NAMES =
Arrays.asList(
@@ -119,6 +120,17 @@ public class RandomGenericRowDataGenerator {
"comment_list_long",
"comment_map_string_int");
+ public static final RowType ROW_TYPE =
+ new RowType(
+ IntStream.range(0, FIELD_NAMES.size())
+ .mapToObj(
+ i ->
+ new RowType.RowField(
+ FIELD_NAMES.get(i),
+ LOGICAL_TYPES.get(i),
+ FIELD_COMMENTS.get(i)))
+ .collect(Collectors.toList()));
+
public static GenericRowData generate() {
ThreadLocalRandom random = ThreadLocalRandom.current();
byte[] randomBytes = new byte[random.nextInt(20)];
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
index d0a87ccf..5b518d50 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
@@ -113,11 +113,7 @@ public class TableStoreHiveStorageHandlerITCase {
String.join(
"\n",
Arrays.asList(
- "CREATE EXTERNAL TABLE test_table (",
- " a INT,",
- " b BIGINT,",
- " c STRING",
- ")",
+ "CREATE EXTERNAL TABLE test_table",
"STORED BY '" + TableStoreHiveStorageHandler.class.getName() + "'",
"LOCATION '" + path + "'")));
List<String> actual = hiveShell.executeQuery("SELECT b, a, c FROM test_table ORDER BY b");
@@ -159,11 +155,7 @@ public class TableStoreHiveStorageHandlerITCase {
String.join(
"\n",
Arrays.asList(
- "CREATE EXTERNAL TABLE test_table (",
- " a INT,",
- " b BIGINT,",
- " c STRING",
- ")",
+ "CREATE EXTERNAL TABLE test_table",
"STORED BY '" + TableStoreHiveStorageHandler.class.getName() + "'",
"LOCATION '" + path + "'")));
List<String> actual = hiveShell.executeQuery("SELECT b, a, c FROM test_table ORDER BY b");
@@ -181,14 +173,7 @@ public class TableStoreHiveStorageHandlerITCase {
FileStoreTable table =
FileStoreTestUtils.createFileStoreTable(
conf,
- RowType.of(
- RandomGenericRowDataGenerator.TYPE_INFOS.stream()
- .map(
- t ->
- HiveTypeUtils.typeInfoToDataType(t)
- .getLogicalType())
- .toArray(LogicalType[]::new),
- RandomGenericRowDataGenerator.FIELD_NAMES.toArray(new String[0])),
+ RandomGenericRowDataGenerator.ROW_TYPE,
Collections.emptyList(),
Collections.singletonList("f_int"));
@@ -212,33 +197,13 @@ public class TableStoreHiveStorageHandlerITCase {
table.newCommit().commit("0", write.prepareCommit());
write.close();
- StringBuilder ddl = new StringBuilder();
- for (int i = 0; i < RandomGenericRowDataGenerator.FIELD_NAMES.size(); i++) {
- if (i != 0) {
- ddl.append(",\n");
- }
- ddl.append(" ")
- .append(RandomGenericRowDataGenerator.FIELD_NAMES.get(i))
- .append(" ")
- .append(RandomGenericRowDataGenerator.TYPE_NAMES.get(i))
- .append(" COMMENT '")
- .append(RandomGenericRowDataGenerator.FIELD_COMMENTS.get(i))
- .append("'");
- }
hiveShell.execute(
String.join(
"\n",
Arrays.asList(
- "CREATE EXTERNAL TABLE test_table (",
- ddl.toString(),
- ")",
+ "CREATE EXTERNAL TABLE test_table",
"STORED BY '" + TableStoreHiveStorageHandler.class.getName() + "'",
- "LOCATION '" + root + "'",
- "TBLPROPERTIES (",
- " 'table-store.catalog' = 'test_catalog',",
- " 'table-store.primary-keys' = 'f_int',",
- " 'table-store.file.format' = 'avro'",
- ")")));
+ "LOCATION '" + root + "'")));
List<Object[]> actual =
hiveShell.executeStatement("SELECT * FROM test_table WHERE f_int > 0");
@@ -261,7 +226,7 @@ public class TableStoreHiveStorageHandlerITCase {
}
ObjectInspector oi =
HiveTypeUtils.getObjectInspector(
- RandomGenericRowDataGenerator.TYPE_INFOS.get(i));
+ RandomGenericRowDataGenerator.LOGICAL_TYPES.get(i));
switch (oi.getCategory()) {
case PRIMITIVE:
AbstractPrimitiveJavaObjectInspector primitiveOi =
@@ -350,15 +315,9 @@ public class TableStoreHiveStorageHandlerITCase {
String.join(
"\n",
Arrays.asList(
- "CREATE EXTERNAL TABLE test_table (",
- " a INT",
- ")",
+ "CREATE EXTERNAL TABLE test_table",
"STORED BY '" + TableStoreHiveStorageHandler.class.getName() + "'",
- "LOCATION '" + path + "'",
- "TBLPROPERTIES (",
- " 'table-store.catalog' = 'test_catalog',",
- " 'table-store.file.format' = 'avro'",
- ")")));
+ "LOCATION '" + path + "'")));
Assert.assertEquals(
Arrays.asList("1", "5"),
hiveShell.executeQuery("SELECT * FROM test_table WHERE a = 1 OR a = 5"));
@@ -442,16 +401,9 @@ public class TableStoreHiveStorageHandlerITCase {
String.join(
"\n",
Arrays.asList(
- "CREATE EXTERNAL TABLE test_table (",
- " dt DATE,",
- " ts TIMESTAMP",
- ")",
+ "CREATE EXTERNAL TABLE test_table",
"STORED BY '" + TableStoreHiveStorageHandler.class.getName() + "'",
- "LOCATION '" + path + "'",
- "TBLPROPERTIES (",
- " 'table-store.catalog' = 'test_catalog',",
- " 'table-store.file.format' = 'avro'",
- ")")));
+ "LOCATION '" + path + "'")));
Assert.assertEquals(
Collections.singletonList("1971-01-11\t2022-05-17 17:29:20.0"),
hiveShell.executeQuery("SELECT * FROM test_table WHERE dt = '1971-01-11'"));
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreSerDeTest.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreSerDeTest.java
index fad20dc5..44972f00 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreSerDeTest.java
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreSerDeTest.java
@@ -18,28 +18,34 @@
package org.apache.flink.table.store.hive;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.store.RowDataContainer;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
import org.apache.flink.table.store.hive.objectinspector.TableStoreRowDataObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Properties;
-import java.util.stream.Collectors;
import static org.apache.flink.table.store.hive.RandomGenericRowDataGenerator.FIELD_COMMENTS;
import static org.apache.flink.table.store.hive.RandomGenericRowDataGenerator.FIELD_NAMES;
-import static org.apache.flink.table.store.hive.RandomGenericRowDataGenerator.TYPE_INFOS;
+import static org.apache.flink.table.store.hive.RandomGenericRowDataGenerator.ROW_TYPE;
import static org.apache.flink.table.store.hive.RandomGenericRowDataGenerator.generate;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link TableStoreSerDe}. */
public class TableStoreSerDeTest {
+ @TempDir java.nio.file.Path tempDir;
+
@Test
public void testInitialize() throws Exception {
TableStoreSerDe serDe = createInitializedSerDe();
@@ -66,12 +72,17 @@ public class TableStoreSerDeTest {
}
private TableStoreSerDe createInitializedSerDe() throws Exception {
+ new SchemaManager(new Path(tempDir.toString()))
+ .commitNewVersion(
+ new UpdateSchema(
+ ROW_TYPE,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ new HashMap<>(),
+ ""));
+
Properties properties = new Properties();
- properties.setProperty("columns", String.join(",", FIELD_NAMES));
- properties.setProperty(
- "columns.types",
- TYPE_INFOS.stream().map(TypeInfo::getTypeName).collect(Collectors.joining(":")));
- properties.setProperty("columns.comments", String.join("\0", FIELD_COMMENTS));
+ properties.setProperty("location", tempDir.toString());
TableStoreSerDe serDe = new TableStoreSerDe();
serDe.initialize(null, properties);
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspectorTest.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspectorTest.java
index 2a202fde..152962c7 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspectorTest.java
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspectorTest.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hive.common.type.HiveChar;
import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
@@ -34,8 +33,7 @@ public class TableStoreCharObjectInspectorTest {
@Test
public void testCategoryAndClass() {
- TableStoreCharObjectInspector oi =
- new TableStoreCharObjectInspector(TypeInfoFactory.getCharTypeInfo(10));
+ TableStoreCharObjectInspector oi = new TableStoreCharObjectInspector(10);
assertThat(oi.getCategory()).isEqualTo(ObjectInspector.Category.PRIMITIVE);
assertThat(oi.getPrimitiveCategory())
@@ -47,8 +45,7 @@ public class TableStoreCharObjectInspectorTest {
@Test
public void testGetPrimitiveJavaObject() {
- TableStoreCharObjectInspector oi =
- new TableStoreCharObjectInspector(TypeInfoFactory.getCharTypeInfo(10));
+ TableStoreCharObjectInspector oi = new TableStoreCharObjectInspector(10);
StringData input1 = StringData.fromString("testString");
HiveChar expected1 = new HiveChar("testString", 10);
@@ -61,8 +58,7 @@ public class TableStoreCharObjectInspectorTest {
@Test
public void testGetPrimitiveWritableObject() {
- TableStoreCharObjectInspector oi =
- new TableStoreCharObjectInspector(TypeInfoFactory.getCharTypeInfo(10));
+ TableStoreCharObjectInspector oi = new TableStoreCharObjectInspector(10);
StringData input1 = StringData.fromString("testString");
HiveCharWritable expected1 = new HiveCharWritable(new HiveChar("testString", 10));
@@ -75,8 +71,7 @@ public class TableStoreCharObjectInspectorTest {
@Test
public void testCopyObject() {
- TableStoreCharObjectInspector oi =
- new TableStoreCharObjectInspector(TypeInfoFactory.getCharTypeInfo(10));
+ TableStoreCharObjectInspector oi = new TableStoreCharObjectInspector(10);
StringData input1 = StringData.fromString("testString");
Object copy1 = oi.copyObject(input1);
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspectorTest.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspectorTest.java
index bbc9f055..575fba11 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspectorTest.java
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspectorTest.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.junit.jupiter.api.Test;
import java.math.BigDecimal;
@@ -36,8 +35,7 @@ public class TableStoreDecimalObjectInspectorTest {
@Test
public void testCategoryAndClass() {
- TableStoreDecimalObjectInspector oi =
- new TableStoreDecimalObjectInspector(TypeInfoFactory.getDecimalTypeInfo(5, 3));
+ TableStoreDecimalObjectInspector oi = new TableStoreDecimalObjectInspector(5, 3);
assertThat(oi.getCategory()).isEqualTo(ObjectInspector.Category.PRIMITIVE);
assertThat(oi.getPrimitiveCategory())
@@ -49,8 +47,7 @@ public class TableStoreDecimalObjectInspectorTest {
@Test
public void testGetPrimitiveJavaObject() {
- TableStoreDecimalObjectInspector oi =
- new TableStoreDecimalObjectInspector(TypeInfoFactory.getDecimalTypeInfo(5, 3));
+ TableStoreDecimalObjectInspector oi = new TableStoreDecimalObjectInspector(5, 3);
DecimalData input = DecimalData.fromBigDecimal(new BigDecimal("12.345"), 5, 3);
HiveDecimal expected = HiveDecimal.create("12.345");
@@ -60,8 +57,7 @@ public class TableStoreDecimalObjectInspectorTest {
@Test
public void testGetPrimitiveWritableObject() {
- TableStoreDecimalObjectInspector oi =
- new TableStoreDecimalObjectInspector(TypeInfoFactory.getDecimalTypeInfo(5, 3));
+ TableStoreDecimalObjectInspector oi = new TableStoreDecimalObjectInspector(5, 3);
DecimalData input = DecimalData.fromBigDecimal(new BigDecimal("12.345"), 5, 3);
HiveDecimalWritable expected = new HiveDecimalWritable(HiveDecimal.create("12.345"));
@@ -71,8 +67,7 @@ public class TableStoreDecimalObjectInspectorTest {
@Test
public void testCopyObject() {
- TableStoreDecimalObjectInspector oi =
- new TableStoreDecimalObjectInspector(TypeInfoFactory.getDecimalTypeInfo(5, 3));
+ TableStoreDecimalObjectInspector oi = new TableStoreDecimalObjectInspector(5, 3);
DecimalData input1 = DecimalData.fromBigDecimal(new BigDecimal("12.345"), 5, 3);
Object copy1 = oi.copyObject(input1);
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspectorTest.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspectorTest.java
index 8d1f4372..e1d5593d 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspectorTest.java
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspectorTest.java
@@ -18,11 +18,11 @@
package org.apache.flink.table.store.hive.objectinspector;
+import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.StringData;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
@@ -35,7 +35,7 @@ public class TableStoreListObjectInspectorTest {
@Test
public void testCategoryAndTypeName() {
TableStoreListObjectInspector oi =
- new TableStoreListObjectInspector(TypeInfoFactory.stringTypeInfo);
+ new TableStoreListObjectInspector(DataTypes.STRING().getLogicalType());
assertThat(oi.getCategory()).isEqualTo(ObjectInspector.Category.LIST);
assertThat(oi.getTypeName()).isEqualTo("array<string>");
@@ -44,7 +44,7 @@ public class TableStoreListObjectInspectorTest {
@Test
public void testGetListAndElement() {
TableStoreListObjectInspector oi =
- new TableStoreListObjectInspector(TypeInfoFactory.stringTypeInfo);
+ new TableStoreListObjectInspector(DataTypes.STRING().getLogicalType());
StringData[] stringDataArray =
new StringData[] {
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspectorTest.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspectorTest.java
index cad419ae..6b1da65f 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspectorTest.java
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspectorTest.java
@@ -18,11 +18,11 @@
package org.apache.flink.table.store.hive.objectinspector;
+import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.StringData;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
@@ -37,7 +37,7 @@ public class TableStoreMapObjectInspectorTest {
public void testCategoryAndTypeName() {
TableStoreMapObjectInspector oi =
new TableStoreMapObjectInspector(
- TypeInfoFactory.stringTypeInfo, TypeInfoFactory.longTypeInfo);
+ DataTypes.STRING().getLogicalType(), DataTypes.BIGINT().getLogicalType());
assertThat(oi.getCategory()).isEqualTo(ObjectInspector.Category.MAP);
assertThat(oi.getTypeName()).isEqualTo("map<string,bigint>");
@@ -47,7 +47,7 @@ public class TableStoreMapObjectInspectorTest {
public void testGetMapAndValue() {
TableStoreMapObjectInspector oi =
new TableStoreMapObjectInspector(
- TypeInfoFactory.stringTypeInfo, TypeInfoFactory.longTypeInfo);
+ DataTypes.STRING().getLogicalType(), DataTypes.BIGINT().getLogicalType());
StringData[] keyArray =
new StringData[] {
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspectorTest.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspectorTest.java
index 37d3df10..a0979a98 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspectorTest.java
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspectorTest.java
@@ -29,7 +29,7 @@ import java.util.List;
import static org.apache.flink.table.store.hive.RandomGenericRowDataGenerator.FIELD_COMMENTS;
import static org.apache.flink.table.store.hive.RandomGenericRowDataGenerator.FIELD_NAMES;
-import static org.apache.flink.table.store.hive.RandomGenericRowDataGenerator.TYPE_INFOS;
+import static org.apache.flink.table.store.hive.RandomGenericRowDataGenerator.LOGICAL_TYPES;
import static org.apache.flink.table.store.hive.RandomGenericRowDataGenerator.TYPE_NAMES;
import static org.apache.flink.table.store.hive.RandomGenericRowDataGenerator.generate;
import static org.assertj.core.api.Assertions.assertThat;
@@ -40,7 +40,7 @@ public class TableStoreRowDataObjectInspectorTest {
@Test
public void testGetStructFieldRef() {
TableStoreRowDataObjectInspector oi =
- new TableStoreRowDataObjectInspector(FIELD_NAMES, TYPE_INFOS, FIELD_COMMENTS);
+ new TableStoreRowDataObjectInspector(FIELD_NAMES, LOGICAL_TYPES, FIELD_COMMENTS);
List<? extends StructField> structFields = oi.getAllStructFieldRefs();
List<ObjectInspector.Category> expectedOiCategories =
Arrays.asList(
@@ -77,7 +77,7 @@ public class TableStoreRowDataObjectInspectorTest {
@Test
public void testGetTypeName() {
TableStoreRowDataObjectInspector oi =
- new TableStoreRowDataObjectInspector(FIELD_NAMES, TYPE_INFOS, FIELD_COMMENTS);
+ new TableStoreRowDataObjectInspector(FIELD_NAMES, LOGICAL_TYPES, FIELD_COMMENTS);
String expected =
"struct<"
+ String.join(
@@ -107,7 +107,7 @@ public class TableStoreRowDataObjectInspectorTest {
@Test
public void testGetStructFieldData() {
TableStoreRowDataObjectInspector oi =
- new TableStoreRowDataObjectInspector(FIELD_NAMES, TYPE_INFOS, FIELD_COMMENTS);
+ new TableStoreRowDataObjectInspector(FIELD_NAMES, LOGICAL_TYPES, FIELD_COMMENTS);
GenericRowData rowData = generate();
List<Object> structFieldsData = oi.getStructFieldsDataAsList(rowData);
for (int i = 0; i < structFieldsData.size(); i++) {
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspectorTest.java b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspectorTest.java
index d9ce7dea..7c022a2e 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspectorTest.java
+++ b/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspectorTest.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hive.common.type.HiveVarchar;
import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
@@ -34,8 +33,7 @@ public class TableStoreVarcharObjectInspectorTest {
@Test
public void testCategoryAndClass() {
- TableStoreVarcharObjectInspector oi =
- new TableStoreVarcharObjectInspector(TypeInfoFactory.getVarcharTypeInfo(10));
+ TableStoreVarcharObjectInspector oi = new TableStoreVarcharObjectInspector(10);
assertThat(oi.getCategory()).isEqualTo(ObjectInspector.Category.PRIMITIVE);
assertThat(oi.getPrimitiveCategory())
@@ -47,8 +45,7 @@ public class TableStoreVarcharObjectInspectorTest {
@Test
public void testGetPrimitiveJavaObject() {
- TableStoreVarcharObjectInspector oi =
- new TableStoreVarcharObjectInspector(TypeInfoFactory.getVarcharTypeInfo(10));
+ TableStoreVarcharObjectInspector oi = new TableStoreVarcharObjectInspector(10);
StringData input1 = StringData.fromString("testString");
HiveVarchar expected1 = new HiveVarchar("testString", 10);
@@ -61,8 +58,7 @@ public class TableStoreVarcharObjectInspectorTest {
@Test
public void testGetPrimitiveWritableObject() {
- TableStoreVarcharObjectInspector oi =
- new TableStoreVarcharObjectInspector(TypeInfoFactory.getVarcharTypeInfo(10));
+ TableStoreVarcharObjectInspector oi = new TableStoreVarcharObjectInspector(10);
StringData input1 = StringData.fromString("testString");
HiveVarcharWritable expected1 = new HiveVarcharWritable(new HiveVarchar("testString", 10));
@@ -75,8 +71,7 @@ public class TableStoreVarcharObjectInspectorTest {
@Test
public void testCopyObject() {
- TableStoreVarcharObjectInspector oi =
- new TableStoreVarcharObjectInspector(TypeInfoFactory.getVarcharTypeInfo(10));
+ TableStoreVarcharObjectInspector oi = new TableStoreVarcharObjectInspector(10);
StringData input1 = StringData.fromString("testString");
Object copy1 = oi.copyObject(input1);