Posted to issues@flink.apache.org by "Eric Lee (Jira)" <ji...@apache.org> on 2019/12/05 03:58:00 UTC

[jira] [Commented] (FLINK-14973) OSS filesystem does not relocate many dependencies

    [ https://issues.apache.org/jira/browse/FLINK-14973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16988437#comment-16988437 ] 

Eric Lee commented on FLINK-14973:
----------------------------------

Hi [~plucas], I think this problem is caused by the inconsistent management of the Flink file system modules.

All of these Flink file systems are based on the Hadoop file system. In the Maven configuration of these submodules, each one uses the shade plugin to relocate the package `org.apache.hadoop`, but some of them do not relocate Hadoop's own dependencies such as `com.google.common`, and the OSS file system does not relocate it at all.
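To make this kind of gap easier to spot, here is a minimal Python sketch that lists the package prefixes in a jar that are not covered by an expected relocation prefix. The function names and the `allowed_prefixes` parameter are my own, for illustration only; the prefix `org/apache/flink/` in the usage note is an assumption based on the relocated class names in the stack trace of this issue.

```python
import zipfile


def unrelocated_prefixes(entry_names, allowed_prefixes, depth=2):
    """Return sorted package prefixes (at most `depth` segments) of class
    files that are not located under any of `allowed_prefixes`."""
    allowed = tuple(allowed_prefixes)
    found = set()
    for name in entry_names:
        # Only class files matter for classpath conflicts.
        if not name.endswith(".class"):
            continue
        if name.startswith(allowed):
            continue
        packages = name.split("/")[:-1]  # drop the class file name
        if not packages:
            continue  # class in the default package
        found.add("/".join(packages[:depth]))
    return sorted(found)


def jar_entries(jar_path):
    """Read all entry names of a jar (a jar is a zip archive)."""
    with zipfile.ZipFile(jar_path) as jar:
        return jar.namelist()
```

Usage would look like `unrelocated_prefixes(jar_entries("opt/flink-oss-fs-hadoop-1.9.1.jar"), ["org/apache/flink/"])`. Unlike the `jar tf` pipeline in the issue description, this only looks at `.class` entries, so license files and other resources are not reported.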

As a result, when both the OSS and Azure jars are on the classpath and Flink restores from a savepoint stored in OSS, the OSS code creates a `com.google.common.util.concurrent.ListeningExecutorService` object and then looks up the class `org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.SemaphoredDelegatingExecutor`, which exists in both the OSS and Azure jars because both follow the same relocation rule for `org.apache.hadoop`. The JVM ends up holding the definition of `SemaphoredDelegatingExecutor` from the Azure jar, because the classloader finds the Azure jar first. Because the Azure module also relocates Guava, its `SemaphoredDelegatingExecutor` is compiled against `org.apache.flink.fs.{color:red}azure{color}.shaded.com.google.common.util.concurrent.ListeningExecutorService` rather than `com.google.common.util.concurrent.ListeningExecutorService`. So when the OSS code tries to create a new `SemaphoredDelegatingExecutor` with an un-relocated `com.google.common.util.concurrent.ListeningExecutorService`, while the loaded class only offers `SemaphoredDelegatingExecutor(org.apache.flink.fs.{color:red}azure{color}.shaded.com.google.common.util.concurrent.ListeningExecutorService)`, the JVM finds no matching constructor and throws the `NoSuchMethodError` seen in the quoted stack trace.
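The precondition for this conflict is that the same fully qualified class is shipped by more than one jar. A rough Python sketch for finding such classes, assuming the jars are readable as zip archives; the function names are hypothetical and not part of Flink:

```python
import zipfile
from collections import defaultdict


def duplicate_classes(jar_listings):
    """Map each class file that occurs in more than one jar to the sorted
    list of jars containing it.

    `jar_listings` maps a jar name to the list of its entry names.
    """
    owners = defaultdict(set)
    for jar_name, entries in jar_listings.items():
        for entry in entries:
            if entry.endswith(".class"):
                owners[entry].add(jar_name)
    return {cls: sorted(jars) for cls, jars in owners.items() if len(jars) > 1}


def read_listings(jar_paths):
    """Build the `jar_listings` mapping from jar files on disk."""
    listings = {}
    for path in jar_paths:
        with zipfile.ZipFile(path) as jar:
            listings[path] = jar.namelist()
    return listings
```

Running `duplicate_classes(read_listings([...]))` over the OSS and Azure jars in lib/ should, if the analysis above is right, report `SemaphoredDelegatingExecutor` among the duplicates. Which copy the JVM actually loads still depends on classpath order, which this sketch does not model.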

It might help to write a README that specifies the rules every sub-filesystem module under the Flink filesystems needs to follow.

> OSS filesystem does not relocate many dependencies
> --------------------------------------------------
>
>                 Key: FLINK-14973
>                 URL: https://issues.apache.org/jira/browse/FLINK-14973
>             Project: Flink
>          Issue Type: Bug
>          Components: FileSystems
>    Affects Versions: 1.9.1
>            Reporter: Patrick Lucas
>            Priority: Major
>
> Whereas the Azure and S3 Hadoop filesystem jars relocate all of their dependencies:
> {noformat}
> $ jar tf opt/flink-azure-fs-hadoop-1.9.1.jar | grep -v '^org/apache/fs/shaded/' | grep -v '^META-INF' | grep '/' | cut -f -2 -d / | sort | uniq
> org/
> org/apache
> $ jar tf opt/flink-s3-fs-hadoop-1.9.1.jar | grep -v '^org/apache/fs/shaded/' | grep -v '^META-INF' | grep '/' | cut -f -2 -d / | sort | uniq
> org/
> org/apache
> {noformat}
> The OSS Hadoop filesystem leaves many things un-relocated:
> {noformat}
> $ jar tf opt/flink-oss-fs-hadoop-1.9.1.jar | grep -v '^org/apache/fs/shaded/' | grep -v '^META-INF' | grep '/' | cut -f -2 -d / | sort | uniq
> assets/
> assets/org
> avro/
> avro/shaded
> com/
> com/ctc
> com/fasterxml
> com/google
> com/jcraft
> com/nimbusds
> com/sun
> com/thoughtworks
> javax/
> javax/activation
> javax/el
> javax/servlet
> javax/ws
> javax/xml
> jersey/
> jersey/repackaged
> licenses/
> licenses/LICENSE.asm
> licenses/LICENSE.cddlv1.0
> licenses/LICENSE.cddlv1.1
> licenses/LICENSE.jdom
> licenses/LICENSE.jzlib
> licenses/LICENSE.paranamer
> licenses/LICENSE.protobuf
> licenses/LICENSE.re2j
> licenses/LICENSE.stax2api
> net/
> net/jcip
> net/minidev
> org/
> org/apache
> org/codehaus
> org/eclipse
> org/jdom
> org/objectweb
> org/tukaani
> org/xerial
> {noformat}
> The first symptom of this I ran into was that Flink is unable to restore from a savepoint if both the OSS and Azure Hadoop filesystems are on the classpath, but I assume this has the potential to cause further problems, at least until more progress is made on the module/classloading front.
> h3. Steps to reproduce
> # Copy both the Azure and OSS Hadoop filesystem JARs from opt/ into lib/
> # Run a job that restores from a savepoint (the savepoint might need to be stored on OSS)
> # See a crash and traceback like:
> {noformat}
> 2019-11-26 15:59:25,318 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error occurred in the cluster entrypoint.
> org.apache.flink.runtime.dispatcher.DispatcherException: Failed to take leadership with session id 00000000-0000-0000-0000-000000000000.
> 	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$null$30(Dispatcher.java:915) ~[flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_232]
> 	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_232]
> 	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_232]
> 	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_232]
> 	at org.apache.flink.runtime.concurrent.FutureUtils$WaitingConjunctFuture.handleCompletedFuture(FutureUtils.java:691) ~[flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_232]
> 	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_232]
> 	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_232]
> 	at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575) ~[?:1.8.0_232]
> 	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:753) ~[?:1.8.0_232]
> 	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_232]
> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397) ~[flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190) ~[flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
> 	at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36) ~[flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_232]
> 	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) ~[flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) ~[flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	... 4 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
> 	at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152) ~[flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83) ~[flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375) ~[flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34) ~[flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_232]
> 	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) ~[flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) ~[flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	... 4 more
> Caused by: java.lang.NoSuchMethodError: org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.SemaphoredDelegatingExecutor.<init>(Lcom/google/common/util/concurrent/ListeningExecutorService;IZ)V
> 	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.open(AliyunOSSFileSystem.java:570) ~[flink-oss-fs-hadoop-1.9.1-stream2.jar:1.9.1-stream2]
> 	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950) ~[flink-azure-fs-hadoop-1.9.1-stream2.jar:1.9.1-stream2]
> 	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:120) ~[flink-oss-fs-hadoop-1.9.1-stream2.jar:1.9.1-stream2]
> 	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:37) ~[flink-oss-fs-hadoop-1.9.1-stream2.jar:1.9.1-stream2]
> 	at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68) ~[flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:141) ~[flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1132) ~[flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at org.apache.flink.runtime.scheduler.LegacyScheduler.tryRestoreExecutionGraphFromSavepoint(LegacyScheduler.java:237) ~[flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:196) ~[flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at org.apache.flink.runtime.scheduler.LegacyScheduler.<init>(LegacyScheduler.java:176) ~[flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70) ~[flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:275) ~[flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:265) ~[flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98) ~[flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40) ~[flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146) ~[flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83) ~[flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375) ~[flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34) ~[flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_232]
> 	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) ~[flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) ~[flink-dist_2.12-1.9.1-stream2.jar:1.9.1-stream2]
> 	... 4 more
> {noformat}


