You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2022/07/06 15:46:39 UTC
[beam] branch master updated: Tune ByteStringCoder allocations (#22144)
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6dea0d15d0a Tune ByteStringCoder allocations (#22144)
6dea0d15d0a is described below
commit 6dea0d15d0a97d243a2fe56684c2e193cbea14d2
Author: Steven Niemitz <st...@gmail.com>
AuthorDate: Wed Jul 6 11:46:33 2022 -0400
Tune ByteStringCoder allocations (#22144)
---
.../beam/runners/fnexecution/wire/ByteStringCoder.java | 15 +++++++++++----
.../beam/sdk/extensions/protobuf/ByteStringCoder.java | 16 ++++++++++++----
2 files changed, 23 insertions(+), 8 deletions(-)
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/ByteStringCoder.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/ByteStringCoder.java
index 6af773a7474..0c158be86b7 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/ByteStringCoder.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/ByteStringCoder.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.UnsafeByteOperations;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
/**
@@ -80,10 +81,16 @@ public class ByteStringCoder extends AtomicCoder<ByteString> {
}
int size = VarInt.decodeInt(inStream);
- // ByteString reads to the end of the input stream, so give it a limited stream of exactly
- // the right length. Also set its chunk size so that the ByteString will contain exactly
- // one chunk.
- return ByteString.readFrom(ByteStreams.limit(inStream, size), size);
+ if (size == 0) {
+ return ByteString.EMPTY;
+ }
+
+ // We know the length up front, so pre-allocate a byte[], read into it, and wrap it with a
+ // ByteString rather than using ByteString.readFrom. This is significantly more efficient
+ // because we don't need an intermediate buffer list.
+ byte[] buf = new byte[size];
+ ByteStreams.readFully(inStream, buf, 0, size);
+ return UnsafeByteOperations.unsafeWrap(buf, 0, size);
}
@Override
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
index effae4733ec..09ea7973b54 100644
--- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.extensions.protobuf;
import com.google.protobuf.ByteString;
+import com.google.protobuf.UnsafeByteOperations;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -79,10 +80,17 @@ public class ByteStringCoder extends AtomicCoder<ByteString> {
}
int size = VarInt.decodeInt(inStream);
- // ByteString reads to the end of the input stream, so give it a limited stream of exactly
- // the right length. Also set its chunk size so that the ByteString will contain exactly
- // one chunk.
- return ByteString.readFrom(ByteStreams.limit(inStream, size), size);
+ if (size == 0) {
+ return ByteString.EMPTY;
+ }
+
+ // We know the length up front, so pre-allocate a byte[], read into it, and wrap it with a
+ // ByteString rather than using ByteString.readFrom. This is significantly more efficient
+ // because we don't need an intermediate buffer list and can save an extra byte[] allocation of
+ // `size` bytes.
+ byte[] buf = new byte[size];
+ ByteStreams.readFully(inStream, buf, 0, size);
+ return UnsafeByteOperations.unsafeWrap(buf, 0, size);
}
@Override