Posted to dev@flink.apache.org by lr...@lyft.com on 2018/03/14 18:38:56 UTC

Correct way to reference Hadoop dependencies in Flink 1.4.0

I'm trying to use a BucketingSink to write files to S3 in my Flink job.

I have the Hadoop dependencies I need packaged in my user application jar. However, on running the job I get the following error (from the taskmanager):

java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3a'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1125)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
        ... 9 common frames omitted
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
        at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
        ... 13 common frames omitted

What's the right way to do this?

Re: Correct way to reference Hadoop dependencies in Flink 1.4.0

Posted by Deepak Jha <dk...@gmail.com>.
Hi,
You probably need to provide a core-site.xml and set the Hadoop conf path in
flink-conf.yaml.

core-site.xml:

<configuration>
  <property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  <!-- Comma-separated list of local directories used to buffer
       large results prior to transmitting them to S3. -->
  <property>
    <name>fs.s3.buffer.dir</name>
    <value>/tmp</value>
  </property>
</configuration>
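The second half of that suggestion, pointing flink-conf.yaml at the directory containing core-site.xml, might look like the fragment below (the directory path is illustrative; use the Hadoop conf directory that actually exists on your cluster):

```yaml
# flink-conf.yaml: tell Flink where to find the Hadoop configuration
# (the core-site.xml above). The path here is only an example.
fs.hdfs.hadoopconf: /etc/hadoop/conf
```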


I had a similar issue when I tried to upgrade to Flink 1.4.2.

On Thu, Mar 15, 2018 at 9:39 AM Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
>
> I believe that for FileSystems to be picked up correctly, they have to be in
> the lib/ folder of Flink. Stephan (cc'ed), please correct me if I'm wrong
> here, you probably know that one best.
>
> Aljoscha
>
> > On 14. Mar 2018, at 18:26, lrao@lyft.com wrote:
> >
> > Hi,
> >
> > I am running this on a Hadoop-free cluster (i.e. no YARN etc.). I have
> > the following dependencies packaged in my user application JAR:
> >
> > aws-java-sdk 1.7.4
> > flink-hadoop-fs 1.4.0
> > flink-shaded-hadoop2 1.4.0
> > flink-connector-filesystem_2.11 1.4.0
> > hadoop-common 2.7.4
> > hadoop-aws 2.7.4
> >
> > I have also tried the following conf:
> > classloader.resolve-order: parent-first
> > fs.hdfs.hadoopconf: /srv/hadoop/hadoop-2.7.5/etc/hadoop
> >
> > But no luck. Anything else I could be missing?
> >
> > On 2018/03/14 18:57:47, Francesco Ciuci <fr...@gmail.com> wrote:
> >> Hi,
> >>
> >> You do not just need the hadoop dependencies in the jar but you need to
> >> have the hadoop file system running in your machine/cluster.
> >>
> >> Regards
> >>
> >> On 14 March 2018 at 18:38, lrao@lyft.com <lr...@lyft.com> wrote:
> >>
> >>> I'm trying to use a BucketingSink to write files to S3 in my Flink job.
> >>>
> >>> I have the Hadoop dependencies I need packaged in my user application jar.
> >>> However, on running the job I get the following error (from the
> >>> taskmanager):
> >>>
> >>> [...]
> >>>
> >>> What's the right way to do this?
> >>>
> >>
>
> --
Sent from Gmail Mobile

Re: Correct way to reference Hadoop dependencies in Flink 1.4.0

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

I believe that for FileSystems to be picked up correctly, they have to be in the lib/ folder of Flink. Stephan (cc'ed), please correct me if I'm wrong here, you probably know that one best.
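As a concrete sketch of that suggestion (my reading of the Flink 1.4 setup, not something stated elsewhere in this thread): the 1.4 distribution ships bundled S3 filesystem jars in its opt/ directory, and activating one means copying it into lib/ so the JobManager and every TaskManager load it at startup. The snippet below simulates the distribution layout in a temporary directory so it can run anywhere; in a real install you would copy within your actual Flink directory, and the jar version in the filename is illustrative.

```shell
# Simulate a Flink 1.4 distribution layout in a temp dir so this sketch is
# self-contained; a real install has $FLINK_HOME/opt and $FLINK_HOME/lib.
FLINK_HOME="$(mktemp -d)"
mkdir -p "$FLINK_HOME/opt" "$FLINK_HOME/lib"
# Stand-in for the bundled S3 filesystem jar shipped in opt/ (file is fake here):
touch "$FLINK_HOME/opt/flink-s3-fs-hadoop-1.4.0.jar"

# The actual fix: copy the bundled filesystem jar from opt/ into lib/ so it is
# on the classpath of every JobManager and TaskManager process.
cp "$FLINK_HOME/opt/flink-s3-fs-hadoop-1.4.0.jar" "$FLINK_HOME/lib/"

ls -1 "$FLINK_HOME/lib"
```

If I recall the 1.4 docs correctly, the bundled flink-s3-fs-hadoop registers the s3:// scheme, so a sink path written as s3a://... may need to become s3://..., and S3 credentials can then live in flink-conf.yaml (keys with an s3. prefix, e.g. s3.access-key) rather than in core-site.xml.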

Aljoscha

> On 14. Mar 2018, at 18:26, lrao@lyft.com wrote:
> 
> Hi,
> 
> I am running this on a Hadoop-free cluster (i.e. no YARN etc.). I have the following dependencies packaged in my user application JAR:
> 
> aws-java-sdk 1.7.4
> flink-hadoop-fs 1.4.0
> flink-shaded-hadoop2 1.4.0
> flink-connector-filesystem_2.11 1.4.0
> hadoop-common 2.7.4
> hadoop-aws 2.7.4
> 
> I have also tried the following conf:
> classloader.resolve-order: parent-first
> fs.hdfs.hadoopconf: /srv/hadoop/hadoop-2.7.5/etc/hadoop
> 
> But no luck. Anything else I could be missing?
> 
> On 2018/03/14 18:57:47, Francesco Ciuci <fr...@gmail.com> wrote: 
>> Hi,
>> 
>> You do not just need the hadoop dependencies in the jar but you need to
>> have the hadoop file system running in your machine/cluster.
>> 
>> Regards
>> 
>> On 14 March 2018 at 18:38, lrao@lyft.com <lr...@lyft.com> wrote:
>> 
>>> I'm trying to use a BucketingSink to write files to S3 in my Flink job.
>>> 
>>> I have the Hadoop dependencies I need packaged in my user application jar.
>>> However, on running the job I get the following error (from the
>>> taskmanager):
>>> 
>>> [...]
>>> 
>>> What's the right way to do this?
>>> 
>> 


Re: Correct way to reference Hadoop dependencies in Flink 1.4.0

Posted by lr...@lyft.com.
Hi,

I am running this on a Hadoop-free cluster (i.e. no YARN etc.). I have the following dependencies packaged in my user application JAR:

aws-java-sdk 1.7.4
flink-hadoop-fs 1.4.0
flink-shaded-hadoop2 1.4.0
flink-connector-filesystem_2.11 1.4.0
hadoop-common 2.7.4
hadoop-aws 2.7.4

I have also tried the following conf:
classloader.resolve-order: parent-first
fs.hdfs.hadoopconf: /srv/hadoop/hadoop-2.7.5/etc/hadoop

But no luck. Anything else I could be missing?

On 2018/03/14 18:57:47, Francesco Ciuci <fr...@gmail.com> wrote: 
> Hi,
> 
> You do not just need the hadoop dependencies in the jar but you need to
> have the hadoop file system running in your machine/cluster.
> 
> Regards
> 
> On 14 March 2018 at 18:38, lrao@lyft.com <lr...@lyft.com> wrote:
> 
> > I'm trying to use a BucketingSink to write files to S3 in my Flink job.
> >
> > I have the Hadoop dependencies I need packaged in my user application jar.
> > However, on running the job I get the following error (from the
> > taskmanager):
> >
> > [...]
> >
> > What's the right way to do this?
> >
> 

Re: Correct way to reference Hadoop dependencies in Flink 1.4.0

Posted by Francesco Ciuci <fr...@gmail.com>.
Hi,

You do not just need the Hadoop dependencies in the jar; you also need the
Hadoop file system available on your machine/cluster.

Regards

On 14 March 2018 at 18:38, lrao@lyft.com <lr...@lyft.com> wrote:

> I'm trying to use a BucketingSink to write files to S3 in my Flink job.
>
> I have the Hadoop dependencies I need packaged in my user application jar.
> However, on running the job I get the following error (from the
> taskmanager):
>
> [...]
>
> What's the right way to do this?
>