Posted to user@flink.apache.org by "ChangZhuo Chen (陳昌倬)" <cz...@czchen.org> on 2022/06/02 01:08:09 UTC

Cannot cast GoogleHadoopFileSystem to hadoop.fs.FileSystem to list file in Flink 1.15

Hi,

We use GCS as storage, and have the following function to list files in a
GCS path, which our Flink batch-mode job uses to build state:


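  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path

  def listPath(p: String): Seq[String] = {
    val path = new Path(p)
    // Resolves the Hadoop FileSystem implementation for the path's scheme (gs:// here)
    val fs = path.getFileSystem(new Configuration())
    fs.listStatus(path) match {
      case null => Seq()
      case xs => xs.map(_.getPath.toString)
    }
  }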

This function works fine in Flink 1.14. However, in Flink 1.15, we get
the following exception:

  Caused by: java.lang.ClassCastException: class com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to class org.apache.hadoop.fs.FileSystem (com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem and org.apache.hadoop.fs.FileSystem are in unnamed module of loader 'app')
          at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3374) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
          at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:125) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
          at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3424) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
          at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3392) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
          at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:485) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
          at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
          at <redacted>.listPath(<redacted>) ~[?:?]

We found a similar issue in Spark [0]. However, we are not sure whether it
is related and, if it is, how we can apply the fix. Any help is welcome.


[0] https://issues.apache.org/jira/browse/SPARK-9206


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B

Re: Cannot cast GoogleHadoopFileSystem to hadoop.fs.FileSystem to list file in Flink 1.15

Posted by "ChangZhuo Chen (陳昌倬)" <cz...@czchen.org>.
On Tue, Jun 07, 2022 at 03:26:43PM +0800, czchen@gmail.com wrote:
> On Mon, Jun 06, 2022 at 10:42:08AM +0800, Shengkai Fang wrote:
> > Hi. In my experience, the steps to debug classloading problems are as
> > follows:
>
> Thanks for the help. We got the following log when using
> `-verbose:class`:
> 
>   [2.074s][info][class,load] org.apache.hadoop.fs.FileSystem source: file:/opt/flink/opt/flink-s3-fs-hadoop-1.15.0.jar
>   ...
>   [8.094s][info][class,load] com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem source: file:/opt/flink/opt/flink-gs-fs-hadoop-1.15.0.jar
> 
> 
> It looks like the application loads org.apache.hadoop.fs.FileSystem from
> flink-s3-fs-hadoop-1.15.0.jar but GoogleHadoopFileSystem from
> flink-gs-fs-hadoop-1.15.0.jar, and the two are incompatible. Since we run
> Flink in both AWS and GCP, our base image contains both plugins at the
> same time. Any idea how to work around it?
> 
> We also tried setting `classloader.resolve-order: parent-first`. However,
> we then got another error, caused by a library conflict between Flink and
> our application:
> 
>   Caused by: com.fasterxml.jackson.databind.JsonMappingException: Scala module 2.11.3 requires Jackson Databind version >= 2.11.0 and < 2.12.0

We solved the problem by moving the plugins into the correct plugins
directory. Thanks for the help on Slack.
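
A minimal pre-flight check for this kind of layout problem could look like the Scala sketch below. It assumes the layout described in the Flink file-system documentation (each plugin JAR copied from opt/ into its own sub-directory of plugins/, e.g. plugins/s3-fs-hadoop/) and flags any `flink-*-fs-hadoop-*.jar` left in lib/, where it would be loaded by the shared application class loader. `PluginLayout` and `problems` are hypothetical names, not a Flink tool.

```scala
import java.io.File

// Hypothetical layout check (not part of Flink). Assumes FLINK_HOME contains
// lib/ and plugins/, with each plugin JAR in its own plugins/<name>/ directory.
object PluginLayout {
  // Lists regular *.jar files directly inside `dir`; a missing dir yields nothing.
  private def jars(dir: File): Seq[File] =
    Option(dir.listFiles()).getOrElse(Array.empty[File]).toSeq
      .filter(f => f.isFile && f.getName.endsWith(".jar"))

  /** Returns human-readable problems; an empty result means the layout looks fine. */
  def problems(flinkHome: File): Seq[String] = {
    val lib = new File(flinkHome, "lib")
    val plugins = new File(flinkHome, "plugins")
    // File-system JARs in lib/ end up on the 'app' class path, shared by all plugins.
    val inLib = jars(lib)
      .filter(_.getName.matches("flink-.*-fs-hadoop-.*\\.jar"))
      .map(f => s"${f.getName} is in lib/; move it to its own directory under plugins/")
    // JARs dropped straight into plugins/ (no sub-directory) are also misplaced.
    val loose = jars(plugins)
      .map(f => s"${f.getName} is directly in plugins/; put it in its own sub-directory")
    inLib ++ loose
  }
}
```

For example, `PluginLayout.problems(new File("/opt/flink")).foreach(println)` could be run in an image that ships both the S3 and GCS plugins before the cluster starts.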

Re: Cannot cast GoogleHadoopFileSystem to hadoop.fs.FileSystem to list file in Flink 1.15

Posted by cz...@gmail.com.
On Mon, Jun 06, 2022 at 10:42:08AM +0800, Shengkai Fang wrote:
> Hi. In my experience, the steps to debug classloading problems are as
> follows:

Thanks for the help. We got the following log when using
`-verbose:class`:

  [2.074s][info][class,load] org.apache.hadoop.fs.FileSystem source: file:/opt/flink/opt/flink-s3-fs-hadoop-1.15.0.jar
  ...
  [8.094s][info][class,load] com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem source: file:/opt/flink/opt/flink-gs-fs-hadoop-1.15.0.jar


It looks like the application loads org.apache.hadoop.fs.FileSystem from
flink-s3-fs-hadoop-1.15.0.jar but GoogleHadoopFileSystem from
flink-gs-fs-hadoop-1.15.0.jar, and the two are incompatible. Since we run
Flink in both AWS and GCP, our base image contains both plugins at the
same time. Any idea how to work around it?
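
To repeat this check across many JobManager/TaskManager logs, the `-verbose:class` output can be filtered mechanically. The Scala sketch below is a hypothetical helper (`ClassLoadLog` is not a Flink or JDK API). It assumes the JDK unified-logging format shown above (`[...][class,load] <class> source: <location>`) and only reports where each class came from. Two related classes coming from different sources is a hint worth investigating, not proof of an incompatibility.

```scala
import scala.util.matching.Regex

// Hypothetical log filter. Assumes lines shaped like:
//   [2.074s][info][class,load] org.apache.hadoop.fs.FileSystem source: file:/opt/...jar
object ClassLoadLog {
  // Group 1: fully qualified class name; group 2: first token after "source:".
  private val LoadLine: Regex = """.*\[class,load\]\s+(\S+)\s+source:\s+(\S+).*""".r

  /** Maps each class in `wanted` to the source it was loaded from (last match wins). */
  def sources(lines: Seq[String], wanted: Set[String]): Map[String, String] =
    lines.collect {
      case LoadLine(cls, src) if wanted.contains(cls) => cls -> src
    }.toMap

  /** Heuristic flag: true when the matched classes came from more than one source. */
  def mixedSources(found: Map[String, String]): Boolean =
    found.values.toSet.size > 1
}
```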

We also tried setting `classloader.resolve-order: parent-first`. However,
we then got another error, caused by a library conflict between Flink and
our application:

  Caused by: com.fasterxml.jackson.databind.JsonMappingException: Scala module 2.11.3 requires Jackson Databind version >= 2.11.0 and < 2.12.0

Re: Cannot cast GoogleHadoopFileSystem to hadoop.fs.FileSystem to list file in Flink 1.15

Posted by Shengkai Fang <fs...@gmail.com>.
Hi. In my experience, the steps to debug classloading problems are as
follows:

1. Find out which jar the loaded class comes from.
2. Check whether the class in that jar extends FileSystem.
3. If the loaded class is not the one you expect, you may need to shade
the class into your job jar.
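
Steps 1 and 2 can also be done from inside the job. The Scala sketch below is a hypothetical helper (`ClassOrigin` is not part of Flink) built only on standard JDK reflection: `Class.getClassLoader`, `getProtectionDomain.getCodeSource` and `getSuperclass`. It prints each class in a superclass chain with its loader and jar; `castable` wraps `Class.isAssignableFrom`, which mirrors the check behind a ClassCastException.

```scala
// Hypothetical diagnostic helper using only JDK reflection APIs.
object ClassOrigin {
  /** One line describing which loader and which jar/directory a class came from. */
  def describe(c: Class[_]): String = {
    // getClassLoader returns null for classes defined by the bootstrap loader.
    val loader = Option(c.getClassLoader).map(_.toString).getOrElse("bootstrap")
    // The code source (and its location) may be null, e.g. for JDK classes.
    val source = Option(c.getProtectionDomain.getCodeSource)
      .flatMap(cs => Option(cs.getLocation))
      .map(_.toString)
      .getOrElse("<unknown>")
    s"${c.getName} loader=$loader source=$source"
  }

  /** Describes the class and each superclass up to and including java.lang.Object. */
  def chain(c: Class[_]): List[String] =
    Iterator.iterate[Class[_]](c)(_.getSuperclass)
      .takeWhile(_ != null)
      .map(describe)
      .toList

  /** True if an instance of `impl` could be cast to `target` (loader-qualified types). */
  def castable(impl: Class[_], target: Class[_]): Boolean =
    target.isAssignableFrom(impl)
}
```

For example, calling `ClassOrigin.chain(Class.forName("com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")).foreach(println)` in `main` would show whether the FileSystem in that chain comes from the same jar and loader as `classOf[org.apache.hadoop.fs.FileSystem]` as seen by the job.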

For the first step, add the following to flink-conf.yaml:
```
env.java.opts: "-verbose:class"
```

Then restart the Flink cluster.

Then submit your job again; you can see which jar each class is loaded
from in the WebUI.

Best,
Shengkai



On Sun, Jun 5, 2022 at 01:41, ChangZhuo Chen (陳昌倬) <cz...@czchen.org> wrote:

> On Thu, Jun 02, 2022 at 06:23:20PM +0800, Qingsheng Ren wrote:
> > Thanks for the input, ChangZhuo.
> >
> > Could you check whether the configuration “classloader.resolve-order” is
> > set to “parent-first” in your Flink 1.14 cluster? I didn’t notice any
> > changes related to the user code classloader in Flink 1.15. If my
> > assumption is correct, you package the gcs-connector into your job JAR
> > but not the Hadoop FS dependencies, so
> > org.apache.hadoop.fs.FileSystem is loaded by the app classloader from
> > flink-s3-fs-hadoop.jar in Flink's lib directory, while
> > GoogleHadoopFileSystem is loaded by the user code classloader from the job
> > JAR. Setting the resolve order to "parent-first" could bypass the
> > issue [1], so I assume you have this config in 1.14 but not in 1.15.
> > Please forgive me if I understand incorrectly!
>
> No, we do not configure classloader.resolve-order in either our 1.14 or
> our 1.15 setup. We will check whether "parent-first" solves the problem.
> Thanks for the advice.
>
>
> Also, in 1.14 we put the following jars into /opt/flink/lib to
> support GCS:
>
> * flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar
> * gcs-connector-hadoop3-2.2.2-shaded.jar
>
> In 1.15, we instead add flink-gs-fs-hadoop-1.15.0.jar to /opt/flink/lib
> to support GCS. Could this difference cause the problem?

Re: Cannot cast GoogleHadoopFileSystem to hadoop.fs.FileSystem to list file in Flink 1.15

Posted by "ChangZhuo Chen (陳昌倬)" <cz...@czchen.org>.
On Thu, Jun 02, 2022 at 06:23:20PM +0800, Qingsheng Ren wrote:
> Thanks for the input, ChangZhuo.
>
> Could you check whether the configuration “classloader.resolve-order” is
> set to “parent-first” in your Flink 1.14 cluster? I didn’t notice any
> changes related to the user code classloader in Flink 1.15. If my
> assumption is correct, you package the gcs-connector into your job JAR
> but not the Hadoop FS dependencies, so
> org.apache.hadoop.fs.FileSystem is loaded by the app classloader from
> flink-s3-fs-hadoop.jar in Flink's lib directory, while
> GoogleHadoopFileSystem is loaded by the user code classloader from the job
> JAR. Setting the resolve order to "parent-first" could bypass the
> issue [1], so I assume you have this config in 1.14 but not in 1.15.
> Please forgive me if I understand incorrectly!

No, we do not configure classloader.resolve-order in either our 1.14 or
our 1.15 setup. We will check whether "parent-first" solves the problem.
Thanks for the advice.


Also, in 1.14 we put the following jars into /opt/flink/lib to
support GCS:

* flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar
* gcs-connector-hadoop3-2.2.2-shaded.jar

In 1.15, we instead add flink-gs-fs-hadoop-1.15.0.jar to /opt/flink/lib
to support GCS. Could this difference cause the problem?

Re: Cannot cast GoogleHadoopFileSystem to hadoop.fs.FileSystem to list file in Flink 1.15

Posted by Martijn Visser <ma...@apache.org>.
I believe the relevant change between Flink 1.14 and Flink 1.15 is the
addition of a RecoverableWriter for GCS [1].

Perhaps this is the reason for this failure?

Best regards, Martijn

[1] https://issues.apache.org/jira/browse/FLINK-11838

On Thu, 2 Jun 2022 at 12:24, Qingsheng Ren <re...@gmail.com> wrote:

> Thanks for the input, ChangZhuo.
>
> Could you check whether the configuration “classloader.resolve-order” is
> set to “parent-first” in your Flink 1.14 cluster? I didn’t notice any
> changes related to the user code classloader in Flink 1.15. If my
> assumption is correct, you package the gcs-connector into your job JAR
> but not the Hadoop FS dependencies, so
> org.apache.hadoop.fs.FileSystem is loaded by the app classloader from
> flink-s3-fs-hadoop.jar in Flink's lib directory, while
> GoogleHadoopFileSystem is loaded by the user code classloader from the job
> JAR. Setting the resolve order to "parent-first" could bypass the
> issue [1], so I assume you have this config in 1.14 but not in 1.15.
> Please forgive me if I understand incorrectly!
>
> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/
>

Re: Cannot cast GoogleHadoopFileSystem to hadoop.fs.FileSystem to list file in Flink 1.15

Posted by Qingsheng Ren <re...@gmail.com>.
Thanks for the input, ChangZhuo.

Could you check whether the configuration “classloader.resolve-order” is
set to “parent-first” in your Flink 1.14 cluster? I didn’t notice any
changes related to the user code classloader in Flink 1.15. If my
assumption is correct, you package the gcs-connector into your job JAR
but not the Hadoop FS dependencies, so
org.apache.hadoop.fs.FileSystem is loaded by the app classloader from
flink-s3-fs-hadoop.jar in Flink's lib directory, while
GoogleHadoopFileSystem is loaded by the user code classloader from the job
JAR. Setting the resolve order to "parent-first" could bypass the
issue [1], so I assume you have this config in 1.14 but not in 1.15.
Please forgive me if I understand incorrectly!

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/
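
As background on the resolve-order suggestion, the Scala sketch below shows the general idea of child-first ("inverted") class loading: the job JAR is consulted before the parent, except for packages that are always resolved parent-first. It is a simplified illustration under stated assumptions, not Flink's implementation; `ChildFirstLoader` and its `parentFirstPrefixes` argument are hypothetical stand-ins for `classloader.resolve-order: child-first` and the `classloader.parent-first-patterns.*` options. Note that the exception in this thread reports both classes in loader 'app', so the job class loader may not be involved here at all.

```scala
import java.net.{URL, URLClassLoader}

// Simplified child-first loader (illustration only, not Flink's class).
class ChildFirstLoader(urls: Array[URL], parent: ClassLoader, parentFirstPrefixes: Seq[String])
    extends URLClassLoader(urls, parent) {

  override protected def loadClass(name: String, resolve: Boolean): Class[_] =
    getClassLoadingLock(name).synchronized {
      // 1. Reuse a class this loader has already defined.
      val already = findLoadedClass(name)
      val c: Class[_] =
        if (already != null) already
        // 2. Packages such as java. are always delegated to the parent first.
        else if (parentFirstPrefixes.exists(p => name.startsWith(p))) super.loadClass(name, false)
        // 3. Otherwise look in this loader's own URLs (e.g. the job JAR) first...
        else
          try findClass(name)
          catch {
            // 4. ...and fall back to the parent only if the class is not there.
            case _: ClassNotFoundException => super.loadClass(name, false)
          }
      if (resolve) resolveClass(c)
      c
    }
}
```

With parent-first order, steps 3 and 4 are effectively swapped, so classes on Flink's own class path win; that would be consistent with the Jackson version conflict reported elsewhere in this thread. Flink also has `classloader.parent-first-patterns.additional`, which delegates only selected packages instead of switching the whole order.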

On Thu, Jun 2, 2022 at 11:22 AM ChangZhuo Chen (陳昌倬) <cz...@czchen.org> wrote:
>
> On Thu, Jun 02, 2022 at 11:17:19AM +0800, Qingsheng Ren wrote:
> > Hi ChangZhuo,
> >
> > I assume it’s a classloading issue, but I can’t track down the root cause in the code. Would you mind sharing the entire exception stack trace and some JM/TM logs related to the file system?
>
> The following is the exception log we have. Please let us know if you
> need other logs.
>
> PS: <redacted>.listPath(<redacted>) is the function I mentioned earlier.
>
>
>     2022-06-02 00:25:57,825 WARN  org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application failed unexpectedly:
>     java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
>             at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
>             at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?]
>             at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) ~[?:?]
>             at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
>             at java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source) ~[?:?]
>             at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:323) ~[flink-dist-1.15.0.jar:1.15.0]
>             at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:244) ~[flink-dist-1.15.0.jar:1.15.0]
>             at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
>             at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
>             at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171) ~[flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
>             at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
>             at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
>             at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) [flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
>             at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) [flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
>             at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
>             at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
>             at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
>             at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
>             at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
>     Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
>             ... 14 more
>     Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: class com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to class org.apache.hadoop.fs.FileSystem (com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem and org.apache.hadoop.fs.FileSystem are in unnamed module of loader 'app')
>             at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist-1.15.0.jar:1.15.0]
>             at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.15.0.jar:1.15.0]
>             at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist-1.15.0.jar:1.15.0]
>             at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:291) ~[flink-dist-1.15.0.jar:1.15.0]
>             ... 13 more
>     Caused by: java.lang.ClassCastException: class com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to class org.apache.hadoop.fs.FileSystem (com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem and org.apache.hadoop.fs.FileSystem are in unnamed module of loader 'app')
>             at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3374) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
>             at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:125) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
>             at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3424) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
>             at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3392) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
>             at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:485) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
>             at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
>             <redacted>.listPath(<redacted>) ~[?:?]
>             at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
>             at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
>             at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
>             at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
>             at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.15.0.jar:1.15.0]
>             at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.15.0.jar:1.15.0]
>             at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist-1.15.0.jar:1.15.0]
>             at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:291) ~[flink-dist-1.15.0.jar:1.15.0]
>             ... 13 more
>     2022-06-02 00:25:57,828 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error occurred in the cluster entrypoint.
>     java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
>             at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
>             at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?]
>             at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) ~[?:?]
>             at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
>             at java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source) ~[?:?]
>             at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:323) ~[flink-dist-1.15.0.jar:1.15.0]
>             at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:244) ~[flink-dist-1.15.0.jar:1.15.0]
>             at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
>             at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
>             at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171) ~[flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
>             at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
>             at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
>             at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) [flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
>             at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) [flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
>             at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
>             at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
>             at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
>             at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
>             at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
>     Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
>             ... 14 more
>     Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: class com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to class org.apache.hadoop.fs.FileSystem (com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem and org.apache.hadoop.fs.FileSystem are in unnamed module of loader 'app')
>             at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist-1.15.0.jar:1.15.0]
>             at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.15.0.jar:1.15.0]
>             at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist-1.15.0.jar:1.15.0]
>             at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:291) ~[flink-dist-1.15.0.jar:1.15.0]
>             ... 13 more
>     Caused by: java.lang.ClassCastException: class com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to class org.apache.hadoop.fs.FileSystem (com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem and org.apache.hadoop.fs.FileSystem are in unnamed module of loader 'app')
>             at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3374) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
>             at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:125) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
>             at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3424) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
>             at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3392) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
>             at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:485) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
>             at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
>             <redacted>.listPath(<redacted>) ~[?:?]
>             at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
>             at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
>             at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
>             at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
>             at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.15.0.jar:1.15.0]
>             at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.15.0.jar:1.15.0]
>             at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist-1.15.0.jar:1.15.0]
>             at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:291) ~[flink-dist-1.15.0.jar:1.15.0]
>             ... 13 more
>     2022-06-02 00:25:57,830 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting KubernetesApplicationClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
>
> >
> > Best regards,
> >
> > Qingsheng
> >

Re: Cannot cast GoogleHadoopFileSystem to hadoop.fs.FileSystem to list file in Flink 1.15

Posted by "ChangZhuo Chen (陳昌倬)" <cz...@czchen.org>.
On Thu, Jun 02, 2022 at 11:17:19AM +0800, Qingsheng Ren wrote:
> Hi ChangZhuo,
> 
> I suspect it’s a classloading issue, but I can’t track down the root cause in the code. Would you mind sharing the entire exception stack trace and any JM/TM logs related to the file system?

The following is the exception log we have. Please let us know if you need
any other logs.

P.S. <redacted>.listPath(<redacted>) is the function I mentioned earlier.


    2022-06-02 00:25:57,825 WARN  org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application failed unexpectedly:
    java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
            at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
            at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?]
            at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) ~[?:?]
            at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
            at java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source) ~[?:?]
            at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:323) ~[flink-dist-1.15.0.jar:1.15.0]
            at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:244) ~[flink-dist-1.15.0.jar:1.15.0]
            at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
            at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
            at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171) ~[flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
            at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
            at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
            at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) [flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
            at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) [flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
            at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
            at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
            at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
            at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
            at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
    Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
            ... 14 more
    Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: class com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to class org.apache.hadoop.fs.FileSystem (com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem and org.apache.hadoop.fs.FileSystem are in unnamed module of loader 'app')
            at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist-1.15.0.jar:1.15.0]
            at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.15.0.jar:1.15.0]
            at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist-1.15.0.jar:1.15.0]
            at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:291) ~[flink-dist-1.15.0.jar:1.15.0]
            ... 13 more
    Caused by: java.lang.ClassCastException: class com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to class org.apache.hadoop.fs.FileSystem (com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem and org.apache.hadoop.fs.FileSystem are in unnamed module of loader 'app')
            at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3374) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
            at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:125) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
            at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3424) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
            at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3392) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
            at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:485) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
            at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
            <redacted>.listPath(<redacted>) ~[?:?]
            at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
            at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
            at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
            at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
            at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.15.0.jar:1.15.0]
            at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.15.0.jar:1.15.0]
            at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist-1.15.0.jar:1.15.0]
            at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:291) ~[flink-dist-1.15.0.jar:1.15.0]
            ... 13 more
    2022-06-02 00:25:57,828 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error occurred in the cluster entrypoint.
    java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
            at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
            at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?]
            at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) ~[?:?]
            at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
            at java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source) ~[?:?]
            at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:323) ~[flink-dist-1.15.0.jar:1.15.0]
            at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:244) ~[flink-dist-1.15.0.jar:1.15.0]
            at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
            at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
            at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171) ~[flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
            at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
            at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
            at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) [flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
            at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) [flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
            at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
            at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
            at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
            at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
            at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
    Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
            ... 14 more
    Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: class com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to class org.apache.hadoop.fs.FileSystem (com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem and org.apache.hadoop.fs.FileSystem are in unnamed module of loader 'app')
            at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist-1.15.0.jar:1.15.0]
            at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.15.0.jar:1.15.0]
            at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist-1.15.0.jar:1.15.0]
            at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:291) ~[flink-dist-1.15.0.jar:1.15.0]
            ... 13 more
    Caused by: java.lang.ClassCastException: class com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to class org.apache.hadoop.fs.FileSystem (com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem and org.apache.hadoop.fs.FileSystem are in unnamed module of loader 'app')
            at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3374) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
            at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:125) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
            at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3424) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
            at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3392) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
            at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:485) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
            at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
            <redacted>.listPath(<redacted>) ~[?:?]
            at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
            at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
            at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
            at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
            at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.15.0.jar:1.15.0]
            at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.15.0.jar:1.15.0]
            at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist-1.15.0.jar:1.15.0]
            at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:291) ~[flink-dist-1.15.0.jar:1.15.0]
            ... 13 more
    2022-06-02 00:25:57,830 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting KubernetesApplicationClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..

> 
> Best regards, 
> 
> Qingsheng
> 

-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B

Re: Cannot cast GoogleHadoopFileSystem to hadoop.fs.FileSystem to list file in Flink 1.15

Posted by Qingsheng Ren <re...@gmail.com>.
Hi ChangZhuo,

I suspect it’s a classloading issue, but I can’t track down the root cause in the code. Would you mind sharing the entire exception stack trace and any JM/TM logs related to the file system?
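One way to gather the class-level details behind such a cast failure is to report which class loader and which jar supplied each class involved. The stack trace places org.apache.hadoop.fs.FileSystem in flink-s3-fs-hadoop-1.15.0.jar, while the exception message says both classes are in loader 'app', so knowing the exact origin of every class in the cast seems useful. Below is a minimal Scala sketch that relies only on JDK reflection. ClassOriginCheck and its methods are hypothetical helpers, not part of Flink or Hadoop, and the sketch has not been run against this deployment.

```scala
object ClassOriginCheck {
  /** Describes a class by name, defining class loader, and code source (jar or directory). */
  def origin(cls: Class[_]): String = {
    val loader = Option(cls.getClassLoader).map(_.toString).getOrElse("bootstrap")
    val source = Option(cls.getProtectionDomain.getCodeSource)
      .flatMap(cs => Option(cs.getLocation))
      .map(_.toString)
      .getOrElse("<no code source>")
    s"${cls.getName} [loader=$loader, source=$source]"
  }

  /** Loads a class without running static initializers; None if `loader` cannot load it. */
  def load(name: String, loader: ClassLoader): Option[Class[_]] =
    try Some(Class.forName(name, false, loader))
    catch { case _: ClassNotFoundException | _: LinkageError => None }

  /**
   * Mirrors the check behind the ClassCastException: is `implName` assignable to
   * `apiName`, both as resolved through `loader`? Also lists the origin of every
   * class in the implementation's superclass chain.
   */
  def report(apiName: String, implName: String, loader: ClassLoader): Seq[String] =
    (load(apiName, loader), load(implName, loader)) match {
      case (Some(api), Some(impl)) =>
        // e.g. GoogleHadoopFileSystem -> ... -> java.lang.Object
        val chain: List[Class[_]] =
          Iterator.iterate[Class[_]](impl)(_.getSuperclass).takeWhile(_ != null).toList
        val header = List(s"castable=${api.isAssignableFrom(impl)}", s"api:   ${origin(api)}")
        val lines = chain.map(c => s"chain: ${origin(c)}")
        // A superclass with the API's name that is NOT the same Class object
        // would point at a second copy of the API class.
        val copies = chain
          .filter(_.getName == apiName)
          .map(c => s"same Class object as api: ${c eq api}")
        header ++ lines ++ copies
      case (api, impl) =>
        val missing = Seq(apiName -> api, implName -> impl).collect { case (n, None) => n }
        Seq(s"missing: ${missing.mkString(", ")}")
    }

  def main(args: Array[String]): Unit = {
    // Assumption: the context class loader is the one the job code uses; inside the
    // job, calling report(...) right before listPath would be more representative.
    val loader = Thread.currentThread.getContextClassLoader
    report(
      "org.apache.hadoop.fs.FileSystem",
      "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
      loader
    ).foreach(println)
  }
}
```

Class names are passed as strings, so the helper compiles without Hadoop or the GCS connector on the classpath and reports "missing: ..." when a class cannot be loaded.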

Best regards, 

Qingsheng

> On Jun 2, 2022, at 09:08, ChangZhuo Chen (陳昌倬) <cz...@czchen.org> wrote:
> 
> Hi,
> 
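> We use GCS as storage and have the following function to list files in a
> GCS path when building state in Flink batch mode:
> 
> 
>  import org.apache.hadoop.conf.Configuration
>  import org.apache.hadoop.fs.Path
> 
>  // Returns the paths listed under p (e.g. a gs://bucket/dir path) as strings
>  def listPath(p: String): Seq[String] = {
>    val path = new Path(p)
>    // Hadoop picks the FileSystem implementation from the path's scheme
>    val fs = path.getFileSystem(new Configuration())
>    fs.listStatus(path) match {
>      case null => Seq()
>      case xs => xs.map(_.getPath.toString).toSeq
>    }
>  }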
> 
> This function works fine in Flink 1.14. However, in Flink 1.15, we get
> the following exception:
> 
>  Caused by: java.lang.ClassCastException: class com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to class org.apache.hadoop.fs.FileSystem (com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem and org.apache.hadoop.fs.FileSystem are in unnamed module of loader 'app')
>          at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3374) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
>          at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:125) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
>          at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3424) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
>          at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3392) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
>          at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:485) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
>          at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
>          at <redacted>.listPath(<redacted>) ~[?:?]
> 
> We found a similar issue in Spark [0]. However, we are not sure whether it
> is related and, if it is, how to apply the fix. Any help is welcome.
> 
> 
> [0] https://issues.apache.org/jira/browse/SPARK-9206
> 
> 
> -- 
> ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> http://czchen.info/
> Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B
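For comparison, the same listing can be written against Flink's own file system abstraction (org.apache.flink.core.fs) instead of Hadoop's. This is an untested sketch, not a confirmed fix for the ClassCastException above. It assumes the gs:// scheme is served by a Flink file system plugin and that the job's main method runs in a Flink process where file systems have already been initialized, as in application mode. FlinkListPath is a hypothetical name.

```scala
import org.apache.flink.core.fs.{FileSystem, Path}

object FlinkListPath {
  /**
   * Lists the entries under `p` through Flink's FileSystem API. The URI scheme
   * (gs://, s3://, file://) selects the Flink FileSystem implementation, which
   * may be provided by a plugin rather than by classes on the job's classpath.
   */
  def listPath(p: String): Seq[String] = {
    val path = new Path(p)
    val fs: FileSystem = path.getFileSystem()
    fs.listStatus(path) match {
      case null => Seq() // some implementations return null for a missing path
      case xs   => xs.map(_.getPath.toString).toSeq
    }
  }
}
```

Usage would mirror the original function, e.g. FlinkListPath.listPath("gs://bucket/dir"), where the bucket name is a placeholder.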