Posted to dev@flink.apache.org by Piotr Nowojski <pi...@data-artisans.com> on 2018/03/09 08:43:23 UTC

Re: Using the BucketingSink with Flink 1.4.0

Hi,

There is a fairly old ticket about this issue. Feel free to bump it in the comments to raise its priority.

https://issues.apache.org/jira/browse/FLINK-5789

Regarding a workaround, maybe someone else will know more. There was a similar discussion on this topic:

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/hadoop-free-hdfs-config-td17693.html

Piotrek

> On 9 Mar 2018, at 02:11, lrao@lyft.com wrote:
> 
> I want to use the BucketingSink in the hadoop-free Flink system (i.e. 1.4.0) but currently I am kind of blocked because of its dependency on the Hadoop file system. 
> 1. Is this something that's going to be fixed in the next version of Flink? 
> 2. In the meantime, to unblock myself, what is the best way forward? I have tried packaging the hadoop dependencies I need in my user jar but I run into problems while running the job. Stacktrace below:
> ```
> 21:26:09.654 INFO  o.a.f.r.t.Task - Source: source -> Sink: S3-Sink (1/1) (9ac2cb1fc2b913c3b9d75aace08bcd37) switched from RUNNING to FAILED.
> java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
>        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
>        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
>        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
>        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
>        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>        at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
>        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
>        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
>        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
>        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
>        ... 9 common frames omitted
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
>        at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
>        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
>        ... 12 common frames omitted
> ```
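
The innermost "Caused by" in the trace above reports that Hadoop is not in the classpath. A minimal sketch, assuming a plain JVM and nothing Flink-specific, of checking whether the relevant Hadoop classes can be found by a given class loader. The class name `HadoopClasspathCheck` and the method `isLoadable` are hypothetical names chosen for illustration. Note that the result depends on which class loader does the lookup, so a check run from user code may not match what Flink's own `FileSystem` class sees.

```java
public class HadoopClasspathCheck {

    /** Returns true if the named class can be located by the given class loader. */
    public static boolean isLoadable(String className, ClassLoader loader) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: the loader of this class stands in for the loader under test.
        ClassLoader loader = HadoopClasspathCheck.class.getClassLoader();
        String[] candidates = {
            "org.apache.hadoop.fs.FileSystem",
            "org.apache.hadoop.hdfs.DistributedFileSystem",
            "org.apache.hadoop.fs.s3a.S3AFileSystem"
        };
        for (String name : candidates) {
            System.out.println(name + " loadable: " + isLoadable(name, loader));
        }
    }
}
```

Running this once with the same classpath as the Flink process and once from inside the job would show whether the Hadoop classes are only visible to the user-code class loader.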


Re: Using the BucketingSink with Flink 1.4.0

Posted by lr...@lyft.com.
Thank you for your responses Stephan and Piotrek!
It's great to know that the hadoop-free Bucketing Sink might be available as early as 1.5.x!

In the meantime, I have been trying workarounds, but I am having trouble getting them to work.
I first tried including my Hadoop dependencies only in my user jar, but that did not work: it threw the classpath error I pasted earlier.

Currently my setup is:

flink-conf.yaml (Additional params)
fs.hdfs.hadoopconf: /srv/hadoop-2.7.5/etc/hadoop
classloader.resolve-order: parent-first

Libs in /srv/flink/lib:
total 181864
-rw-r--r-- 1 root root 86370565 Dec 6 12:10 flink-dist_2.11-1.4.0.jar
-rw-r--r-- 1 root root 5177639 Mar 9 23:29 streamingplatform-core-1.0.4-20180228.035408-8.jar
-rw-r--r-- 1 root root 38244416 Mar 9 23:29 flink-s3-fs-presto-1.4.0.jar
-rw-r--r-- 1 root root 39662811 Mar 9 23:43 flink-shaded-hadoop2-uber-1.4.0.jar
-rw-r--r-- 1 root root 126287 Mar 9 23:43 hadoop-aws-2.7.3.jar
-rw-r--r-- 1 root root 11948376 Mar 9 23:43 aws-java-sdk-1.7.4.jar
-rw-r--r-- 1 root root 849398 Mar 9 23:44 aws-java-sdk-core-1.11.183.jar
-rw-r--r-- 1 root root 403994 Mar 9 23:44 aws-java-sdk-kms-1.11.183.jar
-rw-r--r-- 1 root root 258919 Mar 9 23:44 jackson-core-2.6.7.jar
-rw-r--r-- 1 root root 46986 Mar 9 23:44 jackson-annotations-2.6.7.jar
-rw-r--r-- 1 root root 1170668 Mar 9 23:45 jackson-databind-2.6.7.jar
-rw-r--r-- 1 root root 621931 Mar 9 23:45 joda-time-2.8.1.jar
-rw-r--r-- 1 root root 747794 Mar 9 23:46 httpclient-4.5.3.jar
-rw-r--r-- 1 root root 326724 Mar 9 23:46 httpcore-4.4.4.jar
 
core-site.xml
<configuration>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>
    <property>
        <name>fs.s3.buffer.dir</name>
        <value>/tmp</value>
    </property>
</configuration>

Errors:

00:56:52.494 INFO o.a.f.r.t.Task - Source: source -> Sink: S3-Sink-Ugly-Lib (1/1) (b70868c8543e8ea28813f6b745bbb85b) switched from RUNNING to FAILED.
com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: 5224A5007E58235E, AWS Error Code: AccessDenied, AWS Error Message: Access Denied
       at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
       at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
       at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
       at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
       at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1393)
       at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
       at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
       at com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
       at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
       at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
       at java.lang.Thread.run(Thread.java:748)

I am using IAM roles to grant access to S3, together with the S3A file system. I am able to write to the bucket outside of the job (via the command line). Any pointers on how to work around this would be helpful!

Thanks much,
Lakshmi
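
The lib listing above contains AWS SDK jars from two release lines (1.7.4 and 1.11.183). Whether that is related to the 403 is not established in this thread, but it may be worth ruling out. A minimal sketch of spotting such mixes from jar file names alone; `JarVersionScan` and its methods are hypothetical names, and the assumption that the version starts at the first `-<digit>` in the file name is a naming convention, not a guarantee.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JarVersionScan {

    // Splits "artifact-1.2.3.jar" into artifact name (group 1) and version (group 2).
    // Assumption: the version begins at the first "-" that is followed by a digit.
    private static final Pattern JAR_NAME =
            Pattern.compile("^(.+?)-(\\d[\\w.\\-]*)\\.jar$");

    /** Maps each artifact whose name starts with the prefix to its version string. */
    public static Map<String, String> versionsByArtifact(List<String> jarNames, String prefix) {
        Map<String, String> result = new TreeMap<>();
        for (String name : jarNames) {
            Matcher m = JAR_NAME.matcher(name);
            if (m.matches() && m.group(1).startsWith(prefix)) {
                result.put(m.group(1), m.group(2));
            }
        }
        return result;
    }

    /** Returns the distinct versions found among artifacts sharing the prefix. */
    public static Set<String> distinctVersions(List<String> jarNames, String prefix) {
        return new TreeSet<>(versionsByArtifact(jarNames, prefix).values());
    }

    public static void main(String[] args) {
        // File names copied from the listing in the message above.
        List<String> lib = Arrays.asList(
                "hadoop-aws-2.7.3.jar",
                "aws-java-sdk-1.7.4.jar",
                "aws-java-sdk-core-1.11.183.jar",
                "aws-java-sdk-kms-1.11.183.jar",
                "jackson-core-2.6.7.jar",
                "jackson-annotations-2.6.7.jar",
                "jackson-databind-2.6.7.jar");
        for (String family : Arrays.asList("aws-java-sdk", "jackson")) {
            Set<String> versions = distinctVersions(lib, family);
            String verdict = versions.size() > 1 ? "MIXED" : "consistent";
            System.out.println(family + ": " + versions + " (" + verdict + ")");
        }
    }
}
```

A family reported as mixed is only a hint to look closer; it does not by itself explain an access-denied response.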

On 2018/03/09 11:13:28, Stephan Ewen <se...@apache.org> wrote: 
> Hi!
> 
> Yes, the bucketing sink is unfortunately still tied to some specific Hadoop
> file systems, due to a special way of using truncate() and append().
> 
> This is very high up our list post the 1.5 release, possibly even
> backportable to 1.5.x.
> 
> The plan is to create a new Bucketing Sink based on Flink's file systems,
> meaning it can also work with Hadoop-free Flink when using file:// s3:// or
> so.
> 
> Best,
> Stephan
> 
> 

Re: Using the BucketingSink with Flink 1.4.0

Posted by Stephan Ewen <se...@apache.org>.
Hi!

Yes, the bucketing sink is unfortunately still tied to some specific Hadoop
file systems, due to a special way of using truncate() and append().

This is very high up our list post the 1.5 release, possibly even
backportable to 1.5.x.

The plan is to create a new Bucketing Sink based on Flink's file systems,
meaning it can also work with Hadoop-free Flink when using file:// s3:// or
so.

Best,
Stephan
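
To illustrate the general truncate-and-append pattern mentioned above: a sink that appends to an in-progress file can record the file length at each checkpoint and, on recovery, cut the file back to that length before appending again. The sketch below is not the BucketingSink code. It uses local files through `java.nio` as a stand-in for a Hadoop file system, and `TruncateAppendSketch`, `append`, and `truncateTo` are hypothetical names.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class TruncateAppendSketch {

    /** Appends text to the end of the file and returns the resulting file length. */
    public static long append(Path file, String text) throws IOException {
        try (FileChannel ch = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
            ByteBuffer buf = ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
            while (buf.hasRemaining()) {
                ch.write(buf);
            }
            return ch.size();
        }
    }

    /** Cuts the file back to the length recorded at the last checkpoint. */
    public static void truncateTo(Path file, long validLength) throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.WRITE)) {
            ch.truncate(validLength);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("part-0-0", ".in-progress");
        // Length after the last record covered by a (simulated) checkpoint.
        long validLength = append(file, "record-1\n");
        // Written after the checkpoint; must be discarded on recovery.
        append(file, "record-2 (uncommitted)\n");
        // Recovery: drop everything past the checkpointed length, then resume appending.
        truncateTo(file, validLength);
        append(file, "record-2\n");
        System.out.print(new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
        Files.delete(file);
    }
}
```

Object stores such as S3 generally do not offer in-place truncate or append on existing objects, which is one reason a sink built this way depends on specific file system implementations.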

