You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Anand Singh Kunwar <an...@gmail.com> on 2020/03/13 09:25:48 UTC

Buggy/Slow FileIO.write()/sink implementation

Hi beam devs,

Context:
I have been experimenting with generating columnar data from prometheus
metric data to write to Google Cloud Storage. My pipeline takes input of
Prometheus Remote Write HTTP payload from kafka(this is compressed in
snappy and protobuf encoded), my first 2 steps of the pipeline do the
uncompression and decoding and make a metric object. I window this input to
fixed windows of 1 minute and write the window to GCS in ORC format. I have
been seeing huge lag in my pipeline.

Problem/Bug:
The custom FileIO.write().sink implementation for ORCIO writes to GCS using
the ORC library. In my sink implementation I even implemented all
operations as no-ops, even then I saw a huge lag in my pipeline. When I
comment out the FileIO transformation(that is acutally a no-op), my
pipeline keeps up with the input load.
Looking up online my problem seems to relate to this
https://stackoverflow.com/questions/54094102/beam-pipeline-kafka-to-hdfs-by-time-buckets
.

This is what my code looks like:

p.apply("ReadLines", KafkaIO.<Long, byte[]>read().withBootstrapServers(
        "mykafka:9092")
        .withTopic(options.getInputTopic())

.withConsumerConfigUpdates(ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG,
"custom-id",
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"))
        .withKeyDeserializer(LongDeserializer.class)

.withValueDeserializer(ByteArrayDeserializer.class).withoutMetadata())
        .apply("UncompressSnappy", ParDo.of(new UncompressSnappy()))
        .apply("DecodeProto", ParDo.of(new DecodePromProto()))
        .apply("MapTSSample", ParDo.of(new MapTSSample()))

.apply(Window.<TSSample>into(FixedWindows.of(Duration.standardMinutes(1)))
                .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))
        .apply(new WriteOneFilePerWindow(options.getOutput(), 1, ".orc"));

This is what WriteOneFilePerWindow.java's expand looks like for me:

public PDone expand(PCollection<TSSample> input) {
    input.apply(FileIO.<TSSample>write().to(filenamePrefix).withNaming(new
MyFileNaming(filenameSuffix))
        .withNumShards(numShards).via(ORCIO.sink()));
    return PDone.in(input.getPipeline());
}


Best
Anand Singh Kunwar

Re: Buggy/Slow FileIO.write()/sink implementation

Posted by Reuven Lax <re...@google.com>.
What are you setting numShards to? This is likely to be the main bottleneck
on your pipeline.

On Fri, Mar 13, 2020 at 9:38 AM Anand Singh Kunwar <an...@gmail.com>
wrote:

> Hi beam devs,
>
> Context:
> I have been experimenting with generating columnar data from prometheus
> metric data to write to Google Cloud Storage. My pipeline takes input of
> Prometheus Remote Write HTTP payload from kafka(this is compressed in
> snappy and protobuf encoded), my first 2 steps of the pipeline do the
> uncompression and decoding and make a metric object. I window this input to
> fixed windows of 1 minute and write the window to GCS in ORC format. I have
> been seeing huge lag in my pipeline.
>
> Problem/Bug:
> The custom FileIO.write().sink implementation for ORCIO writes to GCS
> using the ORC library. In my sink implementation I even implemented all
> operations as no-ops, even then I saw a huge lag in my pipeline. When I
> comment out the FileIO transformation(that is acutally a no-op), my
> pipeline keeps up with the input load.
> Looking up online my problem seems to relate to this
> https://stackoverflow.com/questions/54094102/beam-pipeline-kafka-to-hdfs-by-time-buckets
> .
>
> This is what my code looks like:
>
> p.apply("ReadLines", KafkaIO.<Long, byte[]>read().withBootstrapServers(
>         "mykafka:9092")
>         .withTopic(options.getInputTopic())
>
> .withConsumerConfigUpdates(ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG,
> "custom-id",
>                 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"))
>         .withKeyDeserializer(LongDeserializer.class)
>
> .withValueDeserializer(ByteArrayDeserializer.class).withoutMetadata())
>         .apply("UncompressSnappy", ParDo.of(new UncompressSnappy()))
>         .apply("DecodeProto", ParDo.of(new DecodePromProto()))
>         .apply("MapTSSample", ParDo.of(new MapTSSample()))
>
> .apply(Window.<TSSample>into(FixedWindows.of(Duration.standardMinutes(1)))
>                 .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))
>         .apply(new WriteOneFilePerWindow(options.getOutput(), 1, ".orc"));
>
> This is what WriteOneFilePerWindow.java's expand looks like for me:
>
> public PDone expand(PCollection<TSSample> input) {
>     input.apply(FileIO.<TSSample>write().to(filenamePrefix).withNaming(new
> MyFileNaming(filenameSuffix))
>         .withNumShards(numShards).via(ORCIO.sink()));
>     return PDone.in(input.getPipeline());
> }
>
>
> Best
> Anand Singh Kunwar
>