Posted to user@flink.apache.org by tarun joshi <19...@gmail.com> on 2021/08/06 06:02:09 UTC

flink not able to get scheme for S3

Hey All,

I am running Flink in Docker containers (image tag: flink:scala_2.11-java11) on EC2 and getting an exception when I try to submit a job through the local ./opt/flink/bin

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: No FileSystem for scheme "s3"
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
    at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
    at org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:38)
    at org.apache.flink.examples.java.wordcount.WordCount.printParquetData(WordCount.java:142)
    at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:83)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    ... 8 more

This is how I am enabling the Flink built-in S3 plugins for the JobManager and TaskManager:

docker run \
  --rm \
  --volume /root/:/root/ \
  --env JOB_MANAGER_RPC_ADDRESS="${JOB_MANAGER_RPC_ADDRESS}" \
  --env TASK_MANAGER_NUMBER_OF_TASK_SLOTS="${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" \
  --env ENABLE_BUILT_IN_PLUGINS="flink-s3-fs-hadoop-1.13.1.jar;flink-s3-fs-presto-1.13.1.jar" \
  --name=jobmanager \
  --network flink-network \
  --publish 8081:8081 \
  flink:scala_2.11-java11 jobmanager &

docker run \
  --rm \
  --env JOB_MANAGER_RPC_ADDRESS="${JOB_MANAGER_RPC_ADDRESS}" \
  --env TASK_MANAGER_NUMBER_OF_TASK_SLOTS="${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" \
  --env ENABLE_BUILT_IN_PLUGINS="flink-s3-fs-hadoop-1.13.1.jar;flink-s3-fs-presto-1.13.1.jar" \
  --name=taskmanager_0 \
  --network flink-network \
  flink:scala_2.11-java11 taskmanager &

This is how I am defining the dependencies in my pom.xml (I am working on top of the flink-examples project from the Flink GitHub repo).

<dependencies>
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${project.version}</version>
      <scope>provided</scope>
   </dependency>

   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
      <scope>provided</scope>
   </dependency>

   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
      <scope>provided</scope>
   </dependency>

   <dependency>
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-avro</artifactId>
      <version>1.12.0</version>
   </dependency>
   <dependency>
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-column</artifactId>
      <version>1.12.0</version>
   </dependency>
   <dependency>
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-hadoop</artifactId>
      <version>1.12.0</version>
   </dependency>
   <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>3.3.1</version>
   </dependency>
</dependencies>

I am also able to see the plugins being loaded for the JobManager and TaskManager:

Linking flink-s3-fs-hadoop-1.13.1.jar to plugin directory
Successfully enabled flink-s3-fs-hadoop-1.13.1.jar
Linking flink-s3-fs-presto-1.13.1.jar to plugin directory
Successfully enabled flink-s3-fs-presto-1.13.1.jar

Let me know if I am doing anything wrong.

Thanks for the help!

Re: flink not able to get scheme for S3

Posted by tarun joshi <19...@gmail.com>.
Thanks Chesnay! That helped me resolve the issue.


On Fri, 6 Aug 2021 at 04:31, Chesnay Schepler <ch...@apache.org> wrote:

> The reason this doesn't work is that your application works directly
> against Hadoop.
> The filesystems in the plugins directory are only loaded via specific
> code-paths, specifically when the Flink FileSystem class is used.
> Since you are using Hadoop directly you are side-stepping the plugin
> mechanism.
>
> So you have to make sure that Hadoop + Hadoop's S3 filesystem is available
> to the client.
>
> On 06/08/2021 08:02, tarun joshi wrote:
> [quoted original message trimmed]

Re: flink not able to get scheme for S3

Posted by Chesnay Schepler <ch...@apache.org>.
The reason this doesn't work is that your application works directly against Hadoop.
The filesystems in the plugins directory are only loaded via specific code-paths, specifically when the Flink FileSystem class is used.
Since you are using Hadoop directly, you are side-stepping the plugin mechanism.
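
To illustrate the plugin-aware code-path (a minimal sketch, not code from this thread; the bucket and object names are placeholders), a program that resolves the path through Flink's own FileSystem class is what picks up the filesystems enabled via ENABLE_BUILT_IN_PLUGINS:

// Minimal sketch: Flink's FileSystem abstraction is the code-path that can
// load the s3 filesystems from the plugins directory.
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class FlinkFileSystemSketch {
    public static void main(String[] args) throws Exception {
        Path path = new Path("s3://my-bucket/data.parquet"); // placeholder path
        FileSystem fs = path.getFileSystem();                // resolved by Flink, not Hadoop
        try (FSDataInputStream in = fs.open(path)) {
            byte[] magic = new byte[4];
            int read = in.read(magic);                       // Parquet files begin with the bytes "PAR1"
            System.out.println(read + " bytes read: " + new String(magic, "US-ASCII"));
        }
    }
}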

So you have to make sure that Hadoop + Hadoop's S3 filesystem is available to the client.
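
If the program keeps calling Hadoop directly instead (as the HadoopInputFile.fromPath frame in the stack trace shows it does), here is a hedged sketch of that route, assuming org.apache.hadoop:hadoop-aws:3.3.1 (matching the hadoop-common version in the pom) and its aws-java-sdk-bundle are added to the client's dependencies, and with a placeholder path:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class HadoopS3Sketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Map the bare "s3" scheme to the S3A implementation from hadoop-aws;
        // "s3a://" URIs resolve to it without this setting.
        conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
        HadoopInputFile inputFile =
                HadoopInputFile.fromPath(new Path("s3://my-bucket/data.parquet"), conf); // placeholder path
        System.out.println("file length: " + inputFile.getLength());
    }
}

On EC2, S3A can pick up credentials from the instance profile; otherwise fs.s3a.access.key and fs.s3a.secret.key (or another S3A credentials provider) need to be configured.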

On 06/08/2021 08:02, tarun joshi wrote:
> [quoted original message trimmed]