Posted to issues@flink.apache.org by "Piotr Nowojski (Jira)" <ji...@apache.org> on 2020/01/02 14:31:00 UTC

[jira] [Commented] (FLINK-15355) Nightly streaming file sink fails with unshaded hadoop

    [ https://issues.apache.org/jira/browse/FLINK-15355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17006836#comment-17006836 ] 

Piotr Nowojski commented on FLINK-15355:
----------------------------------------

Adapting [my comment from the pull request|https://github.com/apache/flink/pull/10678#pullrequestreview-336577210]:

I would be against the 4th option proposed by [~arvid heise]. I think it’s really important that all SPI classes be loaded parent-first, otherwise very bad things can happen, for example if a user accidentally embeds a different version (1.9.2 vs 1.9.1) of org.apache.flink classes. How I understand the issue:

If you have an SPI class (like {{Foo}}) and implement one of its methods (like {{Bar Foo#bar()}} that does {{return new Bar();}}), it is important that every object/class that crosses the boundary between the plugin's implementation and the core system (like {{Bar}}) originates from the core system's class loader. If it does not, and the two versions of {{Bar}} are not binary compatible, you risk all kinds of dependency convergence errors (deadlocks, livelocks, method-not-found errors). Additionally, if such a {{Bar}} is passed around, it can crash at any place/point in time, very far away from where it was created. Also, using an older plugin version can simply leave bugs unfixed, for example if {{Bar}} in 1.9.2 has some bug fixed.
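
To make that concrete, here is a minimal, self-contained sketch (hypothetical {{ChildFirstDemo}}/{{Bar}} classes and loader, not Flink code) of what goes wrong when a child-first plugin class loader also finds its own copy of a boundary class: the fully qualified name matches, but the runtime class does not, so the object cannot be used by the core system.

{code:java}
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical demo, not Flink code: what happens when a child-first plugin
// class loader defines its own copy of a class that crosses the SPI boundary.
public class ChildFirstDemo {

    // Plays the role of "Bar": a type handed from the plugin back to the core system.
    public static class Bar {}

    // A plugin-style loader that tries its own URLs before delegating to the parent.
    static class ChildFirstLoader extends URLClassLoader {
        ChildFirstLoader(URL[] urls, ClassLoader parent) {
            super(urls, parent);
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);
                if (c == null) {
                    try {
                        c = findClass(name); // child-first: look in our own URLs first
                    } catch (ClassNotFoundException e) {
                        c = super.loadClass(name, false); // fall back to the parent
                    }
                }
                if (resolve) {
                    resolveClass(c);
                }
                return c;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Point the "plugin" loader at the same classpath, so it finds its own copy of Bar.
        URL classpath = ChildFirstDemo.class.getProtectionDomain().getCodeSource().getLocation();
        try (ChildFirstLoader pluginLoader =
                new ChildFirstLoader(new URL[] {classpath}, ChildFirstDemo.class.getClassLoader())) {

            Class<?> pluginBar = pluginLoader.loadClass(Bar.class.getName());
            Object barFromPlugin = pluginBar.getDeclaredConstructor().newInstance();

            System.out.println("same name:  " + pluginBar.getName().equals(Bar.class.getName())); // true
            System.out.println("same class: " + (pluginBar == Bar.class));                        // false
            System.out.println("usable:     " + (barFromPlugin instanceof Bar));                  // false
        }
    }
}
{code}

The last two prints show the mismatch: same name, different {{Class}} objects, so casts and method resolution across the boundary fail, just as they would with a binary-incompatible {{Bar}}.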

Disclaimer: I don't have that much experience with class loading in Java, especially loading user classes. However, all of the systems I have seen that do similar things (loading user classes, especially in a separate class loader) strongly emphasise the "parent first" pattern for all of the dependencies that sit on the SPI boundary.

https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook/presto/server/PluginClassLoader.java#L41
https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoader.java#L42
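
Both of the linked loaders boil down to roughly the following (a simplified sketch, not the actual Presto/NiFi/Flink code): child-first by default, but an explicit whitelist of package prefixes - the SPI and other boundary classes - is always delegated to the parent.

{code:java}
import java.net.URL;
import java.net.URLClassLoader;

// Simplified sketch (not the actual Flink/Presto/NiFi implementation) of a plugin
// class loader that is child-first by default, but always delegates a configurable
// whitelist of package prefixes - the SPI and other boundary classes - to the parent.
public class PluginClassLoader extends URLClassLoader {

    private final String[] parentFirstPrefixes;

    public PluginClassLoader(URL[] pluginJars, ClassLoader parent, String[] parentFirstPrefixes) {
        super(pluginJars, parent);
        this.parentFirstPrefixes = parentFirstPrefixes;
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                if (isParentFirst(name)) {
                    // SPI / boundary classes: always the parent's version, so that
                    // instances can safely cross into the core system
                    c = super.loadClass(name, false);
                } else {
                    try {
                        // everything else: prefer the plugin's own (possibly shaded) copy
                        c = findClass(name);
                    } catch (ClassNotFoundException e) {
                        c = super.loadClass(name, false);
                    }
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }

    private boolean isParentFirst(String className) {
        for (String prefix : parentFirstPrefixes) {
            if (className.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }
}
{code}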

I would be more inclined towards something like [~banmoy]'s option 1. or 2.

However, we can probably choose 1. for the sake of backward compatibility with non-plugin-based class loading ({{classloader.parent-first-patterns.default}} is used in more places, not only in plugins). 

*What is the reason why {{org.apache.hadoop}} was added to this list in the first place?* For plugins I think we should be able to drop it. Unless I'm missing something, I do not see a reason why {{org.apache.hadoop}} should be loaded parent-first for any plugin - that would kind of defeat the purpose of any plugin using Hadoop. As I wrote above, for plugins only the SPI classes should be loaded parent-first, and {{org.apache.hadoop}} should never be part of any SPI that we expose (unless I'm missing something...). 

If I'm not missing anything, I would vote for duplicating {{classloader.parent-first-patterns.additional}} and {{classloader.parent-first-patterns.default}} into something like {{classloader.plugin-parent-first-patterns.*}}, and dropping from there not only {{org.apache.hadoop}} but maybe almost everything, as sketched below.
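
To make the proposal concrete, reusing the {{PluginClassLoader}} sketch from above (the config key name, the jar path and the prefix list are purely illustrative, not a final proposal):

{code:java}
import java.net.URL;
import java.nio.file.Paths;

// Illustration only: how a plugin-specific whitelist could be wired up, reusing the
// PluginClassLoader sketch from above. The jar path and the prefix list are made up.
public class PluginLoaderExample {

    public static void main(String[] args) throws Exception {
        URL[] pluginJars = {
            Paths.get("plugins/s3-fs-hadoop/flink-s3-fs-hadoop.jar").toUri().toURL()
        };

        // Hypothetical "classloader.plugin-parent-first-patterns" value: only the Flink SPI
        // and logging come from the parent; org.apache.hadoop is deliberately absent, so the
        // plugin uses its bundled Hadoop instead of whatever (unshaded) version the parent has.
        String[] pluginParentFirst = {"org.apache.flink.", "org.slf4j", "org.apache.log4j"};

        ClassLoader pluginLoader = new PluginClassLoader(
                pluginJars, PluginLoaderExample.class.getClassLoader(), pluginParentFirst);

        // The plugin's factories would then be discovered against this loader, e.g.
        // ServiceLoader.load(FileSystemFactory.class, pluginLoader).
        System.out.println("plugin class loader ready: " + pluginLoader);
    }
}
{code}

With {{org.apache.hadoop}} out of the plugin whitelist, the S3 plugin should resolve {{Configuration#getTimeDuration}} against its own bundled Hadoop rather than an older unshaded one on the parent classpath, which is what the {{NoSuchMethodError}} in the description below points to.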

> Nightly streaming file sink fails with unshaded hadoop
> ------------------------------------------------------
>
>                 Key: FLINK-15355
>                 URL: https://issues.apache.org/jira/browse/FLINK-15355
>             Project: Flink
>          Issue Type: Bug
>          Components: FileSystems
>    Affects Versions: 1.10.0, 1.11.0
>            Reporter: Arvid Heise
>            Assignee: PengFei Li
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.10.0
>
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> {code:java}
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
>  at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>  at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>  at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>  at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>  at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>  at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>  at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>  at java.security.AccessController.doPrivileged(Native Method)
>  at javax.security.auth.Subject.doAs(Subject.java:422)
>  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>  at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
>  at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:199)
>  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1751)
>  at org.apache.flink.streaming.api.environment.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:94)
>  at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:63)
>  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1628)
>  at StreamingFileSinkProgram.main(StreamingFileSinkProgram.java:77)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>  ... 11 more
> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
>  at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1746)
>  ... 20 more
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
>  at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:326)
>  at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>  at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>  at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:274)
>  at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>  at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
>  at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
>  at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
>  at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:336)
>  at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>  at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>  at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>  at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>  at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>  at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>  at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>  at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>  at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
>  at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
>  at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>  ... 6 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
>  at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:152)
>  at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
>  at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:379)
>  at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>  ... 7 more
> Caused by: java.lang.NoSuchMethodError: org.apache.hadoop.conf.Configuration.getTimeDuration(Ljava/lang/String;Ljava/lang/String;Ljava/util/concurrent/TimeUnit;)J
>  at org.apache.hadoop.fs.s3a.S3ARetryPolicy.<init>(S3ARetryPolicy.java:113)
>  at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:257)
>  at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:126)
>  at org.apache.flink.core.fs.PluginFileSystemFactory.create(PluginFileSystemFactory.java:61)
>  at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:441)
>  at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
>  at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
>  at org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage.<init>(MemoryBackendCheckpointStorage.java:85)
>  at org.apache.flink.runtime.state.memory.MemoryStateBackend.createCheckpointStorage(MemoryStateBackend.java:295)
>  at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:279)
>  at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:205)
>  at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:486)
>  at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:338)
>  at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:245)
>  at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:217)
>  at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:205)
>  at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119)
>  at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:105)
>  at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
>  at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
>  at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
>  at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
>  at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:146)
>  ... 10 more
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)