Posted to commits@flink.apache.org by bl...@apache.org on 2019/08/13 20:58:04 UTC
[flink] branch master updated: [FLINK-13534][hive] Unable to query Hive table with decimal column
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f33a6f0 [FLINK-13534][hive] Unable to query Hive table with decimal column
f33a6f0 is described below
commit f33a6f0fd6d46d666d95707ac320115ada21cb0f
Author: Rui Li <li...@apache.org>
AuthorDate: Mon Aug 5 12:26:40 2019 +0800
[FLINK-13534][hive] Unable to query Hive table with decimal column
Fix the issue that Flink cannot access Hive tables with decimal columns.
This closes #9390.
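
The patch replaces the legacy RowTypeInfo plumbing in the Hive connector with the catalog table's TableSchema and DataTypes, which, presumably, is what lets decimal precision and scale reach HiveInspectors intact. As a rough, stand-alone sketch (the class name below is made up; TableSchema, DataTypes.DECIMAL and toRowDataType are the Flink 1.9 APIs used in the patch):

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.types.DataType;

    // Illustration only: derive a row DataType from a TableSchema, as
    // HiveTableSource#getProducedDataType and HiveTableSink#getConsumedDataType
    // now do. The DECIMAL(10, 2) field keeps its precision and scale instead of
    // collapsing into a generic BigDecimal type.
    public class SchemaDataTypeSketch {
        public static void main(String[] args) {
            TableSchema schema = TableSchema.builder()
                    .field("x", DataTypes.DECIMAL(10, 2))
                    .build();
            DataType producedType = schema.toRowDataType();
            System.out.println(producedType); // a ROW type containing DECIMAL(10, 2)
        }
    }
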
---
.../connectors/hive/HiveTableInputFormat.java | 24 +++++------------
.../connectors/hive/HiveTableOutputFormat.java | 28 ++++++++++---------
.../flink/connectors/hive/HiveTableSink.java | 20 +++++---------
.../flink/connectors/hive/HiveTableSource.java | 8 +-----
.../functions/hive/conversion/HiveInspectors.java | 2 +-
.../connectors/hive/TableEnvHiveConnectorTest.java | 31 +++++++++++++++++++++-
6 files changed, 61 insertions(+), 52 deletions(-)
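
The HiveInspectors hunk further down also adds a null guard before HiveDecimal.create, since a decimal column can hold SQL NULL. A minimal sketch of that guard (the wrapper class and method are hypothetical; HiveDecimal.create(BigDecimal) is the Hive API used in the patch):

    import java.math.BigDecimal;

    import org.apache.hadoop.hive.common.type.HiveDecimal;

    // Illustration only: convert a nullable BigDecimal coming out of a Flink Row
    // into a HiveDecimal, passing NULL through instead of handing it to
    // HiveDecimal.create directly.
    public class DecimalConversionSketch {
        static HiveDecimal toHiveDecimal(BigDecimal value) {
            return value == null ? null : HiveDecimal.create(value);
        }

        public static void main(String[] args) {
            System.out.println(toHiveDecimal(new BigDecimal("123456789.12"))); // 123456789.12
            System.out.println(toHiveDecimal(null));                           // null
        }
    }
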
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableInputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableInputFormat.java
index 8965320..8a38fb3 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableInputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableInputFormat.java
@@ -20,13 +20,9 @@ package org.apache.flink.connectors.hive;
import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
@@ -64,8 +60,7 @@ import static org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR;
* The HiveTableInputFormat are inspired by the HCatInputFormat and HadoopInputFormatBase.
* It's used to read from hive partition/non-partition table.
*/
-public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveTableInputSplit>
- implements ResultTypeQueryable {
+public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveTableInputSplit> {
private static final long serialVersionUID = 6351448428766433164L;
private static Logger logger = LoggerFactory.getLogger(HiveTableInputFormat.class);
@@ -78,7 +73,8 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
protected transient boolean fetched = false;
protected transient boolean hasNext;
- private RowTypeInfo rowTypeInfo;
+ // arity of each row, including partition columns
+ private int rowArity;
//Necessary info to init deserializer
private List<String> partitionColNames;
@@ -102,8 +98,7 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
this.jobConf = new JobConf(jobConf);
this.partitionColNames = catalogTable.getPartitionKeys();
- TableSchema tableSchema = catalogTable.getSchema();
- this.rowTypeInfo = new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
+ rowArity = catalogTable.getSchema().getFieldCount();
}
@Override
@@ -212,7 +207,7 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
if (reachedEnd()) {
return null;
}
- Row row = new Row(rowTypeInfo.getArity());
+ Row row = new Row(rowArity);
try {
//Use HiveDeserializer to deserialize an object out of a Writable blob
Object hiveRowStruct = deserializer.deserialize(value);
@@ -234,11 +229,6 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
return row;
}
- @Override
- public TypeInformation getProducedType() {
- return rowTypeInfo;
- }
-
// --------------------------------------------------------------------------------------------
// Custom serialization methods
// --------------------------------------------------------------------------------------------
@@ -246,7 +236,7 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
private void writeObject(ObjectOutputStream out) throws IOException {
super.write(out);
jobConf.write(out);
- out.writeObject(rowTypeInfo);
+ out.writeObject(rowArity);
out.writeObject(partitionColNames);
out.writeObject(partitions);
}
@@ -263,7 +253,7 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
if (currentUserCreds != null) {
jobConf.getCredentials().addAll(currentUserCreds);
}
- rowTypeInfo = (RowTypeInfo) in.readObject();
+ rowArity = (int) in.readObject();
partitionColNames = (List<String>) in.readObject();
partitions = (List<HiveTablePartition>) in.readObject();
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
index 9e1ee46..ec0b41d 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.io.InitializeOnMaster;
import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase;
import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTable;
@@ -38,7 +37,6 @@ import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
import org.apache.flink.table.functions.hive.conversion.HiveObjectConversion;
import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
import org.apache.flink.types.Row;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
@@ -114,7 +112,9 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
private transient JobConf jobConf;
private transient ObjectPath tablePath;
private transient List<String> partitionColumns;
- private transient RowTypeInfo rowTypeInfo;
+ // Ideally we should maintain a TableSchema here, but it's not Serializable
+ private transient String[] fieldNames;
+ private transient DataType[] fieldTypes;
private transient HiveTablePartition hiveTablePartition;
private transient Properties tableProperties;
private transient boolean overwrite;
@@ -159,7 +159,8 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
this.tablePath = tablePath;
this.partitionColumns = table.getPartitionKeys();
TableSchema tableSchema = table.getSchema();
- this.rowTypeInfo = new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
+ this.fieldNames = tableSchema.getFieldNames();
+ this.fieldTypes = tableSchema.getFieldDataTypes();
this.hiveTablePartition = hiveTablePartition;
this.tableProperties = tableProperties;
this.overwrite = overwrite;
@@ -177,7 +178,8 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
out.writeObject(isPartitioned);
out.writeObject(isDynamicPartition);
out.writeObject(overwrite);
- out.writeObject(rowTypeInfo);
+ out.writeObject(fieldNames);
+ out.writeObject(fieldTypes);
out.writeObject(hiveTablePartition);
out.writeObject(partitionColumns);
out.writeObject(tablePath);
@@ -200,7 +202,8 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
isPartitioned = (boolean) in.readObject();
isDynamicPartition = (boolean) in.readObject();
overwrite = (boolean) in.readObject();
- rowTypeInfo = (RowTypeInfo) in.readObject();
+ fieldNames = (String[]) in.readObject();
+ fieldTypes = (DataType[]) in.readObject();
hiveTablePartition = (HiveTablePartition) in.readObject();
partitionColumns = (List<String>) in.readObject();
tablePath = (ObjectPath) in.readObject();
@@ -286,26 +289,25 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
if (!isDynamicPartition) {
staticWriter = writerForLocation(hiveTablePartition.getStorageDescriptor().getLocation());
} else {
- dynamicPartitionOffset = rowTypeInfo.getArity() - partitionColumns.size() + hiveTablePartition.getPartitionSpec().size();
+ dynamicPartitionOffset = fieldNames.length - partitionColumns.size() + hiveTablePartition.getPartitionSpec().size();
}
- numNonPartitionColumns = isPartitioned ? rowTypeInfo.getArity() - partitionColumns.size() : rowTypeInfo.getArity();
+ numNonPartitionColumns = isPartitioned ? fieldNames.length - partitionColumns.size() : fieldNames.length;
hiveConversions = new HiveObjectConversion[numNonPartitionColumns];
List<ObjectInspector> objectInspectors = new ArrayList<>(hiveConversions.length);
for (int i = 0; i < numNonPartitionColumns; i++) {
- DataType dataType = LegacyTypeInfoDataTypeConverter.toDataType(rowTypeInfo.getTypeAt(i));
- ObjectInspector objectInspector = HiveInspectors.getObjectInspector(dataType);
+ ObjectInspector objectInspector = HiveInspectors.getObjectInspector(fieldTypes[i]);
objectInspectors.add(objectInspector);
- hiveConversions[i] = HiveInspectors.getConversion(objectInspector, dataType.getLogicalType());
+ hiveConversions[i] = HiveInspectors.getConversion(objectInspector, fieldTypes[i].getLogicalType());
}
if (!isPartitioned) {
rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
- Arrays.asList(rowTypeInfo.getFieldNames()),
+ Arrays.asList(fieldNames),
objectInspectors);
} else {
rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
- Arrays.asList(rowTypeInfo.getFieldNames()).subList(0, rowTypeInfo.getArity() - partitionColumns.size()),
+ Arrays.asList(fieldNames).subList(0, fieldNames.length - partitionColumns.size()),
objectInspectors);
defaultPartitionName = jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index a36479d..2974de0 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -20,7 +20,6 @@ package org.apache.flink.connectors.hive;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
@@ -32,6 +31,7 @@ import org.apache.flink.table.sinks.OutputFormatTableSink;
import org.apache.flink.table.sinks.OverwritableTableSink;
import org.apache.flink.table.sinks.PartitionableTableSink;
import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
@@ -64,7 +64,7 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti
private final JobConf jobConf;
private final CatalogTable catalogTable;
private final ObjectPath tablePath;
- private final RowTypeInfo rowTypeInfo;
+ private final TableSchema tableSchema;
private final String hiveVersion;
private Map<String, String> staticPartitionSpec = Collections.emptyMap();
@@ -77,8 +77,7 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti
this.catalogTable = table;
hiveVersion = Preconditions.checkNotNull(jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
"Hive version is not defined");
- TableSchema tableSchema = table.getSchema();
- rowTypeInfo = new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
+ tableSchema = table.getSchema();
}
@Override
@@ -136,18 +135,13 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti
}
@Override
- public String[] getFieldNames() {
- return rowTypeInfo.getFieldNames();
+ public DataType getConsumedDataType() {
+ return getTableSchema().toRowDataType();
}
@Override
- public TypeInformation<?>[] getFieldTypes() {
- return rowTypeInfo.getFieldTypes();
- }
-
- @Override
- public TypeInformation<Row> getOutputType() {
- return rowTypeInfo;
+ public TableSchema getTableSchema() {
+ return tableSchema;
}
// get a staging dir associated with a final dir
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index 0563029..451bd93 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -21,7 +21,6 @@ package org.apache.flink.connectors.hive;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
@@ -107,12 +106,7 @@ public class HiveTableSource extends InputFormatTableSource<Row> implements Part
@Override
public DataType getProducedDataType() {
- TableSchema tableSchema = catalogTable.getSchema();
- DataTypes.Field[] fields = new DataTypes.Field[tableSchema.getFieldCount()];
- for (int i = 0; i < fields.length; i++) {
- fields[i] = DataTypes.FIELD(tableSchema.getFieldName(i).get(), tableSchema.getFieldDataType(i).get());
- }
- return DataTypes.ROW(fields);
+ return getTableSchema().toRowDataType();
}
@Override
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
index 6d82d38..3546668 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
@@ -203,7 +203,7 @@ public class HiveInspectors {
} else if (inspector instanceof HiveVarcharObjectInspector) {
return o -> new HiveVarchar((String) o, ((VarCharType) dataType).getLength());
} else if (inspector instanceof HiveDecimalObjectInspector) {
- return o -> HiveDecimal.create((BigDecimal) o);
+ return o -> o == null ? null : HiveDecimal.create((BigDecimal) o);
}
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index e8c402a..0845eae 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -138,6 +138,33 @@ public class TableEnvHiveConnectorTest {
hiveShell.execute("drop database db1 cascade");
}
+ @Test
+ public void testDecimal() throws Exception {
+ hiveShell.execute("create database db1");
+ try {
+ hiveShell.execute("create table db1.src1 (x decimal(10,2))");
+ hiveShell.execute("create table db1.src2 (x decimal(10,2))");
+ hiveShell.execute("create table db1.dest (x decimal(10,2))");
+ // populate src1 from Hive
+ hiveShell.execute("insert into db1.src1 values (1.0),(2.12),(5.123),(5.456),(123456789.12)");
+
+ TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+ // populate src2 with same data from Flink
+ tableEnv.sqlUpdate("insert into db1.src2 values (cast(1.0 as decimal(10,2))), (cast(2.12 as decimal(10,2))), " +
+ "(cast(5.123 as decimal(10,2))), (cast(5.456 as decimal(10,2))), (cast(123456789.12 as decimal(10,2)))");
+ tableEnv.execute("test1");
+ // verify src1 and src2 contain same data
+ verifyHiveQueryResult("select * from db1.src2", hiveShell.executeQuery("select * from db1.src1"));
+
+ // populate dest with src1 from Flink -- to test reading decimal type from Hive
+ tableEnv.sqlUpdate("insert into db1.dest select * from db1.src1");
+ tableEnv.execute("test2");
+ verifyHiveQueryResult("select * from db1.dest", hiveShell.executeQuery("select * from db1.src1"));
+ } finally {
+ hiveShell.execute("drop database db1 cascade");
+ }
+ }
+
private TableEnvironment getTableEnvWithHiveCatalog() {
TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
@@ -146,6 +173,8 @@ public class TableEnvHiveConnectorTest {
}
private void verifyHiveQueryResult(String query, List<String> expected) {
- assertEquals(new HashSet<>(expected), new HashSet<>(hiveShell.executeQuery(query)));
+ List<String> results = hiveShell.executeQuery(query);
+ assertEquals(expected.size(), results.size());
+ assertEquals(new HashSet<>(expected), new HashSet<>(results));
}
}