You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Inessa Yakubov (Jira)" <ji...@apache.org> on 2021/05/04 15:36:00 UTC

[jira] [Created] (BEAM-12279) Implement destination-dependent sharding in FileIO.writeDynamic

Inessa Yakubov created BEAM-12279:
-------------------------------------

             Summary: Implement destination-dependent sharding in FileIO.writeDynamic
                 Key: BEAM-12279
                 URL: https://issues.apache.org/jira/browse/BEAM-12279
             Project: Beam
          Issue Type: Improvement
          Components: beam-model, java-fn-execution
    Affects Versions: 2.28.0
            Reporter: Inessa Yakubov


Destination dependent sharding feature is very much needed in order to maintain manageable files sizes and file counts in google storage especially when data volumes are very large.

Current implementation doesn't allow that (per documentation ) [https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.html]

 

_*Note that currently sharding can not be destination-dependent: every window/pane for every destination will use the same number of shards specified via [{{FileIO.Write.withNumShards(int)}}|https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.Write.html#withNumShards-int-] or [{{FileIO.Write.withSharding(org.apache.beam.sdk.transforms.PTransform<org.apache.beam.sdk.values.PCollection<UserT>, org.apache.beam.sdk.values.PCollectionView<java.lang.Integer>>)}}|https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.Write.html#withSharding-org.apache.beam.sdk.transforms.PTransform-].*_

 

**We use it as follows and end up with either very small or very large files per destination in the same window. Large files are not possible to open/process and small files clutter the bucket.

Pipeline pipeline = Pipeline.create(options);
 pipeline.apply("Read PubSub Events", PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()))
 .apply(options.getWindowDuration() + " Window",
 Window.<PubsubMessage>into(FixedWindows.of(parseDuration(options.getWindowDuration())))
 .triggering(AfterWatermark.pastEndOfWindow()) 
 .discardingFiredPanes()
 .withAllowedLateness(parseDuration("24h")).triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow())))

.apply(FileIO.<String,PubsubMessage>writeDynamic()
 .by(new datePartition(options.getOutputFilenamePrefix(), options.getTimestampName()))
 .via(Contextful.fn(
 (SerializableFunction<PubsubMessage, String>) inputMsg -> new String(inputMsg.getPayload(), StandardCharsets.UTF_8)),
 TextIO.sink())
 .withDestinationCoder(StringUtf8Coder.of())
 .to(options.getOutputDirectory())
 .withNaming(type -> new CrowdStrikeFileNaming(type))
 .withNumShards(options.getNumShards())
 .withTempDirectory(options.getTempLocation()));

pipeline.run();
  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)