You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/26 07:12:37 UTC

[GitHub] [flink] Myracle opened a new pull request, #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side

Myracle opened a new pull request, #20361:
URL: https://github.com/apache/flink/pull/20361

   ## What is the purpose of the change
   
   *Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side for more common use.*
   
   
   ## Brief change log
   
     - *Add methods addJar and listJars to the TableEnvironment.*
     - *Call the methods from SessionContext in SQL client.*
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - *Added ITCase AddRemoteJarITCase.*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] Myracle commented on a diff in pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side

Posted by GitBox <gi...@apache.org>.
Myracle commented on code in PR #20361:
URL: https://github.com/apache/flink/pull/20361#discussion_r937462826


##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/AddRemoteJarITCase.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sql.codegen;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/** ITCase for adding remote jar. */
+@RunWith(Parameterized.class)
+public class AddRemoteJarITCase extends TestLogger {
+    private static final Logger LOG = LoggerFactory.getLogger(AddRemoteJarITCase.class);
+
+    private static final String ADD_REMOTE_JAR_E2E_SQL = "add_remote_jar_e2e.sql";
+
+    private static final String EXECUTE_SQL_RESULT =
+            "{\"before\":null,\"after\":{\"id\":1,\"content\":\"hello world\"},\"op\":\"c\"}";
+
+    public static final String GENERATED_LOWER_UDF_CLASS = "LowerUDF";
+
+    public static final String GENERATED_LOWER_UDF_CODE =
+            "public class "
+                    + "%s"
+                    + " extends org.apache.flink.table.functions.ScalarFunction {\n"
+                    + "  public String eval(String str) {\n"
+                    + "    return str.toLowerCase();\n"
+                    + "  }\n"
+                    + "}\n";
+
+    @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+    private static final Path hadoopClasspath = TestUtils.getResource(".*hadoop.classpath");
+
+    @Parameterized.Parameters(name = "executionMode")
+    public static Collection<String> data() {
+        return Arrays.asList("streaming", "batch");
+    }
+
+    @Rule
+    public final FlinkResource flink =
+            new LocalStandaloneFlinkResourceFactory().create(FlinkResourceSetup.builder().build());
+
+    private final String executionMode;
+    private Path result;
+
+    private MiniDFSCluster hdfsCluster;
+    private org.apache.hadoop.fs.Path hdPath;
+    private org.apache.hadoop.fs.FileSystem hdfs;
+
+    public AddRemoteJarITCase(String executionMode) {
+        this.executionMode = executionMode;
+    }
+
+    @BeforeClass
+    public static void verifyOS() {
+        Assume.assumeTrue(
+                "HDFS cluster cannot be started on Windows without extensions.",
+                !OperatingSystem.isWindows());
+    }
+
+    @Before
+    public void createHDFS() {
+        try {
+            Configuration hdConf = new Configuration();
+
+            File baseDir = new File("./target/hdfs/hdfsTest").getAbsoluteFile();

Review Comment:
   The subDir of current test is code path. I prefer to use target.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] wuchong commented on a diff in pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side

Posted by GitBox <gi...@apache.org>.
wuchong commented on code in PR #20361:
URL: https://github.com/apache/flink/pull/20361#discussion_r939529472


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/AddJarOperation.java:
##########
@@ -19,22 +19,23 @@
 package org.apache.flink.table.operations.command;
 
 import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.resource.ResourceUri;
 
 /** Operation to describe an ADD JAR statement. */
 public class AddJarOperation implements Operation {
 
-    private final String path;
+    private final ResourceUri resourceUri;

Review Comment:
   This change looks unnecessary to me. This is an AddJarOperation instead of AddResourceOperation.  The resource in ADD JAR must be a JAR resource type. 



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java:
##########
@@ -142,6 +146,23 @@ public void testUserDefinedTemporaryCatalogFunctionByUsingJar() throws Exception
         testUserDefinedFunctionByUsingJar(functionDDL, dropFunctionDDL);
     }
 
+    @Test
+    public void testUsingAddJar() throws Exception {
+        tEnv().executeSql(String.format("ADD JAR '%s'", jarPath));
+
+        TableResult tableResult = tEnv().executeSql("SHOW JARS");
+        assertThat(
+                        CollectionUtil.iteratorToList(tableResult.collect())
+                                .equals(
+                                        Collections.singletonList(
+                                                Row.of(new Path(jarPath).getPath()))))
+                .isTrue();

Review Comment:
   Please use `assertThat(..).isEqualsTo(..)` for a better readable exception message when mismatch. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] Myracle commented on a diff in pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side

Posted by GitBox <gi...@apache.org>.
Myracle commented on code in PR #20361:
URL: https://github.com/apache/flink/pull/20361#discussion_r937401173


##########
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java:
##########
@@ -220,6 +220,7 @@ public void submitSQLJob(SQLJobSubmission job, Duration timeout) throws IOExcept
         AutoClosableProcess.create(commands.toArray(new String[0]))
                 .setStdInputs(job.getSqlLines().toArray(new String[0]))
                 .setStdoutProcessor(LOG::info) // logging the SQL statements and error message
+                .setEnv(job.getEnvProcessor())

Review Comment:
   It is needed in UsingRemoteJarITCase.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] flinkbot commented on pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20361:
URL: https://github.com/apache/flink/pull/20361#issuecomment-1195101786

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "17cb8f4798aed78ed0ba43fe094dc6c6f763d10e",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "17cb8f4798aed78ed0ba43fe094dc6c6f763d10e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 17cb8f4798aed78ed0ba43fe094dc6c6f763d10e UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] lsyldliu commented on a diff in pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on code in PR #20361:
URL: https://github.com/apache/flink/pull/20361#discussion_r929988353


##########
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/AddRemoteJarITCase.java:
##########
@@ -0,0 +1,110 @@
+package org.apache.flink.table.client.gateway.context;
+
+import org.apache.flink.client.cli.DefaultCLI;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+
+import static org.apache.flink.configuration.PipelineOptions.MAX_PARALLELISM;
+import static org.apache.flink.configuration.PipelineOptions.OBJECT_REUSE;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for adding remote jar. */
+public class AddRemoteJarITCase {
+    @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
+

Review Comment:
   We should not introduce this test here, the local jar tests has been covered by existing test. Regarding to remote jar test, it should be introduced in e2e module.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] wuchong merged pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side

Posted by GitBox <gi...@apache.org>.
wuchong merged PR #20361:
URL: https://github.com/apache/flink/pull/20361


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] Myracle commented on a diff in pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side

Posted by GitBox <gi...@apache.org>.
Myracle commented on code in PR #20361:
URL: https://github.com/apache/flink/pull/20361#discussion_r937466999


