Posted to commits@flink.apache.org by bl...@apache.org on 2019/10/28 23:38:23 UTC
[flink] branch master updated: [FLINK-14397][hive] Failed to run Hive UDTF with array arguments
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e18320b [FLINK-14397][hive] Failed to run Hive UDTF with array arguments
e18320b is described below
commit e18320b76047af4e15297e3e89b6c46ef3dae9bf
Author: Rui Li <li...@apache.org>
AuthorDate: Thu Oct 17 21:15:55 2019 +0800
[FLINK-14397][hive] Failed to run Hive UDTF with array arguments
Fix the issue where calling a Hive UDTF with array arguments causes a ClassCastException.
This closes #9927.
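
For context on the cast exception: Java arrays carry their element type at runtime, so an Object[] built from a Hive list can never be cast to the Integer[] (or similar) that Flink expects for an ARRAY type. A minimal standalone sketch of the failure and of the reflective allocation the patch uses (class and variable names here are made up for illustration):

    import java.lang.reflect.Array;

    public class TypedArrayDemo {
        public static void main(String[] args) {
            // An Object[] cannot be cast to Integer[], even when every element is an Integer:
            Object[] generic = new Object[]{1, 2, 3};
            try {
                Integer[] bad = (Integer[]) generic;
            } catch (ClassCastException e) {
                System.out.println("cast failed: " + e);
            }

            // Array.newInstance allocates an array whose runtime type matches the
            // element class, so the downstream cast succeeds:
            Object[] result = (Object[]) Array.newInstance(Integer.class, 3);
            for (int i = 0; i < result.length; i++) {
                result[i] = i + 1;
            }
            Integer[] good = (Integer[]) result;
            System.out.println(good.length); // prints 3
        }
    }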
---
.../functions/hive/conversion/HiveInspectors.java | 60 +++++++++++++++++++++-
.../connectors/hive/TableEnvHiveConnectorTest.java | 38 ++++++++++++++
.../flink/table/catalog/hive/HiveTestUtils.java | 32 ++++++++++++
3 files changed, 128 insertions(+), 2 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
index 22ce60b..e7e88b2 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
@@ -96,7 +96,9 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
+import java.lang.reflect.Array;
import java.math.BigDecimal;
+import java.sql.Date;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -312,9 +314,11 @@ public class HiveInspectors {
ListObjectInspector listInspector = (ListObjectInspector) inspector;
List<?> list = listInspector.getList(data);
- Object[] result = new Object[list.size()];
+ // flink expects a specific array type (e.g. Integer[] instead of Object[]), so we have to get the element class
+ ObjectInspector elementInspector = listInspector.getListElementObjectInspector();
+ Object[] result = (Object[]) Array.newInstance(getClassFromObjectInspector(elementInspector), list.size());
for (int i = 0; i < list.size(); i++) {
-     result[i] = toFlinkObject(listInspector.getListElementObjectInspector(), list.get(i));
+     result[i] = toFlinkObject(elementInspector, list.get(i));
}
return result;
}
@@ -450,4 +454,56 @@ public class HiveInspectors {
public static DataType toFlinkType(ObjectInspector inspector) {
return HiveTypeUtil.toFlinkType(TypeInfoUtils.getTypeInfoFromTypeString(inspector.getTypeName()));
}
+
+ // given a Hive ObjectInspector, get the class for corresponding Flink object
+ private static Class<?> getClassFromObjectInspector(ObjectInspector inspector) {
+     switch (inspector.getCategory()) {
+         case PRIMITIVE: {
+             PrimitiveObjectInspector primitiveOI = (PrimitiveObjectInspector) inspector;
+             switch (primitiveOI.getPrimitiveCategory()) {
+                 case STRING:
+                 case CHAR:
+                 case VARCHAR:
+                     return String.class;
+                 case INT:
+                     return Integer.class;
+                 case LONG:
+                     return Long.class;
+                 case BYTE:
+                     return Byte.class;
+                 case SHORT:
+                     return Short.class;
+                 case FLOAT:
+                     return Float.class;
+                 case DOUBLE:
+                     return Double.class;
+                 case DECIMAL:
+                     return BigDecimal.class;
+                 case BOOLEAN:
+                     return Boolean.class;
+                 case BINARY:
+                     return byte[].class;
+                 case DATE:
+                     return Date.class;
+                 case TIMESTAMP:
+                 case INTERVAL_DAY_TIME:
+                 case INTERVAL_YEAR_MONTH:
+                 default:
+                     throw new IllegalArgumentException(
+                         "Unsupported primitive type " + primitiveOI.getPrimitiveCategory().name());
+             }
+         }
+         case LIST:
+             ListObjectInspector listInspector = (ListObjectInspector) inspector;
+             Class elementClz = getClassFromObjectInspector(listInspector.getListElementObjectInspector());
+             return Array.newInstance(elementClz, 0).getClass();
+         case MAP:
+             return Map.class;
+         case STRUCT:
+             return Row.class;
+         default:
+             throw new IllegalArgumentException("Unsupported type " + inspector.getCategory().name());
+     }
+ }
}
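
A note on the LIST branch above: instantiating a zero-length array and asking for its class is the standard way to obtain an array class when the element class is only known at runtime, and it composes for nested arrays. A small illustrative sketch (not part of the patch; class name is made up):

    import java.lang.reflect.Array;

    public class ArrayClassDemo {
        public static void main(String[] args) {
            // Array.newInstance(clz, 0).getClass() yields the array class Integer[]:
            Class<?> intArrayClz = Array.newInstance(Integer.class, 0).getClass();
            System.out.println(intArrayClz); // class [Ljava.lang.Integer;

            // Applied recursively, as the LIST case does, ARRAY<ARRAY<INT>> maps to Integer[][]:
            Class<?> nestedClz = Array.newInstance(intArrayClz, 0).getClass();
            System.out.println(nestedClz); // class [[Ljava.lang.Integer;
        }
    }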
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index 784bcc8..9484e55 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -35,14 +35,17 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import scala.collection.JavaConverters;
@@ -260,6 +263,41 @@ public class TableEnvHiveConnectorTest {
}
}
+ @Test
+ public void testUDTF() throws Exception {
+     // W/o https://issues.apache.org/jira/browse/HIVE-11878 Hive registers the App classloader as the classloader
+     // for the UDTF and closes the App classloader when we tear down the session. This causes problems for JUnit code
+     // and shutdown hooks that have to run after the test finishes, because the App classloader can no longer load new
+     // classes, and will crash the forked JVM, thus failing the test phase.
+     // Therefore disable such tests for older Hive versions.
+     String hiveVersion = HiveShimLoader.getHiveVersion();
+     Assume.assumeTrue(hiveVersion.compareTo("2.0.0") >= 0 || hiveVersion.compareTo("1.3.0") >= 0);
+     hiveShell.execute("create database db1");
+     try {
+         hiveShell.execute("create table db1.simple (i int,a array<int>)");
+         hiveShell.execute("create table db1.nested (a array<map<int, string>>)");
+         hiveShell.execute("create function hiveudtf as 'org.apache.hadoop.hive.ql.udf.generic.GenericUDTFExplode'");
+         hiveShell.insertInto("db1", "simple").addRow(3, Arrays.asList(1, 2, 3)).commit();
+         Map<Integer, String> map1 = new HashMap<>();
+         map1.put(1, "a");
+         map1.put(2, "b");
+         Map<Integer, String> map2 = new HashMap<>();
+         map2.put(3, "c");
+         hiveShell.insertInto("db1", "nested").addRow(Arrays.asList(map1, map2)).commit();
+
+         TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+         List<Row> results = HiveTestUtils.collectTable(tableEnv,
+             tableEnv.sqlQuery("select x from db1.simple, lateral table(hiveudtf(a)) as T(x)"));
+         assertEquals("[1, 2, 3]", results.toString());
+         results = HiveTestUtils.collectTable(tableEnv,
+             tableEnv.sqlQuery("select x from db1.nested, lateral table(hiveudtf(a)) as T(x)"));
+         assertEquals("[{1=a, 2=b}, {3=c}]", results.toString());
+     } finally {
+         hiveShell.execute("drop database db1 cascade");
+         hiveShell.execute("drop function hiveudtf");
+     }
+ }
+
private TableEnvironment getTableEnvWithHiveCatalog() {
TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
index deb9105..e274633 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
@@ -18,12 +18,23 @@
package org.apache.flink.table.catalog.hive;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTest;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.table.planner.sinks.CollectRowTableSink;
+import org.apache.flink.table.planner.sinks.CollectTableSink;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.AbstractID;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -33,6 +44,9 @@ import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
@@ -108,4 +122,22 @@ public class HiveTestUtils {
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
return tableEnv;
}
+
+ public static List<Row> collectTable(TableEnvironment tableEnv, Table table) throws Exception {
+     CollectTableSink sink = new CollectRowTableSink();
+     TableSchema tableSchema = table.getSchema();
+     sink = (CollectTableSink) sink.configure(tableSchema.getFieldNames(), tableSchema.getFieldTypes());
+     final String id = new AbstractID().toString();
+     TypeSerializer serializer = TypeConversions.fromDataTypeToLegacyInfo(sink.getConsumedDataType())
+         .createSerializer(new ExecutionConfig());
+     sink.init(serializer, id);
+     String sinkName = UUID.randomUUID().toString();
+     tableEnv.registerTableSink(sinkName, sink);
+     final String builtInCatalogName = EnvironmentSettings.DEFAULT_BUILTIN_CATALOG;
+     final String builtInDBName = EnvironmentSettings.DEFAULT_BUILTIN_DATABASE;
+     tableEnv.insertInto(table, builtInCatalogName, builtInDBName, sinkName);
+     JobExecutionResult result = tableEnv.execute("collect-table");
+     ArrayList<byte[]> data = result.getAccumulatorResult(id);
+     return SerializedListAccumulator.deserializeList(data, serializer);
+ }
}
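
For reference, typical usage of the new collectTable helper mirrors the test above; a hedged sketch, assuming it runs in (or imports) the same package as HiveTestUtils, with made-up database/table names:

    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.types.Row;

    import java.util.List;

    public class CollectTableDemo {
        public static void main(String[] args) throws Exception {
            TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
            // Run any query and drain its result into a List for assertions:
            Table table = tableEnv.sqlQuery("select * from db1.simple");
            List<Row> rows = HiveTestUtils.collectTable(tableEnv, table);
            rows.forEach(System.out::println);
        }
    }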