You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2022/07/06 15:46:39 UTC

[beam] branch master updated: Tune ByteStringCoder allocations (#22144)

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6dea0d15d0a Tune ByteStringCoder allocations (#22144)
6dea0d15d0a is described below

commit 6dea0d15d0a97d243a2fe56684c2e193cbea14d2
Author: Steven Niemitz <st...@gmail.com>
AuthorDate: Wed Jul 6 11:46:33 2022 -0400

    Tune ByteStringCoder allocations (#22144)
---
 .../beam/runners/fnexecution/wire/ByteStringCoder.java   | 15 +++++++++++----
 .../beam/sdk/extensions/protobuf/ByteStringCoder.java    | 16 ++++++++++++----
 2 files changed, 23 insertions(+), 8 deletions(-)

diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/ByteStringCoder.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/ByteStringCoder.java
index 6af773a7474..0c158be86b7 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/ByteStringCoder.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/ByteStringCoder.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.util.VarInt;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.UnsafeByteOperations;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
 
 /**
@@ -80,10 +81,16 @@ public class ByteStringCoder extends AtomicCoder<ByteString> {
     }
 
     int size = VarInt.decodeInt(inStream);
-    // ByteString reads to the end of the input stream, so give it a limited stream of exactly
-    // the right length. Also set its chunk size so that the ByteString will contain exactly
-    // one chunk.
-    return ByteString.readFrom(ByteStreams.limit(inStream, size), size);
+    if (size == 0) {
+      return ByteString.EMPTY;
+    }
+
+    // The length is known up front, so pre-allocate a byte[] of exactly that size, read into it,
+    // and wrap it in a ByteString instead of calling ByteString.readFrom. This is significantly
+    // more efficient because no intermediate buffer list is needed.
+    byte[] buf = new byte[size];
+    ByteStreams.readFully(inStream, buf, 0, size);
+    return UnsafeByteOperations.unsafeWrap(buf, 0, size);
   }
 
   @Override
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
index effae4733ec..09ea7973b54 100644
--- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.extensions.protobuf;
 
 import com.google.protobuf.ByteString;
+import com.google.protobuf.UnsafeByteOperations;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -79,10 +80,17 @@ public class ByteStringCoder extends AtomicCoder<ByteString> {
     }
 
     int size = VarInt.decodeInt(inStream);
-    // ByteString reads to the end of the input stream, so give it a limited stream of exactly
-    // the right length. Also set its chunk size so that the ByteString will contain exactly
-    // one chunk.
-    return ByteString.readFrom(ByteStreams.limit(inStream, size), size);
+    if (size == 0) {
+      return ByteString.EMPTY;
+    }
+
+    // The length is known up front, so pre-allocate a byte[] of exactly that size, read into it,
+    // and wrap it in a ByteString instead of calling ByteString.readFrom. This is significantly
+    // more efficient because no intermediate buffer list is needed and an extra byte[] allocation
+    // of `size` bytes is avoided.
+    byte[] buf = new byte[size];
+    ByteStreams.readFully(inStream, buf, 0, size);
+    return UnsafeByteOperations.unsafeWrap(buf, 0, size);
   }
 
   @Override