Posted to dev@flink.apache.org by "David Morávek (Jira)" <ji...@apache.org> on 2022/03/01 09:43:00 UTC

[jira] [Created] (FLINK-26419) StreamConfig.toString uses a wrong classloader

David Morávek created FLINK-26419:
-------------------------------------

             Summary: StreamConfig.toString uses a wrong classloader
                 Key: FLINK-26419
                 URL: https://issues.apache.org/jira/browse/FLINK-26419
             Project: Flink
          Issue Type: Bug
          Components: API / Core
            Reporter: David Morávek


The StreamConfig#toString method needs a classloader to deserialize parts of the config so that it can produce user-friendly output. Unfortunately, it doesn't have access to the user classloader, so it uses whichever classloader loaded the StreamConfig class.

This method breaks if, for example, we use a custom KeyPartitioner that comes from user code, because the classloader it uses cannot load the classes needed to deserialize it.
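
The failure mode can be reproduced outside Flink. The following is a minimal sketch, not Flink's actual code: `UserKeySelector`, `LoaderAwareInputStream` and the helper methods are hypothetical names introduced for illustration, and a classloader with no parent stands in for a framework classloader that cannot see user classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class WrongClassLoaderSketch {

    /** Hypothetical stand-in for a class that only exists in user code. */
    static class UserKeySelector implements Serializable {
        private static final long serialVersionUID = 1L;
        final String field = "userId";
    }

    /** Resolves classes against an explicitly supplied classloader. */
    static class LoaderAwareInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        LoaderAwareInputStream(InputStream in, ClassLoader classLoader) throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
            return Class.forName(desc.getName(), false, classLoader);
        }
    }

    /** Serializes a value with plain Java serialization. */
    static byte[] serialize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Returns false if the given classloader cannot resolve a serialized class. */
    static boolean canDeserialize(byte[] data, ClassLoader classLoader) {
        try (ObjectInputStream in =
                new LoaderAwareInputStream(new ByteArrayInputStream(data), classLoader)) {
            in.readObject();
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = serialize(new UserKeySelector());

        // The loader that defined the user class can resolve it again.
        ClassLoader userLoader = UserKeySelector.class.getClassLoader();
        // Assumption: a parentless loader models one that only sees core classes.
        ClassLoader frameworkLoader = new ClassLoader(null) {};

        System.out.println("user loader:      " + canDeserialize(data, userLoader));
        System.out.println("framework loader: " + canDeserialize(data, frameworkLoader));
    }
}
```

In this sketch the same bytes deserialize with the loader that defined the class and fail with a ClassNotFoundException under the restricted loader. That is the situation toString ends up in when it falls back to the classloader of StreamConfig.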

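This problem becomes visible when extended serialization debug info is enabled (which is useful, e.g., for debugging UDF serialization issues). With that flag set, ObjectOutputStream calls toString on the objects it writes, as the stack trace below shows.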

{code}
env.java.opts: "-Dsun.io.serialization.extendedDebugInfo=true"
{code}

Truncated exception snippet:

{code}
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate non chained outputs.
	at org.apache.flink.streaming.api.graph.StreamConfig.getNonChainedOutputs(StreamConfig.java:388) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at org.apache.flink.streaming.api.graph.StreamConfig.toString(StreamConfig.java:691) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at java.lang.String.valueOf(String.java:2951) ~[?:?]
	at java.lang.StringBuilder.append(StringBuilder.java:168) ~[?:?]
	at java.util.AbstractMap.toString(AbstractMap.java:556) ~[?:?]
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1422) ~[?:?]
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179) ~[?:?]
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349) ~[?:?]
	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:546) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at org.apache.flink.streaming.api.graph.StreamConfig.setTransitiveChainedTaskConfigs(StreamConfig.java:492) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:475) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:418) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:409) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:409) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:409) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:409) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:409) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setChaining(StreamingJobGraphGenerator.java:375) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:176) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:114) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:959) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:50) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:39) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:56) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.submitAndGetJobClientFuture(EmbeddedExecutor.java:122) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.execute(EmbeddedExecutor.java:104) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1956) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:137) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1833) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:139) ~[beam-all.jar:?]
	at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:103) ~[beam-all.jar:?]
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323) ~[beam-all.jar:?]
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309) ~[beam-all.jar:?]
	at org.apache.dmvk.beam.TestPipeline.main(TestPipeline.java:51) ~[beam-all.jar:?]
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
	at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
	at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:159) [flink-dist_2.11-1.13.6.jar:1.13.6]
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) [flink-dist_2.11-1.13.6.jar:1.13.6]
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) [flink-dist_2.11-1.13.6.jar:1.13.6]
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.6.jar:1.13.6]
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.6.jar:1.13.6]
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.6.jar:1.13.6]
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.6.jar:1.13.6]
Caused by: java.lang.ClassNotFoundException: org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector
	at jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[?:?]
	at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[?:?]
	at java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?]
	at java.lang.Class.forName0(Native Method) ~[?:?]
	at java.lang.Class.forName(Class.java:398) ~[?:?]
	at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:76) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1995) ~[?:?]
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1862) ~[?:?]
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2169) ~[?:?]
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679) ~[?:?]
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464) ~[?:?]
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358) ~[?:?]
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196) ~[?:?]
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679) ~[?:?]
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464) ~[?:?]
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358) ~[?:?]
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196) ~[?:?]
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679) ~[?:?]
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:493) ~[?:?]
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:451) ~[?:?]
	at java.util.ArrayList.readObject(ArrayList.java:929) ~[?:?]
	at jdk.internal.reflect.GeneratedMethodAccessor24.invoke(Unknown Source) ~[?:?]
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
	at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1175) ~[?:?]
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2325) ~[?:?]
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196) ~[?:?]
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679) ~[?:?]
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:493) ~[?:?]
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:451) ~[?:?]
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	at org.apache.flink.streaming.api.graph.StreamConfig.getNonChainedOutputs(StreamConfig.java:385) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
	... 53 more
{code}

Full log: https://gist.github.com/dmvk/012027477fbf2436b474007310df9cac



--
This message was sent by Atlassian Jira
(v8.20.1#820001)