You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/08/04 05:32:37 UTC
[flink] branch master updated: [FLINK-13385]Align Hive data type
mapping with FLIP-37
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new aa14e7e [FLINK-13385]Align Hive data type mapping with FLIP-37
aa14e7e is described below
commit aa14e7e3f33cbebdfaeb7e00f7280255cd5fb1f9
Author: zjuwangg <zj...@foxmail.com>
AuthorDate: Fri Jul 26 20:30:36 2019 +0800
[FLINK-13385]Align Hive data type mapping with FLIP-37
Align Hive data type mapping with FLIP-37.
This closes #9239.
---
docs/dev/table/catalog.md | 44 ++--
.../table/catalog/hive/util/HiveTypeUtil.java | 268 +++++++++++++--------
.../catalog/hive/HiveCatalogDataTypeTest.java | 25 +-
.../catalog/hive/HiveCatalogHiveMetadataTest.java | 2 +-
4 files changed, 203 insertions(+), 136 deletions(-)
diff --git a/docs/dev/table/catalog.md b/docs/dev/table/catalog.md
index 0a4dd3d..acccd83 100644
--- a/docs/dev/table/catalog.md
+++ b/docs/dev/table/catalog.md
@@ -157,27 +157,28 @@ Currently `HiveCatalog` supports most Flink data types with the following mappin
| Flink Data Type | Hive Data Type |
|---|---|
-| CHAR(p) | char(p)* |
-| VARCHAR(p) | varchar(p)** |
-| STRING | string |
-| BOOLEAN | boolean |
-| BYTE | tinyint |
-| SHORT | smallint |
-| INT | int |
-| BIGINT | long |
-| FLOAT | float |
-| DOUBLE | double |
-| DECIMAL(p, s) | decimal(p, s) |
-| DATE | date |
-| TIMESTAMP_WITHOUT_TIME_ZONE | Timestamp |
+| CHAR(p) | CHAR(p)* |
+| VARCHAR(p) | VARCHAR(p)** |
+| STRING | STRING |
+| BOOLEAN | BOOLEAN |
+| TINYINT | TINYINT |
+| SMALLINT | SMALLINT |
+| INT | INT |
+| BIGINT | BIGINT |
+| FLOAT | FLOAT |
+| DOUBLE | DOUBLE |
+| DECIMAL(p, s) | DECIMAL(p, s) |
+| DATE | DATE |
+| TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP |
| TIMESTAMP_WITH_TIME_ZONE | N/A |
| TIMESTAMP_WITH_LOCAL_TIME_ZONE | N/A |
-| INTERVAL | N/A |
-| BINARY | binary |
-| VARBINARY(p) | binary |
-| ARRAY\<E> | list\<E> |
-| MAP<K, V> | map<K, V> |
-| ROW | struct |
+| INTERVAL | N/A*** |
+| BINARY | N/A |
+| VARBINARY(p) | N/A |
+| BYTES | BINARY |
+| ARRAY\<E> | ARRAY\<E> |
+| MAP<K, V> | MAP<K, V> ****|
+| ROW | STRUCT |
| MULTISET | N/A |
@@ -189,11 +190,14 @@ The following limitations in Hive's data types impact the mapping between Flink
\** maximum length is 65535
+\*** Flink's `INTERVAL` type cannot be mapped to Hive's `INTERVAL` type for now.
+
+\**** Hive only allows primitive types as map keys, whereas a Flink map key can be of any data type.
+
## User-configured Catalog
Catalogs are pluggable. Users can develop custom catalogs by implementing the `Catalog` interface, which defines a set of APIs for reading and writing catalog meta-objects such as database, tables, partitions, views, and functions.
-
Catalog Registration
--------------------
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java
index b9f8a57..68c3ede 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java
@@ -21,16 +21,25 @@ package org.apache.flink.table.catalog.hive.util;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.types.AtomicDataType;
-import org.apache.flink.table.types.CollectionDataType;
import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.FieldsDataType;
-import org.apache.flink.table.types.KeyValueDataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
-import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
import org.apache.hadoop.hive.common.type.HiveChar;
import org.apache.hadoop.hive.common.type.HiveVarchar;
@@ -48,7 +57,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -81,107 +89,8 @@ public class HiveTypeUtil {
*/
public static TypeInfo toHiveTypeInfo(DataType dataType) {
checkNotNull(dataType, "type cannot be null");
-
- LogicalTypeRoot type = dataType.getLogicalType().getTypeRoot();
-
- if (dataType instanceof AtomicDataType) {
- if (type.equals(LogicalTypeRoot.BOOLEAN)) {
- return TypeInfoFactory.booleanTypeInfo;
- } else if (type.equals(LogicalTypeRoot.TINYINT)) {
- return TypeInfoFactory.byteTypeInfo;
- } else if (type.equals(LogicalTypeRoot.SMALLINT)) {
- return TypeInfoFactory.shortTypeInfo;
- } else if (type.equals(LogicalTypeRoot.INTEGER)) {
- return TypeInfoFactory.intTypeInfo;
- } else if (type.equals(LogicalTypeRoot.BIGINT)) {
- return TypeInfoFactory.longTypeInfo;
- } else if (type.equals(LogicalTypeRoot.FLOAT)) {
- return TypeInfoFactory.floatTypeInfo;
- } else if (type.equals(LogicalTypeRoot.DOUBLE)) {
- return TypeInfoFactory.doubleTypeInfo;
- } else if (type.equals(LogicalTypeRoot.DATE)) {
- return TypeInfoFactory.dateTypeInfo;
- } else if (type.equals(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) {
- return TypeInfoFactory.timestampTypeInfo;
- } else if (type.equals(LogicalTypeRoot.BINARY) || type.equals(LogicalTypeRoot.VARBINARY)) {
- // Hive doesn't support variable-length binary string
- return TypeInfoFactory.binaryTypeInfo;
- } else if (type.equals(LogicalTypeRoot.CHAR)) {
- CharType charType = (CharType) dataType.getLogicalType();
-
- if (charType.getLength() > HiveChar.MAX_CHAR_LENGTH) {
- throw new CatalogException(
- String.format("HiveCatalog doesn't support char type with length of '%d'. " +
- "The maximum length is %d",
- charType.getLength(), HiveChar.MAX_CHAR_LENGTH));
- }
-
- return TypeInfoFactory.getCharTypeInfo(charType.getLength());
- } else if (type.equals(LogicalTypeRoot.VARCHAR)) {
- VarCharType varCharType = (VarCharType) dataType.getLogicalType();
-
- // Flink's StringType is defined as VARCHAR(Integer.MAX_VALUE)
- // We don't have more information in LogicalTypeRoot to distringuish StringType and a VARCHAR(Integer.MAX_VALUE) instance
- // Thus always treat VARCHAR(Integer.MAX_VALUE) as StringType
- if (varCharType.getLength() == Integer.MAX_VALUE) {
- return TypeInfoFactory.stringTypeInfo;
- }
-
- if (varCharType.getLength() > HiveVarchar.MAX_VARCHAR_LENGTH) {
- throw new CatalogException(
- String.format("HiveCatalog doesn't support varchar type with length of '%d'. " +
- "The maximum length is %d",
- varCharType.getLength(), HiveVarchar.MAX_VARCHAR_LENGTH));
- }
-
- return TypeInfoFactory.getVarcharTypeInfo(varCharType.getLength());
- } else if (type.equals(LogicalTypeRoot.DECIMAL)) {
- DecimalType decimalType = (DecimalType) dataType.getLogicalType();
-
- // Flink and Hive share the same precision and scale range
- // Flink already validates the type so we don't need to validate again here
- return TypeInfoFactory.getDecimalTypeInfo(decimalType.getPrecision(), decimalType.getScale());
- }
-
- // Flink's primitive types that Hive 2.3.4 doesn't support: Time, TIMESTAMP_WITH_LOCAL_TIME_ZONE
- }
-
- if (dataType instanceof CollectionDataType) {
-
- if (type.equals(LogicalTypeRoot.ARRAY)) {
- DataType elementType = ((CollectionDataType) dataType).getElementDataType();
-
- return TypeInfoFactory.getListTypeInfo(toHiveTypeInfo(elementType));
- }
-
- // Flink's collection types that Hive 2.3.4 doesn't support: multiset
- }
-
- if (dataType instanceof KeyValueDataType) {
- KeyValueDataType keyValueDataType = (KeyValueDataType) dataType;
- DataType keyType = keyValueDataType.getKeyDataType();
- DataType valueType = keyValueDataType.getValueDataType();
-
- return TypeInfoFactory.getMapTypeInfo(toHiveTypeInfo(keyType), toHiveTypeInfo(valueType));
- }
-
- if (dataType instanceof FieldsDataType) {
- FieldsDataType fieldsDataType = (FieldsDataType) dataType;
- // need to retrieve field names in order
- List<String> names = ((RowType) fieldsDataType.getLogicalType()).getFieldNames();
-
- Map<String, DataType> nameToType = fieldsDataType.getFieldDataTypes();
- List<TypeInfo> typeInfos = new ArrayList<>(names.size());
-
- for (String name : names) {
- typeInfos.add(toHiveTypeInfo(nameToType.get(name)));
- }
-
- return TypeInfoFactory.getStructTypeInfo(names, typeInfos);
- }
-
- throw new UnsupportedOperationException(
- String.format("Flink doesn't support converting type %s to Hive type yet.", dataType.toString()));
+ LogicalType logicalType = dataType.getLogicalType();
+ return logicalType.accept(new TypeInfoLogicalTypeVisitor(dataType));
}
/**
@@ -269,4 +178,149 @@ public class HiveTypeUtil {
String.format("Flink doesn't support Hive primitive type %s yet", hiveType));
}
}
+
+ private static class TypeInfoLogicalTypeVisitor extends LogicalTypeDefaultVisitor<TypeInfo> {
+ private final DataType dataType;
+
+ public TypeInfoLogicalTypeVisitor(DataType dataType) {
+ this.dataType = dataType;
+ }
+
+ @Override
+ public TypeInfo visit(CharType charType) {
+ if (charType.getLength() > HiveChar.MAX_CHAR_LENGTH) {
+ throw new CatalogException(
+ String.format("HiveCatalog doesn't support char type with length of '%d'. " +
+ "The maximum length is %d",
+ charType.getLength(), HiveChar.MAX_CHAR_LENGTH));
+ }
+ return TypeInfoFactory.getCharTypeInfo(charType.getLength());
+ }
+
+ @Override
+ public TypeInfo visit(VarCharType varCharType) {
+ // Flink's StringType is defined as VARCHAR(Integer.MAX_VALUE)
+ // We don't have more information in LogicalTypeRoot to distinguish StringType and a VARCHAR(Integer.MAX_VALUE) instance
+ // Thus always treat VARCHAR(Integer.MAX_VALUE) as StringType
+ if (varCharType.getLength() == Integer.MAX_VALUE) {
+ return TypeInfoFactory.stringTypeInfo;
+ }
+ if (varCharType.getLength() > HiveVarchar.MAX_VARCHAR_LENGTH) {
+ throw new CatalogException(
+ String.format("HiveCatalog doesn't support varchar type with length of '%d'. " +
+ "The maximum length is %d",
+ varCharType.getLength(), HiveVarchar.MAX_VARCHAR_LENGTH));
+ }
+ return TypeInfoFactory.getVarcharTypeInfo(varCharType.getLength());
+ }
+
+ @Override
+ public TypeInfo visit(BooleanType booleanType) {
+ return TypeInfoFactory.booleanTypeInfo;
+ }
+
+ @Override
+ public TypeInfo visit(VarBinaryType varBinaryType) {
+ // Flink's BytesType is defined as VARBINARY(Integer.MAX_VALUE)
+ // We don't have more information in LogicalTypeRoot to distinguish BytesType and a VARBINARY(Integer.MAX_VALUE) instance
+ // Thus always treat VARBINARY(Integer.MAX_VALUE) as BytesType
+ if (varBinaryType.getLength() == VarBinaryType.MAX_LENGTH) {
+ return TypeInfoFactory.binaryTypeInfo;
+ }
+ return defaultMethod(varBinaryType);
+ }
+
+ @Override
+ public TypeInfo visit(DecimalType decimalType) {
+ // Flink and Hive share the same precision and scale range
+ // Flink already validates the type so we don't need to validate again here
+ return TypeInfoFactory.getDecimalTypeInfo(decimalType.getPrecision(), decimalType.getScale());
+ }
+
+ @Override
+ public TypeInfo visit(TinyIntType tinyIntType) {
+ return TypeInfoFactory.byteTypeInfo;
+ }
+
+ @Override
+ public TypeInfo visit(SmallIntType smallIntType) {
+ return TypeInfoFactory.shortTypeInfo;
+ }
+
+ @Override
+ public TypeInfo visit(IntType intType) {
+ return TypeInfoFactory.intTypeInfo;
+ }
+
+ @Override
+ public TypeInfo visit(BigIntType bigIntType) {
+ return TypeInfoFactory.longTypeInfo;
+ }
+
+ @Override
+ public TypeInfo visit(FloatType floatType) {
+ return TypeInfoFactory.floatTypeInfo;
+ }
+
+ @Override
+ public TypeInfo visit(DoubleType doubleType) {
+ return TypeInfoFactory.doubleTypeInfo;
+ }
+
+ @Override
+ public TypeInfo visit(DateType dateType) {
+ return TypeInfoFactory.dateTypeInfo;
+ }
+
+ @Override
+ public TypeInfo visit(TimestampType timestampType) {
+ return TypeInfoFactory.timestampTypeInfo;
+ }
+
+ @Override
+ public TypeInfo visit(ArrayType arrayType) {
+ LogicalType elementType = arrayType.getElementType();
+ TypeInfo elementTypeInfo = elementType.accept(new TypeInfoLogicalTypeVisitor(dataType));
+ if (null != elementTypeInfo) {
+ return TypeInfoFactory.getListTypeInfo(elementTypeInfo);
+ } else {
+ return defaultMethod(arrayType);
+ }
+ }
+
+ @Override
+ public TypeInfo visit(MapType mapType) {
+ LogicalType keyType = mapType.getKeyType();
+ LogicalType valueType = mapType.getValueType();
+ TypeInfo keyTypeInfo = keyType.accept(new TypeInfoLogicalTypeVisitor(dataType));
+ TypeInfo valueTypeInfo = valueType.accept(new TypeInfoLogicalTypeVisitor(dataType));
+ if (null == keyTypeInfo || null == valueTypeInfo) {
+ return defaultMethod(mapType);
+ } else {
+ return TypeInfoFactory.getMapTypeInfo(keyTypeInfo, valueTypeInfo);
+ }
+ }
+
+ @Override
+ public TypeInfo visit(RowType rowType) {
+ List<String> names = rowType.getFieldNames();
+ List<TypeInfo> typeInfos = new ArrayList<>(names.size());
+ for (String name : names) {
+ TypeInfo typeInfo =
+ rowType.getTypeAt(rowType.getFieldIndex(name)).accept(new TypeInfoLogicalTypeVisitor(dataType));
+ if (null != typeInfo) {
+ typeInfos.add(typeInfo);
+ } else {
+ return defaultMethod(rowType);
+ }
+ }
+ return TypeInfoFactory.getStructTypeInfo(names, typeInfos);
+ }
+
+ @Override
+ protected TypeInfo defaultMethod(LogicalType logicalType) {
+ throw new UnsupportedOperationException(
+ String.format("Flink doesn't support converting type %s to Hive type yet.", dataType.toString()));
+ }
+ }
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogDataTypeTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogDataTypeTest.java
index e9d40fe..b14f1fb 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogDataTypeTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogDataTypeTest.java
@@ -29,7 +29,6 @@ import org.apache.flink.table.catalog.config.CatalogConfig;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.BinaryType;
-import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.hadoop.hive.common.type.HiveChar;
import org.apache.hadoop.hive.common.type.HiveVarchar;
@@ -40,7 +39,6 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
-import java.util.Arrays;
import java.util.HashMap;
import static org.junit.Assert.assertEquals;
@@ -122,20 +120,31 @@ public class HiveCatalogDataTypeTest {
}
@Test
- public void testNonExactlyMatchedDataTypes() throws Exception {
+ public void testNonSupportedBinaryDataTypes() throws Exception {
DataType[] types = new DataType[] {
- DataTypes.BINARY(BinaryType.MAX_LENGTH),
- DataTypes.VARBINARY(VarBinaryType.MAX_LENGTH)
+ DataTypes.BINARY(BinaryType.MAX_LENGTH)
};
CatalogTable table = createCatalogTable(types);
catalog.createDatabase(db1, createDb(), false);
+
+ exception.expect(UnsupportedOperationException.class);
catalog.createTable(path1, table, false);
+ }
+
+ @Test
+ public void testNonSupportedVarBinaryDataTypes() throws Exception {
+ DataType[] types = new DataType[] {
+ DataTypes.VARBINARY(20)
+ };
- Arrays.equals(
- new DataType[] {DataTypes.BYTES(), DataTypes.BYTES()},
- catalog.getTable(path1).getSchema().getFieldDataTypes());
+ CatalogTable table = createCatalogTable(types);
+
+ catalog.createDatabase(db1, createDb(), false);
+
+ exception.expect(UnsupportedOperationException.class);
+ catalog.createTable(path1, table, false);
}
@Test
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
index 95e09b4..0fe10d7 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
@@ -87,7 +87,7 @@ public class HiveCatalogHiveMetadataTest extends HiveCatalogTestBase {
.field("fourth", DataTypes.DATE())
.field("fifth", DataTypes.DOUBLE())
.field("sixth", DataTypes.BIGINT())
- .field("seventh", DataTypes.VARBINARY(200))
+ .field("seventh", DataTypes.BYTES())
.build();
CatalogTable catalogTable = new CatalogTableImpl(tableSchema, getBatchTableProperties(), TEST_COMMENT);
catalog.createTable(path1, catalogTable, false);