You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/06 06:29:02 UTC

[GitHub] [flink] HuangZhenQiu opened a new pull request, #20176: [FLINK-27660][table] add table api for registering function with resource

HuangZhenQiu opened a new pull request, #20176:
URL: https://github.com/apache/flink/pull/20176

   ## What is the purpose of the change
   Add table api for registering function with remote resource, so that we the advance function creation can be supported in DDL.
   
   
   ## Brief change log
   1. Change on Table API with new create function with resources
   2. Add test case in TableEnvironmentImplTest
   
   
   ## Verifying this change
   
   The PR is tested in the new test function in TableEnvironmentImplTest.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] lsyldliu commented on pull request #20176: [FLINK-27660][table] add table api for registering function with resource

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on PR #20176:
URL: https://github.com/apache/flink/pull/20176#issuecomment-1190073959

   @HuangZhenQiu The previous PR is already merge into master branch, you can continue this work.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] lsyldliu commented on pull request #20176: [FLINK-27660][table] add table api for registering function with resource

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on PR #20176:
URL: https://github.com/apache/flink/pull/20176#issuecomment-1193303873

   > cc @lsyldliu , could you help to review this?
   
   OKay


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] lsyldliu commented on a diff in pull request #20176: [FLINK-27660][table] add table api for registering function with resource

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on code in PR #20176:
URL: https://github.com/apache/flink/pull/20176#discussion_r952493941


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java:
##########
@@ -125,6 +128,27 @@ public void testUserDefinedTemporarySystemFunctionByUsingJar() throws Exception
         testUserDefinedFunctionByUsingJar(functionDDL, dropFunctionDDL);
     }
 
+    @Test
+    public void testCreateTemporarySystemFunctionWithTableAPI() {

Review Comment:
   According to the related discussion in https://github.com/apache/flink/pull/20635, please delete these tests in batch mode.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] lsyldliu commented on a diff in pull request #20176: [FLINK-27660][table] add table api for registering function with resource

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on code in PR #20176:
URL: https://github.com/apache/flink/pull/20176#discussion_r934454226


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala:
##########
@@ -706,6 +714,40 @@ class TableEnvironmentTest {
     assertFalse(tableEnv.listUserDefinedFunctions().contains("f2"))
   }
 
+  @Test
+  def testExecuteSqlWithCreateFunctionWithResource(): Unit = {

Review Comment:
   I think this test is no need, it has been covered by tests in `FunctionITCase`.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java:
##########
@@ -690,6 +684,50 @@ private void registerFunctionJarResources(String functionName, List<ResourceUri>
         }
     }
 
+    private void registerCatalogFunction(
+            ObjectIdentifier identifier, CatalogFunction catalogFunction, boolean ignoreIfExists) {
+        final ObjectIdentifier normalizedIdentifier =
+                FunctionIdentifier.normalizeObjectIdentifier(identifier);
+
+        final Catalog catalog =
+                catalogManager
+                        .getCatalog(normalizedIdentifier.getCatalogName())
+                        .orElseThrow(IllegalStateException::new);

Review Comment:
    throw new ValidationException(String.format("Catalog %s not exists", catalogName));



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java:
##########
@@ -237,6 +239,16 @@ public void testCreateTemporarySystemFunctionByUsingJar() {
         assertThat(Arrays.asList(tEnv().listFunctions())).doesNotContain("f10");
     }
 
+    @Test
+    public void testCreateTemporarySystemFunctionWithTableAPI() {

Review Comment:
   Please refer to the `testUserDefinedTemporarySystemFunctionByUsingJar`, also add some tests that use UDF registered by table api.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java:
##########
@@ -198,45 +207,30 @@ public void registerCatalogFunction(
                     t);
         }
 
-        final Catalog catalog =
-                catalogManager
-                        .getCatalog(normalizedIdentifier.getCatalogName())
-                        .orElseThrow(IllegalStateException::new);
-        final ObjectPath path = identifier.toObjectPath();
-
-        // we force users to deal with temporary catalog functions first
-        if (tempCatalogFunctions.containsKey(normalizedIdentifier)) {
-            if (ignoreIfExists) {
-                return;
-            }
-            throw new ValidationException(
-                    String.format(
-                            "Could not register catalog function. A temporary function '%s' does already exist. "
-                                    + "Please drop the temporary function first.",
-                            identifier.asSummaryString()));
-        }
+        registerCatalogFunction(identifier, catalogFunction, ignoreIfExists);
+    }
 
-        if (catalog.functionExists(path)) {
-            if (ignoreIfExists) {
-                return;
-            }
-            throw new ValidationException(
-                    String.format(
-                            "Could not register catalog function. A function '%s' does already exist.",
-                            identifier.asSummaryString()));
-        }
+    public void registerCatalogFunction(
+            UnresolvedIdentifier unresolvedIdentifier,
+            String className,
+            List<ResourceUri> resourceUris,
+            boolean ignoreIfExists) {
 
+        final ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
         final CatalogFunction catalogFunction =
-                new CatalogFunctionImpl(functionClass.getName(), FunctionLanguage.JAVA);
+                new CatalogFunctionImpl(className, FunctionLanguage.JAVA, resourceUris);
+
         try {
-            catalog.createFunction(path, catalogFunction, ignoreIfExists);
+            validateAndPrepareFunction(className, catalogFunction);

Review Comment:
   I think we shouldn't validate wether the class is can be load here, it is just a metadata operation for `CatalogFunction`, we will loaded the class when query use the UDF, this is consistent with the behavior in https://github.com/apache/flink/blob/0bc50fffe4a331f45b72926c83e778b7ed7fc2e6/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java#L1729



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] lsyldliu commented on pull request #20176: [FLINK-27660][table] add table api for registering function with resource

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on PR #20176:
URL: https://github.com/apache/flink/pull/20176#issuecomment-1221255509

   > @lsyldliu , could you help to continue review this PR when you are free?
   
   LGTM overall just left one comment.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] lsyldliu commented on a diff in pull request #20176: [FLINK-27660][table] add table api for registering function with resource

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on code in PR #20176:
URL: https://github.com/apache/flink/pull/20176#discussion_r959135546


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java:
##########
@@ -92,10 +97,48 @@ public void testUserDefinedTemporarySystemFunctionByUsingJar() throws Exception
                         udfClassName, jarPath);
 
         String dropFunctionDDL = "drop temporary system function lowerUdf";
