You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by David Magalhães <sp...@gmail.com> on 2020/02/05 14:21:52 UTC

NoClassDefFoundError when an exception is thrown inside invoke in a Custom Sink

I'm implementing an exponential backoff inside a custom sink that uses an
AvroParquetWriter to write to S3. I've changed the number of attempts to 0
inside the core-site.xml, and I'm capturing the timeout exception, doing a
Thread.sleep for X seconds. This is working as intended, and when S3 is
offline, it waits until it is online.

I also want to test that the back pressure and the checkpoints are working
as intended. For the first one, I can see the back pressure in the Flink UI
going up and recovering as expected, without reading more data from Kafka.

For the checkpoints, I've added a random exception inside the sink's invoke
function (1 in 100, to simulate that a problem has happened and that the job
needs to recover from the last good checkpoint), but something strange
happens. Sometimes I can see the job being canceled and created again, and
running fine; other times, after being created and canceled X number of times,
it gives a *NoClassDefFoundError*, and it will keep giving that forever.

Do you guys have any thoughts?

org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
exception while processing timer.
at
org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: TimerException{java.lang.NoClassDefFoundError:
org/joda/time/format/DateTimeParserBucket}
at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
... 7 more
Caused by: java.lang.NoClassDefFoundError:
org/joda/time/format/DateTimeParserBucket
at
org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
at
com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
at
com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
at
com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
at
com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
at
com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
at
com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
at
com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
at
com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
at
com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
at
org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
at
org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
at
org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
at
org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
at
org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
at
com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at scala.util.Try$.apply(Try.scala:209)
at
com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
at
com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at
com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:45)
at
com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:20)
at
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
at
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
at
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
... 7 more

Re: NoClassDefFoundError when an exception is thrown inside invoke in a Custom Sink

Posted by David Magalhães <sp...@gmail.com>.
Thanks for the feedback, Arvid. Currently it isn't an issue, but I will look
into it again in the future.

On Tue, Feb 18, 2020 at 1:51 PM Arvid Heise <ar...@ververica.com> wrote:

> Hi David,
>
> sorry for replying late. I was caught up on other incidents.
>
> I double-checked all the information that you provided and conclude that
> you completely bypass our filesystems and plugins.
>
> What you are using is AvroParquetWriter, which brings in the hadoop
> dependencies, including raw hadoop s3. It becomes obvious since the Path
> you are using is not coming from Flink namespace.
> The class issues that come from that are hard to debug. You are
> effectively bundling another hadoop, so if you also have a specific Hadoop
> version on your cluster (e.g. on EMR), then there can be ambiguities and
> the error you have seen can occur.
>
> What I'd recommend you do is a completely different approach. Assuming you
> just want exponential backoff for all s3 write accesses, you could wrap the
> S3AFileSystem and create your own s3 plugin. That would work with any given
> format for future cases.
>
> If you want to stick to your approach, you should use
> org.apache.flink.formats.parquet.ParquetWriterFactory, which uses your
> mentioned StreamOutputFile.
>
> Best,
>
> Arvid
>
> On Thu, Feb 13, 2020 at 12:04 AM David Magalhães <sp...@gmail.com>
> wrote:
>
>> Hi Arvid, I use a docker image. Here is the Dockerfile:
>>
>> FROM flink:1.9.1-scala_2.12
>>
>> RUN mkdir /opt/flink/plugins/flink-s3-fs-hadoop
>> RUN cp /opt/flink/opt/flink-s3-fs-hadoop-1.9.1.jar
>> /opt/flink/plugins/flink-s3-fs-hadoop/
>>
>> Please let me know if you need more information.
>>
>> On Wed, Feb 12, 2020 at 9:15 PM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi David,
>>>
>>> can you double-check the folder structure of your plugin? It should
>>> reside in its own subfolder. Here is an example.
>>>
>>> flink-dist
>>> ├── conf
>>> ├── lib
>>> ...
>>> └── plugins
>>>     └── s3
>>>         └── flink-s3-fs-hadoop.jar
>>>
>>> I will investigate your error deeply in the next few days but I'd like
>>> to have a final confirmation about the folder structure.
>>>
>>>
>>> On Wed, Feb 12, 2020 at 8:56 PM David Magalhães <sp...@gmail.com>
>>> wrote:
>>>
>>>> Hi Robert, I couldn't find any mention of it before the
>>>> NoClassDefFoundError.
>>>> Here is the full log [1] if you want to look for something more
>>>> specific.
>>>>
>>>> [1] https://www.dropbox.com/s/l8tba6vft08flke/joda.out?dl=0
>>>>
>>>> On Wed, Feb 12, 2020 at 12:45 PM Robert Metzger <rm...@apache.org>
>>>> wrote:
>>>>
>>>>> According to this answer [1] the first exception "mentioning"
>>>>> org/joda/time/format/DateTimeParserBucket should be a different one.
>>>>> Can you go through the logs to make sure it is really a
>>>>> ClassNotFoundException, and not an ExceptionInInitializerError or something
>>>>> else?
>>>>>
>>>>> [1]https://stackoverflow.com/a/5756989/568695
>>>>>
>>>>> On Wed, Feb 12, 2020 at 12:36 PM David Magalhães <
>>>>> speeddragon@gmail.com> wrote:
>>>>>
>>>>>> Hi Arvid,
>>>>>>
>>>>>> I'm using flink-s3-fs-hadoop-1.9.1.jar in the plugins folder. Like I said
>>>>>> previously, this works normally until an exception is thrown inside the
>>>>>> sink. It will try to recover again, but sometimes it doesn't recover, giving
>>>>>> this error.
>>>>>>
>>>>>> To write to S3 I use *AvroParquetWriter* with the following code:
>>>>>>
>>>>>> val writer = AvroParquetWriter
>>>>>>          .builder[GenericRecord](new Path(finalFilePath))
>>>>>>
>>>>>> *Path* is from *org.apache.hadoop.fs*; the other option is to use *org.apache.flink.formats.parquet.StreamOutputFile*,
>>>>>> which will use the Flink S3 plugin, right? I'm not sure how I can convert
>>>>>> from Path to StreamOutputFile. I know that when I used StreamingFileSink,
>>>>>> I used StreamOutputFile.
>>>>>>
>>>>>> On Wed, Feb 12, 2020 at 10:03 AM Arvid Heise <ar...@ververica.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi David,
>>>>>>>
>>>>>>> upon closer review of your stack trace, it seems like you are trying
>>>>>>> to access S3 without our S3 plugin. That's in general not recommended at
>>>>>>> all.
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Arvid
>>>>>>>
>>>>>>> On Tue, Feb 11, 2020 at 11:06 AM Arvid Heise <ar...@ververica.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi David,
>>>>>>>>
>>>>>>>> this seems to be a bug in our s3 plugin. The joda dependency should
>>>>>>>> be bundled there.
>>>>>>>>
>>>>>>>> Are you using s3 as a plugin by any chance? Which flink version are
>>>>>>>> you using?
>>>>>>>>
>>>>>>>> If you are using s3 as a plugin, you could put joda in your plugin
>>>>>>>> folder like this
>>>>>>>>
>>>>>>>> flink-dist
>>>>>>>> ├── conf
>>>>>>>> ├── lib
>>>>>>>> ...
>>>>>>>> └── plugins
>>>>>>>>     └── s3
>>>>>>>>         ├── joda.jar
>>>>>>>>         └── flink-s3-fs-hadoop.jar
>>>>>>>>
>>>>>>>> If flink-s3-fs-hadoop.jar is in lib, you could try adding joda into
>>>>>>>> that.
>>>>>>>>
>>>>>>>> Adding joda to your user code will unfortunately not work.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>>
>>>>>>>> Arvid
>>>>>>>>
>>>>>>>> On Thu, Feb 6, 2020 at 11:16 PM David Magalhães <
>>>>>>>> speeddragon@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Andrey, thanks for your reply.
>>>>>>>>>
>>>>>>>>> The class is on the jar created with `*sbt assembly*` that is
>>>>>>>>> submitted to Flink to start a Job.
>>>>>>>>>
>>>>>>>>> unzip -l target/jar/myapp-0.0.1-SNAPSHOT.jar | grep
>>>>>>>>> DateTimeParserBucket
>>>>>>>>>      1649  05-27-2016 10:24
>>>>>>>>> org/joda/time/format/DateTimeParserBucket$SavedField.class
>>>>>>>>>      1984  05-27-2016 10:24
>>>>>>>>> org/joda/time/format/DateTimeParserBucket$SavedState.class
>>>>>>>>>      8651  05-27-2016 10:24
>>>>>>>>> org/joda/time/format/DateTimeParserBucket.class
>>>>>>>>>
>>>>>>>>> Shouldn't this be enough ?
>>>>>>>>>
>>>>>>>>> I think it uses it when nothing goes wrong, but as soon as it hits some
>>>>>>>>> exceptions, it looks like it "forgets" it.
>>>>>>>>>
>>>>>>>>> Like I said before, this is kind of intermittent.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> David
>>>>>>>>>
>>>>>>>>> On Thu, Feb 6, 2020 at 5:46 PM Andrey Zagrebin <
>>>>>>>>> azagrebin@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hi David,
>>>>>>>>>>
>>>>>>>>>> This looks like a problem with resolution of maven dependencies
>>>>>>>>>> or something.
>>>>>>>>>> The custom WindowParquetGenericRecordListFileSink probably
>>>>>>>>>> transitively depends on org/joda/time/format/DateTimeParserBucket
>>>>>>>>>> and it is missing on the runtime classpath of Flink.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Andrey
>>>>>>>>>>
>>>>>>>>>> On Wed, Feb 5, 2020 at 5:22 PM David Magalhães <
>>>>>>>>>> speeddragon@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I'm implementing an exponential backoff inside a custom sink
>>>>>>>>>>> that uses an AvroParquetWriter to write to S3. I've change the number of
>>>>>>>>>>> attempts to 0 inside the core-site.xml, and I'm capturing the timeout
>>>>>>>>>>> exception, doing a Thread.sleep for X seconds. This is working as intended,
>>>>>>>>>>> and when S3 is offline, it waits until it is online.
>>>>>>>>>>>
>>>>>>>>>>> I also want to test that the back pressure and the checkpoints
>>>>>>>>>>> are working as intended, and for the first one, I can see the back pressure
>>>>>>>>>>> in Flink UI going up, and recover as expected and not reading more data
>>>>>>>>>>> from Kafka.
>>>>>>>>>>>
>>>>>>>>>>> For the checkpoints, and I've added inside the sink invoke
>>>>>>>>>>> function a randomly exception (1 in 100, to simulate that a problem has
>>>>>>>>>>> happen, and need to recover from the last good checkpoint), but something
>>>>>>>>>>> strange happens. I can see the job is being canceled and created again, and
>>>>>>>>>>> running fine, other times after a X number of times of being created and
>>>>>>>>>>> canceled, it gives a *NoClassDefFoundError*, and it will keep
>>>>>>>>>>> giving that forever.
>>>>>>>>>>>
>>>>>>>>>>> Do you guys have any thoughts?
>>>>>>>>>>>
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.AsynchronousException:
>>>>>>>>>>> Caught exception while processing timer.
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>>>>>>>>>>> at
>>>>>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>>>> at
>>>>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>>>>>>>>> at
>>>>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>>>>>>>> at
>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>>>> at
>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>>> Caused by: TimerException{java.lang.NoClassDefFoundError:
>>>>>>>>>>> org/joda/time/format/DateTimeParserBucket}
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
>>>>>>>>>>> ... 7 more
>>>>>>>>>>> Caused by: java.lang.NoClassDefFoundError:
>>>>>>>>>>> org/joda/time/format/DateTimeParserBucket
>>>>>>>>>>> at
>>>>>>>>>>> org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
>>>>>>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>>>>>>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
>>>>>>>>>>> at
>>>>>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
>>>>>>>>>>> at
>>>>>>>>>>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>>>>>>>>>>> at scala.util.Try$.apply(Try.scala:209)
>>>>>>>>>>> at
>>>>>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
>>>>>>>>>>> at
>>>>>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>>>>>>>> at
>>>>>>>>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:45)
>>>>>>>>>>> at
>>>>>>>>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:20)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
>>>>>>>>>>> ... 7 more
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>> On Wed, Feb 5, 2020 at 5:22 PM David Magalhães <
>>>>>>>>>> speeddragon@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I'm implementing an exponential backoff inside a custom sink
>>>>>>>>>>> that uses an AvroParquetWriter to write to S3. I've change the number of
>>>>>>>>>>> attempts to 0 inside the core-site.xml, and I'm capturing the timeout
>>>>>>>>>>> exception, doing a Thread.sleep for X seconds. This is working as intended,
>>>>>>>>>>> and when S3 is offline, it waits until it is online.
>>>>>>>>>>>
>>>>>>>>>>> I also want to test that the back pressure and the checkpoints
>>>>>>>>>>> are working as intended, and for the first one, I can see the back pressure
>>>>>>>>>>> in Flink UI going up, and recover as expected and not reading more data
>>>>>>>>>>> from Kafka.
>>>>>>>>>>>
>>>>>>>>>>> For the checkpoints, and I've added inside the sink invoke
>>>>>>>>>>> function a randomly exception (1 in 100, to simulate that a problem has
>>>>>>>>>>> happen, and need to recover from the last good checkpoint), but something
>>>>>>>>>>> strange happens. I can see the job is being canceled and created again, and
>>>>>>>>>>> running fine, other times after a X number of times of being created and
>>>>>>>>>>> canceled, it gives a *NoClassDefFoundError*, and it will keep
>>>>>>>>>>> giving that forever.
>>>>>>>>>>>
>>>>>>>>>>> Do you guys have any thoughts?
>>>>>>>>>>>
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.AsynchronousException:
>>>>>>>>>>> Caught exception while processing timer.
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>>>>>>>>>>> at
>>>>>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>>>> at
>>>>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>>>>>>>>> at
>>>>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>>>>>>>> at
>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>>>> at
>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>>> Caused by: TimerException{java.lang.NoClassDefFoundError:
>>>>>>>>>>> org/joda/time/format/DateTimeParserBucket}
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
>>>>>>>>>>> ... 7 more
>>>>>>>>>>> Caused by: java.lang.NoClassDefFoundError:
>>>>>>>>>>> org/joda/time/format/DateTimeParserBucket
>>>>>>>>>>> at
>>>>>>>>>>> org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
>>>>>>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>>>>>>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
>>>>>>>>>>> at
>>>>>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
>>>>>>>>>>> at
>>>>>>>>>>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>>>>>>>>>>> at scala.util.Try$.apply(Try.scala:209)
>>>>>>>>>>> at
>>>>>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
>>>>>>>>>>> at
>>>>>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>>>>>>>> at
>>>>>>>>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:45)
>>>>>>>>>>> at
>>>>>>>>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:20)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
>>>>>>>>>>> ... 7 more
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>

