Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/03/10 04:33:26 UTC

[GitHub] [beam] y1chi opened a new pull request #14182: [BEAM-11952] Clean up merged window result in MergingViaWindowFnRunne…

y1chi opened a new pull request #14182:
URL: https://github.com/apache/beam/pull/14182


   …r to avoid accumulating unnecessary state
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] boyuanzz commented on pull request #14182: [BEAM-11952] Clean up merged window result in MergingViaWindowFnRunne…

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #14182:
URL: https://github.com/apache/beam/pull/14182#issuecomment-796442661


   Run Java PreCommit





[GitHub] [beam] y1chi commented on pull request #14182: [BEAM-11952] Clean up merged window result in MergingViaWindowFnRunne…

Posted by GitBox <gi...@apache.org>.
y1chi commented on pull request #14182:
URL: https://github.com/apache/beam/pull/14182#issuecomment-795796043


   R: @kennknowles @boyuanzz 





[GitHub] [beam] y1chi commented on a change in pull request #14182: [BEAM-11952] Clean up merged window result in MergingViaWindowFnRunne…

Posted by GitBox <gi...@apache.org>.
y1chi commented on a change in pull request #14182:
URL: https://github.com/apache/beam/pull/14182#discussion_r591933447



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMergingFnRunner.java
##########
@@ -151,10 +151,12 @@ public void merge(Collection<W> toBeMerged, W mergeResult) throws Exception {
         KV<T, Iterable<W>> windowsToMerge) throws Exception {
       currentWindows = Sets.newHashSet(windowsToMerge.getValue());
       windowFn.mergeWindows((MergeContext) mergeContext);
+      ArrayList mergedWindowsCopy = new ArrayList<>(mergedWindows);
       for (KV<W, Collection<W>> mergedWindow : mergedWindows) {
         currentWindows.removeAll(mergedWindow.getValue());
       }
-      return KV.of(windowsToMerge.getKey(), KV.of(currentWindows, (Iterable) mergedWindows));
+      mergedWindows.clear();

Review comment:
       This can definitely be considered a bug: the previous merge result may already have been processed by the runner, and the related windows may already have been cleaned up. I have added a unit test.
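
A minimal sketch of the failure mode described in this comment, using plain Java collections instead of Beam types. The class `LeakyMergeRunner` and its string-based windows are hypothetical stand-ins, not the code under review; the only point is that a result buffer reused across invocations and never cleared keeps returning earlier merge results.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class LeakyMergeRunner {
  // Reused across invocations, like a field on a long-lived runner object.
  private final List<String> mergedWindows = new ArrayList<>();

  // Hypothetical stand-in for a window merge: records one merge result per call.
  List<String> mergeWindows(List<String> windows) {
    mergedWindows.add(String.join("+", windows));
    return mergedWindows; // never cleared, so earlier results leak into later calls
  }

  public static void main(String[] args) {
    LeakyMergeRunner runner = new LeakyMergeRunner();
    System.out.println(runner.mergeWindows(Arrays.asList("a", "b")).size()); // prints 1
    System.out.println(runner.mergeWindows(Arrays.asList("c", "d")).size()); // prints 2
  }
}
```

The second call reports a merge that belongs to the first call, which is the kind of stale result a runner might act on after the windows involved are already gone.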







[GitHub] [beam] boyuanzz merged pull request #14182: [BEAM-11952] Clean up merged window result in MergingViaWindowFnRunner to avoid accumulating unnecessary state

Posted by GitBox <gi...@apache.org>.
boyuanzz merged pull request #14182:
URL: https://github.com/apache/beam/pull/14182


   





[GitHub] [beam] boyuanzz commented on a change in pull request #14182: [BEAM-11952] Clean up merged window result in MergingViaWindowFnRunne…

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #14182:
URL: https://github.com/apache/beam/pull/14182#discussion_r591916291



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMergingFnRunner.java
##########
@@ -151,10 +151,12 @@ public void merge(Collection<W> toBeMerged, W mergeResult) throws Exception {
         KV<T, Iterable<W>> windowsToMerge) throws Exception {
       currentWindows = Sets.newHashSet(windowsToMerge.getValue());
       windowFn.mergeWindows((MergeContext) mergeContext);

Review comment:
       We can do `mergedWindows.clear();` before we invoke `windowFn.mergeWindows` instead of making a copy, right?
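
A rough sketch of the clear-before-merge alternative suggested here, again with plain Java collections. `ClearFirstMergeRunner` is a hypothetical name and the string windows are an assumption made for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ClearFirstMergeRunner {
  private final List<String> mergedWindows = new ArrayList<>();

  List<String> mergeWindows(List<String> windows) {
    mergedWindows.clear(); // drop whatever the previous invocation left behind
    mergedWindows.add(String.join("+", windows));
    return mergedWindows; // no copy: the caller receives the reused buffer itself
  }

  public static void main(String[] args) {
    ClearFirstMergeRunner runner = new ClearFirstMergeRunner();
    List<String> first = runner.mergeWindows(Arrays.asList("a", "b"));
    List<String> second = runner.mergeWindows(Arrays.asList("c", "d"));
    System.out.println(second);          // prints [c+d]
    System.out.println(first == second); // prints true
  }
}
```

This avoids the copy, with two trade-offs: the last result stays in memory until the next invocation, and a caller that holds on to an earlier result will see it change on the next call.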

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMergingFnRunner.java
##########
@@ -151,10 +151,12 @@ public void merge(Collection<W> toBeMerged, W mergeResult) throws Exception {
         KV<T, Iterable<W>> windowsToMerge) throws Exception {
       currentWindows = Sets.newHashSet(windowsToMerge.getValue());
       windowFn.mergeWindows((MergeContext) mergeContext);
+      ArrayList mergedWindowsCopy = new ArrayList<>(mergedWindows);
       for (KV<W, Collection<W>> mergedWindow : mergedWindows) {
         currentWindows.removeAll(mergedWindow.getValue());
       }
-      return KV.of(windowsToMerge.getKey(), KV.of(currentWindows, (Iterable) mergedWindows));
+      mergedWindows.clear();

Review comment:
       This looks like a bug that could produce wrong results before this change, right? If it's not too complicated, could we add a unit test, similar to FnApiDoFnRunnerTest, to guard this cleanup logic?
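
One possible shape for such a guard, sketched without a test framework so that it stays self-contained. `MergeCleanupCheck` and `checkNoLeak` are hypothetical names, and the merger is modelled as a plain `Function` rather than the real runner; an actual test would drive the harness the way FnApiDoFnRunnerTest does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class MergeCleanupCheck {
  // Invokes the merger twice and throws if the second result still contains
  // anything produced by the first invocation.
  static void checkNoLeak(Function<List<String>, List<String>> merge) {
    List<String> first = new ArrayList<>(merge.apply(Arrays.asList("a", "b")));
    List<String> second = new ArrayList<>(merge.apply(Arrays.asList("c", "d")));
    for (String window : first) {
      if (second.contains(window)) {
        throw new AssertionError("stale merge result leaked: " + window);
      }
    }
  }

  public static void main(String[] args) {
    List<String> buffer = new ArrayList<>();
    // Hypothetical merger under test: clears its buffer before each use.
    Function<List<String>, List<String>> merger = windows -> {
      buffer.clear();
      buffer.add(String.join("+", windows));
      return buffer;
    };
    checkNoLeak(merger); // completes without throwing
  }
}
```

The check copies each result before the next invocation, so it detects leaked state regardless of whether the merger hands out a fresh list or a reused one.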







[GitHub] [beam] boyuanzz commented on a change in pull request #14182: [BEAM-11952] Clean up merged window result in MergingViaWindowFnRunne…

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #14182:
URL: https://github.com/apache/beam/pull/14182#discussion_r592000330



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMergingFnRunner.java
##########
@@ -154,7 +155,13 @@ public void merge(Collection<W> toBeMerged, W mergeResult) throws Exception {
       for (KV<W, Collection<W>> mergedWindow : mergedWindows) {
         currentWindows.removeAll(mergedWindow.getValue());
       }
-      return KV.of(windowsToMerge.getKey(), KV.of(currentWindows, (Iterable) mergedWindows));
+      KV result =

Review comment:
       Can we state the type explicitly here? Also, please squash all commits into one; after that this PR is ready to go.
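
To illustrate the request in general terms: a fully parameterized declaration lets the compiler check the element types, whereas a raw declaration such as `KV result` silences those checks. The sketch below uses `Map.Entry` from the JDK as a stand-in for Beam's `KV`; that substitution and the class name `TypedResultExample` are assumptions made only to keep the example self-contained.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

class TypedResultExample {
  // The local variable carries its full generic type, so a mismatch between
  // key or value types is reported at compile time instead of at runtime.
  static Map.Entry<String, List<String>> typedResult(String key, List<String> windows) {
    Map.Entry<String, List<String>> result = new SimpleImmutableEntry<>(key, windows);
    return result;
  }

  public static void main(String[] args) {
    Map.Entry<String, List<String>> result = typedResult("key", Arrays.asList("a+b"));
    String firstWindow = result.getValue().get(0); // no cast needed
    System.out.println(firstWindow); // prints a+b
  }
}
```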







[GitHub] [beam] boyuanzz commented on a change in pull request #14182: [BEAM-11952] Clean up merged window result in MergingViaWindowFnRunne…

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #14182:
URL: https://github.com/apache/beam/pull/14182#discussion_r591979144



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMergingFnRunner.java
##########
@@ -151,10 +151,12 @@ public void merge(Collection<W> toBeMerged, W mergeResult) throws Exception {
         KV<T, Iterable<W>> windowsToMerge) throws Exception {
       currentWindows = Sets.newHashSet(windowsToMerge.getValue());
       windowFn.mergeWindows((MergeContext) mergeContext);

Review comment:
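       Another option is to stash the return value before clearing, for example:
   ```java
   // Copy before clearing: KV.of keeps references, so clearing the originals would also empty the result.
   result = KV.of(windowsToMerge.getKey(),
       KV.of(Sets.newHashSet(currentWindows), (Iterable) new ArrayList<>(mergedWindows)));
   mergedWindows.clear();
   currentWindows.clear();
   return result;
   ```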
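
One detail matters for the stash-then-clear idea: a stashed result that still points at the same collections is emptied by the subsequent `clear()` calls, so the stash has to hold copies. A minimal sketch with plain Java collections follows; `StashAndClearRunner` and the string windows are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class StashAndClearRunner {
  private final List<String> mergedWindows = new ArrayList<>();

  List<String> mergeWindows(List<String> windows) {
    mergedWindows.add(String.join("+", windows));
    // Snapshot rather than alias, so the result survives the clear() below.
    List<String> result = new ArrayList<>(mergedWindows);
    mergedWindows.clear(); // no state is kept between invocations
    return result;
  }

  public static void main(String[] args) {
    StashAndClearRunner runner = new StashAndClearRunner();
    List<String> first = runner.mergeWindows(Arrays.asList("a", "b"));
    List<String> second = runner.mergeWindows(Arrays.asList("c", "d"));
    System.out.println(first);  // prints [a+b]
    System.out.println(second); // prints [c+d]
  }
}
```

Compared with clearing at the start of the next invocation, this costs one copy per call but leaves nothing behind, and earlier results stay valid for callers that keep them.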







[GitHub] [beam] y1chi commented on a change in pull request #14182: [BEAM-11952] Clean up merged window result in MergingViaWindowFnRunne…

Posted by GitBox <gi...@apache.org>.
y1chi commented on a change in pull request #14182:
URL: https://github.com/apache/beam/pull/14182#discussion_r591934742



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMergingFnRunner.java
##########
@@ -151,10 +151,12 @@ public void merge(Collection<W> toBeMerged, W mergeResult) throws Exception {
         KV<T, Iterable<W>> windowsToMerge) throws Exception {
       currentWindows = Sets.newHashSet(windowsToMerge.getValue());
       windowFn.mergeWindows((MergeContext) mergeContext);

Review comment:
       You are right. In theory we don't have to keep the state around and wait for the next merge-window invocation to clean it up. I don't have a strong opinion on this; I guess storing only the last merge result won't take much memory.







[GitHub] [beam] boyuanzz commented on pull request #14182: [BEAM-11952] Clean up merged window result in MergingViaWindowFnRunne…

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #14182:
URL: https://github.com/apache/beam/pull/14182#issuecomment-796426287


   Run Java PreCommit





[GitHub] [beam] y1chi commented on a change in pull request #14182: [BEAM-11952] Clean up merged window result in MergingViaWindowFnRunne…

Posted by GitBox <gi...@apache.org>.
y1chi commented on a change in pull request #14182:
URL: https://github.com/apache/beam/pull/14182#discussion_r591989888



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMergingFnRunner.java
##########
@@ -151,10 +151,12 @@ public void merge(Collection<W> toBeMerged, W mergeResult) throws Exception {
         KV<T, Iterable<W>> windowsToMerge) throws Exception {
       currentWindows = Sets.newHashSet(windowsToMerge.getValue());
       windowFn.mergeWindows((MergeContext) mergeContext);

Review comment:
       Makes sense.



