You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2022/07/20 09:36:04 UTC
[flink] branch master updated: [FLINK-27659][table-planner] Support to use jar that is registered by 'CREATE FUNCTION USING JAR' syntax
This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new cfc10771635 [FLINK-27659][table-planner] Support to use jar that is registered by 'CREATE FUNCTION USING JAR' syntax
cfc10771635 is described below
commit cfc107716357c49b14ed59e0359e656764ea7f99
Author: fengli <ld...@163.com>
AuthorDate: Mon May 30 17:56:58 2022 +0800
[FLINK-27659][table-planner] Support to use jar that is registered by 'CREATE FUNCTION USING JAR' syntax
This closes #20001
---
.../java/org/apache/flink/client/ClientUtils.java | 18 +-
.../flink/util/FlinkUserCodeClassLoaders.java | 29 ++-
.../flink/util/UserClassLoaderJarTestUtils.java | 40 ++--
flink-table/flink-sql-client/pom.xml | 8 +
.../client/gateway/context/ExecutionContext.java | 31 +--
.../client/gateway/context/SessionContext.java | 155 ++++++---------
.../client/resource/ClientResourceManager.java | 51 +++++
.../table/client/util/ClientClassloaderUtil.java | 52 +++++
.../client/util/ClientWrapperClassLoader.java | 107 ++++++++++
.../flink/table/client/cli/CliClientITCase.java | 16 +-
.../client/gateway/context/SessionContextTest.java | 19 +-
.../table/client/gateway/local/DependencyTest.java | 7 +-
.../client/gateway/local/LocalExecutorITCase.java | 17 +-
.../client/util/ClientWrapperClassLoaderTest.java | 151 ++++++++++++++
.../src/test/resources/sql/function.q | 27 +++
.../flink-sql-client/src/test/resources/sql/set.q | 2 +-
.../gateway/service/context/SessionContext.java | 85 ++++----
.../AbstractStreamTableEnvironmentImpl.java | 7 +-
.../java/internal/StreamTableEnvironmentImpl.java | 29 +--
.../internal/StreamTableEnvironmentImplTest.java | 18 +-
.../table/api/internal/TableEnvironmentImpl.java | 62 ++++--
.../flink/table/catalog/FunctionCatalog.java | 50 +++--
.../flink/table/resource/ResourceManager.java | 217 ++++++++++++++-------
.../flink/table/catalog/FunctionCatalogTest.java | 12 +-
.../flink/table/resource/ResourceManagerTest.java | 181 +++++++++--------
.../flink/table/utils/TableEnvironmentMock.java | 26 ++-
.../flink/table}/utils/UserDefinedFunctions.java | 17 +-
.../internal/StreamTableEnvironmentImpl.scala | 30 +--
.../internal/StreamTableEnvironmentImplTest.scala | 17 +-
.../batch/sql/CompactManagedTableITCase.java | 2 +-
.../batch/sql/ForwardHashExchangeITCase.java | 2 +-
.../planner/runtime/batch/sql/FunctionITCase.java | 184 +++++++++++++++++
.../sql/agg/LocalAggregatePushDownITCase.java | 2 +-
.../planner/runtime/stream/sql/FunctionITCase.java | 144 +++++++++++++-
.../flink/table/planner/utils/PlannerMocks.java | 22 ++-
.../planner/plan/utils/RexNodeExtractorTest.scala | 14 +-
.../batch/sql/BatchFileSystemITCaseBase.scala | 5 +-
.../batch/sql/PartitionableSourceITCase.scala | 63 +++---
.../planner/runtime/utils/BatchTestBase.scala | 1 +
.../planner/runtime/utils/StreamingTestBase.scala | 1 +
.../flink/table/planner/utils/TableTestBase.scala | 40 ++--
41 files changed, 1439 insertions(+), 522 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 95a223a2135..bf6febda792 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -26,7 +26,6 @@ import org.apache.flink.client.program.StreamContextEnvironment;
import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
import org.apache.flink.client.program.rest.retry.WaitStrategy;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.runtime.client.JobInitializationException;
@@ -44,7 +43,6 @@ import java.net.URLClassLoader;
import java.util.List;
import java.util.Optional;
-import static org.apache.flink.util.FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** Utility functions for Flink client. */
@@ -62,21 +60,7 @@ public enum ClientUtils {
for (int i = 0; i < classpaths.size(); i++) {
urls[i + jars.size()] = classpaths.get(i);
}
- final String[] alwaysParentFirstLoaderPatterns =
- CoreOptions.getParentFirstLoaderPatterns(configuration);
- final String classLoaderResolveOrder =
- configuration.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);
- FlinkUserCodeClassLoaders.ResolveOrder resolveOrder =
- FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder);
- final boolean checkClassloaderLeak =
- configuration.getBoolean(CoreOptions.CHECK_LEAKED_CLASSLOADER);
- return FlinkUserCodeClassLoaders.create(
- resolveOrder,
- urls,
- parent,
- alwaysParentFirstLoaderPatterns,
- NOOP_EXCEPTION_HANDLER,
- checkClassloaderLeak);
+ return FlinkUserCodeClassLoaders.create(urls, parent, configuration);
}
public static void executeProgram(
diff --git a/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoaders.java b/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoaders.java
index 0a8f4afa896..39b726eaea0 100644
--- a/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoaders.java
+++ b/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoaders.java
@@ -19,6 +19,7 @@
package org.apache.flink.util;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.slf4j.Logger;
@@ -29,6 +30,8 @@ import java.net.URL;
import java.util.Enumeration;
import java.util.function.Consumer;
+import static org.apache.flink.util.FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER;
+
/** Gives the URLClassLoader a nicer name for debugging purposes. */
@Internal
public class FlinkUserCodeClassLoaders {
@@ -57,6 +60,25 @@ public class FlinkUserCodeClassLoaders {
return wrapWithSafetyNet(classLoader, checkClassLoaderLeak);
}
+ public static MutableURLClassLoader create(
+ final URL[] urls, final ClassLoader parent, final Configuration configuration) {
+ final String[] alwaysParentFirstLoaderPatterns =
+ CoreOptions.getParentFirstLoaderPatterns(configuration);
+ final String classLoaderResolveOrder =
+ configuration.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);
+ final FlinkUserCodeClassLoaders.ResolveOrder resolveOrder =
+ FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder);
+ final boolean checkClassloaderLeak =
+ configuration.getBoolean(CoreOptions.CHECK_LEAKED_CLASSLOADER);
+ return create(
+ resolveOrder,
+ urls,
+ parent,
+ alwaysParentFirstLoaderPatterns,
+ NOOP_EXCEPTION_HANDLER,
+ checkClassloaderLeak);
+ }
+
public static MutableURLClassLoader create(
ResolveOrder resolveOrder,
URL[] urls,
@@ -129,13 +151,14 @@ public class FlinkUserCodeClassLoaders {
* delegate is nulled and can be garbage collected. Additional class resolution will be resolved
* solely through the bootstrap classloader and most likely result in ClassNotFound exceptions.
*/
- private static class SafetyNetWrapperClassLoader extends MutableURLClassLoader {
+ @Internal
+ public static class SafetyNetWrapperClassLoader extends MutableURLClassLoader {
private static final Logger LOG =
LoggerFactory.getLogger(SafetyNetWrapperClassLoader.class);
- private volatile FlinkUserCodeClassLoader inner;
+ protected volatile FlinkUserCodeClassLoader inner;
- SafetyNetWrapperClassLoader(FlinkUserCodeClassLoader inner, ClassLoader parent) {
+ protected SafetyNetWrapperClassLoader(FlinkUserCodeClassLoader inner, ClassLoader parent) {
super(new URL[0], parent);
this.inner = inner;
}
diff --git a/flink-core/src/test/java/org/apache/flink/util/UserClassLoaderJarTestUtils.java b/flink-core/src/test/java/org/apache/flink/util/UserClassLoaderJarTestUtils.java
index 42d3e3419db..46c61236dfe 100644
--- a/flink-core/src/test/java/org/apache/flink/util/UserClassLoaderJarTestUtils.java
+++ b/flink-core/src/test/java/org/apache/flink/util/UserClassLoaderJarTestUtils.java
@@ -28,7 +28,10 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
@@ -43,11 +46,22 @@ public class UserClassLoaderJarTestUtils {
/** Pack the generated class into a JAR and return the path of the JAR. */
public static File createJarFile(File tmpDir, String jarName, String className, String javaCode)
throws IOException {
- // write class source code to file
- File javaFile = Paths.get(tmpDir.toString(), className + ".java").toFile();
- //noinspection ResultOfMethodCallIgnored
- javaFile.createNewFile();
- FileUtils.writeFileUtf8(javaFile, javaCode);
+ return createJarFile(tmpDir, jarName, Collections.singletonMap(className, javaCode));
+ }
+
+ /** Pack the generated classes into a JAR and return the path of the JAR. */
+ public static File createJarFile(
+ File tmpDir, String jarName, Map<String, String> classNameCodes) throws IOException {
+ List<File> javaFiles = new ArrayList<>();
+ for (Map.Entry<String, String> entry : classNameCodes.entrySet()) {
+ // write class source code to file
+ File javaFile = Paths.get(tmpDir.toString(), entry.getKey() + ".java").toFile();
+ //noinspection ResultOfMethodCallIgnored
+ javaFile.createNewFile();
+ FileUtils.writeFileUtf8(javaFile, entry.getValue());
+
+ javaFiles.add(javaFile);
+ }
// compile class source code
DiagnosticCollector<JavaFileObject> diagnostics = new DiagnosticCollector<>();
@@ -55,7 +69,7 @@ public class UserClassLoaderJarTestUtils {
StandardJavaFileManager fileManager =
compiler.getStandardFileManager(diagnostics, null, null);
Iterable<? extends JavaFileObject> compilationUnit =
- fileManager.getJavaFileObjectsFromFiles(Collections.singletonList(javaFile));
+ fileManager.getJavaFileObjectsFromFiles(javaFiles);
JavaCompiler.CompilationTask task =
compiler.getTask(
null,
@@ -67,14 +81,16 @@ public class UserClassLoaderJarTestUtils {
task.call();
// pack class file to jar
- File classFile = Paths.get(tmpDir.toString(), className + ".class").toFile();
File jarFile = Paths.get(tmpDir.toString(), jarName).toFile();
JarOutputStream jos = new JarOutputStream(new FileOutputStream(jarFile));
- JarEntry jarEntry = new JarEntry(className + ".class");
- jos.putNextEntry(jarEntry);
- byte[] classBytes = FileUtils.readAllBytes(classFile.toPath());
- jos.write(classBytes);
- jos.closeEntry();
+ for (String className : classNameCodes.keySet()) {
+ File classFile = Paths.get(tmpDir.toString(), className + ".class").toFile();
+ JarEntry jarEntry = new JarEntry(className + ".class");
+ jos.putNextEntry(jarEntry);
+ byte[] classBytes = FileUtils.readAllBytes(classFile.toPath());
+ jos.write(classBytes);
+ jos.closeEntry();
+ }
jos.close();
return jarFile;
diff --git a/flink-table/flink-sql-client/pom.xml b/flink-table/flink-sql-client/pom.xml
index f6c3a99f60f..23aaf0c987e 100644
--- a/flink-table/flink-sql-client/pom.xml
+++ b/flink-table/flink-sql-client/pom.xml
@@ -136,6 +136,14 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
index 7ca4a09a30a..bc662f56e91 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
@@ -34,10 +34,11 @@ import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.PlannerFactoryUtil;
import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.table.resource.ResourceManager;
+import org.apache.flink.util.MutableURLClassLoader;
import org.apache.flink.util.TemporaryClassLoaderContext;
import java.lang.reflect.Method;
-import java.net.URLClassLoader;
import java.util.function.Supplier;
import static org.apache.flink.table.client.gateway.context.SessionContext.SessionState;
@@ -53,16 +54,17 @@ public class ExecutionContext {
// Members that should be reused in the same session.
private final Configuration flinkConfig;
private final SessionState sessionState;
- private final URLClassLoader classLoader;
+ private final MutableURLClassLoader classLoader;
private final StreamTableEnvironment tableEnv;
public ExecutionContext(
- Configuration flinkConfig, URLClassLoader classLoader, SessionState sessionState) {
+ Configuration flinkConfig,
+ MutableURLClassLoader classLoader,
+ SessionState sessionState) {
this.flinkConfig = flinkConfig;
this.sessionState = sessionState;
this.classLoader = classLoader;
-
this.tableEnv = createTableEnvironment();
}
@@ -108,28 +110,26 @@ public class ExecutionContext {
StreamExecutionEnvironment streamExecEnv =
new StreamExecutionEnvironment(new Configuration(flinkConfig), classLoader);
- final Executor executor = lookupExecutor(streamExecEnv);
+ final Executor executor = lookupExecutor(streamExecEnv, classLoader);
- // Updates the classloader of FunctionCatalog by the new classloader to solve ClassNotFound
- // exception when use an udf created by add jar syntax, temporary solution until FLINK-14055
- // is fixed
- sessionState.functionCatalog.updateClassLoader(classLoader);
return createStreamTableEnvironment(
streamExecEnv,
settings,
executor,
sessionState.catalogManager,
sessionState.moduleManager,
+ sessionState.resourceManager,
sessionState.functionCatalog,
classLoader);
}
- private StreamTableEnvironment createStreamTableEnvironment(
+ private static StreamTableEnvironment createStreamTableEnvironment(
StreamExecutionEnvironment env,
EnvironmentSettings settings,
Executor executor,
CatalogManager catalogManager,
ModuleManager moduleManager,
+ ResourceManager resourceManager,
FunctionCatalog functionCatalog,
ClassLoader userClassLoader) {
@@ -149,20 +149,23 @@ public class ExecutionContext {
return new StreamTableEnvironmentImpl(
catalogManager,
moduleManager,
+ resourceManager,
functionCatalog,
tableConfig,
env,
planner,
executor,
- settings.isStreamingMode(),
- userClassLoader);
+ settings.isStreamingMode());
}
- private Executor lookupExecutor(StreamExecutionEnvironment executionEnvironment) {
+ private static Executor lookupExecutor(
+ StreamExecutionEnvironment executionEnvironment, ClassLoader userClassLoader) {
try {
final ExecutorFactory executorFactory =
FactoryUtil.discoverFactory(
- classLoader, ExecutorFactory.class, ExecutorFactory.DEFAULT_IDENTIFIER);
+ userClassLoader,
+ ExecutorFactory.class,
+ ExecutorFactory.DEFAULT_IDENTIFIER);
final Method createMethod =
executorFactory
.getClass()
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
index ede67d5c259..161599c0cf2 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
@@ -19,12 +19,9 @@
package org.apache.flink.table.client.gateway.context;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.client.ClientUtils;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
@@ -35,7 +32,12 @@ import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.client.gateway.Executor;
import org.apache.flink.table.client.gateway.SqlExecutionException;
+import org.apache.flink.table.client.resource.ClientResourceManager;
+import org.apache.flink.table.client.util.ClientClassloaderUtil;
+import org.apache.flink.table.client.util.ClientWrapperClassLoader;
import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.table.resource.ResourceType;
+import org.apache.flink.table.resource.ResourceUri;
import org.apache.flink.util.JarUtils;
import org.apache.flink.util.TemporaryClassLoaderContext;
@@ -45,11 +47,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -71,17 +69,14 @@ public class SessionContext {
private final Configuration sessionConfiguration;
private final SessionState sessionState;
- // SafetyNetWrapperClassLoader doesn't override the getURL therefore we need to maintain the
- // dependencies by ourselves.
- private Set<URL> dependencies;
- private URLClassLoader classLoader;
+ private final ClientWrapperClassLoader classLoader;
private ExecutionContext executionContext;
private SessionContext(
DefaultContext defaultContext,
String sessionId,
Configuration sessionConfiguration,
- URLClassLoader classLoader,
+ ClientWrapperClassLoader classLoader,
SessionState sessionState,
ExecutionContext executionContext) {
this.defaultContext = defaultContext;
@@ -90,7 +85,6 @@ public class SessionContext {
this.classLoader = classLoader;
this.sessionState = sessionState;
this.executionContext = executionContext;
- this.dependencies = new HashSet<>(defaultContext.getDependencies());
}
// --------------------------------------------------------------------------------------------
@@ -115,7 +109,7 @@ public class SessionContext {
@VisibleForTesting
Set<URL> getDependencies() {
- return dependencies;
+ return sessionState.resourceManager.getLocalJarResources();
}
// --------------------------------------------------------------------------------------------
@@ -133,9 +127,6 @@ public class SessionContext {
// If rebuild a new Configuration, it loses control of the SessionState if users wants to
// modify the configuration
resetSessionConfigurationToDefault(defaultContext.getFlinkConfig());
- // Reset configuration will revert the `pipeline.jars`. To make the current classloader
- // still work, add the maintained dependencies into the configuration.
- updateClassLoaderAndDependencies(dependencies);
executionContext = new ExecutionContext(sessionConfiguration, classLoader, sessionState);
}
@@ -186,11 +177,7 @@ public class SessionContext {
sessionState.catalogManager.getCatalog(name).ifPresent(Catalog::close);
}
}
- try {
- classLoader.close();
- } catch (IOException e) {
- LOG.debug("Error while closing class loader.", e);
- }
+ classLoader.close();
}
// --------------------------------------------------------------------------------------------
@@ -208,25 +195,30 @@ public class SessionContext {
// Init classloader
// --------------------------------------------------------------------------------------------------------------
- URLClassLoader classLoader =
- ClientUtils.buildUserCodeClassLoader(
- defaultContext.getDependencies(),
- Collections.emptyList(),
- SessionContext.class.getClassLoader(),
+        // Use ClientWrapperClassLoader here to support removing jars
+ final ClientWrapperClassLoader userClassLoader =
+ new ClientWrapperClassLoader(
+ ClientClassloaderUtil.buildUserClassLoader(
+ defaultContext.getDependencies(),
+ SessionContext.class.getClassLoader(),
+ new Configuration(configuration)),
configuration);
// --------------------------------------------------------------------------------------------------------------
// Init session state
// --------------------------------------------------------------------------------------------------------------
- ModuleManager moduleManager = new ModuleManager();
+ final ClientResourceManager resourceManager =
+ new ClientResourceManager(configuration, userClassLoader);
+
+ final ModuleManager moduleManager = new ModuleManager();
final EnvironmentSettings settings =
EnvironmentSettings.newInstance().withConfiguration(configuration).build();
- CatalogManager catalogManager =
+ final CatalogManager catalogManager =
CatalogManager.newBuilder()
- .classLoader(classLoader)
+ .classLoader(userClassLoader)
.config(configuration)
.defaultCatalog(
settings.getBuiltInCatalogName(),
@@ -235,69 +227,63 @@ public class SessionContext {
settings.getBuiltInDatabaseName()))
.build();
- FunctionCatalog functionCatalog =
- new FunctionCatalog(configuration, catalogManager, moduleManager, classLoader);
- SessionState sessionState =
- new SessionState(catalogManager, moduleManager, functionCatalog);
+ final FunctionCatalog functionCatalog =
+ new FunctionCatalog(configuration, resourceManager, catalogManager, moduleManager);
+ final SessionState sessionState =
+ new SessionState(catalogManager, moduleManager, resourceManager, functionCatalog);
// --------------------------------------------------------------------------------------------------------------
// Init ExecutionContext
// --------------------------------------------------------------------------------------------------------------
ExecutionContext executionContext =
- new ExecutionContext(configuration, classLoader, sessionState);
+ new ExecutionContext(configuration, userClassLoader, sessionState);
return new SessionContext(
defaultContext,
sessionId,
configuration,
- classLoader,
+ userClassLoader,
sessionState,
executionContext);
}
public void addJar(String jarPath) {
- URL jarURL = getURLFromPath(jarPath, "SQL Client only supports to add local jars.");
- if (dependencies.contains(jarURL)) {
- return;
+ checkJarPath(jarPath, "SQL Client only supports to add local jars.");
+ try {
+ sessionState.resourceManager.registerJarResources(
+ Collections.singletonList(new ResourceUri(ResourceType.JAR, jarPath)));
+ } catch (Exception e) {
+ LOG.warn(String.format("Could not register the specified jar [%s].", jarPath), e);
}
-
- // merge the jars in config with the jars maintained in session
- Set<URL> jarsInConfig = getJarsInConfig();
-
- Set<URL> newDependencies = new HashSet<>(dependencies);
- newDependencies.addAll(jarsInConfig);
- newDependencies.add(jarURL);
- updateClassLoaderAndDependencies(newDependencies);
-
- // renew the execution context
- executionContext = new ExecutionContext(sessionConfiguration, classLoader, sessionState);
}
public void removeJar(String jarPath) {
- URL jarURL = getURLFromPath(jarPath, "SQL Client only supports to remove local jars.");
- if (!dependencies.contains(jarURL)) {
+        // If the path is relative, convert it to an absolute path
+ URL jarURL = checkJarPath(jarPath, "SQL Client only supports to remove local jars.");
+ // remove jar from resource manager
+ jarURL = sessionState.resourceManager.unregisterJarResource(jarURL.getPath());
+ if (jarURL == null) {
LOG.warn(
String.format(
- "Could not remove the specified jar because the jar path(%s) is not found in session classloader.",
+ "Could not remove the specified jar because the jar path [%s] hadn't registered to classloader.",
jarPath));
return;
}
-
- Set<URL> newDependencies = new HashSet<>(dependencies);
- // merge the jars in config with the jars maintained in session
- Set<URL> jarsInConfig = getJarsInConfig();
- newDependencies.addAll(jarsInConfig);
- newDependencies.remove(jarURL);
-
- updateClassLoaderAndDependencies(newDependencies);
-
- // renew the execution context
- executionContext = new ExecutionContext(sessionConfiguration, classLoader, sessionState);
+ // remove jar from classloader
+ classLoader.removeURL(jarURL);
}
public List<String> listJars() {
- return dependencies.stream().map(URL::getPath).collect(Collectors.toList());
+ List<String> jars =
+ sessionState.resourceManager.getResources().keySet().stream()
+ .filter(
+ resourceUri ->
+ ResourceType.JAR.equals(resourceUri.getResourceType()))
+ .map(ResourceUri::getUri)
+ .collect(Collectors.toList());
+ LOG.info(String.format("Registered jars: %s", jars));
+ return jars;
}
// --------------------------------------------------------------------------------------------
@@ -308,15 +294,18 @@ public class SessionContext {
public static class SessionState {
public final CatalogManager catalogManager;
- public final FunctionCatalog functionCatalog;
public final ModuleManager moduleManager;
+ public final ClientResourceManager resourceManager;
+ public final FunctionCatalog functionCatalog;
public SessionState(
CatalogManager catalogManager,
ModuleManager moduleManager,
+ ClientResourceManager resourceManager,
FunctionCatalog functionCatalog) {
this.catalogManager = catalogManager;
this.moduleManager = moduleManager;
+ this.resourceManager = resourceManager;
this.functionCatalog = functionCatalog;
}
}
@@ -332,25 +321,7 @@ public class SessionContext {
sessionConfiguration.addAll(defaultConf);
}
- private void updateClassLoaderAndDependencies(Collection<URL> newDependencies) {
- // replace jars with the new dependencies
- ConfigUtils.encodeCollectionToConfig(
- sessionConfiguration,
- PipelineOptions.JARS,
- new ArrayList<>(newDependencies),
- URL::toString);
-
- // TODO: update the classloader in CatalogManager.
- classLoader =
- ClientUtils.buildUserCodeClassLoader(
- new ArrayList<>(newDependencies),
- Collections.emptyList(),
- SessionContext.class.getClassLoader(),
- sessionConfiguration);
- dependencies = new HashSet<>(newDependencies);
- }
-
- private URL getURLFromPath(String jarPath, String message) {
+ private URL checkJarPath(String jarPath, String message) {
Path path = new Path(jarPath);
String scheme = path.toUri().getScheme();
if (scheme != null && !scheme.equals("file")) {
@@ -372,18 +343,4 @@ public class SessionContext {
e);
}
}
-
- private Set<URL> getJarsInConfig() {
- Set<URL> jarsInConfig;
- try {
- jarsInConfig =
- new HashSet<>(
- ConfigUtils.decodeListFromConfig(
- sessionConfiguration, PipelineOptions.JARS, URL::new));
- } catch (MalformedURLException e) {
- throw new SqlExecutionException(
- "Failed to parse the option `pipeline.jars` in configuration.", e);
- }
- return jarsInConfig;
- }
}
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/resource/ClientResourceManager.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/resource/ClientResourceManager.java
new file mode 100644
index 00000000000..00d1e780d02
--- /dev/null
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/resource/ClientResourceManager.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.resource;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.resource.ResourceManager;
+import org.apache.flink.table.resource.ResourceType;
+import org.apache.flink.table.resource.ResourceUri;
+import org.apache.flink.util.MutableURLClassLoader;
+
+import javax.annotation.Nullable;
+
+import java.net.URL;
+
+/**
+ * The {@link ClientResourceManager} is able to remove the registered JAR resources with the
+ * specified jar path.
+ *
+ * <p>After removing the JAR resource, the {@link ResourceManager} is able to register the JAR
+ * resource with the same JAR path again. Please note that the removal does not guarantee that a
+ * {@link Class} already loaded from the removed jar becomes inaccessible.
+ */
+@Internal
+public class ClientResourceManager extends ResourceManager {
+
+ public ClientResourceManager(Configuration config, MutableURLClassLoader userClassLoader) {
+ super(config, userClassLoader);
+ }
+
+ @Nullable
+ public URL unregisterJarResource(String jarPath) {
+ return resourceInfos.remove(new ResourceUri(ResourceType.JAR, jarPath));
+ }
+}
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/util/ClientClassloaderUtil.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/util/ClientClassloaderUtil.java
new file mode 100644
index 00000000000..84b57a604ed
--- /dev/null
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/util/ClientClassloaderUtil.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.util;
+
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.util.FlinkUserCodeClassLoader;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+
+/** Utilities for {@link ClientWrapperClassLoader}. */
+public class ClientClassloaderUtil {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ClientClassloaderUtil.class);
+
+ public static FlinkUserCodeClassLoader buildUserClassLoader(
+ List<URL> jarUrls, ClassLoader parentClassLoader, Configuration conf) {
+ LOG.debug(
+ String.format(
+ "Set option '%s' to 'false' to use %s.",
+ CoreOptions.CHECK_LEAKED_CLASSLOADER.key(),
+ ClientWrapperClassLoader.class.getSimpleName()));
+ conf.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
+ return (FlinkUserCodeClassLoader)
+ ClientUtils.buildUserCodeClassLoader(
+ jarUrls, Collections.emptyList(), parentClassLoader, conf);
+ }
+
+ private ClientClassloaderUtil() {}
+}
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/util/ClientWrapperClassLoader.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/util/ClientWrapperClassLoader.java
new file mode 100644
index 00000000000..0cfcb939f14
--- /dev/null
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/util/ClientWrapperClassLoader.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.util;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FlinkUserCodeClassLoader;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader;
+
+/**
+ * This class loader extends {@link SafetyNetWrapperClassLoader}. In addition to the {@code
+ * addURL} method, it exposes a {@code removeURL} method which is used to remove an unnecessary
+ * jar from the current classloader path. This class loader wraps a {@link
+ * FlinkUserCodeClassLoader} and a list of old classloaders; class loading is delegated to the
+ * inner {@link FlinkUserCodeClassLoader}.
+ *
+ * <p>This is currently only used by the SqlClient to support the {@code REMOVE JAR} clause. When
+ * a jar is removed, the registered jar urls are first read from the current inner classloader,
+ * then a new {@link FlinkUserCodeClassLoader} is created whose urls do not include the removed
+ * jar, and the inner classloader is switched to this new instance. The old instance is added to
+ * a list and is closed when this {@link ClientWrapperClassLoader} is closed.
+ *
+ * <p>Note: This classloader is not guaranteed to actually remove classes or resources; any
+ * classes or resources of the removed jar that are already loaded are still accessible.
+ */
+@Experimental
+@Internal
+public class ClientWrapperClassLoader extends SafetyNetWrapperClassLoader {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ClientWrapperClassLoader.class);
+
+    static {
+        ClassLoader.registerAsParallelCapable();
+    }
+
+    /** Private copy of the configuration, used whenever the inner classloader is rebuilt. */
+    private final Configuration configuration;
+
+    /** Inner classloaders replaced by {@link #removeURL(URL)}; released in {@link #close()}. */
+    private final List<FlinkUserCodeClassLoader> originClassLoaders;
+
+    /**
+     * Creates a wrapper around the given inner classloader.
+     *
+     * @param inner the classloader that class loading is delegated to
+     * @param configuration the configuration for rebuilding the inner classloader; it is copied,
+     *     so later changes to the passed object are not seen by this classloader
+     */
+    public ClientWrapperClassLoader(FlinkUserCodeClassLoader inner, Configuration configuration) {
+        super(inner, inner.getParent());
+        this.configuration = new Configuration(configuration);
+        this.originClassLoaders = new ArrayList<>();
+    }
+
+    /**
+     * Removes the given jar url from the classpath by replacing the inner classloader with a new
+     * one that is built from the remaining urls. The remaining urls keep their original order.
+     *
+     * <p>If the url is not registered in the inner classloader, a warning is logged and nothing
+     * is changed.
+     *
+     * @param url the jar url to remove
+     */
+    public void removeURL(URL url) {
+        // An insertion-ordered set keeps the lookup order of the remaining jars; a plain hash
+        // set would hand the urls to the new classloader in arbitrary order.
+        Set<URL> registeredUrls =
+                Stream.of(inner.getURLs())
+                        .collect(Collectors.toCollection(java.util.LinkedHashSet::new));
+        if (!registeredUrls.remove(url)) {
+            LOG.warn(
+                    "Could not remove the specified jar because the jar path [{}] is not found in classloader.",
+                    url);
+            return;
+        }
+
+        // keep the replaced classloader so that close() can release it later
+        originClassLoaders.add(inner);
+        // build a new classloader without the removed jar
+        inner =
+                ClientClassloaderUtil.buildUserClassLoader(
+                        new ArrayList<>(registeredUrls),
+                        ClientWrapperClassLoader.class.getClassLoader(),
+                        configuration);
+    }
+
+    /** Closes this classloader and every inner classloader that was replaced before. */
+    @Override
+    public void close() {
+        super.close();
+        // close the replaced classloaders; a failure is logged so the remaining ones still close
+        for (FlinkUserCodeClassLoader classLoader : originClassLoaders) {
+            try {
+                classLoader.close();
+            } catch (IOException e) {
+                LOG.error("Failed to close the origin classloader.", e);
+            }
+        }
+
+        originClassLoaders.clear();
+    }
+}
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
index 45ee390095f..e31ee990058 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
@@ -26,7 +26,6 @@ import org.apache.flink.table.client.cli.utils.TestSqlStatement;
import org.apache.flink.table.client.gateway.Executor;
import org.apache.flink.table.client.gateway.context.DefaultContext;
import org.apache.flink.table.client.gateway.local.LocalExecutor;
-import org.apache.flink.table.client.gateway.utils.UserDefinedFunctions;
import org.apache.flink.table.planner.utils.TableTestUtil;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.UserClassLoaderJarTestUtils;
@@ -69,6 +68,10 @@ import java.util.stream.Collectors;
import static org.apache.flink.configuration.JobManagerOptions.ADDRESS;
import static org.apache.flink.configuration.RestOptions.PORT;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_UPPER_UDF_CLASS;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_UPPER_UDF_CODE;
import static org.assertj.core.api.Assertions.assertThat;
/** Test that runs every {@code xx.q} file in "resources/sql/" path as a test. */
@@ -99,12 +102,19 @@ public class CliClientITCase extends AbstractTestBase {
@BeforeClass
public static void setup() throws IOException {
+ Map<String, String> classNameCodes = new HashMap<>();
+ classNameCodes.put(
+ GENERATED_LOWER_UDF_CLASS,
+ String.format(GENERATED_LOWER_UDF_CODE, GENERATED_LOWER_UDF_CLASS));
+ classNameCodes.put(
+ GENERATED_UPPER_UDF_CLASS,
+ String.format(GENERATED_UPPER_UDF_CODE, GENERATED_UPPER_UDF_CLASS));
+
File udfJar =
UserClassLoaderJarTestUtils.createJarFile(
tempFolder.newFolder("test-jar"),
"test-classloader-udf.jar",
- UserDefinedFunctions.GENERATED_UDF_CLASS,
- UserDefinedFunctions.GENERATED_UDF_CODE);
+ classNameCodes);
URL udfDependency = udfJar.toURI().toURL();
historyPath = tempFolder.newFile("history").toPath();
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
index 7f9f1574a85..3140aed5dec 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.client.gateway.context;
import org.apache.flink.client.cli.DefaultCLI;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.client.gateway.utils.UserDefinedFunctions;
import org.apache.flink.util.UserClassLoaderJarTestUtils;
import org.junit.Before;
@@ -43,6 +42,8 @@ import static org.apache.flink.configuration.PipelineOptions.NAME;
import static org.apache.flink.configuration.PipelineOptions.OBJECT_REUSE;
import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_SQL_DIALECT;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.assertj.core.api.HamcrestCondition.matching;
@@ -62,8 +63,8 @@ public class SessionContextTest {
UserClassLoaderJarTestUtils.createJarFile(
tempFolder.newFolder("test-jar"),
"test-classloader-udf.jar",
- UserDefinedFunctions.GENERATED_UDF_CLASS,
- UserDefinedFunctions.GENERATED_UDF_CODE);
+ GENERATED_LOWER_UDF_CLASS,
+ String.format(GENERATED_LOWER_UDF_CODE, GENERATED_LOWER_UDF_CLASS));
}
@Before
@@ -164,14 +165,6 @@ public class SessionContextTest {
"hdfs://remote:10080/remote.jar", "SQL Client only supports to add local jars.");
}
- @Test
- public void testAddIllegalJarInConfig() {
- Configuration innerConfig = (Configuration) sessionContext.getReadableConfig();
- innerConfig.set(JARS, Collections.singletonList("/path/to/illegal.jar"));
-
- validateAddJarWithException(udfJar.getPath(), "no protocol: /path/to/illegal.jar");
- }
-
@Test
public void testRemoveJarWithFullPath() {
validateRemoveJar(udfJar.getPath());
@@ -224,14 +217,12 @@ public class SessionContextTest {
return sessionContext.getExecutionContext().getTableEnvironment().getConfig();
}
- private void validateAddJar(String jarPath) throws IOException {
+ private void validateAddJar(String jarPath) {
sessionContext.addJar(jarPath);
assertThat(sessionContext.listJars()).containsExactly(udfJar.getPath());
- assertThat(getConfiguration().get(JARS)).containsExactly(udfJar.toURI().toURL().toString());
// reset to the default
sessionContext.reset();
assertThat(sessionContext.listJars()).containsExactly(udfJar.getPath());
- assertThat(getConfiguration().get(JARS)).containsExactly(udfJar.toURI().toURL().toString());
}
private void validateRemoveJar(String jarPath) {
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
index 6a4dc267612..2de67ff74fc 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
@@ -57,8 +57,6 @@ import org.apache.flink.util.CollectionUtil;
import org.junit.Test;
-import java.net.URL;
-import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -80,7 +78,6 @@ public class DependencyTest {
public static final String CATALOG_TYPE_TEST = "DependencyTest";
- private static final String TABLE_FACTORY_JAR_FILE = "table-factories-test-jar.jar";
private static final List<String> INIT_SQL =
Arrays.asList(
String.format(
@@ -151,12 +148,10 @@ public class DependencyTest {
}
private LocalExecutor createLocalExecutor() throws Exception {
- // create executor with dependencies
- final URL dependency = Paths.get("target", TABLE_FACTORY_JAR_FILE).toUri().toURL();
// create default context
DefaultContext defaultContext =
new DefaultContext(
- Collections.singletonList(dependency),
+ Collections.emptyList(),
new Configuration(),
Collections.singletonList(new DefaultCLI()));
LocalExecutor executor = new LocalExecutor(defaultContext);
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index df1ba55cfe5..7c7a5062136 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -34,13 +34,12 @@ import org.apache.flink.table.client.gateway.Executor;
import org.apache.flink.table.client.gateway.ResultDescriptor;
import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.table.client.gateway.context.DefaultContext;
-import org.apache.flink.table.client.gateway.utils.UserDefinedFunctions;
-import org.apache.flink.table.client.gateway.utils.UserDefinedFunctions.TableUDF;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.utils.UserDefinedFunctions;
import org.apache.flink.table.utils.print.RowDataToStringConverter;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestBaseUtils;
@@ -71,7 +70,8 @@ import java.util.stream.Stream;
import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
import static org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_MAX_TABLE_RESULT_ROWS;
import static org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_RESULT_MODE;
-import static org.apache.flink.table.client.gateway.utils.UserDefinedFunctions.ScalarUDF;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
import static org.apache.flink.util.Preconditions.checkState;
import static org.assertj.core.api.Assertions.assertThat;
@@ -104,8 +104,8 @@ public class LocalExecutorITCase extends TestLogger {
UserClassLoaderJarTestUtils.createJarFile(
tempFolder.newFolder("test-jar"),
"test-classloader-udf.jar",
- UserDefinedFunctions.GENERATED_UDF_CLASS,
- UserDefinedFunctions.GENERATED_UDF_CODE);
+ GENERATED_LOWER_UDF_CLASS,
+ String.format(GENERATED_LOWER_UDF_CODE, GENERATED_LOWER_UDF_CLASS));
udfDependency = udfJar.toURI().toURL();
}
@@ -565,11 +565,14 @@ public class LocalExecutorITCase extends TestLogger {
private List<String> getInitSQL(final Map<String, String> replaceVars) {
return Stream.of(
String.format(
- "CREATE FUNCTION scalarUDF AS '%s'", ScalarUDF.class.getName()),
+ "CREATE FUNCTION scalarUDF AS '%s'",
+ UserDefinedFunctions.ScalarUDF.class.getName()),
String.format(
"CREATE FUNCTION aggregateUDF AS '%s'",
AggregateFunction.class.getName()),
- String.format("CREATE FUNCTION tableUDF AS '%s'", TableUDF.class.getName()),
+ String.format(
+ "CREATE FUNCTION tableUDF AS '%s'",
+ UserDefinedFunctions.TableUDF.class.getName()),
"CREATE TABLE TableNumber1 (\n"
+ " IntegerField1 INT,\n"
+ " StringField1 STRING,\n"
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/util/ClientWrapperClassLoaderTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/util/ClientWrapperClassLoaderTest.java
new file mode 100644
index 00000000000..941ceaa1388
--- /dev/null
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/util/ClientWrapperClassLoaderTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_UPPER_UDF_CLASS;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_UPPER_UDF_CODE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for {@link ClientWrapperClassLoader}. */
+public class ClientWrapperClassLoaderTest {
+
+    private static File userJar;
+
+    /** Creates the test jar that contains the generated lower and upper UDF classes. */
+    @BeforeAll
+    public static void prepare(@TempDir File tempDir) throws Exception {
+        Map<String, String> classNameCodes = new HashMap<>();
+        classNameCodes.put(
+                GENERATED_LOWER_UDF_CLASS,
+                String.format(GENERATED_LOWER_UDF_CODE, GENERATED_LOWER_UDF_CLASS));
+        classNameCodes.put(
+                GENERATED_UPPER_UDF_CLASS,
+                String.format(GENERATED_UPPER_UDF_CODE, GENERATED_UPPER_UDF_CLASS));
+        userJar =
+                UserClassLoaderJarTestUtils.createJarFile(
+                        tempDir, "test-classloader.jar", classNameCodes);
+    }
+
+    @Test
+    public void testClassLoadingByAddURL() throws Exception {
+        Configuration configuration = new Configuration();
+        // try-with-resources closes the classloader even if an assertion fails
+        try (ClientWrapperClassLoader classLoader =
+                new ClientWrapperClassLoader(
+                        ClientClassloaderUtil.buildUserClassLoader(
+                                Collections.emptyList(),
+                                getClass().getClassLoader(),
+                                configuration),
+                        configuration)) {
+
+            // the class must not be loadable before the jar url is added to the classloader
+            assertClassNotFoundException(GENERATED_LOWER_UDF_CLASS, classLoader);
+
+            // add jar url to ClassLoader
+            classLoader.addURL(userJar.toURI().toURL());
+
+            assertEquals(1, classLoader.getURLs().length);
+
+            final Class<?> clazz1 = Class.forName(GENERATED_LOWER_UDF_CLASS, false, classLoader);
+            final Class<?> clazz2 = Class.forName(GENERATED_LOWER_UDF_CLASS, false, classLoader);
+
+            assertEquals(clazz1, clazz2);
+        }
+    }
+
+    @Test
+    public void testClassLoadingByRemoveURL() throws Exception {
+        URL jarURL = userJar.toURI().toURL();
+        Configuration configuration = new Configuration();
+        // try-with-resources closes the classloader even if an assertion fails
+        try (ClientWrapperClassLoader classLoader =
+                new ClientWrapperClassLoader(
+                        ClientClassloaderUtil.buildUserClassLoader(
+                                Collections.singletonList(jarURL),
+                                getClass().getClassLoader(),
+                                configuration),
+                        configuration)) {
+
+            final Class<?> clazz1 = Class.forName(GENERATED_LOWER_UDF_CLASS, false, classLoader);
+            final Class<?> clazz2 = Class.forName(GENERATED_LOWER_UDF_CLASS, false, classLoader);
+            assertEquals(clazz1, clazz2);
+
+            // remove jar url
+            classLoader.removeURL(jarURL);
+
+            assertEquals(0, classLoader.getURLs().length);
+
+            // after the removal, a class of the jar that was not loaded before must not be found
+            assertClassNotFoundException(GENERATED_UPPER_UDF_CLASS, classLoader);
+
+            // add jar url to ClassLoader again
+            classLoader.addURL(jarURL);
+
+            assertEquals(1, classLoader.getURLs().length);
+
+            final Class<?> clazz3 = Class.forName(GENERATED_UPPER_UDF_CLASS, false, classLoader);
+            final Class<?> clazz4 = Class.forName(GENERATED_UPPER_UDF_CLASS, false, classLoader);
+            assertEquals(clazz3, clazz4);
+        }
+    }
+
+    @Test
+    public void testParallelCapable() {
+        // It will be true only if all the super classes (except class Object) of the caller are
+        // registered as parallel capable.
+        assertTrue(TestClientWrapperClassLoader.IS_PARALLEL_CAPABLE);
+    }
+
+    /** Asserts that loading {@code className} through {@code classLoader} fails. */
+    private void assertClassNotFoundException(String className, ClassLoader classLoader) {
+        CommonTestUtils.assertThrows(
+                className,
+                ClassNotFoundException.class,
+                () -> Class.forName(className, false, classLoader));
+    }
+
+    /** Subclass that records the result of registering itself as parallel capable. */
+    private static class TestClientWrapperClassLoader extends ClientWrapperClassLoader {
+
+        public static final boolean IS_PARALLEL_CAPABLE = ClassLoader.registerAsParallelCapable();
+
+        TestClientWrapperClassLoader() {
+            super(
+                    ClientClassloaderUtil.buildUserClassLoader(
+                            Collections.emptyList(),
+                            TestClientWrapperClassLoader.class.getClassLoader(),
+                            new Configuration()),
+                    new Configuration());
+        }
+    }
+}
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/function.q b/flink-table/flink-sql-client/src/test/resources/sql/function.q
index 924c96f9b76..705bb45351d 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/function.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/function.q
@@ -209,6 +209,33 @@ org.apache.flink.table.api.ValidationException: Alter temporary catalog function
!error
+# ==========================================================================
+# test create function using jar
+# ==========================================================================
+
+REMOVE JAR '$VAR_UDF_JAR_PATH';
+[INFO] The specified jar is removed from session classloader.
+!info
+
+create function upperudf AS 'UpperUDF' using jar '$VAR_UDF_JAR_PATH';
+[INFO] Execute statement succeed.
+!info
+
+# run a query to verify the registered UDF works
+SELECT id, upperudf(str) FROM (VALUES (1, 'hello world'), (2, 'hi')) as T(id, str);
++----+-------------+--------------------------------+
+| op | id | EXPR$1 |
++----+-------------+--------------------------------+
+| +I | 1 | HELLO WORLD |
+| +I | 2 | HI |
++----+-------------+--------------------------------+
+Received a total of 2 rows
+!ok
+
+SHOW JARS;
+$VAR_UDF_JAR_PATH
+!ok
+
# ==========================================================================
# test function with hive catalog
# ==========================================================================
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/set.q b/flink-table/flink-sql-client/src/test/resources/sql/set.q
index a2d01e005f8..06edfa63997 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/set.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/set.q
@@ -173,7 +173,7 @@ set;
'execution.target' = 'remote'
'jobmanager.rpc.address' = 'localhost'
'pipeline.classpaths' = ''
-'pipeline.jars' = '$VAR_PIPELINE_JARS_URL'
+'pipeline.jars' = ''
'rest.port' = '$VAR_REST_PORT'
'sql-client.verbose' = 'true'
'table.exec.legacy-cast-behaviour' = 'DISABLED'
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
index 15ad5a7050f..ba2298548e5 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
@@ -18,16 +18,15 @@
package org.apache.flink.table.gateway.service.context;
-import org.apache.flink.client.ClientUtils;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogManager;
@@ -44,6 +43,9 @@ import org.apache.flink.table.gateway.service.operation.OperationExecutor;
import org.apache.flink.table.gateway.service.operation.OperationManager;
import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.table.resource.ResourceManager;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
+import org.apache.flink.util.MutableURLClassLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,11 +54,10 @@ import java.io.IOException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
-import java.util.ArrayList;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.Collections;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
/**
@@ -177,26 +178,31 @@ public class SessionContext {
Configuration configuration = defaultContext.getFlinkConfig().clone();
configuration.addAll(sessionConf);
+ // configure a session-specific local resource download directory
+ setResourceDownloadTmpDir(sessionConf, sessionId);
// --------------------------------------------------------------------------------------------------------------
// Init classloader
// --------------------------------------------------------------------------------------------------------------
- URLClassLoader classLoader =
- buildClassLoader(Collections.emptySet(), Collections.emptySet(), configuration);
+ final MutableURLClassLoader userClassLoader =
+ FlinkUserCodeClassLoaders.create(
+ new URL[0], SessionContext.class.getClassLoader(), configuration);
// --------------------------------------------------------------------------------------------------------------
// Init session state
// --------------------------------------------------------------------------------------------------------------
- ModuleManager moduleManager = new ModuleManager();
+ final ResourceManager resourceManager = new ResourceManager(configuration, userClassLoader);
+
+ final ModuleManager moduleManager = new ModuleManager();
final EnvironmentSettings settings = EnvironmentSettings.fromConfiguration(configuration);
CatalogManager catalogManager =
CatalogManager.newBuilder()
// Currently, the classloader is only used by DataTypeFactory.
- .classLoader(classLoader)
+ .classLoader(userClassLoader)
.config(configuration)
.defaultCatalog(
settings.getBuiltInCatalogName(),
@@ -205,37 +211,21 @@ public class SessionContext {
settings.getBuiltInDatabaseName()))
.build();
- FunctionCatalog functionCatalog =
- new FunctionCatalog(configuration, catalogManager, moduleManager, classLoader);
+ final FunctionCatalog functionCatalog =
+ new FunctionCatalog(configuration, resourceManager, catalogManager, moduleManager);
SessionState sessionState =
- new SessionState(catalogManager, moduleManager, functionCatalog);
+ new SessionState(catalogManager, moduleManager, resourceManager, functionCatalog);
return new SessionContext(
defaultContext,
sessionId,
endpointVersion,
configuration,
- classLoader,
+ userClassLoader,
sessionState,
new OperationManager(operationExecutorService));
}
- private static URLClassLoader buildClassLoader(
- Set<URL> envDependencies, Set<URL> userDependencies, Configuration conf) {
- Set<URL> newDependencies = new HashSet<>();
- newDependencies.addAll(envDependencies);
- newDependencies.addAll(userDependencies);
-
- // override to use SafetyNetWrapperClassLoader
- conf.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, true);
-
- return ClientUtils.buildUserCodeClassLoader(
- new ArrayList<>(newDependencies),
- Collections.emptyList(),
- SessionContext.class.getClassLoader(),
- conf);
- }
-
// ------------------------------------------------------------------------------------------------------------------
// Helpers
// ------------------------------------------------------------------------------------------------------------------
@@ -251,7 +241,7 @@ public class SessionContext {
tableConfig.setRootConfiguration(defaultContext.getFlinkConfig());
tableConfig.addConfiguration(sessionConf);
- final Executor executor = lookupExecutor(streamExecEnv);
+ final Executor executor = lookupExecutor(streamExecEnv, userClassloader);
return createStreamTableEnvironment(
streamExecEnv,
settings,
@@ -259,29 +249,29 @@ public class SessionContext {
executor,
sessionState.catalogManager,
sessionState.moduleManager,
- sessionState.functionCatalog,
- userClassloader);
+ sessionState.resourceManager,
+ sessionState.functionCatalog);
}
public OperationManager getOperationManager() {
return operationManager;
}
- private TableEnvironmentInternal createStreamTableEnvironment(
+ private static TableEnvironmentInternal createStreamTableEnvironment(
StreamExecutionEnvironment env,
EnvironmentSettings settings,
TableConfig tableConfig,
Executor executor,
CatalogManager catalogManager,
ModuleManager moduleManager,
- FunctionCatalog functionCatalog,
- ClassLoader userClassLoader) {
+ ResourceManager resourceManager,
+ FunctionCatalog functionCatalog) {
final Planner planner =
PlannerFactoryUtil.createPlanner(
executor,
tableConfig,
- userClassLoader,
+ resourceManager.getUserClassLoader(),
moduleManager,
catalogManager,
functionCatalog);
@@ -289,20 +279,21 @@ public class SessionContext {
return new StreamTableEnvironmentImpl(
catalogManager,
moduleManager,
+ resourceManager,
functionCatalog,
tableConfig,
env,
planner,
executor,
- settings.isStreamingMode(),
- userClassLoader);
+ settings.isStreamingMode());
}
- private Executor lookupExecutor(StreamExecutionEnvironment executionEnvironment) {
+ private static Executor lookupExecutor(
+ StreamExecutionEnvironment executionEnvironment, ClassLoader userClassLoader) {
try {
final ExecutorFactory executorFactory =
FactoryUtil.discoverFactory(
- userClassloader,
+ userClassLoader,
ExecutorFactory.class,
ExecutorFactory.DEFAULT_IDENTIFIER);
final Method createMethod =
@@ -326,6 +317,17 @@ public class SessionContext {
return new StreamExecutionEnvironment(new Configuration(sessionConf), userClassloader);
}
+ /**
+ * Overrides {@link TableConfigOptions#RESOURCE_DOWNLOAD_DIR} in the given configuration with a
+ * session-specific sub-directory named {@code sql-gateway-<sessionId>} below the currently
+ * configured directory. The new value is stored as an absolute path.
+ *
+ * <p>NOTE(review): the override is written to the passed {@code configuration} object only, so
+ * the caller has to pass the instance that is later used to create the resource manager;
+ * confirm this at the call site.
+ */
+ private static void setResourceDownloadTmpDir(
+ Configuration configuration, SessionHandle sessionId) {
+ final String sessionDirName = String.format("sql-gateway-%s", sessionId);
+ final Path sessionDir =
+ Paths.get(configuration.get(TableConfigOptions.RESOURCE_DOWNLOAD_DIR), sessionDirName);
+ // replace the configured directory with the session-specific one
+ configuration.set(
+ TableConfigOptions.RESOURCE_DOWNLOAD_DIR, sessionDir.toAbsolutePath().toString());
+ }
+
// --------------------------------------------------------------------------------------------
// Inner class
// --------------------------------------------------------------------------------------------
@@ -334,15 +336,18 @@ public class SessionContext {
public static class SessionState {
public final CatalogManager catalogManager;
+ public final ResourceManager resourceManager;
public final FunctionCatalog functionCatalog;
public final ModuleManager moduleManager;
public SessionState(
CatalogManager catalogManager,
ModuleManager moduleManager,
+ ResourceManager resourceManager,
FunctionCatalog functionCatalog) {
this.catalogManager = catalogManager;
this.moduleManager = moduleManager;
+ this.resourceManager = resourceManager;
this.functionCatalog = functionCatalog;
}
}
diff --git a/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java b/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java
index 16c52b77be2..06c58c5e473 100644
--- a/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java
@@ -57,6 +57,7 @@ import org.apache.flink.table.operations.ExternalQueryOperation;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.utils.OperationTreeBuilder;
+import org.apache.flink.table.resource.ResourceManager;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.table.typeutils.FieldInfoUtils;
@@ -78,22 +79,22 @@ public abstract class AbstractStreamTableEnvironmentImpl extends TableEnvironmen
public AbstractStreamTableEnvironmentImpl(
CatalogManager catalogManager,
ModuleManager moduleManager,
+ ResourceManager resourceManager,
TableConfig tableConfig,
Executor executor,
FunctionCatalog functionCatalog,
Planner planner,
boolean isStreamingMode,
- ClassLoader userClassLoader,
StreamExecutionEnvironment executionEnvironment) {
super(
catalogManager,
moduleManager,
+ resourceManager,
tableConfig,
executor,
functionCatalog,
planner,
- isStreamingMode,
- userClassLoader);
+ isStreamingMode);
this.executionEnvironment = executionEnvironment;
}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
index cd8e4c4c622..716bfe806b1 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
@@ -46,14 +46,18 @@ import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.UserDefinedFunctionHelper;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.OutputConversionModifyOperation;
+import org.apache.flink.table.resource.ResourceManager;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.sources.TableSourceValidation;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
+import org.apache.flink.util.MutableURLClassLoader;
import org.apache.flink.util.Preconditions;
+import java.net.URL;
import java.util.Arrays;
import java.util.Optional;
@@ -70,40 +74,43 @@ public final class StreamTableEnvironmentImpl extends AbstractStreamTableEnviron
public StreamTableEnvironmentImpl(
CatalogManager catalogManager,
ModuleManager moduleManager,
+ ResourceManager resourceManager,
FunctionCatalog functionCatalog,
TableConfig tableConfig,
StreamExecutionEnvironment executionEnvironment,
Planner planner,
Executor executor,
- boolean isStreamingMode,
- ClassLoader userClassLoader) {
+ boolean isStreamingMode) {
super(
catalogManager,
moduleManager,
+ resourceManager,
tableConfig,
executor,
functionCatalog,
planner,
isStreamingMode,
- userClassLoader,
executionEnvironment);
}
public static StreamTableEnvironment create(
StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings) {
- final ClassLoader classLoader = settings.getUserClassLoader();
-
- final Executor executor = lookupExecutor(classLoader, executionEnvironment);
+ final MutableURLClassLoader userClassLoader =
+ FlinkUserCodeClassLoaders.create(
+ new URL[0], settings.getUserClassLoader(), settings.getConfiguration());
+ final Executor executor = lookupExecutor(userClassLoader, executionEnvironment);
final TableConfig tableConfig = TableConfig.getDefault();
tableConfig.setRootConfiguration(executor.getConfiguration());
tableConfig.addConfiguration(settings.getConfiguration());
+ final ResourceManager resourceManager =
+ new ResourceManager(settings.getConfiguration(), userClassLoader);
final ModuleManager moduleManager = new ModuleManager();
final CatalogManager catalogManager =
CatalogManager.newBuilder()
- .classLoader(classLoader)
+ .classLoader(userClassLoader)
.config(tableConfig)
.defaultCatalog(
settings.getBuiltInCatalogName(),
@@ -114,13 +121,13 @@ public final class StreamTableEnvironmentImpl extends AbstractStreamTableEnviron
.build();
final FunctionCatalog functionCatalog =
- new FunctionCatalog(tableConfig, catalogManager, moduleManager, classLoader);
+ new FunctionCatalog(tableConfig, resourceManager, catalogManager, moduleManager);
final Planner planner =
PlannerFactoryUtil.createPlanner(
executor,
tableConfig,
- classLoader,
+ userClassLoader,
moduleManager,
catalogManager,
functionCatalog);
@@ -128,13 +135,13 @@ public final class StreamTableEnvironmentImpl extends AbstractStreamTableEnviron
return new StreamTableEnvironmentImpl(
catalogManager,
moduleManager,
+ resourceManager,
functionCatalog,
tableConfig,
executionEnvironment,
planner,
executor,
- settings.isStreamingMode(),
- classLoader);
+ settings.isStreamingMode());
}
@Override
diff --git a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java
index b2d4162c21d..4245c6193ca 100644
--- a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java
+++ b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.ModifyOperation;
+import org.apache.flink.table.resource.ResourceManager;
import org.apache.flink.table.utils.CatalogManagerMocks;
import org.apache.flink.table.utils.ExecutorMock;
import org.apache.flink.table.utils.PlannerMock;
@@ -34,6 +35,7 @@ import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;
+import java.net.URL;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
@@ -77,20 +79,22 @@ class StreamTableEnvironmentImplTest {
TableConfig tableConfig = TableConfig.getDefault();
CatalogManager catalogManager = CatalogManagerMocks.createEmptyCatalogManager();
ModuleManager moduleManager = new ModuleManager();
+ ResourceManager resourceManager =
+ ResourceManager.createResourceManager(
+ new URL[0],
+ Thread.currentThread().getContextClassLoader(),
+ tableConfig.getConfiguration());
+
return new StreamTableEnvironmentImpl(
catalogManager,
moduleManager,
- new FunctionCatalog(
- tableConfig,
- catalogManager,
- moduleManager,
- StreamTableEnvironmentImplTest.class.getClassLoader()),
+ resourceManager,
+ new FunctionCatalog(tableConfig, resourceManager, catalogManager, moduleManager),
tableConfig,
env,
new TestPlanner(elements.getTransformation()),
new ExecutorMock(),
- true,
- this.getClass().getClassLoader());
+ true);
}
private static class TestPlanner extends PlannerMock {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index f93782eb71e..68b97a0d279 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -22,7 +22,9 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.DataTypes;
@@ -145,6 +147,7 @@ import org.apache.flink.table.operations.ddl.DropTableOperation;
import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
import org.apache.flink.table.operations.ddl.DropViewOperation;
import org.apache.flink.table.operations.utils.OperationTreeBuilder;
+import org.apache.flink.table.resource.ResourceManager;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.sources.TableSourceValidation;
@@ -155,10 +158,13 @@ import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.table.utils.print.PrintStyle;
import org.apache.flink.types.Row;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
+import org.apache.flink.util.MutableURLClassLoader;
import org.apache.flink.util.Preconditions;
import java.io.File;
import java.io.IOException;
+import java.net.URL;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
@@ -167,6 +173,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -186,6 +193,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
private static final boolean IS_STREAM_TABLE = true;
private final CatalogManager catalogManager;
private final ModuleManager moduleManager;
+ private final ResourceManager resourceManager;
private final OperationTreeBuilder operationTreeBuilder;
protected final TableConfig tableConfig;
@@ -193,7 +201,6 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
protected final FunctionCatalog functionCatalog;
protected final Planner planner;
private final boolean isStreamingMode;
- private final ClassLoader userClassLoader;
private static final String UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG =
"Unsupported SQL query! executeSql() only accepts a single SQL statement of type "
+ "CREATE TABLE, DROP TABLE, ALTER TABLE, CREATE DATABASE, DROP DATABASE, ALTER DATABASE, "
@@ -207,14 +214,15 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
protected TableEnvironmentImpl(
CatalogManager catalogManager,
ModuleManager moduleManager,
+ ResourceManager resourceManager,
TableConfig tableConfig,
Executor executor,
FunctionCatalog functionCatalog,
Planner planner,
- boolean isStreamingMode,
- ClassLoader userClassLoader) {
+ boolean isStreamingMode) {
this.catalogManager = catalogManager;
this.moduleManager = moduleManager;
+ this.resourceManager = resourceManager;
this.execEnv = executor;
this.tableConfig = tableConfig;
@@ -222,11 +230,10 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
this.functionCatalog = functionCatalog;
this.planner = planner;
this.isStreamingMode = isStreamingMode;
- this.userClassLoader = userClassLoader;
this.operationTreeBuilder =
OperationTreeBuilder.create(
tableConfig,
- userClassLoader,
+ resourceManager.getUserClassLoader(),
functionCatalog.asLookup(getParser()::parseIdentifier),
catalogManager.getDataTypeFactory(),
path -> {
@@ -260,11 +267,13 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
}
public static TableEnvironmentImpl create(EnvironmentSettings settings) {
- final ClassLoader classLoader = settings.getUserClassLoader();
+ final MutableURLClassLoader userClassLoader =
+ FlinkUserCodeClassLoaders.create(
+ new URL[0], settings.getUserClassLoader(), settings.getConfiguration());
final ExecutorFactory executorFactory =
FactoryUtil.discoverFactory(
- classLoader, ExecutorFactory.class, ExecutorFactory.DEFAULT_IDENTIFIER);
+ userClassLoader, ExecutorFactory.class, ExecutorFactory.DEFAULT_IDENTIFIER);
final Executor executor = executorFactory.create(settings.getConfiguration());
// use configuration to init table config
@@ -272,11 +281,12 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
tableConfig.setRootConfiguration(executor.getConfiguration());
tableConfig.addConfiguration(settings.getConfiguration());
+ final ResourceManager resourceManager =
+ new ResourceManager(settings.getConfiguration(), userClassLoader);
final ModuleManager moduleManager = new ModuleManager();
-
final CatalogManager catalogManager =
CatalogManager.newBuilder()
- .classLoader(classLoader)
+ .classLoader(userClassLoader)
.config(tableConfig)
.defaultCatalog(
settings.getBuiltInCatalogName(),
@@ -286,13 +296,13 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
.build();
final FunctionCatalog functionCatalog =
- new FunctionCatalog(tableConfig, catalogManager, moduleManager, classLoader);
+ new FunctionCatalog(tableConfig, resourceManager, catalogManager, moduleManager);
final Planner planner =
PlannerFactoryUtil.createPlanner(
executor,
tableConfig,
- classLoader,
+ userClassLoader,
moduleManager,
catalogManager,
functionCatalog);
@@ -300,12 +310,12 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
return new TableEnvironmentImpl(
catalogManager,
moduleManager,
+ resourceManager,
tableConfig,
executor,
functionCatalog,
planner,
- settings.isStreamingMode(),
- classLoader);
+ settings.isStreamingMode());
}
@Override
@@ -791,6 +801,11 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
private TableResultInternal executeInternal(
List<Transformation<?>> transformations, List<String> sinkIdentifierNames) {
final String defaultJobName = "insert-into_" + String.join(",", sinkIdentifierNames);
+
+ // Merge user jars to table configuration
+ mergePipelineJarsToConfig(
+ resourceManager.getLocalJarResources(), tableConfig.getConfiguration());
+
// We pass only the configuration to avoid reconfiguration with the rootConfiguration
Pipeline pipeline =
execEnv.createPipeline(
@@ -822,6 +837,11 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
List<Transformation<?>> transformations =
translate(Collections.singletonList(sinkOperation));
final String defaultJobName = "collect";
+
+ // Merge user jars to table configuration
+ mergePipelineJarsToConfig(
+ resourceManager.getLocalJarResources(), tableConfig.getConfiguration());
+
// We pass only the configuration to avoid reconfiguration with the rootConfiguration
Pipeline pipeline =
execEnv.createPipeline(
@@ -1349,7 +1369,10 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
Catalog catalog =
FactoryUtil.createCatalog(
- catalogName, properties, tableConfig, userClassLoader);
+ catalogName,
+ properties,
+ tableConfig,
+ resourceManager.getUserClassLoader());
catalogManager.registerCatalog(catalogName, catalog);
return TableResultImpl.TABLE_RESULT_OK;
@@ -1366,7 +1389,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
operation.getModuleName(),
operation.getOptions(),
tableConfig,
- userClassLoader);
+ resourceManager.getUserClassLoader());
moduleManager.loadModule(operation.getModuleName(), module);
return TableResultImpl.TABLE_RESULT_OK;
} catch (ValidationException e) {
@@ -1849,6 +1872,15 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
}
}
+ private void mergePipelineJarsToConfig(Set<URL> jarUrls, Configuration configuration) {
+ ConfigUtils.mergeCollectionsToConfig(
+ configuration,
+ PipelineOptions.JARS,
+ jarUrls.stream().map(URL::toString).collect(Collectors.toSet()),
+ String::toString,
+ String::toString);
+ }
+
@VisibleForTesting
public TableImpl createTable(QueryOperation tableOperation) {
return TableImpl.createTable(
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index bd624a37b7d..6a6c0c940ff 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -40,6 +40,7 @@ import org.apache.flink.table.functions.TableFunctionDefinition;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.functions.UserDefinedFunctionHelper;
import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.table.resource.ResourceManager;
import org.apache.flink.table.resource.ResourceUri;
import org.apache.flink.util.Preconditions;
@@ -65,9 +66,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
@Internal
public final class FunctionCatalog {
private final ReadableConfig config;
+ private final ResourceManager resourceManager;
private final CatalogManager catalogManager;
private final ModuleManager moduleManager;
- private ClassLoader classLoader;
private final Map<String, CatalogFunction> tempSystemFunctions = new LinkedHashMap<>();
private final Map<ObjectIdentifier, CatalogFunction> tempCatalogFunctions =
@@ -81,18 +82,13 @@ public final class FunctionCatalog {
public FunctionCatalog(
ReadableConfig config,
+ ResourceManager resourceManager,
CatalogManager catalogManager,
- ModuleManager moduleManager,
- ClassLoader classLoader) {
+ ModuleManager moduleManager) {
this.config = checkNotNull(config);
+ this.resourceManager = checkNotNull(resourceManager);
this.catalogManager = checkNotNull(catalogManager);
this.moduleManager = checkNotNull(moduleManager);
- this.classLoader = classLoader;
- }
-
- /** Updates the classloader, this is a temporary solution until FLINK-14055 is fixed. */
- public void updateClassLoader(ClassLoader classLoader) {
- this.classLoader = classLoader;
}
public void setPlannerTypeInferenceUtil(PlannerTypeInferenceUtil plannerTypeInferenceUtil) {
@@ -159,7 +155,7 @@ public final class FunctionCatalog {
normalizedIdentifier.toObjectPath(), catalogFunction);
}
try {
- validateAndPrepareFunction(catalogFunction);
+ validateAndPrepareFunction(identifier.asSummaryString(), catalogFunction);
} catch (Throwable t) {
throw new ValidationException(
String.format(
@@ -506,7 +502,7 @@ public final class FunctionCatalog {
final String normalizedName = FunctionIdentifier.normalizeName(name);
try {
- validateAndPrepareFunction(function);
+ validateAndPrepareFunction(name, function);
} catch (Throwable t) {
throw new ValidationException(
String.format(
@@ -634,7 +630,7 @@ public final class FunctionCatalog {
}
@SuppressWarnings("unchecked")
- private void validateAndPrepareFunction(CatalogFunction function)
+ private void validateAndPrepareFunction(String name, CatalogFunction function)
throws ClassNotFoundException {
// If the input is instance of UserDefinedFunction, it means it uses the new type inference.
// In this situation the UDF have not been validated and cleaned, so we need to validate it
@@ -649,9 +645,15 @@ public final class FunctionCatalog {
}
// Skip validation if it's not a UserDefinedFunction.
} else if (function.getFunctionLanguage() == FunctionLanguage.JAVA) {
+ // If the UDF declares jar resources, register them with the classloader before
+ // validating the function class.
+ registerFunctionJarResources(name, function.getFunctionResources());
+
UserDefinedFunctionHelper.validateClass(
(Class<? extends UserDefinedFunction>)
- classLoader.loadClass(function.getClassName()));
+ resourceManager
+ .getUserClassLoader()
+ .loadClass(function.getClassName()));
}
}
@@ -662,15 +664,33 @@ public final class FunctionCatalog {
// directly.
return ((InlineCatalogFunction) function).getDefinition();
}
+ // If the UDF declares jar resources, register them with the classloader before
+ // instantiating the function.
+ registerFunctionJarResources(name, function.getFunctionResources());
+
return UserDefinedFunctionHelper.instantiateFunction(
- classLoader,
+ resourceManager.getUserClassLoader(),
// future
config,
name,
function);
}
- /** The CatalogFunction which holds a instantiated UDF. */
+ private void registerFunctionJarResources(String functionName, List<ResourceUri> resourceUris) {
+ try {
+ if (!resourceUris.isEmpty()) {
+ resourceManager.registerJarResources(resourceUris);
+ }
+ } catch (Exception e) {
+ throw new TableException(
+ String.format(
+ "Failed to register jar resource '%s' of function '%s'.",
+ resourceUris, functionName),
+ e);
+ }
+ }
+
+ /** The CatalogFunction which holds an instantiated UDF. */
public static class InlineCatalogFunction implements CatalogFunction {
private final FunctionDefinition definition;
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java
index 49be84749ce..2596ed056f4 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java
@@ -23,8 +23,11 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
import org.apache.flink.util.JarUtils;
import org.apache.flink.util.MutableURLClassLoader;
@@ -41,6 +44,7 @@ import java.net.URL;
import java.net.URLClassLoader;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -52,9 +56,19 @@ public class ResourceManager implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(ResourceManager.class);
+ private static final String JAR_SUFFIX = "jar";
+ private static final String FILE_SCHEME = "file";
+
private final Path localResourceDir;
- private final Map<ResourceUri, URL> resourceInfos;
- private final MutableURLClassLoader userClassLoader;
+ protected final Map<ResourceUri, URL> resourceInfos;
+ protected final MutableURLClassLoader userClassLoader;
+
+ public static ResourceManager createResourceManager(
+ URL[] urls, ClassLoader parent, Configuration configuration) {
+ MutableURLClassLoader mutableURLClassLoader =
+ FlinkUserCodeClassLoaders.create(urls, parent, configuration);
+ return new ResourceManager(configuration, mutableURLClassLoader);
+ }
public ResourceManager(Configuration config, MutableURLClassLoader userClassLoader) {
this.localResourceDir =
@@ -65,78 +79,149 @@ public class ResourceManager implements Closeable {
this.userClassLoader = userClassLoader;
}
- public URLClassLoader getUserClassLoader() {
- return userClassLoader;
- }
+ /**
+ * Because any one of the resources in the list may fail during registration, they are staged
+ * before the actual registration so that the operation is transactional. Only if all the
+ * resources are available are they registered into the {@link ResourceManager}.
+ */
+ public void registerJarResources(List<ResourceUri> resourceUris) throws IOException {
+ // check jar resource before register
+ checkJarResources(resourceUris);
- public void registerResource(ResourceUri resourceUri) throws IOException {
- // check whether the resource has been registered
- if (resourceInfos.containsKey(resourceUri)) {
- LOG.info(
- "Resource [{}] has been registered, overwriting of registered resource is not supported "
- + "in the current version, skipping.",
- resourceUri.getUri());
- return;
- }
+ Map<ResourceUri, URL> stagingResourceLocalURLs = new HashMap<>();
+ for (ResourceUri resourceUri : resourceUris) {
+ // check whether the resource has been registered
+ if (resourceInfos.containsKey(resourceUri) && resourceInfos.get(resourceUri) != null) {
+ LOG.info(
+ "Resource [{}] has been registered, overwriting of registered resource is not supported "
+ + "in the current version, skipping.",
+ resourceUri.getUri());
+ continue;
+ }
- // here can check whether the resource path is valid
- Path path = new Path(resourceUri.getUri());
- // check resource firstly
- checkResource(path);
-
- URL localUrl;
- // check resource scheme
- String scheme = StringUtils.lowerCase(path.toUri().getScheme());
- // download resource to local path firstly if in remote
- if (scheme != null && !"file".equals(scheme)) {
- localUrl = downloadResource(path);
- } else {
- localUrl = getURLFromPath(path);
- }
+ // here can check whether the resource path is valid
+ Path path = new Path(resourceUri.getUri());
+ URL localUrl;
+ // check resource scheme
+ String scheme = StringUtils.lowerCase(path.toUri().getScheme());
+ // download resource to local path firstly if in remote
+ if (scheme != null && !FILE_SCHEME.equals(scheme)) {
+ localUrl = downloadResource(path);
+ } else {
+ localUrl = getURLFromPath(path);
+ // if the local jar resource is given as a relative path, convert it to an absolute path
+ // before registering it
+ resourceUri = new ResourceUri(ResourceType.JAR, localUrl.getPath());
+ }
- // only need add jar resource to classloader
- if (ResourceType.JAR.equals(resourceUri.getResourceType())) {
- // check the Jar file firstly
+ // check the local jar file
JarUtils.checkJarFile(localUrl);
- // add it to classloader
- userClassLoader.addURL(localUrl);
- LOG.info("Added jar resource [{}] to class path.", localUrl);
+ // add it to staging map
+ stagingResourceLocalURLs.put(resourceUri, localUrl);
}
- resourceInfos.put(resourceUri, localUrl);
- LOG.info("Register resource [{}] successfully.", resourceUri.getUri());
+ // register resource in batch
+ stagingResourceLocalURLs.forEach(
+ (resourceUri, url) -> {
+ // jar resources need to be added to the classloader
+ userClassLoader.addURL(url);
+ LOG.info("Added jar resource [{}] to class path.", url);
+
+ resourceInfos.put(resourceUri, url);
+ LOG.info("Register resource [{}] successfully.", resourceUri.getUri());
+ });
+ }
+
+ public URLClassLoader getUserClassLoader() {
+ return userClassLoader;
}
public Map<ResourceUri, URL> getResources() {
return Collections.unmodifiableMap(resourceInfos);
}
- public Set<URL> getJarResourceURLs() {
+ /**
+ * Returns the local URLs of the registered jars. For a remote jar, this is the URL of the copy
+ * downloaded to the local file system; for a local jar, it is the registered URL.
+ */
+ public Set<URL> getLocalJarResources() {
return resourceInfos.entrySet().stream()
.filter(entry -> ResourceType.JAR.equals(entry.getKey().getResourceType()))
.map(Map.Entry::getValue)
.collect(Collectors.toSet());
}
- private void checkResource(Path path) throws IOException {
- FileSystem fs = path.getFileSystem();
- // check resource exists firstly
- if (!fs.exists(path)) {
- throw new FileNotFoundException(String.format("Resource [%s] not found.", path));
+ @Override
+ public void close() throws IOException {
+ resourceInfos.clear();
+
+ IOException exception = null;
+ try {
+ userClassLoader.close();
+ } catch (IOException e) {
+ LOG.debug("Error while closing user classloader.", e);
+ exception = e;
}
- // register directory is not allowed for resource
- if (fs.getFileStatus(path).isDir()) {
- throw new IOException(
+ FileSystem fileSystem = FileSystem.getLocalFileSystem();
+ try {
+ if (fileSystem.exists(localResourceDir)) {
+ fileSystem.delete(localResourceDir, true);
+ }
+ } catch (IOException ioe) {
+ LOG.debug(String.format("Error while delete directory [%s].", localResourceDir), ioe);
+ exception = ExceptionUtils.firstOrSuppressed(ioe, exception);
+ }
+
+ if (exception != null) {
+ throw exception;
+ }
+ }
+
+ private void checkJarResources(List<ResourceUri> resourceUris) throws IOException {
+ // only support register jar resource
+ if (resourceUris.stream()
+ .anyMatch(resourceUri -> !ResourceType.JAR.equals(resourceUri.getResourceType()))) {
+ throw new ValidationException(
String.format(
- "The resource [%s] is a directory, however, the directory is not allowed for registering resource.",
- path));
+ "Only support to register jar resource, resource info:\n %s.",
+ resourceUris.stream()
+ .map(ResourceUri::getUri)
+ .collect(Collectors.joining(",\n"))));
+ }
+
+ for (ResourceUri resourceUri : resourceUris) {
+ // here can check whether the resource path is valid
+ Path path = new Path(resourceUri.getUri());
+ // file name should end with .jar suffix
+ String fileExtension = Files.getFileExtension(path.getName());
+ if (!fileExtension.toLowerCase().endsWith(JAR_SUFFIX)) {
+ throw new ValidationException(
+ String.format(
+ "The registering jar resource [%s] must ends with '.jar' suffix.",
+ path));
+ }
+
+ FileSystem fs = FileSystem.getUnguardedFileSystem(path.toUri());
+ // check resource exists firstly
+ if (!fs.exists(path)) {
+ throw new FileNotFoundException(
+ String.format("Jar resource [%s] not found.", path));
+ }
+
+ // register directory is not allowed for resource
+ if (fs.getFileStatus(path).isDir()) {
+ throw new ValidationException(
+ String.format(
+ "The registering jar resource [%s] is a directory that is not allowed.",
+ path));
+ }
}
}
@VisibleForTesting
- protected URL downloadResource(Path remotePath) throws IOException {
+ URL downloadResource(Path remotePath) throws IOException {
// get local resource path
Path localPath = getResourceLocalPath(remotePath);
try {
@@ -155,6 +240,20 @@ public class ResourceManager implements Closeable {
return getURLFromPath(localPath);
}
+ @VisibleForTesting
+ URL getURLFromPath(Path path) throws IOException {
+ // if scheme is null, rewrite it to file
+ if (path.toUri().getScheme() == null) {
+ path = path.makeQualified(FileSystem.getLocalFileSystem());
+ }
+ return path.toUri().toURL();
+ }
+
+ @VisibleForTesting
+ Path getLocalResourceDir() {
+ return localResourceDir;
+ }
+
private Path getResourceLocalPath(Path remotePath) {
String fileName = remotePath.getName();
String fileExtension = Files.getFileExtension(fileName);
@@ -172,26 +271,4 @@ public class ResourceManager implements Closeable {
}
return new Path(localResourceDir, fileNameWithUUID);
}
-
- @VisibleForTesting
- protected URL getURLFromPath(Path path) throws IOException {
- // if scheme is null, rewrite it to file
- if (path.toUri().getScheme() == null) {
- path = path.makeQualified(FileSystem.getLocalFileSystem());
- }
- return path.toUri().toURL();
- }
-
- @Override
- public void close() throws IOException {
- // close classloader
- userClassLoader.close();
- // clear the map
- resourceInfos.clear();
- // delete the local resource
- FileSystem fileSystem = localResourceDir.getFileSystem();
- if (fileSystem.exists(localResourceDir)) {
- fileSystem.delete(localResourceDir, true);
- }
- }
}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
index a60ea2f1348..fa9baacbd9e 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
@@ -27,11 +27,13 @@ import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.module.Module;
import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.table.resource.ResourceManager;
import org.apache.flink.table.utils.CatalogManagerMocks;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.net.URL;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
@@ -83,14 +85,18 @@ class FunctionCatalogTest {
moduleManager = new ModuleManager();
+ Configuration configuration = new Configuration();
functionCatalog =
new FunctionCatalog(
- new Configuration(),
+ configuration,
+ ResourceManager.createResourceManager(
+ new URL[0],
+ FunctionCatalogTest.class.getClassLoader(),
+ configuration),
CatalogManagerMocks.preparedCatalogManager()
.defaultCatalog(DEFAULT_CATALOG, catalog)
.build(),
- moduleManager,
- FunctionCatalogTest.class.getClassLoader());
+ moduleManager);
}
@Test
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java
index b6f43d5df5d..df11a571cc5 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java
@@ -19,73 +19,112 @@
package org.apache.flink.table.resource;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.table.api.ValidationException;
import org.apache.flink.util.FileUtils;
-import org.apache.flink.util.FlinkUserCodeClassLoaders;
-import org.apache.flink.util.MutableURLClassLoader;
import org.apache.flink.util.UserClassLoaderJarTestUtils;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
+import java.nio.file.Files;
import java.nio.file.Paths;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
-import static org.apache.flink.util.FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
/** Tests for {@link ResourceManager}. */
public class ResourceManagerTest {
- @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
-
- public static final String LOWER_UDF_CLASS = "LowerUDF";
- public static final String LOWER_UDF_CODE =
- "public class "
- + LOWER_UDF_CLASS
- + " extends org.apache.flink.table.functions.ScalarFunction {\n"
- + " public String eval(String str) {\n"
- + " return str.toLowerCase();\n"
- + " }\n"
- + "}\n";
-
+ @TempDir private static File tempFolder;
private static File udfJar;
- @BeforeClass
+ private ResourceManager resourceManager;
+
+ @BeforeAll
public static void prepare() throws Exception {
udfJar =
UserClassLoaderJarTestUtils.createJarFile(
- temporaryFolder.newFolder("test-jar"),
+ tempFolder,
"test-classloader-udf.jar",
- LOWER_UDF_CLASS,
- LOWER_UDF_CODE);
+ GENERATED_LOWER_UDF_CLASS,
+ String.format(GENERATED_LOWER_UDF_CODE, GENERATED_LOWER_UDF_CLASS));
+ }
+
+ @BeforeEach
+ public void before() {
+ resourceManager =
+ ResourceManager.createResourceManager(
+ new URL[0], getClass().getClassLoader(), new Configuration());
+ }
+
+ @AfterEach
+ public void after() throws Exception {
+ resourceManager.close();
}
@Test
public void testRegisterResource() throws Exception {
- ResourceManager resourceManager = createResourceManager(new URL[0]);
URLClassLoader userClassLoader = resourceManager.getUserClassLoader();
// test class loading before register resource
CommonTestUtils.assertThrows(
- String.format("LowerUDF"),
+ "LowerUDF",
ClassNotFoundException.class,
- () -> Class.forName(LOWER_UDF_CLASS, false, userClassLoader));
+ () -> Class.forName(GENERATED_LOWER_UDF_CLASS, false, userClassLoader));
+ ResourceUri resourceUri = new ResourceUri(ResourceType.JAR, udfJar.getPath());
// register the same jar repeatedly
- resourceManager.registerResource(new ResourceUri(ResourceType.JAR, udfJar.getPath()));
- resourceManager.registerResource(new ResourceUri(ResourceType.JAR, udfJar.getPath()));
+ resourceManager.registerJarResources(Arrays.asList(resourceUri, resourceUri));
+
+ // assert resource infos
+ Map<ResourceUri, URL> expected =
+ Collections.singletonMap(
+ resourceUri, resourceManager.getURLFromPath(new Path(udfJar.getPath())));
+
+ assertEquals(expected, resourceManager.getResources());
+
+ // test load class
+ final Class<?> clazz1 = Class.forName(GENERATED_LOWER_UDF_CLASS, false, userClassLoader);
+ final Class<?> clazz2 = Class.forName(GENERATED_LOWER_UDF_CLASS, false, userClassLoader);
+
+ assertEquals(clazz1, clazz2);
+ }
+
+ @Test
+ public void testRegisterResourceWithRelativePath() throws Exception {
+ URLClassLoader userClassLoader = resourceManager.getUserClassLoader();
+
+ // test class loading before register resource
+ CommonTestUtils.assertThrows(
+ "LowerUDF",
+ ClassNotFoundException.class,
+ () -> Class.forName(GENERATED_LOWER_UDF_CLASS, false, userClassLoader));
+
+ ResourceUri resourceUri =
+ new ResourceUri(
+ ResourceType.JAR,
+ new File(".")
+ .getCanonicalFile()
+ .toPath()
+ .relativize(udfJar.toPath())
+ .toString());
+ // register jar
+ resourceManager.registerJarResources(Collections.singletonList(resourceUri));
// assert resource infos
Map<ResourceUri, URL> expected =
@@ -96,51 +135,58 @@ public class ResourceManagerTest {
assertEquals(expected, resourceManager.getResources());
// test load class
- final Class<?> clazz1 = Class.forName(LOWER_UDF_CLASS, false, userClassLoader);
- final Class<?> clazz2 = Class.forName(LOWER_UDF_CLASS, false, userClassLoader);
+ final Class<?> clazz1 = Class.forName(GENERATED_LOWER_UDF_CLASS, false, userClassLoader);
+ final Class<?> clazz2 = Class.forName(GENERATED_LOWER_UDF_CLASS, false, userClassLoader);
assertEquals(clazz1, clazz2);
-
- resourceManager.close();
}
@Test
public void testRegisterInvalidResource() throws Exception {
- ResourceManager resourceManager = createResourceManager(new URL[0]);
-
- // test register non-exist file
- final String fileUri =
- temporaryFolder.getRoot().getPath() + Path.SEPARATOR + "test-non-exist-file";
+ final String fileUri = tempFolder.getAbsolutePath() + Path.SEPARATOR + "test-file";
CommonTestUtils.assertThrows(
- String.format("Resource [%s] not found.", fileUri),
- FileNotFoundException.class,
+ String.format(
+ "Only support to register jar resource, resource info:\n %s.", fileUri),
+ ValidationException.class,
() -> {
- resourceManager.registerResource(new ResourceUri(ResourceType.FILE, fileUri));
+ resourceManager.registerJarResources(
+ Collections.singletonList(new ResourceUri(ResourceType.FILE, fileUri)));
return null;
});
// test register directory for jar resource
- final String jarUri = temporaryFolder.newFolder("test-jar-dir").getPath();
+ final String jarDir = tempFolder.getPath();
CommonTestUtils.assertThrows(
String.format(
- "The resource [%s] is a directory, however, the directory is not allowed for registering resource.",
- jarUri),
- IOException.class,
+ "The registering jar resource [%s] must ends with '.jar' suffix.", jarDir),
+ ValidationException.class,
() -> {
- resourceManager.registerResource(new ResourceUri(ResourceType.JAR, jarUri));
+ resourceManager.registerJarResources(
+ Collections.singletonList(new ResourceUri(ResourceType.JAR, jarDir)));
return null;
});
- resourceManager.close();
+ // test register a directory whose name ends with '.jar' as jar resource
+ String jarPath =
+ Files.createDirectory(Paths.get(tempFolder.getPath(), "test-jar.jar")).toString();
+
+ CommonTestUtils.assertThrows(
+ String.format(
+ "The registering jar resource [%s] is a directory that is not allowed.",
+ jarPath),
+ ValidationException.class,
+ () -> {
+ resourceManager.registerJarResources(
+ Collections.singletonList(new ResourceUri(ResourceType.JAR, jarPath)));
+ return null;
+ });
}
@Test
public void testDownloadResource() throws Exception {
Path srcPath = new Path(udfJar.getPath());
- ResourceManager resourceManager = createResourceManager(new URL[0]);
-
// test download resource to local path
URL localUrl = resourceManager.downloadResource(srcPath);
@@ -148,35 +194,12 @@ public class ResourceManagerTest {
byte[] actual = FileUtils.readAllBytes(Paths.get(localUrl.toURI()));
assertArrayEquals(expected, actual);
-
- resourceManager.close();
- }
-
- private ResourceManager createResourceManager(URL[] urls) {
- Configuration configuration = new Configuration();
- MutableURLClassLoader mutableURLClassLoader =
- createClassLoader(configuration, urls, getClass().getClassLoader());
- return new ResourceManager(configuration, mutableURLClassLoader);
}
- private MutableURLClassLoader createClassLoader(
- Configuration configuration, URL[] urls, ClassLoader parentClassLoader) {
- final String[] alwaysParentFirstLoaderPatterns =
- CoreOptions.getParentFirstLoaderPatterns(configuration);
-
- // According to the specified class load order,child-first or parent-first
- // child-first: load from this classloader firstly, if not found, then from parent
- // parent-first: load from parent firstly, if not found, then from this classloader
- final String classLoaderResolveOrder =
- configuration.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);
- FlinkUserCodeClassLoaders.ResolveOrder resolveOrder =
- FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder);
- return FlinkUserCodeClassLoaders.create(
- resolveOrder,
- urls,
- parentClassLoader,
- alwaysParentFirstLoaderPatterns,
- NOOP_EXCEPTION_HANDLER,
- true);
+ @Test
+ public void testCloseResourceManagerCleanDownloadedResources() throws Exception {
+ resourceManager.close();
+ FileSystem fileSystem = FileSystem.getLocalFileSystem();
+ assertFalse(fileSystem.exists(resourceManager.getLocalResourceDir()));
}
}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
index 0dec32abd5a..6a6384f90a3 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
@@ -25,6 +25,9 @@ import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.table.resource.ResourceManager;
+
+import java.net.URL;
/** Mocking {@link TableEnvironment} for tests. */
public class TableEnvironmentMock extends TableEnvironmentImpl {
@@ -40,6 +43,7 @@ public class TableEnvironmentMock extends TableEnvironmentImpl {
protected TableEnvironmentMock(
CatalogManager catalogManager,
ModuleManager moduleManager,
+ ResourceManager userResourceManager,
TableConfig tableConfig,
ExecutorMock executor,
FunctionCatalog functionCatalog,
@@ -48,13 +52,12 @@ public class TableEnvironmentMock extends TableEnvironmentImpl {
super(
catalogManager,
moduleManager,
+ userResourceManager,
tableConfig,
executor,
functionCatalog,
planner,
- isStreamingMode,
- TableEnvironmentMock.class.getClassLoader());
-
+ isStreamingMode);
this.catalogManager = catalogManager;
this.executor = executor;
this.functionCatalog = functionCatalog;
@@ -73,12 +76,19 @@ public class TableEnvironmentMock extends TableEnvironmentImpl {
final TableConfig tableConfig = TableConfig.getDefault();
final CatalogManager catalogManager = CatalogManagerMocks.createEmptyCatalogManager();
final ModuleManager moduleManager = new ModuleManager();
+ final ResourceManager resourceManager =
+ ResourceManager.createResourceManager(
+ new URL[0],
+ Thread.currentThread().getContextClassLoader(),
+ tableConfig.getConfiguration());
+
return new TableEnvironmentMock(
catalogManager,
moduleManager,
+ resourceManager,
tableConfig,
createExecutor(),
- createFunctionCatalog(tableConfig, catalogManager, moduleManager),
+ createFunctionCatalog(tableConfig, resourceManager, catalogManager, moduleManager),
createPlanner(),
isStreamingMode);
}
@@ -88,9 +98,11 @@ public class TableEnvironmentMock extends TableEnvironmentImpl {
}
private static FunctionCatalog createFunctionCatalog(
- ReadableConfig config, CatalogManager catalogManager, ModuleManager moduleManager) {
- return new FunctionCatalog(
- config, catalogManager, moduleManager, TableEnvironmentMock.class.getClassLoader());
+ ReadableConfig config,
+ ResourceManager resourceManager,
+ CatalogManager catalogManager,
+ ModuleManager moduleManager) {
+ return new FunctionCatalog(config, resourceManager, catalogManager, moduleManager);
}
private static PlannerMock createPlanner() {
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/UserDefinedFunctions.java
similarity index 81%
rename from flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java
rename to flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/UserDefinedFunctions.java
index d0cd826162f..81ef13dc23a 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/UserDefinedFunctions.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.client.gateway.utils;
+package org.apache.flink.table.utils;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -29,16 +29,25 @@ import org.apache.flink.types.Row;
/** A bunch of UDFs for testing the SQL Client. */
public class UserDefinedFunctions {
- public static final String GENERATED_UDF_CLASS = "LowerUDF";
+ public static final String GENERATED_LOWER_UDF_CLASS = "LowerUDF";
+ public static final String GENERATED_UPPER_UDF_CLASS = "UpperUDF";
- public static final String GENERATED_UDF_CODE =
+ public static final String GENERATED_LOWER_UDF_CODE =
"public class "
- + GENERATED_UDF_CLASS
+ + "%s"
+ " extends org.apache.flink.table.functions.ScalarFunction {\n"
+ " public String eval(String str) {\n"
+ " return str.toLowerCase();\n"
+ " }\n"
+ "}\n";
+ public static final String GENERATED_UPPER_UDF_CODE =
+ "public class "
+ + "%s"
+ + " extends org.apache.flink.table.functions.ScalarFunction {\n"
+ + " public String eval(String str) {\n"
+ + " return str.toUpperCase();\n"
+ + " }\n"
+ + "}\n";
/** The scalar function for SQL Client test. */
public static class ScalarUDF extends ScalarFunction {
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
index 83d78a569d7..abea62f3261 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
@@ -33,12 +33,14 @@ import org.apache.flink.table.factories.PlannerFactoryUtil
import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, TableFunction, UserDefinedFunctionHelper}
import org.apache.flink.table.module.ModuleManager
import org.apache.flink.table.operations._
+import org.apache.flink.table.resource.ResourceManager
import org.apache.flink.table.sources.{TableSource, TableSourceValidation}
import org.apache.flink.table.types.AbstractDataType
import org.apache.flink.table.types.utils.TypeConversions
import org.apache.flink.types.Row
-import org.apache.flink.util.Preconditions
+import org.apache.flink.util.{FlinkUserCodeClassLoaders, MutableURLClassLoader, Preconditions}
+import java.net.URL
import java.util.Optional
import scala.collection.JavaConverters._
@@ -51,22 +53,22 @@ import scala.collection.JavaConverters._
class StreamTableEnvironmentImpl(
catalogManager: CatalogManager,
moduleManager: ModuleManager,
+ resourceManager: ResourceManager,
functionCatalog: FunctionCatalog,
tableConfig: TableConfig,
scalaExecutionEnvironment: StreamExecutionEnvironment,
planner: Planner,
executor: Executor,
- isStreaming: Boolean,
- userClassLoader: ClassLoader)
+ isStreaming: Boolean)
extends AbstractStreamTableEnvironmentImpl(
catalogManager,
moduleManager,
+ resourceManager,
tableConfig,
executor,
functionCatalog,
planner,
isStreaming,
- userClassLoader,
scalaExecutionEnvironment.getWrappedStreamExecutionEnvironment)
with StreamTableEnvironment {
@@ -291,20 +293,24 @@ object StreamTableEnvironmentImpl {
def create(
executionEnvironment: StreamExecutionEnvironment,
settings: EnvironmentSettings): StreamTableEnvironmentImpl = {
- val classLoader = settings.getUserClassLoader
+ val userClassLoader: MutableURLClassLoader =
+ FlinkUserCodeClassLoaders.create(
+ new Array[URL](0),
+ settings.getUserClassLoader,
+ settings.getConfiguration)
val executor = AbstractStreamTableEnvironmentImpl.lookupExecutor(
- classLoader,
+ userClassLoader,
executionEnvironment.getWrappedStreamExecutionEnvironment)
val tableConfig = TableConfig.getDefault
tableConfig.setRootConfiguration(executor.getConfiguration)
tableConfig.addConfiguration(settings.getConfiguration)
+ val resourceManager = new ResourceManager(settings.getConfiguration, userClassLoader)
val moduleManager = new ModuleManager
-
val catalogManager = CatalogManager.newBuilder
- .classLoader(classLoader)
+ .classLoader(userClassLoader)
.config(tableConfig)
.defaultCatalog(
settings.getBuiltInCatalogName,
@@ -313,12 +319,12 @@ object StreamTableEnvironmentImpl {
.build
val functionCatalog =
- new FunctionCatalog(tableConfig, catalogManager, moduleManager, classLoader)
+ new FunctionCatalog(tableConfig, resourceManager, catalogManager, moduleManager)
val planner = PlannerFactoryUtil.createPlanner(
executor,
tableConfig,
- classLoader,
+ userClassLoader,
moduleManager,
catalogManager,
functionCatalog)
@@ -326,13 +332,13 @@ object StreamTableEnvironmentImpl {
new StreamTableEnvironmentImpl(
catalogManager,
moduleManager,
+ resourceManager,
functionCatalog,
tableConfig,
executionEnvironment,
planner,
executor,
- settings.isStreamingMode,
- classLoader
+ settings.isStreamingMode
)
}
}
diff --git a/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala b/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala
index 7bf9259a8b6..5a2823a64dc 100644
--- a/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala
@@ -23,12 +23,14 @@ import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.catalog.FunctionCatalog
import org.apache.flink.table.module.ModuleManager
import org.apache.flink.table.operations.ModifyOperation
+import org.apache.flink.table.resource.ResourceManager
import org.apache.flink.table.utils.{CatalogManagerMocks, ExecutorMock, PlannerMock}
import org.apache.flink.types.Row
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
+import java.net.URL
import java.time.Duration
import java.util.{Collections, List => JList}
@@ -70,20 +72,21 @@ class StreamTableEnvironmentImplTest {
val tableConfig = TableConfig.getDefault
val catalogManager = CatalogManagerMocks.createEmptyCatalogManager()
val moduleManager = new ModuleManager
+ val resourceManager = ResourceManager.createResourceManager(
+ new Array[URL](0),
+ Thread.currentThread.getContextClassLoader,
+ tableConfig.getConfiguration)
+
new StreamTableEnvironmentImpl(
catalogManager,
moduleManager,
- new FunctionCatalog(
- tableConfig,
- catalogManager,
- moduleManager,
- Thread.currentThread().getContextClassLoader),
+ resourceManager,
+ new FunctionCatalog(tableConfig, resourceManager, catalogManager, moduleManager),
tableConfig,
env,
new TestPlanner(elements.javaStream.getTransformation),
new ExecutorMock,
- true,
- this.getClass.getClassLoader)
+ true)
}
private class TestPlanner(transformation: Transformation[_]) extends PlannerMock {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/CompactManagedTableITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/CompactManagedTableITCase.java
index bb3210faa58..c9e9e6faeaa 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/CompactManagedTableITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/CompactManagedTableITCase.java
@@ -71,7 +71,7 @@ public class CompactManagedTableITCase extends BatchTestBase {
@Override
@Before
- public void before() {
+ public void before() throws Exception {
super.before();
MANAGED_TABLES.put(tableIdentifier, new AtomicReference<>());
referenceOfManagedTableFileEntries = new AtomicReference<>();
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/ForwardHashExchangeITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/ForwardHashExchangeITCase.java
index d7737b6c053..522eddc2eaa 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/ForwardHashExchangeITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/ForwardHashExchangeITCase.java
@@ -35,7 +35,7 @@ import java.util.Arrays;
public class ForwardHashExchangeITCase extends BatchTestBase {
@Before
- public void before() {
+ public void before() throws Exception {
super.before();
env().getConfig().setDynamicGraph(true);
env().disableOperatorChaining();
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java
new file mode 100644
index 00000000000..77aa2bfa9df
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for creating and using functions via 'USING JAR' in a batch table environment. */
+public class FunctionITCase extends BatchTestBase {
+
+ private static final Random random = new Random();
+ private String udfClassName;
+ private String jarPath;
+
+ @Before
+ @Override
+ public void before() throws Exception {
+ super.before();
+ udfClassName = GENERATED_LOWER_UDF_CLASS + random.nextInt(50);
+ jarPath =
+ UserClassLoaderJarTestUtils.createJarFile(
+ TEMPORARY_FOLDER.newFolder(
+ String.format("test-jar-%s", UUID.randomUUID())),
+ "test-classloader-udf.jar",
+ udfClassName,
+ String.format(GENERATED_LOWER_UDF_CODE, udfClassName))
+ .toURI()
+ .toString();
+ }
+
+ @Test
+ public void testCreateTemporarySystemFunctionByUsingJar() {
+ String ddl1 =
+ String.format(
+ "CREATE TEMPORARY SYSTEM FUNCTION f10 AS '%s' USING JAR '%s'",
+ udfClassName, jarPath);
+ String ddl2 =
+ String.format(
+ "CREATE TEMPORARY SYSTEM FUNCTION f11 AS '%s' USING JAR '%s'",
+ udfClassName, jarPath);
+ tEnv().executeSql(ddl1);
+ tEnv().executeSql(ddl2);
+
+ List<String> functions = Arrays.asList(tEnv().listFunctions());
+ assertThat(functions).contains("f10");
+ assertThat(functions).contains("f11");
+
+ tEnv().executeSql("DROP TEMPORARY SYSTEM FUNCTION f10");
+ tEnv().executeSql("DROP TEMPORARY SYSTEM FUNCTION f11");
+
+ functions = Arrays.asList(tEnv().listFunctions());
+ assertThat(functions).doesNotContain("f10");
+ assertThat(functions).doesNotContain("f11");
+ }
+
+ @Test
+ public void testCreateCatalogFunctionByUsingJar() {
+ String ddl =
+ String.format(
+ "CREATE FUNCTION default_database.f11 AS '%s' USING JAR '%s'",
+ udfClassName, jarPath);
+ tEnv().executeSql(ddl);
+ assertThat(Arrays.asList(tEnv().listFunctions())).contains("f11");
+
+ tEnv().executeSql("DROP FUNCTION default_database.f11");
+ assertThat(Arrays.asList(tEnv().listFunctions())).doesNotContain("f11");
+ }
+
+ @Test
+ public void testCreateTemporaryCatalogFunctionByUsingJar() {
+ String ddl =
+ String.format(
+ "CREATE TEMPORARY FUNCTION default_database.f12 AS '%s' USING JAR '%s'",
+ udfClassName, jarPath);
+ tEnv().executeSql(ddl);
+ assertThat(Arrays.asList(tEnv().listFunctions())).contains("f12");
+
+ tEnv().executeSql("DROP TEMPORARY FUNCTION default_database.f12");
+ assertThat(Arrays.asList(tEnv().listFunctions())).doesNotContain("f12");
+ }
+
+ @Test
+ public void testUserDefinedTemporarySystemFunctionByUsingJar() throws Exception {
+ String functionDDL =
+ String.format(
+ "create temporary system function lowerUdf as '%s' using jar '%s'",
+ udfClassName, jarPath);
+
+ String dropFunctionDDL = "drop temporary system function lowerUdf";
+ testUserDefinedFunctionByUsingJar(functionDDL, dropFunctionDDL);
+ }
+
+ @Test
+ public void testUserDefinedRegularCatalogFunctionByUsingJar() throws Exception {
+ String functionDDL =
+ String.format(
+ "create function lowerUdf as '%s' using jar '%s'", udfClassName, jarPath);
+
+ String dropFunctionDDL = "drop function lowerUdf";
+ testUserDefinedFunctionByUsingJar(functionDDL, dropFunctionDDL);
+ }
+
+ @Test
+ public void testUserDefinedTemporaryCatalogFunctionByUsingJar() throws Exception {
+ String functionDDL =
+ String.format(
+ "create temporary function lowerUdf as '%s' using jar '%s'",
+ udfClassName, jarPath);
+
+ String dropFunctionDDL = "drop temporary function lowerUdf";
+ testUserDefinedFunctionByUsingJar(functionDDL, dropFunctionDDL);
+ }
+
+ private void testUserDefinedFunctionByUsingJar(String createFunctionDDL, String dropFunctionDDL)
+ throws Exception {
+ List<Row> sourceData =
+ Arrays.asList(
+ Row.of(1, "JARK"),
+ Row.of(2, "RON"),
+ Row.of(3, "LeoNard"),
+ Row.of(1, "FLINK"),
+ Row.of(2, "CDC"));
+
+ TestCollectionTableFactory.reset();
+ TestCollectionTableFactory.initData(sourceData);
+
+ String sourceDDL = "create table t1(a int, b varchar) with ('connector' = 'COLLECTION')";
+ String sinkDDL = "create table t2(a int, b varchar) with ('connector' = 'COLLECTION')";
+
+ String query = "select a, lowerUdf(b) from t1";
+
+ tEnv().executeSql(sourceDDL);
+ tEnv().executeSql(sinkDDL);
+ tEnv().executeSql(createFunctionDDL);
+ Table t2 = tEnv().sqlQuery(query);
+ t2.executeInsert("t2").await();
+
+ List<Row> result = TestCollectionTableFactory.RESULT();
+ List<Row> expected =
+ Arrays.asList(
+ Row.of(1, "jark"),
+ Row.of(2, "ron"),
+ Row.of(3, "leonard"),
+ Row.of(1, "flink"),
+ Row.of(2, "cdc"));
+ assertThat(result).isEqualTo(expected);
+
+ tEnv().executeSql("drop table t1");
+ tEnv().executeSql("drop table t2");
+ // delete the function
+ tEnv().executeSql(dropFunctionDDL);
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/agg/LocalAggregatePushDownITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/agg/LocalAggregatePushDownITCase.java
index 778bb09fefe..21b6e670a18 100755
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/agg/LocalAggregatePushDownITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/agg/LocalAggregatePushDownITCase.java
@@ -36,7 +36,7 @@ import java.util.Collections;
public class LocalAggregatePushDownITCase extends BatchTestBase {
@Before
- public void before() {
+ public void before() throws Exception {
super.before();
env().setParallelism(1); // set sink parallelism to 1
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
index bd55a390e1d..ea65c24de88 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
@@ -52,7 +52,9 @@ import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+import org.junit.Before;
import org.junit.Test;
import java.lang.invoke.MethodHandle;
@@ -65,10 +67,14 @@ import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.Random;
+import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.fail;
@@ -76,7 +82,7 @@ import static org.assertj.core.api.Assertions.fail;
/**
* Tests for catalog and system functions in a table environment.
*
- * <p>Note: This class is meant for testing the core function support. Use {@link
+ * <p>Note: This class is meant for testing the core function support. Use {@code
* org.apache.flink.table.planner.functions.BuiltInFunctionTestBase} for testing individual function
* implementations.
*/
@@ -84,6 +90,26 @@ public class FunctionITCase extends StreamingTestBase {
private static final String TEST_FUNCTION = TestUDF.class.getName();
+ private static final Random random = new Random();
+ private String udfClassName;
+ private String jarPath;
+
+ @Before
+ @Override
+ public void before() throws Exception {
+ super.before();
+ udfClassName = GENERATED_LOWER_UDF_CLASS + random.nextInt(50);
+ jarPath =
+ UserClassLoaderJarTestUtils.createJarFile(
+ TEMPORARY_FOLDER.newFolder(
+ String.format("test-jar-%s", UUID.randomUUID())),
+ "test-classloader-udf.jar",
+ udfClassName,
+ String.format(GENERATED_LOWER_UDF_CODE, udfClassName))
+ .toURI()
+ .toString();
+ }
+
@Test
public void testCreateCatalogFunctionInDefaultCatalog() {
String ddl1 = "create function f1 as 'org.apache.flink.function.TestFunction'";
@@ -198,6 +224,45 @@ public class FunctionITCase extends StreamingTestBase {
tEnv().executeSql(ddl3);
}
+ @Test
+ public void testCreateTemporarySystemFunctionByUsingJar() {
+ String ddl =
+ String.format(
+ "CREATE TEMPORARY SYSTEM FUNCTION f10 AS '%s' USING JAR '%s'",
+ udfClassName, jarPath);
+ tEnv().executeSql(ddl);
+ assertThat(Arrays.asList(tEnv().listFunctions())).contains("f10");
+
+ tEnv().executeSql("DROP TEMPORARY SYSTEM FUNCTION f10");
+ assertThat(Arrays.asList(tEnv().listFunctions())).doesNotContain("f10");
+ }
+
+ @Test
+ public void testCreateCatalogFunctionByUsingJar() {
+ String ddl =
+ String.format(
+ "CREATE FUNCTION default_database.f11 AS '%s' USING JAR '%s'",
+ udfClassName, jarPath);
+ tEnv().executeSql(ddl);
+ assertThat(Arrays.asList(tEnv().listFunctions())).contains("f11");
+
+ tEnv().executeSql("DROP FUNCTION default_database.f11");
+ assertThat(Arrays.asList(tEnv().listFunctions())).doesNotContain("f11");
+ }
+
+ @Test
+ public void testCreateTemporaryCatalogFunctionByUsingJar() {
+ String ddl =
+ String.format(
+ "CREATE TEMPORARY FUNCTION default_database.f12 AS '%s' USING JAR '%s'",
+ udfClassName, jarPath);
+ tEnv().executeSql(ddl);
+ assertThat(Arrays.asList(tEnv().listFunctions())).contains("f12");
+
+ tEnv().executeSql("DROP TEMPORARY FUNCTION default_database.f12");
+ assertThat(Arrays.asList(tEnv().listFunctions())).doesNotContain("f12");
+ }
+
@Test
public void testAlterFunction() throws Exception {
String create = "create function f3 as 'org.apache.flink.function.TestFunction'";
@@ -366,6 +431,38 @@ public class FunctionITCase extends StreamingTestBase {
tEnv().executeSql(dropFunctionDDL);
}
+ @Test
+ public void testUserDefinedTemporarySystemFunctionByUsingJar() throws Exception {
+ String functionDDL =
+ String.format(
+ "create temporary system function lowerUdf as '%s' using jar '%s'",
+ udfClassName, jarPath);
+
+ String dropFunctionDDL = "drop temporary system function lowerUdf";
+ testUserDefinedFunctionByUsingJar(functionDDL, dropFunctionDDL);
+ }
+
+ @Test
+ public void testUserDefinedRegularCatalogFunctionByUsingJar() throws Exception {
+ String functionDDL =
+ String.format(
+ "create function lowerUdf as '%s' using jar '%s'", udfClassName, jarPath);
+
+ String dropFunctionDDL = "drop function lowerUdf";
+ testUserDefinedFunctionByUsingJar(functionDDL, dropFunctionDDL);
+ }
+
+ @Test
+ public void testUserDefinedTemporaryCatalogFunctionByUsingJar() throws Exception {
+ String functionDDL =
+ String.format(
+ "create temporary function lowerUdf as '%s' using jar '%s'",
+ udfClassName, jarPath);
+
+ String dropFunctionDDL = "drop temporary function lowerUdf";
+ testUserDefinedFunctionByUsingJar(functionDDL, dropFunctionDDL);
+ }
+
@Test
public void testUserDefinedTemporarySystemFunction() throws Exception {
String functionDDL = "create temporary system function addOne as '" + TEST_FUNCTION + "'";
@@ -409,12 +506,51 @@ public class FunctionITCase extends StreamingTestBase {
Table t2 = tEnv().sqlQuery(query);
t2.executeInsert("t2").await();
- Row[] result = TestCollectionTableFactory.RESULT().toArray(new Row[0]);
- Row[] expected = sourceData.toArray(new Row[0]);
+ List<Row> result = TestCollectionTableFactory.RESULT();
+ assertThat(result).isEqualTo(sourceData);
+
+ tEnv().executeSql("drop table t1");
+ tEnv().executeSql("drop table t2");
+ }
+
+ private void testUserDefinedFunctionByUsingJar(String createFunctionDDL, String dropFunctionDDL)
+ throws Exception {
+ List<Row> sourceData =
+ Arrays.asList(
+ Row.of(1, "JARK"),
+ Row.of(2, "RON"),
+ Row.of(3, "LeoNard"),
+ Row.of(1, "FLINK"),
+ Row.of(2, "CDC"));
+
+ TestCollectionTableFactory.reset();
+ TestCollectionTableFactory.initData(sourceData);
+
+ String sourceDDL = "create table t1(a int, b varchar) with ('connector' = 'COLLECTION')";
+ String sinkDDL = "create table t2(a int, b varchar) with ('connector' = 'COLLECTION')";
+
+ String query = "select a, lowerUdf(b) from t1";
+
+ tEnv().executeSql(sourceDDL);
+ tEnv().executeSql(sinkDDL);
+ tEnv().executeSql(createFunctionDDL);
+ Table t2 = tEnv().sqlQuery(query);
+ t2.executeInsert("t2").await();
+
+ List<Row> result = TestCollectionTableFactory.RESULT();
+ List<Row> expected =
+ Arrays.asList(
+ Row.of(1, "jark"),
+ Row.of(2, "ron"),
+ Row.of(3, "leonard"),
+ Row.of(1, "flink"),
+ Row.of(2, "cdc"));
assertThat(result).isEqualTo(expected);
tEnv().executeSql("drop table t1");
tEnv().executeSql("drop table t2");
+ // delete the function
+ tEnv().executeSql(dropFunctionDDL);
}
@Test
@@ -596,7 +732,7 @@ public class FunctionITCase extends StreamingTestBase {
}
@Test
- public void testVarArgScalarFunction() throws Exception {
+ public void testVarArgScalarFunction() {
final List<Row> sourceData = Arrays.asList(Row.of("Bob", 1), Row.of("Alice", 2));
TestCollectionTableFactory.reset();
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java
index b07a859da50..b852661f34e 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java
@@ -31,11 +31,13 @@ import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema;
import org.apache.flink.table.planner.delegation.ParserImpl;
import org.apache.flink.table.planner.delegation.PlannerContext;
+import org.apache.flink.table.resource.ResourceManager;
import org.apache.flink.table.utils.CatalogManagerMocks;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.plan.RelTraitDef;
+import java.net.URL;
import java.util.Collections;
import java.util.List;
@@ -58,6 +60,7 @@ public class PlannerMocks {
private PlannerMocks(
boolean isBatchMode,
TableConfig tableConfig,
+ ResourceManager resourceManager,
CatalogManager catalogManager,
List<RelTraitDef> traitDefs,
CalciteSchema rootSchema) {
@@ -67,11 +70,7 @@ public class PlannerMocks {
final ModuleManager moduleManager = new ModuleManager();
this.functionCatalog =
- new FunctionCatalog(
- tableConfig,
- catalogManager,
- moduleManager,
- PlannerMocks.class.getClassLoader());
+ new FunctionCatalog(tableConfig, resourceManager, catalogManager, moduleManager);
this.plannerContext =
new PlannerContext(
@@ -156,6 +155,11 @@ public class PlannerMocks {
private boolean batchMode = false;
private TableConfig tableConfig = TableConfig.getDefault();
private CatalogManager catalogManager = CatalogManagerMocks.createEmptyCatalogManager();
+ private ResourceManager resourceManager =
+ ResourceManager.createResourceManager(
+ new URL[0],
+ Thread.currentThread().getContextClassLoader(),
+ tableConfig.getConfiguration());
private List<RelTraitDef> traitDefs = Collections.emptyList();
private CalciteSchema rootSchema;
@@ -176,6 +180,11 @@ public class PlannerMocks {
return this;
}
+ public Builder withResourceManager(ResourceManager resourceManager) {
+ this.resourceManager = resourceManager;
+ return this;
+ }
+
public Builder withCatalogManager(CatalogManager catalogManager) {
this.catalogManager = catalogManager;
return this;
@@ -192,7 +201,8 @@ public class PlannerMocks {
}
public PlannerMocks build() {
- return new PlannerMocks(batchMode, tableConfig, catalogManager, traitDefs, rootSchema);
+ return new PlannerMocks(
+ batchMode, tableConfig, resourceManager, catalogManager, traitDefs, rootSchema);
}
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
index 48c110c680b..2eb87e35cc8 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
@@ -30,6 +30,7 @@ import org.apache.flink.table.planner.expressions.utils.Func1
import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
import org.apache.flink.table.planner.functions.utils.ScalarSqlFunction
import org.apache.flink.table.planner.utils.{DateTimeTestUtil, IntSumAggFunction}
+import org.apache.flink.table.resource.ResourceManager
import org.apache.flink.table.utils.CatalogManagerMocks
import org.apache.calcite.rel.`type`.RelDataType
@@ -43,6 +44,7 @@ import org.junit.Assert.{assertArrayEquals, assertEquals, assertThat, assertTrue
import org.junit.Test
import java.math.BigDecimal
+import java.net.URL
import java.time.ZoneId
import java.util.{Arrays, List => JList, TimeZone}
@@ -50,13 +52,15 @@ import scala.collection.JavaConverters._
/** Test for [[RexNodeExtractor]]. */
class RexNodeExtractorTest extends RexNodeTestBase {
+ val tableConfig = TableConfig.getDefault
+ val resourceManager = ResourceManager.createResourceManager(
+ new Array[URL](0),
+ Thread.currentThread.getContextClassLoader,
+ tableConfig.getConfiguration)
val catalogManager: CatalogManager = CatalogManagerMocks.createEmptyCatalogManager()
val moduleManager = new ModuleManager
- private val functionCatalog = new FunctionCatalog(
- TableConfig.getDefault,
- catalogManager,
- moduleManager,
- classOf[RexNodeExtractorTest].getClassLoader)
+ private val functionCatalog =
+ new FunctionCatalog(tableConfig, resourceManager, catalogManager, moduleManager)
@Test
def testExtractRefInputFields(): Unit = {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/BatchFileSystemITCaseBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/BatchFileSystemITCaseBase.scala
index 7ec6be2a06d..d02495134f0 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/BatchFileSystemITCaseBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/BatchFileSystemITCaseBase.scala
@@ -22,15 +22,12 @@ import org.apache.flink.table.planner.runtime.FileSystemITCaseBase
import org.apache.flink.table.planner.runtime.utils.BatchTestBase
import org.apache.flink.types.Row
-import org.junit.{Assert, Before}
-
-import java.lang.AssertionError
+import org.junit.Before
import scala.collection.Seq
/** Batch [[FileSystemITCaseBase]]. */
abstract class BatchFileSystemITCaseBase extends BatchTestBase with FileSystemITCaseBase {
-
@Before
override def before(): Unit = {
super.before()
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
index 7947b689f8d..ad51e49412c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
@@ -17,26 +17,21 @@
*/
package org.apache.flink.table.planner.runtime.batch.sql
-import org.apache.flink.client.ClientUtils
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.table.api.{EnvironmentSettings, TableConfig}
-import org.apache.flink.table.api.internal.TableEnvironmentImpl
import org.apache.flink.table.catalog.{CatalogPartitionImpl, CatalogPartitionSpec, ObjectPath}
-import org.apache.flink.table.planner.delegation.PlannerBase
import org.apache.flink.table.planner.factories.{TestValuesCatalog, TestValuesTableFactory}
import org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase.TEMPORARY_FOLDER
import org.apache.flink.table.planner.runtime.utils.BatchTestBase
import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
import org.apache.flink.table.planner.utils.TestingTableEnvironment
+import org.apache.flink.table.resource.{ResourceType, ResourceUri}
import org.apache.flink.util.UserClassLoaderJarTestUtils
import org.junit.{Before, Test}
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
-import java.io.File
-import java.net.URL
import java.util
+import java.util.Collections
import scala.collection.JavaConversions._
@@ -44,40 +39,8 @@ import scala.collection.JavaConversions._
class PartitionableSourceITCase(val sourceFetchPartitions: Boolean, val useCatalogFilter: Boolean)
extends BatchTestBase {
- final private val UDF_CLASS =
- s"""
- |public class TrimUDF extends org.apache.flink.table.functions.ScalarFunction {
- | public String eval(String str) {
- | return str.trim();
- | }
- |}
- |""".stripMargin
-
- private def overrideTableEnv(): Unit = {
- val tmpDir: File = TEMPORARY_FOLDER.newFolder()
- val udfJarFile: File =
- UserClassLoaderJarTestUtils.createJarFile(tmpDir, "flink-test-udf.jar", "TrimUDF", UDF_CLASS)
- val jars: util.List[URL] = util.Collections.singletonList(udfJarFile.toURI.toURL)
- val cl = ClientUtils.buildUserCodeClassLoader(
- jars,
- util.Collections.emptyList(),
- getClass.getClassLoader,
- new Configuration())
-
- settings = EnvironmentSettings.newInstance().inBatchMode().withClassLoader(cl).build()
- testingTableEnv = TestingTableEnvironment
- .create(settings, catalogManager = None, TableConfig.getDefault)
- tEnv = testingTableEnv
- planner = tEnv.asInstanceOf[TableEnvironmentImpl].getPlanner.asInstanceOf[PlannerBase]
- env = planner.getExecEnv
- env.getConfig.enableObjectReuse()
- tableConfig = tEnv.getConfig
- }
-
@Before
override def before(): Unit = {
- // override TableEnvironment by a user defined classloader
- overrideTableEnv()
super.before()
env.setParallelism(1) // set sink parallelism to 1
@@ -217,6 +180,28 @@ class PartitionableSourceITCase(val sourceFetchPartitions: Boolean, val useCatal
@Test
def testPartitionPrunerCompileClassLoader(): Unit = {
+ val udfJavaCode =
+ s"""
+ |public class TrimUDF extends org.apache.flink.table.functions.ScalarFunction {
+ | public String eval(String str) {
+ | return str.trim();
+ | }
+ |}
+ |""".stripMargin
+ val tmpDir = TEMPORARY_FOLDER.newFolder()
+ val udfJarFile =
+ UserClassLoaderJarTestUtils.createJarFile(
+ tmpDir,
+ "flink-test-udf.jar",
+ "TrimUDF",
+ udfJavaCode)
+
+ tEnv
+ .asInstanceOf[TestingTableEnvironment]
+ .getResourceManager
+ .registerJarResources(
+ Collections.singletonList(new ResourceUri(ResourceType.JAR, udfJarFile.toURI.toString)))
+
tEnv.executeSql("create temporary function trimUDF as 'TrimUDF'")
checkResult(
"select * from PartitionableTable where trimUDF(part1) = 'A' and part2 > 1",
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
index 3eaca6e3809..35c21da04a6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
@@ -72,6 +72,7 @@ class BatchTestBase extends BatchAbstractTestBase {
"(?s)From line ([0-9]+),"
+ " column ([0-9]+) to line ([0-9]+), column ([0-9]+): (.*)")
+ @throws(classOf[Exception])
@Before
def before(): Unit = {
BatchTestBase.configForMiniCluster(tableConfig)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala
index c622af7f69a..222bb656ecc 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala
@@ -43,6 +43,7 @@ class StreamingTestBase extends AbstractTestBase {
@Rule
def tempFolder: TemporaryFolder = _tempFolder
+ @throws(classOf[Exception])
@Before
@BeforeEach
def before(): Unit = {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index 1f99ea6e154..1b37f8dcb86 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -42,7 +42,7 @@ import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.factories.{FactoryUtil, PlannerFactoryUtil, StreamTableSourceFactory}
import org.apache.flink.table.functions._
import org.apache.flink.table.module.ModuleManager
-import org.apache.flink.table.operations.{ModifyOperation, Operation, QueryOperation, SinkModifyOperation}
+import org.apache.flink.table.operations.{ModifyOperation, QueryOperation}
import org.apache.flink.table.planner.calcite.CalciteConfig
import org.apache.flink.table.planner.delegation.PlannerBase
import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
@@ -57,6 +57,7 @@ import org.apache.flink.table.planner.runtime.utils.{TestingAppendTableSink, Tes
import org.apache.flink.table.planner.sinks.CollectRowTableSink
import org.apache.flink.table.planner.utils.PlanKind.PlanKind
import org.apache.flink.table.planner.utils.TableTestUtil.{replaceNodeIdInOperator, replaceStageId, replaceStreamNodeId}
+import org.apache.flink.table.resource.ResourceManager
import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromLogicalTypeToTypeInfo
import org.apache.flink.table.sinks._
import org.apache.flink.table.sources.{StreamTableSource, TableSource}
@@ -64,6 +65,7 @@ import org.apache.flink.table.types.logical.LogicalType
import org.apache.flink.table.types.utils.TypeConversions
import org.apache.flink.table.typeutils.FieldInfoUtils
import org.apache.flink.types.Row
+import org.apache.flink.util.{FlinkUserCodeClassLoaders, MutableURLClassLoader}
import _root_.java.math.{BigDecimal => JBigDecimal}
import _root_.java.util
@@ -78,6 +80,7 @@ import org.junit.Rule
import org.junit.rules.{ExpectedException, TemporaryFolder, TestName}
import java.io.{File, IOException}
+import java.net.URL
import java.nio.file.{Files, Paths}
import java.time.Duration
@@ -1431,21 +1434,23 @@ class TestTableSourceFactory extends StreamTableSourceFactory[Row] {
class TestingTableEnvironment private (
catalogManager: CatalogManager,
moduleManager: ModuleManager,
+ resourceManager: ResourceManager,
tableConfig: TableConfig,
executor: Executor,
functionCatalog: FunctionCatalog,
planner: PlannerBase,
- isStreamingMode: Boolean,
- userClassLoader: ClassLoader)
+ isStreamingMode: Boolean)
extends TableEnvironmentImpl(
catalogManager,
moduleManager,
+ resourceManager,
tableConfig,
executor,
functionCatalog,
planner,
- isStreamingMode,
- userClassLoader) {
+ isStreamingMode) {
+
+ def getResourceManager: ResourceManager = resourceManager
// just for testing, remove this method while
// `<T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction);`
@@ -1507,10 +1512,14 @@ object TestingTableEnvironment {
catalogManager: Option[CatalogManager] = None,
tableConfig: TableConfig): TestingTableEnvironment = {
- val classLoader = settings.getUserClassLoader
+ val userClassLoader: MutableURLClassLoader =
+ FlinkUserCodeClassLoaders.create(
+ new Array[URL](0),
+ settings.getUserClassLoader,
+ settings.getConfiguration)
val executorFactory = FactoryUtil.discoverFactory(
- classLoader,
+ userClassLoader,
classOf[ExecutorFactory],
ExecutorFactory.DEFAULT_IDENTIFIER)
@@ -1519,13 +1528,14 @@ object TestingTableEnvironment {
tableConfig.setRootConfiguration(executor.getConfiguration)
tableConfig.addConfiguration(settings.getConfiguration)
+ val resourceManager = new ResourceManager(settings.getConfiguration, userClassLoader)
val moduleManager = new ModuleManager
val catalogMgr = catalogManager match {
case Some(c) => c
case _ =>
CatalogManager.newBuilder
- .classLoader(classLoader)
+ .classLoader(userClassLoader)
.config(tableConfig)
.defaultCatalog(
settings.getBuiltInCatalogName,
@@ -1536,21 +1546,27 @@ object TestingTableEnvironment {
}
val functionCatalog =
- new FunctionCatalog(settings.getConfiguration, catalogMgr, moduleManager, classLoader)
+ new FunctionCatalog(settings.getConfiguration, resourceManager, catalogMgr, moduleManager)
val planner = PlannerFactoryUtil
- .createPlanner(executor, tableConfig, classLoader, moduleManager, catalogMgr, functionCatalog)
+ .createPlanner(
+ executor,
+ tableConfig,
+ userClassLoader,
+ moduleManager,
+ catalogMgr,
+ functionCatalog)
.asInstanceOf[PlannerBase]
new TestingTableEnvironment(
catalogMgr,
moduleManager,
+ resourceManager,
tableConfig,
executor,
functionCatalog,
planner,
- settings.isStreamingMode,
- classLoader)
+ settings.isStreamingMode)
}
}