Posted to user@flink.apache.org by Dan Hill <qu...@gmail.com> on 2020/08/28 19:11:22 UTC

Issues with Flink Batch and Hadoop dependency

I'm assuming I have a simple, common setup problem.  I've spent 6 hours
debugging and haven't been able to figure it out.  Any help would be
greatly appreciated.


*Problem*
I have a Flink streaming job set up that writes SequenceFiles to S3.  When I
try to create a Flink batch job to read these SequenceFiles, I get the
following error:

NoClassDefFoundError: org/apache/hadoop/mapred/FileInputFormat

It fails on this readSequenceFile call:

env.createInput(HadoopInputs.readSequenceFile(Text.class,
ByteWritable.class, INPUT_FILE))
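
One way to narrow down a NoClassDefFoundError like this is to check which
classes the JVM can actually see. The following is a minimal sketch, assuming
it is run with the same classpath as the failing code. The names
ClasspathCheck and isOnClasspath are made up for illustration, and the list
of candidate classes is only a guess at what is worth checking.

```java
class ClasspathCheck {

    // Returns true if the named class can be located by this class's loader.
    // The class is looked up without running its static initializers.
    public static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String[] candidates = {
            "org.apache.hadoop.mapred.FileInputFormat", // class named in the error
            "org.apache.hadoop.fs.FileSystem",
            "org.apache.flink.hadoopcompatibility.HadoopInputs"
        };
        for (String name : candidates) {
            String status = isOnClasspath(name) ? "found" : "MISSING";
            System.out.println(name + " -> " + status);
        }
    }
}
```

Running this inside the jobmanager and taskmanager containers, rather than
only on the build machine, should show whether the Hadoop classes are missing
from the cluster or from the job jar.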

If I directly depend on org-apache-hadoop/hadoop-mapred when building the
job, I get the following error when trying to run the job:

Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3332)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:477)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:209)
    at org.apache.hadoop.mapred.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:48)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:254)
    at org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:150)
    at org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:58)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:257)
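
The trace shows the failure happening while Hadoop resolves a FileSystem
implementation for the URI scheme. Hadoop checks the configuration key
fs.<scheme>.impl and otherwise falls back to file systems it discovers on the
classpath, so "s3" fails when nothing is registered for that scheme. The
sketch below is a simplified model of that lookup using only the JDK, not
Hadoop's actual code. SchemeLookup, resolve, and the bucket path are
hypothetical, and mapping fs.s3.impl to
org.apache.hadoop.fs.s3a.S3AFileSystem (from hadoop-aws) is an assumed
setting that has not been tested against this setup.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

class SchemeLookup {

    // Simplified stand-in for the lookup: returns the value stored under
    // "fs.<scheme>.impl", or fails the way the trace above does.
    public static String resolve(Map<String, String> conf, String path) {
        String scheme = URI.create(path).getScheme();
        String impl = conf.get("fs." + scheme + ".impl");
        if (impl == null) {
            throw new IllegalStateException(
                "No FileSystem for scheme \"" + scheme + "\"");
        }
        return impl;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        String input = "s3://example-bucket/input"; // hypothetical path

        try {
            resolve(conf, input);
        } catch (IllegalStateException e) {
            System.out.println("before: " + e.getMessage());
        }

        // Assumed setting: reuse the S3A implementation for the "s3" scheme.
        conf.put("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
        System.out.println("after:  " + resolve(conf, input));
    }
}
```

In a real job the equivalent setting would go into the Hadoop configuration
that the input format sees, and the implementation class would also have to
be on the classpath. Using an s3a:// path instead might avoid the extra key,
assuming hadoop-aws is available.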


*Extra context*
I'm using this Helm chart <https://hub.helm.sh/charts/riskfocus/flink> to
set up Flink.  I'm using v1.10.1.


*Questions*
Are there any existing projects that read batch Hadoop file formats from S3?

I've looked at these instructions for Hadoop Integration
<https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/hadoop.html#add-hadoop-classpaths>.
I'm assuming my configuration is wrong.  I'm also assuming I need the
Hadoop dependency properly set up in the jobmanager and taskmanager (not in
the job itself).  If I use this Helm chart, do I need to download a Hadoop
common jar into the Flink images for jobmanager and taskmanager?  Are there
pre-built images I can use that already have the dependencies set up?


- Dan

Re: Issues with Flink Batch and Hadoop dependency

Posted by Arvid Heise <ar...@ververica.com>.
Hi Dan,

Your approach is good in general. You might want to use the bundled Hadoop
uber jar [1] to save some time, if you can find the appropriate version. You
can also build your own version and then include it in lib/.

In general, I'd recommend moving away from sequence files. As soon as you
change your records even minimally, everything falls apart. Going with an
established binary format like Avro or Parquet is usually preferable, also
because of the additional tooling, and it pays off in the long run.

[1] https://flink.apache.org/downloads.html#additional-components


Re: Issues with Flink Batch and Hadoop dependency

Posted by Dan Hill <qu...@gmail.com>.
I was able to get a basic version to work by including a bunch of Hadoop
and S3 dependencies in the job jar and hacking in some Hadoop config
values.  It's probably not optimal, but it looks like I'm unblocked.