Re: NoClassDefFoundError when an exception is thrown inside invoke in a Custom Sink

Posted by Arvid Heise <ar...@ververica.com>.
Hi David,

sorry for replying late. I was caught up on other incidents.

I double-checked all the information that you provided and conclude that
you completely bypass our filesystems and plugins.

What you are using is AvroParquetWriter, which brings in the hadoop
dependencies, including raw hadoop s3. It becomes obvious since the Path
you are using is not coming from Flink namespace.
The class issues that come from that are hard to debug. You are effectively
bundling another hadoop, so if you also have a specific Hadoop version on
your cluster (e.g. on EMR), then there can be ambiguities and the error you
have seen can occur.

What I'd recommend you do is a completely different approach. Assuming you
just want exponential backoff for all s3 write accesses, you could wrap the
S3AFileSystem and create your own s3 plugin. That would work with any given
format for future cases.

If you want to stick to your approach, you should use
org.apache.flink.formats.parquet.ParquetWriterFactory, which uses your
mentioned StreamOutputFile.

Best,

Arvid

On Thu, Feb 13, 2020 at 12:04 AM David Magalhães <sp...@gmail.com>
wrote:

> Hi Arvid, I use a docker image. Here is the Dockerfile:
>
> FROM flink:1.9.1-scala_2.12
>
> RUN mkdir /opt/flink/plugins/flink-s3-fs-hadoop
> RUN cp /opt/flink/opt/flink-s3-fs-hadoop-1.9.1.jar
> /opt/flink/plugins/flink-s3-fs-hadoop/
>
> Please let me know if you need more information.
>
> On Wed, Feb 12, 2020 at 9:15 PM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi David,
>>
>> can you double-check the folder structure of your plugin? It should
>> reside in its own subfolder. Here is an example.
>>
>> flink-dist
>> ├── conf
>> ├── lib
>> ...
>> └── plugins
>>     └── s3
>>         └── flink-s3-fs-hadoop.jar
>>
>> I will investigate your error deeply in the next few days but I'd like to
>> have a final confirmation about the folder structure.
>>
>>
>> On Wed, Feb 12, 2020 at 8:56 PM David Magalhães <sp...@gmail.com>
>> wrote:
>>
>>> Hi Robert, I couldn't find any previous mention before the
>>> NoClassDefFoundError.
>>> Here is the full log [1] if you want to look for something more specific.
>>>
>>> [1] https://www.dropbox.com/s/l8tba6vft08flke/joda.out?dl=0
>>>
>>> On Wed, Feb 12, 2020 at 12:45 PM Robert Metzger <rm...@apache.org>
>>> wrote:
>>>
>>>> According to this answer [1] the first exception "mentioning"
>>>> org/joda/time/format/DateTimeParserBucket should be a different one.
>>>> Can you go through the logs to make sure it is really a
>>>> ClassNotFoundException, and not an ExceptionInInitializerError or something
>>>> else?
>>>>
>>>> [1]https://stackoverflow.com/a/5756989/568695
>>>>
>>>> On Wed, Feb 12, 2020 at 12:36 PM David Magalhães <sp...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Arvid,
>>>>>
>>>>> I'm using flink-s3-fs-hadoop-1.9.1.jar in the plugins folder. Like I said
>>>>> previously, this works normally until an exception is thrown inside the
>>>>> sink. It will try to recover again, but sometimes it doesn't recover, giving
>>>>> this error.
>>>>>
>>>>> To write to S3 I use *AvroParquetWriter* with the following code:
>>>>>
>>>>> val writer = AvroParquetWriter
>>>>>          .builder[GenericRecord](new Path(finalFilePath))
>>>>>
>>>>> *Path* is from *org.apache.hadoop.fs*; the other option is to use
>>>>> *org.apache.flink.formats.parquet.StreamOutputFile*, which will use the
>>>>> Flink S3 plugin, right? I'm not sure how I can convert from Path to
>>>>> StreamOutputFile. I know that when I used StreamingFileSink, I used
>>>>> StreamOutputFile.
>>>>>
>>>>> On Wed, Feb 12, 2020 at 10:03 AM Arvid Heise <ar...@ververica.com>
>>>>> wrote:
>>>>>
>>>>>> Hi David,
>>>>>>
>>>>>> upon closer review of your stacktrace, it seems like you are trying
>>>>>> to access S3 without our S3 plugin. That is generally not recommended
>>>>>> at all.
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Arvid
>>>>>>
>>>>>> On Tue, Feb 11, 2020 at 11:06 AM Arvid Heise <ar...@ververica.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi David,
>>>>>>>
>>>>>>> this seems to be a bug in our s3 plugin. The joda dependency should
>>>>>>> be bundled there.
>>>>>>>
>>>>>>> Are you using s3 as a plugin by any chance? Which flink version are
>>>>>>> you using?
>>>>>>>
>>>>>>> If you are using s3 as a plugin, you could put joda in your plugin
>>>>>>> folder like this
>>>>>>>
>>>>>>> flink-dist
>>>>>>> ├── conf
>>>>>>> ├── lib
>>>>>>> ...
>>>>>>> └── plugins
>>>>>>>     └── s3
>>>>>>>         ├── joda.jar
>>>>>>>         └── flink-s3-fs-hadoop.jar
>>>>>>>
>>>>>>> If flink-s3-fs-hadoop.jar is in lib, you could try adding joda into
>>>>>>> that.
>>>>>>>
>>>>>>> Adding joda to your user code will unfortunately not work.
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Arvid
>>>>>>>
>>>>>>> On Thu, Feb 6, 2020 at 11:16 PM David Magalhães <
>>>>>>> speeddragon@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Andrey, thanks for your reply.
>>>>>>>>
>>>>>>>> The class is on the jar created with `*sbt assembly*` that is
>>>>>>>> submitted to Flink to start a Job.
>>>>>>>>
>>>>>>>> unzip -l target/jar/myapp-0.0.1-SNAPSHOT.jar | grep
>>>>>>>> DateTimeParserBucket
>>>>>>>>      1649  05-27-2016 10:24
>>>>>>>> org/joda/time/format/DateTimeParserBucket$SavedField.class
>>>>>>>>      1984  05-27-2016 10:24
>>>>>>>> org/joda/time/format/DateTimeParserBucket$SavedState.class
>>>>>>>>      8651  05-27-2016 10:24
>>>>>>>> org/joda/time/format/DateTimeParserBucket.class
>>>>>>>>
>>>>>>>> Shouldn't this be enough ?
>>>>>>>>
>>>>>>>> I think it uses it when nothing happens, but as soon as it has some
>>>>>>>> exceptions, it looks like it "forgets" it.
>>>>>>>>
>>>>>>>> Like I said before, this is kind of intermittent.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> David
>>>>>>>>
>>>>>>>> On Thu, Feb 6, 2020 at 5:46 PM Andrey Zagrebin <
>>>>>>>> azagrebin@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hi David,
>>>>>>>>>
>>>>>>>>> This looks like a problem with resolution of maven dependencies or
>>>>>>>>> something.
>>>>>>>>> The custom WindowParquetGenericRecordListFileSink probably
>>>>>>>>> transitively depends on org/joda/time/format/DateTimeParserBucket
>>>>>>>>> and it is missing on the runtime classpath of Flink.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Andrey
>>>>>>>>>
>>>>>>>>> On Wed, Feb 5, 2020 at 5:22 PM David Magalhães <
>>>>>>>>> speeddragon@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I'm implementing an exponential backoff inside a custom sink that
>>>>>>>>>> uses an AvroParquetWriter to write to S3. I've changed the number of
>>>>>>>>>> attempts to 0 inside the core-site.xml, and I'm catching the timeout
>>>>>>>>>> exception and doing a Thread.sleep for X seconds. This is working as intended,
>>>>>>>>>> and when S3 is offline, it waits until it is online.
>>>>>>>>>>
>>>>>>>>>> I also want to test that the back pressure and the checkpoints
>>>>>>>>>> are working as intended. For the first one, I can see the back pressure
>>>>>>>>>> in the Flink UI going up and recovering as expected, without more data
>>>>>>>>>> being read from Kafka.
>>>>>>>>>>
>>>>>>>>>> For the checkpoints, I've added inside the sink invoke
>>>>>>>>>> function a random exception (1 in 100, to simulate that a problem has
>>>>>>>>>> happened and that the job needs to recover from the last good checkpoint),
>>>>>>>>>> but something strange happens. Sometimes I can see the job being canceled
>>>>>>>>>> and created again, and running fine; other times, after being created and
>>>>>>>>>> canceled X number of times, it gives a *NoClassDefFoundError*, and it
>>>>>>>>>> will keep giving that forever.
>>>>>>>>>>
>>>>>>>>>> Do you guys have any thoughts?
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.AsynchronousException:
>>>>>>>>>> Caught exception while processing timer.
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>>>>>>>>>> at
>>>>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>>> at
>>>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>>>>>>>> at
>>>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>>>>>>> at
>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>>> at
>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>> Caused by: TimerException{java.lang.NoClassDefFoundError:
>>>>>>>>>> org/joda/time/format/DateTimeParserBucket}
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
>>>>>>>>>> ... 7 more
>>>>>>>>>> Caused by: java.lang.NoClassDefFoundError:
>>>>>>>>>> org/joda/time/format/DateTimeParserBucket
>>>>>>>>>> at
>>>>>>>>>> org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
>>>>>>>>>> at
>>>>>>>>>> com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
>>>>>>>>>> at
>>>>>>>>>> com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
>>>>>>>>>> at
>>>>>>>>>> com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
>>>>>>>>>> at
>>>>>>>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
>>>>>>>>>> at
>>>>>>>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
>>>>>>>>>> at
>>>>>>>>>> com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
>>>>>>>>>> at
>>>>>>>>>> com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
>>>>>>>>>> at
>>>>>>>>>> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
>>>>>>>>>> at
>>>>>>>>>> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
>>>>>>>>>> at
>>>>>>>>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
>>>>>>>>>> at
>>>>>>>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
>>>>>>>>>> at
>>>>>>>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
>>>>>>>>>> at
>>>>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
>>>>>>>>>> at
>>>>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
>>>>>>>>>> at
>>>>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
>>>>>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>>>>>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>>>>>>>>>> at
>>>>>>>>>> org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
>>>>>>>>>> at
>>>>>>>>>> org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
>>>>>>>>>> at
>>>>>>>>>> org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
>>>>>>>>>> at
>>>>>>>>>> org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
>>>>>>>>>> at
>>>>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
>>>>>>>>>> at
>>>>>>>>>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>>>>>>>>>> at scala.util.Try$.apply(Try.scala:209)
>>>>>>>>>> at
>>>>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
>>>>>>>>>> at
>>>>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>>>>>>> at
>>>>>>>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:45)
>>>>>>>>>> at
>>>>>>>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:20)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
>>>>>>>>>> ... 7 more
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>> On Wed, Feb 5, 2020 at 5:22 PM David Magalhães <
>>>>>>>>> speeddragon@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I'm implementing an exponential backoff inside a custom sink that
>>>>>>>>>> uses an AvroParquetWriter to write to S3. I've change the number of
>>>>>>>>>> attempts to 0 inside the core-site.xml, and I'm capturing the timeout
>>>>>>>>>> exception, doing a Thread.sleep for X seconds. This is working as intended,
>>>>>>>>>> and when S3 is offline, it waits until it is online.
>>>>>>>>>>
>>>>>>>>>> I also want to test that the back pressure and the checkpoints
>>>>>>>>>> are working as intended, and for the first one, I can see the back pressure
>>>>>>>>>> in Flink UI going up, and recover as expected and not reading more data
>>>>>>>>>> from Kafka.
>>>>>>>>>>
>>>>>>>>>> For the checkpoints, and I've added inside the sink invoke
>>>>>>>>>> function a randomly exception (1 in 100, to simulate that a problem has
>>>>>>>>>> happen, and need to recover from the last good checkpoint), but something
>>>>>>>>>> strange happens. I can see the job is being canceled and created again, and
>>>>>>>>>> running fine, other times after a X number of times of being created and
>>>>>>>>>> canceled, it gives a *NoClassDefFoundError*, and it will keep
>>>>>>>>>> giving that forever.
>>>>>>>>>>
>>>>>>>>>> Do you guys have any thoughts?
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.AsynchronousException:
>>>>>>>>>> Caught exception while processing timer.
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>>>>>>>>>> at
>>>>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>>> at
>>>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>>>>>>>> at
>>>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>>>>>>> at
>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>>> at
>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>> Caused by: TimerException{java.lang.NoClassDefFoundError:
>>>>>>>>>> org/joda/time/format/DateTimeParserBucket}
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
>>>>>>>>>> ... 7 more
>>>>>>>>>> Caused by: java.lang.NoClassDefFoundError:
>>>>>>>>>> org/joda/time/format/DateTimeParserBucket
>>>>>>>>>> at
>>>>>>>>>> org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
>>>>>>>>>> at
>>>>>>>>>> com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
>>>>>>>>>> at
>>>>>>>>>> com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
>>>>>>>>>> at
>>>>>>>>>> com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
>>>>>>>>>> at
>>>>>>>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
>>>>>>>>>> at
>>>>>>>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
>>>>>>>>>> at
>>>>>>>>>> com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
>>>>>>>>>> at
>>>>>>>>>> com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
>>>>>>>>>> at
>>>>>>>>>> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
>>>>>>>>>> at
>>>>>>>>>> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
>>>>>>>>>> at
>>>>>>>>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
>>>>>>>>>> at
>>>>>>>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
>>>>>>>>>> at
>>>>>>>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
>>>>>>>>>> at
>>>>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
>>>>>>>>>> at
>>>>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
>>>>>>>>>> at
>>>>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
>>>>>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>>>>>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>>>>>>>>>> at
>>>>>>>>>> org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
>>>>>>>>>> at
>>>>>>>>>> org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
>>>>>>>>>> at
>>>>>>>>>> org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
>>>>>>>>>> at
>>>>>>>>>> org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
>>>>>>>>>> at
>>>>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
>>>>>>>>>> at
>>>>>>>>>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>>>>>>>>>> at scala.util.Try$.apply(Try.scala:209)
>>>>>>>>>> at
>>>>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
>>>>>>>>>> at
>>>>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>>>>>>> at
>>>>>>>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:45)
>>>>>>>>>> at
>>>>>>>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:20)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
>>>>>>>>>> ... 7 more
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>

