Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/08/09 16:37:44 UTC

[GitHub] [beam] lukecwik opened a new pull request, #22645: [WIP][BEAM-12776, fixes #21095] Limit parallel closes from the prior element to the next element.

lukecwik opened a new pull request, #22645:
URL: https://github.com/apache/beam/pull/22645

   This will reduce OOMs in the case where we are adding new writes faster than we are able to clean up older ones.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] scwhittle commented on a diff in pull request #22645: [WIP][BEAM-12776, fixes #21095] Limit parallel closes from the prior element to the next element.

Posted by GitBox <gi...@apache.org>.
scwhittle commented on code in PR #22645:
URL: https://github.com/apache/beam/pull/22645#discussion_r945611448


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java:
##########
@@ -996,10 +1001,18 @@ public void finishBundle(FinishBundleContext c) throws Exception {
           c.output(result.getValue(), result.getKey(), result.getValue().getWindow());
         }
       } finally {
-        deferredOutput = null;
-        closeFutures = null;
+        deferredOutput.clear();
+        closeFutures.clear();
       }
     }
+
+    // Ensure that transient fields are initialized.

Review Comment:
   Any reason to prefer this to the `@Setup` method?
   If not, I'd say that is preferable since it is a standard Beam DoFn method and the name is more self-explanatory.





[GitHub] [beam] lukecwik commented on pull request #22645: [WIP][BEAM-12776, fixes #21095] Limit parallel closes from the prior element to the next element.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22645:
URL: https://github.com/apache/beam/pull/22645#issuecomment-1211311102

   Run Spotless PreCommit




[GitHub] [beam] lukecwik merged pull request #22645: [BEAM-12776, fixes #21095] Limit parallel closes from the prior element to the next element.

Posted by GitBox <gi...@apache.org>.
lukecwik merged PR #22645:
URL: https://github.com/apache/beam/pull/22645




[GitHub] [beam] lukecwik commented on pull request #22645: [WIP][BEAM-12776, fixes #21095] Limit parallel closes from the prior element to the next element.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22645:
URL: https://github.com/apache/beam/pull/22645#issuecomment-1209683952

   Run Java PreCommit




[GitHub] [beam] lukecwik commented on pull request #22645: [WIP][BEAM-12776, fixes #21095] Limit parallel closes from the prior element to the next element.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22645:
URL: https://github.com/apache/beam/pull/22645#issuecomment-1209746198

   Run Java PreCommit




[GitHub] [beam] lukecwik commented on pull request #22645: [BEAM-12776, fixes #21095] Limit parallel closes from the prior element to the next element.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22645:
URL: https://github.com/apache/beam/pull/22645#issuecomment-1289370604

   @kellen If you could review the PR then we could have it merged.




[GitHub] [beam] lukecwik commented on pull request #22645: [WIP][BEAM-12776, fixes #21095] Limit parallel closes from the prior element to the next element.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22645:
URL: https://github.com/apache/beam/pull/22645#issuecomment-1211210546

   Run Java_Examples_Dataflow PreCommit




[GitHub] [beam] kellen commented on pull request #22645: [BEAM-12776, fixes #21095] Limit parallel closes from the prior element to the next element.

Posted by GitBox <gi...@apache.org>.
kellen commented on PR #22645:
URL: https://github.com/apache/beam/pull/22645#issuecomment-1289308040

   @lukecwik We (spotify) are seeing `ConcurrentModificationException` errors due to this PR.
   
   The issue starts in [WriteFiles.java](https://github.com/apache/beam/blame/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L964) where you construct a new future, then clear the existing `closeFuture`s.
   
   I think the underlying cause is in [MoreFutures.java](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java#L172) where you close over the input `futures` inside `thenApply`. If you instead used the array result of `futuresToCompletableFutures` I suspect the issue could be avoided.
   
   Something like this?
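   ```
       // Snapshot the futures into an array up front so that later changes to the
       // caller's `futures` collection cannot affect the continuation.
       CompletableFuture<? extends T>[] f = futuresToCompletableFutures(futures);
       CompletionStage<Void> blockAndDiscard = CompletableFuture.allOf(f);
   
       return blockAndDiscard.thenApply(
           nothing ->
               // `f` is an array, so stream it via Arrays.stream (needs java.util.Arrays).
               Arrays.stream(f)
                   .map(future -> future.join())
                   .collect(Collectors.toList()));
   ```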
   
   Similarly for `allAsListWithExceptions`.
   
   Stack trace for the error (from a scio test):
   ```
   [info]   org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.util.concurrent.ExecutionException: java.util.ConcurrentModificationException
   [info]   at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:374)
   [info]   at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:342)
   [info]   at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
   [info]   at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
   [info]   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
   [info]   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
   [info]   at com.spotify.scio.ScioContext.execute(ScioContext.scala:586)
   [info]   at com.spotify.scio.ScioContext$$anonfun$run$1.apply(ScioContext.scala:573)
   [info]   at com.spotify.scio.ScioContext$$anonfun$run$1.apply(ScioContext.scala:561)
   [info]   at com.spotify.scio.ScioContext.requireNotClosed(ScioContext.scala:652)
   [info]   ...
   [info]   Cause: java.util.concurrent.ExecutionException: java.util.ConcurrentModificationException
   [info]   at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
   [info]   at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
   [info]   at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
   [info]   at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:984)
   [info]   ...
   [info]   Cause: java.util.ConcurrentModificationException:
   [info]   at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1390)
   [info]   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
   [info]   at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
   [info]   at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
   [info]   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
   [info]   at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
   [info]   at org.apache.beam.sdk.util.MoreFutures.lambda$allAsList$5(MoreFutures.java:174)
   [info]   at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
   [info]   at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
   [info]   at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
   ```
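
For readers following along, the failure mode in the trace above can be pictured with a small standalone sketch. It is not Beam's `MoreFutures`; the class name `SnapshotAllAsList` and its `allAsList` method are hypothetical stand-ins, and the only point is that the continuation reads a private array copy rather than the caller's list, so the caller is free to clear that list afterwards.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

/** Hypothetical stand-in for the helper discussed above; not Beam's actual MoreFutures. */
final class SnapshotAllAsList {
  private SnapshotAllAsList() {}

  static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
    // Copy into an array now, so the continuation never touches the caller's list.
    @SuppressWarnings("unchecked")
    CompletableFuture<T>[] snapshot = futures.toArray(new CompletableFuture[0]);
    return CompletableFuture.allOf(snapshot)
        .thenApply(
            ignored ->
                // Every future is already complete here, so join() does not block.
                Arrays.stream(snapshot)
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList()));
  }
}
```

With this shape, a caller can pass a mutable list, call `clear()` on it immediately, and complete the futures later; the combined future still yields the results in the original order.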
   




[GitHub] [beam] github-actions[bot] commented on pull request #22645: [BEAM-12776, fixes #21095] Limit parallel closes from the prior element to the next element.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22645:
URL: https://github.com/apache/beam/pull/22645#issuecomment-1218597744

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @robertwb for label java.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).




[GitHub] [beam] scwhittle commented on a diff in pull request #22645: [WIP][BEAM-12776, fixes #21095] Limit parallel closes from the prior element to the next element.

Posted by GitBox <gi...@apache.org>.
scwhittle commented on code in PR #22645:
URL: https://github.com/apache/beam/pull/22645#discussion_r941711400


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java:
##########
@@ -954,7 +949,17 @@ public void processElement(ProcessContext c, BoundedWindow window) throws Except
         writeOrClose(writer, getDynamicDestinations().formatRecord(input));
       }
 
-      // Close all writers.
+      // Clean-up any prior writers that were being closed as part of this bundle before

Review Comment:
   If we have many windows, we'll have many calls to ProcessElement and block sequentially, only overlapping closes with single window processing instead of multiple.
   
   Could we instead block if the # of closing but not yet closed writers exceeds some amount (which could be controlled by an option)?
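
As an illustration of the "block once too many closes are in flight" idea, here is a minimal sketch using a `Semaphore`. It is not code from this PR: `BoundedCloseSketch`, `closeInBackground`, and the `maxInFlightCloses` parameter are hypothetical names, and each close is modeled as a plain `Runnable`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;

/** Hypothetical sketch of bounding the number of in-flight closes. */
final class BoundedCloseSketch {
  private final Semaphore permits;
  private final Executor executor;

  BoundedCloseSketch(int maxInFlightCloses, Executor executor) {
    this.permits = new Semaphore(maxInFlightCloses);
    this.executor = executor;
  }

  /** Blocks the caller while maxInFlightCloses closes are already running. */
  CompletableFuture<Void> closeInBackground(Runnable close) throws InterruptedException {
    permits.acquire();
    try {
      return CompletableFuture.runAsync(
          () -> {
            try {
              close.run();
            } finally {
              // Free the slot whether the close succeeded or failed.
              permits.release();
            }
          },
          executor);
    } catch (RuntimeException e) {
      // The task was never scheduled (for example, the executor rejected it).
      permits.release();
      throw e;
    }
  }
}
```

The limit here counts closes rather than buffered bytes, so it only approximates a memory bound; that trade-off is part of what the thread is debating.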



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java:
##########
@@ -996,10 +1001,18 @@ public void finishBundle(FinishBundleContext c) throws Exception {
           c.output(result.getValue(), result.getKey(), result.getValue().getWindow());
         }
       } finally {
-        deferredOutput = null;
-        closeFutures = null;
+        deferredOutput.clear();
+        closeFutures.clear();
       }
     }
+
+    // Ensure that transient fields are initialized.

Review Comment:
   I haven't seen this before. Is this preferable to just initializing these in the class constructor?





[GitHub] [beam] lukecwik commented on a diff in pull request #22645: [WIP][BEAM-12776, fixes #21095] Limit parallel closes from the prior element to the next element.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22645:
URL: https://github.com/apache/beam/pull/22645#discussion_r941724720


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java:
##########
@@ -954,7 +949,17 @@ public void processElement(ProcessContext c, BoundedWindow window) throws Except
         writeOrClose(writer, getDynamicDestinations().formatRecord(input));
       }
 
-      // Close all writers.
+      // Clean-up any prior writers that were being closed as part of this bundle before

Review Comment:
   Note that the customer was limiting the close calls to 2 at a time max.
   
   I preferred not to add the knob because I suspect the `close` call will usually finish while the other IO operations for the write are still ongoing, so we get most of the parallelism this way; it would be rare for the `close` call to span more than the next element's write calls. Was there a benchmark that you used to validate the improvement for the original change so I could test this?
   
   I can add the additional knob for users to control but I had suggested to others that we should bound the amount of buffer memory we use globally within the process to maximize parallelism and not have to write stuff like this. I did this for the combiner table in https://github.com/apache/beam/commit/5b81d140636e3fa774610aeb8a8896d02696b707 but we should extend everywhere.





[GitHub] [beam] lukecwik commented on a diff in pull request #22645: [BEAM-12776, fixes #21095] Limit parallel closes from the prior element to the next element.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22645:
URL: https://github.com/apache/beam/pull/22645#discussion_r948485957


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java:
##########
@@ -996,10 +1001,18 @@ public void finishBundle(FinishBundleContext c) throws Exception {
           c.output(result.getValue(), result.getKey(), result.getValue().getWindow());
         }
       } finally {
-        deferredOutput = null;
-        closeFutures = null;
+        deferredOutput.clear();
+        closeFutures.clear();
       }
     }
+
+    // Ensure that transient fields are initialized.

Review Comment:
   I forgot about `@Setup`. Swapped to using it.





[GitHub] [beam] lukecwik commented on pull request #22645: [WIP][BEAM-12776, fixes #21095] Limit parallel closes from the prior element to the next element.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22645:
URL: https://github.com/apache/beam/pull/22645#issuecomment-1211218222

   Run Spotless PreCommit




[GitHub] [beam] lukecwik commented on a diff in pull request #22645: [BEAM-12776, fixes #21095] Limit parallel closes from the prior element to the next element.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22645:
URL: https://github.com/apache/beam/pull/22645#discussion_r949409598


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java:
##########
@@ -996,10 +1001,18 @@ public void finishBundle(FinishBundleContext c) throws Exception {
           c.output(result.getValue(), result.getKey(), result.getValue().getWindow());
         }
       } finally {
-        deferredOutput = null;
-        closeFutures = null;
+        deferredOutput.clear();
+        closeFutures.clear();
       }
     }
+
+    // Ensure that transient fields are initialized.

Review Comment:
   And reverted back, because SpotBugs prefers that the fields be initialized during deserialization so that they are guaranteed to be set; SpotBugs doesn't know about the DoFn lifecycle.





[GitHub] [beam] lukecwik commented on a diff in pull request #22645: [WIP][BEAM-12776, fixes #21095] Limit parallel closes from the prior element to the next element.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22645:
URL: https://github.com/apache/beam/pull/22645#discussion_r941718446


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java:
##########
@@ -996,10 +1001,18 @@ public void finishBundle(FinishBundleContext c) throws Exception {
           c.output(result.getValue(), result.getKey(), result.getValue().getWindow());
         }
       } finally {
-        deferredOutput = null;
-        closeFutures = null;
+        deferredOutput.clear();
+        closeFutures.clear();
       }
     }
+
+    // Ensure that transient fields are initialized.

Review Comment:
   The class constructor isn't involved during deserialization of the object, and transient fields are always initialized to their defaults (e.g. int = 0, boolean = false, object = null, ...). This allows us to initialize the fields during deserialization to be a non-null object.
   
   See https://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html for more details.
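
A small self-contained sketch of the mechanism described here, using only `java.io`. The class `TransientFieldSketch` and its `pending` field are hypothetical and are not the actual `WriteFiles` code; the sketch only shows a private `readObject` re-creating a transient field that would otherwise come back as `null`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical class for illustration; not the actual WriteFiles DoFn. */
final class TransientFieldSketch implements Serializable {
  private static final long serialVersionUID = 1L;

  // Not serialized. The initializer below does not run during deserialization.
  private transient List<String> pending = new ArrayList<>();

  List<String> pending() {
    return pending;
  }

  // Called by Java serialization when an instance is being deserialized.
  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    in.defaultReadObject();
    pending = new ArrayList<>();
  }

  /** Serializes and then deserializes a value, as happens when a function object is shipped. */
  static TransientFieldSketch roundTrip(TransientFieldSketch value)
      throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (TransientFieldSketch) in.readObject();
    }
  }
}
```

Without the `readObject` method, `roundTrip(...).pending()` would return `null`, which is the situation the comment above is guarding against.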





[GitHub] [beam] scwhittle commented on a diff in pull request #22645: [WIP][BEAM-12776, fixes #21095] Limit parallel closes from the prior element to the next element.

Posted by GitBox <gi...@apache.org>.
scwhittle commented on code in PR #22645:
URL: https://github.com/apache/beam/pull/22645#discussion_r942294325


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java:
##########
@@ -954,7 +949,17 @@ public void processElement(ProcessContext c, BoundedWindow window) throws Except
         writeOrClose(writer, getDynamicDestinations().formatRecord(input));
       }
 
-      // Close all writers.
+      // Clean-up any prior writers that were being closed as part of this bundle before

Review Comment:
   I think close triggers a write for I/O that is in the buffer. Can we parallelize the close of both the current batch and the previous batch? It seems we already have memory around for the new writers so might as well flush them before blocking on the previous batch.
   
   So something like:
   previousCloseFutures = closeFutures;
   closeFutures = new ArrayList<>();
   
   // Start closing current writers in the background
   ...
   
   // Block on previous windows closing to limit parallelism
   MoreFutures.get(MoreFutures.allAsList(previousCloseFutures));
   
   This was based upon observation in a Kafka->GCS pipeline we were optimizing. I believe that it had 5-minute windowing, and on watermark jumps during backlog processing we would be writing 50 windows at a time; doing so serially was slow. Unfortunately I don't recall how large the generated files were.
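
The outline in this comment can be fleshed out into a standalone sketch with plain `CompletableFuture`s. This is an assumption-laden illustration rather than the change that was merged: `OverlappedCloseSketch`, `closeBatch`, and `finish` are hypothetical names, each writer close is modeled as a `Runnable`, and `CompletableFuture.allOf(...).join()` stands in for `MoreFutures.get(MoreFutures.allAsList(...))`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical sketch; names do not correspond to the real WriteFiles fields. */
final class OverlappedCloseSketch {
  private final Executor executor;
  private List<CompletableFuture<Void>> closeFutures = new ArrayList<>();

  OverlappedCloseSketch(Executor executor) {
    this.executor = executor;
  }

  /** Starts closing the current batch, then blocks on the previous batch. */
  void closeBatch(List<Runnable> closeActions) {
    // Swap in a fresh list instead of clearing the old one, so nothing that
    // still holds the previous list sees it change.
    List<CompletableFuture<Void>> previous = closeFutures;
    closeFutures = new ArrayList<>();

    // Start closing the current writers in the background.
    for (Runnable close : closeActions) {
      closeFutures.add(CompletableFuture.runAsync(close, executor));
    }

    // Block on the previous batch so that at most two batches are in flight.
    CompletableFuture.allOf(previous.toArray(new CompletableFuture<?>[0])).join();
  }

  /** Waits for whatever is still closing, as an end-of-bundle step would. */
  void finish() {
    CompletableFuture.allOf(closeFutures.toArray(new CompletableFuture<?>[0])).join();
    closeFutures = new ArrayList<>();
  }
}
```

Because the current batch is started before blocking, its closes overlap with the tail of the previous batch, while memory stays bounded by roughly two batches of open writers.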





[GitHub] [beam] lukecwik commented on pull request #22645: [BEAM-12776, fixes #21095] Limit parallel closes from the prior element to the next element.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22645:
URL: https://github.com/apache/beam/pull/22645#issuecomment-1289370072

   @kellen Filed https://github.com/apache/beam/issues/23809 with a fix in https://github.com/apache/beam/pull/23811 based upon your supplied patch






[GitHub] [beam] lukecwik commented on pull request #22645: [BEAM-12776, fixes #21095] Limit parallel closes from the prior element to the next element.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22645:
URL: https://github.com/apache/beam/pull/22645#issuecomment-1219790389

   Run Java PreCommit




[GitHub] [beam] lukecwik commented on pull request #22645: [WIP][BEAM-12776, fixes #21095] Limit parallel closes from the prior element to the next element.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22645:
URL: https://github.com/apache/beam/pull/22645#issuecomment-1209615423

   @scwhittle What do you think about this solution for limiting OOMs?




[GitHub] [beam] lukecwik commented on a diff in pull request #22645: [WIP][BEAM-12776, fixes #21095] Limit parallel closes from the prior element to the next element.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22645:
URL: https://github.com/apache/beam/pull/22645#discussion_r942850697


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java:
##########
@@ -954,7 +949,17 @@ public void processElement(ProcessContext c, BoundedWindow window) throws Except
         writeOrClose(writer, getDynamicDestinations().formatRecord(input));
       }
 
-      // Close all writers.
+      // Clean-up any prior writers that were being closed as part of this bundle before

Review Comment:
   Updated with your suggestion.





[GitHub] [beam] lukecwik commented on a diff in pull request #22645: [BEAM-12776, fixes #21095] Limit parallel closes from the prior element to the next element.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22645:
URL: https://github.com/apache/beam/pull/22645#discussion_r948486244


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java:
##########
@@ -968,6 +970,10 @@ public void processElement(ProcessContext c, BoundedWindow window) throws Except
                 new FileResult<>(writer.getOutputFile(), shard, window, c.pane(), entry.getKey())));
         closeWriterInBackground(writer);
       }
+
+      // Ensure that the past closes happen before returning and after we started the closes

Review Comment:
   Done, thanks for making this clearer.







[GitHub] [beam] scwhittle commented on a diff in pull request #22645: [WIP][BEAM-12776, fixes #21095] Limit parallel closes from the prior element to the next element.

Posted by GitBox <gi...@apache.org>.
scwhittle commented on code in PR #22645:
URL: https://github.com/apache/beam/pull/22645#discussion_r945609458


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java:
##########
@@ -968,6 +970,10 @@ public void processElement(ProcessContext c, BoundedWindow window) throws Except
                 new FileResult<>(writer.getOutputFile(), shard, window, c.pane(), entry.getKey())));
         closeWriterInBackground(writer);
       }
+
+      // Ensure that the past closes happen before returning and after we started the closes

Review Comment:
   nit: I find the "happen before/after" wording confusing, since the closes may happen at any time; we are just blocking on them completing here.
   
   Maybe:
   Block on completing the past closes before returning. We do so after starting the current closes in the background
   so that they can happen in parallel.








[GitHub] [beam] lukecwik commented on pull request #22645: [BEAM-12776, fixes #21095] Limit parallel closes from the prior element to the next element.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22645:
URL: https://github.com/apache/beam/pull/22645#issuecomment-1219740733

   Run Java PreCommit