-        testUserDefinedFunctionByUsingJar(functionDDL, dropFunctionDDL);
+        testUserDefinedFunctionByUsingJar(env -> env.executeSql(functionDDL), dropFunctionDDL);
     }
 
-    private void testUserDefinedFunctionByUsingJar(String createFunctionDDL, String dropFunctionDDL)
+    @Test
+    public void testUserDefinedRegularCatalogFunctionByUsingJar() throws Exception {

Review Comment:
   These tests in master have been deleted, please rebase master again.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] wuchong commented on a diff in pull request #20176: [FLINK-27660][table] add table api for registering function with resource

Posted by GitBox <gi...@apache.org>.
wuchong commented on code in PR #20176:
URL: https://github.com/apache/flink/pull/20176#discussion_r964657510


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##########
@@ -516,13 +518,51 @@ void createFunction(
             boolean ignoreIfExists);
 
     /**
-     * Drops a catalog function registered in the given path.
+     * Registers a {@link UserDefinedFunction} class as a catalog function in the given path by the
+     * specific class name and user defined resource uri.
      *
-     * @param path The path under which the function has been registered. See also the {@link
+     * <p>Compared to {@link #createFunction(String, Class)}, this method allow registering a user
+     * defined function by only provide a full path class name and and available resource list in
+     * which each url may be local or remote. User doesn't need to initialize the function instance

Review Comment:
   How about explaining the meaning of "resource"? 
   
   ```suggestion
        * <p>Compared to {@link #createFunction(String, Class)}, this method allows registering a user
        * defined function by only providing a full path class name and a list of resources that contain the
        * implementation of the function along with its dependencies. Users don't need to initialize the 
        * function instance in advance. The resource file can be a local or remote JAR file.
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] HuangZhenQiu commented on pull request #20176: [FLINK-27660][table] add table api for registering function with resource

Posted by GitBox <gi...@apache.org>.
HuangZhenQiu commented on PR #20176:
URL: https://github.com/apache/flink/pull/20176#issuecomment-1177020994

   @lsyldliu Sure. I will rebase later.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] HuangZhenQiu commented on a diff in pull request #20176: [FLINK-27660][table] add table api for registering function with resource

Posted by GitBox <gi...@apache.org>.
HuangZhenQiu commented on code in PR #20176:
URL: https://github.com/apache/flink/pull/20176#discussion_r954573757


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java:
##########
@@ -125,6 +128,27 @@ public void testUserDefinedTemporarySystemFunctionByUsingJar() throws Exception
         testUserDefinedFunctionByUsingJar(functionDDL, dropFunctionDDL);
     }
 
+    @Test
+    public void testCreateTemporarySystemFunctionWithTableAPI() {

Review Comment:
   Make sense. I removed table api related test cases. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] HuangZhenQiu commented on pull request #20176: [FLINK-27660][table] add table api for registering function with resource

Posted by GitBox <gi...@apache.org>.
HuangZhenQiu commented on PR #20176:
URL: https://github.com/apache/flink/pull/20176#issuecomment-1190455535

   @lsyldliu Sure. I will work on it tonight


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] flinkbot commented on pull request #20176: [FLINK-27660][table] add table api for registering function with resource

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20176:
URL: https://github.com/apache/flink/pull/20176#issuecomment-1175838876

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "4f6752d9319d76d9b3f7a3af448fccfa3c8c83bc",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "4f6752d9319d76d9b3f7a3af448fccfa3c8c83bc",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4f6752d9319d76d9b3f7a3af448fccfa3c8c83bc UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] wuchong merged pull request #20176: [FLINK-27660][table] add table api for registering function with resource

Posted by GitBox <gi...@apache.org>.
wuchong merged PR #20176:
URL: https://github.com/apache/flink/pull/20176


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] lsyldliu commented on a diff in pull request #20176: [FLINK-27660][table] add table api for registering function with resource

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on code in PR #20176:
URL: https://github.com/apache/flink/pull/20176#discussion_r950666316


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java:
##########
@@ -555,6 +621,46 @@ private void testUserDefinedFunctionByUsingJar(String createFunctionDDL, String
         tEnv().executeSql(dropFunctionDDL);
     }
 
+    private void testUserDefinedFunctionByUsingJar(FunctionCreator creator, String dropFunctionDDL)

Review Comment:
   ditto



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java:
##########
@@ -199,4 +265,48 @@ private void testUserDefinedFunctionByUsingJar(String createFunctionDDL, String
         // delete the function
         tEnv().executeSql(dropFunctionDDL);
     }
+
+    private void testUserDefinedFunctionByUsingJar(FunctionCreator creator, String dropFunctionDDL)

Review Comment:
   I think we can delete `testUserDefinedFunctionByUsingJar(String createFunctionDDL, String dropFunctionDDL)` method, all related test use this method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] wuchong commented on pull request #20176: [FLINK-27660][table] add table api for registering function with resource

Posted by GitBox <gi...@apache.org>.
wuchong commented on PR #20176:
URL: https://github.com/apache/flink/pull/20176#issuecomment-1192377693

   cc @lsyldliu , could you help to review this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] HuangZhenQiu commented on pull request #20176: [FLINK-27660][table] add table api for registering function with resource

Posted by GitBox <gi...@apache.org>.
HuangZhenQiu commented on PR #20176:
URL: https://github.com/apache/flink/pull/20176#issuecomment-1200702958

   @lsyldliu Thanks for the review. Revised the code accordingly Would you please take a look again?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] lsyldliu commented on pull request #20176: [FLINK-27660][table] add table api for registering function with resource

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on PR #20176:
URL: https://github.com/apache/flink/pull/20176#issuecomment-1232424548

   @HuangZhenQiu Regarding the failed test, please rebase master branch again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] lsyldliu commented on a diff in pull request #20176: [FLINK-27660][table] add table api for registering function with resource

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on code in PR #20176:
URL: https://github.com/apache/flink/pull/20176#discussion_r930660736


##########
flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoaders.java:
##########
@@ -104,6 +105,32 @@ public static MutableURLClassLoader create(
         }
     }
 
+    public static MutableURLClassLoader buildUserCodeClassLoader(

Review Comment:
   We don't need to introduce this method, please reuse `public static MutableURLClassLoader create(
               final URL[] urls, final ClassLoader parent, final Configuration configuration)`



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##########
@@ -515,6 +517,95 @@ void createFunction(
             Class<? extends UserDefinedFunction> functionClass,
             boolean ignoreIfExists);
 
+    /**
+     * Registers a {@link UserDefinedFunction} class as a temporary system function by the specific
+     * class name and user defined resource uri.
+     *
+     * <p>Compared to {@link #createTemporarySystemFunction(String, Class)}, this method allow
+     * registering a user defined function by only provide a full path class name and an available
+     * resource which may be local or remote. User doesn't need to initialize the function instance
+     * in advance.
+     *
+     * <p>Temporary functions can shadow permanent ones. If a permanent function under a given name
+     * exists, it will be inaccessible in the current session. To make the permanent function
+     * available again one can drop the corresponding temporary system function.
+     *
+     * @param name The name under which the function will be registered globally.
+     * @param className The class name of UDF to be registered.
+     * @param resourceUris The list of udf resource uris in local or remote.
+     */
+    void createTemporarySystemFunction(
+            String name, String className, List<ResourceUri> resourceUris);
+
+    /**
+     * Registers a {@link UserDefinedFunction} class as a catalog function in the given path by the
+     * specific class name and user defined resource uri.
+     *
+     * <p>Compared to {@link #createFunction(String, Class)}, this method allow registering a user
+     * defined function by only provide a full path class name and an available resource which may
+     * be local or remote. User doesn't need to initialize the function instance in advance.
+     *
+     * <p>Compared to system functions with a globally defined name, catalog functions are always
+     * (implicitly or explicitly) identified by a catalog and database.
+     *
+     * <p>There must not be another function (temporary or permanent) registered under the same
+     * path.
+     *
+     * @param path The path under which the function will be registered. See also the {@link
+     *     TableEnvironment} class description for the format of the path.
+     * @param className The class name of UDF to be registered.
+     * @param resourceUris The list of udf resource uris in local or remote.
+     */
+    void createFunction(String path, String className, List<ResourceUri> resourceUris);
+
+    /**
+     * Registers a {@link UserDefinedFunction} class as a catalog function in the given path by the
+     * specific class name and user defined resource uri.
+     *
+     * <p>Compared to {@link #createFunction(String, Class)}, this method allow registering a user
+     * defined function by only provide a full path class name and an available resource which may
+     * be local or remote. User doesn't need to initialize the function instance in advance.
+     *
+     * <p>Compared to system functions with a globally defined name, catalog functions are always
+     * (implicitly or explicitly) identified by a catalog and database.
+     *
+     * <p>There must not be another function (temporary or permanent) registered under the same
+     * path.
+     *
+     * @param path The path under which the function will be registered. See also the {@link
+     *     TableEnvironment} class description for the format of the path.
+     * @param className The class name of UDF to be registered.
+     * @param resourceUris The list of udf resource uris in local or remote.
+     * @param ignoreIfExists If a function exists under the given path and this flag is set, no
+     *     operation is executed. An exception is thrown otherwise.
+     */
+    void createFunction(

Review Comment:
   Adjusting this method position after `void createFunction(
               String path,
               Class<? extends UserDefinedFunction> functionClass,
               boolean ignoreIfExists);`



##########
flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java:
##########
@@ -142,7 +142,7 @@ private PackagedProgram(
                         ? Collections.emptyList()
                         : extractContainedLibraries(this.jarFile);
         this.userCodeClassLoader =
-                ClientUtils.buildUserCodeClassLoader(

Review Comment:
   Please don't change this.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -434,6 +436,33 @@ public void createFunction(
                 unresolvedIdentifier, functionClass, ignoreIfExists);
     }
 
+    @Override
+    public void createTemporarySystemFunction(
+            String name, String className, List<ResourceUri> resourceUris) {
+        final Class clazz = loadClassFromResource(className, resourceUris);
+        createTemporarySystemFunction(name, clazz);
+    }
+
+    @Override
+    public void createFunction(String path, String className, List<ResourceUri> resourceUris) {
+        createFunction(path, className, resourceUris, false);
+    }
+
+    @Override
+    public void createFunction(
+            String path, String className, List<ResourceUri> resourceUris, boolean ignoreIfExists) {
+        final UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(path);
+        final Class clazz = loadClassFromResource(className, resourceUris);
+        functionCatalog.registerCatalogFunction(unresolvedIdentifier, clazz, ignoreIfExists);

Review Comment:
   ```
   functionCatalog.registerCatalogFunction(unresolvedIdentifier, className, resourceUris, ignoreIfExists);
   
   ```
   
   In `FunctionCatalog` we can introduce such methods:
   ```
       public void registerCatalogFunction(
               UnresolvedIdentifier unresolvedIdentifier,
               String className,
               List<ResourceUri> resourceUris,
               boolean ignoreIfExists) {
           final ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
           final ObjectIdentifier normalizedIdentifier =
                   FunctionIdentifier.normalizeObjectIdentifier(identifier);
           registerCatalogFunction(normalizedIdentifier, className, resourceUris, ignoreIfExists);
       }
   ```
   
   ```
   private void registerCatalogFunction(
               ObjectIdentifier normalizedIdentifier,
               String className,
               List<ResourceUri> resourceUris,
               boolean ignoreIfExists) {
           final Catalog catalog =
                   catalogManager
                           .getCatalog(normalizedIdentifier.getCatalogName())
                           .orElseThrow(IllegalStateException::new);
           final ObjectPath path = normalizedIdentifier.toObjectPath();
   
           // we force users to deal with temporary catalog functions first
           if (tempCatalogFunctions.containsKey(normalizedIdentifier)) {
               if (ignoreIfExists) {
                   return;
               }
               throw new ValidationException(
                       String.format(
                               "Could not register catalog function. A temporary function '%s' does already exist. "
                                       + "Please drop the temporary function first.",
                               normalizedIdentifier.asSummaryString()));
           }
   
           if (catalog.functionExists(path)) {
               if (ignoreIfExists) {
                   return;
               }
               throw new ValidationException(
                       String.format(
                               "Could not register catalog function. A function '%s' does already exist.",
                               normalizedIdentifier.asSummaryString()));
           }
   
           final CatalogFunction catalogFunction =
                   new CatalogFunctionImpl(className, FunctionLanguage.JAVA, resourceUris);
           try {
               catalog.createFunction(path, catalogFunction, ignoreIfExists);
           } catch (Throwable t) {
               throw new TableException(
                       String.format(
                               "Could not register catalog function '%s'.",
                               normalizedIdentifier.asSummaryString()),
                       t);
           }
       }
   ```



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala:
##########
@@ -37,21 +37,32 @@ import org.apache.flink.table.planner.utils.{TableTestUtil, TestTableSourceSinks
 import org.apache.flink.table.planner.utils.TableTestUtil.{replaceNodeIdInOperator, replaceStageId, replaceStreamNodeId}
 import org.apache.flink.table.types.DataType
 import org.apache.flink.types.Row
+import org.apache.flink.util.UserClassLoaderJarTestUtils
 
 import _root_.java.util
 import _root_.scala.collection.JavaConverters._
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.sql.SqlExplainLevel
 import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
-import org.junit.{Rule, Test}
+import org.junit.{ClassRule, Rule, Test}
 import org.junit.Assert.{assertEquals, assertFalse, assertTrue, fail}
-import org.junit.rules.ExpectedException
+import org.junit.rules.{ExpectedException, TemporaryFolder}
+
+import scala.annotation.meta.getter
 
 class TableEnvironmentTest {
 
+  @(Rule @getter)
+  val tempFolder: TemporaryFolder = new TemporaryFolder()
+
   // used for accurate exception information checking.
   val expectedException: ExpectedException = ExpectedException.none()
 
+  val udfClass = "LowerUDF"
+
+  val udfCode =

Review Comment:
   ditto



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -434,6 +436,33 @@ public void createFunction(
                 unresolvedIdentifier, functionClass, ignoreIfExists);
     }
 
+    @Override
+    public void createTemporarySystemFunction(
+            String name, String className, List<ResourceUri> resourceUris) {
+        final Class clazz = loadClassFromResource(className, resourceUris);
+        createTemporarySystemFunction(name, clazz);
+    }
+
+    @Override
+    public void createFunction(String path, String className, List<ResourceUri> resourceUris) {
+        createFunction(path, className, resourceUris, false);
+    }
+
+    @Override
+    public void createFunction(
+            String path, String className, List<ResourceUri> resourceUris, boolean ignoreIfExists) {
+        final UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(path);
+        final Class clazz = loadClassFromResource(className, resourceUris);
+        functionCatalog.registerCatalogFunction(unresolvedIdentifier, clazz, ignoreIfExists);
+    }
+
+    @Override
+    public void createTemporaryFunction(
+            String path, String className, List<ResourceUri> resourceUris) {
+        final Class clazz = loadClassFromResource(className, resourceUris);
+        createTemporarySystemFunction(path, clazz);

Review Comment:
   ```
   functionCatalog.registerTemporaryCatalogFunction(
                   unresolvedIdentifier,
                   new CatalogFunctionImpl(className, FunctionLanguage.JAVA, resourceUris),
                   false);
   ```



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##########
@@ -515,6 +517,95 @@ void createFunction(
             Class<? extends UserDefinedFunction> functionClass,
             boolean ignoreIfExists);
 
+    /**
+     * Registers a {@link UserDefinedFunction} class as a temporary system function by the specific
+     * class name and user defined resource uri.
+     *
+     * <p>Compared to {@link #createTemporarySystemFunction(String, Class)}, this method allow
+     * registering a user defined function by only provide a full path class name and an available
+     * resource which may be local or remote. User doesn't need to initialize the function instance
+     * in advance.
+     *
+     * <p>Temporary functions can shadow permanent ones. If a permanent function under a given name
+     * exists, it will be inaccessible in the current session. To make the permanent function
+     * available again one can drop the corresponding temporary system function.
+     *
+     * @param name The name under which the function will be registered globally.
+     * @param className The class name of UDF to be registered.
+     * @param resourceUris The list of udf resource uris in local or remote.
+     */
+    void createTemporarySystemFunction(
+            String name, String className, List<ResourceUri> resourceUris);
+
+    /**
+     * Registers a {@link UserDefinedFunction} class as a catalog function in the given path by the
+     * specific class name and user defined resource uri.
+     *
+     * <p>Compared to {@link #createFunction(String, Class)}, this method allow registering a user
+     * defined function by only provide a full path class name and an available resource which may
+     * be local or remote. User doesn't need to initialize the function instance in advance.
+     *
+     * <p>Compared to system functions with a globally defined name, catalog functions are always
+     * (implicitly or explicitly) identified by a catalog and database.
+     *
+     * <p>There must not be another function (temporary or permanent) registered under the same
+     * path.
+     *
+     * @param path The path under which the function will be registered. See also the {@link
+     *     TableEnvironment} class description for the format of the path.
+     * @param className The class name of UDF to be registered.
+     * @param resourceUris The list of udf resource uris in local or remote.
+     */
+    void createFunction(String path, String className, List<ResourceUri> resourceUris);

Review Comment:
   Adjusting this method position after `void createFunction(String path, Class<? extends UserDefinedFunction> functionClass);`



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1760,8 +1809,13 @@ private Optional<CatalogBaseTable> getTemporaryTable(ObjectIdentifier identifier
     private TableResultInternal createCatalogFunction(
             CreateCatalogFunctionOperation createCatalogFunctionOperation) {
         String exMsg = getDDLOpExecuteErrorMsg(createCatalogFunctionOperation.asSummaryString());
+        List<ResourceUri> resourceUris =
+                createCatalogFunctionOperation.getCatalogFunction().getFunctionResources();
         try {
             if (createCatalogFunctionOperation.isTemporary()) {
+                if (!resourceUris.isEmpty()) {

Review Comment:
   We don't need to register jar resource here, it will be registered in `FunctionCatalog`, please revert these changes.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##########
@@ -515,6 +517,95 @@ void createFunction(
             Class<? extends UserDefinedFunction> functionClass,
             boolean ignoreIfExists);
 
+    /**
+     * Registers a {@link UserDefinedFunction} class as a temporary system function by the specific
+     * class name and user defined resource uri.
+     *
+     * <p>Compared to {@link #createTemporarySystemFunction(String, Class)}, this method allow
+     * registering a user defined function by only provide a full path class name and an available
+     * resource which may be local or remote. User doesn't need to initialize the function instance
+     * in advance.
+     *
+     * <p>Temporary functions can shadow permanent ones. If a permanent function under a given name
+     * exists, it will be inaccessible in the current session. To make the permanent function
+     * available again one can drop the corresponding temporary system function.
+     *
+     * @param name The name under which the function will be registered globally.
+     * @param className The class name of UDF to be registered.
+     * @param resourceUris The list of udf resource uris in local or remote.
+     */
+    void createTemporarySystemFunction(

Review Comment:
   Adjusting this method position after `void createTemporarySystemFunction(String name, UserDefinedFunction functionInstance);`



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##########
@@ -515,6 +517,95 @@ void createFunction(
             Class<? extends UserDefinedFunction> functionClass,
             boolean ignoreIfExists);
 
+    /**
+     * Registers a {@link UserDefinedFunction} class as a temporary system function by the specific
+     * class name and user defined resource uri.
+     *
+     * <p>Compared to {@link #createTemporarySystemFunction(String, Class)}, this method allow
+     * registering a user defined function by only provide a full path class name and an available
+     * resource which may be local or remote. User doesn't need to initialize the function instance

Review Comment:
    and available resource uri list which may be local or remote



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -434,6 +436,33 @@ public void createFunction(
                 unresolvedIdentifier, functionClass, ignoreIfExists);
     }
 
+    @Override
+    public void createTemporarySystemFunction(

Review Comment:
   I think we should not register the resource and load the class in `TableEnvironment` in advance. Due to `FunctionCatalog` is responsible for register function, so we should let `FunctionCatalog` register resource and load classes uniformly, instead of do this in multiple place. We should try best to reuse these code.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -434,6 +436,33 @@ public void createFunction(
                 unresolvedIdentifier, functionClass, ignoreIfExists);
     }
 
+    @Override
+    public void createTemporarySystemFunction(
+            String name, String className, List<ResourceUri> resourceUris) {
+        final Class clazz = loadClassFromResource(className, resourceUris);
+        createTemporarySystemFunction(name, clazz);

Review Comment:
   Here I think we can call `FunctionCatalog` related method directly:
   
   ```
   functionCatalog.registerTemporarySystemFunction(name, className, FunctionLanguage.JAVA, resourceUris, false);
   ```



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##########
@@ -515,6 +517,95 @@ void createFunction(
             Class<? extends UserDefinedFunction> functionClass,
             boolean ignoreIfExists);
 
+    /**
+     * Registers a {@link UserDefinedFunction} class as a temporary system function by the specific
+     * class name and user defined resource uri.
+     *
+     * <p>Compared to {@link #createTemporarySystemFunction(String, Class)}, this method allow
+     * registering a user defined function by only provide a full path class name and an available
+     * resource which may be local or remote. User doesn't need to initialize the function instance
+     * in advance.
+     *
+     * <p>Temporary functions can shadow permanent ones. If a permanent function under a given name
+     * exists, it will be inaccessible in the current session. To make the permanent function
+     * available again one can drop the corresponding temporary system function.
+     *
+     * @param name The name under which the function will be registered globally.
+     * @param className The class name of UDF to be registered.
+     * @param resourceUris The list of udf resource uris in local or remote.
+     */
+    void createTemporarySystemFunction(
+            String name, String className, List<ResourceUri> resourceUris);
+
+    /**
+     * Registers a {@link UserDefinedFunction} class as a catalog function in the given path by the
+     * specific class name and user defined resource uri.
+     *
+     * <p>Compared to {@link #createFunction(String, Class)}, this method allow registering a user
+     * defined function by only provide a full path class name and an available resource which may
+     * be local or remote. User doesn't need to initialize the function instance in advance.
+     *
+     * <p>Compared to system functions with a globally defined name, catalog functions are always
+     * (implicitly or explicitly) identified by a catalog and database.
+     *
+     * <p>There must not be another function (temporary or permanent) registered under the same
+     * path.
+     *
+     * @param path The path under which the function will be registered. See also the {@link
+     *     TableEnvironment} class description for the format of the path.
+     * @param className The class name of UDF to be registered.
+     * @param resourceUris The list of udf resource uris in local or remote.
+     */
+    void createFunction(String path, String className, List<ResourceUri> resourceUris);
+
+    /**
+     * Registers a {@link UserDefinedFunction} class as a catalog function in the given path by the
+     * specific class name and user defined resource uri.
+     *
+     * <p>Compared to {@link #createFunction(String, Class)}, this method allow registering a user
+     * defined function by only provide a full path class name and an available resource which may
+     * be local or remote. User doesn't need to initialize the function instance in advance.
+     *
+     * <p>Compared to system functions with a globally defined name, catalog functions are always
+     * (implicitly or explicitly) identified by a catalog and database.
+     *
+     * <p>There must not be another function (temporary or permanent) registered under the same
+     * path.
+     *
+     * @param path The path under which the function will be registered. See also the {@link
+     *     TableEnvironment} class description for the format of the path.
+     * @param className The class name of UDF to be registered.
+     * @param resourceUris The list of udf resource uris in local or remote.
+     * @param ignoreIfExists If a function exists under the given path and this flag is set, no
+     *     operation is executed. An exception is thrown otherwise.
+     */
+    void createFunction(
+            String path, String className, List<ResourceUri> resourceUris, boolean ignoreIfExists);
+
+    /**
+     * Registers a {@link UserDefinedFunction} class as a temporary catalog function in the given
+     * path by the specific class name and user defined resource uri.
+     *
+     * <p>Compared to {@link #createTemporaryFunction(String, Class)}, this method allow registering
+     * a user defined function by only provide a full path class name and an available resource uri
+     * which may be local or remote. User doesn't need to initialize the function instance in
+     * advance.
+     *
+     * <p>Compared to {@link #createTemporarySystemFunction(String, String, List)} with a globally
+     * defined name, catalog functions are always (implicitly or explicitly) identified by a catalog
+     * and database.
+     *
+     * <p>Temporary functions can shadow permanent ones. If a permanent function under a given name
+     * exists, it will be inaccessible in the current session. To make the permanent function
+     * available again one can drop the corresponding temporary function.
+     *
+     * @param path The path under which the function will be registered. See also the {@link
+     *     TableEnvironment} class description for the format of the path.
+     * @param className The class name of UDF to be registered.
+     * @param resourceUris The list udf resource uri in local or remote.
+     */
+    void createTemporaryFunction(String path, String className, List<ResourceUri> resourceUris);

Review Comment:
   Adjusting this method position after `void createTemporaryFunction(String path, UserDefinedFunction functionInstance);`



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1847,12 +1907,23 @@ private TableResultInternal dropCatalogFunction(
 
     private TableResultInternal createSystemFunction(CreateTempSystemFunctionOperation operation) {
         String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString());
+        List<ResourceUri> resourceUris = operation.getCatalogFunction().getFunctionResources();
+
         try {
+            if (!resourceUris.isEmpty()) {

Review Comment:
   We don't need to register jar resource here, it will be registered in `FunctionCatalog`, please see https://github.com/apache/flink/blob/master/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java#L655



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1750,6 +1779,26 @@ public void registerTableSinkInternal(String name, TableSink<?> tableSink) {
         }
     }
 
+    private Class loadClassFromResource(String className, List<ResourceUri> resourceUris) {

Review Comment:
   Revert this method.



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala:
##########
@@ -37,21 +37,32 @@ import org.apache.flink.table.planner.utils.{TableTestUtil, TestTableSourceSinks
 import org.apache.flink.table.planner.utils.TableTestUtil.{replaceNodeIdInOperator, replaceStageId, replaceStreamNodeId}
 import org.apache.flink.table.types.DataType
 import org.apache.flink.types.Row
+import org.apache.flink.util.UserClassLoaderJarTestUtils
 
 import _root_.java.util
 import _root_.scala.collection.JavaConverters._
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.sql.SqlExplainLevel
 import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
-import org.junit.{Rule, Test}
+import org.junit.{ClassRule, Rule, Test}
 import org.junit.Assert.{assertEquals, assertFalse, assertTrue, fail}
-import org.junit.rules.ExpectedException
+import org.junit.rules.{ExpectedException, TemporaryFolder}
+
+import scala.annotation.meta.getter
 
 class TableEnvironmentTest {
 
+  @(Rule @getter)
+  val tempFolder: TemporaryFolder = new TemporaryFolder()
+
   // used for accurate exception information checking.
   val expectedException: ExpectedException = ExpectedException.none()
 
+  val udfClass = "LowerUDF"

Review Comment:
   Please reuse related code in `UserDefinedFunctions`.



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala:
##########
@@ -706,6 +717,40 @@ class TableEnvironmentTest {
     assertFalse(tableEnv.listUserDefinedFunctions().contains("f2"))
   }
 
+  @Test
+  def testExecuteSqlWithCreateFunctionWithResource(): Unit = {
+    val userJar = UserClassLoaderJarTestUtils.createJarFile(
+      tempFolder.newFolder("test-jar"),
+      "test-classloader.jar",
+      udfClass,
+      udfCode)
+
+    val tableResult1 = tableEnv.executeSql(

Review Comment:
   I think this test is not correct, the aim of this pr is to support register function by table api, so we should test register function by table api instead of sql. You should add tests in `FunctionITCase`, and test function work in streaming and batch mode, please refer to related test in `FunctionITCase`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] lsyldliu commented on pull request #20176: [FLINK-27660][table] add table api for registering function with resource

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on PR #20176:
URL: https://github.com/apache/flink/pull/20176#issuecomment-1177006660

   @HuangZhenQiu After https://github.com/apache/flink/pull/20001 merge, you can rebase master, and I will review


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] HuangZhenQiu commented on pull request #20176: [FLINK-27660][table] add table api for registering function with resource

Posted by GitBox <gi...@apache.org>.
HuangZhenQiu commented on PR #20176:
URL: https://github.com/apache/flink/pull/20176#issuecomment-1179171635

   @flinkbot run azure
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] wuchong commented on pull request #20176: [FLINK-27660][table] add table api for registering function with resource

Posted by GitBox <gi...@apache.org>.
wuchong commented on PR #20176:
URL: https://github.com/apache/flink/pull/20176#issuecomment-1217386365

   @lsyldliu , could you help to continue review this PR when you are free? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org