Posted to commits@flink.apache.org by bl...@apache.org on 2019/08/13 20:58:04 UTC

[flink] branch master updated: [FLINK-13534][hive] Unable to query Hive table with decimal column

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f33a6f0  [FLINK-13534][hive] Unable to query Hive table with decimal column
f33a6f0 is described below

commit f33a6f0fd6d46d666d95707ac320115ada21cb0f
Author: Rui Li <li...@apache.org>
AuthorDate: Mon Aug 5 12:26:40 2019 +0800

    [FLINK-13534][hive] Unable to query Hive table with decimal column
    
    Fix the issue that Flink cannot access Hive tables with decimal columns.
    
    This closes #9390.
---
 .../connectors/hive/HiveTableInputFormat.java      | 24 +++++------------
 .../connectors/hive/HiveTableOutputFormat.java     | 28 ++++++++++---------
 .../flink/connectors/hive/HiveTableSink.java       | 20 +++++---------
 .../flink/connectors/hive/HiveTableSource.java     |  8 +-----
 .../functions/hive/conversion/HiveInspectors.java  |  2 +-
 .../connectors/hive/TableEnvHiveConnectorTest.java | 31 +++++++++++++++++++++-
 6 files changed, 61 insertions(+), 52 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableInputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableInputFormat.java
index 8965320..8a38fb3 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableInputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableInputFormat.java
@@ -20,13 +20,9 @@ package org.apache.flink.connectors.hive;
 
 import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
 import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
@@ -64,8 +60,7 @@ import static org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR;
  * The HiveTableInputFormat are inspired by the HCatInputFormat and HadoopInputFormatBase.
  * It's used to read from hive partition/non-partition table.
  */
-public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveTableInputSplit>
-		implements ResultTypeQueryable {
+public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveTableInputSplit> {
 	private static final long serialVersionUID = 6351448428766433164L;
 	private static Logger logger = LoggerFactory.getLogger(HiveTableInputFormat.class);
 
@@ -78,7 +73,8 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
 	protected transient boolean fetched = false;
 	protected transient boolean hasNext;
 
-	private RowTypeInfo rowTypeInfo;
+	// arity of each row, including partition columns
+	private int rowArity;
 
 	//Necessary info to init deserializer
 	private List<String> partitionColNames;
@@ -102,8 +98,7 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
 
 		this.jobConf = new JobConf(jobConf);
 		this.partitionColNames = catalogTable.getPartitionKeys();
-		TableSchema tableSchema = catalogTable.getSchema();
-		this.rowTypeInfo = new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
+		rowArity = catalogTable.getSchema().getFieldCount();
 	}
 
 	@Override
@@ -212,7 +207,7 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
 		if (reachedEnd()) {
 			return null;
 		}
-		Row row = new Row(rowTypeInfo.getArity());
+		Row row = new Row(rowArity);
 		try {
 			//Use HiveDeserializer to deserialize an object out of a Writable blob
 			Object hiveRowStruct = deserializer.deserialize(value);
@@ -234,11 +229,6 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
 		return row;
 	}
 
-	@Override
-	public TypeInformation getProducedType() {
-		return rowTypeInfo;
-	}
-
 	// --------------------------------------------------------------------------------------------
 	//  Custom serialization methods
 	// --------------------------------------------------------------------------------------------
@@ -246,7 +236,7 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
 	private void writeObject(ObjectOutputStream out) throws IOException {
 		super.write(out);
 		jobConf.write(out);
-		out.writeObject(rowTypeInfo);
+		out.writeObject(rowArity);
 		out.writeObject(partitionColNames);
 		out.writeObject(partitions);
 	}
@@ -263,7 +253,7 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
 		if (currentUserCreds != null) {
 			jobConf.getCredentials().addAll(currentUserCreds);
 		}
-		rowTypeInfo = (RowTypeInfo) in.readObject();
+		rowArity = (int) in.readObject();
 		partitionColNames = (List<String>) in.readObject();
 		partitions = (List<HiveTablePartition>) in.readObject();
 	}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
index 9e1ee46..ec0b41d 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.io.InitializeOnMaster;
 import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
 import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase;
 import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogTable;
@@ -38,7 +37,6 @@ import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
 import org.apache.flink.table.functions.hive.conversion.HiveObjectConversion;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
@@ -114,7 +112,9 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
 	private transient JobConf jobConf;
 	private transient ObjectPath tablePath;
 	private transient List<String> partitionColumns;
-	private transient RowTypeInfo rowTypeInfo;
+	// Ideally we should maintain a TableSchema here, but it's not Serializable
+	private transient String[] fieldNames;
+	private transient DataType[] fieldTypes;
 	private transient HiveTablePartition hiveTablePartition;
 	private transient Properties tableProperties;
 	private transient boolean overwrite;
@@ -159,7 +159,8 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
 		this.tablePath = tablePath;
 		this.partitionColumns = table.getPartitionKeys();
 		TableSchema tableSchema = table.getSchema();
-		this.rowTypeInfo = new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
+		this.fieldNames = tableSchema.getFieldNames();
+		this.fieldTypes = tableSchema.getFieldDataTypes();
 		this.hiveTablePartition = hiveTablePartition;
 		this.tableProperties = tableProperties;
 		this.overwrite = overwrite;
@@ -177,7 +178,8 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
 		out.writeObject(isPartitioned);
 		out.writeObject(isDynamicPartition);
 		out.writeObject(overwrite);
-		out.writeObject(rowTypeInfo);
+		out.writeObject(fieldNames);
+		out.writeObject(fieldTypes);
 		out.writeObject(hiveTablePartition);
 		out.writeObject(partitionColumns);
 		out.writeObject(tablePath);
@@ -200,7 +202,8 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
 		isPartitioned = (boolean) in.readObject();
 		isDynamicPartition = (boolean) in.readObject();
 		overwrite = (boolean) in.readObject();
-		rowTypeInfo = (RowTypeInfo) in.readObject();
+		fieldNames = (String[]) in.readObject();
+		fieldTypes = (DataType[]) in.readObject();
 		hiveTablePartition = (HiveTablePartition) in.readObject();
 		partitionColumns = (List<String>) in.readObject();
 		tablePath = (ObjectPath) in.readObject();
@@ -286,26 +289,25 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
 		if (!isDynamicPartition) {
 			staticWriter = writerForLocation(hiveTablePartition.getStorageDescriptor().getLocation());
 		} else {
-			dynamicPartitionOffset = rowTypeInfo.getArity() - partitionColumns.size() + hiveTablePartition.getPartitionSpec().size();
+			dynamicPartitionOffset = fieldNames.length - partitionColumns.size() + hiveTablePartition.getPartitionSpec().size();
 		}
 
-		numNonPartitionColumns = isPartitioned ? rowTypeInfo.getArity() - partitionColumns.size() : rowTypeInfo.getArity();
+		numNonPartitionColumns = isPartitioned ? fieldNames.length - partitionColumns.size() : fieldNames.length;
 		hiveConversions = new HiveObjectConversion[numNonPartitionColumns];
 		List<ObjectInspector> objectInspectors = new ArrayList<>(hiveConversions.length);
 		for (int i = 0; i < numNonPartitionColumns; i++) {
-			DataType dataType = LegacyTypeInfoDataTypeConverter.toDataType(rowTypeInfo.getTypeAt(i));
-			ObjectInspector objectInspector = HiveInspectors.getObjectInspector(dataType);
+			ObjectInspector objectInspector = HiveInspectors.getObjectInspector(fieldTypes[i]);
 			objectInspectors.add(objectInspector);
-			hiveConversions[i] = HiveInspectors.getConversion(objectInspector, dataType.getLogicalType());
+			hiveConversions[i] = HiveInspectors.getConversion(objectInspector, fieldTypes[i].getLogicalType());
 		}
 
 		if (!isPartitioned) {
 			rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
-				Arrays.asList(rowTypeInfo.getFieldNames()),
+				Arrays.asList(fieldNames),
 				objectInspectors);
 		} else {
 			rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
-				Arrays.asList(rowTypeInfo.getFieldNames()).subList(0, rowTypeInfo.getArity() - partitionColumns.size()),
+				Arrays.asList(fieldNames).subList(0, fieldNames.length - partitionColumns.size()),
 				objectInspectors);
 			defaultPartitionName = jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
 					HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
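The loop above now builds each field's ObjectInspector and HiveObjectConversion straight from the catalog DataType instead of round-tripping through RowTypeInfo. A minimal sketch of that per-field setup for a DECIMAL(10,2) column, assuming HiveObjectConversion exposes a toHiveObject(Object) method (the class and variable names below are illustrative, not from the patch):

    import java.math.BigDecimal;

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
    import org.apache.flink.table.functions.hive.conversion.HiveObjectConversion;
    import org.apache.flink.table.types.DataType;

    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

    public class DecimalFieldSetupSketch {
        public static void main(String[] args) {
            // One field of type DECIMAL(10, 2); inspector and conversion come directly
            // from the DataType, mirroring the loop in HiveTableOutputFormat above.
            DataType fieldType = DataTypes.DECIMAL(10, 2);
            ObjectInspector inspector = HiveInspectors.getObjectInspector(fieldType);
            HiveObjectConversion conversion =
                    HiveInspectors.getConversion(inspector, fieldType.getLogicalType());
            // Converts the Flink-side BigDecimal into Hive's decimal representation.
            Object hiveValue = conversion.toHiveObject(new BigDecimal("5.12"));
            System.out.println(hiveValue);
        }
    }
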
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index a36479d..2974de0 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -20,7 +20,6 @@ package org.apache.flink.connectors.hive;
 
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
@@ -32,6 +31,7 @@ import org.apache.flink.table.sinks.OutputFormatTableSink;
 import org.apache.flink.table.sinks.OverwritableTableSink;
 import org.apache.flink.table.sinks.PartitionableTableSink;
 import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
@@ -64,7 +64,7 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti
 	private final JobConf jobConf;
 	private final CatalogTable catalogTable;
 	private final ObjectPath tablePath;
-	private final RowTypeInfo rowTypeInfo;
+	private final TableSchema tableSchema;
 	private final String hiveVersion;
 
 	private Map<String, String> staticPartitionSpec = Collections.emptyMap();
@@ -77,8 +77,7 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti
 		this.catalogTable = table;
 		hiveVersion = Preconditions.checkNotNull(jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
 				"Hive version is not defined");
-		TableSchema tableSchema = table.getSchema();
-		rowTypeInfo = new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
+		tableSchema = table.getSchema();
 	}
 
 	@Override
@@ -136,18 +135,13 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti
 	}
 
 	@Override
-	public String[] getFieldNames() {
-		return rowTypeInfo.getFieldNames();
+	public DataType getConsumedDataType() {
+		return getTableSchema().toRowDataType();
 	}
 
 	@Override
-	public TypeInformation<?>[] getFieldTypes() {
-		return rowTypeInfo.getFieldTypes();
-	}
-
-	@Override
-	public TypeInformation<Row> getOutputType() {
-		return rowTypeInfo;
+	public TableSchema getTableSchema() {
+		return tableSchema;
 	}
 
 	// get a staging dir associated with a final dir
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index 0563029..451bd93 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -21,7 +21,6 @@ package org.apache.flink.connectors.hive;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
@@ -107,12 +106,7 @@ public class HiveTableSource extends InputFormatTableSource<Row> implements Part
 
 	@Override
 	public DataType getProducedDataType() {
-		TableSchema tableSchema = catalogTable.getSchema();
-		DataTypes.Field[] fields = new DataTypes.Field[tableSchema.getFieldCount()];
-		for (int i = 0; i < fields.length; i++) {
-			fields[i] = DataTypes.FIELD(tableSchema.getFieldName(i).get(), tableSchema.getFieldDataType(i).get());
-		}
-		return DataTypes.ROW(fields);
+		return getTableSchema().toRowDataType();
 	}
 
 	@Override
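With this change the produced row type comes straight from the catalog schema, so DECIMAL precision and scale are carried through as declared. A minimal sketch of what the toRowDataType() call yields for a decimal column (the schema contents are illustrative, not from the patch):

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.types.DataType;

    public class RowDataTypeSketch {
        public static void main(String[] args) {
            // Illustrative schema with one decimal column, like the tables in the new test.
            TableSchema schema = TableSchema.builder()
                    .field("x", DataTypes.DECIMAL(10, 2))
                    .build();
            // Same call HiveTableSource and HiveTableSink now use; the resulting
            // ROW type keeps x as DECIMAL(10, 2).
            DataType rowType = schema.toRowDataType();
            System.out.println(rowType);
        }
    }
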
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
index 6d82d38..3546668 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
@@ -203,7 +203,7 @@ public class HiveInspectors {
 			} else if (inspector instanceof HiveVarcharObjectInspector) {
 				return o -> new HiveVarchar((String) o, ((VarCharType) dataType).getLength());
 			} else if (inspector instanceof HiveDecimalObjectInspector) {
-				return o -> HiveDecimal.create((BigDecimal) o);
+				return o -> o == null ? null : HiveDecimal.create((BigDecimal) o);
 			}
 		}
 
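The one-line change above guards the decimal conversion so that NULL values are passed through instead of being handed to HiveDecimal.create. A standalone sketch of the same pattern (the helper method and class are illustrative, not part of the patch):

    import java.math.BigDecimal;

    import org.apache.hadoop.hive.common.type.HiveDecimal;

    public class DecimalConversionSketch {
        // Mirrors the guarded lambda above: NULL stays NULL, non-null BigDecimals
        // are converted to HiveDecimal.
        static HiveDecimal toHiveDecimal(Object flinkValue) {
            return flinkValue == null ? null : HiveDecimal.create((BigDecimal) flinkValue);
        }

        public static void main(String[] args) {
            System.out.println(toHiveDecimal(new BigDecimal("123456789.12"))); // 123456789.12
            System.out.println(toHiveDecimal(null));                           // null
        }
    }
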
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index e8c402a..0845eae 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -138,6 +138,33 @@ public class TableEnvHiveConnectorTest {
 		hiveShell.execute("drop database db1 cascade");
 	}
 
+	@Test
+	public void testDecimal() throws Exception {
+		hiveShell.execute("create database db1");
+		try {
+			hiveShell.execute("create table db1.src1 (x decimal(10,2))");
+			hiveShell.execute("create table db1.src2 (x decimal(10,2))");
+			hiveShell.execute("create table db1.dest (x decimal(10,2))");
+			// populate src1 from Hive
+			hiveShell.execute("insert into db1.src1 values (1.0),(2.12),(5.123),(5.456),(123456789.12)");
+
+			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+			// populate src2 with same data from Flink
+			tableEnv.sqlUpdate("insert into db1.src2 values (cast(1.0 as decimal(10,2))), (cast(2.12 as decimal(10,2))), " +
+					"(cast(5.123 as decimal(10,2))), (cast(5.456 as decimal(10,2))), (cast(123456789.12 as decimal(10,2)))");
+			tableEnv.execute("test1");
+			// verify src1 and src2 contain same data
+			verifyHiveQueryResult("select * from db1.src2", hiveShell.executeQuery("select * from db1.src1"));
+
+			// populate dest with src1 from Flink -- to test reading decimal type from Hive
+			tableEnv.sqlUpdate("insert into db1.dest select * from db1.src1");
+			tableEnv.execute("test2");
+			verifyHiveQueryResult("select * from db1.dest", hiveShell.executeQuery("select * from db1.src1"));
+		} finally {
+			hiveShell.execute("drop database db1 cascade");
+		}
+	}
+
 	private TableEnvironment getTableEnvWithHiveCatalog() {
 		TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
 		tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
@@ -146,6 +173,8 @@ public class TableEnvHiveConnectorTest {
 	}
 
 	private void verifyHiveQueryResult(String query, List<String> expected) {
-		assertEquals(new HashSet<>(expected), new HashSet<>(hiveShell.executeQuery(query)));
+		List<String> results = hiveShell.executeQuery(query);
+		assertEquals(expected.size(), results.size());
+		assertEquals(new HashSet<>(expected), new HashSet<>(results));
 	}
 }
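
The size assertion added to verifyHiveQueryResult matters because comparing HashSets alone cannot detect duplicate rows: two result lists that collapse to the same set still pass even if their row counts differ. A small illustration (the values are made up):

    import java.util.Arrays;
    import java.util.HashSet;

    public class SetComparisonSketch {
        public static void main(String[] args) {
            // Both lists collapse to the set {"1.00"}, so a set-only comparison reports
            // equality even though the result sizes differ; the size check catches this.
            System.out.println(new HashSet<>(Arrays.asList("1.00", "1.00"))
                    .equals(new HashSet<>(Arrays.asList("1.00"))));   // true
            System.out.println(Arrays.asList("1.00", "1.00").size()
                    == Arrays.asList("1.00").size());                 // false
        }
    }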