Posted to dev@flink.apache.org by "Amit Gurdasani (Jira)" <ji...@apache.org> on 2023/02/27 12:01:00 UTC

[jira] [Created] (FLINK-31243) KryoSerializer when loaded from user code classloader cannot load Scala extensions from app classloader

Amit Gurdasani created FLINK-31243:
--------------------------------------

             Summary: KryoSerializer when loaded from user code classloader cannot load Scala extensions from app classloader
                 Key: FLINK-31243
                 URL: https://issues.apache.org/jira/browse/FLINK-31243
             Project: Flink
          Issue Type: Bug
          Components: API / Core
    Affects Versions: 1.16.1, 1.15.3
         Environment: OS: Amazon Linux 2

JVM: Amazon Corretto 11

 
            Reporter: Amit Gurdasani


The [KryoSerializer|https://github.com/apache/flink/blob/9bf0d9f2c2bcb2bc0c8ab6228bb0a9e76e10ad70/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java] uses Class.forName() to dynamically load the Scala extensions by name. The single-argument Class.forName(String) resolves the name through the defining classloader of the calling class, so only KryoSerializer's own classloader is consulted when looking for these extensions. Because KryoSerializer is loaded by the application classloader by default, the Scala extensions cannot be loaded unless the flink-scala artifact is also available to the application classloader. As a result, Scala applications that bundle flink-scala in their user jar cannot benefit from the Scala extensions to the KryoSerializer.
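
To illustrate the visibility problem in isolation, the following standalone sketch (not Flink code; all class names are hypothetical) defines a class in a child classloader, standing in for the user code classloader, and then asks both the child and its parent to resolve that class by name. Only the child can see it, which mirrors why Class.forName() called from KryoSerializer (loaded by the parent) cannot find FlinkScalaKryoInstantiator inside the user jar. It assumes Java 11, matching the reported environment, and that the demo's compiled classes are readable as resources from its own classloader.

{code:java}
import java.io.IOException;
import java.io.InputStream;

public class ClassLoaderVisibilityDemo {

    /** Trivial class whose bytecode is re-defined inside an isolated child loader. */
    public static class Payload {}

    /**
     * Hypothetical stand-in for a user code classloader: it defines classes from
     * bytecode it reads itself, while its parent (the platform loader) cannot see them.
     */
    static final class ChildLoader extends ClassLoader {
        ChildLoader(ClassLoader parent) {
            super(parent);
        }

        @Override
        protected Class<?> findClass(String name) throws ClassNotFoundException {
            String resource = name.replace('.', '/') + ".class";
            // Read the .class bytes through the loader that loaded this demo.
            try (InputStream in = ClassLoaderVisibilityDemo.class.getClassLoader().getResourceAsStream(resource)) {
                if (in == null) {
                    throw new ClassNotFoundException(name);
                }
                byte[] bytes = in.readAllBytes();
                return defineClass(name, bytes, 0, bytes.length);
            } catch (IOException e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    /** Returns true if the named class can be resolved through the given loader. */
    static boolean visible(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ClassLoader parent = ClassLoader.getPlatformClassLoader();
        ClassLoader child = new ChildLoader(parent);
        String name = Payload.class.getName();

        // The child delegates to the parent first, then falls back to findClass().
        System.out.println("child sees Payload:  " + visible(name, child));
        // The parent never consults its children, so the lookup fails here.
        System.out.println("parent sees Payload: " + visible(name, parent));
    }
}
{code}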

The exception looks like this:
{noformat}
java.lang.ClassNotFoundException: org.apache.flink.runtime.types.FlinkScalaKryoInstantiator
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Class.java:315)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.getKryoInstance(KryoSerializer.java:486)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:521)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.getKryo(KryoSerializer.java:720)
    at software.amazon.kinesisanalytics.kryotest.Main.main(Main.java:16)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$3(JarRunOverrideHandler.java:239)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829){noformat}
Example code that reproduces this issue:

Main class for the Flink application:
{noformat}
package software.amazon.kinesisanalytics.kryotest;

import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.Serializable;

public class Main {
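    private static class Something implements Serializable {
        // Must be "static final" for Java serialization to recognize it.
        private static final long serialVersionUID = 289034745902347830L;
    }

    public static void main(String... args) {
        StreamExecutionEnvironment executionEnvironment = new StreamExecutionEnvironment();
        KryoSerializer<Something> serializer = new KryoSerializer<>(Something.class, executionEnvironment.getConfig());
        // Forces creation of the Kryo instance, which tries to load the Scala
        // extensions via Class.forName() and fails with ClassNotFoundException.
        serializer.getKryo();
    }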
}
{noformat}
build.gradle for the Flink application:
{code:groovy}
plugins {
    id 'application'
    id 'java'
    id 'com.github.johnrengelman.shadow' version '7.1.2'
}

group 'software.amazon.kinesisanalytics'
version '0.1'

repositories {
    mavenCentral()
}

dependencies {
    compileOnly 'org.apache.flink:flink-core:1.15.2'
    compileOnly 'org.apache.flink:flink-streaming-java:1.15.2'
    implementation 'org.apache.flink:flink-scala_2.12:1.15.2'
}

shadowJar {
    dependencies {
        exclude(dependency('com.esotericsoftware.kryo:.*:.*'))
        exclude(dependency('com.esotericsoftware.minlog:.*:.*'))
        exclude(dependency('com.twitter:.*:.*'))
        exclude(dependency('org.apache.flink:flink-core:.*'))
        exclude(dependency('org.apache.flink:flink-streaming-java:.*'))
        exclude(dependency('org.scala-lang:.*:.*'))
    }
}

mainClassName = 'software.amazon.kinesisanalytics.kryotest.Main'
 {code}
Note that the application jar bundles flink-scala but includes neither Kryo itself nor flink-core.

Placing flink-scala on the application classpath eliminates the error. However, as I understand it, the [point of eliminating Scala|https://flink.apache.org/2022/02/22/scala-free-in-one-fifteen/] from the Flink application classloader was to allow Scala dependencies to be loaded only by the user code classloader. This issue prevents that from being achieved for the Scala extensions to the KryoSerializer.
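
One direction a fix could take, sketched here purely as an assumption and not as Flink's API or its eventual fix, is to resolve optional extensions through more than one classloader: first the class's own loader, then the thread context classloader, which Flink typically sets to the user code classloader while user code runs. The ExtensionLoader class and its loadFirstVisible method are hypothetical.

{code:java}
import java.util.Optional;

public final class ExtensionLoader {

    private ExtensionLoader() {}

    /**
     * Hypothetical helper: tries each classloader in order and returns the first
     * class found, or Optional.empty() if none of them can see the class.
     */
    public static Optional<Class<?>> loadFirstVisible(String className, ClassLoader... loaders) {
        for (ClassLoader loader : loaders) {
            if (loader == null) {
                continue; // e.g. no context classloader set on this thread
            }
            try {
                // initialize=true matches what single-argument Class.forName() does.
                return Optional.of(Class.forName(className, true, loader));
            } catch (ClassNotFoundException e) {
                // Not visible from this loader; try the next one.
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        ClassLoader own = ExtensionLoader.class.getClassLoader();
        ClassLoader context = Thread.currentThread().getContextClassLoader();

        // Reports whether the Flink Scala instantiator is visible from either loader;
        // this depends on whether flink-scala is on the classpath at runtime.
        System.out.println(loadFirstVisible(
                "org.apache.flink.runtime.types.FlinkScalaKryoInstantiator", own, context).isPresent());
    }
}
{code}

Trying the class's own loader first keeps the current behavior whenever flink-scala is on the application classpath, and only falls back to the context classloader when it is not.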



--
This message was sent by Atlassian Jira
(v8.20.10#820010)