You are viewing a plain text version of this content; the canonical link to the original HTML version was not preserved in this plain-text conversion.
Posted to commits@flink.apache.org by ja...@apache.org on 2022/09/21 01:46:48 UTC
[flink] 01/04: [FLINK-29185][hive] Fix ClassNotFoundException for CREATE TEMPORARY FUNCTION USING JAR with Hive dialect
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 82ab2918e992f747043dbe49d900b36fe28df282
Author: luoyuxia <lu...@alumni.sjtu.edu.cn>
AuthorDate: Wed Sep 7 14:38:04 2022 +0800
[FLINK-29185][hive] Fix ClassNotFoundException for CREATE TEMPORARY FUNCTION USING JAR with Hive dialect
This closes #20776
---
.../table/planner/delegation/hive/HiveParser.java | 2 +-
.../hive/parse/HiveParserDDLSemanticAnalyzer.java | 9 +++++++--
.../flink/connectors/hive/HiveDialectITCase.java | 22 +++++++++++++++-------
.../flink/table/catalog/FunctionCatalog.java | 2 +-
4 files changed, 24 insertions(+), 11 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
index 713ee146a7a..0dd5613a8e3 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
@@ -330,7 +330,7 @@ public class HiveParser extends ParserImpl {
dmlHelper,
frameworkConfig,
plannerContext.getCluster(),
- plannerContext.getFlinkContext().getClassLoader());
+ plannerContext.getFlinkContext());
return Collections.singletonList(ddlAnalyzer.convertToOperation(node));
} else {
return processQuery(context, hiveConf, hiveShim, node);
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
index 6730d5cde76..d104a1de0f3 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
@@ -43,6 +43,7 @@ import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.CatalogViewImpl;
import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.FunctionLanguage;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
@@ -88,6 +89,7 @@ import org.apache.flink.table.operations.ddl.DropPartitionsOperation;
import org.apache.flink.table.operations.ddl.DropTableOperation;
import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
import org.apache.flink.table.operations.ddl.DropViewOperation;
+import org.apache.flink.table.planner.calcite.FlinkContext;
import org.apache.flink.table.planner.delegation.hive.HiveParser;
import org.apache.flink.table.planner.delegation.hive.HiveParserCalcitePlanner;
import org.apache.flink.table.planner.delegation.hive.HiveParserConstants;
@@ -204,6 +206,7 @@ public class HiveParserDDLSemanticAnalyzer {
private final FrameworkConfig frameworkConfig;
private final RelOptCluster cluster;
private final ClassLoader classLoader;
+ private final FunctionCatalog functionCatalog;
static {
TokenToTypeName.put(HiveASTParser.TOK_BOOLEAN, serdeConstants.BOOLEAN_TYPE_NAME);
@@ -267,7 +270,7 @@ public class HiveParserDDLSemanticAnalyzer {
HiveParserDMLHelper dmlHelper,
FrameworkConfig frameworkConfig,
RelOptCluster cluster,
- ClassLoader classLoader)
+ FlinkContext flinkContext)
throws SemanticException {
this.queryState = queryState;
this.conf = queryState.getConf();
@@ -281,7 +284,8 @@ public class HiveParserDDLSemanticAnalyzer {
this.dmlHelper = dmlHelper;
this.frameworkConfig = frameworkConfig;
this.cluster = cluster;
- this.classLoader = classLoader;
+ this.classLoader = flinkContext.getClassLoader();
+ this.functionCatalog = flinkContext.getFunctionCatalog();
reservedPartitionValues = new HashSet<>();
// Partition can't have this name
reservedPartitionValues.add(HiveConf.getVar(conf, HiveConf.ConfVars.DEFAULTPARTITIONNAME));
@@ -530,6 +534,7 @@ public class HiveParserDDLSemanticAnalyzer {
List<ResourceUri> resources = getResourceList(ast);
if (isTemporaryFunction) {
+ functionCatalog.registerFunctionJarResources(functionName, resources);
FunctionDefinition funcDefinition =
funcDefFactory.createFunctionDefinition(
functionName,
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
index aed6242c9f6..1ae21e8f27a 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
@@ -745,32 +745,40 @@ public class HiveDialectITCase {
public void testCreateFunctionUsingJar() throws Exception {
tableEnv.executeSql("create table src(x int)");
tableEnv.executeSql("insert into src values (1), (2)").await();
- String udfClass = "addOne";
- String udfCode =
- "public class "
- + udfClass
+ String udfCodeTemplate =
+ "public class %s"
+ " extends org.apache.hadoop.hive.ql.exec.UDF {\n"
+ " public int evaluate(int content) {\n"
+ " return content + 1;\n"
+ " }"
+ "}\n";
+ String udfClass = "addOne";
+ String udfCode = String.format(udfCodeTemplate, udfClass);
File jarFile =
UserClassLoaderJarTestUtils.createJarFile(
tempFolder.newFolder("test-jar"), "test-udf.jar", udfClass, udfCode);
// test create function using jar
tableEnv.executeSql(
String.format(
- "create function add_one as 'addOne' using jar '%s'", jarFile.getPath()));
+ "create function add_one as '%s' using jar '%s'",
+ udfClass, jarFile.getPath()));
assertThat(
CollectionUtil.iteratorToList(
tableEnv.executeSql("select add_one(x) from src").collect())
.toString())
.isEqualTo("[+I[2], +I[3]]");
+
// test create temporary function using jar
+ // create a new jarfile with a new class name
+ udfClass = "addOne1";
+ udfCode = String.format(udfCodeTemplate, udfClass);
+ jarFile =
+ UserClassLoaderJarTestUtils.createJarFile(
+ tempFolder.newFolder("test-jar-1"), "test-udf-1.jar", udfClass, udfCode);
tableEnv.executeSql(
String.format(
- "create temporary function t_add_one as 'addOne' using jar '%s'",
- jarFile.getPath()));
+ "create temporary function t_add_one as '%s' using jar '%s'",
+ udfClass, jarFile.getPath()));
assertThat(
CollectionUtil.iteratorToList(
tableEnv.executeSql("select t_add_one(x) from src")
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index 50e51a6ce85..bfe73407b62 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -700,7 +700,7 @@ public final class FunctionCatalog {
function);
}
- private void registerFunctionJarResources(String functionName, List<ResourceUri> resourceUris) {
+ public void registerFunctionJarResources(String functionName, List<ResourceUri> resourceUris) {
try {
if (!resourceUris.isEmpty()) {
resourceManager.registerJarResources(resourceUris);