Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 16:34:03 UTC

[GitHub] [beam] damccorm opened a new issue, #20332: FileIO writeDynamic with AvroIO.sink not writing all data

damccorm opened a new issue, #20332:
URL: https://github.com/apache/beam/issues/20332

   FileIO writeDynamic with AvroIO.sink is not writing all data in the following pipeline. The amount of data written varies between runs, but records are consistently dropped. This happens with a very small test dataset: 6 records, which should produce 3 directories.
   
   ```
   import static org.apache.beam.sdk.io.FileIO.Write.defaultNaming;

   Pipeline p = Pipeline.create(options);
   PCollection<KV<String, AvroRecord>> records = p
       .apply(TextIO.read().from("/tmp/input.csv"))
       .apply(ParDo.of(new StringToDatasetIDAvroRecordFcn()));

   // Write out into Avro in each separate directory
   records.apply("Write avro file per dataset",
       FileIO.<String, KV<String, AvroRecord>>writeDynamic()
           .by(KV::getKey)
           .via(Contextful.fn(KV::getValue),
               Contextful.fn(x -> AvroIO.sink(AvroRecord.class).withCodec(BASE_CODEC)))
           .to(options.getTargetPath())
           .withDestinationCoder(StringUtf8Coder.of())
           .withNaming(key -> defaultNaming(key + "/export", PipelinesVariables.Pipeline.AVRO_EXTENSION)));

   p.run().waitUntilFinish();
   ```
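   
   To check a run against the claim that 6 input records should be split across 3 directories, the expected per-directory counts can be computed directly from the input file. The following is a minimal JDK-only sketch, not part of the original report: `ExpectedCounts` is a hypothetical helper, and it assumes the dataset ID is the first comma-separated field of each CSV line, since `StringToDatasetIDAvroRecordFcn` is not shown.
   
   ```java
   import java.util.List;
   import java.util.Map;
   import java.util.TreeMap;

   /**
    * Hypothetical helper: how many records each destination directory should
    * receive. Assumes the dataset ID (the key passed to writeDynamic().by(...))
    * is the first comma-separated field of each CSV line.
    */
   final class ExpectedCounts {

     private ExpectedCounts() {}

     /** Returns dataset ID -> expected record count; blank lines are skipped. */
     static Map<String, Long> countByDatasetId(List<String> csvLines) {
       Map<String, Long> counts = new TreeMap<>();
       for (String line : csvLines) {
         if (line.isBlank()) {
           continue;
         }
         // Assumption: column 0 holds the dataset ID used as the destination key.
         String datasetId = line.split(",", 2)[0].trim();
         counts.merge(datasetId, 1L, Long::sum);
       }
       return counts;
     }
   }
   ```
   
   For example, `ExpectedCounts.countByDatasetId(Files.readAllLines(Path.of("/tmp/input.csv")))` gives a map from dataset ID to the number of records its `<id>/export` files should contain, which can then be compared with what the pipeline actually wrote.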
   
   
   
   If I replace AvroIO.sink() with TextIO.sink() (and replace the initial mapping function), the correct number of records is written to the separate directories. This works consistently.
   
   For example:
   
   ```
   import static org.apache.beam.sdk.io.FileIO.Write.defaultNaming;

   // Initialise pipeline
   Pipeline p = Pipeline.create(options);

   PCollection<KV<String, String>> records = p
       .apply(TextIO.read().from("/tmp/input.csv"))
       .apply(ParDo.of(new StringToDatasetIDKVFcn()));

   // Write out into CSV in each separate directory
   records.apply("Write CSV file per dataset",
       FileIO.<String, KV<String, String>>writeDynamic()
           .by(KV::getKey)
           .via(Contextful.fn(KV::getValue), TextIO.sink())
           .to(options.getTargetPath())
           .withDestinationCoder(StringUtf8Coder.of())
           .withNaming(key -> defaultNaming(key + "/export", ".csv")));

   p.run().waitUntilFinish();
   ```
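   
   For the TextIO variant, where each record is one line of text, the counts actually written can be checked by walking the target directory. This is a minimal JDK-only sketch, not part of the original report: `WrittenCounts` is a hypothetical helper, and it assumes each destination's shards land at `<targetPath>/<datasetID>/export*.csv`, which is what the `defaultNaming(key + "/export", ".csv")` prefix and suffix suggest. The exact shard file names may differ, so the glob is an assumption.
   
   ```java
   import java.io.IOException;
   import java.nio.charset.StandardCharsets;
   import java.nio.file.DirectoryStream;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.util.Map;
   import java.util.TreeMap;
   import java.util.stream.Stream;

   /**
    * Hypothetical helper: counts the lines actually written under each
    * destination directory. Assumes shards land at
    * <targetPath>/<datasetID>/export*.csv and hold one record per line.
    */
   final class WrittenCounts {

     private WrittenCounts() {}

     /** Returns directory name (dataset ID) -> number of lines across its shards. */
     static Map<String, Long> countWrittenLines(Path targetPath) throws IOException {
       Map<String, Long> counts = new TreeMap<>();
       // One subdirectory per destination key, created by the "key/export" prefix.
       try (DirectoryStream<Path> dirs =
           Files.newDirectoryStream(targetPath, entry -> Files.isDirectory(entry))) {
         for (Path dir : dirs) {
           long total = 0;
           // Assumed shard pattern; files not matching it are ignored.
           try (DirectoryStream<Path> shards = Files.newDirectoryStream(dir, "export*.csv")) {
             for (Path shard : shards) {
               try (Stream<String> lines = Files.lines(shard, StandardCharsets.UTF_8)) {
                 total += lines.count();
               }
             }
           }
           counts.put(dir.getFileName().toString(), total);
         }
       }
       return counts;
     }
   }
   ```
   
   Comparing the returned map with the number of input records per dataset ID shows whether any destination lost records. Checking the Avro variant the same way would need an Avro reader such as `DataFileReader` instead of line counting.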
   
   
   cc [~timrobertson100]
   
   
   
   Imported from Jira [BEAM-10100](https://issues.apache.org/jira/browse/BEAM-10100). Original Jira may contain additional context.
   Reported by: djtfmartin.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] kennknowles commented on issue #20332: FileIO writeDynamic with AvroIO.sink not writing all data

Posted by GitBox <gi...@apache.org>.
kennknowles commented on issue #20332:
URL: https://github.com/apache/beam/issues/20332#issuecomment-1246027969

   @johnjcasey this is reminiscent of something discussed today



[GitHub] [beam] johnjcasey commented on issue #20332: FileIO writeDynamic with AvroIO.sink not writing all data

johnjcasey commented on issue #20332:
URL: https://github.com/apache/beam/issues/20332#issuecomment-1246933572

   @Abacn you were attempting to repro this, did you have any success?



[GitHub] [beam] Abacn commented on issue #20332: FileIO writeDynamic with AvroIO.sink not writing all data

Abacn commented on issue #20332:
URL: https://github.com/apache/beam/issues/20332#issuecomment-1246935493

   @johnjcasey I was not able to reproduce the issue with Flink running locally.



[GitHub] [beam] johnjcasey closed issue #20332: FileIO writeDynamic with AvroIO.sink not writing all data

johnjcasey closed issue #20332: FileIO writeDynamic with AvroIO.sink not writing all data
URL: https://github.com/apache/beam/issues/20332



[GitHub] [beam] johnjcasey commented on issue #20332: FileIO writeDynamic with AvroIO.sink not writing all data

johnjcasey commented on issue #20332:
URL: https://github.com/apache/beam/issues/20332#issuecomment-1251199000

   I'm going to close this as not reproducible.

