You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/08 16:56:52 UTC
[flink] branch master updated: [FLINK-13110][hive] add shim of
SimpleGenericUDAFParameterInfo for Hive 1.2.1 and 2.3.4
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d561a6f [FLINK-13110][hive] add shim of SimpleGenericUDAFParameterInfo for Hive 1.2.1 and 2.3.4
d561a6f is described below
commit d561a6f8ef7c494c575fbe179f41614a7d73ca43
Author: Rui Li <li...@apache.org>
AuthorDate: Fri Jul 5 17:30:17 2019 +0800
[FLINK-13110][hive] add shim of SimpleGenericUDAFParameterInfo for Hive 1.2.1 and 2.3.4
This PR adds a shim of SimpleGenericUDAFParameterInfo for Hive 1.2.1 and 2.3.4.
This closes #9001.
---
.../apache/flink/table/catalog/hive/client/HiveShim.java | 8 ++++++++
.../flink/table/catalog/hive/client/HiveShimV1.java | 14 ++++++++++++++
.../flink/table/catalog/hive/client/HiveShimV2.java | 14 ++++++++++++++
.../flink/table/functions/hive/HiveGenericUDAF.java | 15 ++++++++++-----
.../table/functions/hive/conversion/HiveInspectors.java | 4 ++++
.../flink/table/functions/hive/HiveGenericUDAFTest.java | 3 ++-
6 files changed, 52 insertions(+), 6 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
index 320f078..812ece1 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.thrift.TException;
import java.io.IOException;
@@ -97,4 +99,10 @@ public interface HiveShim {
*/
void alterTable(IMetaStoreClient client, String databaseName, String tableName, Table table)
throws InvalidOperationException, MetaException, TException;
+
+ /**
+ * Creates SimpleGenericUDAFParameterInfo.
+ */
+ SimpleGenericUDAFParameterInfo createUDAFParameterInfo(ObjectInspector[] params, boolean isWindowing,
+ boolean distinct, boolean allColumns);
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java
index 830d6e6..8733100 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java
@@ -34,9 +34,12 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.thrift.TException;
import java.io.IOException;
+import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
@@ -105,4 +108,15 @@ public class HiveShimV1 implements HiveShim {
table.getParameters().put(StatsSetupConst.DO_NOT_UPDATE_STATS, "true");
client.alter_table(databaseName, tableName, table);
}
+
+ @Override
+ public SimpleGenericUDAFParameterInfo createUDAFParameterInfo(ObjectInspector[] params, boolean isWindowing, boolean distinct, boolean allColumns) {
+ try {
+ Constructor constructor = SimpleGenericUDAFParameterInfo.class.getConstructor(ObjectInspector[].class,
+ boolean.class, boolean.class);
+ return (SimpleGenericUDAFParameterInfo) constructor.newInstance(params, distinct, allColumns);
+ } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
+ throw new CatalogException("Failed to create SimpleGenericUDAFParameterInfo", e);
+ }
+ }
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java
index 7df0aaf..2510497 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java
@@ -34,9 +34,12 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.thrift.TException;
import java.io.IOException;
+import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
@@ -95,4 +98,15 @@ public class HiveShimV2 implements HiveShim {
// For Hive-2.3.4, we don't need to tell HMS not to update stats.
client.alter_table(databaseName, tableName, table);
}
+
+ @Override
+ public SimpleGenericUDAFParameterInfo createUDAFParameterInfo(ObjectInspector[] params, boolean isWindowing, boolean distinct, boolean allColumns) {
+ try {
+ Constructor constructor = SimpleGenericUDAFParameterInfo.class.getConstructor(ObjectInspector[].class,
+ boolean.class, boolean.class, boolean.class);
+ return (SimpleGenericUDAFParameterInfo) constructor.newInstance(params, isWindowing, distinct, allColumns);
+ } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
+ throw new CatalogException("Failed to create SimpleGenericUDAFParameterInfo", e);
+ }
+ }
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
index 163fc06..5782455 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.functions.hive;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.FunctionContext;
@@ -35,7 +37,6 @@ import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBridge;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;
-import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import java.util.Arrays;
@@ -62,13 +63,16 @@ public class HiveGenericUDAF
private transient boolean allIdentityConverter;
private transient boolean initialized;
- public HiveGenericUDAF(HiveFunctionWrapper funcWrapper) {
- this(funcWrapper, false);
+ private final String hiveVersion;
+
+ public HiveGenericUDAF(HiveFunctionWrapper funcWrapper, String hiveVersion) {
+ this(funcWrapper, false, hiveVersion);
}
- public HiveGenericUDAF(HiveFunctionWrapper funcWrapper, boolean isUDAFBridgeRequired) {
+ public HiveGenericUDAF(HiveFunctionWrapper funcWrapper, boolean isUDAFBridgeRequired, String hiveVersion) {
this.hiveFunctionWrapper = funcWrapper;
this.isUDAFBridgeRequired = isUDAFBridgeRequired;
+ this.hiveVersion = hiveVersion;
}
@Override
@@ -112,8 +116,9 @@ public class HiveGenericUDAF
resolver = (GenericUDAFResolver2) hiveFunctionWrapper.createFunction();
}
+ HiveShim hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
return resolver.getEvaluator(
- new SimpleGenericUDAFParameterInfo(
+ hiveShim.createUDAFParameterInfo(
inputInspectors,
// The flag to indicate if the UDAF invocation was from the windowing function call or not.
// TODO: investigate whether this has impact on Flink streaming job with windows
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
index 0f9dd67..85ca033 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
@@ -338,6 +338,10 @@ public class HiveInspectors {
List<? extends StructField> fields = structInspector.getAllStructFieldRefs();
Row row = new Row(fields.size());
+ // StandardStructObjectInspector.getStructFieldData in Hive-1.2.1 only accepts array or list as data
+ if (!data.getClass().isArray() && !(data instanceof List)) {
+ data = new Object[]{data};
+ }
for (int i = 0; i < row.getArity(); i++) {
row.setField(
i,
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDAFTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDAFTest.java
index df09107..2945523 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDAFTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDAFTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.functions.hive;
import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.types.DataType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCount;
@@ -129,7 +130,7 @@ public class HiveGenericUDAFTest {
private static HiveGenericUDAF init(Class hiveUdfClass, Object[] constantArgs, DataType[] argTypes) throws Exception {
HiveFunctionWrapper<GenericUDAFResolver2> wrapper = new HiveFunctionWrapper(hiveUdfClass.getName());
- HiveGenericUDAF udf = new HiveGenericUDAF(wrapper);
+ HiveGenericUDAF udf = new HiveGenericUDAF(wrapper, HiveShimLoader.getHiveVersion());
udf.setArgumentTypesAndConstants(constantArgs, argTypes);
udf.getHiveResultType(constantArgs, argTypes);