You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by le...@apache.org on 2022/03/20 04:47:59 UTC
[flink] branch release-1.14 updated: [FLINK-26618][sql-client] Fix 'remove jar' statement is not aligned with pipeline jars
This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 4fc5b96 [FLINK-26618][sql-client] Fix 'remove jar' statement is not aligned with pipeline jars
4fc5b96 is described below
commit 4fc5b96976843501580f7f335cf39c5c3d8e04d4
Author: Paul Lin <li...@corp.netease.com>
AuthorDate: Sun Mar 20 12:47:17 2022 +0800
[FLINK-26618][sql-client] Fix 'remove jar' statement is not aligned with pipeline jars
This closes #19132.
---
.../client/gateway/context/SessionContext.java | 34 +++++++++++++---------
.../flink-sql-client/src/test/resources/sql/set.q | 8 +++++
2 files changed, 29 insertions(+), 13 deletions(-)
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
index 08d37e1..d0bf903 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
@@ -262,6 +262,9 @@ public class SessionContext {
}
Set<URL> newDependencies = new HashSet<>(dependencies);
+ // merge the jars in config with the jars maintained in session
+ Set<URL> jarsInConfig = getJarsInConfig();
+ newDependencies.addAll(jarsInConfig);
newDependencies.add(jarURL);
updateClassLoaderAndDependencies(newDependencies);
@@ -280,6 +283,9 @@ public class SessionContext {
}
Set<URL> newDependencies = new HashSet<>(dependencies);
+ // merge the jars in config with the jars maintained in session
+ Set<URL> jarsInConfig = getJarsInConfig();
+ newDependencies.addAll(jarsInConfig);
newDependencies.remove(jarURL);
updateClassLoaderAndDependencies(newDependencies);
@@ -324,22 +330,10 @@ public class SessionContext {
}
private void updateClassLoaderAndDependencies(Collection<URL> newDependencies) {
- // merge the jar in config with the jar maintained in session
- Set<URL> jarsInConfig;
- try {
- jarsInConfig =
- new HashSet<>(
- ConfigUtils.decodeListFromConfig(
- sessionConfiguration, PipelineOptions.JARS, URL::new));
- } catch (MalformedURLException e) {
- throw new SqlExecutionException(
- "Failed to parse the option `pipeline.jars` in configuration.", e);
- }
- jarsInConfig.addAll(newDependencies);
ConfigUtils.encodeCollectionToConfig(
sessionConfiguration,
PipelineOptions.JARS,
- new ArrayList<>(jarsInConfig),
+ new ArrayList<>(newDependencies),
URL::toString);
// TODO: update the the classloader in CatalogManager.
@@ -374,4 +368,18 @@ public class SessionContext {
e);
}
}
+
+ private Set<URL> getJarsInConfig() {
+ Set<URL> jarsInConfig;
+ try {
+ jarsInConfig =
+ new HashSet<>(
+ ConfigUtils.decodeListFromConfig(
+ sessionConfiguration, PipelineOptions.JARS, URL::new));
+ } catch (MalformedURLException e) {
+ throw new SqlExecutionException(
+ "Failed to parse the option `pipeline.jars` in configuration.", e);
+ }
+ return jarsInConfig;
+ }
}
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/set.q b/flink-table/flink-sql-client/src/test/resources/sql/set.q
index a81b009..bac4a6d 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/set.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/set.q
@@ -160,3 +160,11 @@ SELECT id, func1(str) FROM (VALUES (1, 'Hello World')) AS T(id, str) ;
+----+-------------+--------------------------------+
Received a total of 1 row
!ok
+
+REMOVE JAR '$VAR_UDF_JAR_PATH';
+[INFO] The specified jar is removed from session classloader.
+!info
+
+SHOW JARS;
+Empty set
+!ok