Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2022/03/20 17:26:00 UTC

[jira] [Updated] (BEAM-12279) Implement destination-dependent sharding in FileIO.writeDynamic

     [ https://issues.apache.org/jira/browse/BEAM-12279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Beam JIRA Bot updated BEAM-12279:
---------------------------------
    Labels: stale-P2  (was: )

> Implement destination-dependent sharding in FileIO.writeDynamic
> ---------------------------------------------------------------
>
>                 Key: BEAM-12279
>                 URL: https://issues.apache.org/jira/browse/BEAM-12279
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-files, sdk-java-core
>    Affects Versions: 2.28.0
>            Reporter: Inessa Yakubov
>            Priority: P2
>              Labels: stale-P2
>
> Destination-dependent sharding is needed to keep file sizes and file counts manageable in Google Cloud Storage, especially when data volumes are very large.
> The current implementation does not allow this, per the documentation: [https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.html]
>  
> _*Note that currently sharding can not be destination-dependent: every window/pane for every destination will use the same number of shards specified via [{{FileIO.Write.withNumShards(int)}}|https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.Write.html#withNumShards-int-] or [{{FileIO.Write.withSharding(org.apache.beam.sdk.transforms.PTransform<org.apache.beam.sdk.values.PCollection<UserT>, org.apache.beam.sdk.values.PCollectionView<java.lang.Integer>>)}}|https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.Write.html#withSharding-org.apache.beam.sdk.transforms.PTransform-].*_
>  
> We use it as follows and end up with either very small or very large files per destination in the same window. The large files cannot be opened or processed, and the small files clutter the bucket.
> {code}
> Pipeline pipeline = Pipeline.create(options);
> pipeline
>     .apply("Read PubSub Events",
>         PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()))
>     .apply(options.getWindowDuration() + " Window",
>         Window.<PubsubMessage>into(FixedWindows.of(parseDuration(options.getWindowDuration())))
>             .triggering(AfterWatermark.pastEndOfWindow())
>             .discardingFiredPanes()
>             .withAllowedLateness(parseDuration("24h"))
>             .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow())))
>     .apply(FileIO.<String, PubsubMessage>writeDynamic()
>         .by(new datePartition(options.getOutputFilenamePrefix(), options.getTimestampName()))
>         .via(
>             Contextful.fn(
>                 (SerializableFunction<PubsubMessage, String>) inputMsg ->
>                     new String(inputMsg.getPayload(), StandardCharsets.UTF_8)),
>             TextIO.sink())
>         .withDestinationCoder(StringUtf8Coder.of())
>         .to(options.getOutputDirectory())
>         .withNaming(type -> new CrowdStrikeFileNaming(type))
>         .withNumShards(options.getNumShards())
>         .withTempDirectory(options.getTempLocation()));
> pipeline.run();
> {code}  
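
To make the request concrete, here is a minimal sketch in plain Java of what a per-destination shard count could look like. It does not use or represent any Beam API. The class DestinationShardingSketch, the helpers shardsFor and averageFileBytes, the destination names, the byte volumes, and the 512 MiB target file size are all hypothetical and chosen only for illustration. It contrasts a single fixed shard count, as applied by withNumShards, with a count derived from each destination's data volume.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: plain Java, no Beam dependency. All names are hypothetical.
final class DestinationShardingSketch {

    // Picks a shard count so that each file for a destination holds at most
    // about targetFileBytes. Always returns at least one shard.
    static int shardsFor(long destinationBytes, long targetFileBytes) {
        if (destinationBytes < 0 || targetFileBytes <= 0) {
            throw new IllegalArgumentException("sizes must be non-negative and target positive");
        }
        // Ceiling division without floating point.
        long shards = (destinationBytes + targetFileBytes - 1) / targetFileBytes;
        return (int) Math.max(1L, Math.min(shards, (long) Integer.MAX_VALUE));
    }

    // Average file size when a destination's data is split evenly into `shards` files.
    static long averageFileBytes(long destinationBytes, int shards) {
        return destinationBytes / shards;
    }

    public static void main(String[] args) {
        final long mib = 1024L * 1024L;
        final long gib = 1024L * mib;

        // Made-up volumes for two destinations within one window.
        Map<String, Long> bytesPerDestination = new LinkedHashMap<>();
        bytesPerDestination.put("busy-destination", 50L * gib);
        bytesPerDestination.put("quiet-destination", 20L * mib);

        int fixedShards = 10;              // one shard count for every destination
        long targetFileBytes = 512L * mib; // assumed target size per file

        for (Map.Entry<String, Long> entry : bytesPerDestination.entrySet()) {
            long bytes = entry.getValue();
            int perDestinationShards = shardsFor(bytes, targetFileBytes);
            System.out.printf(
                "%s: fixed %d shards -> ~%d bytes/file; per-destination %d shards -> ~%d bytes/file%n",
                entry.getKey(),
                fixedShards,
                averageFileBytes(bytes, fixedShards),
                perDestinationShards,
                averageFileBytes(bytes, perDestinationShards));
        }
    }
}
```

With a fixed count, the busy destination gets a few very large files while the quiet one gets many tiny ones. A per-destination count keeps both near the target size, which is the behavior this issue asks for.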



--
This message was sent by Atlassian Jira
(v8.20.1#820001)