Posted to commits@flink.apache.org by bl...@apache.org on 2019/10/28 23:38:23 UTC

[flink] branch master updated: [FLINK-14397][hive] Failed to run Hive UDTF with array arguments

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e18320b  [FLINK-14397][hive] Failed to run Hive UDTF with array arguments
e18320b is described below

commit e18320b76047af4e15297e3e89b6c46ef3dae9bf
Author: Rui Li <li...@apache.org>
AuthorDate: Thu Oct 17 21:15:55 2019 +0800

    [FLINK-14397][hive] Failed to run Hive UDTF with array arguments
    
    Fix the issue that calling a Hive UDTF with array arguments causes a cast exception.
    
    This closes #9927.
---
 .../functions/hive/conversion/HiveInspectors.java  | 60 +++++++++++++++++++++-
 .../connectors/hive/TableEnvHiveConnectorTest.java | 38 ++++++++++++++
 .../flink/table/catalog/hive/HiveTestUtils.java    | 32 ++++++++++++
 3 files changed, 128 insertions(+), 2 deletions(-)
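
For context, a minimal standalone Java sketch (not part of this patch; the class name and values are illustrative) of the cast failure described above and of the reflection-based remedy the commit applies:

    import java.lang.reflect.Array;

    public class ArrayCastSketch {
        public static void main(String[] args) {
            // Collecting Hive list elements into a plain Object[] fails as soon as the
            // caller casts to the concrete array type Flink expects, e.g. Integer[].
            Object[] generic = new Object[] {1, 2, 3};
            try {
                Integer[] bad = (Integer[]) generic; // throws ClassCastException at runtime
            } catch (ClassCastException e) {
                System.out.println("cast fails: " + e.getMessage());
            }

            // Creating the array with the element class up front gives it the runtime
            // component type Integer[], so the same cast then succeeds.
            Integer[] typed = (Integer[]) Array.newInstance(Integer.class, 3);
            typed[0] = 1;
            typed[1] = 2;
            typed[2] = 3;
            System.out.println(typed.length); // prints 3
        }
    }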

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
index 22ce60b..e7e88b2 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
@@ -96,7 +96,9 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 
+import java.lang.reflect.Array;
 import java.math.BigDecimal;
+import java.sql.Date;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -312,9 +314,11 @@ public class HiveInspectors {
 			ListObjectInspector listInspector = (ListObjectInspector) inspector;
 			List<?> list = listInspector.getList(data);
 
-			Object[] result = new Object[list.size()];
+			// Flink expects a specific array type (e.g. Integer[] instead of Object[]), so we have to get the element class
+			ObjectInspector elementInspector = listInspector.getListElementObjectInspector();
+			Object[] result = (Object[]) Array.newInstance(getClassFromObjectInspector(elementInspector), list.size());
 			for (int i = 0; i < list.size(); i++) {
-				result[i] = toFlinkObject(listInspector.getListElementObjectInspector(), list.get(i));
+				result[i] = toFlinkObject(elementInspector, list.get(i));
 			}
 			return result;
 		}
@@ -450,4 +454,56 @@ public class HiveInspectors {
 	public static DataType toFlinkType(ObjectInspector inspector) {
 		return HiveTypeUtil.toFlinkType(TypeInfoUtils.getTypeInfoFromTypeString(inspector.getTypeName()));
 	}
+
+	// Given a Hive ObjectInspector, get the class for the corresponding Flink object
+	private static Class<?> getClassFromObjectInspector(ObjectInspector inspector) {
+		switch (inspector.getCategory()) {
+			case PRIMITIVE: {
+				PrimitiveObjectInspector primitiveOI = (PrimitiveObjectInspector) inspector;
+				switch (primitiveOI.getPrimitiveCategory()) {
+					case STRING:
+					case CHAR:
+					case VARCHAR:
+						return String.class;
+					case INT:
+						return Integer.class;
+					case LONG:
+						return Long.class;
+					case BYTE:
+						return Byte.class;
+					case SHORT:
+						return Short.class;
+					case FLOAT:
+						return Float.class;
+					case DOUBLE:
+						return Double.class;
+					case DECIMAL:
+						return BigDecimal.class;
+					case BOOLEAN:
+						return Boolean.class;
+					case BINARY:
+						return byte[].class;
+					case DATE:
+						return Date.class;
+					case TIMESTAMP:
+					case INTERVAL_DAY_TIME:
+					case INTERVAL_YEAR_MONTH:
+					default:
+						throw new IllegalArgumentException(
+								"Unsupported primitive type " + primitiveOI.getPrimitiveCategory().name());
+
+				}
+			}
+			case LIST:
+				ListObjectInspector listInspector = (ListObjectInspector) inspector;
+				Class elementClz = getClassFromObjectInspector(listInspector.getListElementObjectInspector());
+				return Array.newInstance(elementClz, 0).getClass();
+			case MAP:
+				return Map.class;
+			case STRUCT:
+				return Row.class;
+			default:
+				throw new IllegalArgumentException("Unsupported type " + inspector.getCategory().name());
+		}
+	}
 }
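
The LIST branch of getClassFromObjectInspector above derives the array class by creating a zero-length array of the element class. A small standalone illustration of that trick (not part of the patch), including how nesting yields multi-dimensional array classes such as Integer[][] for array<array<int>>:

    import java.lang.reflect.Array;

    public class ArrayClassSketch {
        public static void main(String[] args) {
            // A zero-length array is created only to obtain its runtime class.
            Class<?> intArrayClz = Array.newInstance(Integer.class, 0).getClass();
            System.out.println(intArrayClz); // class [Ljava.lang.Integer;

            // Applied recursively, the same trick maps array<array<int>> to Integer[][].
            Class<?> nestedClz = Array.newInstance(intArrayClz, 0).getClass();
            System.out.println(nestedClz); // class [[Ljava.lang.Integer;
        }
    }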
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index 784bcc8..9484e55 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -35,14 +35,17 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 
 import scala.collection.JavaConverters;
 
@@ -260,6 +263,41 @@ public class TableEnvHiveConnectorTest {
 		}
 	}
 
+	@Test
+	public void testUDTF() throws Exception {
+		// Without https://issues.apache.org/jira/browse/HIVE-11878, Hive registers the App classloader as the classloader
+		// for the UDTF and closes the App classloader when we tear down the session. This causes problems for JUnit code
+		// and shutdown hooks that have to run after the test finishes, because the App classloader can no longer load new
+		// classes, which crashes the forked JVM and thus fails the test phase.
+		// Therefore disable such tests for older Hive versions.
+		String hiveVersion = HiveShimLoader.getHiveVersion();
+		Assume.assumeTrue(hiveVersion.compareTo("2.0.0") >= 0 || hiveVersion.compareTo("1.3.0") >= 0);
+		hiveShell.execute("create database db1");
+		try {
+			hiveShell.execute("create table db1.simple (i int,a array<int>)");
+			hiveShell.execute("create table db1.nested (a array<map<int, string>>)");
+			hiveShell.execute("create function hiveudtf as 'org.apache.hadoop.hive.ql.udf.generic.GenericUDTFExplode'");
+			hiveShell.insertInto("db1", "simple").addRow(3, Arrays.asList(1, 2, 3)).commit();
+			Map<Integer, String> map1 = new HashMap<>();
+			map1.put(1, "a");
+			map1.put(2, "b");
+			Map<Integer, String> map2 = new HashMap<>();
+			map2.put(3, "c");
+			hiveShell.insertInto("db1", "nested").addRow(Arrays.asList(map1, map2)).commit();
+
+			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+			List<Row> results = HiveTestUtils.collectTable(tableEnv,
+					tableEnv.sqlQuery("select x from db1.simple, lateral table(hiveudtf(a)) as T(x)"));
+			assertEquals("[1, 2, 3]", results.toString());
+			results = HiveTestUtils.collectTable(tableEnv,
+					tableEnv.sqlQuery("select x from db1.nested, lateral table(hiveudtf(a)) as T(x)"));
+			assertEquals("[{1=a, 2=b}, {3=c}]", results.toString());
+		} finally {
+			hiveShell.execute("drop database db1 cascade");
+			hiveShell.execute("drop function hiveudtf");
+		}
+	}
+
 	private TableEnvironment getTableEnvWithHiveCatalog() {
 		TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
 		tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
index deb9105..e274633 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
@@ -18,12 +18,23 @@
 
 package org.apache.flink.table.catalog.hive;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogTest;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.table.planner.sinks.CollectRowTableSink;
+import org.apache.flink.table.planner.sinks.CollectTableSink;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.StringUtils;
 
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -33,6 +44,9 @@ import java.io.IOException;
 import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
@@ -108,4 +122,22 @@ public class HiveTestUtils {
 		tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
 		return tableEnv;
 	}
+
+	public static List<Row> collectTable(TableEnvironment tableEnv, Table table) throws Exception {
+		CollectTableSink sink = new CollectRowTableSink();
+		TableSchema tableSchema = table.getSchema();
+		sink = (CollectTableSink) sink.configure(tableSchema.getFieldNames(), tableSchema.getFieldTypes());
+		final String id = new AbstractID().toString();
+		TypeSerializer serializer = TypeConversions.fromDataTypeToLegacyInfo(sink.getConsumedDataType())
+				.createSerializer(new ExecutionConfig());
+		sink.init(serializer, id);
+		String sinkName = UUID.randomUUID().toString();
+		tableEnv.registerTableSink(sinkName, sink);
+		final String builtInCatalogName = EnvironmentSettings.DEFAULT_BUILTIN_CATALOG;
+		final String builtInDBName = EnvironmentSettings.DEFAULT_BUILTIN_DATABASE;
+		tableEnv.insertInto(table, builtInCatalogName, builtInDBName, sinkName);
+		JobExecutionResult result = tableEnv.execute("collect-table");
+		ArrayList<byte[]> data = result.getAccumulatorResult(id);
+		return SerializedListAccumulator.deserializeList(data, serializer);
+	}
 }
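
A hypothetical usage sketch of the new HiveTestUtils.collectTable helper (not part of the patch), mirroring how the UDTF test above collects query results; it assumes a Hive catalog has already been registered on the TableEnvironment as in that test, and the query is illustrative only:

    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.catalog.hive.HiveTestUtils;
    import org.apache.flink.types.Row;

    import java.util.List;

    class CollectTableUsageSketch {
        // Runs a query and collects its rows on the client via the new helper.
        static void printUdtfResults(TableEnvironment tableEnv) throws Exception {
            List<Row> rows = HiveTestUtils.collectTable(
                    tableEnv,
                    tableEnv.sqlQuery("select x from db1.simple, lateral table(hiveudtf(a)) as T(x)"));
            rows.forEach(System.out::println);
        }
    }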