Re: NoClassDefFoundError when an exception is throw inside invoke in a Custom Sink

Posted by David Magalhães <sp...@gmail.com>.
Hi Arvid, I use a docker image. Here is the Dockerfile:

FROM flink:1.9.1-scala_2.12

RUN mkdir /opt/flink/plugins/flink-s3-fs-hadoop
RUN cp /opt/flink/opt/flink-s3-fs-hadoop-1.9.1.jar /opt/flink/plugins/flink-s3-fs-hadoop/

Please let me know if you need more information.

On Wed, Feb 12, 2020 at 9:15 PM Arvid Heise <ar...@ververica.com> wrote:

> Hi David,
>
> can you double-check the folder structure of your plugin? It should reside
> in its own subfolder. Here is an example.
>
> flink-dist
> ├── conf
> ├── lib
> ...
> └── plugins
>     └── s3
>         └── flink-s3-fs-hadoop.jar
>
> I will investigate your error deeply in the next few days but I'd like to
> have a final confirmation about the folder structure.
>
>
> On Wed, Feb 12, 2020 at 8:56 PM David Magalhães <sp...@gmail.com>
> wrote:
>
>> Hi Robert, I couldn't find any previous mention before the
>> NoClassDefFoundError.
>> Here is the full log [1] if you want to look for something more specific.
>>
>> [1] https://www.dropbox.com/s/l8tba6vft08flke/joda.out?dl=0
>>
>> On Wed, Feb 12, 2020 at 12:45 PM Robert Metzger <rm...@apache.org>
>> wrote:
>>
>>> According to this answer [1] the first exception "mentioning"
>>> org/joda/time/format/DateTimeParserBucket should be a different one.
>>> Can you go through the logs to make sure it is really a
>>> ClassNotFoundException, and not an ExceptionInInitializerError or something
>>> else?
>>>
>>> [1]https://stackoverflow.com/a/5756989/568695
>>>
>>> On Wed, Feb 12, 2020 at 12:36 PM David Magalhães <sp...@gmail.com>
>>> wrote:
>>>
>>>> Hi Arvid,
>>>>
>>>> I'm using flink-s3-fs-hadoop-1.9.1.jar in the plugins folder. Like I said
>>>> previously, this works normally until an exception is thrown inside the
>>>> sink. It will try to recover again, but sometimes it doesn't recover, giving
>>>> this error.
>>>>
>>>> To write to S3 I use *AvroParquetWriter* with the following code:
>>>>
>>>> val writer = AvroParquetWriter
>>>>          .builder[GenericRecord](new Path(finalFilePath))
>>>>
>>>> *Path* is from *org.apache.hadoop.fs*; the other option is to use
>>>> *org.apache.flink.formats.parquet.StreamOutputFile*, which will use the
>>>> Flink S3 plugin, right? I'm not sure how I can convert from Path to
>>>> StreamOutputFile. I know that when I used StreamingFileSink, I used
>>>> StreamOutputFile.
>>>>
>>>> On Wed, Feb 12, 2020 at 10:03 AM Arvid Heise <ar...@ververica.com>
>>>> wrote:
>>>>
>>>>> Hi David,
>>>>>
>>>>> upon closer review of your stacktrace, it seems like you are trying to
>>>>> access S3 without our S3 plugin. That is generally not recommended at all.
>>>>>
>>>>> Best,
>>>>>
>>>>> Arvid
>>>>>
>>>>> On Tue, Feb 11, 2020 at 11:06 AM Arvid Heise <ar...@ververica.com>
>>>>> wrote:
>>>>>
>>>>>> Hi David,
>>>>>>
>>>>>> this seems to be a bug in our s3 plugin. The joda dependency should
>>>>>> be bundled there.
>>>>>>
>>>>>> Are you using s3 as a plugin by any chance? Which flink version are
>>>>>> you using?
>>>>>>
>>>>>> If you are using s3 as a plugin, you could put joda in your plugin
>>>>>> folder like this
>>>>>>
>>>>>> flink-dist
>>>>>> ├── conf
>>>>>> ├── lib
>>>>>> ...
>>>>>> └── plugins
>>>>>>     └── s3
>>>>>>         ├── joda.jar
>>>>>>         └── flink-s3-fs-hadoop.jar
>>>>>>
>>>>>> If flink-s3-fs-hadoop.jar is in lib, you could try adding joda into
>>>>>> that.
>>>>>>
>>>>>> Adding joda to your user code will unfortunately not work.
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Arvid
>>>>>>
>>>>>> On Thu, Feb 6, 2020 at 11:16 PM David Magalhães <
>>>>>> speeddragon@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Andrey, thanks for your reply.
>>>>>>>
>>>>>>> The class is on the jar created with `*sbt assembly*` that is
>>>>>>> submitted to Flink to start a Job.
>>>>>>>
>>>>>>> unzip -l target/jar/myapp-0.0.1-SNAPSHOT.jar | grep
>>>>>>> DateTimeParserBucket
>>>>>>>      1649  05-27-2016 10:24
>>>>>>> org/joda/time/format/DateTimeParserBucket$SavedField.class
>>>>>>>      1984  05-27-2016 10:24
>>>>>>> org/joda/time/format/DateTimeParserBucket$SavedState.class
>>>>>>>      8651  05-27-2016 10:24
>>>>>>> org/joda/time/format/DateTimeParserBucket.class
>>>>>>>
>>>>>>> Shouldn't this be enough ?
>>>>>>>
>>>>>>> I think it uses it when nothing happens, but as soon as it has some
>>>>>>> exceptions, it looks like it "forgets" it.
>>>>>>>
>>>>>>> Like I said before, this is kind of intermittent.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> David
>>>>>>>
>>>>>>> On Thu, Feb 6, 2020 at 5:46 PM Andrey Zagrebin <az...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi David,
>>>>>>>>
>>>>>>>> This looks like a problem with resolution of maven dependencies or
>>>>>>>> something.
>>>>>>>> The custom WindowParquetGenericRecordListFileSink probably
>>>>>>>> transitively depends on org/joda/time/format/DateTimeParserBucket
>>>>>>>> and it is missing on the runtime classpath of Flink.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Andrey
>>>>>>>>
>>>>>>>> On Wed, Feb 5, 2020 at 5:22 PM David Magalhães <
>>>>>>>> speeddragon@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I'm implementing an exponential backoff inside a custom sink that
>>>>>>>>> uses an AvroParquetWriter to write to S3. I've changed the number of
>>>>>>>>> attempts to 0 inside the core-site.xml, and I'm catching the timeout
>>>>>>>>> exception and doing a Thread.sleep for X seconds. This is working as intended,
>>>>>>>>> and when S3 is offline, it waits until it is online.
>>>>>>>>>
>>>>>>>>> I also want to test that the back pressure and the checkpoints are
>>>>>>>>> working as intended. For the first one, I can see the back pressure in
>>>>>>>>> the Flink UI going up and recovering as expected, without more data
>>>>>>>>> being read from Kafka.
>>>>>>>>>
>>>>>>>>> For the checkpoints, I've added inside the sink invoke
>>>>>>>>> function a random exception (1 in 100, to simulate that a problem has
>>>>>>>>> happened and that the job needs to recover from the last good checkpoint),
>>>>>>>>> but something strange happens. Sometimes I can see the job being canceled
>>>>>>>>> and created again, and running fine; other times, after being created and
>>>>>>>>> canceled X number of times, it gives a *NoClassDefFoundError*, and it
>>>>>>>>> will keep giving that forever.
>>>>>>>>>
>>>>>>>>> Do you guys have any thoughts?
>>>>>>>>>
>>>>>>>>> org.apache.flink.streaming.runtime.tasks.AsynchronousException:
>>>>>>>>> Caught exception while processing timer.
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>>>>>>>>> at
>>>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>> at
>>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>>>>>>> at
>>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>>>>>> at
>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>> at
>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>>>> Caused by: TimerException{java.lang.NoClassDefFoundError:
>>>>>>>>> org/joda/time/format/DateTimeParserBucket}
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
>>>>>>>>> ... 7 more
>>>>>>>>> Caused by: java.lang.NoClassDefFoundError:
>>>>>>>>> org/joda/time/format/DateTimeParserBucket
>>>>>>>>> at
>>>>>>>>> org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
>>>>>>>>> at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
>>>>>>>>> at
>>>>>>>>> com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
>>>>>>>>> at
>>>>>>>>> com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
>>>>>>>>> at
>>>>>>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
>>>>>>>>> at
>>>>>>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
>>>>>>>>> at
>>>>>>>>> com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
>>>>>>>>> at
>>>>>>>>> com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
>>>>>>>>> at
>>>>>>>>> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
>>>>>>>>> at
>>>>>>>>> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
>>>>>>>>> at
>>>>>>>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
>>>>>>>>> at
>>>>>>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
>>>>>>>>> at
>>>>>>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
>>>>>>>>> at
>>>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
>>>>>>>>> at
>>>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
>>>>>>>>> at
>>>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
>>>>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>>>>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>>>>>>>>> at
>>>>>>>>> org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
>>>>>>>>> at
>>>>>>>>> org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
>>>>>>>>> at
>>>>>>>>> org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
>>>>>>>>> at
>>>>>>>>> org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
>>>>>>>>> at
>>>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
>>>>>>>>> at
>>>>>>>>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>>>>>>>>> at scala.util.Try$.apply(Try.scala:209)
>>>>>>>>> at
>>>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
>>>>>>>>> at
>>>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>>>>>> at
>>>>>>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:45)
>>>>>>>>> at
>>>>>>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:20)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
>>>>>>>>> ... 7 more
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>> On Wed, Feb 5, 2020 at 5:22 PM David Magalhães <
>>>>>>>> speeddragon@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I'm implementing an exponential backoff inside a custom sink that
>>>>>>>>> uses an AvroParquetWriter to write to S3. I've change the number of
>>>>>>>>> attempts to 0 inside the core-site.xml, and I'm capturing the timeout
>>>>>>>>> exception, doing a Thread.sleep for X seconds. This is working as intended,
>>>>>>>>> and when S3 is offline, it waits until it is online.
>>>>>>>>>
>>>>>>>>> I also want to test that the back pressure and the checkpoints are
>>>>>>>>> working as intended, and for the first one, I can see the back pressure in
>>>>>>>>> Flink UI going up, and recover as expected and not reading more data from
>>>>>>>>> Kafka.
>>>>>>>>>
>>>>>>>>> For the checkpoints, and I've added inside the sink invoke
>>>>>>>>> function a randomly exception (1 in 100, to simulate that a problem has
>>>>>>>>> happen, and need to recover from the last good checkpoint), but something
>>>>>>>>> strange happens. I can see the job is being canceled and created again, and
>>>>>>>>> running fine, other times after a X number of times of being created and
>>>>>>>>> canceled, it gives a *NoClassDefFoundError*, and it will keep
>>>>>>>>> giving that forever.
>>>>>>>>>
>>>>>>>>> Do you guys have any thoughts?
>>>>>>>>>
>>>>>>>>> org.apache.flink.streaming.runtime.tasks.AsynchronousException:
>>>>>>>>> Caught exception while processing timer.
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>>>>>>>>> at
>>>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>> at
>>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>>>>>>> at
>>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>>>>>> at
>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>> at
>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>>>> Caused by: TimerException{java.lang.NoClassDefFoundError:
>>>>>>>>> org/joda/time/format/DateTimeParserBucket}
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
>>>>>>>>> ... 7 more
>>>>>>>>> Caused by: java.lang.NoClassDefFoundError:
>>>>>>>>> org/joda/time/format/DateTimeParserBucket
>>>>>>>>> at
>>>>>>>>> org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
>>>>>>>>> at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
>>>>>>>>> at
>>>>>>>>> com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
>>>>>>>>> at
>>>>>>>>> com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
>>>>>>>>> at
>>>>>>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
>>>>>>>>> at
>>>>>>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
>>>>>>>>> at
>>>>>>>>> com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
>>>>>>>>> at
>>>>>>>>> com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
>>>>>>>>> at
>>>>>>>>> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
>>>>>>>>> at
>>>>>>>>> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
>>>>>>>>> at
>>>>>>>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
>>>>>>>>> at
>>>>>>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
>>>>>>>>> at
>>>>>>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
>>>>>>>>> at
>>>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
>>>>>>>>> at
>>>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
>>>>>>>>> at
>>>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
>>>>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>>>>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>>>>>>>>> at
>>>>>>>>> org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
>>>>>>>>> at
>>>>>>>>> org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
>>>>>>>>> at
>>>>>>>>> org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
>>>>>>>>> at
>>>>>>>>> org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
>>>>>>>>> at
>>>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
>>>>>>>>> at
>>>>>>>>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>>>>>>>>> at scala.util.Try$.apply(Try.scala:209)
>>>>>>>>> at
>>>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
>>>>>>>>> at
>>>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>>>>>> at
>>>>>>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:45)
>>>>>>>>> at
>>>>>>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:20)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
>>>>>>>>> ... 7 more
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>

