Posted to dev@beam.apache.org by Tapan Upadhyay <ta...@gmail.com> on 2021/02/22 08:42:46 UTC

Apache Beam - Flink runner - FileIO.write issues in S3 writes after deploys

Hi,

I am currently working on a Beam pipeline (Beam 2.23, Flink runner 1.8) where
we read JSON events from Kafka and write the output in Parquet format to S3.

We write to S3 every 10 minutes.
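
For context, the source side of the pipeline looks roughly like this. This is
a minimal sketch, not our exact code: the brokers, topic, and deserializers
are placeholders, and the real pipeline parses the JSON values into
GenericRecords afterwards.

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;

// Read JSON events from Kafka, drop the Kafka metadata, and keep only the values.
PCollection<String> jsonEvents = pipeline
        .apply(KafkaIO.<String, String>read()
                .withBootstrapServers("broker1:9092")            // placeholder
                .withTopic("events")                             // placeholder
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata())                              // -> PCollection<KV<String, String>>
        .apply(Values.create());                                 // -> PCollection<String>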

We have observed that our pipeline sometimes stops writing to S3 after we
make minor, non-breaking code changes and redeploy the pipeline. If we change
the Kafka offsets and restart the pipeline, it starts writing to S3 again.

While FileIO is not writing to S3, the pipeline runs without any
error/exception and processes records up to the FileIO stage. There are no
errors or exceptions in the logs; it just silently fails to produce anything
at the FileIO stage.

Checkpointing is enabled in our cluster.
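
A minimal sketch of how checkpointing can be enabled through Beam's
FlinkPipelineOptions (the interval value below is illustrative, not our exact
setting):

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// args comes from main(String[] args).
FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
options.setCheckpointingInterval(60_000L); // checkpoint every 60s (illustrative)
Pipeline pipeline = Pipeline.create(options);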

The watermark also does not progress for that stage; it stays at the time
the pipeline was stopped for the deploy (the savepoint time).

We have checked our windowing function by logging records after windowing
(see the sketch below); windowing works fine.
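
The check was a simple pass-through logging DoFn added after the Window
transform. A minimal sketch; the logger setup is an assumption, and
GenericRecord is the Avro type we use elsewhere in the pipeline:

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Pass-through DoFn that logs each element together with the window it landed in.
class LogWindowedFn extends DoFn<GenericRecord, GenericRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(LogWindowedFn.class);

    @ProcessElement
    public void processElement(ProcessContext c, BoundedWindow window) {
        LOG.info("window={} element={}", window, c.element());
        c.output(c.element());
    }
}

// Inserted between the Window and Distinct transforms:
//     .apply("Log Windowed", ParDo.of(new LogWindowedFn()))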

Also, if we replace FileIO with Kafka as the output, the pipeline runs fine
and keeps outputting records to Kafka after deploys.
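
For that test we swapped the FileIO stage for a KafkaIO sink along these
lines (a sketch: the brokers, topic, and the toString serialization are
placeholders, and windowedRecords stands for the windowed
PCollection<GenericRecord>):

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.kafka.common.serialization.StringSerializer;

windowedRecords
        .apply("To String", MapElements.into(TypeDescriptors.strings())
                .via((GenericRecord r) -> r.toString()))   // placeholder serialization
        .apply(KafkaIO.<Void, String>write()
                .withBootstrapServers("broker1:9092")      // placeholder
                .withTopic("debug-output")                 // placeholder
                .withValueSerializer(StringSerializer.class)
                .values());                                // write values only, no keys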

This is our code snippet -

parquetRecord
        .apply("Batch Events", Window.<GenericRecord>into(
                FixedWindows.of(Duration.standardMinutes(Integer.parseInt(windowTime))))
            .triggering(AfterWatermark.pastEndOfWindow())
            .withAllowedLateness(Duration.ZERO, Window.ClosingBehavior.FIRE_ALWAYS)
            .discardingFiredPanes())
        .apply(Distinct.create())
        .apply(FileIO.<GenericRecord>write()
            .via(ParquetIO.sink(getOutput_schema()))
            .to(outputPath.isEmpty() ? outputPath() : outputPath)
            .withNumShards(1)
            .withNaming(new CustomFileNaming("snappy.parquet")));

The Flink UI screenshot below shows records coming in up to FileIO.Write.

This is the stage that does not send any records out -

FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards ->
FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles/ParMultiDo(WriteShardsIntoTempFiles)
-> FileIO.Write/WriteFiles/GatherTempFileResults/Add void
key/AddKeys/Map/ParMultiDo(Anonymous)

[image: Flink UI screenshot] <https://i.stack.imgur.com/yGWsw.png>

Any idea what could be wrong here, or are there any known open bugs in Beam/Flink?


PS - reposting this question; I had some issues subscribing to the mailing
list.


Regards,
Tapan Upadhyay