Posted to user@flink.apache.org by Julio Biason <ju...@azion.com> on 2018/10/02 13:21:44 UTC
BucketingSink to S3: Missing class com/amazonaws/AmazonClientException
Hey guys,
I've set up a BucketingSink as a dead letter queue writing into our Ceph
cluster through S3, but when I start the job, I get this error:
java.lang.NoClassDefFoundError: com/amazonaws/AmazonClientException
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2306)
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2271)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2367)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2793)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1295)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:432)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:376)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.amazonaws.AmazonClientException
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 17 more
I find it weird 'cause I've already set up checkpoints (and savepoints) to
use S3 as the protocol, and I just assumed that, if it works for checkpoints, it
should work here.
(I suppose I could add the AWS client as a dependency of my build, but
again, I assumed that once S3 works for checkpoints, it should work
everywhere.)
And, kinda related: can I assume that using the FileSystem class to create
FSOutputStreams will follow the Flink configuration? I have another type of
dead letter queue that won't work with BucketingSink, and I was thinking
about using it directly to create files inside that Ceph/S3 cluster.
--
*Julio Biason*, Software Engineer
*AZION* | Deliver. Accelerate. Protect.
Office: +55 51 3083 8101 | Mobile: +55 51 99907 0554
Re: BucketingSink to S3: Missing class com/amazonaws/AmazonClientException
Posted by Amit Jain <aj...@gmail.com>.
Hi Julio,
What's the Flink version for this setup?
--
Thanks,
Amit
On Wed, Oct 3, 2018 at 4:22 PM Andrey Zagrebin <an...@data-artisans.com>
wrote:
> [...]
Re: BucketingSink to S3: Missing class com/amazonaws/AmazonClientException
Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
They actually use different interfaces and dependencies. Checkpointing uses Flink's FileSystem abstraction, and the shaded Hadoop S3 file system is a special implementation of it, based on the Hadoop S3 FileSystem, with all its dependencies bundled in. The BucketingSink uses the HDFS/Hadoop FileSystem directly, so the Hadoop S3 file system and its dependencies (including the AWS SDK) must be set up correctly on the classpath.
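The BucketingSink path in the stack trace earlier in this thread goes through Hadoop's own lookup: FileSystem.getFileSystemClass reads a configuration key of the form fs.<scheme>.impl and loads the named class by name via Configuration.getClass. Loading the S3A class then pulls in the AWS SDK classes it references, which is where NoClassDefFoundError appears. Below is a minimal, stdlib-only sketch that mimics that lookup and probes whether the configured class and its dependencies load. `HadoopLookupSketch`, `implKey` and `probe` are hypothetical names, and the mapping of fs.s3.impl to org.apache.hadoop.fs.s3a.S3AFileSystem is an assumed core-site.xml setting; this is not Hadoop's actual implementation.

```java
import java.net.URI;
import java.util.Collections;
import java.util.Map;

/**
 * Hypothetical diagnostic that mimics how Hadoop picks a FileSystem class:
 * read "fs.<scheme>.impl" from the configuration, then load that class by name.
 * A sketch for classpath debugging, not Hadoop's implementation.
 */
public class HadoopLookupSketch {

    /** Configuration key consulted for a URI such as s3://bucket/path. */
    public static String implKey(URI uri) {
        return "fs." + uri.getScheme() + ".impl";
    }

    /** Returns null if the configured class loads, otherwise a short description of the failure. */
    public static String probe(URI uri, Map<String, String> conf) {
        String key = implKey(uri);
        String className = conf.get(key);
        if (className == null) {
            return "no class configured for " + key;
        }
        try {
            // Load, link and initialize the class; classes it references
            // (e.g. the AWS SDK for S3A) must be resolvable at this point.
            Class.forName(className, true, HadoopLookupSketch.class.getClassLoader());
            return null;
        } catch (ClassNotFoundException e) {
            // The configured class itself is not on the classpath
            return "class not found: " + className;
        } catch (LinkageError e) {
            // NoClassDefFoundError is a LinkageError: the class exists,
            // but something it depends on is missing
            return "cannot load " + className + ": " + e;
        }
    }

    public static void main(String[] args) {
        // Assumed setting: s3:// URIs handled by Hadoop's S3A file system
        Map<String, String> conf = Collections.singletonMap(
                "fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
        // Hypothetical bucket name
        URI uri = URI.create("s3://dead-letter-bucket/queue");
        String problem = probe(uri, conf);
        System.out.println(problem == null ? "OK: " + conf.get(implKey(uri)) : problem);
    }
}
```

Run with the same classpath as the TaskManager, a "class not found" result points at hadoop-aws itself being absent, while "cannot load ... NoClassDefFoundError" points at a missing transitive dependency such as the AWS SDK.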
Flink 1.6 released the new StreamingFileSink, which replaces the BucketingSink. With Flink 1.7, it will also support the bundled S3 file systems.
Best,
Aljoscha
> On 3. Oct 2018, at 17:55, Julio Biason <ju...@azion.com> wrote:
> [...]
Re: BucketingSink to S3: Missing class com/amazonaws/AmazonClientException
Posted by Julio Biason <ju...@azion.com>.
Hi Andrey,
Yes, we followed the guide. Our checkpoints/savepoints are already being
saved on S3/Ceph, using the ShadedHadoop/S3AFileSystem (because it's the
one where we managed to completely override the AWS address to point to our
Ceph cluster).
I suppose I can add the package containing AmazonClientException to my
project, but I still wonder why it works fine for Flink and fails for my
project; in theory, both use the same dependencies, right?
On Wed, Oct 3, 2018 at 7:51 AM, Andrey Zagrebin <an...@data-artisans.com>
wrote:
> [...]
Re: BucketingSink to S3: Missing class com/amazonaws/AmazonClientException
Posted by Andrey Zagrebin <an...@data-artisans.com>.
Hi Julio,
This looks like a dependency problem.
Have you followed the recommended S3 configuration guide [1]?
Is it correct that your job has already created checkpoints/savepoints on S3 before?
I think if you manually create a file system using FileSystem.get(path), it should be configured the same way as for the bucketing sink and checkpoints.
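The idea behind this suggestion is that, roughly speaking, Flink's FileSystem.get(URI) resolves a file system through one shared set of factories keyed by URI scheme (initialized with the Flink configuration at startup) and caches instances per scheme and authority, so user code asking for an s3:// path reaches the same configured implementation that checkpoints use. The following is a deliberately simplified, hypothetical model of that lookup; `SchemeRegistry`, `register`, `get` and `Fs` are illustrative names, not Flink API, and the Ceph endpoint value is an assumption.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical, simplified model of a scheme-keyed file system registry.
 * Factories are registered once (with the cluster configuration); every later
 * lookup for the same scheme and authority reuses the cached instance.
 */
public class SchemeRegistry {

    /** Stand-in for a configured file system; only records how it was created. */
    public static final class Fs {
        public final String description;
        public Fs(String description) { this.description = description; }
    }

    private final Map<String, Function<URI, Fs>> factories = new HashMap<>();
    private final Map<String, Fs> cache = new HashMap<>();

    /** Registers the factory used for every URI with this scheme. */
    public synchronized void register(String scheme, Function<URI, Fs> factory) {
        factories.put(scheme, factory);
    }

    /** Returns the cached file system for scheme + authority, creating it on first use. */
    public synchronized Fs get(URI uri) {
        String key = uri.getScheme() + "://" + uri.getAuthority();
        Fs fs = cache.get(key);
        if (fs == null) {
            Function<URI, Fs> factory = factories.get(uri.getScheme());
            if (factory == null) {
                throw new IllegalArgumentException("no file system for scheme " + uri.getScheme());
            }
            fs = factory.apply(uri);
            cache.put(key, fs);
        }
        return fs;
    }

    public static void main(String[] args) {
        SchemeRegistry registry = new SchemeRegistry();
        // Configured once at startup (assumed endpoint value)
        String endpoint = "http://ceph.example:7480";
        registry.register("s3", uri -> new Fs("s3 at " + uri.getAuthority() + " via " + endpoint));
        Fs checkpoints = registry.get(URI.create("s3://bucket/checkpoints"));
        Fs deadLetters = registry.get(URI.create("s3://bucket/dead-letters"));
        System.out.println(checkpoints == deadLetters); // prints "true": same scheme and authority
    }
}
```

In this model, anything that bypasses the registry (as the BucketingSink does by going to Hadoop directly) does not benefit from its configuration.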
Best,
Andrey
[1] https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/aws.html#s3-simple-storage-service
> On 2 Oct 2018, at 15:21, Julio Biason <ju...@azion.com> wrote:
> [...]