Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/08/18 17:30:21 UTC

[GitHub] [beam] lukecwik opened a new pull request, #22780: [BEAM-13015, #21250] Reuse buffers when possible when writing on Dataflow streaming hot paths.

lukecwik opened a new pull request, #22780:
URL: https://github.com/apache/beam/pull/22780

   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make the review process smoother](https://beam.apache.org/contribute/get-started-contributing/#make-the-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] lukecwik merged pull request #22780: [BEAM-13015, #21250] Reuse buffers when possible when writing on Dataflow streaming hot paths.

lukecwik merged PR #22780:
URL: https://github.com/apache/beam/pull/22780


[GitHub] [beam] lukecwik commented on pull request #22780: [BEAM-13015, #21250] Reuse buffers when possible when writing on Dataflow streaming hot paths.

lukecwik commented on PR #22780:
URL: https://github.com/apache/beam/pull/22780#issuecomment-1219751925

   R: @scwhittle 


[GitHub] [beam] lukecwik commented on pull request #22780: [BEAM-13015, #21250] Reuse buffers when possible when writing on Dataflow streaming hot paths.

lukecwik commented on PR #22780:
URL: https://github.com/apache/beam/pull/22780#issuecomment-1219941617

   > Nice! Just curious, was this noticeable in a benchmark/profile? It certainly seems like it will help, especially with the many encodes per Windmill output.
   
   The swap to ByteStringOutputStream caused a test pipeline to fail intermittently with OOMs because of many partially filled buffers, so this PR removes that issue.
   
   The original swap to ByteStringOutputStream (https://github.com/apache/beam/pull/22345) gave a noticeable 3x to 10x improvement when encoding small objects, but overall it led to only about a 0.5% reduction in CPU usage in bigshuffle.
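For illustration, a minimal Java sketch of the buffer-reuse pattern this PR applies to the Dataflow streaming sinks. It assumes `java.io.ByteArrayOutputStream` as a stand-in for Beam's `ByteStringOutputStream` (whose API is not shown in this thread); `ReusingWriter` and `Encoder` are hypothetical names, with `Encoder` playing the role of a Beam `Coder`. Each writer holds one scratch stream, copies out an exactly sized result per element, and resets the stream so its already grown capacity is reused.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical writer that keeps one scratch buffer across encodes. */
public class ReusingWriter {
  /** Hypothetical stand-in for a Beam Coder: writes one value to a stream. */
  interface Encoder<T> {
    void encode(T value, OutputStream out) throws IOException;
  }

  // Kept across encodes for buffer reuse; reset() keeps the grown capacity.
  private final ByteArrayOutputStream stream = new ByteArrayOutputStream();

  <T> byte[] encode(Encoder<T> encoder, T value) throws IOException {
    encoder.encode(value, stream);
    // toByteArray() copies exactly size() bytes, so the result does not
    // hold on to the (possibly larger, partially filled) scratch buffer.
    byte[] result = stream.toByteArray();
    // Discard the written bytes but keep the allocated buffer for next time.
    stream.reset();
    return result;
  }

  public static void main(String[] args) throws IOException {
    ReusingWriter writer = new ReusingWriter();
    Encoder<String> utf8 = (s, out) -> out.write(s.getBytes(StandardCharsets.UTF_8));
    for (String s : new String[] {"a", "bb", "ccc"}) {
      byte[] encoded = writer.encode(utf8, s);
      System.out.println(s + " -> " + encoded.length + " bytes");
    }
  }
}
```

In this sketch each element is copied once out of the scratch buffer, but the buffer itself is allocated once per writer instead of once per element, and the returned arrays never carry unused capacity. Whether Beam's `ByteStringOutputStream` copies in the same way is not stated in this thread.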


[GitHub] [beam] lukecwik commented on a diff in pull request #22780: [BEAM-13015, #21250] Reuse buffers when possible when writing on Dataflow streaming hot paths.

lukecwik commented on code in PR #22780:
URL: https://github.com/apache/beam/pull/22780#discussion_r949578284


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java:
##########
@@ -161,13 +163,11 @@ public long add(WindowedValue<T> data) throws IOException {
         if (formatted.getAttributeMap() != null) {
           pubsubMessageBuilder.putAllAttributes(formatted.getAttributeMap());
         }
-        ByteStringOutputStream output = new ByteStringOutputStream();
-        pubsubMessageBuilder.build().writeTo(output);
-        byteString = output.toByteString();
+        pubsubMessageBuilder.build().writeTo(stream);

Review Comment:
   Done



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java:
##########
@@ -129,16 +129,17 @@ public SinkWriter<WindowedValue<T>> writer() {
   class WindmillStreamWriter implements SinkWriter<WindowedValue<T>> {
     private Map<ByteString, Windmill.KeyedMessageBundle.Builder> productionMap;
     private final String destinationName;
+    private final ByteStringOutputStream stream;
 
     private WindmillStreamWriter(String destinationName) {
       this.destinationName = destinationName;
       productionMap = new HashMap<>();
+      stream = new ByteStringOutputStream();
     }
 
     private <EncodeT> ByteString encode(Coder<EncodeT> coder, EncodeT object) throws IOException {
-      ByteStringOutputStream stream = new ByteStringOutputStream();
       coder.encode(object, stream, Coder.Context.OUTER);

Review Comment:
   Done



[GitHub] [beam] lukecwik commented on a diff in pull request #22780: [BEAM-13015, #21250] Reuse buffers when possible when writing on Dataflow streaming hot paths.

lukecwik commented on code in PR #22780:
URL: https://github.com/apache/beam/pull/22780#discussion_r949578622


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java:
##########
@@ -141,6 +141,7 @@ public SinkWriter<WindowedValue<T>> writer() {
   /** The SinkWriter for a PubsubSink. */
   class PubsubWriter implements SinkWriter<WindowedValue<T>> {
     private Windmill.PubSubMessageBundle.Builder outputBuilder;
+    private ByteStringOutputStream stream;

Review Comment:
   Done



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java:
##########
@@ -129,16 +129,17 @@ public SinkWriter<WindowedValue<T>> writer() {
   class WindmillStreamWriter implements SinkWriter<WindowedValue<T>> {
     private Map<ByteString, Windmill.KeyedMessageBundle.Builder> productionMap;
     private final String destinationName;
+    private final ByteStringOutputStream stream;

Review Comment:
   Done



[GitHub] [beam] scwhittle commented on a diff in pull request #22780: [BEAM-13015, #21250] Reuse buffers when possible when writing on Dataflow streaming hot paths.

scwhittle commented on code in PR #22780:
URL: https://github.com/apache/beam/pull/22780#discussion_r949503843


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java:
##########
@@ -129,16 +129,17 @@ public SinkWriter<WindowedValue<T>> writer() {
   class WindmillStreamWriter implements SinkWriter<WindowedValue<T>> {
     private Map<ByteString, Windmill.KeyedMessageBundle.Builder> productionMap;
     private final String destinationName;
+    private final ByteStringOutputStream stream;
 
     private WindmillStreamWriter(String destinationName) {
       this.destinationName = destinationName;
       productionMap = new HashMap<>();
+      stream = new ByteStringOutputStream();
     }
 
     private <EncodeT> ByteString encode(Coder<EncodeT> coder, EncodeT object) throws IOException {
-      ByteStringOutputStream stream = new ByteStringOutputStream();
       coder.encode(object, stream, Coder.Context.OUTER);

Review Comment:
   can we assert that the stream is empty?
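One way to act on this suggestion, sketched under assumptions: the hypothetical `CheckedReusingWriter` below again uses `java.io.ByteArrayOutputStream` in place of Beam's `ByteStringOutputStream`, checks that the shared stream is empty before encoding, and resets it in a `finally` block so that a coder exception cannot leave stale bytes behind. The `finally` reset is a design choice of this sketch, not something the review thread specifies.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical writer that verifies its reused stream is empty per encode. */
public class CheckedReusingWriter {
  /** Hypothetical stand-in for a Beam Coder. */
  interface Encoder<T> {
    void encode(T value, OutputStream out) throws IOException;
  }

  // Kept across encodes for buffer reuse.
  private final ByteArrayOutputStream stream = new ByteArrayOutputStream();

  <T> byte[] encode(Encoder<T> encoder, T value) throws IOException {
    // Defensive check: leftover bytes would mean some code path wrote to the
    // shared stream without resetting it, silently prepending stale data.
    if (stream.size() != 0) {
      throw new IllegalStateException(
          "Expected empty stream but found " + stream.size() + " bytes");
    }
    try {
      encoder.encode(value, stream);
      return stream.toByteArray();
    } finally {
      // Reset even when encoding throws, so one failed element does not
      // trip the check above on every later encode.
      stream.reset();
    }
  }

  public static void main(String[] args) throws IOException {
    CheckedReusingWriter writer = new CheckedReusingWriter();
    Encoder<String> utf8 = (s, out) -> out.write(s.getBytes(StandardCharsets.UTF_8));
    Encoder<String> failing = (s, out) -> {
      out.write(1); // partial output before the simulated failure
      throw new IOException("simulated coder failure");
    };
    try {
      writer.encode(failing, "x");
    } catch (IOException e) {
      System.out.println("first encode failed: " + e.getMessage());
    }
    byte[] ok = writer.encode(utf8, "ok");
    System.out.println("second encode produced " + ok.length + " bytes");
  }
}
```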



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java:
##########
@@ -129,16 +129,17 @@ public SinkWriter<WindowedValue<T>> writer() {
   class WindmillStreamWriter implements SinkWriter<WindowedValue<T>> {
     private Map<ByteString, Windmill.KeyedMessageBundle.Builder> productionMap;
     private final String destinationName;
+    private final ByteStringOutputStream stream;

Review Comment:
   // Kept across encodes for buffer reuse.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java:
##########
@@ -141,6 +141,7 @@ public SinkWriter<WindowedValue<T>> writer() {
   /** The SinkWriter for a PubsubSink. */
   class PubsubWriter implements SinkWriter<WindowedValue<T>> {
     private Windmill.PubSubMessageBundle.Builder outputBuilder;
+    private ByteStringOutputStream stream;

Review Comment:
   // Kept across adds for buffer reuse.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java:
##########
@@ -161,13 +163,11 @@ public long add(WindowedValue<T> data) throws IOException {
         if (formatted.getAttributeMap() != null) {
           pubsubMessageBuilder.putAllAttributes(formatted.getAttributeMap());
         }
-        ByteStringOutputStream output = new ByteStringOutputStream();
-        pubsubMessageBuilder.build().writeTo(output);
-        byteString = output.toByteString();
+        pubsubMessageBuilder.build().writeTo(stream);

Review Comment:
   can we assert the stream is empty at the beginning of this function?



[GitHub] [beam] github-actions[bot] commented on pull request #22780: [BEAM-13015, #21250] Reuse buffers when possible when writing on Dataflow streaming hot paths.

github-actions[bot] commented on PR #22780:
URL: https://github.com/apache/beam/pull/22780#issuecomment-1219756042

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

