You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2022/07/23 04:43:45 UTC

[flink] 02/02: [FLINK-28451][hive] Borrow Kryo from SerializationUtilities in HiveFunctionWrapper

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9a4fd227a5824e707f4dbe52773a260fbb89854a
Author: Jark Wu <ja...@apache.org>
AuthorDate: Fri Jul 22 18:43:23 2022 +0800

    [FLINK-28451][hive] Borrow Kryo from SerializationUtilities in HiveFunctionWrapper
    
    This closes #20211
---
 .../factories/HiveFunctionDefinitionFactory.java   |  4 +-
 .../table/functions/hive/HiveFunctionWrapper.java  | 56 ++++++++++++++--------
 .../apache/flink/table/module/hive/HiveModule.java |  2 +-
 .../client/gateway/context/ExecutionContext.java   |  2 -
 4 files changed, 40 insertions(+), 24 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java
index 62458186f60..d3cb9f39763 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java
@@ -71,9 +71,9 @@ public class HiveFunctionDefinitionFactory implements FunctionDefinitionFactory
     }
 
     /**
-     * Distinguish if the function is a generic function.
+     * Distinguish if the function is a Flink function.
      *
-     * @return whether the function is a generic function
+     * @return whether the function is a Flink function
      */
     private boolean isFlinkFunction(CatalogFunction catalogFunction, ClassLoader classLoader) {
         if (catalogFunction.getFunctionLanguage() == FunctionLanguage.PYTHON) {
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionWrapper.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionWrapper.java
index eebd1e2153f..9c2d8053018 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionWrapper.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionWrapper.java
@@ -30,6 +30,9 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.Serializable;
 
+import static org.apache.hadoop.hive.ql.exec.SerializationUtilities.borrowKryo;
+import static org.apache.hadoop.hive.ql.exec.SerializationUtilities.releaseKryo;
+
 /**
  * A wrapper of Hive functions that instantiate function instances and ser/de function instance
  * cross process boundary.
@@ -50,6 +53,7 @@ public class HiveFunctionWrapper<UDFType> implements Serializable {
 
     private transient UDFType instance = null;
 
+    @SuppressWarnings("unchecked")
     public HiveFunctionWrapper(Class<?> functionClz) {
         this.functionClz = (Class<UDFType>) functionClz;
     }
@@ -75,6 +79,38 @@ public class HiveFunctionWrapper<UDFType> implements Serializable {
         this.udfSerializedBytes = serializeObjectToKryo((Serializable) serializableInstance);
     }
 
+    private static byte[] serializeObjectToKryo(Serializable object) {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        Output output = new Output(baos);
+        Kryo kryo = borrowKryo();
+        try {
+            kryo.writeObject(output, object);
+        } finally {
+            releaseKryo(kryo);
+        }
+        output.close();
+        return baos.toByteArray();
+    }
+
+    private static <T extends Serializable> T deserializeObjectFromKryo(
+            byte[] bytes, Class<T> clazz) {
+        Input inp = new Input(new ByteArrayInputStream(bytes));
+        Kryo kryo = borrowKryo();
+        ClassLoader oldClassLoader = kryo.getClassLoader();
+        kryo.setClassLoader(clazz.getClassLoader());
+        T func;
+
+        try {
+            func = kryo.readObject(inp, clazz);
+        } finally {
+            kryo.setClassLoader(oldClassLoader);
+            releaseKryo(kryo);
+        }
+
+        inp.close();
+        return func;
+    }
+
     /**
      * Instantiate a Hive function instance.
      *
@@ -124,27 +160,9 @@ public class HiveFunctionWrapper<UDFType> implements Serializable {
      *
      * @return the UDF deserialized
      */
+    @SuppressWarnings("unchecked")
     private UDFType deserializeUDF() {
         return (UDFType)
                 deserializeObjectFromKryo(udfSerializedBytes, (Class<Serializable>) getUDFClass());
     }
-
-    private static byte[] serializeObjectToKryo(Serializable object) {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        Output output = new Output(baos);
-        Kryo kryo = new Kryo();
-        kryo.writeObject(output, object);
-        output.close();
-        return baos.toByteArray();
-    }
-
-    private static <T extends Serializable> T deserializeObjectFromKryo(
-            byte[] bytes, Class<T> clazz) {
-        Input inp = new Input(new ByteArrayInputStream(bytes));
-        Kryo kryo = new Kryo();
-        kryo.setClassLoader(clazz.getClassLoader());
-        T func = kryo.readObject(inp, clazz);
-        inp.close();
-        return func;
-    }
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
index 562e053f1c8..f798a8a4543 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
@@ -140,7 +140,7 @@ public class HiveModule implements Module {
         if (name.equalsIgnoreCase("internal_interval")) {
             return Optional.of(
                     factory.createFunctionDefinitionFromHiveFunction(
-                            name, HiveGenericUDFInternalInterval.class.getName()));
+                            name, HiveGenericUDFInternalInterval.class.getName(), context));
         }
 
         Optional<FunctionInfo> info = hiveShim.getBuiltInFunctionInfo(name);
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
index 67b796de25b..c2bf0b1d725 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
@@ -36,10 +36,8 @@ import org.apache.flink.table.factories.PlannerFactoryUtil;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.resource.ResourceManager;
 import org.apache.flink.util.MutableURLClassLoader;
-import org.apache.flink.util.TemporaryClassLoaderContext;
 
 import java.lang.reflect.Method;
-import java.util.function.Supplier;
 
 import static org.apache.flink.table.client.gateway.context.SessionContext.SessionState;