Posted to commits@flink.apache.org by al...@apache.org on 2017/10/25 13:57:45 UTC
[1/2] flink git commit: [FLINK-7669] Always load Flink classes via parent ClassLoader
Repository: flink
Updated Branches:
refs/heads/master 41e5a8338 -> 3bbff97b7
[FLINK-7669] Always load Flink classes via parent ClassLoader
Previously, when classloader.resolve-order was set to child-first, Flink
classes were also resolved via the child ClassLoader, which could lead to
problems if the user jar contained copies of Flink classes. This change
introduces a new setting, classloader.parent-first-patterns. Classes whose
names match one of these patterns are always resolved by the parent first,
even when using the child-first ClassLoader.
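
The matching rule described above can be sketched in isolation. The following is a minimal illustration, not the committed code: the class name ParentFirstPatternSketch and its helper methods are hypothetical, while the default pattern string and the use of split(";") and startsWith mirror what this diff adds to CoreOptions, JobManagerServices and ChildFirstClassLoader.

```java
// Hypothetical helper for illustration only; it is not part of Flink.
final class ParentFirstPatternSketch {

    // Default value copied from the CoreOptions change in this commit.
    static final String DEFAULT_PATTERNS =
        "java.;org.apache.flink.;javax.annotation;org.slf4j;"
            + "org.apache.log4j;org.apache.logging.log4j;ch.qos.logback";

    // Splits the semicolon-separated setting, as the commit does with String.split(";").
    static String[] parse(String setting) {
        return setting.split(";");
    }

    // Returns true if the fully qualified class name starts with any of the patterns.
    // A pattern is a plain prefix, not a glob or a regular expression.
    static boolean isParentFirst(String className, String[] patterns) {
        for (String pattern : patterns) {
            if (className.startsWith(pattern)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String[] patterns = parse(DEFAULT_PATTERNS);
        // Flink class: matches "org.apache.flink."
        System.out.println(isParentFirst("org.apache.flink.api.common.functions.MapFunction", patterns)); // true
        // User class (made-up name): matches nothing, so child-first resolution applies
        System.out.println(isParentFirst("com.example.MyJob", patterns)); // false
        // Matches "javax.annotation"
        System.out.println(isParentFirst("javax.annotation.Nullable", patterns)); // true
        // "java." does not match "javax.", and no other default pattern applies
        System.out.println(isParentFirst("javax.sql.DataSource", patterns)); // false
    }
}
```

Because the check is a plain prefix test, a pattern without a trailing dot (such as org.slf4j) would also match any other package whose name merely begins with those characters.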
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2d88ab85
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2d88ab85
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2d88ab85
Branch: refs/heads/master
Commit: 2d88ab85ba8ff1f2e920e8a142c9b702c7f1e6da
Parents: 41e5a83
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Oct 23 17:03:21 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Oct 25 15:55:46 2017 +0200
----------------------------------------------------------------------
docs/ops/config.md | 7 +++++
.../state/RocksDbMultiClassLoaderTest.java | 4 +--
.../apache/flink/configuration/CoreOptions.java | 5 +++
.../librarycache/BlobLibraryCacheManager.java | 25 ++++++++++++---
.../librarycache/FlinkUserCodeClassLoaders.java | 31 ++++++++++++------
.../runtime/jobmaster/JobManagerServices.java | 9 +++++-
.../runtime/taskexecutor/TaskExecutor.java | 4 ++-
.../taskexecutor/TaskManagerConfiguration.java | 16 ++++++++--
.../flink/runtime/jobmanager/JobManager.scala | 8 ++++-
.../flink/runtime/taskmanager/TaskManager.scala | 4 ++-
.../BlobLibraryCacheManagerTest.java | 8 ++---
.../BlobLibraryCacheRecoveryITCase.java | 2 +-
.../jobmanager/JobManagerHARecoveryTest.java | 4 +--
.../JobManagerLeaderElectionTest.java | 2 +-
...askManagerComponentsStartupShutdownTest.java | 3 +-
.../runtime/util/JvmExitOnFatalErrorTest.java | 3 +-
.../runtime/tasks/BlockingCheckpointsTest.java | 3 +-
.../tasks/InterruptSensitiveRestoreTest.java | 3 +-
.../tasks/StreamTaskTerminationTest.java | 3 +-
.../test_streaming_classloader.sh | 33 ++++++++++++++++++++
20 files changed, 142 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/docs/ops/config.md
----------------------------------------------------------------------
diff --git a/docs/ops/config.md b/docs/ops/config.md
index d476492..7720e3a 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -77,6 +77,13 @@ without explicit scheme definition, such as `/user/USERNAME/in.txt`, is going to
- `classloader.resolve-order`: Whether Flink should use a child-first `ClassLoader` when loading
user-code classes or a parent-first `ClassLoader`. Can be one of `parent-first` or `child-first`. (default: `child-first`)
+- `classloader.parent-first-patterns`: A (semicolon-separated) list of patterns that specifies which
+classes should always be resolved through the parent `ClassLoader` first. A pattern is a simple
+prefix that is checked against the fully qualified class name. By default, this is set to
+`java.;org.apache.flink.;javax.annotation;org.slf4j;org.apache.log4j;org.apache.logging.log4j;ch.qos.logback`.
+If you want to change this setting you have to make sure to also include the default patterns in
+your list of patterns if you want to keep that default behaviour.
+
## Advanced Options
### Compute
http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
index 1dbc05e..72c85ec 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
@@ -47,8 +47,8 @@ public class RocksDbMultiClassLoaderTest {
final URL codePath2 = RocksDB.class.getProtectionDomain().getCodeSource().getLocation();
final ClassLoader parent = getClass().getClassLoader();
- final ClassLoader loader1 = FlinkUserCodeClassLoaders.childFirst(new URL[] { codePath1, codePath2 }, parent);
- final ClassLoader loader2 = FlinkUserCodeClassLoaders.childFirst(new URL[] { codePath1, codePath2 }, parent);
+ final ClassLoader loader1 = FlinkUserCodeClassLoaders.childFirst(new URL[] { codePath1, codePath2 }, parent, new String[0]);
+ final ClassLoader loader2 = FlinkUserCodeClassLoaders.childFirst(new URL[] { codePath1, codePath2 }, parent, new String[0]);
final String className = RocksDBStateBackend.class.getName();
http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index d1005c4..e8ab8e4 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -31,6 +31,11 @@ public class CoreOptions {
.key("classloader.resolve-order")
.defaultValue("child-first");
+ public static final ConfigOption<String> ALWAYS_PARENT_FIRST_LOADER = ConfigOptions
+ .key("classloader.parent-first-patterns")
+ .defaultValue("java.;org.apache.flink.;javax.annotation;org.slf4j;org.apache.log4j;org.apache.logging.log4j;ch.qos.logback");
+
+
// ------------------------------------------------------------------------
// process parameters
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index 86ebfcc..b90e995 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -67,12 +67,21 @@ public class BlobLibraryCacheManager implements LibraryCacheManager {
/** The resolve order to use when creating a {@link ClassLoader}. */
private final FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder;
+ /**
+ * List of patterns for classes that should always be resolved from the parent ClassLoader,
+ * if possible.
+ */
+ private final String[] alwaysParentFirstPatterns;
+
// --------------------------------------------------------------------------------------------
- public BlobLibraryCacheManager(PermanentBlobService blobService,
- FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder) {
+ public BlobLibraryCacheManager(
+ PermanentBlobService blobService,
+ FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder,
+ String[] alwaysParentFirstPatterns) {
this.blobService = checkNotNull(blobService);
this.classLoaderResolveOrder = checkNotNull(classLoaderResolveOrder);
+ this.alwaysParentFirstPatterns = alwaysParentFirstPatterns;
}
@Override
@@ -118,7 +127,7 @@ public class BlobLibraryCacheManager implements LibraryCacheManager {
}
cacheEntries.put(jobId, new LibraryCacheEntry(
- requiredJarFiles, requiredClasspaths, urls, task, classLoaderResolveOrder));
+ requiredJarFiles, requiredClasspaths, urls, task, classLoaderResolveOrder, alwaysParentFirstPatterns));
} catch (Throwable t) {
// rethrow or wrap
ExceptionUtils.tryRethrowIOException(t);
@@ -243,19 +252,25 @@ public class BlobLibraryCacheManager implements LibraryCacheManager {
* <tt>requiredLibraries</tt> and <tt>requiredClasspaths</tt>)
* @param initialReference
* reference holder ID
+ * @param classLoaderResolveOrder Whether to resolve classes first in the child ClassLoader
+ * or parent ClassLoader
+ * @param alwaysParentFirstPatterns A list of patterns for classes that should always be
+ * resolved from the parent ClassLoader (if possible).
*/
LibraryCacheEntry(
Collection<PermanentBlobKey> requiredLibraries,
Collection<URL> requiredClasspaths,
URL[] libraryURLs,
ExecutionAttemptID initialReference,
- FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder) {
+ FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder,
+ String[] alwaysParentFirstPatterns) {
this.classLoader =
FlinkUserCodeClassLoaders.create(
classLoaderResolveOrder,
libraryURLs,
- FlinkUserCodeClassLoaders.class.getClassLoader());
+ FlinkUserCodeClassLoaders.class.getClassLoader(),
+ alwaysParentFirstPatterns);
// NOTE: do not store the class paths, i.e. URLs, into a set for performance reasons
// see http://findbugs.sourceforge.net/bugDescriptions.html#DMI_COLLECTION_OF_URLS
http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java
index ef36c36..d40802e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java
@@ -31,24 +31,23 @@ import java.util.List;
*/
public class FlinkUserCodeClassLoaders {
- public static URLClassLoader parentFirst(URL[] urls) {
- return new ParentFirstClassLoader(urls);
- }
-
public static URLClassLoader parentFirst(URL[] urls, ClassLoader parent) {
return new ParentFirstClassLoader(urls, parent);
}
- public static URLClassLoader childFirst(URL[] urls, ClassLoader parent) {
- return new ChildFirstClassLoader(urls, parent);
+ public static URLClassLoader childFirst(
+ URL[] urls,
+ ClassLoader parent,
+ String[] alwaysParentFirstPatterns) {
+ return new ChildFirstClassLoader(urls, parent, alwaysParentFirstPatterns);
}
public static URLClassLoader create(
- ResolveOrder resolveOrder, URL[] urls, ClassLoader parent) {
+ ResolveOrder resolveOrder, URL[] urls, ClassLoader parent, String[] alwaysParentFirstPatterns) {
switch (resolveOrder) {
case CHILD_FIRST:
- return childFirst(urls, parent);
+ return childFirst(urls, parent, alwaysParentFirstPatterns);
case PARENT_FIRST:
return parentFirst(urls, parent);
default:
@@ -95,14 +94,28 @@ public class FlinkUserCodeClassLoaders {
*/
static final class ChildFirstClassLoader extends URLClassLoader {
- public ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
+ /**
+ * The classes that should always go through the parent ClassLoader. This is relevant
+ * for Flink classes, for example, to avoid loading Flink classes that cross the
+ * user-code/system-code barrier in the user-code ClassLoader.
+ */
+ private final String[] alwaysParentFirstPatterns;
+
+ public ChildFirstClassLoader(URL[] urls, ClassLoader parent, String[] alwaysParentFirstPatterns) {
super(urls, parent);
+ this.alwaysParentFirstPatterns = alwaysParentFirstPatterns;
}
@Override
protected synchronized Class<?> loadClass(
String name, boolean resolve) throws ClassNotFoundException {
+ for (String alwaysParentFirstPattern : alwaysParentFirstPatterns) {
+ if (name.startsWith(alwaysParentFirstPattern)) {
+ return super.loadClass(name, resolve);
+ }
+ }
+
// First, check if the class has already been loaded
Class<?> c = findLoadedClass(name);
http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
index f7daabb..6abb1af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
@@ -116,8 +116,15 @@ public class JobManagerServices {
final String classLoaderResolveOrder =
config.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);
+ final String alwaysParentFirstLoaderString =
+ config.getString(CoreOptions.ALWAYS_PARENT_FIRST_LOADER);
+ final String[] alwaysParentFirstLoaderPatterns = alwaysParentFirstLoaderString.split(";");
+
final BlobLibraryCacheManager libraryCacheManager =
- new BlobLibraryCacheManager(blobServer, FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder));
+ new BlobLibraryCacheManager(
+ blobServer,
+ FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder),
+ alwaysParentFirstLoaderPatterns);
final FiniteDuration timeout;
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index d6295b9..cd67705 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -957,7 +957,9 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
taskManagerConfiguration.getConfiguration(),
haServices.createBlobStore());
libraryCacheManager = new BlobLibraryCacheManager(
- blobService.getPermanentBlobService(), taskManagerConfiguration.getClassLoaderResolveOrder());
+ blobService.getPermanentBlobService(),
+ taskManagerConfiguration.getClassLoaderResolveOrder(),
+ taskManagerConfiguration.getAlwaysParentFirstLoaderPatterns());
} catch (IOException e) {
// Can't pass the IOException up - we need a RuntimeException anyway
// two levels up where this is run asynchronously. Also, we don't
http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index 60dd643..f55e6f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -62,6 +62,8 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
private final FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder;
+ private final String[] alwaysParentFirstLoaderPatterns;
+
public TaskManagerConfiguration(
int numberSlots,
String[] tmpDirectories,
@@ -73,7 +75,8 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
long cleanupInterval,
Configuration configuration,
boolean exitJvmOnOutOfMemory,
- FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder) {
+ FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder,
+ String[] alwaysParentFirstLoaderPatterns) {
this.numberSlots = numberSlots;
this.tmpDirectories = Preconditions.checkNotNull(tmpDirectories);
@@ -85,6 +88,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
this.configuration = new UnmodifiableConfiguration(Preconditions.checkNotNull(configuration));
this.exitJvmOnOutOfMemory = exitJvmOnOutOfMemory;
this.classLoaderResolveOrder = classLoaderResolveOrder;
+ this.alwaysParentFirstLoaderPatterns = alwaysParentFirstLoaderPatterns;
}
public int getNumberSlots() {
@@ -130,6 +134,10 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
return classLoaderResolveOrder;
}
+ public String[] getAlwaysParentFirstLoaderPatterns() {
+ return alwaysParentFirstLoaderPatterns;
+ }
+
// --------------------------------------------------------------------------------------------
// Static factory methods
// --------------------------------------------------------------------------------------------
@@ -225,6 +233,9 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
final String classLoaderResolveOrder =
configuration.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);
+ final String alwaysParentFirstLoaderString =
+ configuration.getString(CoreOptions.ALWAYS_PARENT_FIRST_LOADER);
+ final String[] alwaysParentFirstLoaderPatterns = alwaysParentFirstLoaderString.split(";");
return new TaskManagerConfiguration(
numberSlots,
@@ -237,6 +248,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
cleanupInterval,
configuration,
exitOnOom,
- FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder));
+ FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder),
+ alwaysParentFirstLoaderPatterns);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index bbb8275..3bb7faa 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2483,6 +2483,9 @@ object JobManager {
val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
val classLoaderResolveOrder = configuration.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER)
+ val alwaysParentFirstLoaderString =
+ configuration.getString(CoreOptions.ALWAYS_PARENT_FIRST_LOADER)
+ val alwaysParentFirstLoaderPatterns = alwaysParentFirstLoaderString.split(';')
val restartStrategy = RestartStrategyFactory.createRestartStrategyFactory(configuration)
@@ -2516,7 +2519,10 @@ object JobManager {
instanceManager = new InstanceManager()
scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(futureExecutor))
libraryCacheManager =
- new BlobLibraryCacheManager(blobServer, ResolveOrder.fromString(classLoaderResolveOrder))
+ new BlobLibraryCacheManager(
+ blobServer,
+ ResolveOrder.fromString(classLoaderResolveOrder),
+ alwaysParentFirstLoaderPatterns)
instanceManager.addInstanceListener(scheduler)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 75708d1..1219fc9 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -968,7 +968,9 @@ class TaskManager(
blobCache = Option(blobcache)
libraryCacheManager = Some(
new BlobLibraryCacheManager(
- blobcache.getPermanentBlobService, config.getClassLoaderResolveOrder()))
+ blobcache.getPermanentBlobService,
+ config.getClassLoaderResolveOrder(),
+ config.getAlwaysParentFirstLoaderPatterns))
}
catch {
case e: Exception =>
http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 080f743..83b6814 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -92,7 +92,7 @@ public class BlobLibraryCacheManagerTest extends TestLogger {
keys1.add(server.putPermanent(jobId1, buf));
keys2.add(server.putPermanent(jobId2, buf));
- libCache = new BlobLibraryCacheManager(cache, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST);
+ libCache = new BlobLibraryCacheManager(cache, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST, new String[0]);
cache.registerJob(jobId1);
cache.registerJob(jobId2);
@@ -222,7 +222,7 @@ public class BlobLibraryCacheManagerTest extends TestLogger {
buf[0] += 1;
keys.add(server.putPermanent(jobId, buf));
- libCache = new BlobLibraryCacheManager(cache, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST);
+ libCache = new BlobLibraryCacheManager(cache, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST, new String[0]);
cache.registerJob(jobId);
assertEquals(0, libCache.getNumberOfManagedJobs());
@@ -334,7 +334,7 @@ public class BlobLibraryCacheManagerTest extends TestLogger {
buf[0] += 1;
keys.add(server.putPermanent(jobId, buf));
- libCache = new BlobLibraryCacheManager(cache, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST);
+ libCache = new BlobLibraryCacheManager(cache, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST, new String[0]);
cache.registerJob(jobId);
assertEquals(0, libCache.getNumberOfManagedJobs());
@@ -440,7 +440,7 @@ public class BlobLibraryCacheManagerTest extends TestLogger {
PermanentBlobKey dataKey1 = server.putPermanent(jobId, new byte[]{1, 2, 3, 4, 5, 6, 7, 8});
PermanentBlobKey dataKey2 = server.putPermanent(jobId, new byte[]{11, 12, 13, 14, 15, 16, 17, 18});
- libCache = new BlobLibraryCacheManager(cache, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST);
+ libCache = new BlobLibraryCacheManager(cache, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST, new String[0]);
assertEquals(0, libCache.getNumberOfManagedJobs());
checkFileCountForJob(2, jobId, server);
checkFileCountForJob(0, jobId, cache);
http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index 2eac17a..ad18ae3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -83,7 +83,7 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
server[i] = new BlobServer(config, blobStoreService);
server[i].start();
serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort());
- libServer[i] = new BlobLibraryCacheManager(server[i], FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST);
+ libServer[i] = new BlobLibraryCacheManager(server[i], FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST, new String[0]);
}
// Random data
http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 0907800..d4bed23 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -198,7 +198,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
instanceManager,
scheduler,
blobServer,
- new BlobLibraryCacheManager(blobServer, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST),
+ new BlobLibraryCacheManager(blobServer, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST, new String[0]),
archive,
new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
timeout,
@@ -372,7 +372,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
mock(InstanceManager.class),
mock(Scheduler.class),
blobServer,
- new BlobLibraryCacheManager(blobServer, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST),
+ new BlobLibraryCacheManager(blobServer, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST, new String[0]),
ActorRef.noSender(),
new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
timeout,
http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index b97bf1a..c3b57fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -191,7 +191,7 @@ public class JobManagerLeaderElectionTest extends TestLogger {
new InstanceManager(),
new Scheduler(TestingUtils.defaultExecutionContext()),
blobServer,
- new BlobLibraryCacheManager(blobServer, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST),
+ new BlobLibraryCacheManager(blobServer, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST, new String[0]),
ActorRef.noSender(),
new NoRestartStrategy.NoRestartStrategyFactory(),
AkkaUtils.getDefaultTimeoutAsFiniteDuration(),
http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 08ebbb3..10b74c3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -123,7 +123,8 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
1000000, // cleanup interval
config,
false, // exit-jvm-on-fatal-error
- FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST);
+ FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
+ new String[0]);
final int networkBufNum = 32;
// note: the network buffer memory configured here is not actually used below but set
http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index 82b6b29..8072295 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -186,7 +186,8 @@ public class JvmExitOnFatalErrorTest {
blobService,
new BlobLibraryCacheManager(
blobService.getPermanentBlobService(),
- FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST),
+ FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
+ new String[0]),
new FileCache(tmInfo.getTmpDirectories()),
tmInfo,
new UnregisteredTaskMetricsGroup(),
http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
index 5a38615..a87e440 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
@@ -164,7 +164,8 @@ public class BlockingCheckpointsTest {
blobService,
new BlobLibraryCacheManager(
blobService.getPermanentBlobService(),
- FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST),
+ FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
+ new String[0]),
new FileCache(new String[] { EnvironmentInformation.getTemporaryFileDirectory() }),
new TestingTaskManagerRuntimeInfo(),
new UnregisteredTaskMetricsGroup(),
http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index d049e31..c641aa8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -265,7 +265,8 @@ public class InterruptSensitiveRestoreTest {
blobService,
new BlobLibraryCacheManager(
blobService.getPermanentBlobService(),
- FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST),
+ FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
+ new String[0]),
new FileCache(new String[] { EnvironmentInformation.getTemporaryFileDirectory() }),
new TestingTaskManagerRuntimeInfo(),
new UnregisteredTaskMetricsGroup(),
http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index 7eb2655..b07b735 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -161,7 +161,8 @@ public class StreamTaskTerminationTest extends TestLogger {
blobService,
new BlobLibraryCacheManager(
blobService.getPermanentBlobService(),
- FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST),
+ FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
+ new String[0]),
mock(FileCache.class),
taskManagerRuntimeInfo,
new UnregisteredTaskMetricsGroup(),
http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/test-infra/end-to-end-test/test_streaming_classloader.sh
----------------------------------------------------------------------
diff --git a/test-infra/end-to-end-test/test_streaming_classloader.sh b/test-infra/end-to-end-test/test_streaming_classloader.sh
index efbf98e..8bc6858 100755
--- a/test-infra/end-to-end-test/test_streaming_classloader.sh
+++ b/test-infra/end-to-end-test/test_streaming_classloader.sh
@@ -45,6 +45,7 @@ GIT_REMOTE_URL=`grep "git\.remote\.origin\.url" $TEST_INFRA_DIR/../../flink-runt
# remove any leftover classloader settings
sed -i -e 's/classloader.resolve-order: .*//' "$FLINK_DIR/conf/flink-conf.yaml"
+sed -i -e 's/classloader.parent-first-patterns: .*//' "$FLINK_DIR/conf/flink-conf.yaml"
echo "classloader.resolve-order: parent-first" >> "$FLINK_DIR/conf/flink-conf.yaml"
start_cluster
@@ -68,11 +69,42 @@ if [[ "$OUTPUT" != "$EXPECTED" ]]; then
PASS=""
fi
+# This verifies that Flink classes are still resolved from the parent because the default
+# value of "classloader.parent-first-patterns" is "org.apache.flink"
+echo "Testing child-first class loading with Flink classes loaded via parent"
+
+# remove any leftover classloader settings
+sed -i -e 's/classloader.resolve-order: .*//' "$FLINK_DIR/conf/flink-conf.yaml"
+sed -i -e 's/classloader.parent-first-patterns: .*//' "$FLINK_DIR/conf/flink-conf.yaml"
+echo "classloader.resolve-order: child-first" >> "$FLINK_DIR/conf/flink-conf.yaml"
+
+start_cluster
+
+$FLINK_DIR/bin/flink run -p 1 $TEST_PROGRAM_JAR --resolve-order parent-first --output $TEST_DATA_DIR/out/cl_out_cf_pf
+
+stop_cluster
+
+# remove classloader settings again
+sed -i -e 's/classloader.resolve-order: .*//' "$FLINK_DIR/conf/flink-conf.yaml"
+
+OUTPUT=`cat $TEST_DATA_DIR/out/cl_out_cf_pf`
+# first field: whether we found the method on TaskManager
+# result of getResource(".version.properties"), should be from the child
+# ordered result of getResources(".version.properties"), should be child first
+EXPECTED="NoSuchMethodError:hello-there-42:hello-there-42${GIT_REMOTE_URL}"
+if [[ "$OUTPUT" != "$EXPECTED" ]]; then
+ echo "Output from Flink program does not match expected output."
+ echo -e "EXPECTED: $EXPECTED"
+ echo -e "ACTUAL: $OUTPUT"
+ PASS=""
+fi
+
echo "Testing child-first class loading"
# remove any leftover classloader settings
sed -i -e 's/classloader.resolve-order: .*//' "$FLINK_DIR/conf/flink-conf.yaml"
echo "classloader.resolve-order: child-first" >> "$FLINK_DIR/conf/flink-conf.yaml"
+echo "classloader.parent-first-patterns: foo.bar" >> "$FLINK_DIR/conf/flink-conf.yaml"
start_cluster
@@ -82,6 +114,7 @@ stop_cluster
# remove classloader settings again
sed -i -e 's/classloader.resolve-order: .*//' $FLINK_DIR/conf/flink-conf.yaml
+sed -i -e 's/classloader.parent-first-patterns: .*//' "$FLINK_DIR/conf/flink-conf.yaml"
OUTPUT=`cat $TEST_DATA_DIR/out/cl_out_cf`
# first field: whether we found the method on TaskManager
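
Editorial sketch (not part of the commit): the behaviour exercised by the test above, child-first resolution with an exception for names that match a parent-first pattern, could look roughly like the following. The class name `ChildFirstSketch` and the helper `matchesAny` are hypothetical, and prefix matching via `startsWith` is an assumption; this is not the code of FlinkUserCodeClassLoaders.

```java
import java.net.URL;
import java.net.URLClassLoader;

/**
 * Hypothetical illustration of a child-first ClassLoader that still resolves
 * some classes parent-first. Not Flink's actual implementation.
 */
class ChildFirstSketch extends URLClassLoader {

    // Class-name prefixes that are always resolved via the parent first (assumed semantics).
    private final String[] parentFirstPatterns;

    public ChildFirstSketch(URL[] urls, ClassLoader parent, String[] parentFirstPatterns) {
        super(urls, parent);
        this.parentFirstPatterns = parentFirstPatterns.clone();
    }

    /** Returns true if the class name starts with one of the given prefixes. */
    public static boolean matchesAny(String name, String[] patterns) {
        for (String pattern : patterns) {
            if (name.startsWith(pattern)) {
                return true;
            }
        }
        return false;
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            // Reuse a class this loader has already defined.
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                if (matchesAny(name, parentFirstPatterns)) {
                    // Matching names use the standard parent-first delegation.
                    c = super.loadClass(name, false);
                } else {
                    try {
                        // Child first: look in this loader's own URLs (the user jar).
                        c = findClass(name);
                    } catch (ClassNotFoundException e) {
                        // Not in the user jar, so fall back to parent-first delegation.
                        c = super.loadClass(name, false);
                    }
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }
}
```

With the pattern `org.apache.flink`, a copy of a Flink class bundled in the user jar would be ignored in favour of the parent's version, while a conflicting third-party library in the user jar would still win over the one on Flink's classpath.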
[2/2] flink git commit: [FLINK-7669] Add ClassLoader resolution order
to classloader doc
Posted by al...@apache.org.
[FLINK-7669] Add ClassLoader resolution order to classloader doc
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3bbff97b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3bbff97b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3bbff97b
Branch: refs/heads/master
Commit: 3bbff97b7e86a361fb77e8b243417fa5e1c60b7a
Parents: 2d88ab8
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Oct 23 17:05:57 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Oct 25 15:56:21 2017 +0200
----------------------------------------------------------------------
docs/monitoring/debugging_classloading.md | 18 +++++++++++++++---
1 file changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3bbff97b/docs/monitoring/debugging_classloading.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/debugging_classloading.md b/docs/monitoring/debugging_classloading.md
index 4f57c10..414eef6 100644
--- a/docs/monitoring/debugging_classloading.md
+++ b/docs/monitoring/debugging_classloading.md
@@ -57,10 +57,22 @@ YARN classloading differs between single job deployments and sessions:
**Mesos**
-Mesos setups following [this documentation](../ops/deployment/mesos.html) currently behave very much like a
+Mesos setups following [this documentation](../ops/deployment/mesos.html) currently behave very much like a
YARN session: The TaskManager and JobManager processes are started with the Flink framework classes in classpath, job
classes are loaded dynamically when the jobs are submitted.
+## Configuring ClassLoader Resolution Order
+
+Flink uses a hierarchy of ClassLoaders for loading classes from the user-code jar(s). The user-code
+ClassLoader has a reference to the parent ClassLoader, which is the default Java ClassLoader in most
+cases. By default, a Java ClassLoader in such a hierarchy looks for a class in the parent ClassLoader
+first and only then in the child ClassLoader. This is problematic if your user jar contains a version
+of a library that conflicts with the version that ships with Flink. You can change this behaviour by
+setting the ClassLoader resolution order via `classloader.resolve-order: child-first` in the Flink
+config. Even then, Flink classes are still resolved through the parent ClassLoader first; which
+classes are treated this way can be configured via `classloader.parent-first-patterns`
+(see [config](../ops/config.html)).
+
## Avoiding Dynamic Classloading
@@ -68,11 +80,11 @@ All components (JobManger, TaskManager, Client, ApplicationMaster, ...) log thei
They can be found as part of the environment information at the beginning of the log.
When running a setup where the Flink JobManager and TaskManagers are exclusive to one particular job, one can put JAR files
-directly into the `/lib` folder to make sure they are part of the classpath and not loaded dynamically.
+directly into the `/lib` folder to make sure they are part of the classpath and not loaded dynamically.
It usually works to put the job's JAR file into the `/lib` directory. The JAR will be part of both the classpath
(the *AppClassLoader*) and the dynamic class loader (*FlinkUserCodeClassLoader*).
-Because the AppClassLoader is the parent of the FlinkUserCodeClassLoader (and Java loads parent-first), this should
+Because the AppClassLoader is the parent of the FlinkUserCodeClassLoader (and Java loads parent-first, by default), this should
result in classes being loaded only once.
For setups where the job's JAR file cannot be put to the `/lib` folder (for example because the setup is a session that is