Posted to issues@beam.apache.org by "Dave Martin (Jira)" <ji...@apache.org> on 2020/05/27 11:41:00 UTC
[jira] [Created] (BEAM-10100) FileIO writeDynamic with AvroIO.sink not writing all data
Dave Martin created BEAM-10100:
----------------------------------
Summary: FileIO writeDynamic with AvroIO.sink not writing all data
Key: BEAM-10100
URL: https://issues.apache.org/jira/browse/BEAM-10100
Project: Beam
Issue Type: Bug
Components: beam-community
Affects Versions: 2.20.0, 2.17.0
Environment: Mac OSX Catalina, tested with SparkRunner - Spark 2.4.5.
Reporter: Dave Martin
Assignee: Aizhamal Nurmamat kyzy
{code:java}
Pipeline p = Pipeline.create(options);
PCollection<KV<String, AvroRecord>> records =
    p.apply(TextIO.read().from("/tmp/input.csv"))
        .apply(ParDo.of(new StringToDatasetIDAvroRecordFcn()));

// Write Avro files into a separate directory per dataset
records.apply(
    "Write avro file per dataset",
    FileIO.<String, KV<String, AvroRecord>>writeDynamic()
        .by(KV::getKey)
        .via(
            Contextful.fn(KV::getValue),
            Contextful.fn(x -> AvroIO.sink(AvroRecord.class).withCodec(BASE_CODEC)))
        .to(options.getTargetPath())
        .withDestinationCoder(StringUtf8Coder.of())
        .withNaming(key -> defaultNaming(key + "/export", PipelinesVariables.Pipeline.AVRO_EXTENSION)));

p.run().waitUntilFinish();
{code}
If I replace AvroIO.sink() with TextIO.sink() (and replace the initial mapping function), the correct number of records is written to the separate directories.
e.g.
{code:java}
// Initialise pipeline
Pipeline p = Pipeline.create(options);
PCollection<KV<String, String>> records =
    p.apply(TextIO.read().from("/tmp/input.csv"))
        .apply(ParDo.of(new StringToDatasetIDKVFcn()));

// Write CSV files into a separate directory per dataset
records.apply(
    "Write CSV file per dataset",
    FileIO.<String, KV<String, String>>writeDynamic()
        .by(KV::getKey)
        .via(Contextful.fn(KV::getValue), TextIO.sink())
        .to(options.getTargetPath())
        .withDestinationCoder(StringUtf8Coder.of())
        .withNaming(datasetID -> defaultNaming(datasetID + "/export", ".csv")));

p.run().waitUntilFinish();
{code}
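To quantify how much data goes missing, it may help to compute the expected record count per dataset from the input before comparing it with what lands in each output directory. The following is only a rough sketch in plain Java with no Beam dependency. It assumes the dataset ID is the first comma-separated field of each input line, which this report does not state; the class and method names are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ExpectedCounts {

    // Counts input lines per dataset ID.
    // Assumption: the dataset ID is the first comma-separated field of a line.
    static Map<String, Long> countByDatasetId(List<String> lines) {
        Map<String, Long> counts = new TreeMap<>();
        for (String line : lines) {
            if (line.isEmpty()) {
                continue; // skip blank lines
            }
            String datasetId = line.split(",", 2)[0];
            counts.merge(datasetId, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical sample lines standing in for /tmp/input.csv
        List<String> lines = List.of("ds1,a", "ds2,b", "ds1,c");
        System.out.println(countByDatasetId(lines)); // prints {ds1=2, ds2=1}
    }
}
```

For a real run, the lines could be read with java.nio.file.Files.readAllLines and the resulting counts compared with the number of records in each dataset's export files.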
cc [~timrobertson100]