Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/19 18:07:37 UTC

[GitHub] [beam] lukecwik opened a new pull request, #22345: [BEAM-13015, #21250] Optimize encoding to a ByteString

lukecwik opened a new pull request, #22345:
URL: https://github.com/apache/beam/pull/22345

   This leverages the fact that all encoding is done in a thread-safe manner,
   which allows us to drop the synchronization that ByteString.Output adds. It
   also tunes the max chunk size based upon performance measurements, as well as
   the ratio for how full a byte[] should be when deciding between a final copy
   and a concatenation.
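
   To make the copy vs concatenate decision concrete, here is a minimal, hypothetical sketch in plain Java. It has no protobuf dependency and is not Beam's code: `ByteBuffer` stands in for `ByteString`, and it assumes the choice is driven by a fill ratio. Below the threshold the used prefix is copied into a right-sized array; above it the buffer is wrapped without a copy. `CopyVsWrapSketch`, `shouldCopy`, `finish`, and the 0.5 threshold are illustrative assumptions.

   ```java
   import java.nio.ByteBuffer;
   import java.util.Arrays;

   /** Hypothetical sketch: finish a partially filled buffer by copying or by zero-copy wrapping. */
   final class CopyVsWrapSketch {
     /** Copy when fewer than threshold * capacity bytes are in use (the ratio is an assumed knob). */
     static boolean shouldCopy(int used, int capacity, double threshold) {
       return used < capacity * threshold;
     }

     static ByteBuffer finish(byte[] buffer, int used, double threshold) {
       if (shouldCopy(used, buffer.length, threshold)) {
         // Pay for a copy; the result no longer pins the unused tail of the buffer,
         // and the original buffer is free to be reused by the caller.
         return ByteBuffer.wrap(Arrays.copyOf(buffer, used));
       }
       // Zero-copy view of the used prefix; the whole backing array stays reachable.
       return ByteBuffer.wrap(buffer, 0, used).slice();
     }

     public static void main(String[] args) {
       byte[] buffer = new byte[1024];
       System.out.println(finish(buffer, 100, 0.5).array().length); // copied: 100
       System.out.println(finish(buffer, 900, 0.5).array().length); // wrapped: 1024
     }
   }
   ```

   Copying trades CPU for memory: it is cheap when little of the buffer is used, while wrapping avoids the copy when the buffer is mostly full and little space is wasted.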
   
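   Below are the results of several scenarios comparing the protobuf ByteString.Output
   against the new solution. The largest gains are for tiny writes (about 3.9x for few tiny
   writes), and most larger-write scenarios still improve noticeably (about 1.7x for many
   large writes), although few large writes are roughly unchanged: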
   ```
   Benchmark                                                                                       Mode  Cnt         Score        Error  Units
   ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamFewLargeWrites               thrpt   25   1149267.797 ±  15366.677  ops/s
   ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamFewMixedWritesWithReuse      thrpt   25    832816.697 ±   4236.341  ops/s
   ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamFewMixedWritesWithoutReuse   thrpt   25    916629.194 ±   5669.323  ops/s
   ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamFewSmallWrites               thrpt   25  14175167.566 ±  88540.030  ops/s
   ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamFewTinyWrites                thrpt   25  22471597.238 ± 186098.311  ops/s
   ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamManyLargeWrites              thrpt   25       610.218 ±      5.019  ops/s
   ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamManyMixedWritesWithReuse     thrpt   25       484.413 ±     35.194  ops/s
   ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamManyMixedWritesWithoutReuse  thrpt   25       559.983 ±      6.228  ops/s
   ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamManySmallWrites              thrpt   25     10969.839 ±     88.199  ops/s
   ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamManyTinyWrites               thrpt   25     40822.925 ±    191.402  ops/s
   ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamFewLargeWrites                thrpt   25   1167673.532 ±   9747.507  ops/s
   ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamFewMixedWritesWithReuse       thrpt   25   1576528.242 ±  15883.083  ops/s
   ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamFewMixedWritesWithoutReuse    thrpt   25   1009766.655 ±   8700.273  ops/s
   ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamFewSmallWrites                thrpt   25  33293140.679 ± 233693.771  ops/s
   ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamFewTinyWrites                 thrpt   25  86841328.763 ± 729741.769  ops/s
   ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamManyLargeWrites               thrpt   25      1058.150 ±     15.192  ops/s
   ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamManyMixedWritesWithReuse      thrpt   25       937.249 ±      9.264  ops/s
   ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamManyMixedWritesWithoutReuse   thrpt   25       959.671 ±     13.989  ops/s
   ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamManySmallWrites               thrpt   25     12601.065 ±     92.375  ops/s
   ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamManyTinyWrites                thrpt   25     65277.229 ±   3795.676  ops/s
   ```
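
   As a worked reading of the table above, this sketch divides each SdkCore score by the matching Protobuf score to get a speedup factor. It compares means only and ignores the error column. The class and method names are hypothetical; the numbers are copied from the table.

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;

   /** Hypothetical helper: SdkCore vs Protobuf throughput ratios from the JMH table. */
   final class SpeedupFromBenchmarks {
     // Scenario -> {Protobuf score, SdkCore score}, in ops/s.
     static final Map<String, double[]> SCORES = new LinkedHashMap<>();

     static {
       SCORES.put("FewLargeWrites", new double[] {1149267.797, 1167673.532});
       SCORES.put("FewMixedWritesWithReuse", new double[] {832816.697, 1576528.242});
       SCORES.put("FewMixedWritesWithoutReuse", new double[] {916629.194, 1009766.655});
       SCORES.put("FewSmallWrites", new double[] {14175167.566, 33293140.679});
       SCORES.put("FewTinyWrites", new double[] {22471597.238, 86841328.763});
       SCORES.put("ManyLargeWrites", new double[] {610.218, 1058.150});
       SCORES.put("ManyMixedWritesWithReuse", new double[] {484.413, 937.249});
       SCORES.put("ManyMixedWritesWithoutReuse", new double[] {559.983, 959.671});
       SCORES.put("ManySmallWrites", new double[] {10969.839, 12601.065});
       SCORES.put("ManyTinyWrites", new double[] {40822.925, 65277.229});
     }

     /** Speedup of the new stream over protobuf's for one scenario (higher is better). */
     static double speedup(String scenario) {
       double[] s = SCORES.get(scenario);
       return s[1] / s[0];
     }

     public static void main(String[] args) {
       for (String scenario : SCORES.keySet()) {
         System.out.printf("%-28s %.2fx%n", scenario, speedup(scenario));
       }
     }
   }
   ```

   With these inputs FewTinyWrites comes out at roughly 3.86x and ManyLargeWrites at roughly 1.73x, while FewLargeWrites is about 1.02x, inside the overlapping error bars of the two measurements.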
   
   The copy vs concatenate numbers come from the results below. They suggest that 256k is a
   good chunk size, since larger chunks appear to cost more per byte to allocate. They also show
   the threshold at which we should currently copy the bytes rather than concatenate a partially
   full buffer and allocate a new one:
   ```
   Benchmark                                                                                              newSize       copyVsNew   Mode  Cnt         Score        Error  Units
   ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A        512/1024  thrpt   25  19744209.563 ± 148287.185  ops/s
   ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A        640/1024  thrpt   25  15738981.338 ± 103684.000  ops/s
   ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A        768/1024  thrpt   25  12778194.652 ± 202212.679  ops/s
   ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A        896/1024  thrpt   25  11053602.109 ± 103120.446  ops/s
   ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A       4096/8192  thrpt   25   2961435.128 ±  25895.802  ops/s
   ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A       5120/8192  thrpt   25   2498594.030 ±  26051.674  ops/s
   ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A       6144/8192  thrpt   25   2173161.031 ±  20014.569  ops/s
   ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A       7168/8192  thrpt   25   1917545.913 ±  21470.719  ops/s
   ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A     20480/65536  thrpt   25    537872.049 ±   5525.024  ops/s
   ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A     24576/65536  thrpt   25    371312.042 ±   4450.715  ops/s
   ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A     28672/65536  thrpt   25    306027.442 ±   2830.503  ops/s
   ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A     32768/65536  thrpt   25    263933.096 ±   1833.603  ops/s
   ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A   131072/262144  thrpt   25     80224.558 ±   1192.994  ops/s
   ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A   163840/262144  thrpt   25     65311.283 ±    775.920  ops/s
   ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A   196608/262144  thrpt   25     54510.877 ±    797.775  ops/s
   ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A   229376/262144  thrpt   25     46808.185 ±    515.039  ops/s
   ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A  524288/1048576  thrpt   25     17729.937 ±    301.199  ops/s
   ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A  655360/1048576  thrpt   25     12996.953 ±    228.552  ops/s
   ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A  786432/1048576  thrpt   25     11383.122 ±    384.086  ops/s
   ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A  917504/1048576  thrpt   25      9915.318 ±    285.995  ops/s
   ByteStringOutputStreamBenchmark.NewVsCopy.testNewArray                                                    1024             N/A  thrpt   25  10023631.563 ±  61317.055  ops/s
   ByteStringOutputStreamBenchmark.NewVsCopy.testNewArray                                                    8192             N/A  thrpt   25   2109120.041 ±  17482.682  ops/s
   ByteStringOutputStreamBenchmark.NewVsCopy.testNewArray                                                   65536             N/A  thrpt   25    318492.630 ±   3006.827  ops/s
   ByteStringOutputStreamBenchmark.NewVsCopy.testNewArray                                                  262144             N/A  thrpt   25     79228.892 ±    725.230  ops/s
   ByteStringOutputStreamBenchmark.NewVsCopy.testNewArray                                                 1048576             N/A  thrpt   25     13089.221 ±     73.535  ops/s
   ```
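
   One way to read this table: for each new-array size, find the largest benchmarked copy size whose throughput still beats allocating a fresh array of that size. The sketch below does that with the scores above. It compares means only and ignores the error column, so near-ties (for example copying 131072 of 262144 vs allocating a new 262144-byte array) are effectively break-even. The class and method names are hypothetical.

   ```java
   /** Hypothetical helper: largest benchmarked copy that still beats allocating a new array. */
   final class CopyVsNewThreshold {
     static final int[] NEW_SIZES = {1024, 8192, 65536, 262144, 1048576};
     static final double[] NEW_SCORES = {10023631.563, 2109120.041, 318492.630, 79228.892, 13089.221};
     // Copy sizes and scores per row, matching NEW_SIZES (copyVsNew column of the table).
     static final int[][] COPY_SIZES = {
       {512, 640, 768, 896},
       {4096, 5120, 6144, 7168},
       {20480, 24576, 28672, 32768},
       {131072, 163840, 196608, 229376},
       {524288, 655360, 786432, 917504}};
     static final double[][] COPY_SCORES = {
       {19744209.563, 15738981.338, 12778194.652, 11053602.109},
       {2961435.128, 2498594.030, 2173161.031, 1917545.913},
       {537872.049, 371312.042, 306027.442, 263933.096},
       {80224.558, 65311.283, 54510.877, 46808.185},
       {17729.937, 12996.953, 11383.122, 9915.318}};

     /** Largest copy size in a row whose mean throughput exceeds the new-array mean, or 0. */
     static int largestCheaperCopy(int row) {
       int best = 0;
       for (int j = 0; j < COPY_SIZES[row].length; j++) {
         if (COPY_SCORES[row][j] > NEW_SCORES[row]) {
           best = Math.max(best, COPY_SIZES[row][j]);
         }
       }
       return best;
     }

     public static void main(String[] args) {
       for (int i = 0; i < NEW_SIZES.length; i++) {
         int copy = largestCheaperCopy(i);
         System.out.printf("new %7d: copy up to %6d bytes (%.3f full)%n",
             NEW_SIZES[i], copy, (double) copy / NEW_SIZES[i]);
       }
     }
   }
   ```

   With these numbers the largest cheaper copy is 896 of 1024, 6144 of 8192, 24576 of 65536, 131072 of 262144 and 524288 of 1048576: a high fill ratio for small buffers, but only about 3/8 to 1/2 full for buffers of 64k and above.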
   
   The difference is minor in the `ProcessBundleBenchmark` as there is not
   enough data being passed around for it to make a major difference:
   ```
   Before
   Benchmark                                        Mode  Cnt      Score     Error  Units
   ProcessBundleBenchmark.testLargeBundle          thrpt   25   1156.159 ±   9.001  ops/s
   ProcessBundleBenchmark.testTinyBundle           thrpt   25  29641.444 ± 125.041  ops/s
   
   After
   Benchmark                                        Mode  Cnt      Score    Error  Units
   ProcessBundleBenchmark.testLargeBundle          thrpt   25   1168.977 ± 25.848  ops/s
   ProcessBundleBenchmark.testTinyBundle           thrpt   25  29664.783 ± 99.791  ops/s
   ```
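
   A quick check of the claim above: this sketch computes the relative change in the mean score and whether the before and after `score ± error` intervals overlap. JMH's Error column is, by default, the half-width of a 99.9% confidence interval, so overlap here is a conservative heuristic and not a formal significance test. The class and method names are hypothetical.

   ```java
   /** Hypothetical helper: compare before/after JMH scores for ProcessBundleBenchmark. */
   final class BundleBenchmarkDelta {
     /** Relative change of the mean score, in percent. */
     static double percentChange(double before, double after) {
       return (after - before) / before * 100.0;
     }

     /** True if [before - errBefore, before + errBefore] and [after - errAfter, after + errAfter] overlap. */
     static boolean intervalsOverlap(double before, double errBefore, double after, double errAfter) {
       return before - errBefore <= after + errAfter && after - errAfter <= before + errBefore;
     }

     public static void main(String[] args) {
       System.out.printf("testLargeBundle: %+.2f%%, overlap=%b%n",
           percentChange(1156.159, 1168.977), intervalsOverlap(1156.159, 9.001, 1168.977, 25.848));
       System.out.printf("testTinyBundle:  %+.2f%%, overlap=%b%n",
           percentChange(29641.444, 29664.783), intervalsOverlap(29641.444, 125.041, 29664.783, 99.791));
     }
   }
   ```

   Both bundles move by about +1.11% and +0.08% respectively, and in both cases the intervals overlap, which is consistent with the difference being minor.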
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] lukecwik commented on pull request #22345: [BEAM-13015, #21250] Optimize encoding to a ByteString

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22345:
URL: https://github.com/apache/beam/pull/22345#issuecomment-1189562078

   Run GoPortable PreCommit




[GitHub] [beam] lukecwik commented on a diff in pull request #22345: [BEAM-13015, #21250] Optimize encoding to a ByteString

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22345:
URL: https://github.com/apache/beam/pull/22345#discussion_r924906442


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.io.OutputStream;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.UnsafeByteOperations;
+
+/**
+ * An {@link OutputStream} that produces {@link ByteString}s.
+ *
+ * <p>Closing this output stream does nothing.
+ *
+ * <p>This class is not thread safe and expects appropriate locking to be used in a thread-safe
+ * manner. This differs from {@link ByteString.Output} which synchronizes its writes.
+ */
+@NotThreadSafe
+public final class ByteStringOutputStream extends OutputStream {
+
+  // This constant was chosen based upon Protobuf's ByteString#CONCATENATE_BY_COPY, which
+  // isn't public, to prevent copying the bytes again when concatenating ByteStrings instead
+  // of appending.
+  private static final int DEFAULT_CAPACITY = 128;
+
+  // ByteStringOutputStreamBenchmark.NewVsCopy shows that we are actually faster
+  // creating 4 new arrays that are 256k than one that is 1024k, by almost a factor
+  // of 2.
+  //
+  // This number should be tuned periodically as hardware changes.
+  private static final int MAX_CHUNK_SIZE = 256 * 1024;
+
+  // ByteString to be concatenated to create the result
+  private ByteString result;
+
+  // Current buffer to which we are writing
+  private byte[] buffer;
+
+  // Location in buffer[] to which we write the next byte.
+  private int bufferPos;
+
+  /** Creates a new output stream with a default capacity. */
+  public ByteStringOutputStream() {
+    this(DEFAULT_CAPACITY);
+  }
+
+  /**
+   * Creates a new output stream with the specified initial capacity.
+   *
+   * @param initialCapacity the initial capacity of the output stream.
+   */
+  public ByteStringOutputStream(int initialCapacity) {
+    if (initialCapacity < 0) {
+      throw new IllegalArgumentException("Initial capacity < 0");
+    }
+    this.buffer = new byte[initialCapacity];
+    this.result = ByteString.EMPTY;
+  }
+
+  @Override
+  public void write(int b) {
+    if (bufferPos == buffer.length) {
+      // We want to increase our total capacity by 50% but not larger than the max chunk size.
+      result = result.concat(UnsafeByteOperations.unsafeWrap(buffer));
+      buffer = new byte[Math.min(Math.max(1, result.size()), MAX_CHUNK_SIZE)];
+      bufferPos = 0;
+    }
+    buffer[bufferPos++] = (byte) b;
+  }
+
+  @Override
+  public void write(byte[] b, int offset, int length) {
+    int remainingSpaceInBuffer = buffer.length - bufferPos;
+    while (length > remainingSpaceInBuffer) {
+      // Use up the current buffer
+      System.arraycopy(b, offset, buffer, bufferPos, remainingSpaceInBuffer);
+      offset += remainingSpaceInBuffer;
+      length -= remainingSpaceInBuffer;
+
+      result = result.concat(UnsafeByteOperations.unsafeWrap(buffer));
+      // We want to increase our total capacity but not larger than the max chunk size.
+      remainingSpaceInBuffer = Math.min(Math.max(length, result.size()), MAX_CHUNK_SIZE);
+      buffer = new byte[remainingSpaceInBuffer];
+      bufferPos = 0;
+    }
+
+    System.arraycopy(b, offset, buffer, bufferPos, length);
+    bufferPos += length;
+  }
+
+  /** Creates a byte string with the size and contents of this output stream. */
+  public ByteString toByteString() {
+    // The only benefit we get by copying here is that there will be a reduction in the amount

Review Comment:
   Done
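
   The bulk-write path in the quoted diff sizes each new buffer as `min(max(length, result.size()), MAX_CHUNK_SIZE)`. The sketch below replays only that size arithmetic to show which buffers a single large write allocates. No bytes are stored, and it assumes a fresh stream with `bufferPos == 0`. `BulkWriteChunking` and `bufferCapacities` are hypothetical names.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   /** Hypothetical helper: replay buffer sizing of write(byte[], int, int) for one write. */
   final class BulkWriteChunking {
     static final int MAX_CHUNK_SIZE = 256 * 1024; // value from the quoted diff

     /** Capacities of every buffer allocated by one write of `length` bytes into a fresh stream. */
     static List<Integer> bufferCapacities(int initialCapacity, int length) {
       List<Integer> capacities = new ArrayList<>();
       int capacity = initialCapacity; // buffer.length
       int remaining = initialCapacity; // buffer.length - bufferPos, with bufferPos == 0
       long resultSize = 0; // result.size()
       capacities.add(capacity);
       while (length > remaining) {
         length -= remaining; // fill the rest of the current buffer
         resultSize += capacity; // the whole buffer is concatenated onto result
         remaining = (int) Math.min(Math.max(length, resultSize), MAX_CHUNK_SIZE);
         capacity = remaining;
         capacities.add(capacity);
       }
       return capacities; // the last buffer holds the final `length` bytes
     }

     public static void main(String[] args) {
       System.out.println(bufferCapacities(128, 1_000_000));
       System.out.println(bufferCapacities(128, 300));
     }
   }
   ```

   For a single 1,000,000-byte write into a default (128-byte) stream this yields 128 followed by four 262144-byte buffers, the last holding the remaining 213440 bytes. A 300-byte write allocates 128 and then exactly 172, because `max(length, result.size())` sizes the new buffer to fit the remainder.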





[GitHub] [beam] lukecwik commented on pull request #22345: [BEAM-13015, #21250] Optimize encoding to a ByteString

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22345:
URL: https://github.com/apache/beam/pull/22345#issuecomment-1189398844

   R: @steveniemitz 




[GitHub] [beam] lukecwik commented on a diff in pull request #22345: [BEAM-13015, #21250] Optimize encoding to a ByteString

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22345:
URL: https://github.com/apache/beam/pull/22345#discussion_r924909774


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java:
##########
@@ -0,0 +1,166 @@
+  @Override
+  public void write(int b) {
+    if (bufferPos == buffer.length) {
+      // We want to increase our total capacity by 50% but not larger than the max chunk size.

Review Comment:
   I kept the existing implementation to save on returning the size from flushBuffer or looking up the new buffer length.
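
   For the single-byte path discussed here, the quoted code allocates `min(max(1, result.size()), MAX_CHUNK_SIZE)` bytes once the current buffer is full. Replaying that arithmetic (a hypothetical helper that tracks sizes only) shows that each new buffer matches everything flushed so far. As quoted, total capacity therefore doubles at each step, rather than growing by the 50% the inline comment mentions, until chunks reach the 256k cap.

   ```java
   /** Hypothetical helper: replay buffer sizing of write(int) when writing one byte at a time. */
   final class SingleByteGrowth {
     static final int MAX_CHUNK_SIZE = 256 * 1024; // value from the quoted diff

     /** Capacities of the first `count` buffers allocated by repeated single-byte writes. */
     static int[] bufferCapacities(int initialCapacity, int count) {
       int[] capacities = new int[count];
       long resultSize = 0; // result.size()
       int capacity = initialCapacity;
       for (int i = 0; i < count; i++) {
         capacities[i] = capacity;
         resultSize += capacity; // buffer filled, then concatenated onto result
         capacity = (int) Math.min(Math.max(1, resultSize), MAX_CHUNK_SIZE);
       }
       return capacities;
     }

     public static void main(String[] args) {
       System.out.println(java.util.Arrays.toString(bufferCapacities(128, 14)));
     }
   }
   ```

   Starting from the default 128 bytes, the first 14 buffers are 128, 128, 256, 512, ..., 65536, 131072, 262144, 262144.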





[GitHub] [beam] codecov[bot] commented on pull request #22345: [BEAM-13015, #21250] Optimize encoding to a ByteString

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #22345:
URL: https://github.com/apache/beam/pull/22345#issuecomment-1189577769

   # [Codecov](https://codecov.io/gh/apache/beam/pull/22345?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#22345](https://codecov.io/gh/apache/beam/pull/22345?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (d6d27b8) into [master](https://codecov.io/gh/apache/beam/commit/abcc9e05f820b4c21ebef69890d86e9c1ba2c9e6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (abcc9e0) will **decrease** coverage by `0.00%`.
   > The diff coverage is `n/a`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #22345      +/-   ##
   ==========================================
   - Coverage   74.17%   74.16%   -0.01%     
   ==========================================
     Files         706      706              
     Lines       93190    93190              
   ==========================================
   - Hits        69121    69118       -3     
   - Misses      22801    22804       +3     
     Partials     1268     1268              
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | python | `83.54% <ø> (-0.01%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/22345?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../python/apache\_beam/testing/test\_stream\_service.py](https://codecov.io/gh/apache/beam/pull/22345/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy90ZXN0X3N0cmVhbV9zZXJ2aWNlLnB5) | `88.09% <0.00%> (-4.77%)` | :arrow_down: |
   | [...che\_beam/runners/interactive/interactive\_runner.py](https://codecov.io/gh/apache/beam/pull/22345/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9pbnRlcmFjdGl2ZV9ydW5uZXIucHk=) | `90.06% <0.00%> (-1.33%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/22345/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `93.54% <0.00%> (-0.13%)` | :arrow_down: |
   | [...eam/runners/interactive/interactive\_environment.py](https://codecov.io/gh/apache/beam/pull/22345/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9pbnRlcmFjdGl2ZV9lbnZpcm9ubWVudC5weQ==) | `92.02% <0.00%> (+0.30%)` | :arrow_up: |
   | [sdks/python/apache\_beam/utils/interactive\_utils.py](https://codecov.io/gh/apache/beam/pull/22345/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdXRpbHMvaW50ZXJhY3RpdmVfdXRpbHMucHk=) | `97.56% <0.00%> (+2.43%)` | :arrow_up: |
   
   




[GitHub] [beam] github-actions[bot] commented on pull request #22345: [BEAM-13015, #21250] Optimize encoding to a ByteString

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22345:
URL: https://github.com/apache/beam/pull/22345#issuecomment-1189399810

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control




[GitHub] [beam] lukecwik merged pull request #22345: [BEAM-13015, #21250] Optimize encoding to a ByteString

Posted by GitBox <gi...@apache.org>.
lukecwik merged PR #22345:
URL: https://github.com/apache/beam/pull/22345




[GitHub] [beam] steveniemitz commented on a diff in pull request #22345: [BEAM-13015, #21250] Optimize encoding to a ByteString

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on code in PR #22345:
URL: https://github.com/apache/beam/pull/22345#discussion_r924974526


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java:
##########
@@ -0,0 +1,166 @@
+  @Override
+  public void write(int b) {
+    if (bufferPos == buffer.length) {
+      // We want to increase our total capacity by 50% but not larger than the max chunk size.

Review Comment:
   cool, yeah that makes sense





[GitHub] [beam] steveniemitz commented on a diff in pull request #22345: [BEAM-13015, #21250] Optimize encoding to a ByteString

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on code in PR #22345:
URL: https://github.com/apache/beam/pull/22345#discussion_r924840472


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.io.OutputStream;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.UnsafeByteOperations;
+
+/**
+ * An {@link OutputStream} that produces {@link ByteString}s.
+ *
+ * <p>Closing this output stream does nothing.
+ *
+ * <p>This class is not thread safe; callers must provide external synchronization if an instance
+ * is shared between threads. This differs from {@link ByteString.Output}, which synchronizes its
+ * writes.
+ */
+@NotThreadSafe
+public final class ByteStringOutputStream extends OutputStream {
+
+  // This constant matches Protobuf's ByteString#CONCATENATE_BY_COPY (which isn't public) so
+  // that concatenating ByteStrings appends the chunks rather than copying their bytes again.
+  private static final int DEFAULT_CAPACITY = 128;
+
+  // ByteStringOutputStreamBenchmark.NewVsCopy shows that creating four new 256k arrays is
+  // almost twice as fast as creating one 1024k array.
+  //
+  // This number should be tuned periodically as hardware changes.
+  private static final int MAX_CHUNK_SIZE = 256 * 1024;
+
+  // ByteString to be concatenated to create the result
+  private ByteString result;
+
+  // Current buffer to which we are writing
+  private byte[] buffer;
+
+  // Location in buffer[] to which we write the next byte.
+  private int bufferPos;
+
+  /** Creates a new output stream with a default capacity. */
+  public ByteStringOutputStream() {
+    this(DEFAULT_CAPACITY);
+  }
+
+  /**
+   * Creates a new output stream with the specified initial capacity.
+   *
+   * @param initialCapacity the initial capacity of the output stream.
+   */
+  public ByteStringOutputStream(int initialCapacity) {
+    if (initialCapacity < 0) {
+      throw new IllegalArgumentException("Initial capacity < 0");
+    }
+    this.buffer = new byte[initialCapacity];
+    this.result = ByteString.EMPTY;
+  }
+
+  @Override
+  public void write(int b) {
+    if (bufferPos == buffer.length) {
+      // Double the total capacity, but never allocate a chunk larger than the max chunk size.
+      result = result.concat(UnsafeByteOperations.unsafeWrap(buffer));
+      buffer = new byte[Math.min(Math.max(1, result.size()), MAX_CHUNK_SIZE)];
+      bufferPos = 0;
+    }
+    buffer[bufferPos++] = (byte) b;
+  }
+
+  @Override
+  public void write(byte[] b, int offset, int length) {
+    int remainingSpaceInBuffer = buffer.length - bufferPos;
+    while (length > remainingSpaceInBuffer) {
+      // Use up the current buffer
+      System.arraycopy(b, offset, buffer, bufferPos, remainingSpaceInBuffer);
+      offset += remainingSpaceInBuffer;
+      length -= remainingSpaceInBuffer;
+
+      result = result.concat(UnsafeByteOperations.unsafeWrap(buffer));
+      // Size the next chunk to fit the remaining bytes or to double the total capacity, whichever
+      // is larger, but never larger than the max chunk size.
+      remainingSpaceInBuffer = Math.min(Math.max(length, result.size()), MAX_CHUNK_SIZE);
+      buffer = new byte[remainingSpaceInBuffer];
+      bufferPos = 0;
+    }
+
+    System.arraycopy(b, offset, buffer, bufferPos, length);
+    bufferPos += length;
+  }
+
+  /** Creates a byte string with the size and contents of this output stream. */
+  public ByteString toByteString() {
+    // The only benefit we get by copying here is that there will be a reduction in the amount

Review Comment:
   this comment is a little confusing, at first read it makes it sound like this is doing a copy, but it seems to be reasoning for NOT doing a copy?
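The quoted comment is cut off, so the following is only a sketch of the tradeoff being discussed, not the PR's actual `toByteString` logic. It assumes a hypothetical fill-ratio threshold (`HYPOTHETICAL_MIN_FILL_RATIO`, an illustrative value; the PR description mentions tuning such a ratio but its value is not shown here). Copying the partially filled final buffer costs a copy of the written bytes but lets the unused tail be garbage collected; wrapping it without copying is free but keeps the whole array reachable for as long as the result lives.

```java
class FinalChunkSketch {
  // Hypothetical threshold, chosen only for illustration; not Beam's value.
  static final double HYPOTHETICAL_MIN_FILL_RATIO = 0.5;

  /** Copy the final buffer only when it is less full than the threshold. */
  static boolean shouldCopy(int bufferPos, int bufferLength) {
    return bufferPos < bufferLength * HYPOTHETICAL_MIN_FILL_RATIO;
  }

  /** Bytes of the final buffer that stay reachable from the result under the chosen strategy. */
  static int retainedBytes(int bufferPos, int bufferLength) {
    // Copying trims the array to exactly bufferPos bytes; wrapping keeps the whole array.
    return shouldCopy(bufferPos, bufferLength) ? bufferPos : bufferLength;
  }

  /** Bytes copied when producing the final chunk under the chosen strategy. */
  static int copiedBytes(int bufferPos, int bufferLength) {
    return shouldCopy(bufferPos, bufferLength) ? bufferPos : 0;
  }

  public static void main(String[] args) {
    int len = 256 * 1024;
    // Nearly empty buffer: a tiny copy releases almost the whole 256 KiB array.
    System.out.println(retainedBytes(100, len) + " retained, " + copiedBytes(100, len) + " copied");
    // Nearly full buffer: wrapping avoids a large copy and wastes only a little space.
    System.out.println(
        retainedBytes(len - 100, len) + " retained, " + copiedBytes(len - 100, len) + " copied");
  }
}
```

A lower ratio skips the copy more often; a higher ratio trims more wasted capacity at the cost of copying more.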





[GitHub] [beam] steveniemitz commented on a diff in pull request #22345: [BEAM-13015, #21250] Optimize encoding to a ByteString

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on code in PR #22345:
URL: https://github.com/apache/beam/pull/22345#discussion_r924838200


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java:
##########
@@ -0,0 +1,166 @@
+package org.apache.beam.sdk.util;
+
+import java.io.OutputStream;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.UnsafeByteOperations;
+
+/**
+ * An {@link OutputStream} that produces {@link ByteString}s.
+ *
+ * <p>Closing this output stream does nothing.
+ *
+ * <p>This class is not thread safe; callers must provide external synchronization if an instance
+ * is shared between threads. This differs from {@link ByteString.Output}, which synchronizes its
+ * writes.
+ */
+@NotThreadSafe
+public final class ByteStringOutputStream extends OutputStream {
+
+  // This constant matches Protobuf's ByteString#CONCATENATE_BY_COPY (which isn't public) so
+  // that concatenating ByteStrings appends the chunks rather than copying their bytes again.
+  private static final int DEFAULT_CAPACITY = 128;
+
+  // ByteStringOutputStreamBenchmark.NewVsCopy shows that creating four new 256k arrays is
+  // almost twice as fast as creating one 1024k array.
+  //
+  // This number should be tuned periodically as hardware changes.
+  private static final int MAX_CHUNK_SIZE = 256 * 1024;
+
+  // ByteString to be concatenated to create the result
+  private ByteString result;
+
+  // Current buffer to which we are writing
+  private byte[] buffer;
+
+  // Location in buffer[] to which we write the next byte.
+  private int bufferPos;
+
+  /** Creates a new output stream with a default capacity. */
+  public ByteStringOutputStream() {
+    this(DEFAULT_CAPACITY);
+  }
+
+  /**
+   * Creates a new output stream with the specified initial capacity.
+   *
+   * @param initialCapacity the initial capacity of the output stream.
+   */
+  public ByteStringOutputStream(int initialCapacity) {
+    if (initialCapacity < 0) {
+      throw new IllegalArgumentException("Initial capacity < 0");
+    }
+    this.buffer = new byte[initialCapacity];
+    this.result = ByteString.EMPTY;
+  }
+
+  @Override
+  public void write(int b) {
+    if (bufferPos == buffer.length) {
+      // Double the total capacity, but never allocate a chunk larger than the max chunk size.

Review Comment:
   I wonder if it'd be useful to extract this out to something like `flushBuffer(int newSizeHint)` and use it here and on line 95?
   
   ```
   void flushBuffer(int newSizeHint) {
     result = result.concat(UnsafeByteOperations.unsafeWrap(buffer));
     buffer = new byte[Math.min(Math.max(newSizeHint, result.size()), MAX_CHUNK_SIZE)];
     bufferPos = 0;
   }
   ```
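A minimal sketch of how the suggested `flushBuffer` could be shared by both write paths. To keep it runnable without Beam or protobuf on the classpath, it assumes a `List<byte[]>` of completed chunks as a stand-in for the vendored `ByteString` (adding a full buffer to the list plays the role of `result.concat(UnsafeByteOperations.unsafeWrap(buffer))`, with no copy). The class name `ChunkedOutputStreamSketch` and the `toByteArray` helper are hypothetical. `write(int)` passes a hint of 1 and the array write passes the remaining length, which reproduces the two sizing expressions in the diff.

```java
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ChunkedOutputStreamSketch extends OutputStream {
  private static final int MAX_CHUNK_SIZE = 256 * 1024;

  private final List<byte[]> chunks = new ArrayList<>(); // stands in for `result`
  private long flushedSize; // stands in for result.size()
  private byte[] buffer;
  private int bufferPos;

  ChunkedOutputStreamSketch(int initialCapacity) {
    if (initialCapacity < 0) {
      throw new IllegalArgumentException("Initial capacity < 0");
    }
    buffer = new byte[initialCapacity];
  }

  /** Hands off the full buffer without copying and allocates the next one. */
  private void flushBuffer(int newSizeHint) {
    chunks.add(buffer);
    flushedSize += buffer.length;
    buffer = new byte[(int) Math.min(Math.max(newSizeHint, flushedSize), MAX_CHUNK_SIZE)];
    bufferPos = 0;
  }

  @Override
  public void write(int b) {
    if (bufferPos == buffer.length) {
      flushBuffer(1);
    }
    buffer[bufferPos++] = (byte) b;
  }

  @Override
  public void write(byte[] b, int offset, int length) {
    int remaining = buffer.length - bufferPos;
    while (length > remaining) {
      // Fill the rest of the current buffer, then flush it.
      System.arraycopy(b, offset, buffer, bufferPos, remaining);
      offset += remaining;
      length -= remaining;
      bufferPos += remaining;
      flushBuffer(length);
      remaining = buffer.length;
    }
    System.arraycopy(b, offset, buffer, bufferPos, length);
    bufferPos += length;
  }

  /** Copies everything written so far into one array; only for checking the sketch. */
  byte[] toByteArray() {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (byte[] chunk : chunks) {
      out.write(chunk, 0, chunk.length);
    }
    out.write(buffer, 0, bufferPos);
    return out.toByteArray();
  }

  public static void main(String[] args) {
    ChunkedOutputStreamSketch s = new ChunkedOutputStreamSketch(4);
    s.write(new byte[] {1, 2, 3}, 0, 3);
    s.write(new byte[] {4, 5, 6, 7, 8}, 0, 5); // spills into a second chunk
    s.write(9); // triggers another flush via write(int)
    System.out.println(Arrays.toString(s.toByteArray()));
  }
}
```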


