Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/03/08 00:56:38 UTC

[GitHub] [beam] nehsyc opened a new pull request #14164: [BEAM-11934] Add runner determined sharding option for unbounded data to WriteFiles (Java)

nehsyc opened a new pull request #14164:
URL: https://github.com/apache/beam/pull/14164


   Currently, users must provide `numShards` or `shardingFn` for streaming writes. This PR adds an internal option to the `WriteFiles` transform that lets runners perform dynamic sharding for unbounded data. If a runner supports it, users can then also use `withRunnerDeterminedSharding` for streaming processing.
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] nehsyc commented on pull request #14164: [BEAM-11934] Add runner determined sharding option for unbounded data to WriteFiles (Java)

Posted by GitBox <gi...@apache.org>.
nehsyc commented on pull request #14164:
URL: https://github.com/apache/beam/pull/14164#issuecomment-794274460


   Didn't mean to push Dataflow-related changes to this PR. Reverted.





[GitHub] [beam] nehsyc commented on pull request #14164: [BEAM-11934] Add runner determined sharding option for unbounded data to WriteFiles (Java)

Posted by GitBox <gi...@apache.org>.
nehsyc commented on pull request #14164:
URL: https://github.com/apache/beam/pull/14164#issuecomment-792929362


   Failed tests look unrelated.





[GitHub] [beam] chamikaramj merged pull request #14164: [BEAM-11934] Add runner determined sharding option for unbounded data to WriteFiles (Java)

Posted by GitBox <gi...@apache.org>.
chamikaramj merged pull request #14164:
URL: https://github.com/apache/beam/pull/14164


   





[GitHub] [beam] nehsyc commented on a change in pull request #14164: [BEAM-11934] Add runner determined sharding option for unbounded data to WriteFiles (Java)

Posted by GitBox <gi...@apache.org>.
nehsyc commented on a change in pull request #14164:
URL: https://github.com/apache/beam/pull/14164#discussion_r595710982



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -133,7 +139,14 @@
   // We could consider making this a parameter.
   private static final int SPILLED_RECORD_SHARDING_FACTOR = 10;
 
+  // The record count and buffering duration to trigger flushing records to a tmp file. Mainly used
+  // for writing unbounded data to avoid generating too many small files.
+  private static final int FILE_TRIGGERING_RECORD_COUNT = 10000;
+  private static final Duration FILE_TRIGGERING_RECORD_BUFFERING_DURATION =
+      Duration.standardSeconds(1);

Review comment:
       These values are somewhat arbitrary :\ Let me know if you have any insight into what values would be more appropriate.
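To make the trade-off behind these two constants easier to reason about, here is a minimal plain-Java sketch of a "flush on record count or buffering duration, whichever comes first" policy. It is not the Beam implementation: in the PR the batching is delegated to `GroupIntoBatches`, and the class `FlushPolicySketch` with its methods is hypothetical, introduced only for illustration. Timestamps are passed in as milliseconds so the behaviour is deterministic.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical count-or-duration flush policy; a plain-Java model, not Beam code. */
final class FlushPolicySketch {
  private final int maxRecords;
  private final long maxBufferingMillis;
  private final List<String> buffer = new ArrayList<>();
  private final List<List<String>> flushed = new ArrayList<>();
  private long firstBufferedAtMillis;

  FlushPolicySketch(int maxRecords, long maxBufferingMillis) {
    this.maxRecords = maxRecords;
    this.maxBufferingMillis = maxBufferingMillis;
  }

  /** Buffers a record seen at nowMillis, then flushes if either limit is reached. */
  void add(String record, long nowMillis) {
    if (buffer.isEmpty()) {
      // The buffering clock starts with the first record of each batch.
      firstBufferedAtMillis = nowMillis;
    }
    buffer.add(record);
    maybeFlush(nowMillis);
  }

  /** Models a timer firing while no new input arrives. */
  void onTimer(long nowMillis) {
    maybeFlush(nowMillis);
  }

  private void maybeFlush(long nowMillis) {
    if (buffer.isEmpty()) {
      return;
    }
    boolean countReached = buffer.size() >= maxRecords;
    boolean durationReached = nowMillis - firstBufferedAtMillis >= maxBufferingMillis;
    if (countReached || durationReached) {
      // Each flushed batch stands in for one temp file.
      flushed.add(new ArrayList<>(buffer));
      buffer.clear();
    }
  }

  /** Returns the batches flushed so far, in flush order. */
  List<List<String>> flushedBatches() {
    return flushed;
  }

  public static void main(String[] args) {
    FlushPolicySketch policy = new FlushPolicySketch(3, 1000L);
    policy.add("a", 0L);
    policy.add("b", 10L);
    policy.add("c", 20L); // third record: count limit reached
    policy.add("d", 30L);
    policy.onTimer(1030L); // one second after "d" was buffered
    System.out.println(policy.flushedBatches());
  }
}
```

In this model a larger record count or a longer duration means fewer, larger files at the cost of more buffered state and higher latency, which is the tension the two constants have to balance.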







[GitHub] [beam] nehsyc commented on a change in pull request #14164: [BEAM-11934] Add runner determined sharding option for unbounded data to WriteFiles (Java)

Posted by GitBox <gi...@apache.org>.
nehsyc commented on a change in pull request #14164:
URL: https://github.com/apache/beam/pull/14164#discussion_r593526147



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -672,13 +740,102 @@ private WriteShardedBundlesToTempFiles(
                   .withSideInputs(shardingSideInputs))
           .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
           .apply("GroupIntoShards", GroupByKey.create())
+          .apply(
+              "KeyedByShardNum",
+              MapElements.via(
+                  new SimpleFunction<
+                      KV<ShardedKey<Integer>, Iterable<UserT>>, KV<Integer, Iterable<UserT>>>() {
+                    @Override
+                    public KV<Integer, Iterable<UserT>> apply(
+                        KV<ShardedKey<Integer>, Iterable<UserT>> input) {
+                      return KV.of(input.getKey().getShardNumber(), input.getValue());
+                    }
+                  }))
+          .setCoder(KvCoder.of(VarIntCoder.of(), IterableCoder.of(input.getCoder())))
           .apply(
               "WriteShardsIntoTempFiles",
               ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(getSideInputs()))
           .setCoder(fileResultCoder);
     }
   }
 
+  private class WriteAutoShardedBundlesToTempFiles
+      extends PTransform<PCollection<UserT>, PCollection<FileResult<DestinationT>>> {
+    private final Coder<DestinationT> destinationCoder;
+    private final Coder<FileResult<DestinationT>> fileResultCoder;
+
+    private WriteAutoShardedBundlesToTempFiles(
+        Coder<DestinationT> destinationCoder, Coder<FileResult<DestinationT>> fileResultCoder) {
+      this.destinationCoder = destinationCoder;
+      this.fileResultCoder = fileResultCoder;
+    }
+
+    @Override
+    public PCollection<FileResult<DestinationT>> expand(PCollection<UserT> input) {
+      checkArgument(
+          getWithRunnerDeterminedShardingUnbounded(),
+          "Runner determined sharding for unbounded data is not supported by the runner.");
+      // Auto-sharding is achieved via GroupIntoBatches.WithShardedKey which shards, groups and at
+      // the same time batches the input records. The sharding behavior depends on runners. The
+      // batching is per window and we also emit the batches if there are a certain number of
+      // records buffered or they have been buffered for a certain time, controlled by
+      // FILE_TRIGGERING_RECORD_COUNT and BUFFERING_DURATION respectively.
+      return input
+          .apply(
+              "KeyedByDestination",
+              ParDo.of(
+                  new DoFn<UserT, KV<Integer, UserT>>() {
+                    @ProcessElement
+                    public void processElement(@Element UserT element, ProcessContext context)
+                        throws Exception {
+                      getDynamicDestinations().setSideInputAccessorFromProcessContext(context);
+                      DestinationT destination =
+                          getDynamicDestinations().getDestination(context.element());
+                      context.output(
+                          KV.of(hashDestination(destination, destinationCoder), element));
+                    }
+                  }))
+          .setCoder(KvCoder.of(VarIntCoder.of(), input.getCoder()))
+          .apply(
+              "ShardAndGroup",
+              GroupIntoBatches.<Integer, UserT>ofSize(FILE_TRIGGERING_RECORD_COUNT)
+                  .withMaxBufferingDuration(FILE_TRIGGERING_RECORD_BUFFERING_DURATION)
+                  .withShardedKey())
+          .setCoder(
+              KvCoder.of(
+                  org.apache.beam.sdk.util.ShardedKey.Coder.of(VarIntCoder.of()),
+                  IterableCoder.of(input.getCoder())))
+          // Add dummy shard since it is required by WriteShardsIntoTempFilesFn. It will be dropped

Review comment:
       Not sure. `WriteShardsIntoTempFilesFn` is also used in the fixed-sharding case where we do want to ensure that a shard is assigned properly.







[GitHub] [beam] nehsyc commented on a change in pull request #14164: [BEAM-11934] Add runner determined sharding option for unbounded data to WriteFiles (Java)

Posted by GitBox <gi...@apache.org>.
nehsyc commented on a change in pull request #14164:
URL: https://github.com/apache/beam/pull/14164#discussion_r593525983



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -314,12 +349,16 @@ public void validate(PipelineOptions options) {
           "Must use windowed writes when applying %s to an unbounded PCollection",
           WriteFiles.class.getSimpleName());
       // The reason for this is https://issues.apache.org/jira/browse/BEAM-1438
-      // and similar behavior in other runners.
-      checkArgument(
-          getComputeNumShards() != null || getNumShardsProvider() != null,
-          "When applying %s to an unbounded PCollection, "
-              + "must specify number of output shards explicitly",
-          WriteFiles.class.getSimpleName());
+      // and similar behavior in other runners. Runners can choose to ignore this check and perform
+      // runner determined sharding for unbounded data by overriding the option

Review comment:
       Resolved.







[GitHub] [beam] chamikaramj commented on a change in pull request #14164: [BEAM-11934] Add runner determined sharding option for unbounded data to WriteFiles (Java)

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on a change in pull request #14164:
URL: https://github.com/apache/beam/pull/14164#discussion_r593405175



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -301,6 +321,21 @@
     return toBuilder().setMaxNumWritersPerBundle(-1).build();
   }
 
+  /**
+   * Returns a new {@link WriteFiles} that will write to the current {@link FileBasedSink} with
+   * runner-determined sharding for unbounded data specifically. Currently manual sharding is
+   * required for writing unbounded data with a fixed number of shards or a predefined sharding
+   * function. This option allows the runners to get around that requirement and perform automatic
+   * sharding.
+   *
+   * <p>Intended to only be used by runners. Users should use {@link

Review comment:
       Ok, makes sense. I think a PTransformOverride should work for this.







[GitHub] [beam] nehsyc commented on pull request #14164: [BEAM-11934] Add runner determined sharding option for unbounded data to WriteFiles (Java)

Posted by GitBox <gi...@apache.org>.
nehsyc commented on pull request #14164:
URL: https://github.com/apache/beam/pull/14164#issuecomment-805148216


   @chamikaramj Hey Cham, I uploaded a new commit. You are right about the risk of changing `Reshuffle` to `GBK`, since rewindowing would be needed when using session windows. The new code path for auto-sharding, however, needs a true `GBK` to gather temp file results in the same window. The current implementation does not yet work with session windows, and I'd like to address that in a separate PR. Could you take another pass to see if everything makes sense to you now?





[GitHub] [beam] nehsyc commented on a change in pull request #14164: [BEAM-11934] Add runner determined sharding option for unbounded data to WriteFiles (Java)

Posted by GitBox <gi...@apache.org>.
nehsyc commented on a change in pull request #14164:
URL: https://github.com/apache/beam/pull/14164#discussion_r593525898



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -465,6 +520,19 @@ private WriteUnshardedBundlesToTempFiles(
               // shard numbers are assigned together to both the spilled and non-spilled files in
               // finalize.
               .apply("GroupUnwritten", GroupByKey.create())
+              .apply(

Review comment:
       Note that this is the fixed-sharding case.







[GitHub] [beam] nehsyc commented on a change in pull request #14164: [BEAM-11934] Add runner determined sharding option for unbounded data to WriteFiles (Java)

Posted by GitBox <gi...@apache.org>.
nehsyc commented on a change in pull request #14164:
URL: https://github.com/apache/beam/pull/14164#discussion_r599834872



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -400,13 +443,25 @@ private GatherResults(Coder<ResultT> resultCoder) {
     @Override
     public PCollection<List<ResultT>> expand(PCollection<ResultT> input) {
       if (getWindowedWrites()) {
-        // Reshuffle the results to make them stable against retries.
+        // Group the results to make them stable against retries.
         // Use a single void key to maximize size of bundles for finalization.
         return input
-            .apply("Add void key", WithKeys.of((Void) null))
-            .apply("Reshuffle", Reshuffle.of())
-            .apply("Drop key", Values.create())
-            .apply("Gather bundles", ParDo.of(new GatherBundlesPerWindowFn<>()))
+            .apply("AddVoidKey", WithKeys.of((Void) null))

Review comment:
       After some investigation and discussion with @robertwb, the reason `Reshuffle` does not work for the new path is that it applies a `ReshuffleTrigger` that fires on every element, so we get only one element per bundle downstream. So even though the following DoFn is called `GatherBundlesPerWindowFn`, it doesn't actually gather the elements per window. That works OK in the fixed-sharding case since we encode the shard index explicitly. In the auto-sharding case, however, we don't know the shard index and have to rely on counting the shards within a bundle, which means we need to guarantee that the elements in the same window are gathered into the same bundle to perform the final renaming.
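The point about counting shards within a bundle can be illustrated with a small plain-Java sketch. It is a simplified model, not the `WriteFiles` finalization code: the class `ShardNamingSketch`, its methods, and the `out-NNNNN-of-NNNNN` naming pattern are all assumptions made for illustration. Each "bundle" is a list of temp files, and final names are derived only from a file's position in its bundle and the bundle size.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

/** Hypothetical model of naming shards by counting within a bundle; not Beam code. */
final class ShardNamingSketch {

  /** Names each temp file by its position in the bundle and the bundle size. */
  static List<String> finalNames(List<String> bundleOfTempFiles) {
    int numShards = bundleOfTempFiles.size();
    List<String> names = new ArrayList<>();
    for (int i = 0; i < numShards; i++) {
      names.add(String.format(Locale.ROOT, "out-%05d-of-%05d", i, numShards));
    }
    return names;
  }

  /** Returns true if naming every bundle independently produces no duplicate names. */
  static boolean namesAreUnique(List<List<String>> bundles) {
    Set<String> seen = new HashSet<>();
    for (List<String> bundle : bundles) {
      for (String name : finalNames(bundle)) {
        if (!seen.add(name)) {
          return false;
        }
      }
    }
    return true;
  }

  public static void main(String[] args) {
    List<String> tempFiles = List.of("tmp-a", "tmp-b", "tmp-c");

    // One element per bundle, as when a trigger fires on every element.
    List<List<String>> perElementBundles = new ArrayList<>();
    for (String file : tempFiles) {
      perElementBundles.add(List.of(file));
    }
    System.out.println("per-element bundles unique: " + namesAreUnique(perElementBundles));

    // All files of the window gathered into a single bundle, as after a true grouping.
    System.out.println("single bundle unique: " + namesAreUnique(List.of(tempFiles)));
  }
}
```

With one element per bundle, every file is treated as shard 0 of 1, so the names collide; with all files of the window in a single bundle, each file gets a distinct index. This is why the auto-sharding path, under this model, needs the results of a window to arrive together.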

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -465,6 +520,19 @@ private WriteUnshardedBundlesToTempFiles(
               // shard numbers are assigned together to both the spilled and non-spilled files in
               // finalize.
               .apply("GroupUnwritten", GroupByKey.create())
+              .apply(

Review comment:
       This change was made mainly because I changed the signature of `WriteShardsIntoTempFilesFn` from accepting ((key, shard), V) to accepting (shard, V). I reverted it in the latest commit to make the changes clearer.







[GitHub] [beam] chamikaramj commented on a change in pull request #14164: [BEAM-11934] Add runner determined sharding option for unbounded data to WriteFiles (Java)

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on a change in pull request #14164:
URL: https://github.com/apache/beam/pull/14164#discussion_r592923840



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -301,6 +321,21 @@
     return toBuilder().setMaxNumWritersPerBundle(-1).build();
   }
 
+  /**
+   * Returns a new {@link WriteFiles} that will write to the current {@link FileBasedSink} with
+   * runner-determined sharding for unbounded data specifically. Currently manual sharding is
+   * required for writing unbounded data with a fixed number of shards or a predefined sharding
+   * function. This option allows the runners to get around that requirement and perform automatic
+   * sharding.
+   *
+   * <p>Intended to only be used by runners. Users should use {@link

Review comment:
       How does the runner invoke this? Usually a runner sits behind the Fn API boundary, so it will not be able to invoke this directly.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -672,13 +740,102 @@ private WriteShardedBundlesToTempFiles(
                   .withSideInputs(shardingSideInputs))
           .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
           .apply("GroupIntoShards", GroupByKey.create())
+          .apply(
+              "KeyedByShardNum",
+              MapElements.via(
+                  new SimpleFunction<
+                      KV<ShardedKey<Integer>, Iterable<UserT>>, KV<Integer, Iterable<UserT>>>() {
+                    @Override
+                    public KV<Integer, Iterable<UserT>> apply(
+                        KV<ShardedKey<Integer>, Iterable<UserT>> input) {
+                      return KV.of(input.getKey().getShardNumber(), input.getValue());
+                    }
+                  }))
+          .setCoder(KvCoder.of(VarIntCoder.of(), IterableCoder.of(input.getCoder())))
           .apply(
               "WriteShardsIntoTempFiles",
               ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(getSideInputs()))
           .setCoder(fileResultCoder);
     }
   }
 
+  private class WriteAutoShardedBundlesToTempFiles
+      extends PTransform<PCollection<UserT>, PCollection<FileResult<DestinationT>>> {
+    private final Coder<DestinationT> destinationCoder;
+    private final Coder<FileResult<DestinationT>> fileResultCoder;
+
+    private WriteAutoShardedBundlesToTempFiles(
+        Coder<DestinationT> destinationCoder, Coder<FileResult<DestinationT>> fileResultCoder) {
+      this.destinationCoder = destinationCoder;
+      this.fileResultCoder = fileResultCoder;
+    }
+
+    @Override
+    public PCollection<FileResult<DestinationT>> expand(PCollection<UserT> input) {
+      checkArgument(
+          getWithRunnerDeterminedShardingUnbounded(),
+          "Runner determined sharding for unbounded data is not supported by the runner.");
+      // Auto-sharding is achieved via GroupIntoBatches.WithShardedKey which shards, groups and at
+      // the same time batches the input records. The sharding behavior depends on runners. The
+      // batching is per window and we also emit the batches if there are a certain number of
+      // records buffered or they have been buffered for a certain time, controlled by
+      // FILE_TRIGGERING_RECORD_COUNT and BUFFERING_DURATION respectively.
+      return input
+          .apply(
+              "KeyedByDestination",
+              ParDo.of(
+                  new DoFn<UserT, KV<Integer, UserT>>() {
+                    @ProcessElement
+                    public void processElement(@Element UserT element, ProcessContext context)
+                        throws Exception {
+                      getDynamicDestinations().setSideInputAccessorFromProcessContext(context);
+                      DestinationT destination =
+                          getDynamicDestinations().getDestination(context.element());
+                      context.output(
+                          KV.of(hashDestination(destination, destinationCoder), element));
+                    }
+                  }))
+          .setCoder(KvCoder.of(VarIntCoder.of(), input.getCoder()))
+          .apply(
+              "ShardAndGroup",
+              GroupIntoBatches.<Integer, UserT>ofSize(FILE_TRIGGERING_RECORD_COUNT)
+                  .withMaxBufferingDuration(FILE_TRIGGERING_RECORD_BUFFERING_DURATION)
+                  .withShardedKey())
+          .setCoder(
+              KvCoder.of(
+                  org.apache.beam.sdk.util.ShardedKey.Coder.of(VarIntCoder.of()),
+                  IterableCoder.of(input.getCoder())))
+          // Add dummy shard since it is required by WriteShardsIntoTempFilesFn. It will be dropped

Review comment:
       Should we update `WriteShardsIntoTempFilesFn` instead?
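   
   For readers following the code comment in this hunk: the count-or-duration batching it describes can be modelled in plain Java. This is only a sketch under stated assumptions, not Beam's `GroupIntoBatches` implementation. `BatcherSketch`, `add`, and `onTimer` are hypothetical names, and the current time is passed in explicitly instead of relying on runner timers.
   
   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;

   // Hypothetical model of "emit a batch after N records or after a maximum buffering time".
   final class BatcherSketch<T> {
     private final int maxRecords;
     private final long maxBufferMillis;
     private final List<T> buffer = new ArrayList<>();
     private long firstBufferedAtMillis;

     BatcherSketch(int maxRecords, long maxBufferMillis) {
       this.maxRecords = maxRecords;
       this.maxBufferMillis = maxBufferMillis;
     }

     // Buffers one element; returns a batch once the record limit is reached, else an empty list.
     List<T> add(T element, long nowMillis) {
       if (buffer.isEmpty()) {
         firstBufferedAtMillis = nowMillis;
       }
       buffer.add(element);
       return buffer.size() >= maxRecords ? flush() : Collections.<T>emptyList();
     }

     // Stand-in for a buffering timer: flushes if the oldest element has waited long enough.
     List<T> onTimer(long nowMillis) {
       boolean expired =
           !buffer.isEmpty() && nowMillis - firstBufferedAtMillis >= maxBufferMillis;
       return expired ? flush() : Collections.<T>emptyList();
     }

     private List<T> flush() {
       List<T> batch = new ArrayList<>(buffer);
       buffer.clear();
       return batch;
     }
   }
   ```
   
   In this model, the record limit plays the role of `FILE_TRIGGERING_RECORD_COUNT` and the buffering limit plays the role of `FILE_TRIGGERING_RECORD_BUFFERING_DURATION`. Per-window and per-key state is deliberately left out.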

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -400,13 +443,25 @@ private GatherResults(Coder<ResultT> resultCoder) {
     @Override
     public PCollection<List<ResultT>> expand(PCollection<ResultT> input) {
       if (getWindowedWrites()) {
-        // Reshuffle the results to make them stable against retries.
+        // Group the results to make them stable against retries.
         // Use a single void key to maximize size of bundles for finalization.
         return input
-            .apply("Add void key", WithKeys.of((Void) null))
-            .apply("Reshuffle", Reshuffle.of())
-            .apply("Drop key", Values.create())
-            .apply("Gather bundles", ParDo.of(new GatherBundlesPerWindowFn<>()))
+            .apply("AddVoidKey", WithKeys.of((Void) null))

Review comment:
       What's the reason for replacing `Reshuffle` here?

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -314,12 +349,16 @@ public void validate(PipelineOptions options) {
           "Must use windowed writes when applying %s to an unbounded PCollection",
           WriteFiles.class.getSimpleName());
       // The reason for this is https://issues.apache.org/jira/browse/BEAM-1438
-      // and similar behavior in other runners.
-      checkArgument(
-          getComputeNumShards() != null || getNumShardsProvider() != null,
-          "When applying %s to an unbounded PCollection, "
-              + "must specify number of output shards explicitly",
-          WriteFiles.class.getSimpleName());
+      // and similar behavior in other runners. Runners can choose to ignore this check and perform
+      // runner determined sharding for unbounded data by overriding the option

Review comment:
       Ditto. I'm not sure about "Runners can choose to ignore this check" language.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -672,13 +740,102 @@ private WriteShardedBundlesToTempFiles(
                   .withSideInputs(shardingSideInputs))
           .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
           .apply("GroupIntoShards", GroupByKey.create())
+          .apply(

Review comment:
       Ditto.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -465,6 +520,19 @@ private WriteUnshardedBundlesToTempFiles(
               // shard numbers are assigned together to both the spilled and non-spilled files in
               // finalize.
               .apply("GroupUnwritten", GroupByKey.create())
+              .apply(

Review comment:
       Does the GBK have to come after adding the keys here?

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -354,9 +393,13 @@ public void validate(PipelineOptions options) {
 
     PCollection<FileResult<DestinationT>> tempFileResults =
         (getComputeNumShards() == null && getNumShardsProvider() == null)
-            ? input.apply(
-                "WriteUnshardedBundlesToTempFiles",
-                new WriteUnshardedBundlesToTempFiles(destinationCoder, fileResultCoder))
+            ? input.isBounded() == IsBounded.BOUNDED
+                ? input.apply(
+                    "WriteUnshardedBundlesToTempFiles",
+                    new WriteUnshardedBundlesToTempFiles(destinationCoder, fileResultCoder))
+                : input.apply(
+                    "WriteAutoShardedBundlesToTempFiles",

Review comment:
       We should probably check that auto-sharding is explicitly enabled before performing this call?
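   
   To make the branch structure in this hunk easier to follow, here is a minimal plain-Java sketch of the selection logic together with the explicit check suggested above. It is an illustration under assumptions, not the code in `WriteFiles`: `WritePathSketch`, `selectWritePath`, and the boolean parameters are hypothetical stand-ins for `getComputeNumShards()`/`getNumShardsProvider()`, `input.isBounded()`, and `getWithRunnerDeterminedShardingUnbounded()`.
   
   ```java
   // Hypothetical model of how the temp-file write step is chosen.
   final class WritePathSketch {
     enum Boundedness {
       BOUNDED,
       UNBOUNDED
     }

     // Returns the name of the write step for the given configuration.
     static String selectWritePath(
         boolean hasExplicitSharding,
         Boundedness boundedness,
         boolean runnerShardingUnboundedEnabled) {
       if (hasExplicitSharding) {
         // numShards or a shard-count provider was supplied by the user.
         return "WriteShardedBundlesToTempFiles";
       }
       if (boundedness == Boundedness.BOUNDED) {
         return "WriteUnshardedBundlesToTempFiles";
       }
       // Unbounded input without explicit sharding: only valid if the runner opted in.
       if (!runnerShardingUnboundedEnabled) {
         throw new IllegalArgumentException(
             "Runner determined sharding for unbounded data is not supported by the runner.");
       }
       return "WriteAutoShardedBundlesToTempFiles";
     }
   }
   ```
   
   The point of the explicit check is that an unbounded input without user-provided sharding fails fast unless the runner has enabled the option, rather than silently taking the auto-sharded path.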




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] chamikaramj commented on pull request #14164: [BEAM-11934] Add runner determined sharding option for unbounded data to WriteFiles (Java)

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on pull request #14164:
URL: https://github.com/apache/beam/pull/14164#issuecomment-806172590


   Run Java PostCommit





[GitHub] [beam] nehsyc commented on pull request #14164: [BEAM-11934] Add runner determined sharding option for unbounded data to WriteFiles (Java)

Posted by GitBox <gi...@apache.org>.
nehsyc commented on pull request #14164:
URL: https://github.com/apache/beam/pull/14164#issuecomment-796486792


   Added one more test using `TestStream` in the latest commit.





[GitHub] [beam] chamikaramj commented on a change in pull request #14164: [BEAM-11934] Add runner determined sharding option for unbounded data to WriteFiles (Java)

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on a change in pull request #14164:
URL: https://github.com/apache/beam/pull/14164#discussion_r597878833



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -400,13 +443,25 @@ private GatherResults(Coder<ResultT> resultCoder) {
     @Override
     public PCollection<List<ResultT>> expand(PCollection<ResultT> input) {
       if (getWindowedWrites()) {
-        // Reshuffle the results to make them stable against retries.
+        // Group the results to make them stable against retries.
         // Use a single void key to maximize size of bundles for finalization.
         return input
-            .apply("Add void key", WithKeys.of((Void) null))
-            .apply("Reshuffle", Reshuffle.of())
-            .apply("Drop key", Values.create())
-            .apply("Gather bundles", ParDo.of(new GatherBundlesPerWindowFn<>()))
+            .apply("AddVoidKey", WithKeys.of((Void) null))

Review comment:
       `Reshuffle` re-windows the data in addition to applying a GBK, so I don't think replacing the `Reshuffle` here with a plain GBK preserves the behavior.
   
   It would be good to figure out why `Reshuffle` did not work for the new path.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -465,6 +520,19 @@ private WriteUnshardedBundlesToTempFiles(
               // shard numbers are assigned together to both the spilled and non-spilled files in
               // finalize.
               .apply("GroupUnwritten", GroupByKey.create())
+              .apply(

Review comment:
       Still confused :).
   
   Could you clarify why this was needed, since this is not in the auto-sharding path? Is this a bug fix for existing code?







[GitHub] [beam] nehsyc commented on pull request #14164: [BEAM-11934] Add runner determined sharding option for unbounded data to WriteFiles (Java)

Posted by GitBox <gi...@apache.org>.
nehsyc commented on pull request #14164:
URL: https://github.com/apache/beam/pull/14164#issuecomment-800789123


   @chamikaramj Cham, do you have any more comments?





[GitHub] [beam] nehsyc commented on pull request #14164: [BEAM-11934] Add runner determined sharding option for unbounded data to WriteFiles (Java)

Posted by GitBox <gi...@apache.org>.
nehsyc commented on pull request #14164:
URL: https://github.com/apache/beam/pull/14164#issuecomment-792946225


   R: @chamikaramj 





[GitHub] [beam] nehsyc commented on pull request #14164: [BEAM-11934] Add runner determined sharding option for unbounded data to WriteFiles (Java)

Posted by GitBox <gi...@apache.org>.
nehsyc commented on pull request #14164:
URL: https://github.com/apache/beam/pull/14164#issuecomment-806979590


   Thanks Cham!





[GitHub] [beam] nehsyc commented on a change in pull request #14164: [BEAM-11934] Add runner determined sharding option for unbounded data to WriteFiles (Java)

Posted by GitBox <gi...@apache.org>.
nehsyc commented on a change in pull request #14164:
URL: https://github.com/apache/beam/pull/14164#discussion_r593385604



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -301,6 +321,21 @@
     return toBuilder().setMaxNumWritersPerBundle(-1).build();
   }
 
+  /**
+   * Returns a new {@link WriteFiles} that will write to the current {@link FileBasedSink} with
+   * runner-determined sharding for unbounded data specifically. Currently manual sharding is
+   * required for writing unbounded data with a fixed number of shards or a predefined sharding
+   * function. This option allows the runners to get around that requirement and perform automatic
+   * sharding.
+   *
+   * <p>Intended to only be used by runners. Users should use {@link

Review comment:
       How does a runner using the FnAPI typically override a non-standard transform? Or does it always require the transform to be added to the FnAPI before a runner can do something different?
   
   This is what I am going to do to set this in the Dataflow runner: https://github.com/apache/beam/pull/14164/commits/3382d706ff62518fa3c8f450faa5fafc2d534d5c.
   
   The main reason I added this is that `WriteFiles` already has the option `withRunnerDeterminedSharding`, but it is disabled for streaming. Removing the condition to allow `withRunnerDeterminedSharding` for streaming would enable the new implementation for every runner, and for runners that don't support dynamic sharding the default implementation might perform badly. Is there a better way to let runners choose whether they support this option?







[GitHub] [beam] nehsyc commented on a change in pull request #14164: [BEAM-11934] Add runner determined sharding option for unbounded data to WriteFiles (Java)

Posted by GitBox <gi...@apache.org>.
nehsyc commented on a change in pull request #14164:
URL: https://github.com/apache/beam/pull/14164#discussion_r593525924



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -672,13 +740,102 @@ private WriteShardedBundlesToTempFiles(
                   .withSideInputs(shardingSideInputs))
           .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
           .apply("GroupIntoShards", GroupByKey.create())
+          .apply(

Review comment:
       As explained above.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -354,9 +393,13 @@ public void validate(PipelineOptions options) {
 
     PCollection<FileResult<DestinationT>> tempFileResults =
         (getComputeNumShards() == null && getNumShardsProvider() == null)
-            ? input.apply(
-                "WriteUnshardedBundlesToTempFiles",
-                new WriteUnshardedBundlesToTempFiles(destinationCoder, fileResultCoder))
+            ? input.isBounded() == IsBounded.BOUNDED
+                ? input.apply(
+                    "WriteUnshardedBundlesToTempFiles",
+                    new WriteUnshardedBundlesToTempFiles(destinationCoder, fileResultCoder))
+                : input.apply(
+                    "WriteAutoShardedBundlesToTempFiles",

Review comment:
       Done.







[GitHub] [beam] nehsyc commented on a change in pull request #14164: [BEAM-11934] Add runner determined sharding option for unbounded data to WriteFiles (Java)

Posted by GitBox <gi...@apache.org>.
nehsyc commented on a change in pull request #14164:
URL: https://github.com/apache/beam/pull/14164#discussion_r593525839



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -465,6 +520,19 @@ private WriteUnshardedBundlesToTempFiles(
               // shard numbers are assigned together to both the spilled and non-spilled files in
               // finalize.
               .apply("GroupUnwritten", GroupByKey.create())
+              .apply(

Review comment:
       Yes, we rely on adding sharded keys followed by a GBK to expand the parallelism; otherwise the spilling factor has no effect.
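   
   A small plain-Java sketch of why the key has to carry a shard component before grouping. This is an illustration only: `SpillShardingSketch`, `groupSpilled`, and the round-robin shard assignment are assumptions made for the example, not the assignment used in `WriteFiles`.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.TreeMap;

   // Hypothetical model: records for one destination are keyed by (destination, shard)
   // and then grouped, so each group can be written by a separate worker.
   final class SpillShardingSketch {
     static Map<String, List<String>> groupSpilled(
         List<String> records, String destination, int spillFactor) {
       Map<String, List<String>> groups = new TreeMap<>();
       for (int i = 0; i < records.size(); i++) {
         // Round-robin shard assignment; a stand-in for any shard-assignment strategy.
         int shard = i % spillFactor;
         String shardedKey = destination + "#" + shard;
         groups.computeIfAbsent(shardedKey, k -> new ArrayList<>()).add(records.get(i));
       }
       return groups;
     }
   }
   ```
   
   With a spill factor of 1 every record lands under a single key, so grouping yields one group and no extra parallelism. With a larger factor the same records are spread over that many groups, which is the effect the sharded key plus GBK is meant to achieve.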







[GitHub] [beam] nehsyc commented on pull request #14164: [BEAM-11934] Add runner determined sharding option for unbounded data to WriteFiles (Java)

Posted by GitBox <gi...@apache.org>.
nehsyc commented on pull request #14164:
URL: https://github.com/apache/beam/pull/14164#issuecomment-801315144


   Presubmit failure looks unrelated, e.g., 
   
   ```
   17:39:14 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_Commit/src/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java:174: error: cannot find symbol
   17:39:14       extends PTransform<PCollection<InputT>, PCollection<KV<InputT, OutputT>>> {
   ```





[GitHub] [beam] chamikaramj commented on pull request #14164: [BEAM-11934] Add runner determined sharding option for unbounded data to WriteFiles (Java)

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on pull request #14164:
URL: https://github.com/apache/beam/pull/14164#issuecomment-797661428


   cc: @reuvenlax @robertwb @kennknowles in case they have additional comments.





[GitHub] [beam] nehsyc commented on a change in pull request #14164: [BEAM-11934] Add runner determined sharding option for unbounded data to WriteFiles (Java)

Posted by GitBox <gi...@apache.org>.
nehsyc commented on a change in pull request #14164:
URL: https://github.com/apache/beam/pull/14164#discussion_r593392042



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -400,13 +443,25 @@ private GatherResults(Coder<ResultT> resultCoder) {
     @Override
     public PCollection<List<ResultT>> expand(PCollection<ResultT> input) {
       if (getWindowedWrites()) {
-        // Reshuffle the results to make them stable against retries.
+        // Group the results to make them stable against retries.
         // Use a single void key to maximize size of bundles for finalization.
         return input
-            .apply("Add void key", WithKeys.of((Void) null))
-            .apply("Reshuffle", Reshuffle.of())
-            .apply("Drop key", Values.create())
-            .apply("Gather bundles", ParDo.of(new GatherBundlesPerWindowFn<>()))
+            .apply("AddVoidKey", WithKeys.of((Void) null))

Review comment:
       Two reasons:
   - `Reshuffle` expands the iterable after its GBK, and "Gather bundles" effectively reverts that expansion by re-gathering the elements in a bundle. Why not just use a GBK?
   - `Reshuffle` didn't appear to work properly with auto-sharding, where we might emit multiple outputs per window: `Reshuffle` didn't group those outputs in the same window, which caused the added test to fail. My rough guess is that this is because `Reshuffle` adds the timestamp to the key it groups on (https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L87), but I could be entirely wrong.
   
   One argument I can think of for reverting this change is that runners might optimize `Reshuffle`, and using an explicit GBK would lose any such optimization. To keep the original behavior, I could refactor the code to use the explicit GBK only in the new path. Any thoughts?
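   
   The difference described in the two bullets can be illustrated with plain Java collections. This is a rough model under assumptions, not Beam's behavior: `GatherSketch`, `Result`, and the string window labels are hypothetical, and bundles are represented as nested lists.
   
   ```java
   import java.util.ArrayList;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   final class GatherSketch {
     // A file result tagged with the (hypothetical) label of the window it belongs to.
     static final class Result {
       final String window;
       final String file;

       Result(String window, String file) {
         this.window = window;
         this.file = file;
       }
     }

     // Models a GBK on a single void key: exactly one list of files per window.
     static Map<String, List<String>> groupPerWindow(List<List<Result>> bundles) {
       Map<String, List<String>> grouped = new LinkedHashMap<>();
       for (List<Result> bundle : bundles) {
         for (Result r : bundle) {
           grouped.computeIfAbsent(r.window, w -> new ArrayList<>()).add(r.file);
         }
       }
       return grouped;
     }

     // Models re-gathering per bundle: one list per (bundle, window) pair, so a
     // window whose results span several bundles produces several lists.
     static List<List<String>> gatherPerBundle(List<List<Result>> bundles) {
       List<List<String>> out = new ArrayList<>();
       for (List<Result> bundle : bundles) {
         Map<String, List<String>> perWindow = new LinkedHashMap<>();
         for (Result r : bundle) {
           perWindow.computeIfAbsent(r.window, w -> new ArrayList<>()).add(r.file);
         }
         out.addAll(perWindow.values());
       }
       return out;
     }
   }
   ```
   
   If the results for one window arrive in two bundles, `gatherPerBundle` emits two lists for that window while `groupPerWindow` emits one, which matches the symptom described above of outputs in the same window not being grouped together.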







[GitHub] [beam] chamikaramj commented on pull request #14164: [BEAM-11934] Add runner determined sharding option for unbounded data to WriteFiles (Java)

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on pull request #14164:
URL: https://github.com/apache/beam/pull/14164#issuecomment-801571830


   Sorry, will get back to this tomorrow.





[GitHub] [beam] chamikaramj commented on pull request #14164: [BEAM-11934] Add runner determined sharding option for unbounded data to WriteFiles (Java)

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on pull request #14164:
URL: https://github.com/apache/beam/pull/14164#issuecomment-806172337


   Run Java_Examples_Dataflow PreCommit

