Posted to dev@beam.apache.org by Tapan Upadhyay <ta...@gmail.com> on 2021/02/18 01:32:41 UTC

FileIO.Write fails silently

Hi,

I am currently working on a Beam pipeline (Flink runner) where we read JSON
events from Kafka and write the output in parquet format to S3.

We write to S3 after every 10 min.
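
The Kafka read side is not shown in the snippet below; roughly, it is a
standard KafkaIO read along these lines (the topic, bootstrap servers, and the
jsonToGenericRecord helper are simplified placeholders, not our exact code):

    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.coders.AvroCoder;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.Values;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptor;
    import org.apache.kafka.common.serialization.StringDeserializer;

    // Read raw JSON strings from Kafka, drop the keys, and parse each payload
    // into an Avro GenericRecord for the Parquet sink further down the pipeline.
    PCollection<GenericRecord> parquetRecord = pipeline
            .apply("Read Kafka", KafkaIO.<String, String>read()
                    .withBootstrapServers("kafka:9092")             // placeholder
                    .withTopic("events")                            // placeholder
                    .withKeyDeserializer(StringDeserializer.class)
                    .withValueDeserializer(StringDeserializer.class)
                    .withoutMetadata())
            .apply(Values.create())
            .apply("Json To Record", MapElements
                    .into(TypeDescriptor.of(GenericRecord.class))
                    .via((String json) -> jsonToGenericRecord(json))) // hypothetical helper
            .setCoder(AvroCoder.of(getOutput_schema()));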

We have observed that our pipeline sometimes stops writing to S3 after a
restart (even for a non-breaking, minor code change). If we change the Kafka
offset and restart the pipeline, it starts writing to S3 again.

While the S3 write fails, the pipeline runs fine without any issues and
processes records up to the FileIO stage. There are no errors or exceptions in
the logs; it just silently stops writing to S3 at the FileIO stage.

This is the stage where no records are sent out:

FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards
-> FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles/ParMultiDo(WriteShardsIntoTempFiles)
-> FileIO.Write/WriteFiles/GatherTempFileResults/Add void key/AddKeys/Map/ParMultiDo(Anonymous)

We have checked our windowing function by logging records after the windowing
step; the windowing works fine.
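
The check was just a pass-through logging ParDo inserted after the Window
transform, roughly like the sketch below (LOG stands for an SLF4J logger and
"windowed" for the windowed PCollection; both names are placeholders):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;

    // Debug-only pass-through that logs every element emitted by the windowing step.
    windowed.apply("Log Windowed Records", ParDo.of(new DoFn<GenericRecord, GenericRecord>() {
        @ProcessElement
        public void processElement(@Element GenericRecord record,
                                   OutputReceiver<GenericRecord> out) {
            LOG.info("Windowed record: {}", record);  // LOG is an assumed SLF4J logger
            out.output(record);
        }
    }));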

This is our code snippet -

parquetRecord
        .apply("Batch Events", Window.<GenericRecord>into(
                FixedWindows.of(Duration.standardMinutes(Integer.parseInt(windowTime))))
                .triggering(AfterWatermark.pastEndOfWindow())
                .withAllowedLateness(Duration.ZERO, Window.ClosingBehavior.FIRE_ALWAYS)
                .discardingFiredPanes())
        .apply(Distinct.create())
        .apply(FileIO.<GenericRecord>write()
                .via(ParquetIO.sink(getOutput_schema()))
                .to(outputPath.isEmpty() ? outputPath() : outputPath)
                .withNumShards(1)
                .withNaming(new CustomFileNaming("snappy.parquet")));
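
CustomFileNaming is our own FileIO.Write.FileNaming implementation and is not
shown above. A simplified sketch of that kind of naming class is below; the
actual naming logic in our class differs:

    import org.apache.beam.sdk.io.Compression;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.transforms.windowing.PaneInfo;

    // Illustrative only: name each shard after its window's max timestamp plus a suffix.
    public class CustomFileNaming implements FileIO.Write.FileNaming {
        private final String suffix;

        public CustomFileNaming(String suffix) {
            this.suffix = suffix;
        }

        @Override
        public String getFilename(BoundedWindow window, PaneInfo pane, int numShards,
                                  int shardIndex, Compression compression) {
            // e.g. "2021-02-18T01:40:00.000Z-0-of-1.snappy.parquet"
            return window.maxTimestamp() + "-" + shardIndex + "-of-" + numShards + "." + suffix;
        }
    }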

Any idea what could be wrong here, or are there any known open bugs in Beam?


Regards,
Tapan Upadhyay

Re: FileIO.Write fails silently

Posted by Tapan Upadhyay <ta...@gmail.com>.
Yes, we have checkpointing enabled in our cluster.
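
(For completeness, on the Beam Flink runner checkpointing can also be enabled
from the pipeline options; the sketch below is illustrative and not
necessarily how our cluster is configured. The 60-second interval is an
assumed value.)

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    // Enable Flink checkpointing for the Beam pipeline; the interval is illustrative.
    FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(FlinkPipelineOptions.class);
    options.setCheckpointingInterval(60_000L);  // checkpoint every 60 seconds

The equivalent --checkpointingInterval flag can also be passed on the command line.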

The pipeline runs fine when I restart it without making any code changes, but after adding a DoFn class (one that is not even used in the pipeline), we have observed that the pipeline still reads from Kafka and performs the windowing and Distinct steps, but does not write to S3.

I have checked in the Flink UI that the watermark for FileIO.Write does not progress; it still shows the watermark from when the savepoint was taken.

On 2021/02/18 20:36:45, Reuven Lax <re...@google.com> wrote: 
> Do you have checkpointing enabled in your Flink cluster?

Re: FileIO.Write fails silently

Posted by Reuven Lax <re...@google.com>.
Do you have checkpointing enabled in your Flink cluster?
