Posted to user@flink.apache.org by Oleksandr Baliev <al...@gmail.com> on 2018/01/09 21:46:07 UTC

hadoop-free hdfs config

Hello guys,

I want to clarify something for myself: since Flink 1.4.0 supports a
Hadoop-free distribution with dynamic loading of Hadoop dependencies, I
assumed that if I download the Hadoop-free distribution, start the cluster
without any Hadoop, and then submit a job jar that bundles its own Hadoop
dependencies (I used 2.6.0-cdh5.10.1), Hadoop should be visible on the
classpath, so a job that accesses HDFS via a source/sink/etc. or writes
checkpoints should run on such a Hadoop-free cluster.

But when I start a job, checkpoint configuration initialization fails with
"Hadoop is not in the classpath/dependencies.":

org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
find a file system implementation for scheme 'hdfs'. The scheme is not
directly supported by Flink and no Hadoop file system to support this
scheme could be loaded.
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:99)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:277)
    ...
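
For context, this is roughly how the job configures checkpointing; the
namenode host/port and paths below are placeholders, not our real setup:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpoint every 10s; the hdfs:// URI is what triggers the scheme lookup
        env.enableCheckpointing(10_000);
        env.setStateBackend(new FsStateBackend("hdfs://namenode:8020/flink/checkpoints"));

        env.socketTextStream("localhost", 9999).print();
        env.execute("checkpoint-config-sketch");
    }
}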


What I've found: in org.apache.flink.core.fs.FileSystem#getUnguardedFileSystem
there is no "hdfs" scheme registered in FS_FACTORIES, and FALLBACK_FACTORY,
which should have been loaded with the Hadoop factory, holds
org.apache.flink.core.fs.UnsupportedSchemeFactory instead. But
FALLBACK_FACTORY is initialized when the taskmanager starts (at which point
there are indeed no Hadoop dependencies yet), so that part behaves as
expected.
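
Not Flink's actual source, just a self-contained sketch of the lookup order
as I read it, with made-up types standing in for the real ones:

import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class SchemeLookupSketch {

    interface FileSystemFactory {
        Object create(URI uri);
    }

    // in the hadoop-free build there is no "hdfs" entry in here
    static final Map<String, FileSystemFactory> FS_FACTORIES = new HashMap<>();

    // fixed once at startup; without hadoop on the classpath it stays the
    // "unsupported scheme" factory that produces the exception above
    static final FileSystemFactory FALLBACK_FACTORY = uri -> {
        throw new RuntimeException("Hadoop is not in the classpath/dependencies.");
    };

    static Object getUnguardedFileSystem(URI uri) {
        FileSystemFactory factory = FS_FACTORIES.get(uri.getScheme());
        if (factory == null) {
            factory = FALLBACK_FACTORY; // no "hdfs" key -> fall back
        }
        return factory.create(uri);
    }

    public static void main(String[] args) {
        getUnguardedFileSystem(URI.create("hdfs://namenode:8020/tmp")); // throws
    }
}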

So as I understand it, the Hadoop file system is not recognised by Flink if
it was not loaded at startup. Is that correct, or have I just messed
something up somewhere?

Thanks,
Sasha

Re: hadoop-free hdfs config

Posted by Till Rohrmann <tr...@apache.org>.
Thanks for trying it out and letting us know.

Cheers,
Till


Re: hadoop-free hdfs config

Posted by Oleksandr Baliev <al...@gmail.com>.
Hi Till,

thanks for your reply and clarification! By the way, it's the same story
with RocksDBStateBackend, which for checkpoint streams looks like a wrapper
over FsStateBackend:

01/11/2018 09:27:22 Job execution switched to status FAILING.
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
find a file system implementation for scheme 'hdfs'. The scheme is not
directly supported by Flink and no Hadoop file system to support this
scheme could be loaded.
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:99)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:277)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createStreamFactory(RocksDBStateBackend.java:273)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createCheckpointStreamFactory(StreamTask.java:787)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
    at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
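
For reference, the backend setup I'm testing is essentially the sketch
below (placeholder hdfs:// path; needs the flink-statebackend-rocksdb
dependency):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // RocksDB keeps working state locally, but the checkpoint URI is still
        // resolved through FsStateBackend / FsCheckpointStreamFactory (see the
        // trace above), so the hdfs:// scheme fails the same way
        env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:8020/flink/checkpoints"));

        env.socketTextStream("localhost", 9999).print();
        env.execute("rocksdb-backend-sketch");
    }
}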


Then I also changed the state backend URL to file://, which works, but then
I hit the same issue in BucketingSink:

java.lang.RuntimeException: Error while creating FileSystem when
initializing the state of the BucketingSink.
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
    ...<some our simple wrapper class call>.initializeState(...)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Could not find a file system implementation for scheme 'hdfs'. The scheme
is not directly supported by Flink and no Hadoop file system to support
this scheme could be loaded.
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
    ... 10 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
    at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
    ... 13 more
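
The sink usage that hits this is essentially the following sketch
(placeholder paths; the sink comes from flink-connector-filesystem):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

public class BucketingSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream = env.socketTextStream("localhost", 9999);

        // BucketingSink resolves the base path scheme in initializeState()
        // (createHadoopFileSystem in the trace above), so an hdfs:// base path
        // fails even when the state backend itself points at file://
        stream.addSink(new BucketingSink<String>("hdfs://namenode:8020/data/out"));

        env.execute("bucketing-sink-sketch");
    }
}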


For these tests I was using the clean "Without bundled Hadoop" Flink
binaries and didn't change anything in the configs.

Since we currently have to persist checkpoints on HDFS, we will use a
flink-shaded-hadoop2-uber*.jar anyway. Thanks.

Best,
Sasha


Re: hadoop-free hdfs config

Posted by Till Rohrmann <tr...@apache.org>.
Hi Sasha,

you're right: if you only want to access HDFS from user code, it should be
possible to use the Hadoop-free Flink version and bundle the Hadoop
dependencies with your user code. However, if you want to use Flink's file
system state backend, as you did, then you have to start the Flink cluster
with the Hadoop dependency on its classpath. The reason is that the
FsStateBackend is part of the Flink distribution and is loaded by the
system class loader.
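
A quick way to see the difference is to compare what each class loader can
find from inside a deployed job. A rough diagnostic sketch (the class name
is just for illustration):

import org.apache.flink.api.common.functions.MapFunction;

// On a Hadoop-free cluster with Hadoop bundled in the job jar, the
// user-code lookup should succeed while the system-class-loader lookup
// fails, which is exactly why FsStateBackend cannot find HDFS.
public class HadoopVisibilityCheck implements MapFunction<String, String> {

    @Override
    public String map(String value) {
        return "user-code loader: " + visible(Thread.currentThread().getContextClassLoader())
                + ", system loader: " + visible(ClassLoader.getSystemClassLoader());
    }

    private static boolean visible(ClassLoader loader) {
        try {
            Class.forName("org.apache.hadoop.fs.FileSystem", false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }
}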

One thing you could try out is to use the RocksDB state backend instead.
Since the RocksDBStateBackend is loaded dynamically, I think it should use
the Hadoop dependencies when trying to load the filesystem.

Cheers,
Till
