Posted to commits@flink.apache.org by bl...@apache.org on 2019/08/04 05:32:37 UTC

[flink] branch master updated: [FLINK-13385] Align Hive data type mapping with FLIP-37

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new aa14e7e  [FLINK-13385] Align Hive data type mapping with FLIP-37
aa14e7e is described below

commit aa14e7e3f33cbebdfaeb7e00f7280255cd5fb1f9
Author: zjuwangg <zj...@foxmail.com>
AuthorDate: Fri Jul 26 20:30:36 2019 +0800

    [FLINK-13385] Align Hive data type mapping with FLIP-37
    
    Align Hive data type mapping with FLIP-37.
    
    This closes #9239.
---
 docs/dev/table/catalog.md                          |  44 ++--
 .../table/catalog/hive/util/HiveTypeUtil.java      | 268 +++++++++++++--------
 .../catalog/hive/HiveCatalogDataTypeTest.java      |  25 +-
 .../catalog/hive/HiveCatalogHiveMetadataTest.java  |   2 +-
 4 files changed, 203 insertions(+), 136 deletions(-)

diff --git a/docs/dev/table/catalog.md b/docs/dev/table/catalog.md
index 0a4dd3d..acccd83 100644
--- a/docs/dev/table/catalog.md
+++ b/docs/dev/table/catalog.md
@@ -157,27 +157,28 @@ Currently `HiveCatalog` supports most Flink data types with the following mappin
 
 |  Flink Data Type  |  Hive Data Type  |
 |---|---|
-| CHAR(p)       |  char(p)* |
-| VARCHAR(p)    |  varchar(p)** |
-| STRING        |  string |
-| BOOLEAN       |  boolean |
-| BYTE          |  tinyint |
-| SHORT         |  smallint |
-| INT           |  int |
-| BIGINT        |  long |
-| FLOAT         |  float |
-| DOUBLE        |  double |
-| DECIMAL(p, s) |  decimal(p, s) |
-| DATE          |  date |
-| TIMESTAMP_WITHOUT_TIME_ZONE |  Timestamp |
+| CHAR(p)       |  CHAR(p)* |
+| VARCHAR(p)    |  VARCHAR(p)** |
+| STRING        |  STRING |
+| BOOLEAN       |  BOOLEAN |
+| TINYINT       |  TINYINT |
+| SMALLINT      |  SMALLINT |
+| INT           |  INT |
+| BIGINT        |  BIGINT |
+| FLOAT         |  FLOAT |
+| DOUBLE        |  DOUBLE |
+| DECIMAL(p, s) |  DECIMAL(p, s) |
+| DATE          |  DATE |
+| TIMESTAMP_WITHOUT_TIME_ZONE |  TIMESTAMP |
 | TIMESTAMP_WITH_TIME_ZONE |  N/A |
 | TIMESTAMP_WITH_LOCAL_TIME_ZONE |  N/A |
-| INTERVAL |  N/A |
-| BINARY        |  binary |
-| VARBINARY(p)  |  binary |
-| ARRAY\<E>     |  list\<E> |
-| MAP<K, V>     |  map<K, V> |
-| ROW           |  struct |
+| INTERVAL      |  N/A*** |
+| BINARY        |  N/A |
+| VARBINARY(p)  |  N/A |
+| BYTES         |  BINARY |
+| ARRAY\<E>     |  ARRAY\<E> |
+| MAP<K, V>     |  MAP<K, V>**** |
+| ROW           |  STRUCT |
 | MULTISET      |  N/A |
 
 
@@ -189,11 +190,14 @@ The following limitations in Hive's data types impact the mapping between Flink
 
 \** maximum length is 65535
 
+\*** The `INTERVAL` type cannot be mapped to the Hive `INTERVAL` type yet.
+
+\**** Hive allows only primitive types as map keys, while a Flink map key can be any data type.
+
 ## User-configured Catalog
 
 Catalogs are pluggable. Users can develop custom catalogs by implementing the `Catalog` interface, which defines a set of APIs for reading and writing catalog meta-objects such as database, tables, partitions, views, and functions.
 
-
 Catalog Registration
 --------------------
 
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java
index b9f8a57..68c3ede 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java
@@ -21,16 +21,25 @@ package org.apache.flink.table.catalog.hive.util;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.types.AtomicDataType;
-import org.apache.flink.table.types.CollectionDataType;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.FieldsDataType;
-import org.apache.flink.table.types.KeyValueDataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
 import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
 import org.apache.flink.table.types.logical.DecimalType;
-import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
 
 import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.common.type.HiveVarchar;
@@ -48,7 +57,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -81,107 +89,8 @@ public class HiveTypeUtil {
 	 */
 	public static TypeInfo toHiveTypeInfo(DataType dataType) {
 		checkNotNull(dataType, "type cannot be null");
-
-		LogicalTypeRoot type = dataType.getLogicalType().getTypeRoot();
-
-		if (dataType instanceof AtomicDataType) {
-			if (type.equals(LogicalTypeRoot.BOOLEAN)) {
-				return TypeInfoFactory.booleanTypeInfo;
-			} else if (type.equals(LogicalTypeRoot.TINYINT)) {
-				return TypeInfoFactory.byteTypeInfo;
-			} else if (type.equals(LogicalTypeRoot.SMALLINT)) {
-				return TypeInfoFactory.shortTypeInfo;
-			} else if (type.equals(LogicalTypeRoot.INTEGER)) {
-				return TypeInfoFactory.intTypeInfo;
-			} else if (type.equals(LogicalTypeRoot.BIGINT)) {
-				return TypeInfoFactory.longTypeInfo;
-			} else if (type.equals(LogicalTypeRoot.FLOAT)) {
-				return TypeInfoFactory.floatTypeInfo;
-			} else if (type.equals(LogicalTypeRoot.DOUBLE)) {
-				return TypeInfoFactory.doubleTypeInfo;
-			} else if (type.equals(LogicalTypeRoot.DATE)) {
-				return TypeInfoFactory.dateTypeInfo;
-			} else if (type.equals(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) {
-				return TypeInfoFactory.timestampTypeInfo;
-			} else if (type.equals(LogicalTypeRoot.BINARY) || type.equals(LogicalTypeRoot.VARBINARY)) {
-				// Hive doesn't support variable-length binary string
-				return TypeInfoFactory.binaryTypeInfo;
-			} else if (type.equals(LogicalTypeRoot.CHAR)) {
-				CharType charType = (CharType) dataType.getLogicalType();
-
-				if (charType.getLength() > HiveChar.MAX_CHAR_LENGTH) {
-					throw new CatalogException(
-						String.format("HiveCatalog doesn't support char type with length of '%d'. " +
-								"The maximum length is %d",
-							charType.getLength(), HiveChar.MAX_CHAR_LENGTH));
-				}
-
-				return TypeInfoFactory.getCharTypeInfo(charType.getLength());
-			} else if (type.equals(LogicalTypeRoot.VARCHAR)) {
-				VarCharType varCharType = (VarCharType) dataType.getLogicalType();
-
-				// Flink's StringType is defined as VARCHAR(Integer.MAX_VALUE)
-				// We don't have more information in LogicalTypeRoot to distringuish StringType and a VARCHAR(Integer.MAX_VALUE) instance
-				// Thus always treat VARCHAR(Integer.MAX_VALUE) as StringType
-				if (varCharType.getLength() == Integer.MAX_VALUE) {
-					return TypeInfoFactory.stringTypeInfo;
-				}
-
-				if (varCharType.getLength() > HiveVarchar.MAX_VARCHAR_LENGTH) {
-					throw new CatalogException(
-						String.format("HiveCatalog doesn't support varchar type with length of '%d'. " +
-								"The maximum length is %d",
-							varCharType.getLength(), HiveVarchar.MAX_VARCHAR_LENGTH));
-				}
-
-				return TypeInfoFactory.getVarcharTypeInfo(varCharType.getLength());
-			} else if (type.equals(LogicalTypeRoot.DECIMAL)) {
-				DecimalType decimalType = (DecimalType) dataType.getLogicalType();
-
-				// Flink and Hive share the same precision and scale range
-				// Flink already validates the type so we don't need to validate again here
-				return TypeInfoFactory.getDecimalTypeInfo(decimalType.getPrecision(), decimalType.getScale());
-			}
-
-			// Flink's primitive types that Hive 2.3.4 doesn't support: Time, TIMESTAMP_WITH_LOCAL_TIME_ZONE
-		}
-
-		if (dataType instanceof CollectionDataType) {
-
-			if (type.equals(LogicalTypeRoot.ARRAY)) {
-				DataType elementType = ((CollectionDataType) dataType).getElementDataType();
-
-				return TypeInfoFactory.getListTypeInfo(toHiveTypeInfo(elementType));
-			}
-
-			// Flink's collection types that Hive 2.3.4 doesn't support: multiset
-		}
-
-		if (dataType instanceof KeyValueDataType) {
-			KeyValueDataType keyValueDataType = (KeyValueDataType) dataType;
-			DataType keyType = keyValueDataType.getKeyDataType();
-			DataType valueType = keyValueDataType.getValueDataType();
-
-			return TypeInfoFactory.getMapTypeInfo(toHiveTypeInfo(keyType), toHiveTypeInfo(valueType));
-		}
-
-		if (dataType instanceof FieldsDataType) {
-			FieldsDataType fieldsDataType = (FieldsDataType) dataType;
-			// need to retrieve field names in order
-			List<String> names = ((RowType) fieldsDataType.getLogicalType()).getFieldNames();
-
-			Map<String, DataType> nameToType = fieldsDataType.getFieldDataTypes();
-			List<TypeInfo> typeInfos = new ArrayList<>(names.size());
-
-			for (String name : names) {
-				typeInfos.add(toHiveTypeInfo(nameToType.get(name)));
-			}
-
-			return TypeInfoFactory.getStructTypeInfo(names, typeInfos);
-		}
-
-		throw new UnsupportedOperationException(
-			String.format("Flink doesn't support converting type %s to Hive type yet.", dataType.toString()));
+		LogicalType logicalType = dataType.getLogicalType();
+		return logicalType.accept(new TypeInfoLogicalTypeVisitor(dataType));
 	}
 
 	/**
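
The key change in the hunk above: the long if/else chain over `LogicalTypeRoot` collapses into a single dispatch through a `LogicalType` visitor (defined in the next hunk). A hedged sketch of what a caller now sees, with illustrative values:

    // Supported types resolve through the visitor's typed visit(...) overloads:
    TypeInfo mapInfo = HiveTypeUtil.toHiveTypeInfo(
            DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()));
    // -> TypeInfoFactory.getMapTypeInfo(stringTypeInfo, intTypeInfo), i.e. map<string,int>

    // Unsupported types fall through to defaultMethod(...) and fail fast:
    HiveTypeUtil.toHiveTypeInfo(DataTypes.BINARY(10));
    // throws UnsupportedOperationException
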
@@ -269,4 +178,149 @@ public class HiveTypeUtil {
 					String.format("Flink doesn't support Hive primitive type %s yet", hiveType));
 		}
 	}
+
+	private static class TypeInfoLogicalTypeVisitor extends LogicalTypeDefaultVisitor<TypeInfo> {
+		private final DataType dataType;
+
+		public TypeInfoLogicalTypeVisitor(DataType dataType) {
+			this.dataType = dataType;
+		}
+
+		@Override
+		public TypeInfo visit(CharType charType) {
+			if (charType.getLength() > HiveChar.MAX_CHAR_LENGTH) {
+				throw new CatalogException(
+						String.format("HiveCatalog doesn't support char type with length of '%d'. " +
+									"The maximum length is %d",
+									charType.getLength(), HiveChar.MAX_CHAR_LENGTH));
+			}
+			return TypeInfoFactory.getCharTypeInfo(charType.getLength());
+		}
+
+		@Override
+		public TypeInfo visit(VarCharType varCharType) {
+			// Flink's StringType is defined as VARCHAR(Integer.MAX_VALUE)
+			// We don't have more information in LogicalTypeRoot to distinguish StringType and a VARCHAR(Integer.MAX_VALUE) instance
+			// Thus always treat VARCHAR(Integer.MAX_VALUE) as StringType
+			if (varCharType.getLength() == Integer.MAX_VALUE) {
+				return TypeInfoFactory.stringTypeInfo;
+			}
+			if (varCharType.getLength() > HiveVarchar.MAX_VARCHAR_LENGTH) {
+				throw new CatalogException(
+						String.format("HiveCatalog doesn't support varchar type with length of '%d'. " +
+									"The maximum length is %d",
+									varCharType.getLength(), HiveVarchar.MAX_VARCHAR_LENGTH));
+			}
+			return TypeInfoFactory.getVarcharTypeInfo(varCharType.getLength());
+		}
+
+		@Override
+		public TypeInfo visit(BooleanType booleanType) {
+			return TypeInfoFactory.booleanTypeInfo;
+		}
+
+		@Override
+		public TypeInfo visit(VarBinaryType varBinaryType) {
+			// Flink's BytesType is defined as VARBINARY(Integer.MAX_VALUE)
+			// We don't have more information in LogicalTypeRoot to distinguish BytesType and a VARBINARY(Integer.MAX_VALUE) instance
+			// Thus always treat VARBINARY(Integer.MAX_VALUE) as BytesType
+			if (varBinaryType.getLength() == VarBinaryType.MAX_LENGTH) {
+				return TypeInfoFactory.binaryTypeInfo;
+			}
+			return defaultMethod(varBinaryType);
+		}
+
+		@Override
+		public TypeInfo visit(DecimalType decimalType) {
+			// Flink and Hive share the same precision and scale range
+			// Flink already validates the type so we don't need to validate again here
+			return TypeInfoFactory.getDecimalTypeInfo(decimalType.getPrecision(), decimalType.getScale());
+		}
+
+		@Override
+		public TypeInfo visit(TinyIntType tinyIntType) {
+			return TypeInfoFactory.byteTypeInfo;
+		}
+
+		@Override
+		public TypeInfo visit(SmallIntType smallIntType) {
+			return TypeInfoFactory.shortTypeInfo;
+		}
+
+		@Override
+		public TypeInfo visit(IntType intType) {
+			return TypeInfoFactory.intTypeInfo;
+		}
+
+		@Override
+		public TypeInfo visit(BigIntType bigIntType) {
+			return TypeInfoFactory.longTypeInfo;
+		}
+
+		@Override
+		public TypeInfo visit(FloatType floatType) {
+			return TypeInfoFactory.floatTypeInfo;
+		}
+
+		@Override
+		public TypeInfo visit(DoubleType doubleType) {
+			return TypeInfoFactory.doubleTypeInfo;
+		}
+
+		@Override
+		public TypeInfo visit(DateType dateType) {
+			return TypeInfoFactory.dateTypeInfo;
+		}
+
+		@Override
+		public TypeInfo visit(TimestampType timestampType) {
+			return TypeInfoFactory.timestampTypeInfo;
+		}
+
+		@Override
+		public TypeInfo visit(ArrayType arrayType) {
+			LogicalType elementType = arrayType.getElementType();
+			TypeInfo elementTypeInfo = elementType.accept(new TypeInfoLogicalTypeVisitor(dataType));
+			if (null != elementTypeInfo) {
+				return TypeInfoFactory.getListTypeInfo(elementTypeInfo);
+			} else {
+				return defaultMethod(arrayType);
+			}
+		}
+
+		@Override
+		public TypeInfo visit(MapType mapType) {
+			LogicalType keyType = mapType.getKeyType();
+			LogicalType valueType = mapType.getValueType();
+			TypeInfo keyTypeInfo = keyType.accept(new TypeInfoLogicalTypeVisitor(dataType));
+			TypeInfo valueTypeInfo = valueType.accept(new TypeInfoLogicalTypeVisitor(dataType));
+			if (null == keyTypeInfo || null == valueTypeInfo) {
+				return defaultMethod(mapType);
+			} else {
+				return TypeInfoFactory.getMapTypeInfo(keyTypeInfo, valueTypeInfo);
+			}
+		}
+
+		@Override
+		public TypeInfo visit(RowType rowType) {
+			List<String> names = rowType.getFieldNames();
+			List<TypeInfo> typeInfos = new ArrayList<>(names.size());
+			for (String name : names) {
+				TypeInfo typeInfo =
+						rowType.getTypeAt(rowType.getFieldIndex(name)).accept(new TypeInfoLogicalTypeVisitor(dataType));
+				if (null != typeInfo) {
+					typeInfos.add(typeInfo);
+				} else {
+					return defaultMethod(rowType);
+				}
+			}
+			return TypeInfoFactory.getStructTypeInfo(names, typeInfos);
+		}
+
+		@Override
+		protected TypeInfo defaultMethod(LogicalType logicalType) {
+			throw new UnsupportedOperationException(
+					String.format("Flink doesn't support converting type %s to Hive type yet.", dataType.toString()));
+		}
+	}
 }
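
The visitor added above follows the standard `LogicalTypeDefaultVisitor` extension pattern: override `visit(...)` for each type you handle and let `defaultMethod(...)` deal with everything else. A minimal sketch of the same pattern; the class and its purpose are invented for illustration:

    import org.apache.flink.table.types.logical.DoubleType;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;

    // Hypothetical visitor: reports whether a logical type is INT or DOUBLE.
    class IsNumericVisitor extends LogicalTypeDefaultVisitor<Boolean> {
        @Override
        public Boolean visit(IntType intType) {
            return true;
        }

        @Override
        public Boolean visit(DoubleType doubleType) {
            return true;
        }

        @Override
        protected Boolean defaultMethod(LogicalType logicalType) {
            // Reached for every type without a dedicated visit(...) override.
            return false;
        }
    }

    // Usage: boolean numeric = someLogicalType.accept(new IsNumericVisitor());
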
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogDataTypeTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogDataTypeTest.java
index e9d40fe..b14f1fb 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogDataTypeTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogDataTypeTest.java
@@ -29,7 +29,6 @@ import org.apache.flink.table.catalog.config.CatalogConfig;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.BinaryType;
-import org.apache.flink.table.types.logical.VarBinaryType;
 
 import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.common.type.HiveVarchar;
@@ -40,7 +39,6 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
-import java.util.Arrays;
 import java.util.HashMap;
 
 import static org.junit.Assert.assertEquals;
@@ -122,20 +120,31 @@ public class HiveCatalogDataTypeTest {
 	}
 
 	@Test
-	public void testNonExactlyMatchedDataTypes() throws Exception {
+	public void testNonSupportedBinaryDataTypes() throws Exception {
 		DataType[] types = new DataType[] {
-			DataTypes.BINARY(BinaryType.MAX_LENGTH),
-			DataTypes.VARBINARY(VarBinaryType.MAX_LENGTH)
+				DataTypes.BINARY(BinaryType.MAX_LENGTH)
 		};
 
 		CatalogTable table = createCatalogTable(types);
 
 		catalog.createDatabase(db1, createDb(), false);
+
+		exception.expect(UnsupportedOperationException.class);
 		catalog.createTable(path1, table, false);
+	}
+
+	@Test
+	public void testNonSupportedVarBinaryDataTypes() throws Exception {
+		DataType[] types = new DataType[] {
+				DataTypes.VARBINARY(20)
+		};
 
-		Arrays.equals(
-			new DataType[] {DataTypes.BYTES(), DataTypes.BYTES()},
-			catalog.getTable(path1).getSchema().getFieldDataTypes());
+		CatalogTable table = createCatalogTable(types);
+
+		catalog.createDatabase(db1, createDb(), false);
+
+		exception.expect(UnsupportedOperationException.class);
+		catalog.createTable(path1, table, false);
 	}
 
 	@Test
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
index 95e09b4..0fe10d7 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
@@ -87,7 +87,7 @@ public class HiveCatalogHiveMetadataTest extends HiveCatalogTestBase {
 											.field("fourth", DataTypes.DATE())
 											.field("fifth", DataTypes.DOUBLE())
 											.field("sixth", DataTypes.BIGINT())
-											.field("seventh", DataTypes.VARBINARY(200))
+											.field("seventh", DataTypes.BYTES())
 											.build();
 		CatalogTable catalogTable = new CatalogTableImpl(tableSchema, getBatchTableProperties(), TEST_COMMENT);
 		catalog.createTable(path1, catalogTable, false);
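
One consequence worth noting about the field change above: a bounded `VARBINARY(200)` no longer has a Hive counterpart, which is why the field moved to `DataTypes.BYTES()`. A hedged sketch of the distinction, following the visitor logic in this commit:

    // BYTES() is VARBINARY(Integer.MAX_VALUE) and maps to Hive BINARY:
    TypeInfo ok = HiveTypeUtil.toHiveTypeInfo(DataTypes.BYTES());

    // Any bounded length now fails fast in the visitor's defaultMethod(...):
    HiveTypeUtil.toHiveTypeInfo(DataTypes.VARBINARY(200));
    // throws UnsupportedOperationException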