##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/AddRemoteJarITCase.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sql.codegen;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/** ITCase for adding remote jar. */
+@RunWith(Parameterized.class)
+public class AddRemoteJarITCase extends TestLogger {
+    private static final Logger LOG = LoggerFactory.getLogger(AddRemoteJarITCase.class);
+
+    private static final String ADD_REMOTE_JAR_E2E_SQL = "add_remote_jar_e2e.sql";
+
+    private static final String EXECUTE_SQL_RESULT =
+            "{\"before\":null,\"after\":{\"id\":1,\"content\":\"hello world\"},\"op\":\"c\"}";
+
+    public static final String GENERATED_LOWER_UDF_CLASS = "LowerUDF";
+
+    public static final String GENERATED_LOWER_UDF_CODE =
+            "public class "
+                    + "%s"
+                    + " extends org.apache.flink.table.functions.ScalarFunction {\n"
+                    + "  public String eval(String str) {\n"
+                    + "    return str.toLowerCase();\n"
+                    + "  }\n"
+                    + "}\n";
+
+    @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+    private static final Path hadoopClasspath = TestUtils.getResource(".*hadoop.classpath");
+
+    @Parameterized.Parameters(name = "executionMode")
+    public static Collection<String> data() {
+        return Arrays.asList("streaming", "batch");
+    }
+
+    @Rule
+    public final FlinkResource flink =
+            new LocalStandaloneFlinkResourceFactory().create(FlinkResourceSetup.builder().build());
+
+    private final String executionMode;
+    private Path result;
+
+    private MiniDFSCluster hdfsCluster;
+    private org.apache.hadoop.fs.Path hdPath;
+    private org.apache.hadoop.fs.FileSystem hdfs;
+
+    public AddRemoteJarITCase(String executionMode) {
+        this.executionMode = executionMode;
+    }
+
+    @BeforeClass
+    public static void verifyOS() {
+        Assume.assumeTrue(

Review Comment:
   Please refer to https://issues.apache.org/jira/browse/FLINK-6558



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] Myracle commented on pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side

Posted by GitBox <gi...@apache.org>.
Myracle commented on PR #20361:
URL: https://github.com/apache/flink/pull/20361#issuecomment-1206362233

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] lsyldliu commented on a diff in pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on code in PR #20361:
URL: https://github.com/apache/flink/pull/20361#discussion_r929988353


##########
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/AddRemoteJarITCase.java:
##########
@@ -0,0 +1,110 @@
+package org.apache.flink.table.client.gateway.context;
+
+import org.apache.flink.client.cli.DefaultCLI;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+
+import static org.apache.flink.configuration.PipelineOptions.MAX_PARALLELISM;
+import static org.apache.flink.configuration.PipelineOptions.OBJECT_REUSE;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for adding remote jar. */
+public class AddRemoteJarITCase {
+    @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
+

Review Comment:
   We should not introduce this test, the local jar tests has been covered by existing test. Regarding to remote jar test, it should be introduced in e2e module.



##########
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java:
##########
@@ -156,13 +156,7 @@ public void testAddJarWithRelativePath() throws IOException {
 
     @Test
     public void testAddIllegalJar() {
-        validateAddJarWithException("/path/to/illegal.jar", "JAR file does not exist");
-    }
-
-    @Test
-    public void testAddRemoteJar() {
-        validateAddJarWithException(

Review Comment:
   This test should not be removed.



##########
flink-table/flink-sql-client/pom.xml:
##########
@@ -515,6 +522,26 @@ under the License.
 			<scope>provided</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-hdfs</artifactId>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-hdfs</artifactId>

Review Comment:
   I think introducing hdfs dependency is a little heavy, we should test remote jar in e2e module, so I think these related test should be placed in flink-end-to-end-tests-sql module.
   
   Regarding to how to use hdfs cluster in e2e test, you can refer to the flink-end-to-end-tests-hbase module.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -461,6 +463,23 @@ public boolean dropTemporaryFunction(String path) {
         return functionCatalog.dropTemporaryCatalogFunction(unresolvedIdentifier, true);
     }
 
+    @Override
+    public void addJar(String jarPath) {

Review Comment:
   Please add UT about `ADD Jar` and `SHOW JARS` in `TableEnvironmentITCase`, add IT case about `ADD Jar` in `TableEnvironmentITCase` which you can refer to the related tests in `FunctionITCase`.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##########
@@ -575,6 +576,20 @@ void createFunction(
      */
     boolean dropTemporaryFunction(String path);
 
+    /**
+     * Add jar to the classLoader for use.
+     *
+     * @param jarPath The jar path to be added.
+     */
+    void addJar(String jarPath);

Review Comment:
   We should not introducing these two method, they are public api, we cannot introduce public api arbitrarily, it should be discussed in community. We should reuse the `AddJarOperation` and `ShowJarsOperation`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] beyond1920 commented on a diff in pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side

Posted by GitBox <gi...@apache.org>.
beyond1920 commented on code in PR #20361:
URL: https://github.com/apache/flink/pull/20361#discussion_r929880836


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -461,6 +463,23 @@ public boolean dropTemporaryFunction(String path) {
         return functionCatalog.dropTemporaryCatalogFunction(unresolvedIdentifier, true);
     }
 
+    @Override
+    public void addJar(String jarPath) {
+        try {
+            resourceManager.registerJarResources(

Review Comment:
   Do we need save jar into catalog?
   Or added Jar always work in session scope?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -461,6 +463,23 @@ public boolean dropTemporaryFunction(String path) {
         return functionCatalog.dropTemporaryCatalogFunction(unresolvedIdentifier, true);
     }
 
+    @Override
+    public void addJar(String jarPath) {
+        try {
+            resourceManager.registerJarResources(
+                    Collections.singletonList(new ResourceUri(ResourceType.JAR, jarPath)));
+        } catch (IOException e) {
+            throw new TableException(String.format("Could not register the specified jar [%s].", jarPath), e);
+        }
+    }
+
+    @Override
+    public List<String> listJars() {

Review Comment:
   AddJar and listJars could work for DataStream user or TableAPI user.
   How to use this function for SQL user?
   We also need to support `AddJarOperation` in `executeInternal` method, WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] lsyldliu commented on a diff in pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on code in PR #20361:
URL: https://github.com/apache/flink/pull/20361#discussion_r937322762


##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/SqlUdfBaseITCase.java:
##########
@@ -0,0 +1,173 @@
+package org.apache.flink.table.sql.codegen;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/** Base class for sql udf. */
+@RunWith(Parameterized.class)
+public abstract class SqlUdfBaseITCase extends TestLogger {
+    private static final Logger LOG = LoggerFactory.getLogger(PlannerScalaFreeITCase.class);

Review Comment:
   ```suggestion
       private static final Logger LOG = LoggerFactory.getLogger(SqlITCaseBase.class);
   ```



##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/SqlUdfBaseITCase.java:
##########
@@ -0,0 +1,173 @@
+package org.apache.flink.table.sql.codegen;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/** Base class for sql udf. */
+@RunWith(Parameterized.class)
+public abstract class SqlUdfBaseITCase extends TestLogger {

Review Comment:
   This module is not only for tests sql udf, so this class name is not correct. `SqlITCaseBase` maybe more suitable.



##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/SqlUdfBaseITCase.java:
##########
@@ -0,0 +1,173 @@
+package org.apache.flink.table.sql.codegen;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/** Base class for sql udf. */
+@RunWith(Parameterized.class)
+public abstract class SqlUdfBaseITCase extends TestLogger {
+    private static final Logger LOG = LoggerFactory.getLogger(PlannerScalaFreeITCase.class);
+
+    @Parameterized.Parameters(name = "executionMode")
+    public static Collection<String> data() {
+        return Arrays.asList("streaming", "batch");
+    }
+
+    @Rule public final FlinkResource flink;
+
+    @Rule public final TemporaryFolder tmp = new TemporaryFolder();
+
+    private final String executionMode;
+    private final String sqlPath;
+    private final int resultSize;
+    private final String[] resultItems;
+
+    private Path result;
+
+    protected static final Path SQL_TOOL_BOX_JAR = TestUtils.getResource(".*SqlToolbox.jar");
+
+    public SqlUdfBaseITCase(
+            String executionMode,
+            String sqlPath,
+            int resultSize,
+            String[] resultItems,
+            Configuration configuration) {
+        this.executionMode = executionMode;
+        this.sqlPath = sqlPath;
+        this.resultSize = resultSize;
+        this.resultItems = resultItems;
+        this.flink =
+                new LocalStandaloneFlinkResourceFactory()
+                        .create(
+                                FlinkResourceSetup.builder()
+                                        .addConfiguration(configuration)
+                                        .build());
+    }
+
+    @Before
+    public void before() {
+        Path tmpPath = tmp.getRoot().toPath();
+        LOG.info("The current temporary path: {}", tmpPath);
+        this.result = tmpPath.resolve("result");
+    }
+
+    @Test
+    public void testSQLUdf() throws Exception {

Review Comment:
   Please move this test to `PlannerScalaFreeITCase`, it aims to test visibility of the planner's classes in distributed runtime, and don't change test method name.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java:
##########
@@ -1225,6 +1228,23 @@ public void testWithBoolNotNullTypeHint() {
                 tEnv().executeSql("SELECT BoolEcho(x=1 and y is null) FROM SourceTable").collect());
     }
 
+    @Test
+    public void testUsingAddJar() throws Exception {
+        tEnv().executeSql(String.format("ADD JAR '%s'", jarPath));
+
+        TableResult tableResult = tEnv().executeSql("SHOW JARS");
+        assertThat(
+                        CollectionUtil.iteratorToList(tableResult.collect())
+                                .equals(
+                                        Collections.singletonList(
+                                                Row.of(new Path(jarPath).getPath()))))
+                .isTrue();
+
+        testUserDefinedFunctionByUsingJar(
+                String.format("create function lowerUdf as '%s' LANGUAGE JAVA", udfClassName),
+                null);

Review Comment:
   ditto



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java:
##########
@@ -142,6 +147,23 @@ public void testUserDefinedTemporaryCatalogFunctionByUsingJar() throws Exception
         testUserDefinedFunctionByUsingJar(functionDDL, dropFunctionDDL);
     }
 
+    @Test
+    public void testUsingAddJar() throws Exception {
+        tEnv().executeSql(String.format("ADD JAR '%s'", jarPath));
+
+        TableResult tableResult = tEnv().executeSql("SHOW JARS");
+        assertThat(
+                        CollectionUtil.iteratorToList(tableResult.collect())
+                                .equals(
+                                        Collections.singletonList(
+                                                Row.of(new Path(jarPath).getPath()))))
+                .isTrue();
+
+        testUserDefinedFunctionByUsingJar(
+                String.format("create function lowerUdf as '%s' LANGUAGE JAVA", udfClassName),
+                null);

Review Comment:
   why not delete the function?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] lsyldliu commented on a diff in pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on code in PR #20361:
URL: https://github.com/apache/flink/pull/20361#discussion_r937269900


##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/AddRemoteJarITCase.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sql.codegen;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/** ITCase for adding remote jar. */
+@RunWith(Parameterized.class)
+public class AddRemoteJarITCase extends TestLogger {
+    private static final Logger LOG = LoggerFactory.getLogger(AddRemoteJarITCase.class);
+
+    private static final String ADD_REMOTE_JAR_E2E_SQL = "add_remote_jar_e2e.sql";
+
+    private static final String EXECUTE_SQL_RESULT =
+            "{\"before\":null,\"after\":{\"id\":1,\"content\":\"hello world\"},\"op\":\"c\"}";
+
+    public static final String GENERATED_LOWER_UDF_CLASS = "LowerUDF";
+
+    public static final String GENERATED_LOWER_UDF_CODE =
+            "public class "
+                    + "%s"
+                    + " extends org.apache.flink.table.functions.ScalarFunction {\n"
+                    + "  public String eval(String str) {\n"
+                    + "    return str.toLowerCase();\n"
+                    + "  }\n"
+                    + "}\n";
+
+    @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+    private static final Path hadoopClasspath = TestUtils.getResource(".*hadoop.classpath");
+
+    @Parameterized.Parameters(name = "executionMode")
+    public static Collection<String> data() {
+        return Arrays.asList("streaming", "batch");
+    }
+
+    @Rule
+    public final FlinkResource flink =
+            new LocalStandaloneFlinkResourceFactory().create(FlinkResourceSetup.builder().build());
+
+    private final String executionMode;
+    private Path result;
+
+    private MiniDFSCluster hdfsCluster;

Review Comment:
   Ok, I think your concern make sense to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] beyond1920 commented on a diff in pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side

Posted by GitBox <gi...@apache.org>.
beyond1920 commented on code in PR #20361:
URL: https://github.com/apache/flink/pull/20361#discussion_r929880836


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -461,6 +463,23 @@ public boolean dropTemporaryFunction(String path) {
         return functionCatalog.dropTemporaryCatalogFunction(unresolvedIdentifier, true);
     }
 
+    @Override
+    public void addJar(String jarPath) {
+        try {
+            resourceManager.registerJarResources(

Review Comment:
   Do we need save jar into catalog?
   Or added Jar only work in session scope?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] lsyldliu commented on a diff in pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on code in PR #20361:
URL: https://github.com/apache/flink/pull/20361#discussion_r938372682


##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/SqlITCaseBase.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sql.codegen;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Base class for sql ITCase. */
+@RunWith(Parameterized.class)
+public abstract class SqlITCaseBase extends TestLogger {
+    private static final Logger LOG = LoggerFactory.getLogger(SqlITCaseBase.class);
+
+    @Parameterized.Parameters(name = "executionMode")
+    public static Collection<String> data() {
+        return Arrays.asList("streaming", "batch");
+    }
+
+    @Rule public final FlinkResource flink;
+
+    @Rule public final TemporaryFolder tmp = new TemporaryFolder();
+
+    private final String executionMode;
+
+    private Path result;
+
+    protected static final Path SQL_TOOL_BOX_JAR = TestUtils.getResource(".*SqlToolbox.jar");
+
+    public SqlITCaseBase(String executionMode, Configuration configuration) {
+        this.executionMode = executionMode;
+        this.flink =
+                new LocalStandaloneFlinkResourceFactory()
+                        .create(
+                                FlinkResourceSetup.builder()
+                                        .addConfiguration(configuration)
+                                        .build());
+    }
+
+    @Before
+    public void before() throws Exception {
+        Path tmpPath = tmp.getRoot().toPath();
+        LOG.info("The current temporary path: {}", tmpPath);
+        this.result = tmpPath.resolve(String.format("result-%s", UUID.randomUUID()));
+    }
+
+    public void runAndCheckSQL(
+            String sqlPath, Map<String, String> varsMap, int resultSize, List<String> resultItems)
+            throws Exception {
+        try (ClusterController clusterController = flink.startCluster(1)) {
+            List<String> sqlLines = initializeSqlLines(sqlPath, varsMap);
+
+            executeSqlStatements(clusterController, sqlLines);
+
+            // Wait until all the results flushed to the json file.
+            LOG.info("Verify the json result.");
+            checkJsonResultFile(resultSize, resultItems);
+            LOG.info("The codegen SQL client test run successfully.");

Review Comment:
   ```suggestion
               LOG.info("The SQL client test run successfully.");
   ```



##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCaseBase.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sql.codegen;
+
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.OperatingSystem;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/** ITCase for adding remote jar. */
+public class UsingRemoteJarITCaseBase extends SqlITCaseBase {
+    private static final String SQL_PATH = "remote_jar_e2e.sql";
+    private static final int RESULT_SIZE = 1;
+    private static final List<String> RESULT_ITEMS =
+            Collections.singletonList(
+                    "{\"before\":null,\"after\":{\"id\":1,\"content\":\"Hello Flink\"},\"op\":\"c\"}");
+
+    private static final Path HADOOP_CLASSPATH = TestUtils.getResource(".*hadoop.classpath");
+
+    private MiniDFSCluster hdfsCluster;
+    private org.apache.hadoop.fs.Path hdPath;
+    private org.apache.hadoop.fs.FileSystem hdfs;
+
+    public UsingRemoteJarITCaseBase(String executionMode) {
+        super(executionMode, new org.apache.flink.configuration.Configuration());

Review Comment:
   Why here need to new a default `Configuration ` instead of reuse the configuration in parent class?



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java:
##########
@@ -142,6 +147,23 @@ public void testUserDefinedTemporaryCatalogFunctionByUsingJar() throws Exception
         testUserDefinedFunctionByUsingJar(functionDDL, dropFunctionDDL);
     }
 
+    @Test
+    public void testUsingAddJar() throws Exception {
+        tEnv().executeSql(String.format("ADD JAR '%s'", jarPath));
+
+        TableResult tableResult = tEnv().executeSql("SHOW JARS");
+        assertThat(
+                        CollectionUtil.iteratorToList(tableResult.collect())
+                                .equals(
+                                        Collections.singletonList(
+                                                Row.of(new Path(jarPath).getPath()))))
+                .isTrue();
+
+        testUserDefinedFunctionByUsingJar(
+                String.format("create function lowerUdf as '%s' LANGUAGE JAVA", udfClassName),
+                null);

Review Comment:
   Please delete it, we has verified the result before drop it.



##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCaseBase.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sql.codegen;
+
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.OperatingSystem;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/** ITCase for adding remote jar. */
+public class UsingRemoteJarITCaseBase extends SqlITCaseBase {

Review Comment:
   ```suggestion
   public class UsingRemoteJarITCase extends SqlITCaseBase {
   ```



##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/PlannerScalaFreeITCaseBase.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sql.codegen;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.tests.util.flink.ClusterController;
+
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * End to End tests for table planner scala-free since 1.15. Due to scala-free of table planner
+ * introduced, the class in table planner is not visible in distribution runtime, if we use these
+ * class in execution time, ClassNotFound exception will be thrown. ITCase in table planner can not
+ * cover it, so we should add E2E test for these case.
+ */
+public class PlannerScalaFreeITCaseBase extends SqlITCaseBase {

Review Comment:
   ```suggestion
   public class PlannerScalaFreeITCase extends SqlITCaseBase {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] lsyldliu commented on a diff in pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on code in PR #20361:
URL: https://github.com/apache/flink/pull/20361#discussion_r934269500


##########
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SQLJobSubmission.java:
##########
@@ -29,10 +31,13 @@ public class SQLJobSubmission {
 
     private final List<String> sqlLines;
     private final List<String> jars;
+    private final Consumer<Map<String, String>> envProcessor;

Review Comment:
   Revert these changes?



##########
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java:
##########
@@ -220,6 +220,7 @@ public void submitSQLJob(SQLJobSubmission job, Duration timeout) throws IOExcept
         AutoClosableProcess.create(commands.toArray(new String[0]))
                 .setStdInputs(job.getSqlLines().toArray(new String[0]))
                 .setStdoutProcessor(LOG::info) // logging the SQL statements and error message
+                .setEnv(job.getEnvProcessor())

Review Comment:
   This change is not needed now?



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala:
##########
@@ -155,6 +164,23 @@ class TableEnvironmentTest {
     verifyTableEnvironmentExecutionExplain(tEnv)
   }
 
+  @Test
+  def testAddAndShowJar(): Unit = {
+    val jarPath = UserClassLoaderJarTestUtils
+      .createJarFile(
+        tempFolder.newFolder(String.format("test-jar-%s", UUID.randomUUID)),
+        "test-classloader-udf.jar",
+        GENERATED_LOWER_UDF_CLASS,
+        String.format(GENERATED_LOWER_UDF_CODE, GENERATED_LOWER_UDF_CLASS)
+      )
+      .getPath
+
+    tableEnv.executeSql("add JAR '" + jarPath + "'")

Review Comment:
   please use String.format and use upper or lower case uniformly.



##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java:
##########
@@ -274,6 +272,7 @@ public void removeJar(String jarPath) {
         classLoader.removeURL(jarURL);
     }
 
+    // TODO: Only related to removeJar in test. Remove it once it has no use.

Review Comment:
   Ditto



##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/AddRemoteJarITCase.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sql.codegen;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/** ITCase for adding remote jar. */
+@RunWith(Parameterized.class)
+public class AddRemoteJarITCase extends TestLogger {
+    private static final Logger LOG = LoggerFactory.getLogger(AddRemoteJarITCase.class);
+
+    private static final String ADD_REMOTE_JAR_E2E_SQL = "add_remote_jar_e2e.sql";
+
+    private static final String EXECUTE_SQL_RESULT =
+            "{\"before\":null,\"after\":{\"id\":1,\"content\":\"hello world\"},\"op\":\"c\"}";
+
+    public static final String GENERATED_LOWER_UDF_CLASS = "LowerUDF";
+
+    public static final String GENERATED_LOWER_UDF_CODE =
+            "public class "
+                    + "%s"
+                    + " extends org.apache.flink.table.functions.ScalarFunction {\n"
+                    + "  public String eval(String str) {\n"
+                    + "    return str.toLowerCase();\n"
+                    + "  }\n"
+                    + "}\n";
+
+    @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+    private static final Path hadoopClasspath = TestUtils.getResource(".*hadoop.classpath");
+
+    @Parameterized.Parameters(name = "executionMode")
+    public static Collection<String> data() {
+        return Arrays.asList("streaming", "batch");
+    }
+
+    @Rule
+    public final FlinkResource flink =
+            new LocalStandaloneFlinkResourceFactory().create(FlinkResourceSetup.builder().build());
+
+    private final String executionMode;
+    private Path result;
+
+    private MiniDFSCluster hdfsCluster;
+    private org.apache.hadoop.fs.Path hdPath;
+    private org.apache.hadoop.fs.FileSystem hdfs;
+
+    public AddRemoteJarITCase(String executionMode) {
+        this.executionMode = executionMode;
+    }
+
+    @BeforeClass
+    public static void verifyOS() {
+        Assume.assumeTrue(
+                "HDFS cluster cannot be started on Windows without extensions.",
+                !OperatingSystem.isWindows());
+    }
+
+    @Before
+    public void createHDFS() {
+        try {
+            Configuration hdConf = new Configuration();
+
+            File baseDir = new File("./target/hdfs/hdfsTest").getAbsoluteFile();
+            FileUtil.fullyDelete(baseDir);
+            hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+            MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+            hdfsCluster = builder.build();
+
+            hdPath = new org.apache.hadoop.fs.Path("/test.jar");
+            hdfs = hdPath.getFileSystem(hdConf);
+
+            File localUdfJar =
+                    UserClassLoaderJarTestUtils.createJarFile(
+                            tempFolder.newFolder(String.format("test-jar-%s", UUID.randomUUID())),
+                            "test-classloader-udf.jar",
+                            GENERATED_LOWER_UDF_CLASS,
+                            String.format(GENERATED_LOWER_UDF_CODE, GENERATED_LOWER_UDF_CLASS));
+            hdfs.copyFromLocalFile(new org.apache.hadoop.fs.Path(localUdfJar.toURI()), hdPath);
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail("Test failed " + e.getMessage());
+        }
+
+        Path tmpPath = tempFolder.getRoot().toPath();
+        LOG.info("The current temporary path: {}", tmpPath);
+        this.result = tmpPath.resolve("result");
+    }
+
+    @After
+    public void destroyHDFS() {
+        try {
+            hdfs.delete(hdPath, false);
+            hdfsCluster.shutdown();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void testAddRemoteJar() throws Exception {
+        String remoteJarPath =
+                "hdfs://"
+                        + hdfsCluster.getURI().getHost()
+                        + ":"
+                        + hdfsCluster.getNameNodePort()
+                        + "/"
+                        + hdPath;
+
+        try (ClusterController clusterController = flink.startCluster(1)) {
+            Map<String, String> varsMap = new HashMap<>();
+            varsMap.put("$RESULT", this.result.toAbsolutePath().toString());
+            varsMap.put("$MODE", this.executionMode);
+            varsMap.put("$JAR_PATH", remoteJarPath);
+
+            List<String> sqlLines = initializeSqlLines(varsMap);
+
+            executeSqlStatements(clusterController, sqlLines);
+
+            checkJsonResultFile();
+        }
+    }
+
+    private List<String> initializeSqlLines(Map<String, String> vars) throws IOException {

Review Comment:
   Here many duplicated method with `PlannerScalaFreeITCase`, so I think we can extract an base class to reuse the common method.



##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/AddRemoteJarITCase.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sql.codegen;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/** ITCase for adding remote jar. */
+@RunWith(Parameterized.class)
+public class AddRemoteJarITCase extends TestLogger {
+    private static final Logger LOG = LoggerFactory.getLogger(AddRemoteJarITCase.class);
+
+    private static final String ADD_REMOTE_JAR_E2E_SQL = "add_remote_jar_e2e.sql";
+
+    private static final String EXECUTE_SQL_RESULT =
+            "{\"before\":null,\"after\":{\"id\":1,\"content\":\"hello world\"},\"op\":\"c\"}";
+
+    public static final String GENERATED_LOWER_UDF_CLASS = "LowerUDF";
+
+    public static final String GENERATED_LOWER_UDF_CODE =
+            "public class "
+                    + "%s"
+                    + " extends org.apache.flink.table.functions.ScalarFunction {\n"
+                    + "  public String eval(String str) {\n"
+                    + "    return str.toLowerCase();\n"
+                    + "  }\n"
+                    + "}\n";
+
+    @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+    private static final Path hadoopClasspath = TestUtils.getResource(".*hadoop.classpath");
+
+    @Parameterized.Parameters(name = "executionMode")
+    public static Collection<String> data() {
+        return Arrays.asList("streaming", "batch");
+    }
+
+    @Rule
+    public final FlinkResource flink =
+            new LocalStandaloneFlinkResourceFactory().create(FlinkResourceSetup.builder().build());
+
+    private final String executionMode;
+    private Path result;
+
+    private MiniDFSCluster hdfsCluster;
+    private org.apache.hadoop.fs.Path hdPath;
+    private org.apache.hadoop.fs.FileSystem hdfs;
+
+    public AddRemoteJarITCase(String executionMode) {
+        this.executionMode = executionMode;
+    }
+
+    @BeforeClass
+    public static void verifyOS() {
+        Assume.assumeTrue(
+                "HDFS cluster cannot be started on Windows without extensions.",
+                !OperatingSystem.isWindows());
+    }
+
+    @Before
+    public void createHDFS() {
+        try {
+            Configuration hdConf = new Configuration();
+
+            File baseDir = new File("./target/hdfs/hdfsTest").getAbsoluteFile();

Review Comment:
   Use subDir of current test directory directly?



##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/AddRemoteJarITCase.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sql.codegen;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/** ITCase for adding remote jar. */
+@RunWith(Parameterized.class)
+public class AddRemoteJarITCase extends TestLogger {
+    private static final Logger LOG = LoggerFactory.getLogger(AddRemoteJarITCase.class);
+
+    private static final String ADD_REMOTE_JAR_E2E_SQL = "add_remote_jar_e2e.sql";
+
+    private static final String EXECUTE_SQL_RESULT =
+            "{\"before\":null,\"after\":{\"id\":1,\"content\":\"hello world\"},\"op\":\"c\"}";
+
+    public static final String GENERATED_LOWER_UDF_CLASS = "LowerUDF";
+
+    public static final String GENERATED_LOWER_UDF_CODE =
+            "public class "
+                    + "%s"
+                    + " extends org.apache.flink.table.functions.ScalarFunction {\n"
+                    + "  public String eval(String str) {\n"
+                    + "    return str.toLowerCase();\n"
+                    + "  }\n"
+                    + "}\n";
+
+    @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+    private static final Path hadoopClasspath = TestUtils.getResource(".*hadoop.classpath");
+
+    @Parameterized.Parameters(name = "executionMode")
+    public static Collection<String> data() {
+        return Arrays.asList("streaming", "batch");
+    }
+
+    @Rule
+    public final FlinkResource flink =
+            new LocalStandaloneFlinkResourceFactory().create(FlinkResourceSetup.builder().build());
+
+    private final String executionMode;
+    private Path result;
+
+    private MiniDFSCluster hdfsCluster;
+    private org.apache.hadoop.fs.Path hdPath;
+    private org.apache.hadoop.fs.FileSystem hdfs;
+
+    public AddRemoteJarITCase(String executionMode) {
+        this.executionMode = executionMode;
+    }
+
+    @BeforeClass
+    public static void verifyOS() {
+        Assume.assumeTrue(

Review Comment:
   Why in window, hdfs doesn't work?



##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/AddRemoteJarITCase.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sql.codegen;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/** ITCase for adding remote jar. */
+@RunWith(Parameterized.class)
+public class AddRemoteJarITCase extends TestLogger {
+    private static final Logger LOG = LoggerFactory.getLogger(AddRemoteJarITCase.class);
+
+    private static final String ADD_REMOTE_JAR_E2E_SQL = "add_remote_jar_e2e.sql";
+
+    private static final String EXECUTE_SQL_RESULT =
+            "{\"before\":null,\"after\":{\"id\":1,\"content\":\"hello world\"},\"op\":\"c\"}";
+
+    public static final String GENERATED_LOWER_UDF_CLASS = "LowerUDF";
+
+    public static final String GENERATED_LOWER_UDF_CODE =
+            "public class "
+                    + "%s"
+                    + " extends org.apache.flink.table.functions.ScalarFunction {\n"
+                    + "  public String eval(String str) {\n"
+                    + "    return str.toLowerCase();\n"
+                    + "  }\n"
+                    + "}\n";
+
+    @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+    private static final Path hadoopClasspath = TestUtils.getResource(".*hadoop.classpath");
+
+    @Parameterized.Parameters(name = "executionMode")
+    public static Collection<String> data() {
+        return Arrays.asList("streaming", "batch");
+    }
+
+    @Rule
+    public final FlinkResource flink =
+            new LocalStandaloneFlinkResourceFactory().create(FlinkResourceSetup.builder().build());
+
+    private final String executionMode;
+    private Path result;
+
+    private MiniDFSCluster hdfsCluster;

Review Comment:
   Wether can we crate hdfs cluster like `HBaseResource` ? I think this also would be benefit to other hdfs related test.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1132,6 +1155,10 @@ public TableResultInternal executeInternal(Operation operation) {
             return dropSystemFunction((DropTempSystemFunctionOperation) operation);
         } else if (operation instanceof AlterCatalogFunctionOperation) {
             return alterCatalogFunction((AlterCatalogFunctionOperation) operation);
+        } else if (operation instanceof AddJarOperation) {
+            return addJar((AddJarOperation) operation);
+        } else if (operation instanceof ShowJarsOperation) {
+            return buildShowResult("jars", listJars());

Review Comment:
   After investigate hive[1] and spark[2], I think we don't need to return the title of "jars",  keep the origin behavior, just return the jar list enough.
   
   1. https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Cli#LanguageManualCli-HiveCLI
   2. https://spark.apache.org/docs/latest/sql-ref-syntax-aux-resource-mgmt-list-jar.html
   
   However, TableEnvironment requires us to specify the schema and data when we need to return result to client, so here I think we have to specify the schema name to `jars`.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/AddAndShowJarITCase.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.sql;
+
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.UUID;
+
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Add and show jar ITCase. */
+public class AddAndShowJarITCase extends StreamingTestBase {
+    private String jarPath;
+
+    @Before
+    @Override
+    public void before() throws Exception {
+        super.before();
+        jarPath =
+                UserClassLoaderJarTestUtils.createJarFile(
+                                TEMPORARY_FOLDER.newFolder(
+                                        String.format("test-jar-%s", UUID.randomUUID())),
+                                "test-classloader-udf.jar",
+                                GENERATED_LOWER_UDF_CLASS,
+                                String.format(GENERATED_LOWER_UDF_CODE, GENERATED_LOWER_UDF_CLASS))
+                        .getPath();
+    }
+
+    @Test
+    public void testAddAndShowJar() {

Review Comment:
   Ditto



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala:
##########
@@ -155,6 +164,23 @@ class TableEnvironmentTest {
     verifyTableEnvironmentExecutionExplain(tEnv)
   }
 
+  @Test
+  def testAddAndShowJar(): Unit = {
+    val jarPath = UserClassLoaderJarTestUtils
+      .createJarFile(
+        tempFolder.newFolder(String.format("test-jar-%s", UUID.randomUUID)),
+        "test-classloader-udf.jar",
+        GENERATED_LOWER_UDF_CLASS,
+        String.format(GENERATED_LOWER_UDF_CODE, GENERATED_LOWER_UDF_CLASS)
+      )
+      .getPath
+
+    tableEnv.executeSql("add JAR '" + jarPath + "'")
+    val tableResult = tableEnv.executeSql("SHOW JARS")
+
+    checkData(util.Arrays.asList(Row.of(jarPath)).iterator(), tableResult.collect())

Review Comment:
   Convert the result to list, then assert it? `CollectionUtil.iteratorToList(tableResult.collect())`.
   Moreover, please also assert the `ResultKind` such as `    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult.getResultKind)
   `



##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/AddRemoteJarITCase.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sql.codegen;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/** ITCase for adding remote jar. */
+@RunWith(Parameterized.class)
+public class AddRemoteJarITCase extends TestLogger {

Review Comment:
   Please use junit5 for new added test, refer to https://flink.apache.org/contributing/code-style-and-quality-common.html#testing



##########
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java:
##########
@@ -156,7 +156,8 @@ public void testAddJarWithRelativePath() throws IOException {
 
     @Test
     public void testAddIllegalJar() {
-        validateAddJarWithException("/path/to/illegal.jar", "JAR file does not exist");
+        validateAddJarWithException(

Review Comment:
   Please port the related `add jar` and `show jars` test backed by TableEnvironment Because SessionContext didn't support this now.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##########
@@ -192,31 +192,32 @@ private void checkJarResources(List<ResourceUri> resourceUris) throws IOExceptio
         }
 
         for (ResourceUri resourceUri : resourceUris) {
-            // here can check whether the resource path is valid
-            Path path = new Path(resourceUri.getUri());
-            // file name should end with .jar suffix
-            String fileExtension = Files.getFileExtension(path.getName());
-            if (!fileExtension.toLowerCase().endsWith(JAR_SUFFIX)) {
-                throw new ValidationException(
-                        String.format(
-                                "The registering jar resource [%s] must ends with '.jar' suffix.",
-                                path));
-            }
+            checkJarPath(new Path(resourceUri.getUri()));
+        }
+    }
 
-            FileSystem fs = FileSystem.getUnguardedFileSystem(path.toUri());
-            // check resource exists firstly
-            if (!fs.exists(path)) {
-                throw new FileNotFoundException(
-                        String.format("Jar resource [%s] not found.", path));
-            }
+    protected void checkJarPath(Path path) throws IOException {
+        // file name should end with .jar suffix
+        String fileExtension = Files.getFileExtension(path.getName());
+        if (!fileExtension.toLowerCase().endsWith(JAR_SUFFIX)) {
+            throw new ValidationException(
+                    String.format(
+                            "The registering jar resource [%s] must ends with '.jar' suffix.",
+                            path));
+        }
 
-            // register directory is not allowed for resource
-            if (fs.getFileStatus(path).isDir()) {
-                throw new ValidationException(
-                        String.format(
-                                "The registering jar resource [%s] is a directory that is not allowed.",
-                                path));
-            }
+        FileSystem fs = FileSystem.getUnguardedFileSystem(path.toUri());
+        // check resource exists firstly
+        if (!fs.exists(path)) {
+            throw new FileNotFoundException(String.format("Jar resource [%s] not found.", path));
+        }
+
+        // register directory is not allowed for resource
+        if (fs.getFileStatus(path).isDir()) {
+            throw new ValidationException(
+                    String.format(
+                            "The registering jar resource [%s] is a directory that is not allowed.",

Review Comment:
   This method also called by `unregisterJarResource` method, this message is not correct now, so change it to `registering or unregistering`?



##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java:
##########
@@ -248,6 +248,7 @@ public static SessionContext create(DefaultContext defaultContext, String sessio
                 executionContext);
     }
 
+    // TODO: Only related to removeJar in test. Remove it once it has no use.

Review Comment:
   I think we should remove this method, and remove the related tests. Due to we port the add jar implementation to TableEnvironment, so we should also port the tests backed by TableEnvironment.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##########
@@ -192,31 +192,32 @@ private void checkJarResources(List<ResourceUri> resourceUris) throws IOExceptio
         }
 
         for (ResourceUri resourceUri : resourceUris) {
-            // here can check whether the resource path is valid
-            Path path = new Path(resourceUri.getUri());
-            // file name should end with .jar suffix
-            String fileExtension = Files.getFileExtension(path.getName());
-            if (!fileExtension.toLowerCase().endsWith(JAR_SUFFIX)) {
-                throw new ValidationException(
-                        String.format(
-                                "The registering jar resource [%s] must ends with '.jar' suffix.",
-                                path));
-            }
+            checkJarPath(new Path(resourceUri.getUri()));
+        }
+    }
 
-            FileSystem fs = FileSystem.getUnguardedFileSystem(path.toUri());
-            // check resource exists firstly
-            if (!fs.exists(path)) {
-                throw new FileNotFoundException(
-                        String.format("Jar resource [%s] not found.", path));
-            }
+    protected void checkJarPath(Path path) throws IOException {
+        // file name should end with .jar suffix
+        String fileExtension = Files.getFileExtension(path.getName());
+        if (!fileExtension.toLowerCase().endsWith(JAR_SUFFIX)) {
+            throw new ValidationException(
+                    String.format(
+                            "The registering jar resource [%s] must ends with '.jar' suffix.",

Review Comment:
   This method also called by `unregisterJarResource` method, this message is not correct now, so change it to `registering  or unregistering`?



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/AddAndShowJarITCase.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.UUID;
+
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Add and show jar ITCase. */
+public class AddAndShowJarITCase extends BatchTestBase {
+    private String jarPath;
+
+    @Before
+    @Override
+    public void before() throws Exception {
+        super.before();
+        jarPath =
+                UserClassLoaderJarTestUtils.createJarFile(
+                                TEMPORARY_FOLDER.newFolder(
+                                        String.format("test-jar-%s", UUID.randomUUID())),
+                                "test-classloader-udf.jar",
+                                GENERATED_LOWER_UDF_CLASS,
+                                String.format(GENERATED_LOWER_UDF_CODE, GENERATED_LOWER_UDF_CLASS))
+                        .getPath();
+    }
+
+    @Test
+    public void testAddAndShowJar() {

Review Comment:
   The purpose of this test is not just to test the add jar, we also want to test the class in jar can also work from specified query, so we should test one query that refer to the UDF class in the jar. 
   I think we don't need to introduce this test class, remove this test to `FunctionITCase`.



##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/AddRemoteJarITCase.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sql.codegen;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/** ITCase for adding remote jar. */
+@RunWith(Parameterized.class)
+public class AddRemoteJarITCase extends TestLogger {
+    private static final Logger LOG = LoggerFactory.getLogger(AddRemoteJarITCase.class);
+
+    private static final String ADD_REMOTE_JAR_E2E_SQL = "add_remote_jar_e2e.sql";
+
+    private static final String EXECUTE_SQL_RESULT =
+            "{\"before\":null,\"after\":{\"id\":1,\"content\":\"hello world\"},\"op\":\"c\"}";
+
+    public static final String GENERATED_LOWER_UDF_CLASS = "LowerUDF";
+
+    public static final String GENERATED_LOWER_UDF_CODE =

Review Comment:
   Please implement this udf in **flink-sql-client-test** module, it will be packaged into SqlToolbox.jar, then we can use it directly.



##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/AddRemoteJarITCase.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sql.codegen;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/** ITCase for adding remote jar. */
+@RunWith(Parameterized.class)
+public class AddRemoteJarITCase extends TestLogger {

Review Comment:
   Change this class name to `UsingRemoteJarITCase`? We also add `create function ... using jar ...` related tests in the future.



##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/AddRemoteJarITCase.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sql.codegen;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/** ITCase for adding remote jar. */
+@RunWith(Parameterized.class)
+public class AddRemoteJarITCase extends TestLogger {
+    private static final Logger LOG = LoggerFactory.getLogger(AddRemoteJarITCase.class);
+
+    private static final String ADD_REMOTE_JAR_E2E_SQL = "add_remote_jar_e2e.sql";
+
+    private static final String EXECUTE_SQL_RESULT =
+            "{\"before\":null,\"after\":{\"id\":1,\"content\":\"hello world\"},\"op\":\"c\"}";
+
+    public static final String GENERATED_LOWER_UDF_CLASS = "LowerUDF";
+
+    public static final String GENERATED_LOWER_UDF_CODE =
+            "public class "
+                    + "%s"
+                    + " extends org.apache.flink.table.functions.ScalarFunction {\n"
+                    + "  public String eval(String str) {\n"
+                    + "    return str.toLowerCase();\n"
+                    + "  }\n"
+                    + "}\n";
+
+    @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+    private static final Path hadoopClasspath = TestUtils.getResource(".*hadoop.classpath");
+
+    @Parameterized.Parameters(name = "executionMode")
+    public static Collection<String> data() {
+        return Arrays.asList("streaming", "batch");
+    }
+
+    @Rule
+    public final FlinkResource flink =
+            new LocalStandaloneFlinkResourceFactory().create(FlinkResourceSetup.builder().build());
+
+    private final String executionMode;
+    private Path result;
+
+    private MiniDFSCluster hdfsCluster;
+    private org.apache.hadoop.fs.Path hdPath;
+    private org.apache.hadoop.fs.FileSystem hdfs;
+
+    public AddRemoteJarITCase(String executionMode) {
+        this.executionMode = executionMode;
+    }
+
+    @BeforeClass
+    public static void verifyOS() {
+        Assume.assumeTrue(
+                "HDFS cluster cannot be started on Windows without extensions.",
+                !OperatingSystem.isWindows());
+    }
+
+    @Before
+    public void createHDFS() {
+        try {
+            Configuration hdConf = new Configuration();
+
+            File baseDir = new File("./target/hdfs/hdfsTest").getAbsoluteFile();
+            FileUtil.fullyDelete(baseDir);
+            hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+            MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+            hdfsCluster = builder.build();
+
+            hdPath = new org.apache.hadoop.fs.Path("/test.jar");
+            hdfs = hdPath.getFileSystem(hdConf);
+
+            File localUdfJar =
+                    UserClassLoaderJarTestUtils.createJarFile(
+                            tempFolder.newFolder(String.format("test-jar-%s", UUID.randomUUID())),
+                            "test-classloader-udf.jar",
+                            GENERATED_LOWER_UDF_CLASS,
+                            String.format(GENERATED_LOWER_UDF_CODE, GENERATED_LOWER_UDF_CLASS));
+            hdfs.copyFromLocalFile(new org.apache.hadoop.fs.Path(localUdfJar.toURI()), hdPath);
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail("Test failed " + e.getMessage());
+        }
+
+        Path tmpPath = tempFolder.getRoot().toPath();
+        LOG.info("The current temporary path: {}", tmpPath);
+        this.result = tmpPath.resolve("result");
+    }
+
+    @After
+    public void destroyHDFS() {
+        try {
+            hdfs.delete(hdPath, false);
+            hdfsCluster.shutdown();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void testAddRemoteJar() throws Exception {
+        String remoteJarPath =
+                "hdfs://"

Review Comment:
   Using String.format()



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] Myracle commented on a diff in pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side

Posted by GitBox <gi...@apache.org>.
Myracle commented on code in PR #20361:
URL: https://github.com/apache/flink/pull/20361#discussion_r935041158


##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/AddRemoteJarITCase.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sql.codegen;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/** ITCase for adding remote jar. */
+@RunWith(Parameterized.class)
+public class AddRemoteJarITCase extends TestLogger {
+    private static final Logger LOG = LoggerFactory.getLogger(AddRemoteJarITCase.class);
+
+    private static final String ADD_REMOTE_JAR_E2E_SQL = "add_remote_jar_e2e.sql";
+
+    private static final String EXECUTE_SQL_RESULT =
+            "{\"before\":null,\"after\":{\"id\":1,\"content\":\"hello world\"},\"op\":\"c\"}";
+
+    public static final String GENERATED_LOWER_UDF_CLASS = "LowerUDF";
+
+    public static final String GENERATED_LOWER_UDF_CODE =
+            "public class "
+                    + "%s"
+                    + " extends org.apache.flink.table.functions.ScalarFunction {\n"
+                    + "  public String eval(String str) {\n"
+                    + "    return str.toLowerCase();\n"
+                    + "  }\n"
+                    + "}\n";
+
+    @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+    private static final Path hadoopClasspath = TestUtils.getResource(".*hadoop.classpath");
+
+    @Parameterized.Parameters(name = "executionMode")
+    public static Collection<String> data() {
+        return Arrays.asList("streaming", "batch");
+    }
+
+    @Rule
+    public final FlinkResource flink =
+            new LocalStandaloneFlinkResourceFactory().create(FlinkResourceSetup.builder().build());
+
+    private final String executionMode;
+    private Path result;
+
+    private MiniDFSCluster hdfsCluster;

Review Comment:
   I agree with you that a more common way is benefit. But the related tests with  HbaseResource and KafkaResource  are all ignored for that the tests maybe get stuck. If we download hdfs and setup a local hdfs cluster by command, I am afraid the process maybe get stuck someday. Since many ITCase use MiniDFSCluster, current implementation is a stable test. What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] Myracle commented on a diff in pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side

Posted by GitBox <gi...@apache.org>.
Myracle commented on code in PR #20361:
URL: https://github.com/apache/flink/pull/20361#discussion_r935108846


##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java:
##########
@@ -248,6 +248,7 @@ public static SessionContext create(DefaultContext defaultContext, String sessio
                 executionContext);
     }
 
+    // TODO: Only related to removeJar in test. Remove it once it has no use.

Review Comment:
   It is a choice. But the removeJar needs add jar firstly. If the addJar method is removed, I has to construct another addJar in sessionContestTest. What do you think about this way?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] Myracle commented on a diff in pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side

Posted by GitBox <gi...@apache.org>.
Myracle commented on code in PR #20361:
URL: https://github.com/apache/flink/pull/20361#discussion_r938495597


##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/AddRemoteJarITCase.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sql.codegen;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.ClusterController;
+import org.apache.flink.tests.util.flink.FlinkResource;
+import org.apache.flink.tests.util.flink.FlinkResourceSetup;
+import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/** ITCase for adding remote jar. */
+@RunWith(Parameterized.class)
+public class AddRemoteJarITCase extends TestLogger {

Review Comment:
   Some features written in junit4 can not be used in junit5. So we decide not to use junit5 for this test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] Myracle commented on a diff in pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side

Posted by GitBox <gi...@apache.org>.
Myracle commented on code in PR #20361:
URL: https://github.com/apache/flink/pull/20361#discussion_r937680737


##########
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SQLJobSubmission.java:
##########
@@ -29,10 +31,13 @@ public class SQLJobSubmission {
 
     private final List<String> sqlLines;
     private final List<String> jars;
+    private final Consumer<Map<String, String>> envProcessor;

Review Comment:
   This is needed for UsingRemoteJarITCaseBase



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] Myracle commented on a diff in pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side

Posted by GitBox <gi...@apache.org>.
Myracle commented on code in PR #20361:
URL: https://github.com/apache/flink/pull/20361#discussion_r935046346


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala:
##########
@@ -155,6 +164,23 @@ class TableEnvironmentTest {
     verifyTableEnvironmentExecutionExplain(tEnv)
   }
 
+  @Test
+  def testAddAndShowJar(): Unit = {
+    val jarPath = UserClassLoaderJarTestUtils
+      .createJarFile(
+        tempFolder.newFolder(String.format("test-jar-%s", UUID.randomUUID)),
+        "test-classloader-udf.jar",
+        GENERATED_LOWER_UDF_CLASS,
+        String.format(GENERATED_LOWER_UDF_CODE, GENERATED_LOWER_UDF_CLASS)
+      )
+      .getPath
+
+    tableEnv.executeSql("add JAR '" + jarPath + "'")
+    val tableResult = tableEnv.executeSql("SHOW JARS")
+
+    checkData(util.Arrays.asList(Row.of(jarPath)).iterator(), tableResult.collect())

Review Comment:
   It is ok to assert the ResultKind. Since there is already a method checkData, why not just use it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] Myracle commented on a diff in pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side

Posted by GitBox <gi...@apache.org>.
Myracle commented on code in PR #20361:
URL: https://github.com/apache/flink/pull/20361#discussion_r937398787


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -461,6 +463,23 @@ public boolean dropTemporaryFunction(String path) {
         return functionCatalog.dropTemporaryCatalogFunction(unresolvedIdentifier, true);
     }
 
+    @Override
+    public void addJar(String jarPath) {
+        try {
+            resourceManager.registerJarResources(
+                    Collections.singletonList(new ResourceUri(ResourceType.JAR, jarPath)));
+        } catch (IOException e) {
+            throw new TableException(String.format("Could not register the specified jar [%s].", jarPath), e);
+        }
+    }
+
+    @Override
+    public List<String> listJars() {

Review Comment:
   As @lsyldliu, the public api will be discussed later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] Myracle commented on a diff in pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side

Posted by GitBox <gi...@apache.org>.
Myracle commented on code in PR #20361:
URL: https://github.com/apache/flink/pull/20361#discussion_r937678126


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java:
##########
@@ -142,6 +147,23 @@ public void testUserDefinedTemporaryCatalogFunctionByUsingJar() throws Exception
         testUserDefinedFunctionByUsingJar(functionDDL, dropFunctionDDL);
     }
 
+    @Test
+    public void testUsingAddJar() throws Exception {
+        tEnv().executeSql(String.format("ADD JAR '%s'", jarPath));
+
+        TableResult tableResult = tEnv().executeSql("SHOW JARS");
+        assertThat(
+                        CollectionUtil.iteratorToList(tableResult.collect())
+                                .equals(
+                                        Collections.singletonList(
+                                                Row.of(new Path(jarPath).getPath()))))
+                .isTrue();
+
+        testUserDefinedFunctionByUsingJar(
+                String.format("create function lowerUdf as '%s' LANGUAGE JAVA", udfClassName),
+                null);

Review Comment:
   Using this function to verify the execute result.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] lsyldliu commented on a diff in pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on code in PR #20361:
URL: https://github.com/apache/flink/pull/20361#discussion_r937512920


##########
flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/log4j2-test.properties:
##########
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-rootLogger.level = OFF
+rootLogger.level = INFO

Review Comment:
   Please revert this change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] beyond1920 commented on a diff in pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side

Posted by GitBox <gi...@apache.org>.
beyond1920 commented on code in PR #20361:
URL: https://github.com/apache/flink/pull/20361#discussion_r929884249


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -461,6 +463,23 @@ public boolean dropTemporaryFunction(String path) {
         return functionCatalog.dropTemporaryCatalogFunction(unresolvedIdentifier, true);
     }
 
+    @Override
+    public void addJar(String jarPath) {
+        try {
+            resourceManager.registerJarResources(
+                    Collections.singletonList(new ResourceUri(ResourceType.JAR, jarPath)));
+        } catch (IOException e) {
+            throw new TableException(String.format("Could not register the specified jar [%s].", jarPath), e);
+        }
+    }
+
+    @Override
+    public List<String> listJars() {

Review Comment:
   AddJar and listJars could work for TableAPI user.
   How to use this function for SQL user?
   We also need to support `AddJarOperation` in `executeInternal` method, WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] Myracle commented on a diff in pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side

Posted by GitBox <gi...@apache.org>.
Myracle commented on code in PR #20361:
URL: https://github.com/apache/flink/pull/20361#discussion_r935108846


##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java:
##########
@@ -248,6 +248,7 @@ public static SessionContext create(DefaultContext defaultContext, String sessio
                 executionContext);
     }
 
+    // TODO: Only related to removeJar in test. Remove it once it has no use.

Review Comment:
   I add VisibleForTesting to addJar and listJar in SessionContext. Also, I move the related tests to tableEnv.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] wuchong commented on pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side

Posted by GitBox <gi...@apache.org>.
wuchong commented on PR #20361:
URL: https://github.com/apache/flink/pull/20361#issuecomment-1207583833

   The failed case is tracked by FLINK-28844.
   Merging...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] Myracle commented on a diff in pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side

Posted by GitBox <gi...@apache.org>.
Myracle commented on code in PR #20361:
URL: https://github.com/apache/flink/pull/20361#discussion_r937400491


##########
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java:
##########
@@ -156,13 +156,7 @@ public void testAddJarWithRelativePath() throws IOException {
 
     @Test
     public void testAddIllegalJar() {
-        validateAddJarWithException("/path/to/illegal.jar", "JAR file does not exist");
-    }
-
-    @Test
-    public void testAddRemoteJar() {
-        validateAddJarWithException(

Review Comment:
   The e2e test UsingRemoteJarITCase will replace it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] Myracle commented on a diff in pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side

Posted by GitBox <gi...@apache.org>.
Myracle commented on code in PR #20361:
URL: https://github.com/apache/flink/pull/20361#discussion_r937399265


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -461,6 +463,23 @@ public boolean dropTemporaryFunction(String path) {
         return functionCatalog.dropTemporaryCatalogFunction(unresolvedIdentifier, true);
     }
 
+    @Override
+    public void addJar(String jarPath) {
+        try {
+            resourceManager.registerJarResources(

Review Comment:
   It is not related to catalog.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] lsyldliu commented on a diff in pull request #20361: [FLINK-27790][Table SQL / API] Port ADD JAR /SHOW JARS syntax implementation from SqlClient to TableEnvironment side

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on code in PR #20361:
URL: https://github.com/apache/flink/pull/20361#discussion_r929999650


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -461,6 +463,23 @@ public boolean dropTemporaryFunction(String path) {
         return functionCatalog.dropTemporaryCatalogFunction(unresolvedIdentifier, true);
     }
 
+    @Override
+    public void addJar(String jarPath) {

Review Comment:
   Please add UT about `ADD Jar` and `SHOW JARS` in `TableEnvironmentTest`, add IT case about `ADD Jar` in `TableEnvironmentITCase` which you can refer to the related tests in `FunctionITCase`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org