Posted to issues@beam.apache.org by "Brian Hulette (Jira)" <ji...@apache.org> on 2021/09/29 18:56:00 UTC
[jira] [Commented] (BEAM-12279) Implement destination-dependent sharding in FileIO.writeDynamic
[ https://issues.apache.org/jira/browse/BEAM-12279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17422343#comment-17422343 ]
Brian Hulette commented on BEAM-12279:
--------------------------------------
This looks like it could be a duplicate of BEAM-10068.
> Implement destination-dependent sharding in FileIO.writeDynamic
> ---------------------------------------------------------------
>
> Key: BEAM-12279
> URL: https://issues.apache.org/jira/browse/BEAM-12279
> Project: Beam
> Issue Type: Improvement
> Components: io-java-files, sdk-java-core
> Affects Versions: 2.28.0
> Reporter: Inessa Yakubov
> Priority: P1
>
> A destination-dependent sharding feature is very much needed to keep file sizes and file counts manageable in Google Cloud Storage, especially when data volumes are very large.
> The current implementation doesn't allow that, per the documentation: [https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.html]
>
> _*Note that currently sharding can not be destination-dependent: every window/pane for every destination will use the same number of shards specified via [{{FileIO.Write.withNumShards(int)}}|https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.Write.html#withNumShards-int-] or [{{FileIO.Write.withSharding(org.apache.beam.sdk.transforms.PTransform<org.apache.beam.sdk.values.PCollection<UserT>, org.apache.beam.sdk.values.PCollectionView<java.lang.Integer>>)}}|https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.Write.html#withSharding-org.apache.beam.sdk.transforms.PTransform-].*_
>
> We use it as follows and end up with either very small or very large files per destination in the same window. The large files cannot be opened or processed, and the small files clutter the bucket.
> {code}
> Pipeline pipeline = Pipeline.create(options);
> pipeline.apply("Read PubSub Events", PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()))
>     .apply(options.getWindowDuration() + " Window",
>         Window.<PubsubMessage>into(FixedWindows.of(parseDuration(options.getWindowDuration())))
>             .triggering(AfterWatermark.pastEndOfWindow())
>             .discardingFiredPanes()
>             .withAllowedLateness(parseDuration("24h")).triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow())))
>     .apply(FileIO.<String,PubsubMessage>writeDynamic()
>         .by(new datePartition(options.getOutputFilenamePrefix(), options.getTimestampName()))
>         .via(Contextful.fn(
>                 (SerializableFunction<PubsubMessage, String>) inputMsg -> new String(inputMsg.getPayload(), StandardCharsets.UTF_8)),
>             TextIO.sink())
>         .withDestinationCoder(StringUtf8Coder.of())
>         .to(options.getOutputDirectory())
>         .withNaming(type -> new CrowdStrikeFileNaming(type))
>         .withNumShards(options.getNumShards())
>         .withTempDirectory(options.getTempLocation()));
> pipeline.run();
> {code}
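
To make the request above concrete, here is a minimal sketch of what a destination-dependent shard count could look like. It is plain Java, not Beam API: the class `ShardPlanner`, its methods, and the idea of sizing shards from a per-destination byte estimate and a target file size are all assumptions made for illustration. Neither the reporter's pipeline nor FileIO provides them.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, NOT part of the Beam SDK: derives a shard count per
// destination from the bytes expected for that destination in one window.
final class ShardPlanner {
    private ShardPlanner() {}

    // Smallest number of files that keeps each file at or under
    // targetBytesPerFile; always at least one file.
    static int shardsFor(long destinationBytes, long targetBytesPerFile) {
        if (destinationBytes < 0 || targetBytesPerFile <= 0) {
            throw new IllegalArgumentException(
                "destinationBytes must be >= 0 and targetBytesPerFile must be > 0");
        }
        // Ceiling division written to avoid overflow on very large inputs.
        long shards = destinationBytes / targetBytesPerFile
            + (destinationBytes % targetBytesPerFile == 0 ? 0 : 1);
        return (int) Math.max(1L, Math.min(shards, (long) Integer.MAX_VALUE));
    }

    // Applies shardsFor to every destination, preserving iteration order.
    static Map<String, Integer> plan(Map<String, Long> bytesPerDestination,
                                     long targetBytesPerFile) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : bytesPerDestination.entrySet()) {
            result.put(entry.getKey(), shardsFor(entry.getValue(), targetBytesPerFile));
        }
        return result;
    }
}
```

Under these assumptions, with a 100 MiB target a destination expecting 5 MiB in a window would get 1 shard and one expecting 10 GiB would get 103. A single fixed value passed to withNumShards cannot match both, which is consistent with the mix of very small and very large files described above.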
--
This message was sent by Atlassian Jira
(v8.3.4#803005)