You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/08 16:56:52 UTC

[flink] branch master updated: [FLINK-13110][hive] add shim of SimpleGenericUDAFParameterInfo for Hive 1.2.1 and 2.3.4

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d561a6f  [FLINK-13110][hive] add shim of SimpleGenericUDAFParameterInfo for Hive 1.2.1 and 2.3.4
d561a6f is described below

commit d561a6f8ef7c494c575fbe179f41614a7d73ca43
Author: Rui Li <li...@apache.org>
AuthorDate: Fri Jul 5 17:30:17 2019 +0800

    [FLINK-13110][hive] add shim of SimpleGenericUDAFParameterInfo for Hive 1.2.1 and 2.3.4
    
    This PR adds a shim of SimpleGenericUDAFParameterInfo for Hive 1.2.1 and 2.3.4.
    
    This closes #9001.
---
 .../apache/flink/table/catalog/hive/client/HiveShim.java  |  8 ++++++++
 .../flink/table/catalog/hive/client/HiveShimV1.java       | 14 ++++++++++++++
 .../flink/table/catalog/hive/client/HiveShimV2.java       | 14 ++++++++++++++
 .../flink/table/functions/hive/HiveGenericUDAF.java       | 15 ++++++++++-----
 .../table/functions/hive/conversion/HiveInspectors.java   |  4 ++++
 .../flink/table/functions/hive/HiveGenericUDAFTest.java   |  3 ++-
 6 files changed, 52 insertions(+), 6 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
index 320f078..812ece1 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.thrift.TException;
 
 import java.io.IOException;
@@ -97,4 +99,10 @@ public interface HiveShim {
 	 */
 	void alterTable(IMetaStoreClient client, String databaseName, String tableName, Table table)
 			throws InvalidOperationException, MetaException, TException;
+
+	/**
+	 * Creates SimpleGenericUDAFParameterInfo.
+	 */
+	SimpleGenericUDAFParameterInfo createUDAFParameterInfo(ObjectInspector[] params, boolean isWindowing,
+			boolean distinct, boolean allColumns);
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java
index 830d6e6..8733100 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java
@@ -34,9 +34,12 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.thrift.TException;
 
 import java.io.IOException;
+import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
@@ -105,4 +108,15 @@ public class HiveShimV1 implements HiveShim {
 		table.getParameters().put(StatsSetupConst.DO_NOT_UPDATE_STATS, "true");
 		client.alter_table(databaseName, tableName, table);
 	}
+
+	@Override
+	public SimpleGenericUDAFParameterInfo createUDAFParameterInfo(ObjectInspector[] params, boolean isWindowing, boolean distinct, boolean allColumns) {
+		try {
+			Constructor constructor = SimpleGenericUDAFParameterInfo.class.getConstructor(ObjectInspector[].class,
+					boolean.class, boolean.class);
+			return (SimpleGenericUDAFParameterInfo) constructor.newInstance(params, distinct, allColumns);
+		} catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
+			throw new CatalogException("Failed to create SimpleGenericUDAFParameterInfo", e);
+		}
+	}
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java
index 7df0aaf..2510497 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java
@@ -34,9 +34,12 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.thrift.TException;
 
 import java.io.IOException;
+import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.List;
@@ -95,4 +98,15 @@ public class HiveShimV2 implements HiveShim {
 		// For Hive-2.3.4, we don't need to tell HMS not to update stats.
 		client.alter_table(databaseName, tableName, table);
 	}
+
+	@Override
+	public SimpleGenericUDAFParameterInfo createUDAFParameterInfo(ObjectInspector[] params, boolean isWindowing, boolean distinct, boolean allColumns) {
+		try {
+			Constructor constructor = SimpleGenericUDAFParameterInfo.class.getConstructor(ObjectInspector[].class,
+					boolean.class, boolean.class, boolean.class);
+			return (SimpleGenericUDAFParameterInfo) constructor.newInstance(params, isWindowing, distinct, allColumns);
+		} catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
+			throw new CatalogException("Failed to create SimpleGenericUDAFParameterInfo", e);
+		}
+	}
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
index 163fc06..5782455 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.functions.hive;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.FunctionContext;
@@ -35,7 +37,6 @@ import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBridge;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;
-import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 
 import java.util.Arrays;
@@ -62,13 +63,16 @@ public class HiveGenericUDAF
 	private transient boolean allIdentityConverter;
 	private transient boolean initialized;
 
-	public HiveGenericUDAF(HiveFunctionWrapper funcWrapper) {
-		this(funcWrapper, false);
+	private final String hiveVersion;
+
+	public HiveGenericUDAF(HiveFunctionWrapper funcWrapper, String hiveVersion) {
+		this(funcWrapper, false, hiveVersion);
 	}
 
-	public HiveGenericUDAF(HiveFunctionWrapper funcWrapper, boolean isUDAFBridgeRequired) {
+	public HiveGenericUDAF(HiveFunctionWrapper funcWrapper, boolean isUDAFBridgeRequired, String hiveVersion) {
 		this.hiveFunctionWrapper = funcWrapper;
 		this.isUDAFBridgeRequired = isUDAFBridgeRequired;
+		this.hiveVersion = hiveVersion;
 	}
 
 	@Override
@@ -112,8 +116,9 @@ public class HiveGenericUDAF
 			resolver = (GenericUDAFResolver2) hiveFunctionWrapper.createFunction();
 		}
 
+		HiveShim hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
 		return resolver.getEvaluator(
-			new SimpleGenericUDAFParameterInfo(
+			hiveShim.createUDAFParameterInfo(
 				inputInspectors,
 				// The flag to indicate if the UDAF invocation was from the windowing function call or not.
 				// TODO: investigate whether this has impact on Flink streaming job with windows
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
index 0f9dd67..85ca033 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
@@ -338,6 +338,10 @@ public class HiveInspectors {
 			List<? extends StructField> fields = structInspector.getAllStructFieldRefs();
 
 			Row row = new Row(fields.size());
+			// StandardStructObjectInspector.getStructFieldData in Hive-1.2.1 only accepts array or list as data
+			if (!data.getClass().isArray() && !(data instanceof List)) {
+				data = new Object[]{data};
+			}
 			for (int i = 0; i < row.getArity(); i++) {
 				row.setField(
 					i,
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDAFTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDAFTest.java
index df09107..2945523 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDAFTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDAFTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.functions.hive;
 
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.types.DataType;
 
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCount;
@@ -129,7 +130,7 @@ public class HiveGenericUDAFTest {
 	private static HiveGenericUDAF init(Class hiveUdfClass, Object[] constantArgs, DataType[] argTypes) throws Exception {
 		HiveFunctionWrapper<GenericUDAFResolver2> wrapper = new HiveFunctionWrapper(hiveUdfClass.getName());
 
-		HiveGenericUDAF udf = new HiveGenericUDAF(wrapper);
+		HiveGenericUDAF udf = new HiveGenericUDAF(wrapper, HiveShimLoader.getHiveVersion());
 
 		udf.setArgumentTypesAndConstants(constantArgs, argTypes);
 		udf.getHiveResultType(constantArgs, argTypes);