Posted to user@beam.apache.org by Sridevi Nookala <sn...@parallelwireless.com> on 2019/01/24 12:25:16 UTC

How to disable sharding with FileIO.write()/FileIO.writeDynamic

Hi,


I am writing a program that reads a CSV file and writes Parquet.

    PCollection<String> input =
        pipeline.apply(TextIO.read().from("/tmp/beam/input.csv"));

    input
        .apply("Produce Avro records", ParDo.of(new DeterministicallyConstructAvroRecordsFn()))
        .setCoder(AvroCoder.of(SCHEMA))
        .apply(
            "Write Parquet files",
            FileIO.<GenericRecord>write().via(ParquetIO.sink(SCHEMA)).to("/tmp/parquet/"));

    pipeline.run();
  }

Q1: How can I disable sharding in the above code and write to a single Parquet file?
Q2: How can I use custom file naming with FileIO.write? Say I want to use the schema name (GenericRecord.getSchema().getName()) as the prefix.
Do I have to use FileIO.writeDynamic()?

thanks
Sri


Re: How to disable sharding with FileIO.write()/FileIO.writeDynamic

Posted by Jeff Klukas <jk...@mozilla.com>.
Hi Sri,

To Question 1, you should be able to set `...
.to("/tmp/parquet/").withNumShards(1)` to produce a single output file.

To Question 2, yes: if your desired output file name depends on the contents of
the record itself, that's exactly what FileIO.writeDynamic() is for. If you
can get the name from the static SCHEMA, though, writeDynamic wouldn't be
necessary. You can call `.withNaming(...)` to customize how the output files
will be named.
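
For the static case, a rough sketch (the ".parquet" suffix is just an example,
and SCHEMA is the constant from your snippet):

    String prefix = SCHEMA.getName();

    FileIO.<GenericRecord>write()
        .via(ParquetIO.sink(SCHEMA))
        .to("/tmp/parquet/")
        .withNumShards(1)
        // defaultNaming(prefix, suffix) prefixes every output file with the schema name.
        .withNaming(FileIO.Write.defaultNaming(prefix, ".parquet"));

And if the prefix really has to come from each record, the writeDynamic version
could look something like this (the destination key and StringUtf8Coder, from
org.apache.beam.sdk.coders, are my assumptions, not something in your code):

    FileIO.<String, GenericRecord>writeDynamic()
        // Route each record to a destination named after its own schema.
        .by(elem -> elem.getSchema().getName())
        .withDestinationCoder(StringUtf8Coder.of())
        .via(ParquetIO.sink(SCHEMA))
        .to("/tmp/parquet/")
        // One FileNaming per destination, using the destination string as the prefix.
        .withNaming(schemaName -> FileIO.Write.defaultNaming(schemaName, ".parquet"))
        .withNumShards(1);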

On Thu, Jan 24, 2019 at 7:25 AM Sridevi Nookala <
snookala@parallelwireless.com> wrote: