You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/19 19:59:03 UTC

[GitHub] [beam] lukecwik commented on a diff in pull request #22345: [BEAM-13015, #21250] Optimize encoding to a ByteString

lukecwik commented on code in PR #22345:
URL: https://github.com/apache/beam/pull/22345#discussion_r924906442


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.io.OutputStream;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.UnsafeByteOperations;
+
+/**
+ * An {@link OutputStream} that produces {@link ByteString}s.
+ *
+ * <p>Closing this output stream does nothing.
+ *
+ * <p>This class is not thread safe; callers that share an instance across threads must provide
+ * their own locking. This differs from {@link ByteString.Output}, which synchronizes its writes.
+ */
+@NotThreadSafe
+public final class ByteStringOutputStream extends OutputStream {
+
+  // This constant was chosen based upon Protobuf's ByteString#CONCATENATE_BY_COPY (which isn't
+  // public) so that concatenating ByteStrings appends the buffer instead of copying its bytes
+  // again.
+  private static final int DEFAULT_CAPACITY = 128;
+
+  // ByteStringOutputStreamBenchmark.NewVsCopy shows that creating 4 new arrays of 256k each is
+  // almost a factor of 2 faster than creating a single array of 1024k.
+  //
+  // This number should be tuned periodically as hardware changes.
+  private static final int MAX_CHUNK_SIZE = 256 * 1024;
+
+  // ByteString to be concatenated to create the result
+  private ByteString result;
+
+  // Current buffer to which we are writing
+  private byte[] buffer;
+
+  // Location in buffer[] to which we write the next byte.
+  private int bufferPos;
+
+  /** Creates a new output stream whose initial capacity is {@code DEFAULT_CAPACITY} bytes. */
+  public ByteStringOutputStream() {
+    this(DEFAULT_CAPACITY);
+  }
+
+  /**
+   * Creates a new output stream with the specified initial capacity.
+   *
+   * @param initialCapacity the initial capacity of the output stream.
+   */
+  public ByteStringOutputStream(int initialCapacity) {
+    if (initialCapacity < 0) {
+      throw new IllegalArgumentException("Initial capacity < 0");
+    }
+    this.buffer = new byte[initialCapacity];
+    this.result = ByteString.EMPTY;
+  }
+
+  /**
+   * Appends the low-order byte of {@code b} to this stream.
+   *
+   * <p>When the current buffer is full it is wrapped and appended to the accumulated result, and
+   * a fresh buffer is allocated before the byte is stored.
+   */
+  @Override
+  public void write(int b) {
+    final boolean bufferIsFull = bufferPos == buffer.length;
+    if (bufferIsFull) {
+      result = result.concat(UnsafeByteOperations.unsafeWrap(buffer));
+      // The new buffer is as large as everything accumulated so far, which doubles the total
+      // capacity, but it is never smaller than one byte nor larger than the max chunk size.
+      final int nextBufferSize = Math.min(Math.max(1, result.size()), MAX_CHUNK_SIZE);
+      buffer = new byte[nextBufferSize];
+      bufferPos = 0;
+    }
+    buffer[bufferPos] = (byte) b;
+    bufferPos++;
+  }
+
+  /**
+   * Appends {@code length} bytes of {@code b}, starting at index {@code offset}, to this stream.
+   *
+   * <p>The requested range is validated before any byte is copied, so an invalid range cannot
+   * leave a partially written prefix in the stream.
+   *
+   * @param b the array holding the bytes to append.
+   * @param offset the index in {@code b} of the first byte to append.
+   * @param length the number of bytes to append.
+   * @throws NullPointerException if {@code b} is null.
+   * @throws IndexOutOfBoundsException if {@code offset} or {@code length} is negative, or if
+   *     {@code offset + length} is greater than {@code b.length}.
+   */
+  @Override
+  public void write(byte[] b, int offset, int length) {
+    // Validate the whole range up front. Without this check an invalid range is only detected by
+    // System.arraycopy after earlier chunks may already have been appended to the result.
+    // The bound is phrased as a subtraction so that offset + length cannot overflow.
+    if (offset < 0 || length < 0 || length > b.length - offset) {
+      throw new IndexOutOfBoundsException(
+          "offset " + offset + ", length " + length + ", array length " + b.length);
+    }
+
+    int remainingSpaceInBuffer = buffer.length - bufferPos;
+    while (length > remainingSpaceInBuffer) {
+      // Fill the rest of the current buffer.
+      System.arraycopy(b, offset, buffer, bufferPos, remainingSpaceInBuffer);
+      offset += remainingSpaceInBuffer;
+      length -= remainingSpaceInBuffer;
+
+      // The buffer is now full: wrap it and append it to the accumulated result.
+      result = result.concat(UnsafeByteOperations.unsafeWrap(buffer));
+      // The next buffer is sized to the larger of the bytes still pending and the bytes
+      // accumulated so far, but never larger than the max chunk size.
+      remainingSpaceInBuffer = Math.min(Math.max(length, result.size()), MAX_CHUNK_SIZE);
+      buffer = new byte[remainingSpaceInBuffer];
+      bufferPos = 0;
+    }
+
+    // Whatever is left fits into the current buffer.
+    System.arraycopy(b, offset, buffer, bufferPos, length);
+    bufferPos += length;
+  }
+
+  /** Creates a byte string with the size and contents of this output stream. */
+  public ByteString toByteString() {
+    // The only benefit we get by copying here is that there will be a reduction in the amount

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org