You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/10 21:29:54 UTC

[flink] branch master updated: [FLINK-13128][hive] make HiveGenericUDAF expose accumulator type in order to create its corresponding AggregateFunctionDefinition

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1dcd5c2  [FLINK-13128][hive] make HiveGenericUDAF expose accumulator type in order to create its corresponding AggregateFunctionDefinition
1dcd5c2 is described below

commit 1dcd5c2eb9a9eeab8146136a3ed40fc1c185a823
Author: bowen.li <bo...@gmail.com>
AuthorDate: Fri Jul 5 15:34:42 2019 -0700

    [FLINK-13128][hive] make HiveGenericUDAF expose accumulator type in order to create its corresponding AggregateFunctionDefinition
---
 .../table/functions/hive/HiveGenericUDAF.java      | 45 +++++++++++++---------
 1 file changed, 26 insertions(+), 19 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
index 5782455..6015ef6 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
@@ -58,7 +58,8 @@ public class HiveGenericUDAF
 
 	private transient GenericUDAFEvaluator partialEvaluator;
 	private transient GenericUDAFEvaluator finalEvaluator;
-	private transient ObjectInspector finalResult;
+	private transient ObjectInspector partialResultObjectInspector;
+	private transient ObjectInspector finalResultObjectInspector;
 	private transient HiveObjectConversion[] conversions;
 	private transient boolean allIdentityConverter;
 	private transient boolean initialized;
@@ -89,13 +90,13 @@ public class HiveGenericUDAF
 		// PARTIAL1: from original data to partial aggregation data:
 		// 		iterate() and terminatePartial() will be called.
 		this.partialEvaluator = createEvaluator(inputInspectors);
-		ObjectInspector partialResult = partialEvaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputInspectors);
+		this.partialResultObjectInspector = partialEvaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputInspectors);
 
 		// FINAL: from partial aggregation to full aggregation:
 		// 		merge() and terminate() will be called.
 		this.finalEvaluator = createEvaluator(inputInspectors);
-		this.finalResult = finalEvaluator.init(
-			GenericUDAFEvaluator.Mode.FINAL, new ObjectInspector[]{ partialResult });
+		this.finalResultObjectInspector = finalEvaluator.init(
+			GenericUDAFEvaluator.Mode.FINAL, new ObjectInspector[]{ partialResultObjectInspector });
 
 		conversions = new HiveObjectConversion[inputInspectors.length];
 		for (int i = 0; i < inputInspectors.length; i++) {
@@ -175,7 +176,7 @@ public class HiveGenericUDAF
 	@Override
 	public Object getValue(GenericUDAFEvaluator.AggregationBuffer accumulator) {
 		try {
-			return HiveInspectors.toFlinkObject(finalResult, finalEvaluator.terminate(accumulator));
+			return HiveInspectors.toFlinkObject(finalResultObjectInspector, finalEvaluator.terminate(accumulator));
 		} catch (HiveException e) {
 			throw new FlinkHiveUDFException(
 				String.format("Failed to get final result on %s", hiveFunctionWrapper.getClassName()), e);
@@ -191,21 +192,12 @@ public class HiveGenericUDAF
 	@Override
 	public DataType getHiveResultType(Object[] constantArguments, DataType[] argTypes) {
 		try {
-			ObjectInspector[] inputs = HiveInspectors.toInspectors(constantArguments, argTypes);
-			GenericUDAFEvaluator evaluator = createEvaluator(inputs);
-
-			// The ObjectInspector for the parameters:
-			// In PARTIAL1 mode, the parameters are original data;
-			// In FINAL mode, the parameters are just partial aggregations
-			// (in that case, the array will always have a single element).
-
-			ObjectInspector partialObjectInspector = evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputs);
-
-			ObjectInspector finalObjectInspector = evaluator.init(
-				GenericUDAFEvaluator.Mode.FINAL,
-				new ObjectInspector[]{ partialObjectInspector });
+			if (!initialized) {
+				setArgumentTypesAndConstants(constantArguments, argTypes);
+				init();
+			}
 
-			return HiveTypeUtil.toFlinkType(finalObjectInspector);
+			return HiveTypeUtil.toFlinkType(finalResultObjectInspector);
 		} catch (Exception e) {
 			throw new FlinkHiveUDFException(
 				String.format("Failed to get Hive result type from %s", hiveFunctionWrapper.getClassName()), e);
@@ -217,4 +209,19 @@ public class HiveGenericUDAF
 		return LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(
 			getHiveResultType(this.constantArguments, this.argTypes));
 	}
+
+	@Override
+	public TypeInformation getAccumulatorType() {
+		try {
+			if (!initialized) {
+				init();
+			}
+
+			return LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(
+				HiveTypeUtil.toFlinkType(partialResultObjectInspector));
+		} catch (Exception e) {
+			throw new FlinkHiveUDFException(
+				String.format("Failed to get Hive accumulator type from %s", hiveFunctionWrapper.getClassName()), e);
+		}
+	}
 }