Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/03/12 18:03:56 UTC

[GitHub] [beam] chamikaramj commented on a change in pull request #14164: [BEAM-11934] Add runner determined sharding option for unbounded data to WriteFiles (Java)

chamikaramj commented on a change in pull request #14164:
URL: https://github.com/apache/beam/pull/14164#discussion_r592923840



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -301,6 +321,21 @@
     return toBuilder().setMaxNumWritersPerBundle(-1).build();
   }
 
+  /**
+   * Returns a new {@link WriteFiles} that will write to the current {@link FileBasedSink} with
+   * runner-determined sharding for unbounded data specifically. Currently manual sharding is
+   * required for writing unbounded data with a fixed number of shards or a predefined sharding
+   * function. This option allows the runners to get around that requirement and perform automatic
+   * sharding.
+   *
+   * <p>Intended to only be used by runners. Users should use {@link

Review comment:
       How does the runner invoke this? A runner usually sits behind the Fn API boundary, so it will not be able to invoke this directly.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -672,13 +740,102 @@ private WriteShardedBundlesToTempFiles(
                   .withSideInputs(shardingSideInputs))
           .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
           .apply("GroupIntoShards", GroupByKey.create())
+          .apply(
+              "KeyedByShardNum",
+              MapElements.via(
+                  new SimpleFunction<
+                      KV<ShardedKey<Integer>, Iterable<UserT>>, KV<Integer, Iterable<UserT>>>() {
+                    @Override
+                    public KV<Integer, Iterable<UserT>> apply(
+                        KV<ShardedKey<Integer>, Iterable<UserT>> input) {
+                      return KV.of(input.getKey().getShardNumber(), input.getValue());
+                    }
+                  }))
+          .setCoder(KvCoder.of(VarIntCoder.of(), IterableCoder.of(input.getCoder())))
           .apply(
               "WriteShardsIntoTempFiles",
               ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(getSideInputs()))
           .setCoder(fileResultCoder);
     }
   }
 
+  private class WriteAutoShardedBundlesToTempFiles
+      extends PTransform<PCollection<UserT>, PCollection<FileResult<DestinationT>>> {
+    private final Coder<DestinationT> destinationCoder;
+    private final Coder<FileResult<DestinationT>> fileResultCoder;
+
+    private WriteAutoShardedBundlesToTempFiles(
+        Coder<DestinationT> destinationCoder, Coder<FileResult<DestinationT>> fileResultCoder) {
+      this.destinationCoder = destinationCoder;
+      this.fileResultCoder = fileResultCoder;
+    }
+
+    @Override
+    public PCollection<FileResult<DestinationT>> expand(PCollection<UserT> input) {
+      checkArgument(
+          getWithRunnerDeterminedShardingUnbounded(),
+          "Runner determined sharding for unbounded data is not supported by the runner.");
+      // Auto-sharding is achieved via GroupIntoBatches.WithShardedKey which shards, groups and at
+      // the same time batches the input records. The sharding behavior depends on runners. The
+      // batching is per window and we also emit the batches if there are a certain number of
+      // records buffered or they have been buffered for a certain time, controlled by
+      // FILE_TRIGGERING_RECORD_COUNT and BUFFERING_DURATION respectively.
+      return input
+          .apply(
+              "KeyedByDestination",
+              ParDo.of(
+                  new DoFn<UserT, KV<Integer, UserT>>() {
+                    @ProcessElement
+                    public void processElement(@Element UserT element, ProcessContext context)
+                        throws Exception {
+                      getDynamicDestinations().setSideInputAccessorFromProcessContext(context);
+                      DestinationT destination =
+                          getDynamicDestinations().getDestination(context.element());
+                      context.output(
+                          KV.of(hashDestination(destination, destinationCoder), element));
+                    }
+                  }))
+          .setCoder(KvCoder.of(VarIntCoder.of(), input.getCoder()))
+          .apply(
+              "ShardAndGroup",
+              GroupIntoBatches.<Integer, UserT>ofSize(FILE_TRIGGERING_RECORD_COUNT)
+                  .withMaxBufferingDuration(FILE_TRIGGERING_RECORD_BUFFERING_DURATION)
+                  .withShardedKey())
+          .setCoder(
+              KvCoder.of(
+                  org.apache.beam.sdk.util.ShardedKey.Coder.of(VarIntCoder.of()),
+                  IterableCoder.of(input.getCoder())))
+          // Add dummy shard since it is required by WriteShardsIntoTempFilesFn. It will be dropped

Review comment:
       Should we update "WriteShardsIntoTempFilesFn" instead?
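
The code comment in the hunk above says batches are emitted once a certain number of records is buffered or once they have been buffered for a certain time. The following is a minimal plain-Java sketch of that count-or-duration rule only. It is not Beam's `GroupIntoBatches` implementation: the class `CountOrTimeBatcher` and its methods are hypothetical, timestamps are passed in explicitly instead of using timers, and windows and sharded keys are ignored.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration; not part of Beam or of this pull request. */
final class CountOrTimeBatcher<T> {
    private final int maxCount;          // plays the role of FILE_TRIGGERING_RECORD_COUNT
    private final long maxBufferMillis;  // plays the role of the buffering duration
    private final List<T> buffer = new ArrayList<>();
    private long firstElementMillis;

    CountOrTimeBatcher(int maxCount, long maxBufferMillis) {
        this.maxCount = maxCount;
        this.maxBufferMillis = maxBufferMillis;
    }

    /** Buffers an element; returns a finished batch, or null if nothing is due yet. */
    List<T> add(T element, long nowMillis) {
        if (buffer.isEmpty()) {
            firstElementMillis = nowMillis;
        }
        buffer.add(element);
        if (buffer.size() >= maxCount) {
            return flush();
        }
        return tick(nowMillis);
    }

    /** Stands in for a timer firing: flushes if the oldest element has waited long enough. */
    List<T> tick(long nowMillis) {
        if (!buffer.isEmpty() && nowMillis - firstElementMillis >= maxBufferMillis) {
            return flush();
        }
        return null;
    }

    private List<T> flush() {
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        return batch;
    }
}
```

In this sketch a batch is released by whichever limit is hit first, so a slow stream still produces output once the duration elapses.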

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -400,13 +443,25 @@ private GatherResults(Coder<ResultT> resultCoder) {
     @Override
     public PCollection<List<ResultT>> expand(PCollection<ResultT> input) {
       if (getWindowedWrites()) {
-        // Reshuffle the results to make them stable against retries.
+        // Group the results to make them stable against retries.
         // Use a single void key to maximize size of bundles for finalization.
         return input
-            .apply("Add void key", WithKeys.of((Void) null))
-            .apply("Reshuffle", Reshuffle.of())
-            .apply("Drop key", Values.create())
-            .apply("Gather bundles", ParDo.of(new GatherBundlesPerWindowFn<>()))
+            .apply("AddVoidKey", WithKeys.of((Void) null))

Review comment:
       What's the reason for replacing Reshuffle here?

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -314,12 +349,16 @@ public void validate(PipelineOptions options) {
           "Must use windowed writes when applying %s to an unbounded PCollection",
           WriteFiles.class.getSimpleName());
       // The reason for this is https://issues.apache.org/jira/browse/BEAM-1438
-      // and similar behavior in other runners.
-      checkArgument(
-          getComputeNumShards() != null || getNumShardsProvider() != null,
-          "When applying %s to an unbounded PCollection, "
-              + "must specify number of output shards explicitly",
-          WriteFiles.class.getSimpleName());
+      // and similar behavior in other runners. Runners can choose to ignore this check and perform
+      // runner determined sharding for unbounded data by overriding the option

Review comment:
       Ditto. I'm not sure about the "Runners can choose to ignore this check" language.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -672,13 +740,102 @@ private WriteShardedBundlesToTempFiles(
                   .withSideInputs(shardingSideInputs))
           .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
           .apply("GroupIntoShards", GroupByKey.create())
+          .apply(

Review comment:
       Ditto.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -465,6 +520,19 @@ private WriteUnshardedBundlesToTempFiles(
               // shard numbers are assigned together to both the spilled and non-spilled files in
               // finalize.
               .apply("GroupUnwritten", GroupByKey.create())
+              .apply(

Review comment:
       Does the GroupByKey (GBK) have to come after adding keys here?

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -354,9 +393,13 @@ public void validate(PipelineOptions options) {
 
     PCollection<FileResult<DestinationT>> tempFileResults =
         (getComputeNumShards() == null && getNumShardsProvider() == null)
-            ? input.apply(
-                "WriteUnshardedBundlesToTempFiles",
-                new WriteUnshardedBundlesToTempFiles(destinationCoder, fileResultCoder))
+            ? input.isBounded() == IsBounded.BOUNDED
+                ? input.apply(
+                    "WriteUnshardedBundlesToTempFiles",
+                    new WriteUnshardedBundlesToTempFiles(destinationCoder, fileResultCoder))
+                : input.apply(
+                    "WriteAutoShardedBundlesToTempFiles",

Review comment:
       We should probably check that auto-sharding is explicitly enabled by performing this call?
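
One reading of this comment is that the branch should verify the opt-in flag before choosing the auto-sharded path. Here is a minimal plain-Java sketch of that selection logic under that assumption. The class `ShardingChoice`, the `Strategy` enum and the parameter names are hypothetical stand-ins for the conditions in the hunk above, not code from `WriteFiles`.

```java
/** Hypothetical illustration; models the branch in the hunk above without Beam types. */
final class ShardingChoice {
    enum Strategy { FIXED_SHARDS, UNSHARDED, AUTO_SHARDED }

    /**
     * @param hasExplicitShards stands in for getComputeNumShards() != null
     *     || getNumShardsProvider() != null
     * @param bounded stands in for input.isBounded() == IsBounded.BOUNDED
     * @param autoShardingEnabled stands in for getWithRunnerDeterminedShardingUnbounded()
     */
    static Strategy choose(boolean hasExplicitShards, boolean bounded, boolean autoShardingEnabled) {
        if (hasExplicitShards) {
            return Strategy.FIXED_SHARDS;
        }
        if (bounded) {
            return Strategy.UNSHARDED;
        }
        // Unbounded input without explicit shards: require the explicit opt-in
        // here, at the call site, rather than only inside the transform.
        if (!autoShardingEnabled) {
            throw new IllegalArgumentException(
                "Runner determined sharding for unbounded data was not enabled.");
        }
        return Strategy.AUTO_SHARDED;
    }
}
```

Failing at the point of selection keeps the error close to the decision, whereas the diff as posted performs the equivalent `checkArgument` inside `WriteAutoShardedBundlesToTempFiles.expand`.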