Re: NoClassDefFoundError when an exception is throw inside invoke in a Custom Sink

Posted by Arvid Heise <ar...@ververica.com>.
Hi David,

can you double-check the folder structure of your plugin? It should reside
in its own subfolder. Here is an example.

flink-dist
├── conf
├── lib
...
└── plugins
    └── s3
        └── flink-s3-fs-hadoop.jar

I will investigate your error in more depth over the next few days, but first
I'd like a final confirmation of the folder structure.


On Wed, Feb 12, 2020 at 8:56 PM David Magalhães <sp...@gmail.com>
wrote:

> Hi Robert, I couldn't find any earlier mention before the
> NoClassDefFoundError.
> Here is the full log [1] if you want to look for something more specific.
>
> [1] https://www.dropbox.com/s/l8tba6vft08flke/joda.out?dl=0
>
> On Wed, Feb 12, 2020 at 12:45 PM Robert Metzger <rm...@apache.org>
> wrote:
>
>> According to this answer [1] the first exception "mentioning"
>> org/joda/time/format/DateTimeParserBucket should be a different one. Can
>> you go through the logs to make sure it is really a ClassNotFoundException,
>> and not an ExceptionInInitializerError or something else?
>>
>> [1]https://stackoverflow.com/a/5756989/568695
>>
>> On Wed, Feb 12, 2020 at 12:36 PM David Magalhães <sp...@gmail.com>
>> wrote:
>>
>>> Hi Arvid,
>>>
>>> I'm using flink-s3-fs-hadoop-1.9.1.jar in the plugins folder. As I said
>>> previously, this works normally until an exception is thrown inside the
>>> sink. It then tries to recover, but sometimes it fails to recover and
>>> gives this error.
>>>
>>> To write to S3 I use *AvroParquetWriter* with the following code:
>>>
>>> val writer = AvroParquetWriter
>>>          .builder[GenericRecord](new Path(finalFilePath))
>>>
>>> *Path* is from *org.apache.hadoop.fs*; the other option is to use
>>> *org.apache.flink.formats.parquet.StreamOutputFile*, which will use the
>>> Flink S3 plugin, right? I'm not sure how I can convert from Path to
>>> StreamOutputFile. I know that when I used StreamingFileSink, I used
>>> StreamOutputFile.
>>>
>>> On Wed, Feb 12, 2020 at 10:03 AM Arvid Heise <ar...@ververica.com>
>>> wrote:
>>>
>>>> Hi David,
>>>>
>>>> upon closer review of your stack trace, it seems that you are trying to
>>>> access S3 without our S3 plugin. That is generally not recommended.
>>>>
>>>> Best,
>>>>
>>>> Arvid
>>>>
>>>> On Tue, Feb 11, 2020 at 11:06 AM Arvid Heise <ar...@ververica.com>
>>>> wrote:
>>>>
>>>>> Hi David,
>>>>>
>>>>> this seems to be a bug in our s3 plugin. The joda dependency should be
>>>>> bundled there.
>>>>>
>>>>> Are you using s3 as a plugin by any chance? Which flink version are
>>>>> you using?
>>>>>
>>>>> If you are using s3 as a plugin, you could put joda in your plugin
>>>>> folder like this
>>>>>
>>>>> flink-dist
>>>>> ├── conf
>>>>> ├── lib
>>>>> ...
>>>>> └── plugins
>>>>>     └── s3
>>>>>         ├── joda.jar
>>>>>         └── flink-s3-fs-hadoop.jar
>>>>>
>>>>> If flink-s3-fs-hadoop.jar is in lib, you could try adding joda into
>>>>> that.
>>>>>
>>>>> Adding joda to your user code will unfortunately not work.
>>>>>
>>>>> Best,
>>>>>
>>>>> Arvid
>>>>>
>>>>> On Thu, Feb 6, 2020 at 11:16 PM David Magalhães <sp...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Andrey, thanks for your reply.
>>>>>>
>>>>>> The class is on the jar created with `*sbt assembly*` that is
>>>>>> submitted to Flink to start a Job.
>>>>>>
>>>>>> unzip -l target/jar/myapp-0.0.1-SNAPSHOT.jar | grep
>>>>>> DateTimeParserBucket
>>>>>>      1649  05-27-2016 10:24
>>>>>> org/joda/time/format/DateTimeParserBucket$SavedField.class
>>>>>>      1984  05-27-2016 10:24
>>>>>> org/joda/time/format/DateTimeParserBucket$SavedState.class
>>>>>>      8651  05-27-2016 10:24
>>>>>> org/joda/time/format/DateTimeParserBucket.class
>>>>>>
>>>>>> Shouldn't this be enough ?
>>>>>>
>>>>>> I think it uses it when nothing goes wrong, but as soon as there are
>>>>>> some exceptions, it looks like it "forgets" it.
>>>>>>
>>>>>> Like I said before, this is kind of intermittent.
>>>>>>
>>>>>> Thanks,
>>>>>> David
>>>>>>
>>>>>> On Thu, Feb 6, 2020 at 5:46 PM Andrey Zagrebin <az...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi David,
>>>>>>>
>>>>>>> This looks like a problem with resolution of maven dependencies or
>>>>>>> something.
>>>>>>> The custom WindowParquetGenericRecordListFileSink probably
>>>>>>> transitively depends on org/joda/time/format/DateTimeParserBucket
>>>>>>> and it is missing on the runtime classpath of Flink.
>>>>>>>
>>>>>>> Best,
>>>>>>> Andrey
>>>>>>>
>>>>>>> On Wed, Feb 5, 2020 at 5:22 PM David Magalhães <
>>>>>>> speeddragon@gmail.com> wrote:
>>>>>>>
>>>>>>>> I'm implementing an exponential backoff inside a custom sink that
>>>>>>>> uses an AvroParquetWriter to write to S3. I've change the number of
>>>>>>>> attempts to 0 inside the core-site.xml, and I'm capturing the timeout
>>>>>>>> exception, doing a Thread.sleep for X seconds. This is working as intended,
>>>>>>>> and when S3 is offline, it waits until it is online.
>>>>>>>>
>>>>>>>> I also want to test that the back pressure and the checkpoints are
>>>>>>>> working as intended, and for the first one, I can see the back pressure in
>>>>>>>> Flink UI going up, and recover as expected and not reading more data from
>>>>>>>> Kafka.
>>>>>>>>
>>>>>>>> For the checkpoints, and I've added inside the sink invoke function
>>>>>>>> a randomly exception (1 in 100, to simulate that a problem has happen, and
>>>>>>>> need to recover from the last good checkpoint), but something strange
>>>>>>>> happens. I can see the job is being canceled and created again, and running
>>>>>>>> fine, other times after a X number of times of being created and canceled,
>>>>>>>> it gives a *NoClassDefFoundError*, and it will keep giving that
>>>>>>>> forever.
>>>>>>>>
>>>>>>>> Do you guys have any thoughts?
>>>>>>>>
>>>>>>>> org.apache.flink.streaming.runtime.tasks.AsynchronousException:
>>>>>>>> Caught exception while processing timer.
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>>>>>>>> at
>>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>> at
>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>>>>>> at
>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>>>>> at
>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>> at
>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>>> Caused by: TimerException{java.lang.NoClassDefFoundError:
>>>>>>>> org/joda/time/format/DateTimeParserBucket}
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
>>>>>>>> ... 7 more
>>>>>>>> Caused by: java.lang.NoClassDefFoundError:
>>>>>>>> org/joda/time/format/DateTimeParserBucket
>>>>>>>> at
>>>>>>>> org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
>>>>>>>> at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
>>>>>>>> at
>>>>>>>> com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
>>>>>>>> at
>>>>>>>> com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
>>>>>>>> at
>>>>>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
>>>>>>>> at
>>>>>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
>>>>>>>> at
>>>>>>>> com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
>>>>>>>> at
>>>>>>>> com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
>>>>>>>> at
>>>>>>>> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
>>>>>>>> at
>>>>>>>> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
>>>>>>>> at
>>>>>>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
>>>>>>>> at
>>>>>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
>>>>>>>> at
>>>>>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
>>>>>>>> at
>>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
>>>>>>>> at
>>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
>>>>>>>> at
>>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
>>>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>>>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>>>>>>>> at
>>>>>>>> org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
>>>>>>>> at
>>>>>>>> org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
>>>>>>>> at
>>>>>>>> org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
>>>>>>>> at
>>>>>>>> org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
>>>>>>>> at
>>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
>>>>>>>> at
>>>>>>>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>>>>>>>> at scala.util.Try$.apply(Try.scala:209)
>>>>>>>> at
>>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
>>>>>>>> at
>>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>>>>> at
>>>>>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:45)
>>>>>>>> at
>>>>>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:20)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
>>>>>>>> ... 7 more
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>> On Wed, Feb 5, 2020 at 5:22 PM David Magalhães <
>>>>>>> speeddragon@gmail.com> wrote:
>>>>>>>
>>>>>>>> I'm implementing an exponential backoff inside a custom sink that
>>>>>>>> uses an AvroParquetWriter to write to S3. I've change the number of
>>>>>>>> attempts to 0 inside the core-site.xml, and I'm capturing the timeout
>>>>>>>> exception, doing a Thread.sleep for X seconds. This is working as intended,
>>>>>>>> and when S3 is offline, it waits until it is online.
>>>>>>>>
>>>>>>>> I also want to test that the back pressure and the checkpoints are
>>>>>>>> working as intended, and for the first one, I can see the back pressure in
>>>>>>>> Flink UI going up, and recover as expected and not reading more data from
>>>>>>>> Kafka.
>>>>>>>>
>>>>>>>> For the checkpoints, and I've added inside the sink invoke function
>>>>>>>> a randomly exception (1 in 100, to simulate that a problem has happen, and
>>>>>>>> need to recover from the last good checkpoint), but something strange
>>>>>>>> happens. I can see the job is being canceled and created again, and running
>>>>>>>> fine, other times after a X number of times of being created and canceled,
>>>>>>>> it gives a *NoClassDefFoundError*, and it will keep giving that
>>>>>>>> forever.
>>>>>>>>
>>>>>>>> Do you guys have any thoughts?
>>>>>>>>
>>>>>>>> org.apache.flink.streaming.runtime.tasks.AsynchronousException:
>>>>>>>> Caught exception while processing timer.
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>>>>>>>> at
>>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>> at
>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>>>>>> at
>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>>>>> at
>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>> at
>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>>> Caused by: TimerException{java.lang.NoClassDefFoundError:
>>>>>>>> org/joda/time/format/DateTimeParserBucket}
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
>>>>>>>> ... 7 more
>>>>>>>> Caused by: java.lang.NoClassDefFoundError:
>>>>>>>> org/joda/time/format/DateTimeParserBucket
>>>>>>>> at
>>>>>>>> org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
>>>>>>>> at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
>>>>>>>> at
>>>>>>>> com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
>>>>>>>> at
>>>>>>>> com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
>>>>>>>> at
>>>>>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
>>>>>>>> at
>>>>>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
>>>>>>>> at
>>>>>>>> com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
>>>>>>>> at
>>>>>>>> com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
>>>>>>>> at
>>>>>>>> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
>>>>>>>> at
>>>>>>>> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
>>>>>>>> at
>>>>>>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
>>>>>>>> at
>>>>>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
>>>>>>>> at
>>>>>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
>>>>>>>> at
>>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
>>>>>>>> at
>>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
>>>>>>>> at
>>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
>>>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>>>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>>>>>>>> at
>>>>>>>> org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
>>>>>>>> at
>>>>>>>> org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
>>>>>>>> at
>>>>>>>> org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
>>>>>>>> at
>>>>>>>> org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
>>>>>>>> at
>>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
>>>>>>>> at
>>>>>>>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>>>>>>>> at scala.util.Try$.apply(Try.scala:209)
>>>>>>>> at
>>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
>>>>>>>> at
>>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>>>>> at
>>>>>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:45)
>>>>>>>> at
>>>>>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:20)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
>>>>>>>> ... 7 more
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>

