Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/19 18:07:37 UTC
[GitHub] [beam] lukecwik opened a new pull request, #22345: [BEAM-13015, #21250] Optimize encoding to a ByteString
lukecwik opened a new pull request, #22345:
URL: https://github.com/apache/beam/pull/22345
This leverages the fact that all encoding is done in a thread-safe manner,
allowing us to drop the synchronization that ByteString.Output adds. It
also optimizes the max chunk size based upon performance measurements and
tunes the ratio for how full a byte[] should be for the final copy vs concatenate
decision.
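As a rough illustration of the approach described above, here is a minimal sketch (not the code in this PR) of an unsynchronized output stream that fills a chunk, hands it off without copying, and caps each new chunk at a maximum size. It uses plain JDK `byte[]` chunks instead of protobuf's `ByteString`; the class name `ChunkedOutputSketch` and its `size()` and `toByteArray()` helpers are hypothetical and exist only for this example.

```java
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch only: an unsynchronized, chunked stream over plain byte[] chunks. */
final class ChunkedOutputSketch extends OutputStream {
  // Cap taken from the PR description; treat it as a tunable assumption.
  private static final int MAX_CHUNK_SIZE = 256 * 1024;

  private final List<byte[]> fullChunks = new ArrayList<>();
  private int flushedSize; // total bytes held in fullChunks
  private byte[] buffer;
  private int bufferPos;

  ChunkedOutputSketch(int initialCapacity) {
    this.buffer = new byte[Math.max(1, initialCapacity)];
  }

  // No "synchronized" keyword: the caller guarantees single-threaded access.
  @Override
  public void write(int b) {
    if (bufferPos == buffer.length) {
      rollBuffer(1);
    }
    buffer[bufferPos++] = (byte) b;
  }

  @Override
  public void write(byte[] b, int offset, int length) {
    int remaining = buffer.length - bufferPos;
    while (length > remaining) {
      // Fill the current chunk completely, then start a new one.
      System.arraycopy(b, offset, buffer, bufferPos, remaining);
      offset += remaining;
      length -= remaining;
      rollBuffer(length);
      remaining = buffer.length;
    }
    System.arraycopy(b, offset, buffer, bufferPos, length);
    bufferPos += length;
  }

  // Keeps the full chunk as-is (no copy) and allocates the next chunk, capped in size.
  private void rollBuffer(int minSize) {
    fullChunks.add(buffer);
    flushedSize += buffer.length;
    buffer = new byte[Math.min(Math.max(minSize, flushedSize), MAX_CHUNK_SIZE)];
    bufferPos = 0;
  }

  int size() {
    return flushedSize + bufferPos;
  }

  // Flattens all chunks into one array; only used here to check the contents.
  byte[] toByteArray() {
    byte[] out = new byte[size()];
    int pos = 0;
    for (byte[] chunk : fullChunks) {
      System.arraycopy(chunk, 0, out, pos, chunk.length);
      pos += chunk.length;
    }
    System.arraycopy(buffer, 0, out, pos, bufferPos);
    return out;
  }
}
```

Because no method is `synchronized`, an instance must only be used by one thread at a time, which is the assumption this change relies on.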
Below are the results of several scenarios comparing the protobuf implementation
with the new solution, which mostly shows a large perf improvement for tiny writes and
still noticeable improvements for larger writes:
```
Benchmark Mode Cnt Score Error Units
ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamFewLargeWrites thrpt 25 1149267.797 ± 15366.677 ops/s
ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamFewMixedWritesWithReuse thrpt 25 832816.697 ± 4236.341 ops/s
ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamFewMixedWritesWithoutReuse thrpt 25 916629.194 ± 5669.323 ops/s
ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamFewSmallWrites thrpt 25 14175167.566 ± 88540.030 ops/s
ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamFewTinyWrites thrpt 25 22471597.238 ± 186098.311 ops/s
ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamManyLargeWrites thrpt 25 610.218 ± 5.019 ops/s
ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamManyMixedWritesWithReuse thrpt 25 484.413 ± 35.194 ops/s
ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamManyMixedWritesWithoutReuse thrpt 25 559.983 ± 6.228 ops/s
ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamManySmallWrites thrpt 25 10969.839 ± 88.199 ops/s
ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamManyTinyWrites thrpt 25 40822.925 ± 191.402 ops/s
ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamFewLargeWrites thrpt 25 1167673.532 ± 9747.507 ops/s
ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamFewMixedWritesWithReuse thrpt 25 1576528.242 ± 15883.083 ops/s
ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamFewMixedWritesWithoutReuse thrpt 25 1009766.655 ± 8700.273 ops/s
ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamFewSmallWrites thrpt 25 33293140.679 ± 233693.771 ops/s
ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamFewTinyWrites thrpt 25 86841328.763 ± 729741.769 ops/s
ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamManyLargeWrites thrpt 25 1058.150 ± 15.192 ops/s
ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamManyMixedWritesWithReuse thrpt 25 937.249 ± 9.264 ops/s
ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamManyMixedWritesWithoutReuse thrpt 25 959.671 ± 13.989 ops/s
ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamManySmallWrites thrpt 25 12601.065 ± 92.375 ops/s
ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamManyTinyWrites thrpt 25 65277.229 ± 3795.676 ops/s
```
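To put numbers on the comparison, the ratios can be computed directly from the table above. The snippet below is only a convenience sketch: the class and method names are made up for illustration, and the scores are copied from the benchmark output (error bars are ignored).

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: throughput ratios of the SdkCore stream over the protobuf one. */
final class SpeedupSketch {
  static double speedup(double protobufOpsPerSec, double sdkCoreOpsPerSec) {
    return sdkCoreOpsPerSec / protobufOpsPerSec;
  }

  // Scenario name -> speedup, using the Score column of the table above.
  static Map<String, Double> speedups() {
    Map<String, Double> m = new LinkedHashMap<>();
    m.put("FewLargeWrites", speedup(1149267.797, 1167673.532));
    m.put("FewMixedWritesWithReuse", speedup(832816.697, 1576528.242));
    m.put("FewMixedWritesWithoutReuse", speedup(916629.194, 1009766.655));
    m.put("FewSmallWrites", speedup(14175167.566, 33293140.679));
    m.put("FewTinyWrites", speedup(22471597.238, 86841328.763));
    m.put("ManyLargeWrites", speedup(610.218, 1058.150));
    m.put("ManyMixedWritesWithReuse", speedup(484.413, 937.249));
    m.put("ManyMixedWritesWithoutReuse", speedup(559.983, 959.671));
    m.put("ManySmallWrites", speedup(10969.839, 12601.065));
    m.put("ManyTinyWrites", speedup(40822.925, 65277.229));
    return m;
  }

  public static void main(String[] args) {
    speedups().forEach((name, ratio) -> System.out.printf("%-28s %.2fx%n", name, ratio));
  }
}
```

By these scores the largest gain is for `FewTinyWrites` (a bit under 3.9x), while `FewLargeWrites` is close to parity.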
The copy vs concatenate numbers come from these results, which show that 256k seems to
be a pretty good chunk size since larger chunks seem to cost more per byte to allocate.
They also show the threshold at which we should currently copy the bytes vs concatenate a partially
full buffer and allocate a new one:
```
Benchmark newSize copyVsNew Mode Cnt Score Error Units
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 512/1024 thrpt 25 19744209.563 ± 148287.185 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 640/1024 thrpt 25 15738981.338 ± 103684.000 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 768/1024 thrpt 25 12778194.652 ± 202212.679 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 896/1024 thrpt 25 11053602.109 ± 103120.446 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 4096/8192 thrpt 25 2961435.128 ± 25895.802 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 5120/8192 thrpt 25 2498594.030 ± 26051.674 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 6144/8192 thrpt 25 2173161.031 ± 20014.569 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 7168/8192 thrpt 25 1917545.913 ± 21470.719 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 20480/65536 thrpt 25 537872.049 ± 5525.024 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 24576/65536 thrpt 25 371312.042 ± 4450.715 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 28672/65536 thrpt 25 306027.442 ± 2830.503 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 32768/65536 thrpt 25 263933.096 ± 1833.603 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 131072/262144 thrpt 25 80224.558 ± 1192.994 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 163840/262144 thrpt 25 65311.283 ± 775.920 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 196608/262144 thrpt 25 54510.877 ± 797.775 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 229376/262144 thrpt 25 46808.185 ± 515.039 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 524288/1048576 thrpt 25 17729.937 ± 301.199 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 655360/1048576 thrpt 25 12996.953 ± 228.552 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 786432/1048576 thrpt 25 11383.122 ± 384.086 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 917504/1048576 thrpt 25 9915.318 ± 285.995 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testNewArray 1024 N/A thrpt 25 10023631.563 ± 61317.055 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testNewArray 8192 N/A thrpt 25 2109120.041 ± 17482.682 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testNewArray 65536 N/A thrpt 25 318492.630 ± 3006.827 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testNewArray 262144 N/A thrpt 25 79228.892 ± 725.230 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testNewArray 1048576 N/A thrpt 25 13089.221 ± 73.535 ops/s
```
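One way to read the table above is to look, for each buffer size, for the largest measured fill level at which copying is still at least as fast as allocating a new array. The sketch below does that arithmetic; it assumes `copyVsNew` of `a/b` means copying `a` bytes out of a `b`-byte buffer, ignores the error bars, and uses a hypothetical class name.

```java
/** Hypothetical helper that derives break-even fill levels from the NewVsCopy scores. */
final class CopyVsNewSketch {
  static final int[] SIZES = {1024, 8192, 65536, 262144, 1048576};
  // testNewArray scores (ops/s), one per entry in SIZES.
  static final double[] NEW_OPS = {10023631.563, 2109120.041, 318492.630, 79228.892, 13089.221};
  // Bytes copied in each testCopyArray run, grouped by buffer size.
  static final int[][] COPIED = {
    {512, 640, 768, 896},
    {4096, 5120, 6144, 7168},
    {20480, 24576, 28672, 32768},
    {131072, 163840, 196608, 229376},
    {524288, 655360, 786432, 917504},
  };
  // testCopyArray scores (ops/s), same layout as COPIED.
  static final double[][] COPY_OPS = {
    {19744209.563, 15738981.338, 12778194.652, 11053602.109},
    {2961435.128, 2498594.030, 2173161.031, 1917545.913},
    {537872.049, 371312.042, 306027.442, 263933.096},
    {80224.558, 65311.283, 54510.877, 46808.185},
    {17729.937, 12996.953, 11383.122, 9915.318},
  };

  /** Largest measured fill fraction where copying is at least as fast as a new array. */
  static double breakEvenFill(int row) {
    double best = 0.0;
    for (int i = 0; i < COPIED[row].length; i++) {
      if (COPY_OPS[row][i] >= NEW_OPS[row]) {
        best = Math.max(best, (double) COPIED[row][i] / SIZES[row]);
      }
    }
    return best;
  }

  /** Allocation throughput in bytes per second for a given buffer size. */
  static double allocBytesPerSecond(int row) {
    return SIZES[row] * NEW_OPS[row];
  }

  public static void main(String[] args) {
    for (int row = 0; row < SIZES.length; row++) {
      System.out.printf(
          "size=%d breakEvenFill=%.3f allocBytesPerSec=%.3e%n",
          SIZES[row], breakEvenFill(row), allocBytesPerSecond(row));
    }
  }
}
```

For the 256k size the break-even lands at half full by these scores. The 1024k row at 655360 bytes is within the reported error of the new-array score, so that boundary is not conclusive. Per byte, allocation throughput at 256k comes out roughly 1.5x that at 1024k.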
The difference is minor in the `ProcessBundleBenchmark` as there is not
enough data being passed around for it to make a major difference:
```
Before
Benchmark Mode Cnt Score Error Units
ProcessBundleBenchmark.testLargeBundle thrpt 25 1156.159 ± 9.001 ops/s
ProcessBundleBenchmark.testTinyBundle thrpt 25 29641.444 ± 125.041 ops/s
After
Benchmark Mode Cnt Score Error Units
ProcessBundleBenchmark.testLargeBundle thrpt 25 1168.977 ± 25.848 ops/s
ProcessBundleBenchmark.testTinyBundle thrpt 25 29664.783 ± 99.791 ops/s
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [beam] lukecwik commented on pull request #22345: [BEAM-13015, #21250] Optimize encoding to a ByteString
Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22345:
URL: https://github.com/apache/beam/pull/22345#issuecomment-1189562078
Run GoPortable PreCommit
[GitHub] [beam] lukecwik commented on a diff in pull request #22345: [BEAM-13015, #21250] Optimize encoding to a ByteString
Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22345:
URL: https://github.com/apache/beam/pull/22345#discussion_r924906442
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.io.OutputStream;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.UnsafeByteOperations;
+
+/**
+ * An {@link OutputStream} that produces {@link ByteString}s.
+ *
+ * <p>Closing this output stream does nothing.
+ *
+ * <p>This class is not thread safe and expects appropriate locking to be used in a thread-safe
+ * manner. This differs from {@link ByteString.Output} which synchronizes its writes.
+ */
+@NotThreadSafe
+public final class ByteStringOutputStream extends OutputStream {
+
+ // This constant was chosen based upon Protobuf's ByteString#CONCATENATE_BY_COPY which
+ // isn't public to prevent copying the bytes again when concatenating ByteStrings instead
+ // of appending.
+ private static final int DEFAULT_CAPACITY = 128;
+
+ // ByteStringOutputStreamBenchmark.NewVsCopy shows that we actually are faster
+ // creating 4 new arrays that are 256k vs one that is 1024k by almost a factor
+ // of 2.
+ //
+ // This number should be tuned periodically as hardware changes.
+ private static final int MAX_CHUNK_SIZE = 256 * 1024;
+
+ // ByteString to be concatenated to create the result
+ private ByteString result;
+
+ // Current buffer to which we are writing
+ private byte[] buffer;
+
+ // Location in buffer[] to which we write the next byte.
+ private int bufferPos;
+
+ /** Creates a new output stream with a default capacity. */
+ public ByteStringOutputStream() {
+ this(DEFAULT_CAPACITY);
+ }
+
+ /**
+ * Creates a new output stream with the specified initial capacity.
+ *
+ * @param initialCapacity the initial capacity of the output stream.
+ */
+ public ByteStringOutputStream(int initialCapacity) {
+ if (initialCapacity < 0) {
+ throw new IllegalArgumentException("Initial capacity < 0");
+ }
+ this.buffer = new byte[initialCapacity];
+ this.result = ByteString.EMPTY;
+ }
+
+ @Override
+ public void write(int b) {
+ if (bufferPos == buffer.length) {
+ // We want to increase our total capacity by 50% but not larger than the max chunk size.
+ result = result.concat(UnsafeByteOperations.unsafeWrap(buffer));
+ buffer = new byte[Math.min(Math.max(1, result.size()), MAX_CHUNK_SIZE)];
+ bufferPos = 0;
+ }
+ buffer[bufferPos++] = (byte) b;
+ }
+
+ @Override
+ public void write(byte[] b, int offset, int length) {
+ int remainingSpaceInBuffer = buffer.length - bufferPos;
+ while (length > remainingSpaceInBuffer) {
+ // Use up the current buffer
+ System.arraycopy(b, offset, buffer, bufferPos, remainingSpaceInBuffer);
+ offset += remainingSpaceInBuffer;
+ length -= remainingSpaceInBuffer;
+
+ result = result.concat(UnsafeByteOperations.unsafeWrap(buffer));
+ // We want to increase our total capacity but not larger than the max chunk size.
+ remainingSpaceInBuffer = Math.min(Math.max(length, result.size()), MAX_CHUNK_SIZE);
+ buffer = new byte[remainingSpaceInBuffer];
+ bufferPos = 0;
+ }
+
+ System.arraycopy(b, offset, buffer, bufferPos, length);
+ bufferPos += length;
+ }
+
+ /** Creates a byte string with the size and contents of this output stream. */
+ public ByteString toByteString() {
+ // The only benefit we get by copying here is that there will be a reduction in the amount
Review Comment:
Done
[GitHub] [beam] lukecwik commented on pull request #22345: [BEAM-13015, #21250] Optimize encoding to a ByteString
Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22345:
URL: https://github.com/apache/beam/pull/22345#issuecomment-1189398844
R: @steveniemitz
[GitHub] [beam] lukecwik commented on a diff in pull request #22345: [BEAM-13015, #21250] Optimize encoding to a ByteString
Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22345:
URL: https://github.com/apache/beam/pull/22345#discussion_r924909774
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java:
##########
@@ -0,0 +1,166 @@
+ @Override
+ public void write(int b) {
+ if (bufferPos == buffer.length) {
+ // We want to increase our total capacity by 50% but not larger than the max chunk size.
Review Comment:
I kept the existing implementation to save on returning the size from flushBuffer or looking up the new buffer length.
[GitHub] [beam] codecov[bot] commented on pull request #22345: [BEAM-13015, #21250] Optimize encoding to a ByteString
Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #22345:
URL: https://github.com/apache/beam/pull/22345#issuecomment-1189577769
# [Codecov](https://codecov.io/gh/apache/beam/pull/22345?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#22345](https://codecov.io/gh/apache/beam/pull/22345?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (d6d27b8) into [master](https://codecov.io/gh/apache/beam/commit/abcc9e05f820b4c21ebef69890d86e9c1ba2c9e6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (abcc9e0) will **decrease** coverage by `0.00%`.
> The diff coverage is `n/a`.
```diff
@@ Coverage Diff @@
## master #22345 +/- ##
==========================================
- Coverage 74.17% 74.16% -0.01%
==========================================
Files 706 706
Lines 93190 93190
==========================================
- Hits 69121 69118 -3
- Misses 22801 22804 +3
Partials 1268 1268
```
| Flag | Coverage Δ | |
|---|---|---|
| python | `83.54% <ø> (-0.01%)` | :arrow_down: |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/beam/pull/22345?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [.../python/apache\_beam/testing/test\_stream\_service.py](https://codecov.io/gh/apache/beam/pull/22345/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy90ZXN0X3N0cmVhbV9zZXJ2aWNlLnB5) | `88.09% <0.00%> (-4.77%)` | :arrow_down: |
| [...che\_beam/runners/interactive/interactive\_runner.py](https://codecov.io/gh/apache/beam/pull/22345/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9pbnRlcmFjdGl2ZV9ydW5uZXIucHk=) | `90.06% <0.00%> (-1.33%)` | :arrow_down: |
| [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/22345/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `93.54% <0.00%> (-0.13%)` | :arrow_down: |
| [...eam/runners/interactive/interactive\_environment.py](https://codecov.io/gh/apache/beam/pull/22345/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9pbnRlcmFjdGl2ZV9lbnZpcm9ubWVudC5weQ==) | `92.02% <0.00%> (+0.30%)` | :arrow_up: |
| [sdks/python/apache\_beam/utils/interactive\_utils.py](https://codecov.io/gh/apache/beam/pull/22345/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdXRpbHMvaW50ZXJhY3RpdmVfdXRpbHMucHk=) | `97.56% <0.00%> (+2.43%)` | :arrow_up: |
[GitHub] [beam] github-actions[bot] commented on pull request #22345: [BEAM-13015, #21250] Optimize encoding to a ByteString
Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22345:
URL: https://github.com/apache/beam/pull/22345#issuecomment-1189399810
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
[GitHub] [beam] lukecwik merged pull request #22345: [BEAM-13015, #21250] Optimize encoding to a ByteString
Posted by GitBox <gi...@apache.org>.
lukecwik merged PR #22345:
URL: https://github.com/apache/beam/pull/22345
[GitHub] [beam] steveniemitz commented on a diff in pull request #22345: [BEAM-13015, #21250] Optimize encoding to a ByteString
Posted by GitBox <gi...@apache.org>.
steveniemitz commented on code in PR #22345:
URL: https://github.com/apache/beam/pull/22345#discussion_r924974526
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java:
##########
@@ -0,0 +1,166 @@
+ @Override
+ public void write(int b) {
+ if (bufferPos == buffer.length) {
+ // We want to increase our total capacity by 50% but not larger than the max chunk size.
Review Comment:
cool, yeah that makes sense
[GitHub] [beam] steveniemitz commented on a diff in pull request #22345: [BEAM-13015, #21250] Optimize encoding to a ByteString
Posted by GitBox <gi...@apache.org>.
steveniemitz commented on code in PR #22345:
URL: https://github.com/apache/beam/pull/22345#discussion_r924840472
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java:
##########
@@ -0,0 +1,166 @@
+ /** Creates a byte string with the size and contents of this output stream. */
+ public ByteString toByteString() {
+ // The only benefit we get by copying here is that there will be a reduction in the amount
Review Comment:
This comment is a little confusing. At first read it sounds like this method is doing a copy, but it seems to be the reasoning for NOT doing a copy?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [beam] steveniemitz commented on a diff in pull request #22345: [BEAM-13015, #21250] Optimize encoding to a ByteString
Posted by GitBox <gi...@apache.org>.
steveniemitz commented on code in PR #22345:
URL: https://github.com/apache/beam/pull/22345#discussion_r924838200
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.io.OutputStream;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.UnsafeByteOperations;
+
+/**
+ * An {@link OutputStream} that produces {@link ByteString}s.
+ *
+ * <p>Closing this output stream does nothing.
+ *
+ * <p>This class is not thread safe and expects appropriate locking to be used in a thread-safe
+ * manner. This differs from {@link ByteString.Output} which synchronizes its writes.
+ */
+@NotThreadSafe
+public final class ByteStringOutputStream extends OutputStream {
+
+ // This constant was chosen based upon Protobufs ByteString#CONCATENATE_BY_COPY which
+ // isn't public to prevent copying the bytes again when concatenating ByteStrings instead
+ // of appending.
+ private static final int DEFAULT_CAPACITY = 128;
+
+ // ByteStringOutputStreamBenchmark.NewVsCopy shows that we actually are faster
+ // creating a 4 new arrays that are 256k vs one that is 1024k by almost a factor
+ // of 2.
+ //
+ // This number should be tuned periodically as hardware changes.
+ private static final int MAX_CHUNK_SIZE = 256 * 1024;
+
+ // ByteString to be concatenated to create the result
+ private ByteString result;
+
+ // Current buffer to which we are writing
+ private byte[] buffer;
+
+ // Location in buffer[] to which we write the next byte.
+ private int bufferPos;
+
+ /** Creates a new output stream with a default capacity. */
+ public ByteStringOutputStream() {
+ this(DEFAULT_CAPACITY);
+ }
+
+ /**
+ * Creates a new output stream with the specified initial capacity.
+ *
+ * @param initialCapacity the initial capacity of the output stream.
+ */
+ public ByteStringOutputStream(int initialCapacity) {
+ if (initialCapacity < 0) {
+ throw new IllegalArgumentException("Initial capacity < 0");
+ }
+ this.buffer = new byte[initialCapacity];
+ this.result = ByteString.EMPTY;
+ }
+
+ @Override
+ public void write(int b) {
+ if (bufferPos == buffer.length) {
+ // We want to increase our total capacity by 50% but not larger than the max chunk size.
Review Comment:
I wonder if it'd be useful to extract this out to something like `flushBuffer(int newSizeHint)` and use it here and on line 95?
```
void flushBuffer(int newSizeHint) {
  result = result.concat(UnsafeByteOperations.unsafeWrap(buffer));
  buffer = new byte[Math.min(Math.max(newSizeHint, result.size()), MAX_CHUNK_SIZE)];
  bufferPos = 0;
}
```
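To make the suggestion concrete, here is a rough sketch of how both write paths might share such a helper. It is an illustration only, not the code in this PR: the class name `ChunkedOutputSketch`, the `fullChunks` list, the `flushedSize` counter, and `toByteArray()` are hypothetical stand-ins so the example runs on the JDK alone, with a list of `byte[]` chunks playing the role of the `ByteString` built via `result.concat(...)`.

```java
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical JDK-only stand-in for ByteStringOutputStream, used to show the shared helper. */
final class ChunkedOutputSketch extends OutputStream {
  private static final int MAX_CHUNK_SIZE = 256 * 1024;

  // Assumption: a list of full chunks replaces the ByteString that the real class concatenates.
  private final List<byte[]> fullChunks = new ArrayList<>();
  // Plays the role of result.size() in the real class.
  private int flushedSize;
  private byte[] buffer;
  private int bufferPos;

  ChunkedOutputSketch(int initialCapacity) {
    if (initialCapacity < 0) {
      throw new IllegalArgumentException("Initial capacity < 0");
    }
    this.buffer = new byte[initialCapacity];
  }

  /** Retires the full buffer and allocates the next one, capped at MAX_CHUNK_SIZE. */
  private void flushBuffer(int newSizeHint) {
    fullChunks.add(buffer);
    flushedSize += buffer.length;
    buffer = new byte[Math.min(Math.max(newSizeHint, flushedSize), MAX_CHUNK_SIZE)];
    bufferPos = 0;
  }

  @Override
  public void write(int b) {
    if (bufferPos == buffer.length) {
      flushBuffer(1); // at least one byte is needed
    }
    buffer[bufferPos++] = (byte) b;
  }

  @Override
  public void write(byte[] b, int offset, int length) {
    int remaining = buffer.length - bufferPos;
    while (length > remaining) {
      // Fill the current buffer completely before retiring it.
      System.arraycopy(b, offset, buffer, bufferPos, remaining);
      offset += remaining;
      length -= remaining;
      flushBuffer(length); // hint: bytes still waiting to be written
      remaining = buffer.length;
    }
    System.arraycopy(b, offset, buffer, bufferPos, length);
    bufferPos += length;
  }

  /** Copies all written bytes into one array; only here so the sketch can be checked. */
  byte[] toByteArray() {
    byte[] out = new byte[flushedSize + bufferPos];
    int pos = 0;
    for (byte[] chunk : fullChunks) {
      System.arraycopy(chunk, 0, out, pos, chunk.length);
      pos += chunk.length;
    }
    System.arraycopy(buffer, 0, out, pos, bufferPos);
    return out;
  }
}
```

With this shape, `write(int)` would pass a hint of 1 and the bulk `write(byte[], int, int)` path would pass the number of bytes still to be written, which matches the two `Math.max(...)` arguments in the diff above.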