You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2022/09/20 09:12:12 UTC

[flink] branch master updated: [FLINK-29185][hive] Fix ClassNotFoundException for CREATE TEMPORARY FUNCTION USING JAR with Hive dialect

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3994788892f [FLINK-29185][hive] Fix ClassNotFoundException for CREATE TEMPORARY FUNCTION USING JAR with Hive dialect
3994788892f is described below

commit 3994788892fc761cf0c2fd09f362d4dab8f14c61
Author: luoyuxia <lu...@alumni.sjtu.edu.cn>
AuthorDate: Wed Sep 7 14:38:04 2022 +0800

    [FLINK-29185][hive] Fix ClassNotFoundException for CREATE TEMPORARY FUNCTION USING JAR with Hive dialect
    
    This closes #20776
---
 .../table/planner/delegation/hive/HiveParser.java  |  2 +-
 .../hive/parse/HiveParserDDLSemanticAnalyzer.java  |  9 +++++++--
 .../flink/connectors/hive/HiveDialectITCase.java   | 22 +++++++++++++++-------
 .../flink/table/catalog/FunctionCatalog.java       |  2 +-
 4 files changed, 24 insertions(+), 11 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
index 713ee146a7a..0dd5613a8e3 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
@@ -330,7 +330,7 @@ public class HiveParser extends ParserImpl {
                                 dmlHelper,
                                 frameworkConfig,
                                 plannerContext.getCluster(),
-                                plannerContext.getFlinkContext().getClassLoader());
+                                plannerContext.getFlinkContext());
                 return Collections.singletonList(ddlAnalyzer.convertToOperation(node));
             } else {
                 return processQuery(context, hiveConf, hiveShim, node);
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
index 6730d5cde76..d104a1de0f3 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
@@ -43,6 +43,7 @@ import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.CatalogViewImpl;
 import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.FunctionLanguage;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
@@ -88,6 +89,7 @@ import org.apache.flink.table.operations.ddl.DropPartitionsOperation;
 import org.apache.flink.table.operations.ddl.DropTableOperation;
 import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
 import org.apache.flink.table.operations.ddl.DropViewOperation;
+import org.apache.flink.table.planner.calcite.FlinkContext;
 import org.apache.flink.table.planner.delegation.hive.HiveParser;
 import org.apache.flink.table.planner.delegation.hive.HiveParserCalcitePlanner;
 import org.apache.flink.table.planner.delegation.hive.HiveParserConstants;
@@ -204,6 +206,7 @@ public class HiveParserDDLSemanticAnalyzer {
     private final FrameworkConfig frameworkConfig;
     private final RelOptCluster cluster;
     private final ClassLoader classLoader;
+    private final FunctionCatalog functionCatalog;
 
     static {
         TokenToTypeName.put(HiveASTParser.TOK_BOOLEAN, serdeConstants.BOOLEAN_TYPE_NAME);
@@ -267,7 +270,7 @@ public class HiveParserDDLSemanticAnalyzer {
             HiveParserDMLHelper dmlHelper,
             FrameworkConfig frameworkConfig,
             RelOptCluster cluster,
-            ClassLoader classLoader)
+            FlinkContext flinkContext)
             throws SemanticException {
         this.queryState = queryState;
         this.conf = queryState.getConf();
@@ -281,7 +284,8 @@ public class HiveParserDDLSemanticAnalyzer {
         this.dmlHelper = dmlHelper;
         this.frameworkConfig = frameworkConfig;
         this.cluster = cluster;
-        this.classLoader = classLoader;
+        this.classLoader = flinkContext.getClassLoader();
+        this.functionCatalog = flinkContext.getFunctionCatalog();
         reservedPartitionValues = new HashSet<>();
         // Partition can't have this name
         reservedPartitionValues.add(HiveConf.getVar(conf, HiveConf.ConfVars.DEFAULTPARTITIONNAME));
@@ -530,6 +534,7 @@ public class HiveParserDDLSemanticAnalyzer {
         List<ResourceUri> resources = getResourceList(ast);
 
         if (isTemporaryFunction) {
+            functionCatalog.registerFunctionJarResources(functionName, resources);
             FunctionDefinition funcDefinition =
                     funcDefFactory.createFunctionDefinition(
                             functionName,
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
index aed6242c9f6..1ae21e8f27a 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
@@ -745,32 +745,40 @@ public class HiveDialectITCase {
     public void testCreateFunctionUsingJar() throws Exception {
         tableEnv.executeSql("create table src(x int)");
         tableEnv.executeSql("insert into src values (1), (2)").await();
-        String udfClass = "addOne";
-        String udfCode =
-                "public class "
-                        + udfClass
+        String udfCodeTemplate =
+                "public class %s"
                         + " extends org.apache.hadoop.hive.ql.exec.UDF {\n"
                         + " public int evaluate(int content) {\n"
                         + "    return content + 1;\n"
                         + " }"
                         + "}\n";
+        String udfClass = "addOne";
+        String udfCode = String.format(udfCodeTemplate, udfClass);
         File jarFile =
                 UserClassLoaderJarTestUtils.createJarFile(
                         tempFolder.newFolder("test-jar"), "test-udf.jar", udfClass, udfCode);
         // test create function using jar
         tableEnv.executeSql(
                 String.format(
-                        "create function add_one as 'addOne' using jar '%s'", jarFile.getPath()));
+                        "create function add_one as '%s' using jar '%s'",
+                        udfClass, jarFile.getPath()));
         assertThat(
                         CollectionUtil.iteratorToList(
                                         tableEnv.executeSql("select add_one(x) from src").collect())
                                 .toString())
                 .isEqualTo("[+I[2], +I[3]]");
+
         // test create temporary function using jar
+        // create a new jarfile with a new class name
+        udfClass = "addOne1";
+        udfCode = String.format(udfCodeTemplate, udfClass);
+        jarFile =
+                UserClassLoaderJarTestUtils.createJarFile(
+                        tempFolder.newFolder("test-jar-1"), "test-udf-1.jar", udfClass, udfCode);
         tableEnv.executeSql(
                 String.format(
-                        "create temporary function t_add_one as 'addOne' using jar '%s'",
-                        jarFile.getPath()));
+                        "create temporary function t_add_one as '%s' using jar '%s'",
+                        udfClass, jarFile.getPath()));
         assertThat(
                         CollectionUtil.iteratorToList(
                                         tableEnv.executeSql("select t_add_one(x) from src")
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index bb26c373ac2..e78cfd0277f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -684,7 +684,7 @@ public final class FunctionCatalog {
                 function);
     }
 
-    private void registerFunctionJarResources(String functionName, List<ResourceUri> resourceUris) {
+    public void registerFunctionJarResources(String functionName, List<ResourceUri> resourceUris) {
         try {
             if (!resourceUris.isEmpty()) {
                 resourceManager.registerJarResources(resourceUris);