Re: NoClassDefFoundError when an exception is throw inside invoke in a Custom Sink

Posted by David Magalhães <sp...@gmail.com>.
Hi Robert, I couldn't find any earlier mention before the
NoClassDefFoundError.
Here is the full log [1] if you want to look for something more specific.

[1] https://www.dropbox.com/s/l8tba6vft08flke/joda.out?dl=0

On Wed, Feb 12, 2020 at 12:45 PM Robert Metzger <rm...@apache.org> wrote:

> According to this answer [1] the first exception "mentioning"
> org/joda/time/format/DateTimeParserBucket should be a different one. Can
> you go through the logs to make sure it is really a ClassNotFoundException,
> and not an ExceptionInInitializerError or something else?
>
> [1]https://stackoverflow.com/a/5756989/568695
>
> On Wed, Feb 12, 2020 at 12:36 PM David Magalhães <sp...@gmail.com>
> wrote:
>
>> Hi Arvid,
>>
>> I'm using flink-s3-fs-hadoop-1.9.1.jar in the plugins folder. As I said
>> previously, this works normally until an exception is thrown inside the
>> sink. It then tries to recover, but sometimes it fails to recover and
>> gives this error.
>>
>> To write to S3 I use *AvroParquetWriter* with the following code:
>>
>> val writer = AvroParquetWriter
>>          .builder[GenericRecord](new Path(finalFilePath))
>>
>> *Path* is from *org.apache.hadoop.fs*; the other option is to use
>> *org.apache.flink.formats.parquet.StreamOutputFile*, which will use the
>> Flink S3 plugin, right? I'm not sure how I can convert from Path to
>> StreamOutputFile. I know that when I used StreamingFileSink, I used
>> StreamOutputFile.
>>
>> On Wed, Feb 12, 2020 at 10:03 AM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi David,
>>>
>>> upon closer review of your stack trace, it seems that you are trying to
>>> access S3 without our S3 plugin. That is generally not recommended.
>>>
>>> Best,
>>>
>>> Arvid
>>>
>>> On Tue, Feb 11, 2020 at 11:06 AM Arvid Heise <ar...@ververica.com>
>>> wrote:
>>>
>>>> Hi David,
>>>>
>>>> this seems to be a bug in our s3 plugin. The joda dependency should be
>>>> bundled there.
>>>>
>>>> Are you using s3 as a plugin by any chance? Which flink version are you
>>>> using?
>>>>
>>>> If you are using s3 as a plugin, you could put joda in your plugin
>>>> folder like this
>>>>
>>>> flink-dist
>>>> ├── conf
>>>> ├── lib
>>>> ...
>>>> └── plugins
>>>>     └── s3
>>>>         ├── joda.jar
>>>>         └── flink-s3-fs-hadoop.jar
>>>>
>>>> If flink-s3-fs-hadoop.jar is in lib, you could try adding joda into
>>>> that.
>>>>
>>>> Adding joda to your user code will unfortunately not work.
>>>>
>>>> Best,
>>>>
>>>> Arvid
>>>>
>>>> On Thu, Feb 6, 2020 at 11:16 PM David Magalhães <sp...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Andrey, thanks for your reply.
>>>>>
>>>>> The class is on the jar created with `*sbt assembly*` that is
>>>>> submitted to Flink to start a Job.
>>>>>
>>>>> unzip -l target/jar/myapp-0.0.1-SNAPSHOT.jar | grep
>>>>> DateTimeParserBucket
>>>>>      1649  05-27-2016 10:24
>>>>> org/joda/time/format/DateTimeParserBucket$SavedField.class
>>>>>      1984  05-27-2016 10:24
>>>>> org/joda/time/format/DateTimeParserBucket$SavedState.class
>>>>>      8651  05-27-2016 10:24
>>>>> org/joda/time/format/DateTimeParserBucket.class
>>>>>
>>>>> Shouldn't this be enough ?
>>>>>
>>>>> I think it uses is when nothing happens, but as soon it have some
>>>>> exceptions, looks like it "forgets" it.
>>>>>
>>>>> Like I said before, this is kind of intermittent.
>>>>>
>>>>> Thanks,
>>>>> David
>>>>>
>>>>> On Thu, Feb 6, 2020 at 5:46 PM Andrey Zagrebin <az...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi David,
>>>>>>
>>>>>> This looks like a problem with resolution of maven dependencies or
>>>>>> something.
>>>>>> The custom WindowParquetGenericRecordListFileSink probably
>>>>>> transitively depends on org/joda/time/format/DateTimeParserBucket
>>>>>> and it is missing on the runtime classpath of Flink.
>>>>>>
>>>>>> Best,
>>>>>> Andrey
>>>>>>
>>>>>> On Wed, Feb 5, 2020 at 5:22 PM David Magalhães <sp...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I'm implementing an exponential backoff inside a custom sink that
>>>>>>> uses an AvroParquetWriter to write to S3. I've change the number of
>>>>>>> attempts to 0 inside the core-site.xml, and I'm capturing the timeout
>>>>>>> exception, doing a Thread.sleep for X seconds. This is working as intended,
>>>>>>> and when S3 is offline, it waits until it is online.
>>>>>>>
>>>>>>> I also want to test that the back pressure and the checkpoints are
>>>>>>> working as intended, and for the first one, I can see the back pressure in
>>>>>>> Flink UI going up, and recover as expected and not reading more data from
>>>>>>> Kafka.
>>>>>>>
>>>>>>> For the checkpoints, and I've added inside the sink invoke function
>>>>>>> a randomly exception (1 in 100, to simulate that a problem has happen, and
>>>>>>> need to recover from the last good checkpoint), but something strange
>>>>>>> happens. I can see the job is being canceled and created again, and running
>>>>>>> fine, other times after a X number of times of being created and canceled,
>>>>>>> it gives a *NoClassDefFoundError*, and it will keep giving that
>>>>>>> forever.
>>>>>>>
>>>>>>> Do you guys have any thoughts?
>>>>>>>
>>>>>>> org.apache.flink.streaming.runtime.tasks.AsynchronousException:
>>>>>>> Caught exception while processing timer.
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>>>>>>> at
>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>> at
>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>>>>> at
>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>>>> at
>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>> at
>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>> Caused by: TimerException{java.lang.NoClassDefFoundError:
>>>>>>> org/joda/time/format/DateTimeParserBucket}
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
>>>>>>> ... 7 more
>>>>>>> Caused by: java.lang.NoClassDefFoundError:
>>>>>>> org/joda/time/format/DateTimeParserBucket
>>>>>>> at
>>>>>>> org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
>>>>>>> at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
>>>>>>> at
>>>>>>> com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
>>>>>>> at
>>>>>>> com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
>>>>>>> at
>>>>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
>>>>>>> at
>>>>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
>>>>>>> at
>>>>>>> com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
>>>>>>> at
>>>>>>> com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
>>>>>>> at
>>>>>>> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
>>>>>>> at
>>>>>>> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
>>>>>>> at
>>>>>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
>>>>>>> at
>>>>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
>>>>>>> at
>>>>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
>>>>>>> at
>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
>>>>>>> at
>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
>>>>>>> at
>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
>>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>>>>>>> at
>>>>>>> org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
>>>>>>> at
>>>>>>> org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
>>>>>>> at
>>>>>>> org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
>>>>>>> at
>>>>>>> org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
>>>>>>> at
>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
>>>>>>> at
>>>>>>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>>>>>>> at scala.util.Try$.apply(Try.scala:209)
>>>>>>> at
>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
>>>>>>> at
>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>>>> at
>>>>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:45)
>>>>>>> at
>>>>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:20)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
>>>>>>> ... 7 more
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>> On Wed, Feb 5, 2020 at 5:22 PM David Magalhães <sp...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I'm implementing an exponential backoff inside a custom sink that
>>>>>>> uses an AvroParquetWriter to write to S3. I've change the number of
>>>>>>> attempts to 0 inside the core-site.xml, and I'm capturing the timeout
>>>>>>> exception, doing a Thread.sleep for X seconds. This is working as intended,
>>>>>>> and when S3 is offline, it waits until it is online.
>>>>>>>
>>>>>>> I also want to test that the back pressure and the checkpoints are
>>>>>>> working as intended, and for the first one, I can see the back pressure in
>>>>>>> Flink UI going up, and recover as expected and not reading more data from
>>>>>>> Kafka.
>>>>>>>
>>>>>>> For the checkpoints, and I've added inside the sink invoke function
>>>>>>> a randomly exception (1 in 100, to simulate that a problem has happen, and
>>>>>>> need to recover from the last good checkpoint), but something strange
>>>>>>> happens. I can see the job is being canceled and created again, and running
>>>>>>> fine, other times after a X number of times of being created and canceled,
>>>>>>> it gives a *NoClassDefFoundError*, and it will keep giving that
>>>>>>> forever.
>>>>>>>
>>>>>>> Do you guys have any thoughts?
>>>>>>>
>>>>>>> org.apache.flink.streaming.runtime.tasks.AsynchronousException:
>>>>>>> Caught exception while processing timer.
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>>>>>>> at
>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>> at
>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>>>>> at
>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>>>> at
>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>> at
>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>> Caused by: TimerException{java.lang.NoClassDefFoundError:
>>>>>>> org/joda/time/format/DateTimeParserBucket}
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
>>>>>>> ... 7 more
>>>>>>> Caused by: java.lang.NoClassDefFoundError:
>>>>>>> org/joda/time/format/DateTimeParserBucket
>>>>>>> at
>>>>>>> org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
>>>>>>> at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
>>>>>>> at
>>>>>>> com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
>>>>>>> at
>>>>>>> com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
>>>>>>> at
>>>>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
>>>>>>> at
>>>>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
>>>>>>> at
>>>>>>> com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
>>>>>>> at
>>>>>>> com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
>>>>>>> at
>>>>>>> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
>>>>>>> at
>>>>>>> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
>>>>>>> at
>>>>>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
>>>>>>> at
>>>>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
>>>>>>> at
>>>>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
>>>>>>> at
>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
>>>>>>> at
>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
>>>>>>> at
>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
>>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>>>>>>> at
>>>>>>> org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
>>>>>>> at
>>>>>>> org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
>>>>>>> at
>>>>>>> org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
>>>>>>> at
>>>>>>> org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
>>>>>>> at
>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
>>>>>>> at
>>>>>>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>>>>>>> at scala.util.Try$.apply(Try.scala:209)
>>>>>>> at
>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
>>>>>>> at
>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>>>> at
>>>>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:45)
>>>>>>> at
>>>>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:20)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
>>>>>>> ... 7 more
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>

Re: NoClassDefFoundError when an exception is throw inside invoke in a Custom Sink

Posted by Robert Metzger <rm...@apache.org>.
According to this answer [1], the first exception "mentioning"
org/joda/time/format/DateTimeParserBucket should be a different one. Can
you go through the logs to make sure it is really a ClassNotFoundException,
and not an ExceptionInInitializerError or something else?

[1]https://stackoverflow.com/a/5756989/568695

On Wed, Feb 12, 2020 at 12:36 PM David Magalhães <sp...@gmail.com>
wrote:

