Posted to issues@flink.apache.org by "Martijn Visser (Jira)" <ji...@apache.org> on 2023/03/02 15:45:00 UTC

[jira] [Commented] (FLINK-31243) KryoSerializer when loaded from user code classloader cannot load Scala extensions from app classloader

    [ https://issues.apache.org/jira/browse/FLINK-31243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17695789#comment-17695789 ] 

Martijn Visser commented on FLINK-31243:
----------------------------------------

| Placing flink-scala in the application classpath eliminates the error, but as I understand it, the point of eliminating Scala from the Flink application classloader was to allow the only Scala dependencies to be loaded by the user code classloader. 

The point of the blog post is that you can now have a Scala-free user-code classpath if you want one (because you only use the Java APIs, or because you want to build your application in Scala against the Java APIs), by removing the flink-scala-* JAR from the lib folder. 

If you want to use Scala, as you do, that is not an option. 

> KryoSerializer when loaded from user code classloader cannot load Scala extensions from app classloader
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-31243
>                 URL: https://issues.apache.org/jira/browse/FLINK-31243
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core
>    Affects Versions: 1.15.3, 1.16.1
>         Environment: OS: Amazon Linux 2
> JVM: Amazon Corretto 11
>  
>            Reporter: Amit Gurdasani
>            Priority: Major
>
> The [KryoSerializer|https://github.com/apache/flink/blob/9bf0d9f2c2bcb2bc0c8ab6228bb0a9e76e10ad70/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java] uses Class.forName() to dynamically load Scala extensions by name. The single-argument Class.forName() resolves a name through the classloader that defined the calling class, so KryoSerializer searches only its own classloader (and that loader's parents) for these extensions. Because KryoSerializer is loaded from the application classloader by default, the Scala extensions cannot be loaded unless the flink-scala artifact is visible to the application classloader. Scala applications that bundle flink-scala in the user jar are therefore unable to benefit from the Scala extensions to the Kryo serializer.
> The exception looks like this:
> {noformat}
> java.lang.ClassNotFoundException: org.apache.flink.runtime.types.FlinkScalaKryoInstantiator
>     at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
>     at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>     at java.base/java.lang.Class.forName0(Native Method)
>     at java.base/java.lang.Class.forName(Class.java:315)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.getKryoInstance(KryoSerializer.java:486)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:521)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.getKryo(KryoSerializer.java:720)
>     at software.amazon.kinesisanalytics.kryotest.Main.main(Main.java:16)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>     at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
>     at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
>     at org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$3(JarRunOverrideHandler.java:239)
>     at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:829){noformat}
> Example code resulting in this issue:
> Main class for Flink application:
> {noformat}
> package software.amazon.kinesisanalytics.kryotest;
> import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import java.io.Serializable;
> public class Main {
>     private static class Something implements Serializable {
>         private static final long serialVersionUID = 289034745902347830L;
>     }
>     public static void main(String... args) {
>         StreamExecutionEnvironment executionEnvironment = new StreamExecutionEnvironment();
>         KryoSerializer<Something> serializer = new KryoSerializer<>(Something.class, executionEnvironment.getConfig());
>         serializer.getKryo();
>     }
> }
> {noformat}
> build.gradle for Flink application:
> {code:java}
> plugins {
>     id 'application'
>     id 'java'
>     id 'com.github.johnrengelman.shadow' version '7.1.2'
> }
> group 'software.amazon.kinesisanalytics'
> version '0.1'
> repositories {
>     mavenCentral()
> }
> dependencies {
>     compileOnly 'org.apache.flink:flink-core:1.15.2'
>     compileOnly 'org.apache.flink:flink-streaming-java:1.15.2'
>     implementation 'org.apache.flink:flink-scala_2.12:1.15.2'
> }
> shadowJar {
>     dependencies {
>         exclude(dependency('com.esotericsoftware.kryo:.*:.*'))
>         exclude(dependency('com.esotericsoftware.minlog:.*:.*'))
>         exclude(dependency('com.twitter:.*:.*'))
>         exclude(dependency('org.apache.flink:flink-core:.*'))
>         exclude(dependency('org.apache.flink:flink-streaming-java:.*'))
>         exclude(dependency('org.scala-lang:.*:.*'))
>     }
> }
> mainClassName = 'software.amazon.kinesisanalytics.kryotest.Main'
>  {code}
> Note that the application jar does not include Kryo itself, nor flink-core, but does include flink-scala.
> Placing flink-scala in the application classpath eliminates the error, but as I understand it, the [point of eliminating Scala|https://flink.apache.org/2022/02/22/scala-free-in-one-fifteen/] from the Flink application classloader was to allow the only Scala dependencies to be loaded by the user code classloader. This issue prevents that from being achieved for the Scala extensions to the Kryo Serializer.
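
The classloader behaviour described in the report can be reproduced without Flink. The following is a minimal sketch, not Flink's code and not a proposed patch: FallbackClassLoading and loadWithFallback are hypothetical names, the "restricted" loader (bootstrap parent only) stands in for an application classloader that cannot see user-jar classes, and the demo class's own loader stands in for the user code classloader. It shows a lookup that fails in the first loader and succeeds when a second loader is consulted explicitly via the three-argument Class.forName().

```java
import java.util.Arrays;
import java.util.List;

public class FallbackClassLoading {

    /**
     * Tries each class loader in order and returns the first class found.
     * Hypothetical helper for illustration only.
     */
    static Class<?> loadWithFallback(String className, List<ClassLoader> loaders)
            throws ClassNotFoundException {
        ClassNotFoundException lastFailure = null;
        for (ClassLoader loader : loaders) {
            try {
                // Three-argument form: the loader is chosen explicitly instead of
                // being derived from the calling class. initialize=false avoids
                // running static initializers during the lookup.
                return Class.forName(className, false, loader);
            } catch (ClassNotFoundException e) {
                lastFailure = e;
            }
        }
        throw lastFailure != null ? lastFailure : new ClassNotFoundException(className);
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a loader that cannot see user-jar classes:
        // its only parent is the bootstrap loader.
        ClassLoader restricted = new ClassLoader(null) {};
        // Stand-in for the user code classloader: the loader of this class.
        ClassLoader userCode = FallbackClassLoading.class.getClassLoader();
        String name = FallbackClassLoading.class.getName();

        try {
            Class.forName(name, false, restricted);
            System.out.println("restricted loader unexpectedly found " + name);
        } catch (ClassNotFoundException e) {
            System.out.println("restricted loader cannot see " + name);
        }

        Class<?> found = loadWithFallback(name, Arrays.asList(restricted, userCode));
        System.out.println("fallback lookup found " + found.getName());
    }
}
```

Whether a fallback of this kind is appropriate inside KryoSerializer is a design question for the Flink maintainers; the sketch only illustrates why the choice of loader decides whether the class is found.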



--
This message was sent by Atlassian Jira
(v8.20.10#820010)