You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/10 21:29:54 UTC
[flink] branch master updated: [FLINK-13128][hive] make HiveGenericUDAF expose accumulator type in order to create its corresponding AggregateFunctionDefinition
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1dcd5c2 [FLINK-13128][hive] make HiveGenericUDAF expose accumulator type in order to create its corresponding AggregateFunctionDefinition
1dcd5c2 is described below
commit 1dcd5c2eb9a9eeab8146136a3ed40fc1c185a823
Author: bowen.li <bo...@gmail.com>
AuthorDate: Fri Jul 5 15:34:42 2019 -0700
[FLINK-13128][hive] make HiveGenericUDAF expose accumulator type in order to create its corresponding AggregateFunctionDefinition
---
.../table/functions/hive/HiveGenericUDAF.java | 45 +++++++++++++---------
1 file changed, 26 insertions(+), 19 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
index 5782455..6015ef6 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
@@ -58,7 +58,8 @@ public class HiveGenericUDAF
private transient GenericUDAFEvaluator partialEvaluator;
private transient GenericUDAFEvaluator finalEvaluator;
- private transient ObjectInspector finalResult;
+ private transient ObjectInspector partialResultObjectInspector;
+ private transient ObjectInspector finalResultObjectInspector;
private transient HiveObjectConversion[] conversions;
private transient boolean allIdentityConverter;
private transient boolean initialized;
@@ -89,13 +90,13 @@ public class HiveGenericUDAF
// PARTIAL1: from original data to partial aggregation data:
// iterate() and terminatePartial() will be called.
this.partialEvaluator = createEvaluator(inputInspectors);
- ObjectInspector partialResult = partialEvaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputInspectors);
+ this.partialResultObjectInspector = partialEvaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputInspectors);
// FINAL: from partial aggregation to full aggregation:
// merge() and terminate() will be called.
this.finalEvaluator = createEvaluator(inputInspectors);
- this.finalResult = finalEvaluator.init(
- GenericUDAFEvaluator.Mode.FINAL, new ObjectInspector[]{ partialResult });
+ this.finalResultObjectInspector = finalEvaluator.init(
+ GenericUDAFEvaluator.Mode.FINAL, new ObjectInspector[]{ partialResultObjectInspector });
conversions = new HiveObjectConversion[inputInspectors.length];
for (int i = 0; i < inputInspectors.length; i++) {
@@ -175,7 +176,7 @@ public class HiveGenericUDAF
@Override
public Object getValue(GenericUDAFEvaluator.AggregationBuffer accumulator) {
try {
- return HiveInspectors.toFlinkObject(finalResult, finalEvaluator.terminate(accumulator));
+ return HiveInspectors.toFlinkObject(finalResultObjectInspector, finalEvaluator.terminate(accumulator));
} catch (HiveException e) {
throw new FlinkHiveUDFException(
String.format("Failed to get final result on %s", hiveFunctionWrapper.getClassName()), e);
@@ -191,21 +192,12 @@ public class HiveGenericUDAF
@Override
public DataType getHiveResultType(Object[] constantArguments, DataType[] argTypes) {
try {
- ObjectInspector[] inputs = HiveInspectors.toInspectors(constantArguments, argTypes);
- GenericUDAFEvaluator evaluator = createEvaluator(inputs);
-
- // The ObjectInspector for the parameters:
- // In PARTIAL1 mode, the parameters are original data;
- // In FINAL mode, the parameters are just partial aggregations
- // (in that case, the array will always have a single element).
-
- ObjectInspector partialObjectInspector = evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputs);
-
- ObjectInspector finalObjectInspector = evaluator.init(
- GenericUDAFEvaluator.Mode.FINAL,
- new ObjectInspector[]{ partialObjectInspector });
+ if (!initialized) {
+ setArgumentTypesAndConstants(constantArguments, argTypes);
+ init();
+ }
- return HiveTypeUtil.toFlinkType(finalObjectInspector);
+ return HiveTypeUtil.toFlinkType(finalResultObjectInspector);
} catch (Exception e) {
throw new FlinkHiveUDFException(
String.format("Failed to get Hive result type from %s", hiveFunctionWrapper.getClassName()), e);
@@ -217,4 +209,19 @@ public class HiveGenericUDAF
return LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(
getHiveResultType(this.constantArguments, this.argTypes));
}
+
+ @Override
+ public TypeInformation getAccumulatorType() {
+ try {
+ if (!initialized) {
+ init();
+ }
+
+ return LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(
+ HiveTypeUtil.toFlinkType(partialResultObjectInspector));
+ } catch (Exception e) {
+ throw new FlinkHiveUDFException(
+ String.format("Failed to get Hive accumulator type from %s", hiveFunctionWrapper.getClassName()), e);
+ }
+ }
}