> Hi Arvid,
>
> I'm using flink-s3-fs-hadoop-1.9.1.jar in plugins folder. Like I said
> previously, this works normally until an exception is thrown inside the
> sink. It will try to recover again, but sometimes doesn't recover, giving
> this error.
>
> To write to S3 I use *AvroParquetWriter* with the following code:
>
> val writer = AvroParquetWriter
>          .builder[GenericRecord](new Path(finalFilePath))
>
> *Path* is from *org.apache.hadoop.fs*; the other option is to use
> *org.apache.flink.formats.parquet.StreamOutputFile*, which will use the Flink S3 plugin, right? I'm not sure how I can convert from
> Path to StreamOutputFile. I know that when I've used StreamingFileSink, I
> used StreamOutputFile.
>
> On Wed, Feb 12, 2020 at 10:03 AM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi David,
>>
>> upon closer review of your stacktrace, it seems like you are trying to
>> access S3 without our S3 plugin. That's in general not recommended at all.
>>
>> Best,
>>
>> Arvid
>>
>> On Tue, Feb 11, 2020 at 11:06 AM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi David,
>>>
>>> this seems to be a bug in our s3 plugin. The joda dependency should be
>>> bundled there.
>>>
>>> Are you using s3 as a plugin by any chance? Which flink version are you
>>> using?
>>>
>>> If you are using s3 as a plugin, you could put joda in your plugin
>>> folder like this
>>>
>>> flink-dist
>>> ├── conf
>>> ├── lib
>>> ...
>>> └── plugins
>>>     └── s3
>>>         ├── joda.jar
>>>         └── flink-s3-fs-hadoop.jar
>>>
>>> If flink-s3-fs-hadoop.jar is in lib, you could try adding joda into that.
>>>
>>> Adding joda to your user code will unfortunately not work.
>>>
>>> Best,
>>>
>>> Arvid
>>>
>>> On Thu, Feb 6, 2020 at 11:16 PM David Magalhães <sp...@gmail.com>
>>> wrote:
>>>
>>>> Hi Andrey, thanks for your reply.
>>>>
>>>> The class is on the jar created with `*sbt assembly*` that is
>>>> submitted to Flink to start a Job.
>>>>
>>>> unzip -l target/jar/myapp-0.0.1-SNAPSHOT.jar | grep DateTimeParserBucket
>>>>      1649  05-27-2016 10:24
>>>> org/joda/time/format/DateTimeParserBucket$SavedField.class
>>>>      1984  05-27-2016 10:24
>>>> org/joda/time/format/DateTimeParserBucket$SavedState.class
>>>>      8651  05-27-2016 10:24
>>>> org/joda/time/format/DateTimeParserBucket.class
>>>>
>>>> Shouldn't this be enough ?
>>>>
>>>> I think it uses it when nothing goes wrong, but as soon as it has some
>>>> exceptions, it looks like it "forgets" it.
>>>>
>>>> Like I said before, this is kind of intermittent.
>>>>
>>>> Thanks,
>>>> David
>>>>
>>>> On Thu, Feb 6, 2020 at 5:46 PM Andrey Zagrebin <az...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi David,
>>>>>
>>>>> This looks like a problem with resolution of maven dependencies or
>>>>> something.
>>>>> The custom WindowParquetGenericRecordListFileSink probably
>>>>> transitively depends on org/joda/time/format/DateTimeParserBucket
>>>>> and it is missing on the runtime classpath of Flink.
>>>>>
>>>>> Best,
>>>>> Andrey
>>>>>
>>>>> On Wed, Feb 5, 2020 at 5:22 PM David Magalhães <sp...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I'm implementing an exponential backoff inside a custom sink that
>>>>>> uses an AvroParquetWriter to write to S3. I've changed the number of
>>>>>> attempts to 0 inside the core-site.xml, and I'm capturing the timeout
>>>>>> exception, doing a Thread.sleep for X seconds. This is working as intended,
>>>>>> and when S3 is offline, it waits until it is online.
>>>>>>
>>>>>> I also want to test that the back pressure and the checkpoints are
>>>>>> working as intended, and for the first one, I can see the back pressure in
>>>>>> Flink UI going up, and recover as expected and not reading more data from
>>>>>> Kafka.
>>>>>>
>>>>>> For the checkpoints, I've added inside the sink invoke function a
>>>>>> random exception (1 in 100, to simulate that a problem has happened, and
>>>>>> need to recover from the last good checkpoint), but something strange
>>>>>> happens. Sometimes I can see the job being canceled and created again, and running
>>>>>> fine; other times, after some number of times of being created and canceled,
>>>>>> it gives a *NoClassDefFoundError*, and it will keep giving that
>>>>>> forever.
>>>>>>
>>>>>> Do you guys have any thoughts?
>>>>>>
>>>>>> org.apache.flink.streaming.runtime.tasks.AsynchronousException:
>>>>>> Caught exception while processing timer.
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>>>>>> at
>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>> at
>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>>>> at
>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>>> at
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>> at
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>> Caused by: TimerException{java.lang.NoClassDefFoundError:
>>>>>> org/joda/time/format/DateTimeParserBucket}
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
>>>>>> ... 7 more
>>>>>> Caused by: java.lang.NoClassDefFoundError:
>>>>>> org/joda/time/format/DateTimeParserBucket
>>>>>> at
>>>>>> org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
>>>>>> at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
>>>>>> at
>>>>>> com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
>>>>>> at
>>>>>> com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
>>>>>> at
>>>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
>>>>>> at
>>>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
>>>>>> at
>>>>>> com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
>>>>>> at
>>>>>> com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
>>>>>> at
>>>>>> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
>>>>>> at
>>>>>> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
>>>>>> at
>>>>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
>>>>>> at
>>>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
>>>>>> at
>>>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
>>>>>> at
>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
>>>>>> at
>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
>>>>>> at
>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>>>>>> at
>>>>>> org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
>>>>>> at
>>>>>> org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
>>>>>> at
>>>>>> org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
>>>>>> at
>>>>>> org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
>>>>>> at
>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
>>>>>> at
>>>>>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>>>>>> at scala.util.Try$.apply(Try.scala:209)
>>>>>> at
>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
>>>>>> at
>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
>>>>>> at
>>>>>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>>>> at
>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>>>> at
>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>>>> at
>>>>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>>> at
>>>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:45)
>>>>>> at
>>>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:20)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
>>>>>> at
>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
>>>>>> ... 7 more
>>>>>>
>>>>>>
>>>>>>
>>>>> On Wed, Feb 5, 2020 at 5:22 PM David Magalhães <sp...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I'm implementing an exponential backoff inside a custom sink that
>>>>>> uses an AvroParquetWriter to write to S3. I've change the number of
>>>>>> attempts to 0 inside the core-site.xml, and I'm capturing the timeout
>>>>>> exception, doing a Thread.sleep for X seconds. This is working as intended,
>>>>>> and when S3 is offline, it waits until it is online.
>>>>>>
>>>>>> I also want to test that the back pressure and the checkpoints are
>>>>>> working as intended, and for the first one, I can see the back pressure in
>>>>>> Flink UI going up, and recover as expected and not reading more data from
>>>>>> Kafka.
>>>>>>
>>>>>> For the checkpoints, and I've added inside the sink invoke function a
>>>>>> randomly exception (1 in 100, to simulate that a problem has happen, and
>>>>>> need to recover from the last good checkpoint), but something strange
>>>>>> happens. I can see the job is being canceled and created again, and running
>>>>>> fine, other times after a X number of times of being created and canceled,
>>>>>> it gives a *NoClassDefFoundError*, and it will keep giving that
>>>>>> forever.
>>>>>>
>>>>>> Do you guys have any thoughts?
>>>>>>
>>>>>> org.apache.flink.streaming.runtime.tasks.AsynchronousException:
>>>>>> Caught exception while processing timer.
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>>>>>> at
>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>> at
>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>>>> at
>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>>> at
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>> at
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>> Caused by: TimerException{java.lang.NoClassDefFoundError:
>>>>>> org/joda/time/format/DateTimeParserBucket}
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
>>>>>> ... 7 more
>>>>>> Caused by: java.lang.NoClassDefFoundError:
>>>>>> org/joda/time/format/DateTimeParserBucket
>>>>>> at
>>>>>> org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
>>>>>> at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
>>>>>> at
>>>>>> com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
>>>>>> at
>>>>>> com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
>>>>>> at
>>>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
>>>>>> at
>>>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
>>>>>> at
>>>>>> com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
>>>>>> at
>>>>>> com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
>>>>>> at
>>>>>> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
>>>>>> at
>>>>>> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
>>>>>> at
>>>>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
>>>>>> at
>>>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
>>>>>> at
>>>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
>>>>>> at
>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
>>>>>> at
>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
>>>>>> at
>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>>>>>> at
>>>>>> org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
>>>>>> at
>>>>>> org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
>>>>>> at
>>>>>> org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
>>>>>> at
>>>>>> org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
>>>>>> at
>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
>>>>>> at
>>>>>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>>>>>> at scala.util.Try$.apply(Try.scala:209)
>>>>>> at
>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
>>>>>> at
>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
>>>>>> at
>>>>>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>>>> at
>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>>>> at
>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>>>> at
>>>>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>>> at
>>>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:45)
>>>>>> at
>>>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:20)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
>>>>>> at
>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
>>>>>> ... 7 more
>>>>>>
>>>>>>
>>>>>>
>>>>>>

Re: NoClassDefFoundError when an exception is throw inside invoke in a Custom Sink

Posted by David Magalhães <sp...@gmail.com>.
Hi Arvid,

I'm using flink-s3-fs-hadoop-1.9.1.jar in the plugins folder. Like I said
previously, this works normally until an exception is thrown inside the
sink. It will try to recover, but sometimes it doesn't recover and gives
this error.

To write to S3 I use *AvroParquetWriter* with the following code:

val writer = AvroParquetWriter
         .builder[GenericRecord](new Path(finalFilePath))

*Path* is from *org.apache.hadoop.fs*. The other option is to use
*org.apache.flink.formats.parquet.StreamOutputFile*,
which will use the Flink S3 plugin, right? I'm not sure how I can convert from
Path to StreamOutputFile. I know that when I used StreamingFileSink, I
used StreamOutputFile.

On Wed, Feb 12, 2020 at 10:03 AM Arvid Heise <ar...@ververica.com> wrote:

> Hi David,
>
> Upon closer review of your stack trace, it seems like you are trying to
> access S3 without our S3 plugin. In general, that's not recommended at all.
>
> Best,
>
> Arvid
>
> On Tue, Feb 11, 2020 at 11:06 AM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi David,
>>
>> this seems to be a bug in our s3 plugin. The joda dependency should be
>> bundled there.
>>
>> Are you using s3 as a plugin by any chance? Which flink version are you
>> using?
>>
>> If you are using s3 as a plugin, you could put joda in your plugin folder
>> like this
>>
>> flink-dist
>> ├── conf
>> ├── lib
>> ...
>> └── plugins
>>     └── s3
>>         ├── joda.jar
>>         └── flink-s3-fs-hadoop.jar
>>
>> If flink-s3-fs-hadoop.jar is in lib, you could try adding joda into that.
>>
>> Adding joda to your user code will unfortunately not work.
>>
>> Best,
>>
>> Arvid
>>
>> On Thu, Feb 6, 2020 at 11:16 PM David Magalhães <sp...@gmail.com>
>> wrote:
>>
>>> Hi Andrey, thanks for your reply.
>>>
>>> The class is on the jar created with `*sbt assembly*` that is
>>> submitted to Flink to start a Job.
>>>
>>> unzip -l target/jar/myapp-0.0.1-SNAPSHOT.jar | grep DateTimeParserBucket
>>>      1649  05-27-2016 10:24
>>> org/joda/time/format/DateTimeParserBucket$SavedField.class
>>>      1984  05-27-2016 10:24
>>> org/joda/time/format/DateTimeParserBucket$SavedState.class
>>>      8651  05-27-2016 10:24
>>> org/joda/time/format/DateTimeParserBucket.class
>>>
>>> Shouldn't this be enough ?
>>>
>>> I think it uses it when nothing happens, but as soon as it has some
>>> exceptions, it looks like it "forgets" it.
>>>
>>> Like I said before, this is kind of intermittent.
>>>
>>> Thanks,
>>> David
>>>
>>> On Thu, Feb 6, 2020 at 5:46 PM Andrey Zagrebin <az...@apache.org>
>>> wrote:
>>>
>>>> Hi David,
>>>>
>>>> This looks like a problem with resolution of maven dependencies or
>>>> something.
>>>> The custom WindowParquetGenericRecordListFileSink probably
>>>> transitively depends on org/joda/time/format/DateTimeParserBucket
>>>> and it is missing on the runtime classpath of Flink.
>>>>
>>>> Best,
>>>> Andrey
>>>>
>>>> On Wed, Feb 5, 2020 at 5:22 PM David Magalhães <sp...@gmail.com>
>>>> wrote:
>>>>
>>>>> I'm implementing an exponential backoff inside a custom sink that uses
>>>>> an AvroParquetWriter to write to S3. I've changed the number of attempts to
>>>>> 0 inside the core-site.xml, and I'm catching the timeout exception and doing
>>>>> a Thread.sleep for X seconds. This is working as intended, and when S3 is
>>>>> offline, it waits until it is online.
>>>>>
>>>>> I also want to test that the back pressure and the checkpoints are
>>>>> working as intended, and for the first one, I can see the back pressure in
>>>>> Flink UI going up, and recover as expected and not reading more data from
>>>>> Kafka.
>>>>>
>>>>> For the checkpoints, I've added inside the sink invoke function a
>>>>> random exception (1 in 100, to simulate that a problem has happened and
>>>>> that it needs to recover from the last good checkpoint), but something strange
>>>>> happens. Sometimes I can see the job being canceled and created again, and running
>>>>> fine; other times, after being created and canceled X number of times,
>>>>> it gives a *NoClassDefFoundError*, and it will keep giving that
>>>>> forever.
>>>>>
>>>>> Do you guys have any thoughts?
>>>>>
>>>>> org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
>>>>> exception while processing timer.
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>>>>> at
>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>> at
>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>>> at
>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: TimerException{java.lang.NoClassDefFoundError:
>>>>> org/joda/time/format/DateTimeParserBucket}
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
>>>>> ... 7 more
>>>>> Caused by: java.lang.NoClassDefFoundError:
>>>>> org/joda/time/format/DateTimeParserBucket
>>>>> at
>>>>> org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
>>>>> at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
>>>>> at
>>>>> com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
>>>>> at
>>>>> com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
>>>>> at
>>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
>>>>> at
>>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
>>>>> at
>>>>> com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
>>>>> at
>>>>> com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
>>>>> at
>>>>> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
>>>>> at
>>>>> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
>>>>> at
>>>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
>>>>> at
>>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
>>>>> at
>>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
>>>>> at
>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
>>>>> at
>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
>>>>> at
>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>>>>> at
>>>>> org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
>>>>> at
>>>>> org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
>>>>> at
>>>>> org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
>>>>> at
>>>>> org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
>>>>> at
>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
>>>>> at
>>>>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>>>>> at scala.util.Try$.apply(Try.scala:209)
>>>>> at
>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
>>>>> at
>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>> at
>>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:45)
>>>>> at
>>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:20)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
>>>>> ... 7 more
>>>>>
>>>>>
>>>>>
>>>> On Wed, Feb 5, 2020 at 5:22 PM David Magalhães <sp...@gmail.com>
>>>> wrote:
>>>>
>>>>> I'm implementing an exponential backoff inside a custom sink that uses
>>>>> an AvroParquetWriter to write to S3. I've changed the number of attempts to
>>>>> 0 inside the core-site.xml, and I'm catching the timeout exception and doing
>>>>> a Thread.sleep for X seconds. This is working as intended, and when S3 is
>>>>> offline, it waits until it is online.
>>>>>
>>>>> I also want to test that the back pressure and the checkpoints are
>>>>> working as intended, and for the first one, I can see the back pressure in
>>>>> Flink UI going up, and recover as expected and not reading more data from
>>>>> Kafka.
>>>>>
>>>>> For the checkpoints, I've added inside the sink invoke function a
>>>>> random exception (1 in 100, to simulate that a problem has happened and
>>>>> that it needs to recover from the last good checkpoint), but something strange
>>>>> happens. Sometimes I can see the job being canceled and created again, and running
>>>>> fine; other times, after being created and canceled X number of times,
>>>>> it gives a *NoClassDefFoundError*, and it will keep giving that
>>>>> forever.
>>>>>
>>>>> Do you guys have any thoughts?
>>>>>
>>>>> org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
>>>>> exception while processing timer.
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>>>>> at
>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>> at
>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>>> at
>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: TimerException{java.lang.NoClassDefFoundError:
>>>>> org/joda/time/format/DateTimeParserBucket}
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
>>>>> ... 7 more
>>>>> Caused by: java.lang.NoClassDefFoundError:
>>>>> org/joda/time/format/DateTimeParserBucket
>>>>> at
>>>>> org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
>>>>> at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
>>>>> at
>>>>> com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
>>>>> at
>>>>> com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
>>>>> at
>>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
>>>>> at
>>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
>>>>> at
>>>>> com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
>>>>> at
>>>>> com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
>>>>> at
>>>>> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
>>>>> at
>>>>> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
>>>>> at
>>>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
>>>>> at
>>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
>>>>> at
>>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
>>>>> at
>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
>>>>> at
>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
>>>>> at
>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>>>>> at
>>>>> org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
>>>>> at
>>>>> org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
>>>>> at
>>>>> org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
>>>>> at
>>>>> org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
>>>>> at
>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
>>>>> at
>>>>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>>>>> at scala.util.Try$.apply(Try.scala:209)
>>>>> at
>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
>>>>> at
>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>> at
>>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:45)
>>>>> at
>>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:20)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
>>>>> ... 7 more
>>>>>
>>>>>
>>>>>
>>>>>

