You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Jaya Ananthram <ja...@ft.com.INVALID> on 2022/07/11 19:00:58 UTC

Not able to run a simple table API file streaming sink

Hello There,

I am trying to write a simple table API S3 streaming sink using flink
1.15.1 and I am facing the following exception,

Caused by: org.apache.flink.util.SerializedThrowable:
S3RecoverableFsDataOutputStream cannot sync state to S3. Use persist() to
create a persistent recoverable intermediate point.
at
org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream.sync(RefCountedBufferingFileStream.java:111)
~[flink-s3-fs-hadoop-1.15.1.jar:1.15.1]
at
org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.sync(S3RecoverableFsDataOutputStream.java:129)
~[flink-s3-fs-hadoop-1.15.1.jar:1.15.1]
at
org.apache.flink.formats.csv.CsvBulkWriter.finish(CsvBulkWriter.java:110)
~[flink-csv-1.15.1.jar:1.15.1]
at
org.apache.flink.connector.file.table.FileSystemTableSink$ProjectionBulkFactory$1.finish(FileSystemTableSink.java:642)
~[flink-connector-files-1.15.1.jar:1.15.1]
at
org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:64)
~[flink-file-sink-common-1.15.1.jar:1.15.1]
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:263)
~[flink-streaming-java-1.15.1.jar:1.15.1]
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.prepareBucketForCheckpointing(Bucket.java:305)
~[flink-streaming-java-1.15.1.jar:1.15.1]
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onReceptionOfCheckpoint(Bucket.java:277)
~[flink-streaming-java-1.15.1.jar:1.15.1]
at
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotActiveBuckets(Buckets.java:270)
~[flink-streaming-java-1.15.1.jar:1.15.1]
at
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotState(Buckets.java:261)
~[flink-streaming-java-1.15.1.jar:1.15.1]
at
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.snapshotState(StreamingFileSinkHelper.java:87)
~[flink-streaming-java-1.15.1.jar:1.15.1]
at
org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.snapshotState(AbstractStreamingWriter.java:129)
~[flink-connector-files-1.15.1.jar:1.15.1]

In my config, I am trying to read from Kafka and write to S3 (s3a) using
table API and checkpoint configuration using s3p (presto). Even I tried
with a simple datagen example instead of Kafka and I am getting the same
issue. I think I am following all the exact steps mentioned in the docs and
the above exceptions are not much helpful. Exactly it is failing when the
code triggers the checkpoint but I don't have any clue after this. Could
someone please help me to understand what I am missing here? I don't find
any open issue with such logs.

Hoping for your reply.

-- 


*This email was
sent by a company owned by Financial Times Group Limited 
("FT Group <https://aboutus.ft.com/company/ft-group>"), registered office 
at Bracken House, One Friday Street, London, EC4M 9BT. Registered in 
England and Wales with company
number 879531. This e-mail may contain 
confidential information. If you
are not the intended recipient, please 
notify the sender immediately, delete
all copies and do not distribute it 
further.  It could* *also
contain personal views which are not necessarily 
those of the FT Group. 
We may monitor outgoing or incoming emails as 
permitted by law.*

Re: Not able to run a simple table API file streaming sink

Posted by Martijn Visser <ma...@apache.org>.
Hi all,

For completeness, for this issue a ticket has been created at
https://issues.apache.org/jira/browse/FLINK-28513

Best regards,

Martijn

Op di 12 jul. 2022 om 03:49 schreef Jaya Ananthram
<ja...@ft.com.invalid>:

> Hello There,
>
> I am trying to write a simple table API S3 streaming sink using flink
> 1.15.1 and I am facing the following exception,
>
> Caused by: org.apache.flink.util.SerializedThrowable:
> S3RecoverableFsDataOutputStream cannot sync state to S3. Use persist() to
> create a persistent recoverable intermediate point.
> at
>
> org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream.sync(RefCountedBufferingFileStream.java:111)
> ~[flink-s3-fs-hadoop-1.15.1.jar:1.15.1]
> at
>
> org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.sync(S3RecoverableFsDataOutputStream.java:129)
> ~[flink-s3-fs-hadoop-1.15.1.jar:1.15.1]
> at
> org.apache.flink.formats.csv.CsvBulkWriter.finish(CsvBulkWriter.java:110)
> ~[flink-csv-1.15.1.jar:1.15.1]
> at
>
> org.apache.flink.connector.file.table.FileSystemTableSink$ProjectionBulkFactory$1.finish(FileSystemTableSink.java:642)
> ~[flink-connector-files-1.15.1.jar:1.15.1]
> at
>
> org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:64)
> ~[flink-file-sink-common-1.15.1.jar:1.15.1]
> at
>
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:263)
> ~[flink-streaming-java-1.15.1.jar:1.15.1]
> at
>
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.prepareBucketForCheckpointing(Bucket.java:305)
> ~[flink-streaming-java-1.15.1.jar:1.15.1]
> at
>
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onReceptionOfCheckpoint(Bucket.java:277)
> ~[flink-streaming-java-1.15.1.jar:1.15.1]
> at
>
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotActiveBuckets(Buckets.java:270)
> ~[flink-streaming-java-1.15.1.jar:1.15.1]
> at
>
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotState(Buckets.java:261)
> ~[flink-streaming-java-1.15.1.jar:1.15.1]
> at
>
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.snapshotState(StreamingFileSinkHelper.java:87)
> ~[flink-streaming-java-1.15.1.jar:1.15.1]
> at
>
> org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.snapshotState(AbstractStreamingWriter.java:129)
> ~[flink-connector-files-1.15.1.jar:1.15.1]
>
> In my config, I am trying to read from Kafka and write to S3 (s3a) using
> table API and checkpoint configuration using s3p (presto). Even I tried
> with a simple datagen example instead of Kafka and I am getting the same
> issue. I think I am following all the exact steps mentioned in the docs and
> the above exceptions are not much helpful. Exactly it is failing when the
> code triggers the checkpoint but I don't have any clue after this. Could
> someone please help me to understand what I am missing here? I don't find
> any open issue with such logs.
>
> Hoping for your reply.
>
> --
>
>
> *This email was
> sent by a company owned by Financial Times Group Limited
> ("FT Group <https://aboutus.ft.com/company/ft-group>"), registered office
> at Bracken House, One Friday Street, London, EC4M 9BT. Registered in
> England and Wales with company
> number 879531. This e-mail may contain
> confidential information. If you
> are not the intended recipient, please
> notify the sender immediately, delete
> all copies and do not distribute it
> further.  It could* *also
> contain personal views which are not necessarily
> those of the FT Group.
> We may monitor outgoing or incoming emails as
> permitted by law.*
>