You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by le...@apache.org on 2022/03/20 04:47:59 UTC

[flink] branch release-1.14 updated: [FLINK-26618][sql-client] Fix 'remove jar' statement is not aligned with pipeline jars

This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 4fc5b96  [FLINK-26618][sql-client] Fix 'remove jar' statement is not aligned with pipeline jars
4fc5b96 is described below

commit 4fc5b96976843501580f7f335cf39c5c3d8e04d4
Author: Paul Lin <li...@corp.netease.com>
AuthorDate: Sun Mar 20 12:47:17 2022 +0800

    [FLINK-26618][sql-client] Fix 'remove jar' statement is not aligned with pipeline jars
    
    This closes #19132.
---
 .../client/gateway/context/SessionContext.java     | 34 +++++++++++++---------
 .../flink-sql-client/src/test/resources/sql/set.q  |  8 +++++
 2 files changed, 29 insertions(+), 13 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
index 08d37e1..d0bf903 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
@@ -262,6 +262,9 @@ public class SessionContext {
         }
 
         Set<URL> newDependencies = new HashSet<>(dependencies);
+        // merge the jars in config with the jars maintained in session
+        Set<URL> jarsInConfig = getJarsInConfig();
+        newDependencies.addAll(jarsInConfig);
         newDependencies.add(jarURL);
         updateClassLoaderAndDependencies(newDependencies);
 
@@ -280,6 +283,9 @@ public class SessionContext {
         }
 
         Set<URL> newDependencies = new HashSet<>(dependencies);
+        // merge the jars in config with the jars maintained in session
+        Set<URL> jarsInConfig = getJarsInConfig();
+        newDependencies.addAll(jarsInConfig);
         newDependencies.remove(jarURL);
         updateClassLoaderAndDependencies(newDependencies);
 
@@ -324,22 +330,10 @@ public class SessionContext {
     }
 
     private void updateClassLoaderAndDependencies(Collection<URL> newDependencies) {
-        // merge the jar in config with the jar maintained in session
-        Set<URL> jarsInConfig;
-        try {
-            jarsInConfig =
-                    new HashSet<>(
-                            ConfigUtils.decodeListFromConfig(
-                                    sessionConfiguration, PipelineOptions.JARS, URL::new));
-        } catch (MalformedURLException e) {
-            throw new SqlExecutionException(
-                    "Failed to parse the option `pipeline.jars` in configuration.", e);
-        }
-        jarsInConfig.addAll(newDependencies);
         ConfigUtils.encodeCollectionToConfig(
                 sessionConfiguration,
                 PipelineOptions.JARS,
-                new ArrayList<>(jarsInConfig),
+                new ArrayList<>(newDependencies),
                 URL::toString);
 
         // TODO: update the the classloader in CatalogManager.
@@ -374,4 +368,18 @@ public class SessionContext {
                     e);
         }
     }
+
+    private Set<URL> getJarsInConfig() {
+        Set<URL> jarsInConfig;
+        try {
+            jarsInConfig =
+                    new HashSet<>(
+                            ConfigUtils.decodeListFromConfig(
+                                    sessionConfiguration, PipelineOptions.JARS, URL::new));
+        } catch (MalformedURLException e) {
+            throw new SqlExecutionException(
+                    "Failed to parse the option `pipeline.jars` in configuration.", e);
+        }
+        return jarsInConfig;
+    }
 }
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/set.q b/flink-table/flink-sql-client/src/test/resources/sql/set.q
index a81b009..bac4a6d 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/set.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/set.q
@@ -160,3 +160,11 @@ SELECT id, func1(str) FROM (VALUES (1, 'Hello World')) AS T(id, str) ;
 +----+-------------+--------------------------------+
 Received a total of 1 row
 !ok
+
+REMOVE JAR '$VAR_UDF_JAR_PATH';
+[INFO] The specified jar is removed from session classloader.
+!info
+
+SHOW JARS;
+Empty set
+!ok