Re: NoClassDefFoundError when an exception is throw inside invoke in a Custom Sink

Posted by Arvid Heise <ar...@ververica.com>.
Hi David,

Upon closer review of your stack trace, it seems like you are trying to
access S3 without our S3 plugin. In general, that's not recommended at all.

Best,

Arvid

On Tue, Feb 11, 2020 at 11:06 AM Arvid Heise <ar...@ververica.com> wrote:

> Hi David,
>
> this seems to be a bug in our s3 plugin. The joda dependency should be
> bundled there.
>
> Are you using s3 as a plugin by any chance? Which flink version are you
> using?
>
> If you are using s3 as a plugin, you could put joda in your plugin folder
> like this
>
> flink-dist
> ├── conf
> ├── lib
> ...
> └── plugins
>     └── s3
>         ├── joda.jar
>         └── flink-s3-fs-hadoop.jar
>
> If flink-s3-fs-hadoop.jar is in lib, you could try adding joda into that.
>
> Adding joda to your user code will unfortunately not work.
>
> Best,
>
> Arvid
>
> On Thu, Feb 6, 2020 at 11:16 PM David Magalhães <sp...@gmail.com>
> wrote:
>
>> Hi Andrey, thanks for your reply.
>>
>> The class is on the jar created with `*sbt assembly*` that is
>> submitted to Flink to start a Job.
>>
>> unzip -l target/jar/myapp-0.0.1-SNAPSHOT.jar | grep DateTimeParserBucket
>>      1649  05-27-2016 10:24
>> org/joda/time/format/DateTimeParserBucket$SavedField.class
>>      1984  05-27-2016 10:24
>> org/joda/time/format/DateTimeParserBucket$SavedState.class
>>      8651  05-27-2016 10:24
>> org/joda/time/format/DateTimeParserBucket.class
>>
>> Shouldn't this be enough ?
>>
>> I think it uses it when nothing happens, but as soon as it has some
>> exceptions, it looks like it "forgets" it.
>>
>> Like I said before, this is kind of intermittent.
>>
>> Thanks,
>> David
>>
>> On Thu, Feb 6, 2020 at 5:46 PM Andrey Zagrebin <az...@apache.org>
>> wrote:
>>
>>> Hi David,
>>>
>>> This looks like a problem with resolution of maven dependencies or
>>> something.
>>> The custom WindowParquetGenericRecordListFileSink probably transitively
>>> depends on org/joda/time/format/DateTimeParserBucket
>>> and it is missing on the runtime classpath of Flink.
>>>
>>> Best,
>>> Andrey
>>>
>>> On Wed, Feb 5, 2020 at 5:22 PM David Magalhães <sp...@gmail.com>
>>> wrote:
>>>
>>>> I'm implementing an exponential backoff inside a custom sink that uses
>>>> an AvroParquetWriter to write to S3. I've changed the number of attempts to
>>>> 0 inside the core-site.xml, and I'm catching the timeout exception and doing
>>>> a Thread.sleep for X seconds. This is working as intended, and when S3 is
>>>> offline, it waits until it is online.
>>>>
>>>> I also want to test that the back pressure and the checkpoints are
>>>> working as intended, and for the first one, I can see the back pressure in
>>>> Flink UI going up, and recover as expected and not reading more data from
>>>> Kafka.
>>>>
>>>> For the checkpoints, I've added inside the sink invoke function a
>>>> random exception (1 in 100, to simulate that a problem has happened and
>>>> that it needs to recover from the last good checkpoint), but something strange
>>>> happens. Sometimes I can see the job being canceled and created again, and running
>>>> fine; other times, after being created and canceled X number of times,
>>>> it gives a *NoClassDefFoundError*, and it will keep giving that
>>>> forever.
>>>>
>>>> Do you guys have any thoughts?
>>>>
>>>> org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
>>>> exception while processing timer.
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>>>> at
>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>> at
>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>> at
>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>> at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: TimerException{java.lang.NoClassDefFoundError:
>>>> org/joda/time/format/DateTimeParserBucket}
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
>>>> ... 7 more
>>>> Caused by: java.lang.NoClassDefFoundError:
>>>> org/joda/time/format/DateTimeParserBucket
>>>> at
>>>> org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
>>>> at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
>>>> at
>>>> com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
>>>> at
>>>> com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
>>>> at
>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
>>>> at
>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
>>>> at
>>>> com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
>>>> at
>>>> com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
>>>> at
>>>> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
>>>> at
>>>> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
>>>> at
>>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
>>>> at
>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
>>>> at
>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
>>>> at
>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
>>>> at
>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
>>>> at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>>>> at
>>>> org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
>>>> at
>>>> org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
>>>> at
>>>> org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
>>>> at
>>>> org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
>>>> at
>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
>>>> at
>>>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>>>> at scala.util.Try$.apply(Try.scala:209)
>>>> at
>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
>>>> at
>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
>>>> at
>>>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>> at
>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>> at
>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>> at
>>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>> at
>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:45)
>>>> at
>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:20)
>>>> at
>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
>>>> at
>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
>>>> at
>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
>>>> at
>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
>>>> at
>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
>>>> ... 7 more
>>>>
>>>>
>>>>
>>> On Wed, Feb 5, 2020 at 5:22 PM David Magalhães <sp...@gmail.com>
>>> wrote:
>>>
>>>> I'm implementing an exponential backoff inside a custom sink that uses
>>>> an AvroParquetWriter to write to S3. I've changed the number of attempts to
>>>> 0 inside the core-site.xml, and I'm catching the timeout exception and doing
>>>> a Thread.sleep for X seconds. This is working as intended, and when S3 is
>>>> offline, it waits until it is online.
>>>>
>>>> I also want to test that the back pressure and the checkpoints are
>>>> working as intended, and for the first one, I can see the back pressure in
>>>> Flink UI going up, and recover as expected and not reading more data from
>>>> Kafka.
>>>>
>>>> For the checkpoints, I've added inside the sink invoke function a
>>>> random exception (1 in 100, to simulate that a problem has happened and
>>>> that it needs to recover from the last good checkpoint), but something strange
>>>> happens. Sometimes I can see the job being canceled and created again, and running
>>>> fine; other times, after being created and canceled X number of times,
>>>> it gives a *NoClassDefFoundError*, and it will keep giving that
>>>> forever.
>>>>
>>>> Do you guys have any thoughts?
>>>>
>>>> org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
>>>> exception while processing timer.
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>>>> at
>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>> at
>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>> at
>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>> at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: TimerException{java.lang.NoClassDefFoundError:
>>>> org/joda/time/format/DateTimeParserBucket}
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
>>>> ... 7 more
>>>> Caused by: java.lang.NoClassDefFoundError:
>>>> org/joda/time/format/DateTimeParserBucket
>>>> at
>>>> org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
>>>> at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
>>>> at
>>>> com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
>>>> at
>>>> com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
>>>> at
>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
>>>> at
>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
>>>> at
>>>> com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
>>>> at
>>>> com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
>>>> at
>>>> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
>>>> at
>>>> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
>>>> at
>>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
>>>> at
>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
>>>> at
>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
>>>> at
>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
>>>> at
>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
>>>> at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>>>> at
>>>> org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
>>>> at
>>>> org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
>>>> at
>>>> org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
>>>> at
>>>> org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
>>>> at
>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
>>>> at
>>>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>>>> at scala.util.Try$.apply(Try.scala:209)
>>>> at
>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
>>>> at
>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
>>>> at
>>>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>> at
>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>> at
>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>> at
>>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>> at
>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:45)
>>>> at
>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:20)
>>>> at
>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
>>>> at
>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
>>>> at
>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
>>>> at
>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
>>>> at
>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
>>>> ... 7 more
>>>>
>>>>
>>>>
>>>>

Re: NoClassDefFoundError when an exception is throw inside invoke in a Custom Sink

Posted by Arvid Heise <ar...@ververica.com>.
Hi David,

this seems to be a bug in our s3 plugin. The joda dependency should be
bundled there.

Are you using s3 as a plugin by any chance? Which flink version are you
using?

If you are using s3 as a plugin, you could put joda in your plugin folder
like this

flink-dist
├── conf
├── lib
...
└── plugins
    └── s3
        ├── joda.jar
        └── flink-s3-fs-hadoop.jar

If flink-s3-fs-hadoop.jar is in lib, you could try adding joda into that.

Adding joda to your user code will unfortunately not work.

Best,

Arvid

On Thu, Feb 6, 2020 at 11:16 PM David Magalhães <sp...@gmail.com>
wrote:

