You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by le...@apache.org on 2022/03/20 04:48:50 UTC
[flink] branch release-1.15 updated: [FLINK-26618][sql-client] Fix 'remove jar' statement is not aligned with pipeline jars
This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new 8dbef3f [FLINK-26618][sql-client] Fix 'remove jar' statement is not aligned with pipeline jars
8dbef3f is described below
commit 8dbef3f91fa763fc12310875b350317e1763e642
Author: Paul Lin <pa...@gmail.com>
AuthorDate: Sun Mar 20 12:47:58 2022 +0800
[FLINK-26618][sql-client] Fix 'remove jar' statement is not aligned with pipeline jars
This closes #19133.
---
.../client/gateway/context/SessionContext.java | 34 +++++++++++++---------
.../flink-sql-client/src/test/resources/sql/set.q | 8 +++++
2 files changed, 29 insertions(+), 13 deletions(-)
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
index d392460..c31f6f5 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
@@ -263,6 +263,9 @@ public class SessionContext {
}
Set<URL> newDependencies = new HashSet<>(dependencies);
+ // merge the jars in config with the jars maintained in session
+ Set<URL> jarsInConfig = getJarsInConfig();
+ newDependencies.addAll(jarsInConfig);
newDependencies.add(jarURL);
updateClassLoaderAndDependencies(newDependencies);
@@ -281,6 +284,9 @@ public class SessionContext {
}
Set<URL> newDependencies = new HashSet<>(dependencies);
+ // merge the jars in config with the jars maintained in session
+ Set<URL> jarsInConfig = getJarsInConfig();
+ newDependencies.addAll(jarsInConfig);
newDependencies.remove(jarURL);
updateClassLoaderAndDependencies(newDependencies);
@@ -325,22 +331,10 @@ public class SessionContext {
}
private void updateClassLoaderAndDependencies(Collection<URL> newDependencies) {
- // merge the jar in config with the jar maintained in session
- Set<URL> jarsInConfig;
- try {
- jarsInConfig =
- new HashSet<>(
- ConfigUtils.decodeListFromConfig(
- sessionConfiguration, PipelineOptions.JARS, URL::new));
- } catch (MalformedURLException e) {
- throw new SqlExecutionException(
- "Failed to parse the option `pipeline.jars` in configuration.", e);
- }
- jarsInConfig.addAll(newDependencies);
ConfigUtils.encodeCollectionToConfig(
sessionConfiguration,
PipelineOptions.JARS,
- new ArrayList<>(jarsInConfig),
+ new ArrayList<>(newDependencies),
URL::toString);
// TODO: update the classloader in CatalogManager.
@@ -375,4 +369,18 @@ public class SessionContext {
e);
}
}
+
+ private Set<URL> getJarsInConfig() {
+ Set<URL> jarsInConfig;
+ try {
+ jarsInConfig =
+ new HashSet<>(
+ ConfigUtils.decodeListFromConfig(
+ sessionConfiguration, PipelineOptions.JARS, URL::new));
+ } catch (MalformedURLException e) {
+ throw new SqlExecutionException(
+ "Failed to parse the option `pipeline.jars` in configuration.", e);
+ }
+ return jarsInConfig;
+ }
}
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/set.q b/flink-table/flink-sql-client/src/test/resources/sql/set.q
index 5da0b88..5d214ef 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/set.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/set.q
@@ -170,3 +170,11 @@ SELECT id, func1(str) FROM (VALUES (1, 'Hello World')) AS T(id, str) ;
+----+-------------+--------------------------------+
Received a total of 1 row
!ok
+
+REMOVE JAR '$VAR_UDF_JAR_PATH';
+[INFO] The specified jar is removed from session classloader.
+!info
+
+SHOW JARS;
+Empty set
+!ok