Posted to commits@beam.apache.org by GitBox <gi...@apache.org> on 2020/03/13 13:42:56 UTC

[GitHub] [beam] piotr-szuberski opened a new pull request #11122: [BEAM-9346] Improve the efficiency of TFRecordIO

URL: https://github.com/apache/beam/pull/11122
 
 
   I added the code requested in the Jira issue.
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/)
   Python | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/) | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/)
   XLang | --- | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/)
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- |Java | Python | Go | Website
   --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/) 
   Portable | --- | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lukecwik commented on a change in pull request #11122: [BEAM-9346] Improve the efficiency of TFRecordIO

URL: https://github.com/apache/beam/pull/11122#discussion_r395415747
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
 ##########
 @@ -410,13 +412,44 @@ private GatherResults(Coder<ResultT> resultCoder) {
       } else {
         // Pass results via a side input rather than reshuffle, because we need to get an empty
         // iterable to finalize if there are no results.
-        return input
-            .getPipeline()
-            .apply(Reify.viewInGlobalWindow(input.apply(View.asList()), ListCoder.of(resultCoder)));
+        return input.apply("ToList", Combine.globally(new ToListCombineFn<>()));
       }
     }
   }
 
+  public static class ToListCombineFn<ResultT>
 
 Review comment:
   nit: This should have been a private class, since it's an implementation detail and doesn't need to be exposed to users of the WriteFiles transform.
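The reviewer's nit can be illustrated with a minimal plain-Java sketch that keeps the to-list combine function as a private nested class. To run without the Beam SDK, it only mirrors the four-method shape of Beam's `Combine.CombineFn` (`createAccumulator`, `addInput`, `mergeAccumulators`, `extractOutput`). `CombineFnSketch`, `GatherResultsSketch`, and `gather` are hypothetical names that are not part of `WriteFiles`, and this is not the code from the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the shape of Beam's Combine.CombineFn; not the Beam class itself.
interface CombineFnSketch<InputT, AccumT, OutputT> {
  AccumT createAccumulator();

  AccumT addInput(AccumT accumulator, InputT input);

  AccumT mergeAccumulators(Iterable<AccumT> accumulators);

  OutputT extractOutput(AccumT accumulator);
}

// Hypothetical enclosing class standing in for the WriteFiles internals.
class GatherResultsSketch {
  // Private: an implementation detail that users of the enclosing transform never see.
  private static class ToListCombineFn<T> implements CombineFnSketch<T, List<T>, List<T>> {
    @Override
    public List<T> createAccumulator() {
      return new ArrayList<>();
    }

    @Override
    public List<T> addInput(List<T> accumulator, T input) {
      accumulator.add(input);
      return accumulator;
    }

    @Override
    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
      List<T> merged = createAccumulator();
      for (List<T> accumulator : accumulators) {
        merged.addAll(accumulator);
      }
      return merged;
    }

    @Override
    public List<T> extractOutput(List<T> accumulator) {
      return accumulator;
    }
  }

  // Simulates one combine: each bundle fills its own accumulator, then all
  // partial accumulators are merged and the final list is extracted.
  static <T> List<T> gather(List<List<T>> bundles) {
    ToListCombineFn<T> fn = new ToListCombineFn<>();
    List<List<T>> partials = new ArrayList<>();
    for (List<T> bundle : bundles) {
      List<T> accumulator = fn.createAccumulator();
      for (T element : bundle) {
        accumulator = fn.addInput(accumulator, element);
      }
      partials.add(accumulator);
    }
    // With no bundles at all this still yields an empty list rather than no output.
    return fn.extractOutput(fn.mergeAccumulators(partials));
  }
}
```

Because the nested class is `private`, only `GatherResultsSketch` can instantiate it. Calling `gather` with no bundles still returns an empty list, which corresponds to the "empty iterable to finalize" requirement in the code comment above the change.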

[GitHub] [beam] lukecwik commented on a change in pull request #11122: [BEAM-9346] Improve the efficiency of TFRecordIO

URL: https://github.com/apache/beam/pull/11122#discussion_r395414286
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
 ##########
 @@ -410,13 +412,44 @@ private GatherResults(Coder<ResultT> resultCoder) {
       } else {
         // Pass results via a side input rather than reshuffle, because we need to get an empty
         // iterable to finalize if there are no results.
-        return input
-            .getPipeline()
-            .apply(Reify.viewInGlobalWindow(input.apply(View.asList()), ListCoder.of(resultCoder)));
+        return input.apply("ToList", Combine.globally(new ToListCombineFn<>()));
 
 Review comment:
   I believe this could be a regression, based on the comment a few lines above where the code was changed:
   ```
   // Pass results via a side input rather than reshuffle, because we need to get an empty
   // iterable to finalize if there are no results.
   ```
   `Combine.globally` in the global window will produce a default output in the global window, but will throw an exception if used against PCollections that are not globally windowed.
   
   Performance-wise, this is likely an improvement overall, since runners typically do a better job with GBK than with side inputs.
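The windowing concern can be sketched with a simplified plain-Java model. It assumes only what the comment states: a default-producing global combine yields one (possibly empty) output in the global window and is rejected for other windowing. In the Beam SDK the alternatives for non-global windows are `Combine.globally(...).withoutDefaults()` and `Combine.globally(...).asSingletonView()`. `GlobalCombineModel`, `Windowing`, and `combineToList` below are hypothetical and only model the rule.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of Combine.globally(...) with default values enabled.
class GlobalCombineModel {
  // Hypothetical stand-in for the input's WindowFn.
  enum Windowing { GLOBAL, NON_GLOBAL }

  static List<String> combineToList(Windowing windowing, List<String> elements) {
    if (windowing != Windowing.GLOBAL) {
      // With non-global windows there is no way to know which windows would need
      // a default value, so the default-producing form is rejected.
      throw new IllegalStateException(
          "Default values need GlobalWindows; consider withoutDefaults() or asSingletonView()");
    }
    // In the single global window the output always exists: an empty input still
    // produces one empty list, which is what finalization relies on.
    return new ArrayList<>(elements);
  }
}
```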

[GitHub] [beam] piotr-szuberski commented on issue #11122: [BEAM-9346] Improve the efficiency of TFRecordIO

URL: https://github.com/apache/beam/pull/11122#issuecomment-598768402
 
 
   OK, thank you. I was confused that the job hadn't started yet. Good to know!

[GitHub] [beam] chamikaramj commented on a change in pull request #11122: [BEAM-9346] Improve the efficiency of TFRecordIO

URL: https://github.com/apache/beam/pull/11122#discussion_r395391171
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
 ##########
 @@ -410,13 +412,44 @@ private GatherResults(Coder<ResultT> resultCoder) {
       } else {
         // Pass results via a side input rather than reshuffle, because we need to get an empty
         // iterable to finalize if there are no results.
-        return input
-            .getPipeline()
-            .apply(Reify.viewInGlobalWindow(input.apply(View.asList()), ListCoder.of(resultCoder)));
+        return input.apply("ToList", Combine.globally(new ToListCombineFn<>()));
 
 Review comment:
   It seems this was applied to all file-based IOs, not just TFRecordIO as mentioned in the PR description. Have we done enough experiments to make sure this won't have unintended adverse performance consequences in batch or streaming? For example, what if the list does not fit in memory?
   
   Apologies if this was already discussed in the dev list.
   
   cc: @robertwb @lukecwik @iemejia 
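One way to frame the memory question: a to-list accumulator does not shrink when partial results are merged, unlike a count or a sum. The plain-Java sketch below contrasts the two shapes; `AccumulatorGrowth` and its methods are hypothetical. It assumes, as with any global combine, that the final merged value is a single element, so the whole list has to be materialized in one place.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helpers contrasting two accumulator shapes.
class AccumulatorGrowth {
  // Count-style accumulator: merging partial results keeps a single number.
  static long mergeCounts(List<Long> partialCounts) {
    long total = 0;
    for (long count : partialCounts) {
      total += count;
    }
    return total;
  }

  // List-style accumulator: merging keeps every element, so the final value grows
  // linearly with the number of inputs; in a global combine this single merged
  // value is what has to fit in memory.
  static <T> List<T> mergeLists(List<List<T>> partialLists) {
    List<T> merged = new ArrayList<>();
    for (List<T> part : partialLists) {
      merged.addAll(part);
    }
    return merged;
  }
}
```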

[GitHub] [beam] piotr-szuberski commented on a change in pull request #11122: [BEAM-9346] Improve the efficiency of TFRecordIO

URL: https://github.com/apache/beam/pull/11122#discussion_r395475008
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
 ##########
 @@ -410,13 +412,44 @@ private GatherResults(Coder<ResultT> resultCoder) {
       } else {
         // Pass results via a side input rather than reshuffle, because we need to get an empty
         // iterable to finalize if there are no results.
-        return input
-            .getPipeline()
-            .apply(Reify.viewInGlobalWindow(input.apply(View.asList()), ListCoder.of(resultCoder)));
+        return input.apply("ToList", Combine.globally(new ToListCombineFn<>()));
       }
     }
   }
 
+  public static class ToListCombineFn<ResultT>
 
 Review comment:
   You're right, I just copy-pasted the code from Jira. It was my first PR in Beam and I wanted to learn the workflow without digging into the proposed code.

[GitHub] [beam] mwalenia commented on issue #11122: [BEAM-9346] Improve the efficiency of TFRecordIO

URL: https://github.com/apache/beam/pull/11122#issuecomment-598728311
 
 
   retest this please

[GitHub] [beam] mwalenia commented on issue #11122: [BEAM-9346] Improve the efficiency of TFRecordIO

URL: https://github.com/apache/beam/pull/11122#issuecomment-599411463
 
 
   Run Java TFRecordIO Performance Test

[GitHub] [beam] chamikaramj commented on a change in pull request #11122: [BEAM-9346] Improve the efficiency of TFRecordIO

URL: https://github.com/apache/beam/pull/11122#discussion_r406877630
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
 ##########
 @@ -410,13 +412,44 @@ private GatherResults(Coder<ResultT> resultCoder) {
       } else {
         // Pass results via a side input rather than reshuffle, because we need to get an empty
         // iterable to finalize if there are no results.
-        return input
-            .getPipeline()
-            .apply(Reify.viewInGlobalWindow(input.apply(View.asList()), ListCoder.of(resultCoder)));
+        return input.apply("ToList", Combine.globally(new ToListCombineFn<>()));
 
 Review comment:
   Thanks for reverting.
   
   My suggestion was to fork the transform expansion to use GlobalCombine when specifically requested through a transform parameter (for example see how we fork TextIO.Read expansion based on withHintMatchesManyFiles()).
   
   https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java#L371
   https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java#L401
   
   I agree that the more flags we add, the more confusing it becomes for users, so we should confirm through performance benchmarking that this option brings significant advantages before adding it.
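The suggested pattern (an immutable transform that carries a boolean set by a `with...()` method and forks its expansion on it) can be modeled in plain Java as below. `GatherResultsOptionSketch`, `withGlobalCombine()`, and the `Expansion` values are hypothetical; no such option exists in `WriteFiles`, and `expand()` here only reports which expansion would be chosen.

```java
// Hypothetical model of an opt-in expansion switch; not an existing WriteFiles option.
class GatherResultsOptionSketch {
  enum Expansion { SIDE_INPUT_VIEW, GLOBAL_COMBINE }

  private final boolean useGlobalCombine;

  private GatherResultsOptionSketch(boolean useGlobalCombine) {
    this.useGlobalCombine = useGlobalCombine;
  }

  // The default keeps the existing, already-validated expansion.
  static GatherResultsOptionSketch create() {
    return new GatherResultsOptionSketch(false);
  }

  // Opt-in switch; returns a new instance instead of mutating this one,
  // the immutable style Beam transforms use for their with*() methods.
  GatherResultsOptionSketch withGlobalCombine() {
    return new GatherResultsOptionSketch(true);
  }

  // Stand-in for expand(): reports which expansion would be built.
  Expansion expand() {
    return useGlobalCombine ? Expansion.GLOBAL_COMBINE : Expansion.SIDE_INPUT_VIEW;
  }
}
```

Keeping the existing expansion as the default means users who never call the opt-in method see no behavior change, which fits the concern about adding options that have not been benchmarked.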

[GitHub] [beam] mwalenia commented on issue #11122: [BEAM-9346] Improve the efficiency of TFRecordIO

URL: https://github.com/apache/beam/pull/11122#issuecomment-599410945
 
 
   Run TFRecord IOIT

[GitHub] [beam] chamikaramj commented on a change in pull request #11122: [BEAM-9346] Improve the efficiency of TFRecordIO

URL: https://github.com/apache/beam/pull/11122#discussion_r406478334
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
 ##########
 @@ -410,13 +412,44 @@ private GatherResults(Coder<ResultT> resultCoder) {
       } else {
         // Pass results via a side input rather than reshuffle, because we need to get an empty
         // iterable to finalize if there are no results.
-        return input
-            .getPipeline()
-            .apply(Reify.viewInGlobalWindow(input.apply(View.asList()), ListCoder.of(resultCoder)));
+        return input.apply("ToList", Combine.globally(new ToListCombineFn<>()));
 
 Review comment:
   What's the verdict here? I suggest we revert this and introduce it as an optional change, since this is a potential regression for the Dataflow runner.

[GitHub] [beam] mwalenia merged pull request #11122: [BEAM-9346] Improve the efficiency of TFRecordIO

URL: https://github.com/apache/beam/pull/11122
 
 
   

[GitHub] [beam] mwalenia commented on a change in pull request #11122: [BEAM-9346] Improve the efficiency of TFRecordIO

URL: https://github.com/apache/beam/pull/11122#discussion_r406672878
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
 ##########
 @@ -410,13 +412,44 @@ private GatherResults(Coder<ResultT> resultCoder) {
       } else {
         // Pass results via a side input rather than reshuffle, because we need to get an empty
         // iterable to finalize if there are no results.
-        return input
-            .getPipeline()
-            .apply(Reify.viewInGlobalWindow(input.apply(View.asList()), ListCoder.of(resultCoder)));
+        return input.apply("ToList", Combine.globally(new ToListCombineFn<>()));
 
 Review comment:
   I'm not sure what the verdict is right now. I think we should revert this and discuss further steps with the repo free of regressions. Tagging @lukecwik for input on keeping the optimization for global windows. @chamikaramj, how do you think the optional change could be applied? A runtime flag for the runner?

[GitHub] [beam] piotr-szuberski commented on a change in pull request #11122: [BEAM-9346] Improve the efficiency of TFRecordIO

URL: https://github.com/apache/beam/pull/11122#discussion_r406668632
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
 ##########
 @@ -410,13 +412,44 @@ private GatherResults(Coder<ResultT> resultCoder) {
       } else {
         // Pass results via a side input rather than reshuffle, because we need to get an empty
         // iterable to finalize if there are no results.
-        return input
-            .getPipeline()
-            .apply(Reify.viewInGlobalWindow(input.apply(View.asList()), ListCoder.of(resultCoder)));
+        return input.apply("ToList", Combine.globally(new ToListCombineFn<>()));
 
 Review comment:
   IMO, just a revert would be better. Adding an option for something that has not been validated and may have unexpected adverse effects would only confuse users.

[GitHub] [beam] piotr-szuberski commented on issue #11122: [BEAM-9346] Improve the efficiency of TFRecordIO

URL: https://github.com/apache/beam/pull/11122#issuecomment-598760549
 
 
   retest this please

[GitHub] [beam] chamikaramj commented on a change in pull request #11122: [BEAM-9346] Improve the efficiency of TFRecordIO

URL: https://github.com/apache/beam/pull/11122#discussion_r406479773
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
 ##########
 @@ -410,13 +412,44 @@ private GatherResults(Coder<ResultT> resultCoder) {
       } else {
         // Pass results via a side input rather than reshuffle, because we need to get an empty
         // iterable to finalize if there are no results.
-        return input
-            .getPipeline()
-            .apply(Reify.viewInGlobalWindow(input.apply(View.asList()), ListCoder.of(resultCoder)));
+        return input.apply("ToList", Combine.globally(new ToListCombineFn<>()));
 
 Review comment:
   Created https://issues.apache.org/jira/browse/BEAM-9734 to make sure that this does not get into 2.21.0

[GitHub] [beam] aaltay commented on issue #11122: [BEAM-9346] Improve the efficiency of TFRecordIO

URL: https://github.com/apache/beam/pull/11122#issuecomment-599709980
 
 
   If this is a common pattern (another one from: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L412), should this be abstracted out to happen in a single place?
   
   /cc @chamikaramj @kennknowles 

[GitHub] [beam] lukecwik commented on a change in pull request #11122: [BEAM-9346] Improve the efficiency of TFRecordIO

URL: https://github.com/apache/beam/pull/11122#discussion_r395414286
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
 ##########
 @@ -410,13 +412,44 @@ private GatherResults(Coder<ResultT> resultCoder) {
       } else {
         // Pass results via a side input rather than reshuffle, because we need to get an empty
         // iterable to finalize if there are no results.
-        return input
-            .getPipeline()
-            .apply(Reify.viewInGlobalWindow(input.apply(View.asList()), ListCoder.of(resultCoder)));
+        return input.apply("ToList", Combine.globally(new ToListCombineFn<>()));
 
 Review comment:
   I believe this could be a regression, based on the comment a few lines above where the code was changed:
   ```
   // Pass results via a side input rather than reshuffle, because we need to get an empty
   // iterable to finalize if there are no results.
   ```
   `Combine.globally` in the global window will produce a default output in the global window, but will throw an exception if used against PCollections that are not globally windowed.
   
   Performance-wise, this is likely an improvement overall, since runners typically do a better job with GBK than with side inputs. So we could keep this change as an optimization for the global window case and keep the existing behavior for non-global windows.
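Keeping the optimization only for globally windowed input amounts to choosing the expansion from the input's windowing instead of from a user flag. In Beam itself that check would presumably inspect `input.getWindowingStrategy().getWindowFn()`, for example whether it is a `GlobalWindows`. The plain-Java sketch below replaces that check with a hypothetical `WindowKind` enum; `WindowAwareGatherSketch` and `choosePath` are likewise illustrative names.

```java
// Hypothetical model: pick the expansion from the input's windowing, not from a user flag.
class WindowAwareGatherSketch {
  enum Path { GLOBAL_COMBINE, SIDE_INPUT_VIEW }

  // Hypothetical stand-in for inspecting the input's WindowFn.
  enum WindowKind { GLOBAL, FIXED, SLIDING, SESSIONS }

  static Path choosePath(WindowKind kind) {
    // Only the global window gets the combine-based optimization, where a default
    // (empty) output is well defined; every other windowing keeps the existing path.
    return kind == WindowKind.GLOBAL ? Path.GLOBAL_COMBINE : Path.SIDE_INPUT_VIEW;
  }
}
```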

[GitHub] [beam] kamilwu commented on issue #11122: [BEAM-9346] Improve the efficiency of TFRecordIO

URL: https://github.com/apache/beam/pull/11122#issuecomment-598767326
 
 
   retest this please

[GitHub] [beam] mwalenia commented on issue #11122: [BEAM-9346] Improve the efficiency of TFRecordIO

URL: https://github.com/apache/beam/pull/11122#issuecomment-599390000
 
 
   run java precommit

[GitHub] [beam] mwalenia removed a comment on issue #11122: [BEAM-9346] Improve the efficiency of TFRecordIO

Posted by GitBox <gi...@apache.org>.
mwalenia removed a comment on issue #11122: [BEAM-9346] Improve the efficiency of TFRecordIO
URL: https://github.com/apache/beam/pull/11122#issuecomment-599389930
 
 
   Run Java Precommig


[GitHub] [beam] mwalenia commented on issue #11122: [BEAM-9346] Improve the efficiency of TFRecordIO

Posted by GitBox <gi...@apache.org>.
mwalenia commented on issue #11122: [BEAM-9346] Improve the efficiency of TFRecordIO
URL: https://github.com/apache/beam/pull/11122#issuecomment-599421354
 
 
   LGTM, thanks for the contribution!


[GitHub] [beam] kamilwu commented on issue #11122: [BEAM-9346] Improve the efficiency of TFRecordIO

Posted by GitBox <gi...@apache.org>.
kamilwu commented on issue #11122: [BEAM-9346] Improve the efficiency of TFRecordIO
URL: https://github.com/apache/beam/pull/11122#issuecomment-598766998
 
 
   @piotr-szuberski As a non-committer, you are unfortunately not allowed to trigger Jenkins tests on your own. Let me do this for you.


[GitHub] [beam] lukecwik commented on a change in pull request #11122: [BEAM-9346] Improve the efficiency of TFRecordIO

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11122: [BEAM-9346] Improve the efficiency of TFRecordIO
URL: https://github.com/apache/beam/pull/11122#discussion_r406875232
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
 ##########
 @@ -410,13 +412,44 @@ private GatherResults(Coder<ResultT> resultCoder) {
       } else {
         // Pass results via a side input rather than reshuffle, because we need to get an empty
         // iterable to finalize if there are no results.
-        return input
-            .getPipeline()
-            .apply(Reify.viewInGlobalWindow(input.apply(View.asList()), ListCoder.of(resultCoder)));
+        return input.apply("ToList", Combine.globally(new ToListCombineFn<>()));
 
 Review comment:
   You could always apply the trigger if, by checking the trigger definition, you can tell it is guaranteed to produce output. I don't think you need a flag.
   
   I'm not sure how closely the WriteFiles transform as a whole was examined to see whether there is a better way to structure its transforms, but that will likely be much harder because WriteFiles isn't a trivial transform.
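   The "no flag" suggestion amounts to deciding the gathering strategy from the input's own windowing definition. The sketch below is hypothetical: `WindowKind`, `TriggerKind`, `GatherPath` and `GatherPathChooser` are illustrative names, and Beam's real WindowingStrategy carries far more information. The rule that only the global window with the default trigger takes the Combine path is a conservative assumption that follows the earlier suggestion to limit the optimization to the global-window case. It is not a statement of which Beam triggers actually guarantee output.
   
   ```java
   // Hypothetical, simplified view of an input's windowing (not Beam's WindowingStrategy).
   enum WindowKind { GLOBAL, NON_GLOBAL }
   enum TriggerKind { DEFAULT, OTHER }
   
   // The two ways of gathering write results discussed in this review.
   enum GatherPath { COMBINE_GLOBALLY, SIDE_INPUT }
   
   class GatherPathChooser {
     // Chooses the path from the windowing definition itself, so no user-facing flag is needed.
     // Assumption for this sketch: only global window + default trigger is treated as safe.
     static GatherPath choose(WindowKind window, TriggerKind trigger) {
       if (window == WindowKind.GLOBAL && trigger == TriggerKind.DEFAULT) {
         return GatherPath.COMBINE_GLOBALLY;
       }
       return GatherPath.SIDE_INPUT; // keep the existing View.asList() side-input behavior
     }
   }
   ```
   
   Keeping the side-input path as the fallback means any windowing the check does not recognize keeps the behavior that existed before this change.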


[GitHub] [beam] mwalenia removed a comment on issue #11122: [BEAM-9346] Improve the efficiency of TFRecordIO

Posted by GitBox <gi...@apache.org>.
mwalenia removed a comment on issue #11122: [BEAM-9346] Improve the efficiency of TFRecordIO
URL: https://github.com/apache/beam/pull/11122#issuecomment-599410945
 
 
   Run TFRecord IOIT



[GitHub] [beam] chamikaramj commented on a change in pull request #11122: [BEAM-9346] Improve the efficiency of TFRecordIO

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on a change in pull request #11122: [BEAM-9346] Improve the efficiency of TFRecordIO
URL: https://github.com/apache/beam/pull/11122#discussion_r395658938
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
 ##########
 @@ -410,13 +412,44 @@ private GatherResults(Coder<ResultT> resultCoder) {
       } else {
         // Pass results via a side input rather than reshuffle, because we need to get an empty
         // iterable to finalize if there are no results.
-        return input
-            .getPipeline()
-            .apply(Reify.viewInGlobalWindow(input.apply(View.asList()), ListCoder.of(resultCoder)));
+        return input.apply("ToList", Combine.globally(new ToListCombineFn<>()));
 
 Review comment:
   Thanks. I agree with Luke. Combine.globally contains a shuffle (GBK) internally, so the code comment above about passing results via a side input rather than a reshuffle no longer holds. This could be a regression, at least for Dataflow, when there are no outputs. We should try the following cases:
   
   (1) Dataflow (and possibly other runners, which may have similar regressions?) with an empty output.
   (2) Writing with a non-global window while WriteFiles.withWindowedWrites() is not set.
   
   Luke, does that make sense? Is there anything else we should try to make sure there's no regression here?


[GitHub] [beam] mwalenia commented on issue #11122: [BEAM-9346] Improve the efficiency of TFRecordIO

Posted by GitBox <gi...@apache.org>.
mwalenia commented on issue #11122: [BEAM-9346] Improve the efficiency of TFRecordIO
URL: https://github.com/apache/beam/pull/11122#issuecomment-599389930
 
 
   Run Java Precommig