> Hi Andrey, thanks for your reply.
>
> The class is on the jar created with `*sbt assembly*` that is
> submitted to Flink to start a Job.
>
> unzip -l target/jar/myapp-0.0.1-SNAPSHOT.jar | grep DateTimeParserBucket
>      1649  05-27-2016 10:24
> org/joda/time/format/DateTimeParserBucket$SavedField.class
>      1984  05-27-2016 10:24
> org/joda/time/format/DateTimeParserBucket$SavedState.class
>      8651  05-27-2016 10:24
> org/joda/time/format/DateTimeParserBucket.class
>
> Shouldn't this be enough ?
>
> I think it uses it when nothing happens, but as soon as it hits some
> exceptions, it looks like it "forgets" it.
>
> Like I said before, this is kind of intermittent.
>
> Thanks,
> David
>
> On Thu, Feb 6, 2020 at 5:46 PM Andrey Zagrebin <az...@apache.org>
> wrote:
>
>> Hi David,
>>
>> This looks like a problem with resolution of maven dependencies or
>> something.
>> The custom WindowParquetGenericRecordListFileSink probably transitively
>> depends on org/joda/time/format/DateTimeParserBucket
>> and it is missing on the runtime classpath of Flink.
>>
>> Best,
>> Andrey
>>
>> On Wed, Feb 5, 2020 at 5:22 PM David Magalhães <sp...@gmail.com>
>> wrote:
>>
>>> I'm implementing an exponential backoff inside a custom sink that uses
>>> an AvroParquetWriter to write to S3. I've change the number of attempts to
>>> 0 inside the core-site.xml, and I'm capturing the timeout exception, doing
>>> a Thread.sleep for X seconds. This is working as intended, and when S3 is
>>> offline, it waits until it is online.
>>>
>>> I also want to test that the back pressure and the checkpoints are
>>> working as intended, and for the first one, I can see the back pressure in
>>> Flink UI going up, and recover as expected and not reading more data from
>>> Kafka.
>>>
>>> For the checkpoints, I've added a random exception inside the sink's
>>> invoke function (1 in 100, to simulate that a problem has happened and
>>> the job needs to recover from the last good checkpoint), but something
>>> strange happens. Sometimes I can see the job being canceled and created
>>> again, and running fine; other times, after being created and canceled
>>> X number of times, it gives a *NoClassDefFoundError*, and it will keep
>>> giving that forever.
>>>
>>> Do you guys have any thoughts?
>>>
>>> org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
>>> exception while processing timer.
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>>> at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>> at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: TimerException{java.lang.NoClassDefFoundError:
>>> org/joda/time/format/DateTimeParserBucket}
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
>>> ... 7 more
>>> Caused by: java.lang.NoClassDefFoundError:
>>> org/joda/time/format/DateTimeParserBucket
>>> at
>>> org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
>>> at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
>>> at
>>> com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
>>> at
>>> com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
>>> at
>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
>>> at
>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
>>> at
>>> com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
>>> at
>>> com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
>>> at
>>> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
>>> at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
>>> at
>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
>>> at
>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
>>> at
>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
>>> at
>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
>>> at
>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
>>> at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>>> at
>>> org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
>>> at
>>> org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
>>> at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
>>> at
>>> org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
>>> at
>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
>>> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>>> at scala.util.Try$.apply(Try.scala:209)
>>> at
>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
>>> at
>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>> at
>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>> at
>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:45)
>>> at
>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:20)
>>> at
>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
>>> at
>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
>>> at
>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
>>> at
>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
>>> at
>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
>>> ... 7 more
>>>
>>>
>>>
>> On Wed, Feb 5, 2020 at 5:22 PM David Magalhães <sp...@gmail.com>
>> wrote:
>>
>>> I'm implementing an exponential backoff inside a custom sink that uses
>>> an AvroParquetWriter to write to S3. I've change the number of attempts to
>>> 0 inside the core-site.xml, and I'm capturing the timeout exception, doing
>>> a Thread.sleep for X seconds. This is working as intended, and when S3 is
>>> offline, it waits until it is online.
>>>
>>> I also want to test that the back pressure and the checkpoints are
>>> working as intended, and for the first one, I can see the back pressure in
>>> Flink UI going up, and recover as expected and not reading more data from
>>> Kafka.
>>>
>>> For the checkpoints, I've added a random exception inside the sink's
>>> invoke function (1 in 100, to simulate that a problem has happened and
>>> the job needs to recover from the last good checkpoint), but something
>>> strange happens. Sometimes I can see the job being canceled and created
>>> again, and running fine; other times, after being created and canceled
>>> X number of times, it gives a *NoClassDefFoundError*, and it will keep
>>> giving that forever.
>>>
>>> Do you guys have any thoughts?
>>>
>>> org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
>>> exception while processing timer.
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>>> at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>> at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: TimerException{java.lang.NoClassDefFoundError:
>>> org/joda/time/format/DateTimeParserBucket}
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
>>> ... 7 more
>>> Caused by: java.lang.NoClassDefFoundError:
>>> org/joda/time/format/DateTimeParserBucket
>>> at
>>> org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
>>> at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
>>> at
>>> com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
>>> at
>>> com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
>>> at
>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
>>> at
>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
>>> at
>>> com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
>>> at
>>> com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
>>> at
>>> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
>>> at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
>>> at
>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
>>> at
>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
>>> at
>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
>>> at
>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
>>> at
>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
>>> at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>>> at
>>> org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
>>> at
>>> org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
>>> at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
>>> at
>>> org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
>>> at
>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
>>> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>>> at scala.util.Try$.apply(Try.scala:209)
>>> at
>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
>>> at
>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>> at
>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>> at
>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:45)
>>> at
>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:20)
>>> at
>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
>>> at
>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
>>> at
>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
>>> at
>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
>>> at
>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
>>> ... 7 more
>>>
>>>
>>>
>>>

Re: NoClassDefFoundError when an exception is throw inside invoke in a Custom Sink

Posted by David Magalhães <sp...@gmail.com>.
Hi Andrey, thanks for your reply.

The class is on the jar created with `*sbt assembly*` that is submitted to
Flink to start a Job.

unzip -l target/jar/myapp-0.0.1-SNAPSHOT.jar | grep DateTimeParserBucket
     1649  05-27-2016 10:24
org/joda/time/format/DateTimeParserBucket$SavedField.class
     1984  05-27-2016 10:24
org/joda/time/format/DateTimeParserBucket$SavedState.class
     8651  05-27-2016 10:24
org/joda/time/format/DateTimeParserBucket.class

Shouldn't this be enough ?

I think it uses it when nothing happens, but as soon as it hits some
exceptions, it looks like it "forgets" it.

Like I said before, this is kind of intermittent.

Thanks,
David

On Thu, Feb 6, 2020 at 5:46 PM Andrey Zagrebin <az...@apache.org> wrote:

> Hi David,
>
> This looks like a problem with resolution of maven dependencies or
> something.
> The custom WindowParquetGenericRecordListFileSink probably transitively
> depends on org/joda/time/format/DateTimeParserBucket
> and it is missing on the runtime classpath of Flink.
>
> Best,
> Andrey
>
> On Wed, Feb 5, 2020 at 5:22 PM David Magalhães <sp...@gmail.com>
> wrote:
>
>> I'm implementing an exponential backoff inside a custom sink that uses an
>> AvroParquetWriter to write to S3. I've change the number of attempts to 0
>> inside the core-site.xml, and I'm capturing the timeout exception, doing a
>> Thread.sleep for X seconds. This is working as intended, and when S3 is
>> offline, it waits until it is online.
>>
>> I also want to test that the back pressure and the checkpoints are
>> working as intended, and for the first one, I can see the back pressure in
>> Flink UI going up, and recover as expected and not reading more data from
>> Kafka.
>>
>> For the checkpoints, I've added a random exception inside the sink's
>> invoke function (1 in 100, to simulate that a problem has happened and
>> the job needs to recover from the last good checkpoint), but something
>> strange happens. Sometimes I can see the job being canceled and created
>> again, and running fine; other times, after being created and canceled
>> X number of times, it gives a *NoClassDefFoundError*, and it will keep
>> giving that forever.
>>
>> Do you guys have any thoughts?
>>
>> org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
>> exception while processing timer.
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>> at
>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: TimerException{java.lang.NoClassDefFoundError:
>> org/joda/time/format/DateTimeParserBucket}
>> at
>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
>> ... 7 more
>> Caused by: java.lang.NoClassDefFoundError:
>> org/joda/time/format/DateTimeParserBucket
>> at
>> org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
>> at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
>> at
>> com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
>> at
>> com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
>> at
>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
>> at
>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
>> at
>> com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
>> at
>> com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
>> at
>> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
>> at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
>> at
>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
>> at
>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
>> at
>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
>> at
>> org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
>> at
>> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
>> at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>> at
>> org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
>> at
>> org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
>> at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
>> at
>> org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
>> at
>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
>> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>> at scala.util.Try$.apply(Try.scala:209)
>> at
>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
>> at
>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
>> at
>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>> at
>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>> at
>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:45)
>> at
>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:20)
>> at
>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
>> at
>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
>> at
>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
>> at
>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
>> at
>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>> at
>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
>> ... 7 more
>>
>>
>>
> On Wed, Feb 5, 2020 at 5:22 PM David Magalhães <sp...@gmail.com>
> wrote:
>
>> I'm implementing an exponential backoff inside a custom sink that uses an
>> AvroParquetWriter to write to S3. I've change the number of attempts to 0
>> inside the core-site.xml, and I'm capturing the timeout exception, doing a
>> Thread.sleep for X seconds. This is working as intended, and when S3 is
>> offline, it waits until it is online.
>>
>> I also want to test that the back pressure and the checkpoints are
>> working as intended, and for the first one, I can see the back pressure in
>> Flink UI going up, and recover as expected and not reading more data from
>> Kafka.
>>
>> For the checkpoints, I've added a random exception inside the sink's
>> invoke function (1 in 100, to simulate that a problem has happened and
>> the job needs to recover from the last good checkpoint), but something
>> strange happens. Sometimes I can see the job being canceled and created
>> again, and running fine; other times, after being created and canceled
>> X number of times, it gives a *NoClassDefFoundError*, and it will keep
>> giving that forever.
>>
>> Do you guys have any thoughts?
>>
>> org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
>> exception while processing timer.
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>> at
>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: TimerException{java.lang.NoClassDefFoundError:
>> org/joda/time/format/DateTimeParserBucket}
>> at
>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
>> ... 7 more
>> Caused by: java.lang.NoClassDefFoundError:
>> org/joda/time/format/DateTimeParserBucket
>> at
>> org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
>> at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
>> at
>> com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
>> at
>> com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
>> at
>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
>> at
>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
>> at
>> com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
>> at
>> com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
>> at
>> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
>> at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
>> at
>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
>> at
>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
>> at
>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
>> at
>> org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
>> at
>> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
>> at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>> at
>> org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
>> at
>> org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
>> at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
>> at
>> org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
>> at
>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
>> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>> at scala.util.Try$.apply(Try.scala:209)
>> at
>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
>> at
>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
>> at
>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>> at
>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>> at
>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:45)
>> at
>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:20)
>> at
>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
>> at
>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
>> at
>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
>> at
>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
>> at
>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>> at
>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
>> ... 7 more
>>
>>
>>
>>

Re: NoClassDefFoundError when an exception is throw inside invoke in a Custom Sink

Posted by Andrey Zagrebin <az...@apache.org>.
Hi David,

This looks like a problem with the resolution of Maven dependencies, or
something similar.
The custom WindowParquetGenericRecordListFileSink probably depends
transitively on org/joda/time/format/DateTimeParserBucket,
and that class is missing from Flink's runtime classpath.

Best,
Andrey

On Wed, Feb 5, 2020 at 5:22 PM David Magalhães <sp...@gmail.com>
wrote:

> I'm implementing an exponential backoff inside a custom sink that uses an
> AvroParquetWriter to write to S3. I've changed the number of attempts to 0
> inside the core-site.xml, and I'm capturing the timeout exception and doing
> a Thread.sleep for X seconds. This is working as intended, and when S3 is
> offline, it waits until it is online.
>
> I also want to test that the back pressure and the checkpoints are working
> as intended. For the first one, I can see the back pressure in the Flink UI
> going up and then recovering as expected, with no more data being read from Kafka.
>
> For the checkpoints, I've added inside the sink's invoke function a
> random exception (1 in 100, to simulate that a problem has happened and
> the job needs to recover from the last good checkpoint), but something
> strange happens. Sometimes I can see the job being canceled and created
> again, and running fine; other times, after being created and canceled
> X times, it gives a *NoClassDefFoundError*, and it will keep giving that forever.
>
> Do you guys have any thoughts?
>
> org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
> exception while processing timer.
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
> at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: TimerException{java.lang.NoClassDefFoundError:
> org/joda/time/format/DateTimeParserBucket}
> at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
> ... 7 more
> Caused by: java.lang.NoClassDefFoundError:
> org/joda/time/format/DateTimeParserBucket
> at
> org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
> at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
> at
> com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
> at
> com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
> at
> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
> at
> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
> at
> com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
> at
> com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
> at
> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
> at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
> at
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
> at
> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
> at
> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
> at
> org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
> at
> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
> at
> org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
> at
> org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
> at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
> at
> org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
> at
> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
> at scala.util.Try$.apply(Try.scala:209)
> at
> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
> at
> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
> at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at
> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:45)
> at
> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:20)
> at
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
> at
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
> at
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
> at
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
> at
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
> at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
> ... 7 more
>
>
>
On Wed, Feb 5, 2020 at 5:22 PM David Magalhães <sp...@gmail.com>
wrote:

> I'm implementing an exponential backoff inside a custom sink that uses an
> AvroParquetWriter to write to S3. I've changed the number of attempts to 0
> inside the core-site.xml, and I'm capturing the timeout exception and doing
> a Thread.sleep for X seconds. This is working as intended, and when S3 is
> offline, it waits until it is online.
>
> I also want to test that the back pressure and the checkpoints are working
> as intended. For the first one, I can see the back pressure in the Flink UI
> going up and then recovering as expected, with no more data being read from Kafka.
>
> For the checkpoints, I've added inside the sink's invoke function a
> random exception (1 in 100, to simulate that a problem has happened and
> the job needs to recover from the last good checkpoint), but something
> strange happens. Sometimes I can see the job being canceled and created
> again, and running fine; other times, after being created and canceled
> X times, it gives a *NoClassDefFoundError*, and it will keep giving that forever.
>
> Do you guys have any thoughts?
>
> org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
> exception while processing timer.
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
> at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: TimerException{java.lang.NoClassDefFoundError:
> org/joda/time/format/DateTimeParserBucket}
> at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
> ... 7 more
> Caused by: java.lang.NoClassDefFoundError:
> org/joda/time/format/DateTimeParserBucket
> at
> org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
> at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
> at
> com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
> at
> com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
> at
> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
> at
> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
> at
> com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
> at
> com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
> at
> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
> at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
> at
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
> at
> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
> at
> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
> at
> org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
> at
> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
> at
> org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
> at
> org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
> at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
> at
> org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
> at
> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
> at scala.util.Try$.apply(Try.scala:209)
> at
> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
> at
> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
> at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at
> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:45)
> at
> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:20)
> at
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
> at
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
> at
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
> at
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
> at
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
> at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
> ... 7 more
>
>
>
>