You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2022/07/23 04:43:45 UTC
[flink] 02/02: [FLINK-28451][hive] Borrow Kryo from SerializationUtilities in HiveFunctionWrapper
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9a4fd227a5824e707f4dbe52773a260fbb89854a
Author: Jark Wu <ja...@apache.org>
AuthorDate: Fri Jul 22 18:43:23 2022 +0800
[FLINK-28451][hive] Borrow Kryo from SerializationUtilities in HiveFunctionWrapper
This closes #20211
---
.../factories/HiveFunctionDefinitionFactory.java | 4 +-
.../table/functions/hive/HiveFunctionWrapper.java | 56 ++++++++++++++--------
.../apache/flink/table/module/hive/HiveModule.java | 2 +-
.../client/gateway/context/ExecutionContext.java | 2 -
4 files changed, 40 insertions(+), 24 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java
index 62458186f60..d3cb9f39763 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java
@@ -71,9 +71,9 @@ public class HiveFunctionDefinitionFactory implements FunctionDefinitionFactory
}
/**
- * Distinguish if the function is a generic function.
+ * Distinguish if the function is a Flink function.
*
- * @return whether the function is a generic function
+ * @return whether the function is a Flink function
*/
private boolean isFlinkFunction(CatalogFunction catalogFunction, ClassLoader classLoader) {
if (catalogFunction.getFunctionLanguage() == FunctionLanguage.PYTHON) {
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionWrapper.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionWrapper.java
index eebd1e2153f..9c2d8053018 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionWrapper.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionWrapper.java
@@ -30,6 +30,9 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Serializable;
+import static org.apache.hadoop.hive.ql.exec.SerializationUtilities.borrowKryo;
+import static org.apache.hadoop.hive.ql.exec.SerializationUtilities.releaseKryo;
+
/**
* A wrapper of Hive functions that instantiate function instances and ser/de function instance
* cross process boundary.
@@ -50,6 +53,7 @@ public class HiveFunctionWrapper<UDFType> implements Serializable {
private transient UDFType instance = null;
+ @SuppressWarnings("unchecked")
public HiveFunctionWrapper(Class<?> functionClz) {
this.functionClz = (Class<UDFType>) functionClz;
}
@@ -75,6 +79,38 @@ public class HiveFunctionWrapper<UDFType> implements Serializable {
this.udfSerializedBytes = serializeObjectToKryo((Serializable) serializableInstance);
}
+ private static byte[] serializeObjectToKryo(Serializable object) { // Kryo-encodes the given object and returns the resulting bytes.
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ Output output = new Output(baos);
+ Kryo kryo = borrowKryo(); // borrowed via the static import from Hive's SerializationUtilities
+ try {
+ kryo.writeObject(output, object);
+ } finally {
+ releaseKryo(kryo); // in finally: the borrowed instance is handed back even if writeObject throws
+ }
+ output.close(); // presumably flushes Output's buffer into baos before the bytes are read; confirm against Kryo docs
+ return baos.toByteArray();
+ }
+
+ private static <T extends Serializable> T deserializeObjectFromKryo( // Reads an instance of clazz back from Kryo-encoded bytes.
+ byte[] bytes, Class<T> clazz) {
+ Input inp = new Input(new ByteArrayInputStream(bytes));
+ Kryo kryo = borrowKryo(); // borrowed via the static import from Hive's SerializationUtilities
+ ClassLoader oldClassLoader = kryo.getClassLoader(); // remembered so it can be put back before the instance is released
+ kryo.setClassLoader(clazz.getClassLoader()); // point Kryo at the class loader of clazz for this read
+ T func;
+
+ try {
+ func = kryo.readObject(inp, clazz);
+ } finally {
+ kryo.setClassLoader(oldClassLoader); // undo the swap first, so the released instance carries its previous loader
+ releaseKryo(kryo); // runs even if readObject throws
+ }
+
+ inp.close();
+ return func;
+ }
+
/**
* Instantiate a Hive function instance.
*
@@ -124,27 +160,9 @@ public class HiveFunctionWrapper<UDFType> implements Serializable {
*
* @return the UDF deserialized
*/
+ @SuppressWarnings("unchecked")
private UDFType deserializeUDF() {
return (UDFType)
deserializeObjectFromKryo(udfSerializedBytes, (Class<Serializable>) getUDFClass());
}
-
- private static byte[] serializeObjectToKryo(Serializable object) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- Output output = new Output(baos);
- Kryo kryo = new Kryo();
- kryo.writeObject(output, object);
- output.close();
- return baos.toByteArray();
- }
-
- private static <T extends Serializable> T deserializeObjectFromKryo(
- byte[] bytes, Class<T> clazz) {
- Input inp = new Input(new ByteArrayInputStream(bytes));
- Kryo kryo = new Kryo();
- kryo.setClassLoader(clazz.getClassLoader());
- T func = kryo.readObject(inp, clazz);
- inp.close();
- return func;
- }
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
index 562e053f1c8..f798a8a4543 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
@@ -140,7 +140,7 @@ public class HiveModule implements Module {
if (name.equalsIgnoreCase("internal_interval")) {
return Optional.of(
factory.createFunctionDefinitionFromHiveFunction(
- name, HiveGenericUDFInternalInterval.class.getName()));
+ name, HiveGenericUDFInternalInterval.class.getName(), context));
}
Optional<FunctionInfo> info = hiveShim.getBuiltInFunctionInfo(name);
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
index 67b796de25b..c2bf0b1d725 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
@@ -36,10 +36,8 @@ import org.apache.flink.table.factories.PlannerFactoryUtil;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.resource.ResourceManager;
import org.apache.flink.util.MutableURLClassLoader;
-import org.apache.flink.util.TemporaryClassLoaderContext;
import java.lang.reflect.Method;
-import java.util.function.Supplier;
import static org.apache.flink.table.client.gateway.context.SessionContext.SessionState;