You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 16:22:54 UTC

[GitHub] [beam] damccorm opened a new issue, #20252: Buggy/Slow FileIO.write()/sink implementation

damccorm opened a new issue, #20252:
URL: https://github.com/apache/beam/issues/20252

   Context:
   I have been experimenting with generating columnar data from prometheus metric data to write to Google Cloud Storage. My pipeline takes input of Prometheus Remote Write HTTP payload from kafka(this is compressed in snappy and protobuf encoded), my first 2 steps of the pipeline do the uncompression and decoding and make a metric object. I window this input to fixed windows of 1 minute and write the window to GCS in ORC format. I have been seeing huge lag in my pipeline.
   
    
   Problem/Bug:
   The custom FileIO.write().sink implementation for ORCIO writes to GCS using the ORC library. In my sink implementation I even implemented all operations as no-ops, even then I saw a huge lag in my pipeline. When I comment out the FileIO transformation(that is actually a no-op), my pipeline keeps up with the input load.
   Looking up online my problem seems to relate to this [https://stackoverflow.com/questions/54094102/beam-pipeline-kafka-to-hdfs-by-time-buckets](https://stackoverflow.com/questions/54094102/beam-pipeline-kafka-to-hdfs-by-time-buckets).
    
   I've tried running this on dataflow.
    
   This is what my code looks like:
   ```
   
   p.apply("ReadLines", KafkaIO.<Long, byte[]>read().withBootstrapServers(
           "mykafka:9092")
    
         .withTopic(options.getInputTopic())
           .withConsumerConfigUpdates(ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG,
   "custom-id",
                   ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"))
           .withKeyDeserializer(LongDeserializer.class)
    
         .withValueDeserializer(ByteArrayDeserializer.class).withoutMetadata())
           .apply("UncompressSnappy",
   ParDo.of(new UncompressSnappy()))
           .apply("DecodeProto", ParDo.of(new DecodePromProto()))
    
         .apply("MapTSSample", ParDo.of(new MapTSSample()))
           .apply(Window.<TSSample>into(FixedWindows.of(Duration.standardMinutes(1)))
    
                 .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))
           .apply(new WriteOneFilePerWindow(options.getOutput(),
   1, ".orc"));
   ```
   
    
   This is what WriteOneFilePerWindow.java's expand looks like for me:
   
   ```
   
   public PDone expand(PCollection<TSSample> input) {
       input.apply(FileIO.<TSSample>write().to(filenamePrefix).withNaming(new
   MyFileNaming(filenameSuffix))
           .withNumShards(numShards).via(ORCIO.sink()));
       return PDone.in(input.getPipeline());
   }
   ```
   
   
   Imported from Jira [BEAM-9528](https://issues.apache.org/jira/browse/BEAM-9528). Original Jira may contain additional context.
   Reported by: anandsinghkunwar.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org