Posted to commits@beam.apache.org by lc...@apache.org on 2022/07/20 06:22:22 UTC

[beam] branch master updated: [BEAM-13015, #21250] Optimize encoding to a ByteString (#22345)

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4821e035c14 [BEAM-13015, #21250] Optimize encoding to a ByteString (#22345)
4821e035c14 is described below

commit 4821e035c148df1ed7eb9e7054e47fe2a7003a1f
Author: Luke Cwik <lc...@google.com>
AuthorDate: Tue Jul 19 23:22:16 2022 -0700

    [BEAM-13015, #21250] Optimize encoding to a ByteString (#22345)
    
    * [BEAM-13015, #21250] Optimize encoding to a ByteString
    
    This leverages the fact that all encoding is done in a thread-safe manner,
    allowing us to drop the synchronization that ByteString.Output adds. It also
    optimizes the max chunk size based upon performance measurements and the
    ratio for how full a byte[] should be for the final copy vs concatenate
    decision.
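    
    For reference, the call-site change throughout this diff is a one-line swap;
    here is a minimal sketch mirroring the `encode` helper updated in
    `RemoteExecutionTest.java` below (the wrapper class name is illustrative):
    ```
    import java.io.IOException;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.util.ByteStringOutputStream;
    import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
    
    class EncodeExample {
      // The stream is confined to a single thread, so no synchronization is needed.
      static ByteString encode(String value) throws IOException {
        // Before this change: ByteString.Output output = ByteString.newOutput();
        ByteStringOutputStream output = new ByteStringOutputStream();
        StringUtf8Coder.of().encode(value, output);
        return output.toByteString();
      }
    }
    ```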
    
    Below are the results of several scenarios in which we compare the protobuf
    vs new solution:
    ```
    Benchmark                                                                                       Mode  Cnt         Score        Error  Units
    ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamFewLargeWrites               thrpt   25   1149267.797 ±  15366.677  ops/s
    ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamFewMixedWritesWithReuse      thrpt   25    832816.697 ±   4236.341  ops/s
    ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamFewMixedWritesWithoutReuse   thrpt   25    916629.194 ±   5669.323  ops/s
    ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamFewSmallWrites               thrpt   25  14175167.566 ±  88540.030  ops/s
    ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamFewTinyWrites                thrpt   25  22471597.238 ± 186098.311  ops/s
    ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamManyLargeWrites              thrpt   25       610.218 ±      5.019  ops/s
    ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamManyMixedWritesWithReuse     thrpt   25       484.413 ±     35.194  ops/s
    ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamManyMixedWritesWithoutReuse  thrpt   25       559.983 ±      6.228  ops/s
    ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamManySmallWrites              thrpt   25     10969.839 ±     88.199  ops/s
    ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamManyTinyWrites               thrpt   25     40822.925 ±    191.402  ops/s
    ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamFewLargeWrites                thrpt   25   1167673.532 ±   9747.507  ops/s
    ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamFewMixedWritesWithReuse       thrpt   25   1576528.242 ±  15883.083  ops/s
    ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamFewMixedWritesWithoutReuse    thrpt   25   1009766.655 ±   8700.273  ops/s
    ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamFewSmallWrites                thrpt   25  33293140.679 ± 233693.771  ops/s
    ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamFewTinyWrites                 thrpt   25  86841328.763 ± 729741.769  ops/s
    ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamManyLargeWrites               thrpt   25      1058.150 ±     15.192  ops/s
    ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamManyMixedWritesWithReuse      thrpt   25       937.249 ±      9.264  ops/s
    ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamManyMixedWritesWithoutReuse   thrpt   25       959.671 ±     13.989  ops/s
    ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamManySmallWrites               thrpt   25     12601.065 ±     92.375  ops/s
    ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamManyTinyWrites                thrpt   25     65277.229 ±   3795.676  ops/s
    ```
    
    The copy vs concatenate numbers come from these results, which show that 256k is
    a good chunk size since larger chunks cost more per byte to allocate. They also
    show at what threshold we should copy the bytes versus concatenate a partially
    full buffer and allocate a new one (an illustrative sketch of this decision
    follows the table):
    ```
    Benchmark                                                                                              newSize       copyVsNew   Mode  Cnt         Score        Error  Units
    ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A        512/1024  thrpt   25  19744209.563 ± 148287.185  ops/s
    ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A        640/1024  thrpt   25  15738981.338 ± 103684.000  ops/s
    ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A        768/1024  thrpt   25  12778194.652 ± 202212.679  ops/s
    ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A        896/1024  thrpt   25  11053602.109 ± 103120.446  ops/s
    ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A       4096/8192  thrpt   25   2961435.128 ±  25895.802  ops/s
    ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A       5120/8192  thrpt   25   2498594.030 ±  26051.674  ops/s
    ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A       6144/8192  thrpt   25   2173161.031 ±  20014.569  ops/s
    ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A       7168/8192  thrpt   25   1917545.913 ±  21470.719  ops/s
    ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A     20480/65536  thrpt   25    537872.049 ±   5525.024  ops/s
    ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A     24576/65536  thrpt   25    371312.042 ±   4450.715  ops/s
    ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A     28672/65536  thrpt   25    306027.442 ±   2830.503  ops/s
    ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A     32768/65536  thrpt   25    263933.096 ±   1833.603  ops/s
    ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A   131072/262144  thrpt   25     80224.558 ±   1192.994  ops/s
    ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A   163840/262144  thrpt   25     65311.283 ±    775.920  ops/s
    ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A   196608/262144  thrpt   25     54510.877 ±    797.775  ops/s
    ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A   229376/262144  thrpt   25     46808.185 ±    515.039  ops/s
    ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A  524288/1048576  thrpt   25     17729.937 ±    301.199  ops/s
    ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A  655360/1048576  thrpt   25     12996.953 ±    228.552  ops/s
    ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A  786432/1048576  thrpt   25     11383.122 ±    384.086  ops/s
    ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A  917504/1048576  thrpt   25      9915.318 ±    285.995  ops/s
    ByteStringOutputStreamBenchmark.NewVsCopy.testNewArray                                                    1024             N/A  thrpt   25  10023631.563 ±  61317.055  ops/s
    ByteStringOutputStreamBenchmark.NewVsCopy.testNewArray                                                    8192             N/A  thrpt   25   2109120.041 ±  17482.682  ops/s
    ByteStringOutputStreamBenchmark.NewVsCopy.testNewArray                                                   65536             N/A  thrpt   25    318492.630 ±   3006.827  ops/s
    ByteStringOutputStreamBenchmark.NewVsCopy.testNewArray                                                  262144             N/A  thrpt   25     79228.892 ±    725.230  ops/s
    ByteStringOutputStreamBenchmark.NewVsCopy.testNewArray                                                 1048576             N/A  thrpt   25     13089.221 ±     73.535  ops/s
    ```
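    
    The trade-off being measured can be pictured with the following sketch. This is
    not the actual ByteStringOutputStream implementation; the class name and the
    half-full threshold are assumptions drawn from the numbers above:
    ```
    import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
    import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.UnsafeByteOperations;
    
    class CopyVsConcatSketch {
      // Finish the current chunk: either hand the buffer off as-is (concatenate)
      // or copy the live bytes, depending on how full the buffer is.
      static ByteString finishChunk(ByteString prefix, byte[] buffer, int count) {
        if (count >= buffer.length / 2) {
          // Mostly full: wrap without copying and allocate a fresh buffer next.
          return prefix.concat(UnsafeByteOperations.unsafeWrap(buffer, 0, count));
        }
        // Mostly empty: copying the few live bytes is cheaper than pinning a
        // large, mostly unused array in the resulting ByteString.
        return prefix.concat(ByteString.copyFrom(buffer, 0, count));
      }
    }
    ```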
    
    The difference is minor in the `ProcessBundleBenchmark`, as not enough data is
    being passed around for the change to have a significant impact:
    ```
    Before
    Benchmark                                        Mode  Cnt      Score     Error  Units
    ProcessBundleBenchmark.testLargeBundle          thrpt   25   1156.159 ±   9.001  ops/s
    ProcessBundleBenchmark.testTinyBundle           thrpt   25  29641.444 ± 125.041  ops/s
    
    After
    Benchmark                                        Mode  Cnt      Score    Error  Units
    ProcessBundleBenchmark.testLargeBundle          thrpt   25   1168.977 ± 25.848  ops/s
    ProcessBundleBenchmark.testTinyBundle           thrpt   25  29664.783 ± 99.791  ops/s
    ```
    
    * fixup comment and address analyzeClassDependencies failures
---
 .../core/construction/ValidateRunnerXlangTest.java |   4 +-
 .../core/metrics/MonitoringInfoEncodings.java      |  11 +-
 .../beam/runners/dataflow/worker/PubsubSink.java   |   5 +-
 .../beam/runners/dataflow/worker/StateFetcher.java |   4 +-
 .../worker/StreamingModeExecutionContext.java      |   7 +-
 .../dataflow/worker/StreamingSideInputFetcher.java |   4 +-
 .../beam/runners/dataflow/worker/WindmillSink.java |   5 +-
 .../dataflow/worker/WindmillStateInternals.java    |  19 +-
 .../control/RegisterAndProcessBundleOperation.java |   3 +-
 .../graph/CreateExecutableStageNodeFunction.java   |   3 +-
 .../worker/graph/RegisterNodeFunction.java         |   3 +-
 .../runners/dataflow/worker/StateFetcherTest.java  |  10 +-
 .../worker/StreamingDataflowWorkerTest.java        |   4 +-
 .../worker/StreamingGroupAlsoByWindowFnsTest.java  |   3 +-
 ...reamingGroupAlsoByWindowsReshuffleDoFnTest.java |   3 +-
 .../worker/WindmillStateInternalsTest.java         |   3 +-
 .../dataflow/worker/WindmillStateReaderTest.java   |   4 +-
 .../fnexecution/control/RemoteExecutionTest.java   |   5 +-
 sdks/java/core/jmh/build.gradle                    |   3 +
 .../jmh/util/ByteStringOutputStreamBenchmark.java  | 416 +++++++++++++++++++++
 .../apache/beam/sdk/jmh/util/package-info.java}    |  24 +-
 .../util/ByteStringOutputStreamBenchmarkTest.java  |  88 +++++
 .../beam/sdk/util/ByteStringOutputStream.java      | 171 +++++++++
 .../beam/sdk/util/ByteStringOutputStreamTest.java  | 115 ++++++
 .../expansion/service/ExpansionServiceTest.java    |   4 +-
 .../JavaClassLookupTransformProviderTest.java      |   3 +-
 .../sdk/fn/data/BeamFnDataOutboundAggregator.java  |  27 +-
 .../org/apache/beam/sdk/fn/stream/DataStreams.java |   8 +-
 .../fn/data/BeamFnDataInboundObserver2Test.java    |   6 +-
 .../sdk/fn/data/BeamFnDataInboundObserverTest.java |   3 +-
 .../fn/data/BeamFnDataOutboundAggregatorTest.java  |   8 +-
 .../apache/beam/sdk/fn/stream/DataStreamsTest.java |   5 +-
 sdks/java/harness/jmh/build.gradle                 |   8 +-
 .../apache/beam/fn/harness/FnApiDoFnRunner.java    |  11 +-
 .../apache/beam/fn/harness/state/BagUserState.java |   4 +-
 .../beam/fn/harness/state/FnApiStateAccessor.java  |   7 +-
 .../fn/harness/state/FnApiTimerBundleTracker.java  |  11 +-
 .../beam/fn/harness/state/MultimapSideInput.java   |   3 +-
 .../beam/fn/harness/state/MultimapUserState.java   |   5 +-
 .../beam/fn/harness/FnApiDoFnRunnerTest.java       |  11 +-
 .../harness/control/ProcessBundleHandlerTest.java  |  13 +-
 .../beam/fn/harness/state/BagUserStateTest.java    |   3 +-
 .../fn/harness/state/FakeBeamFnStateClient.java    |   6 +-
 .../fn/harness/state/MultimapSideInputTest.java    |   3 +-
 .../fn/harness/state/MultimapUserStateTest.java    |   5 +-
 .../fn/harness/state/StateBackedIterableTest.java  |   3 +-
 .../beam/sdk/io/gcp/pubsublite/internal/Uuid.java  |   3 +-
 .../sdk/io/gcp/pubsub/PubsubIOExternalTest.java    |   4 +-
 .../beam/sdk/io/kafka/KafkaIOExternalTest.java     |   4 +-
 49 files changed, 950 insertions(+), 135 deletions(-)

diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ValidateRunnerXlangTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ValidateRunnerXlangTest.java
index 8a13df9abd4..1ecd9041a75 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ValidateRunnerXlangTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ValidateRunnerXlangTest.java
@@ -34,13 +34,13 @@ import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptors;
-import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -93,7 +93,7 @@ public class ValidateRunnerXlangTest {
               .withFieldValue("data", data)
               .build();
 
-      ByteString.Output outputStream = ByteString.newOutput();
+      ByteStringOutputStream outputStream = new ByteStringOutputStream();
       try {
         RowCoder.of(configRow.getSchema()).encode(configRow, outputStream);
       } catch (IOException e) {
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java
index cfa082c6cb4..3d8cf2d941e 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java
@@ -22,6 +22,7 @@ import java.io.InputStream;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.DoubleCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
 import org.joda.time.Instant;
 
@@ -32,7 +33,7 @@ public class MonitoringInfoEncodings {
 
   /** Encodes to {@link MonitoringInfoConstants.TypeUrns#DISTRIBUTION_INT64_TYPE}. */
   public static ByteString encodeInt64Distribution(DistributionData data) {
-    ByteString.Output output = ByteString.newOutput();
+    ByteStringOutputStream output = new ByteStringOutputStream();
     try {
       VARINT_CODER.encode(data.count(), output);
       VARINT_CODER.encode(data.sum(), output);
@@ -62,7 +63,7 @@ public class MonitoringInfoEncodings {
   // TODO(BEAM-4374): Implement decodeDoubleDistribution(...)
   public static ByteString encodeDoubleDistribution(
       long count, double sum, double min, double max) {
-    ByteString.Output output = ByteString.newOutput();
+    ByteStringOutputStream output = new ByteStringOutputStream();
     try {
       VARINT_CODER.encode(count, output);
       DOUBLE_CODER.encode(sum, output);
@@ -76,7 +77,7 @@ public class MonitoringInfoEncodings {
 
   /** Encodes to {@link MonitoringInfoConstants.TypeUrns#LATEST_INT64_TYPE}. */
   public static ByteString encodeInt64Gauge(GaugeData data) {
-    ByteString.Output output = ByteString.newOutput();
+    ByteStringOutputStream output = new ByteStringOutputStream();
     try {
       VARINT_CODER.encode(data.timestamp().getMillis(), output);
       VARINT_CODER.encode(data.value(), output);
@@ -99,7 +100,7 @@ public class MonitoringInfoEncodings {
 
   /** Encodes to {@link MonitoringInfoConstants.TypeUrns#SUM_INT64_TYPE}. */
   public static ByteString encodeInt64Counter(long value) {
-    ByteString.Output output = ByteString.newOutput();
+    ByteStringOutputStream output = new ByteStringOutputStream();
     try {
       VARINT_CODER.encode(value, output);
     } catch (IOException e) {
@@ -119,7 +120,7 @@ public class MonitoringInfoEncodings {
 
   /** Encodes to {@link MonitoringInfoConstants.TypeUrns#SUM_DOUBLE_TYPE}. */
   public static ByteString encodeDoubleCounter(double value) {
-    ByteString.Output output = ByteString.newOutput();
+    ByteStringOutputStream output = new ByteStringOutputStream();
     try {
       DOUBLE_CODER.encode(value, output);
     } catch (IOException e) {
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java
index 6bd58fd9821..d8c670b7c55 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
@@ -160,11 +161,11 @@ class PubsubSink<T> extends Sink<WindowedValue<T>> {
         if (formatted.getAttributeMap() != null) {
           pubsubMessageBuilder.putAllAttributes(formatted.getAttributeMap());
         }
-        ByteString.Output output = ByteString.newOutput();
+        ByteStringOutputStream output = new ByteStringOutputStream();
         pubsubMessageBuilder.build().writeTo(output);
         byteString = output.toByteString();
       } else {
-        ByteString.Output stream = ByteString.newOutput();
+        ByteStringOutputStream stream = new ByteStringOutputStream();
         coder.encode(data.getValue(), stream, Coder.Context.OUTER);
         byteString = stream.toByteString();
       }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StateFetcher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StateFetcher.java
index 883822f4fc7..82ca67c8fb3 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StateFetcher.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StateFetcher.java
@@ -35,10 +35,10 @@ import org.apache.beam.sdk.transforms.Materializations.IterableView;
 import org.apache.beam.sdk.transforms.Materializations.MultimapView;
 import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
@@ -124,7 +124,7 @@ class StateFetcher {
 
           Coder<SideWindowT> windowCoder = sideWindowStrategy.getWindowFn().windowCoder();
 
-          ByteString.Output windowStream = ByteString.newOutput();
+          ByteStringOutputStream windowStream = new ByteStringOutputStream();
           windowCoder.encode(sideWindow, windowStream, Coder.Context.OUTER);
 
           @SuppressWarnings("unchecked")
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
index da18f865500..601efcd5f01 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
@@ -54,6 +54,7 @@ import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.metrics.MetricsContainer;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
@@ -429,7 +430,7 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
           ((UnboundedSource<?, UnboundedSource.CheckpointMark>) activeReader.getCurrentSource())
               .getCheckpointMarkCoder();
       if (checkpointCoder != null) {
-        ByteString.Output stream = ByteString.newOutput();
+        ByteStringOutputStream stream = new ByteStringOutputStream();
         try {
           checkpointCoder.encode(checkpointMark, stream, Coder.Context.OUTER);
         } catch (IOException e) {
@@ -738,10 +739,10 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
         throw new IllegalStateException("writePCollectionViewData must follow a Combine.globally");
       }
 
-      ByteString.Output dataStream = ByteString.newOutput();
+      ByteStringOutputStream dataStream = new ByteStringOutputStream();
       dataCoder.encode(data, dataStream, Coder.Context.OUTER);
 
-      ByteString.Output windowStream = ByteString.newOutput();
+      ByteStringOutputStream windowStream = new ByteStringOutputStream();
       windowCoder.encode(window, windowStream, Coder.Context.OUTER);
 
       ensureStateful("Tried to write view data");
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcher.java
index c8c7e04cad1..12d98a885bd 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcher.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcher.java
@@ -45,10 +45,10 @@ import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.state.WatermarkHoldState;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.Parser;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
@@ -305,7 +305,7 @@ public class StreamingSideInputFetcher<InputT, W extends BoundedWindow> {
     SideWindowT sideInputWindow =
         (SideWindowT) view.getWindowMappingFn().getSideInputWindow(mainWindow);
 
-    ByteString.Output windowStream = ByteString.newOutput();
+    ByteStringOutputStream windowStream = new ByteStringOutputStream();
     try {
       sideInputWindowCoder.encode(sideInputWindow, windowStream, Coder.Context.OUTER);
     } catch (IOException e) {
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
index 6696de6ca74..a755f318350 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.values.KV;
@@ -69,7 +70,7 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
       Collection<? extends BoundedWindow> windows,
       PaneInfo pane)
       throws IOException {
-    ByteString.Output stream = ByteString.newOutput();
+    ByteStringOutputStream stream = new ByteStringOutputStream();
     PaneInfoCoder.INSTANCE.encode(pane, stream);
     windowsCoder.encode(windows, stream, Coder.Context.OUTER);
     return stream.toByteString();
@@ -135,7 +136,7 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
     }
 
     private <EncodeT> ByteString encode(Coder<EncodeT> coder, EncodeT object) throws IOException {
-      ByteString.Output stream = ByteString.newOutput();
+      ByteStringOutputStream stream = new ByteStringOutputStream();
       coder.encode(object, stream, Coder.Context.OUTER);
       return stream.toByteString();
     }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java
index 5093b21c5a8..7eca9d3da39 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java
@@ -80,6 +80,7 @@ import org.apache.beam.sdk.state.WatermarkHoldState;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.util.CombineFnUtil;
 import org.apache.beam.sdk.util.Weighted;
 import org.apache.beam.sdk.values.TimestampedValue;
@@ -351,7 +352,7 @@ class WindmillStateInternals<K> implements StateInternals {
     try {
       // Use ByteString.Output rather than concatenation and String.format. We build these keys
       // a lot, and this leads to better performance results. See associated benchmarks.
-      ByteString.Output stream = ByteString.newOutput();
+      ByteStringOutputStream stream = new ByteStringOutputStream();
       OutputStreamWriter writer = new OutputStreamWriter(stream, StandardCharsets.UTF_8);
 
       // stringKey starts and ends with a slash.  We separate it from the
@@ -522,7 +523,7 @@ class WindmillStateInternals<K> implements StateInternals {
 
       ByteString encoded = null;
       if (cachedSize == -1 || modified) {
-        ByteString.Output stream = ByteString.newOutput();
+        ByteStringOutputStream stream = new ByteStringOutputStream();
         if (value != null) {
           coder.encode(value, stream, Coder.Context.OUTER);
         }
@@ -1047,7 +1048,7 @@ class WindmillStateInternals<K> implements StateInternals {
               pendingAdds,
               (elem, id) -> {
                 try {
-                  ByteString.Output elementStream = ByteString.newOutput();
+                  ByteStringOutputStream elementStream = new ByteStringOutputStream();
                   elemCoder.encode(elem.getValue(), elementStream, Context.OUTER);
                   insertBuilder.addEntries(
                       SortedListEntry.newBuilder()
@@ -1249,7 +1250,7 @@ class WindmillStateInternals<K> implements StateInternals {
     }
 
     private ByteString protoKeyFromUserKey(K key) throws IOException {
-      ByteString.Output keyStream = ByteString.newOutput();
+      ByteStringOutputStream keyStream = new ByteStringOutputStream();
       stateKeyPrefix.writeTo(keyStream);
       keyCoder.encode(key, keyStream, Context.OUTER);
       return keyStream.toByteString();
@@ -1275,7 +1276,7 @@ class WindmillStateInternals<K> implements StateInternals {
 
       for (K key : localAdditions) {
         ByteString keyBytes = protoKeyFromUserKey(key);
-        ByteString.Output valueStream = ByteString.newOutput();
+        ByteStringOutputStream valueStream = new ByteStringOutputStream();
         valueCoder.encode(cachedValues.get(key), valueStream, Context.OUTER);
         ByteString valueBytes = valueStream.toByteString();
 
@@ -1290,7 +1291,7 @@ class WindmillStateInternals<K> implements StateInternals {
       localAdditions.clear();
 
       for (K key : localRemovals) {
-        ByteString.Output keyStream = ByteString.newOutput();
+        ByteStringOutputStream keyStream = new ByteStringOutputStream();
         stateKeyPrefix.writeTo(keyStream);
         keyCoder.encode(key, keyStream, Context.OUTER);
         ByteString keyBytes = keyStream.toByteString();
@@ -1304,7 +1305,7 @@ class WindmillStateInternals<K> implements StateInternals {
 
         V cachedValue = cachedValues.remove(key);
         if (cachedValue != null) {
-          ByteString.Output valueStream = ByteString.newOutput();
+          ByteStringOutputStream valueStream = new ByteStringOutputStream();
           valueCoder.encode(cachedValues.get(key), valueStream, Context.OUTER);
         }
       }
@@ -1555,7 +1556,7 @@ class WindmillStateInternals<K> implements StateInternals {
 
     private Future<V> getFutureForKey(K key) {
       try {
-        ByteString.Output keyStream = ByteString.newOutput();
+        ByteStringOutputStream keyStream = new ByteStringOutputStream();
         stateKeyPrefix.writeTo(keyStream);
         keyCoder.encode(key, keyStream, Context.OUTER);
         return reader.valueFuture(keyStream.toByteString(), stateFamily, valueCoder);
@@ -1703,7 +1704,7 @@ class WindmillStateInternals<K> implements StateInternals {
           bagUpdatesBuilder = commitBuilder.addBagUpdatesBuilder();
         }
         for (T value : localAdditions) {
-          ByteString.Output stream = ByteString.newOutput();
+          ByteStringOutputStream stream = new ByteStringOutputStream();
           // Encode the value
           elemCoder.encode(value, stream, Coder.Context.OUTER);
           ByteString encoded = stream.toByteString();
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
index 7be2820dee7..580450aed4d 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
@@ -71,6 +71,7 @@ import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.util.MoreFutures;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
@@ -668,7 +669,7 @@ public class RegisterAndProcessBundleOperation extends Operation {
   }
 
   static ByteString encodeAndConcat(Iterable<Object> values, Coder valueCoder) throws IOException {
-    ByteString.Output out = ByteString.newOutput();
+    ByteStringOutputStream out = new ByteStringOutputStream();
     if (values != null) {
       for (Object value : values) {
         int size = out.size();
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
index ee79ee39a46..0e9b86beb9b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
@@ -79,6 +79,7 @@ import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -216,7 +217,7 @@ public class CreateExecutableStageNodeFunction
 
       String coderId = "generatedCoder" + idGenerator.getId();
       String windowingStrategyId;
-      try (ByteString.Output output = ByteString.newOutput()) {
+      try (ByteStringOutputStream output = new ByteStringOutputStream()) {
         try {
           Coder<?> javaCoder =
               CloudObjects.coderFromCloudObject(CloudObject.fromSpec(instructionOutput.getCodec()));
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java
index befaccbd6ad..927bea9f55a 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java
@@ -72,6 +72,7 @@ import org.apache.beam.runners.fnexecution.wire.WireCoders;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -234,7 +235,7 @@ public class RegisterNodeFunction implements Function<MutableNetwork<Node, Edge>
 
       String coderId = "generatedCoder" + idGenerator.getId();
       instructionOutputNodeToCoderIdBuilder.put(node, coderId);
-      try (ByteString.Output output = ByteString.newOutput()) {
+      try (ByteStringOutputStream output = new ByteStringOutputStream()) {
         try {
           Coder<?> javaCoder =
               CloudObjects.coderFromCloudObject(CloudObject.fromSpec(instructionOutput.getCodec()));
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StateFetcherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StateFetcherTest.java
index f16ff49536f..e87ec6ae34c 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StateFetcherTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StateFetcherTest.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
@@ -73,7 +74,7 @@ public class StateFetcherTest {
   public void testFetchGlobalDataBasic() throws Exception {
     StateFetcher fetcher = new StateFetcher(server);
 
-    ByteString.Output stream = ByteString.newOutput();
+    ByteStringOutputStream stream = new ByteStringOutputStream();
     ListCoder.of(StringUtf8Coder.of()).encode(Arrays.asList("data"), stream, Coder.Context.OUTER);
     ByteString encodedIterable = stream.toByteString();
 
@@ -126,7 +127,7 @@ public class StateFetcherTest {
   public void testFetchGlobalDataNull() throws Exception {
     StateFetcher fetcher = new StateFetcher(server);
 
-    ByteString.Output stream = ByteString.newOutput();
+    ByteStringOutputStream stream = new ByteStringOutputStream();
     ListCoder.of(VoidCoder.of()).encode(Arrays.asList((Void) null), stream, Coder.Context.OUTER);
     ByteString encodedIterable = stream.toByteString();
 
@@ -179,10 +180,9 @@ public class StateFetcherTest {
   public void testFetchGlobalDataCacheOverflow() throws Exception {
     Coder<List<String>> coder = ListCoder.of(StringUtf8Coder.of());
 
-    ByteString.Output stream = ByteString.newOutput();
+    ByteStringOutputStream stream = new ByteStringOutputStream();
     coder.encode(Arrays.asList("data1"), stream, Coder.Context.OUTER);
-    ByteString encodedIterable1 = stream.toByteString();
-    stream = ByteString.newOutput();
+    ByteString encodedIterable1 = stream.toByteStringAndReset();
     coder.encode(Arrays.asList("data2"), stream, Coder.Context.OUTER);
     ByteString encodedIterable2 = stream.toByteString();
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
index 338a1d7eb14..d52680fcdda 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
@@ -131,6 +131,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
 import org.apache.beam.sdk.transforms.windowing.Repeatedly;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.DoFnInfo;
 import org.apache.beam.sdk.util.SerializableUtils;
@@ -144,7 +145,6 @@ import org.apache.beam.sdk.values.ValueWithRecordId;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode;
 import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
-import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString.Output;
 import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.TextFormat;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheStats;
@@ -661,7 +661,7 @@ public class StreamingDataflowWorkerTest {
 
   private ByteString addPaneTag(PaneInfo pane, byte[] windowBytes)
       throws CoderException, IOException {
-    Output output = ByteString.newOutput();
+    ByteStringOutputStream output = new ByteStringOutputStream();
     PaneInfo.PaneInfoCoder.INSTANCE.encode(pane, output, Context.OUTER);
     output.write(windowBytes);
     return output.toByteString();
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java
index 0a3a997a2cf..7b8e8991a90 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java
@@ -71,6 +71,7 @@ import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.AppliedCombineFn;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
@@ -169,7 +170,7 @@ public class StreamingGroupAlsoByWindowFnsTest {
     Coder<Collection<? extends BoundedWindow>> windowsCoder =
         (Coder) CollectionCoder.of(windowCoder);
 
-    ByteString.Output dataOutput = ByteString.newOutput();
+    ByteStringOutputStream dataOutput = new ByteStringOutputStream();
     valueCoder.encode(value, dataOutput, Context.OUTER);
     messageBundle
         .addMessagesBuilder()
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowsReshuffleDoFnTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowsReshuffleDoFnTest.java
index 1b9f96e45cc..757e49d3233 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowsReshuffleDoFnTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowsReshuffleDoFnTest.java
@@ -43,6 +43,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
@@ -109,7 +110,7 @@ public class StreamingGroupAlsoByWindowsReshuffleDoFnTest {
     Coder<Collection<? extends BoundedWindow>> windowsCoder =
         (Coder) CollectionCoder.of(windowCoder);
 
-    ByteString.Output dataOutput = ByteString.newOutput();
+    ByteStringOutputStream dataOutput = new ByteStringOutputStream();
     valueCoder.encode(value, dataOutput, Context.OUTER);
     messageBundle
         .addMessagesBuilder()
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternalsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternalsTest.java
index 2abc4f0e3ba..157347c52ed 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternalsTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternalsTest.java
@@ -67,6 +67,7 @@ import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.state.WatermarkHoldState;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
@@ -198,7 +199,7 @@ public class WindmillStateInternalsTest {
 
   private <K> ByteString protoKeyFromUserKey(@Nullable K tag, Coder<K> keyCoder)
       throws IOException {
-    ByteString.Output keyStream = ByteString.newOutput();
+    ByteStringOutputStream keyStream = new ByteStringOutputStream();
     key(NAMESPACE, "map").writeTo(keyStream);
     if (tag != null) {
       keyCoder.encode(tag, keyStream, Context.OUTER);
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillStateReaderTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillStateReaderTest.java
index edcb1915c04..934487d291b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillStateReaderTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillStateReaderTest.java
@@ -33,9 +33,9 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill.SortedListRange
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
-import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString.Output;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Range;
 import org.hamcrest.Matchers;
@@ -93,7 +93,7 @@ public class WindmillStateReaderTest {
   }
 
   private ByteString intData(int value) throws IOException {
-    Output output = ByteString.newOutput();
+    ByteStringOutputStream output = new ByteStringOutputStream();
     INT_CODER.encode(value, output, Coder.Context.OUTER);
     return output.toByteString();
   }
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 7e28a799c46..8f5dbfa4eb1 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -130,6 +130,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
@@ -819,7 +820,7 @@ public class RemoteExecutionTest implements Serializable {
   }
 
   private static ByteString encode(String value) throws Exception {
-    ByteString.Output output = ByteString.newOutput();
+    ByteStringOutputStream output = new ByteStringOutputStream();
     StringUtf8Coder.of().encode(value, output);
     return output.toByteString();
   }
@@ -1524,7 +1525,7 @@ public class RemoteExecutionTest implements Serializable {
 
     // 3 Requests expected: state read, state2 read, and state2 clear
     assertEquals(3, stateRequestHandler.getRequestCount());
-    ByteString.Output out = ByteString.newOutput();
+    ByteStringOutputStream out = new ByteStringOutputStream();
     StringUtf8Coder.of().encode("X", out);
 
     assertEquals(
diff --git a/sdks/java/core/jmh/build.gradle b/sdks/java/core/jmh/build.gradle
index 7d9948ba517..06df6abf739 100644
--- a/sdks/java/core/jmh/build.gradle
+++ b/sdks/java/core/jmh/build.gradle
@@ -29,6 +29,9 @@ ext.summary = "This contains JMH benchmarks for the SDK Core for Beam Java"
 dependencies {
   implementation project(path: ":sdks:java:core", configuration: "shadow")
   implementation library.java.joda_time
+  implementation library.java.vendored_grpc_1_43_2
   implementation library.java.vendored_guava_26_0_jre
   runtimeOnly library.java.slf4j_jdk14
+  testImplementation library.java.junit
+  testImplementation library.java.hamcrest
 }
diff --git a/sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/util/ByteStringOutputStreamBenchmark.java b/sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/util/ByteStringOutputStreamBenchmark.java
new file mode 100644
index 00000000000..2a33c76ebaf
--- /dev/null
+++ b/sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/util/ByteStringOutputStreamBenchmark.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.jmh.util;
+
+import java.util.List;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.UnsafeByteOperations;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.infra.Blackhole;
+
+/** Benchmarks for {@link ByteStringOutputStream}. */
+public class ByteStringOutputStreamBenchmark {
+
+  private static final int MANY_WRITES = 10_000;
+  private static final int FEW_WRITES = 5;
+  private static final byte[] LARGE_BUFFER = new byte[1000];
+  private static final byte[] SMALL_BUFFER = new byte[20];
+
+  @State(Scope.Thread)
+  public static class ProtobufByteStringOutputStream {
+    final ByteString.Output output = ByteString.newOutput();
+
+    @TearDown
+    public void tearDown() throws Exception {
+      output.close();
+    }
+  }
+
+  @State(Scope.Thread)
+  public static class SdkCoreByteStringOutputStream {
+    final ByteStringOutputStream output = new ByteStringOutputStream();
+
+    @TearDown
+    public void tearDown() throws Exception {
+      output.close();
+    }
+  }
+
+  @Benchmark
+  public void testSdkCoreByteStringOutputStreamManyMixedWritesWithoutReuse() throws Exception {
+    ByteStringOutputStream output = new ByteStringOutputStream();
+    for (int i = 0; i < MANY_WRITES; i++) {
+      output.write(1);
+      output.write(SMALL_BUFFER);
+      output.write(1);
+      output.write(LARGE_BUFFER);
+      output.write(1);
+      output.write(SMALL_BUFFER);
+      output.write(1);
+    }
+    if (output.toByteString().size()
+        != (4 + 2 * SMALL_BUFFER.length + LARGE_BUFFER.length) * MANY_WRITES) {
+      throw new IllegalArgumentException();
+    }
+    output.close();
+  }
+
+  @Benchmark
+  public void testSdkCoreByteStringOutputStreamFewMixedWritesWithoutReuse() throws Exception {
+    ByteStringOutputStream output = new ByteStringOutputStream();
+    for (int i = 0; i < FEW_WRITES; i++) {
+      output.write(1);
+      output.write(SMALL_BUFFER);
+      output.write(1);
+      output.write(LARGE_BUFFER);
+      output.write(1);
+      output.write(SMALL_BUFFER);
+      output.write(1);
+    }
+    if (output.toByteString().size()
+        != (4 + 2 * SMALL_BUFFER.length + LARGE_BUFFER.length) * FEW_WRITES) {
+      throw new IllegalArgumentException();
+    }
+    output.close();
+  }
+
+  @Benchmark
+  public void testProtobufByteStringOutputStreamManyMixedWritesWithoutReuse() throws Exception {
+    ByteString.Output output = ByteString.newOutput();
+    for (int i = 0; i < MANY_WRITES; i++) {
+      output.write(1);
+      output.write(SMALL_BUFFER);
+      output.write(1);
+      output.write(LARGE_BUFFER);
+      output.write(1);
+      output.write(SMALL_BUFFER);
+      output.write(1);
+    }
+    if (output.toByteString().size()
+        != (4 + 2 * SMALL_BUFFER.length + LARGE_BUFFER.length) * MANY_WRITES) {
+      throw new IllegalArgumentException();
+    }
+    output.close();
+  }
+
+  @Benchmark
+  public void testProtobufByteStringOutputStreamFewMixedWritesWithoutReuse() throws Exception {
+    ByteString.Output output = ByteString.newOutput();
+    for (int i = 0; i < FEW_WRITES; i++) {
+      output.write(1);
+      output.write(SMALL_BUFFER);
+      output.write(1);
+      output.write(LARGE_BUFFER);
+      output.write(1);
+      output.write(SMALL_BUFFER);
+      output.write(1);
+    }
+    if (output.toByteString().size()
+        != (4 + 2 * SMALL_BUFFER.length + LARGE_BUFFER.length) * FEW_WRITES) {
+      throw new IllegalArgumentException();
+    }
+    output.close();
+  }
+
+  @Benchmark
+  public void testProtobufByteStringOutputStreamManyTinyWrites() throws Exception {
+    ByteString.Output output = ByteString.newOutput();
+    for (int i = 0; i < MANY_WRITES; ++i) {
+      output.write(1);
+    }
+    if (output.toByteString().size() != MANY_WRITES) {
+      throw new IllegalArgumentException();
+    }
+    output.close();
+  }
+
+  @Benchmark
+  public void testProtobufByteStringOutputStreamManySmallWrites() throws Exception {
+    ByteString.Output output = ByteString.newOutput();
+    for (int i = 0; i < MANY_WRITES; ++i) {
+      output.write(SMALL_BUFFER);
+    }
+    if (output.toByteString().size() != MANY_WRITES * SMALL_BUFFER.length) {
+      throw new IllegalArgumentException();
+    }
+    output.close();
+  }
+
+  @Benchmark
+  public void testProtobufByteStringOutputStreamManyLargeWrites() throws Exception {
+    ByteString.Output output = ByteString.newOutput();
+    for (int i = 0; i < MANY_WRITES; ++i) {
+      output.write(LARGE_BUFFER);
+    }
+    if (output.toByteString().size() != MANY_WRITES * LARGE_BUFFER.length) {
+      throw new IllegalArgumentException();
+    }
+    output.close();
+  }
+
+  @Benchmark
+  public void testProtobufByteStringOutputStreamFewTinyWrites() throws Exception {
+    ByteString.Output output = ByteString.newOutput();
+    for (int i = 0; i < FEW_WRITES; ++i) {
+      output.write(1);
+    }
+    if (output.toByteString().size() != FEW_WRITES) {
+      throw new IllegalArgumentException();
+    }
+    output.close();
+  }
+
+  @Benchmark
+  public void testProtobufByteStringOutputStreamFewSmallWrites() throws Exception {
+    ByteString.Output output = ByteString.newOutput();
+    for (int i = 0; i < FEW_WRITES; ++i) {
+      output.write(SMALL_BUFFER);
+    }
+    if (output.toByteString().size() != FEW_WRITES * SMALL_BUFFER.length) {
+      throw new IllegalArgumentException();
+    }
+    output.close();
+  }
+
+  @Benchmark
+  public void testProtobufByteStringOutputStreamFewLargeWrites() throws Exception {
+    ByteString.Output output = ByteString.newOutput();
+    for (int i = 0; i < FEW_WRITES; ++i) {
+      output.write(LARGE_BUFFER);
+    }
+    if (output.toByteString().size() != FEW_WRITES * LARGE_BUFFER.length) {
+      throw new IllegalArgumentException();
+    }
+    output.close();
+  }
+
+  @Benchmark
+  public void testProtobufByteStringOutputStreamManyMixedWritesWithReuse(
+      ProtobufByteStringOutputStream state) throws Exception {
+    ByteString.Output output = state.output;
+    for (int i = 0; i < 9850; i++) {
+      output.write(1);
+      output.write(SMALL_BUFFER);
+      output.write(1);
+      output.write(LARGE_BUFFER);
+      output.write(1);
+      output.write(SMALL_BUFFER);
+      output.write(1);
+    }
+    if (output.toByteString().size()
+        != (4 + 2 * SMALL_BUFFER.length + LARGE_BUFFER.length) * 9850) {
+      throw new IllegalArgumentException();
+    }
+    output.reset();
+  }
+
+  @Benchmark
+  public void testProtobufByteStringOutputStreamFewMixedWritesWithReuse(
+      ProtobufByteStringOutputStream state) throws Exception {
+    ByteString.Output output = state.output;
+    for (int i = 0; i < FEW_WRITES; i++) {
+      output.write(1);
+      output.write(SMALL_BUFFER);
+      output.write(1);
+      output.write(LARGE_BUFFER);
+      output.write(1);
+      output.write(SMALL_BUFFER);
+      output.write(1);
+    }
+    if (output.toByteString().size()
+        != (4 + 2 * SMALL_BUFFER.length + LARGE_BUFFER.length) * FEW_WRITES) {
+      throw new IllegalArgumentException();
+    }
+    output.reset();
+  }
+
+  @Benchmark
+  public void testSdkCoreByteStringOutputStreamManyTinyWrites() throws Exception {
+    ByteStringOutputStream output = new ByteStringOutputStream();
+    for (int i = 0; i < MANY_WRITES; ++i) {
+      output.write(1);
+    }
+    if (output.toByteString().size() != MANY_WRITES) {
+      throw new IllegalArgumentException();
+    }
+    output.close();
+  }
+
+  @Benchmark
+  public void testSdkCoreByteStringOutputStreamManySmallWrites() throws Exception {
+    ByteStringOutputStream output = new ByteStringOutputStream();
+    for (int i = 0; i < MANY_WRITES; ++i) {
+      output.write(SMALL_BUFFER);
+    }
+    if (output.toByteString().size() != MANY_WRITES * SMALL_BUFFER.length) {
+      throw new IllegalArgumentException();
+    }
+    output.close();
+  }
+
+  @Benchmark
+  public void testSdkCoreByteStringOutputStreamManyLargeWrites() throws Exception {
+    ByteStringOutputStream output = new ByteStringOutputStream();
+    for (int i = 0; i < MANY_WRITES; ++i) {
+      output.write(LARGE_BUFFER);
+    }
+    if (output.toByteString().size() != MANY_WRITES * LARGE_BUFFER.length) {
+      throw new IllegalArgumentException();
+    }
+    output.close();
+  }
+
+  @Benchmark
+  public void testSdkCoreByteStringOutputStreamFewTinyWrites() throws Exception {
+    ByteStringOutputStream output = new ByteStringOutputStream();
+    for (int i = 0; i < FEW_WRITES; ++i) {
+      output.write(1);
+    }
+    if (output.toByteString().size() != FEW_WRITES) {
+      throw new IllegalArgumentException();
+    }
+    output.close();
+  }
+
+  @Benchmark
+  public void testSdkCoreByteStringOutputStreamFewSmallWrites() throws Exception {
+    ByteStringOutputStream output = new ByteStringOutputStream();
+    for (int i = 0; i < FEW_WRITES; ++i) {
+      output.write(SMALL_BUFFER);
+    }
+    if (output.toByteString().size() != FEW_WRITES * SMALL_BUFFER.length) {
+      throw new IllegalArgumentException();
+    }
+    output.close();
+  }
+
+  @Benchmark
+  public void testSdkCoreByteStringOutputStreamFewLargeWrites() throws Exception {
+    ByteStringOutputStream output = new ByteStringOutputStream();
+    for (int i = 0; i < FEW_WRITES; ++i) {
+      output.write(LARGE_BUFFER);
+    }
+    if (output.toByteString().size() != FEW_WRITES * LARGE_BUFFER.length) {
+      throw new IllegalArgumentException();
+    }
+    output.close();
+  }
+
+  @Benchmark
+  public void testSdkCoreByteStringOutputStreamManyMixedWritesWithReuse(
+      SdkCoreByteStringOutputStream state) throws Exception {
+    ByteStringOutputStream output = state.output;
+    for (int i = 0; i < 9850; i++) {
+      output.write(1);
+      output.write(SMALL_BUFFER);
+      output.write(1);
+      output.write(LARGE_BUFFER);
+      output.write(1);
+      output.write(SMALL_BUFFER);
+      output.write(1);
+    }
+    if (output.toByteStringAndReset().size()
+        != (4 + 2 * SMALL_BUFFER.length + LARGE_BUFFER.length) * 9850) {
+      throw new IllegalArgumentException();
+    }
+    output.close();
+  }
+
+  @Benchmark
+  public void testSdkCoreByteStringOutputStreamFewMixedWritesWithReuse(
+      SdkCoreByteStringOutputStream state) throws Exception {
+    ByteStringOutputStream output = state.output;
+    for (int i = 0; i < FEW_WRITES; i++) {
+      output.write(1);
+      output.write(SMALL_BUFFER);
+      output.write(1);
+      output.write(LARGE_BUFFER);
+      output.write(1);
+      output.write(SMALL_BUFFER);
+      output.write(1);
+    }
+    if (output.toByteStringAndReset().size()
+        != (4 + 2 * SMALL_BUFFER.length + LARGE_BUFFER.length) * FEW_WRITES) {
+      throw new IllegalArgumentException();
+    }
+    output.close();
+  }
+
+  /**
+   * The benchmarks below measure the cost of creating a new buffer versus copying a subset of
+   * the existing one and re-using the larger one.
+   */
+  public static class NewVsCopy {
+    @State(Scope.Thread)
+    public static class ArrayCopyState {
+      @Param({
+        "512/1024", "640/1024", "768/1024", "896/1024",
+        "4096/8192", "5120/8192", "6144/8192", "7168/8192",
+        "20480/65536", "24576/65536", "28672/65536", "32768/65536",
+        "131072/262144", "163840/262144", "196608/262144", "229376/262144",
+        "524288/1048576", "655360/1048576", "786432/1048576", "917504/1048576"
+      })
+      String copyVsNew;
+
+      int copyThreshold;
+      int byteArraySize;
+      public byte[] src;
+
+      @Setup
+      public void setup() {
+        List<String> parts = Splitter.on('/').splitToList(copyVsNew);
+        copyThreshold = Integer.parseInt(parts.get(0));
+        byteArraySize = Integer.parseInt(parts.get(1));
+        src = new byte[byteArraySize];
+      }
+    }
+
+    @Benchmark
+    public void testCopyArray(ArrayCopyState state, Blackhole bh) {
+      byte[] dest = new byte[state.copyThreshold];
+      System.arraycopy(state.src, 0, dest, 0, state.copyThreshold);
+      bh.consume(UnsafeByteOperations.unsafeWrap(dest));
+    }
+
+    @State(Scope.Benchmark)
+    public static class ArrayNewState {
+      @Param({"1024", "8192", "65536", "262144", "1048576"})
+      int byteArraySize;
+
+      public byte[] src;
+
+      @Setup
+      public void setup() {
+        src = new byte[byteArraySize];
+      }
+    }
+
+    @Benchmark
+    public void testNewArray(ArrayNewState state, Blackhole bh) {
+      bh.consume(UnsafeByteOperations.unsafeWrap(state.src, 0, state.byteArraySize));
+      state.src = new byte[state.byteArraySize];
+    }
+  }
+}
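
Not part of the patch above: a minimal sketch of how these JMH benchmarks could be launched programmatically with the standard JMH Runner API. The launcher class name and the chosen options are illustrative assumptions; the module's enableJmh Gradle integration may already expose an equivalent task.

```
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

/** Hypothetical launcher; not part of the change above. */
public class RunByteStringOutputStreamBenchmarks {
  public static void main(String[] args) throws RunnerException {
    Options options =
        new OptionsBuilder()
            // Regex over benchmark names: matches the stream benchmarks and the
            // nested NewVsCopy benchmarks defined above.
            .include("ByteStringOutputStreamBenchmark")
            .forks(1)
            .build();
    new Runner(options).run();
  }
}
```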
diff --git a/sdks/java/core/jmh/build.gradle b/sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/util/package-info.java
similarity index 51%
copy from sdks/java/core/jmh/build.gradle
copy to sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/util/package-info.java
index 7d9948ba517..647f31b7276 100644
--- a/sdks/java/core/jmh/build.gradle
+++ b/sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/util/package-info.java
@@ -4,31 +4,21 @@
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
- * License); you may not use this file except in compliance
+ * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an AS IS BASIS,
+ * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 
-plugins { id 'org.apache.beam.module' }
+/** Benchmarks for core SDK utility classes. */
+@DefaultAnnotation(NonNull.class)
+package org.apache.beam.sdk.jmh.util;
 
-applyJavaNature(
-  automaticModuleName: 'org.apache.beam.sdk.jmh',
-  enableJmh: true,
-  publish: false)
-
-description = "Apache Beam :: SDKs :: Java :: Core :: JMH"
-ext.summary = "This contains JMH benchmarks for the SDK Core for Beam Java"
-
-dependencies {
-  implementation project(path: ":sdks:java:core", configuration: "shadow")
-  implementation library.java.joda_time
-  implementation library.java.vendored_guava_26_0_jre
-  runtimeOnly library.java.slf4j_jdk14
-}
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import org.checkerframework.checker.nullness.qual.NonNull;
diff --git a/sdks/java/core/jmh/src/test/java/org/apache/beam/sdk/jmh/util/ByteStringOutputStreamBenchmarkTest.java b/sdks/java/core/jmh/src/test/java/org/apache/beam/sdk/jmh/util/ByteStringOutputStreamBenchmarkTest.java
new file mode 100644
index 00000000000..31e15fb4f08
--- /dev/null
+++ b/sdks/java/core/jmh/src/test/java/org/apache/beam/sdk/jmh/util/ByteStringOutputStreamBenchmarkTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.jmh.util;
+
+import org.apache.beam.sdk.jmh.util.ByteStringOutputStreamBenchmark.NewVsCopy.ArrayCopyState;
+import org.apache.beam.sdk.jmh.util.ByteStringOutputStreamBenchmark.NewVsCopy.ArrayNewState;
+import org.apache.beam.sdk.jmh.util.ByteStringOutputStreamBenchmark.ProtobufByteStringOutputStream;
+import org.apache.beam.sdk.jmh.util.ByteStringOutputStreamBenchmark.SdkCoreByteStringOutputStream;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.openjdk.jmh.infra.Blackhole;
+
+/** Tests for {@link ByteStringOutputStreamBenchmark}. */
+@RunWith(JUnit4.class)
+public class ByteStringOutputStreamBenchmarkTest {
+  @Test
+  public void testProtobufByteStringOutputStream() throws Exception {
+    new ByteStringOutputStreamBenchmark()
+        .testProtobufByteStringOutputStreamFewMixedWritesWithoutReuse();
+    new ByteStringOutputStreamBenchmark()
+        .testProtobufByteStringOutputStreamFewMixedWritesWithReuse(
+            new ProtobufByteStringOutputStream());
+    new ByteStringOutputStreamBenchmark().testProtobufByteStringOutputStreamFewLargeWrites();
+    new ByteStringOutputStreamBenchmark().testProtobufByteStringOutputStreamFewSmallWrites();
+    new ByteStringOutputStreamBenchmark().testProtobufByteStringOutputStreamFewTinyWrites();
+    new ByteStringOutputStreamBenchmark()
+        .testProtobufByteStringOutputStreamManyMixedWritesWithoutReuse();
+    new ByteStringOutputStreamBenchmark()
+        .testProtobufByteStringOutputStreamManyMixedWritesWithReuse(
+            new ProtobufByteStringOutputStream());
+    new ByteStringOutputStreamBenchmark().testProtobufByteStringOutputStreamManyLargeWrites();
+    new ByteStringOutputStreamBenchmark().testProtobufByteStringOutputStreamManySmallWrites();
+    new ByteStringOutputStreamBenchmark().testProtobufByteStringOutputStreamManyTinyWrites();
+  }
+
+  @Test
+  public void testSdkCoreByteStringOutputStream() throws Exception {
+    new ByteStringOutputStreamBenchmark()
+        .testSdkCoreByteStringOutputStreamFewMixedWritesWithoutReuse();
+    new ByteStringOutputStreamBenchmark()
+        .testSdkCoreByteStringOutputStreamFewMixedWritesWithReuse(
+            new SdkCoreByteStringOutputStream());
+    new ByteStringOutputStreamBenchmark().testSdkCoreByteStringOutputStreamFewLargeWrites();
+    new ByteStringOutputStreamBenchmark().testSdkCoreByteStringOutputStreamFewSmallWrites();
+    new ByteStringOutputStreamBenchmark().testSdkCoreByteStringOutputStreamFewTinyWrites();
+    new ByteStringOutputStreamBenchmark()
+        .testSdkCoreByteStringOutputStreamManyMixedWritesWithoutReuse();
+    new ByteStringOutputStreamBenchmark()
+        .testSdkCoreByteStringOutputStreamManyMixedWritesWithReuse(
+            new SdkCoreByteStringOutputStream());
+    new ByteStringOutputStreamBenchmark().testSdkCoreByteStringOutputStreamManyLargeWrites();
+    new ByteStringOutputStreamBenchmark().testSdkCoreByteStringOutputStreamManySmallWrites();
+    new ByteStringOutputStreamBenchmark().testSdkCoreByteStringOutputStreamManyTinyWrites();
+  }
+
+  @Test
+  public void testNewVsCopy() throws Exception {
+    Blackhole bh =
+        new Blackhole(
+            "Today's password is swordfish. I understand instantiating Blackholes directly is dangerous.");
+    ArrayCopyState copyState = new ArrayCopyState();
+    copyState.copyVsNew = "512/2048";
+    copyState.setup();
+
+    ArrayNewState newState = new ArrayNewState();
+    newState.byteArraySize = 2048;
+    newState.setup();
+
+    new ByteStringOutputStreamBenchmark.NewVsCopy().testCopyArray(copyState, bh);
+    new ByteStringOutputStreamBenchmark.NewVsCopy().testNewArray(newState, bh);
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java
new file mode 100644
index 00000000000..112032e1af0
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.io.OutputStream;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.UnsafeByteOperations;
+
+/**
+ * An {@link OutputStream} that produces {@link ByteString}s.
+ *
+ * <p>Closing this output stream does nothing.
+ *
+ * <p>This class is not thread safe; callers must supply any locking needed when it is shared
+ * across threads. This differs from {@link ByteString.Output}, which synchronizes its writes.
+ */
+@NotThreadSafe
+public final class ByteStringOutputStream extends OutputStream {
+
+  // This constant was chosen based upon Protobuf's non-public ByteString#CONCATENATE_BY_COPY
+  // threshold, so that the bytes are not copied again when ByteStrings are concatenated
+  // instead of appended.
+  private static final int DEFAULT_CAPACITY = 128;
+
+  // ByteStringOutputStreamBenchmark.NewVsCopy shows that creating four new 256k arrays is
+  // almost twice as fast as creating a single 1024k array.
+  //
+  // This number should be tuned periodically as hardware changes.
+  private static final int MAX_CHUNK_SIZE = 256 * 1024;
+
+  // ByteString to be concatenated to create the result
+  private ByteString result;
+
+  // Current buffer to which we are writing
+  private byte[] buffer;
+
+  // Location in buffer[] to which we write the next byte.
+  private int bufferPos;
+
+  /** Creates a new output stream with a default capacity. */
+  public ByteStringOutputStream() {
+    this(DEFAULT_CAPACITY);
+  }
+
+  /**
+   * Creates a new output stream with the specified initial capacity.
+   *
+   * @param initialCapacity the initial capacity of the output stream.
+   */
+  public ByteStringOutputStream(int initialCapacity) {
+    if (initialCapacity < 0) {
+      throw new IllegalArgumentException("Initial capacity < 0");
+    }
+    this.buffer = new byte[initialCapacity];
+    this.result = ByteString.EMPTY;
+  }
+
+  @Override
+  public void write(int b) {
+    if (bufferPos == buffer.length) {
+      // Hand off the full buffer and allocate a new chunk roughly as large as everything
+      // written so far (doubling the total capacity), capped at the max chunk size.
+      result = result.concat(UnsafeByteOperations.unsafeWrap(buffer));
+      buffer = new byte[Math.min(Math.max(1, result.size()), MAX_CHUNK_SIZE)];
+      bufferPos = 0;
+    }
+    buffer[bufferPos++] = (byte) b;
+  }
+
+  @Override
+  public void write(byte[] b, int offset, int length) {
+    int remainingSpaceInBuffer = buffer.length - bufferPos;
+    while (length > remainingSpaceInBuffer) {
+      // Use up the current buffer
+      System.arraycopy(b, offset, buffer, bufferPos, remainingSpaceInBuffer);
+      offset += remainingSpaceInBuffer;
+      length -= remainingSpaceInBuffer;
+
+      result = result.concat(UnsafeByteOperations.unsafeWrap(buffer));
+      // Size the next chunk to the amount written so far (or the remaining length, if larger),
+      // capped at the max chunk size.
+      remainingSpaceInBuffer = Math.min(Math.max(length, result.size()), MAX_CHUNK_SIZE);
+      buffer = new byte[remainingSpaceInBuffer];
+      bufferPos = 0;
+    }
+
+    System.arraycopy(b, offset, buffer, bufferPos, length);
+    bufferPos += length;
+  }
+
+  /**
+   * Creates a byte string with the size and contents of this output stream.
+   *
+   * <p>Note that the caller must not invoke {@link #toByteStringAndReset} afterwards, as the
+   * internal buffer may be re-used by a future {@link #write}, mutating {@link ByteString}s
+   * returned in the past.
+   */
+  public ByteString toByteString() {
+    // We specifically choose to concatenate here since the user won't be re-using the buffer.
+    return result.concat(UnsafeByteOperations.unsafeWrap(buffer, 0, bufferPos));
+  }
+
+  /**
+   * Creates a byte string with the size and contents of this output stream and resets the output
+   * stream so that it can be re-used, possibly re-using any existing buffers.
+   */
+  public ByteString toByteStringAndReset() {
+    ByteString rval;
+    if (bufferPos > 0) {
+      final boolean copy;
+      // These thresholds are from the results of ByteStringOutputStreamBenchmark.NewVsCopy,
+      // which show that at these fill levels it is cheaper to copy the bytes out and re-use
+      // the existing buffer than to hand it off and allocate a new one.
+      if (buffer.length <= 128) {
+        // Always copy small byte arrays to prevent large chunks of wasted space
+        // when dealing with very small amounts of data.
+        copy = true;
+      } else if (buffer.length <= 1024) {
+        copy = bufferPos <= buffer.length * 0.875;
+      } else if (buffer.length <= 8192) {
+        copy = bufferPos <= buffer.length * 0.75;
+      } else {
+        copy = bufferPos <= buffer.length * 0.4375;
+      }
+      if (copy) {
+        byte[] bufferCopy = new byte[bufferPos];
+        System.arraycopy(buffer, 0, bufferCopy, 0, bufferPos);
+        rval = result.concat(UnsafeByteOperations.unsafeWrap(bufferCopy));
+      } else {
+        rval = result.concat(UnsafeByteOperations.unsafeWrap(buffer, 0, bufferPos));
+        buffer = new byte[Math.min(rval.size(), MAX_CHUNK_SIZE)];
+      }
+      bufferPos = 0;
+    } else {
+      rval = result;
+    }
+    result = ByteString.EMPTY;
+    return rval;
+  }
+
+  /**
+   * Returns the current size of the output stream.
+   *
+   * @return the current size of the output stream
+   */
+  public int size() {
+    return result.size() + bufferPos;
+  }
+
+  @Override
+  public String toString() {
+    return String.format(
+        "<ByteStringOutputStream@%s size=%d>",
+        Integer.toHexString(System.identityHashCode(this)), size());
+  }
+}
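
For context, a minimal usage sketch of the new ByteStringOutputStream (the example class and values are invented for illustration). It shows the snapshot-and-reset pattern that the harness Receiver below switches to, and the Javadoc restriction that toByteString() should only be used as a final snapshot.

```
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;

public class ByteStringOutputStreamExample {
  public static void main(String[] args) {
    ByteStringOutputStream out = new ByteStringOutputStream();

    // Buffer a first chunk and snapshot it; the internal buffer may be kept for re-use.
    out.write(new byte[] {1, 2, 3}, 0, 3);
    ByteString first = out.toByteStringAndReset(); // out.size() is 0 again

    // Buffer a second chunk. toByteString() wraps the internal buffer without copying,
    // so per the Javadoc it must not be followed by toByteStringAndReset().
    out.write(new byte[] {4, 5}, 0, 2);
    ByteString second = out.toByteString();

    System.out.println(first.size() + " and " + second.size() + " bytes captured");
  }
}
```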
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ByteStringOutputStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ByteStringOutputStreamTest.java
new file mode 100644
index 00000000000..faa77cf6467
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ByteStringOutputStreamTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.UnsafeByteOperations;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ByteStringOutputStreamTest {
+
+  @Test
+  public void testInvalidInitialCapacity() throws Exception {
+    assertThrows(
+        "Initial capacity < 0",
+        IllegalArgumentException.class,
+        () -> new ByteStringOutputStream(-1));
+  }
+
+  @Test
+  public void testWriteBytes() throws Exception {
+    ByteStringOutputStream out = new ByteStringOutputStream();
+    assertEquals(0, out.size());
+    for (int numElements = 0; numElements < 1024 * 1024; numElements = next(numElements)) {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      DataOutputStream dataOut = new DataOutputStream(baos);
+      try {
+        for (int i = 0; i < numElements; ++i) {
+          dataOut.writeInt(i);
+        }
+        dataOut.close();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      dataOut.close();
+      byte[] testBuffer = baos.toByteArray();
+
+      for (int pos = 0; pos < testBuffer.length; ) {
+        if (testBuffer[pos] == 0) {
+          out.write(testBuffer[pos]);
+          pos += 1;
+        } else {
+          int len = Math.min(testBuffer.length - pos, Math.abs(testBuffer[pos]));
+          out.write(testBuffer, pos, len);
+          pos += len;
+        }
+        assertEquals(pos, out.size());
+      }
+      assertEquals(UnsafeByteOperations.unsafeWrap(testBuffer), out.toByteString());
+      assertEquals(UnsafeByteOperations.unsafeWrap(testBuffer), out.toByteStringAndReset());
+    }
+  }
+
+  @Test
+  public void testWriteBytesWithZeroInitialCapacity() throws Exception {
+    for (int numElements = 0; numElements < 1024 * 1024; numElements = next(numElements)) {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      DataOutputStream dataOut = new DataOutputStream(baos);
+      try {
+        for (int i = 0; i < numElements; ++i) {
+          dataOut.writeInt(i);
+        }
+        dataOut.close();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      dataOut.close();
+      byte[] testBuffer = baos.toByteArray();
+
+      ByteStringOutputStream out = new ByteStringOutputStream(0);
+      assertEquals(0, out.size());
+
+      for (int pos = 0; pos < testBuffer.length; ) {
+        if (testBuffer[pos] == 0) {
+          out.write(testBuffer[pos]);
+          pos += 1;
+        } else {
+          int len = Math.min(testBuffer.length - pos, Math.abs(testBuffer[pos]));
+          out.write(testBuffer, pos, len);
+          pos += len;
+        }
+        assertEquals(pos, out.size());
+      }
+      assertEquals(UnsafeByteOperations.unsafeWrap(testBuffer), out.toByteString());
+      assertEquals(UnsafeByteOperations.unsafeWrap(testBuffer), out.toByteStringAndReset());
+    }
+  }
+
+  // Grow the element count by the golden ratio, approximating the Fibonacci sequence.
+  private static int next(int current) {
+    double a = Math.max(1, current * (1 + Math.sqrt(5)) / 2.0);
+    return (int) Math.round(a);
+  }
+}
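
As a worked illustration of the next() helper above (the class name here is invented), the element counts it produces follow 0, 1, 2, 3, 5, 8, 13, ..., so each iteration writes 4 * numElements bytes and the tests cross many buffer-growth boundaries:

```
public class GrowthSequenceExample {
  // Mirrors the next() helper in the test above.
  private static int next(int current) {
    double a = Math.max(1, current * (1 + Math.sqrt(5)) / 2.0);
    return (int) Math.round(a);
  }

  public static void main(String[] args) {
    // Prints 0 1 2 3 5 8 13 21 ... up to just over a million elements.
    for (int n = 0; n < 1024 * 1024; n = next(n)) {
      System.out.print(n + " ");
    }
    System.out.println();
  }
}
```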
diff --git a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceTest.java b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceTest.java
index b9cbdc779d4..39076d02268 100644
--- a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceTest.java
+++ b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceTest.java
@@ -53,8 +53,8 @@ import org.apache.beam.sdk.schemas.SchemaTranslation;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.values.Row;
-import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
@@ -353,7 +353,7 @@ public class ExpansionServiceTest {
 
   private static ExternalTransforms.ExternalConfigurationPayload
       encodeRowIntoExternalConfigurationPayload(Row row) {
-    ByteString.Output outputStream = ByteString.newOutput();
+    ByteStringOutputStream outputStream = new ByteStringOutputStream();
     try {
       SchemaCoder.of(row.getSchema()).encode(row, outputStream);
     } catch (IOException e) {
diff --git a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/JavaClassLookupTransformProviderTest.java b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/JavaClassLookupTransformProviderTest.java
index 631d208d6e8..26b6a591738 100644
--- a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/JavaClassLookupTransformProviderTest.java
+++ b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/JavaClassLookupTransformProviderTest.java
@@ -59,6 +59,7 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
@@ -1144,7 +1145,7 @@ public class JavaClassLookupTransformProviderTest {
   }
 
   private ByteString getProtoPayloadFromRow(Row row) {
-    ByteString.Output outputStream = ByteString.newOutput();
+    ByteStringOutputStream outputStream = new ByteStringOutputStream();
     try {
       SchemaCoder.of(row.getSchema()).encode(row, outputStream);
     } catch (IOException e) {
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
index d78191ba8b0..09422e07950 100644
--- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
@@ -35,6 +35,7 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.StreamObserver;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
@@ -236,29 +237,27 @@ public class BeamFnDataOutboundAggregator {
   private Elements.Builder convertBufferForTransmission() {
     Elements.Builder bufferedElements = Elements.newBuilder();
     for (Map.Entry<String, Receiver<?>> entry : outputDataReceivers.entrySet()) {
-      if (entry.getValue().getOutput().size() == 0) {
+      if (entry.getValue().bufferedSize() == 0) {
         continue;
       }
-      ByteString bytes = entry.getValue().getOutput().toByteString();
+      ByteString bytes = entry.getValue().toByteStringAndResetBuffer();
       bufferedElements
           .addDataBuilder()
           .setInstructionId(processBundleRequestIdSupplier.get())
           .setTransformId(entry.getKey())
           .setData(bytes);
-      entry.getValue().resetOutput();
     }
     for (Map.Entry<TimerEndpoint, Receiver<?>> entry : outputTimersReceivers.entrySet()) {
-      if (entry.getValue().getOutput().size() == 0) {
+      if (entry.getValue().bufferedSize() == 0) {
         continue;
       }
-      ByteString bytes = entry.getValue().getOutput().toByteString();
+      ByteString bytes = entry.getValue().toByteStringAndResetBuffer();
       bufferedElements
           .addTimersBuilder()
           .setInstructionId(processBundleRequestIdSupplier.get())
           .setTransformId(entry.getKey().pTransformId)
           .setTimerFamilyId(entry.getKey().timerFamilyId)
           .setTimers(bytes);
-      entry.getValue().resetOutput();
     }
     bytesWrittenSinceFlush = 0L;
     return bufferedElements;
@@ -323,13 +322,13 @@ public class BeamFnDataOutboundAggregator {
 
   @VisibleForTesting
   class Receiver<T> implements FnDataReceiver<T> {
-    private final ByteString.Output output;
+    private final ByteStringOutputStream output;
     private final Coder<T> coder;
     private long perBundleByteCount;
     private long perBundleElementCount;
 
     public Receiver(Coder<T> coder) {
-      this.output = ByteString.newOutput();
+      this.output = new ByteStringOutputStream();
       this.coder = coder;
       this.perBundleByteCount = 0L;
       this.perBundleElementCount = 0L;
@@ -351,10 +350,6 @@ public class BeamFnDataOutboundAggregator {
       }
     }
 
-    public ByteString.Output getOutput() {
-      return output;
-    }
-
     public long getByteCount() {
       return perBundleByteCount;
     }
@@ -363,8 +358,12 @@ public class BeamFnDataOutboundAggregator {
       return perBundleElementCount;
     }
 
-    public void resetOutput() {
-      this.output.reset();
+    public int bufferedSize() {
+      return output.size();
+    }
+
+    public ByteString toByteStringAndResetBuffer() {
+      return this.output.toByteStringAndReset();
     }
 
     public void resetStats() {
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java
index 9a0b15f5927..d55c09120ef 100644
--- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java
@@ -27,6 +27,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
 
 /**
@@ -80,7 +81,7 @@ public class DataStreams {
    */
   public static final class ElementDelimitedOutputStream extends OutputStream {
     private final OutputChunkConsumer<ByteString> consumer;
-    private final ByteString.Output output;
+    private final ByteStringOutputStream output;
     private final int maximumChunkSize;
     int previousPosition;
 
@@ -88,7 +89,7 @@ public class DataStreams {
         OutputChunkConsumer<ByteString> consumer, int maximumChunkSize) {
       this.consumer = consumer;
       this.maximumChunkSize = maximumChunkSize;
-      this.output = ByteString.newOutput(maximumChunkSize);
+      this.output = new ByteStringOutputStream(maximumChunkSize);
     }
 
     public void delimitElement() throws IOException {
@@ -139,8 +140,7 @@ public class DataStreams {
 
     /** Can only be called if at least one byte has been written. */
     private void internalFlush() throws IOException {
-      consumer.read(output.toByteString());
-      output.reset();
+      consumer.read(output.toByteStringAndReset());
       // Set the previous position to an invalid position representing that a previous buffer
       // was written to.
       previousPosition = -1;
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2Test.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2Test.java
index 82d35e5ab04..c1e8e4293d4 100644
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2Test.java
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2Test.java
@@ -35,8 +35,8 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.fn.test.TestExecutors;
 import org.apache.beam.sdk.fn.test.TestExecutors.TestExecutorService;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -203,7 +203,7 @@ public class BeamFnDataInboundObserver2Test {
   }
 
   private BeamFnApi.Elements dataWith(String... values) throws Exception {
-    ByteString.Output output = ByteString.newOutput();
+    ByteStringOutputStream output = new ByteStringOutputStream();
     for (String value : values) {
       CODER.encode(valueInGlobalWindow(value), output);
     }
@@ -222,7 +222,7 @@ public class BeamFnDataInboundObserver2Test {
   }
 
   private BeamFnApi.Elements timerWith(String... values) throws Exception {
-    ByteString.Output output = ByteString.newOutput();
+    ByteStringOutputStream output = new ByteStringOutputStream();
     for (String value : values) {
       CODER.encode(valueInGlobalWindow(value), output);
     }
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java
index 4f4cb082032..c0b1a6c24ab 100644
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ExecutionException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
 import org.junit.Rule;
@@ -98,7 +99,7 @@ public class BeamFnDataInboundObserverTest {
   }
 
   private ByteString dataWith(String... values) throws Exception {
-    ByteString.Output output = ByteString.newOutput();
+    ByteStringOutputStream output = new ByteStringOutputStream();
     for (String value : values) {
       CODER.encode(valueInGlobalWindow(value), output);
     }
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregatorTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregatorTest.java
index 9937fcafe12..8b2adc6f2e2 100644
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregatorTest.java
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregatorTest.java
@@ -40,7 +40,7 @@ import org.apache.beam.sdk.fn.test.TestStreams;
 import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.hamcrest.Matchers;
 import org.junit.Test;
@@ -145,7 +145,7 @@ public class BeamFnDataOutboundAggregatorTest {
     } else {
       receiver = Iterables.getOnlyElement(aggregator.outputDataReceivers.values());
     }
-    assertEquals(0L, receiver.getOutput().size());
+    assertEquals(0L, receiver.bufferedSize());
     assertEquals(102L, receiver.getByteCount());
     assertEquals(2L, receiver.getElementCount());
 
@@ -155,7 +155,7 @@ public class BeamFnDataOutboundAggregatorTest {
     aggregator.sendOrCollectBufferedDataAndFinishOutboundStreams();
     // Test that receiver stats have been reset after
     // sendOrCollectBufferedDataAndFinishOutboundStreams.
-    assertEquals(0L, receiver.getOutput().size());
+    assertEquals(0L, receiver.bufferedSize());
     assertEquals(0L, receiver.getByteCount());
     assertEquals(0L, receiver.getElementCount());
 
@@ -344,7 +344,7 @@ public class BeamFnDataOutboundAggregatorTest {
 
   BeamFnApi.Elements.Builder messageWithDataBuilder(LogicalEndpoint endpoint, byte[]... datum)
       throws IOException {
-    ByteString.Output output = ByteString.newOutput();
+    ByteStringOutputStream output = new ByteStringOutputStream();
     for (byte[] data : datum) {
       CODER.encode(data, output);
     }
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DataStreamsTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DataStreamsTest.java
index 1e128747839..dfb10571a2b 100644
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DataStreamsTest.java
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DataStreamsTest.java
@@ -38,6 +38,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.fn.stream.DataStreams.DataStreamDecoder;
 import org.apache.beam.sdk.fn.stream.DataStreams.ElementDelimitedOutputStream;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
@@ -145,7 +146,7 @@ public class DataStreamsTest {
     }
 
     private ByteString encode(String... values) throws IOException {
-      ByteString.Output out = ByteString.newOutput();
+      ByteStringOutputStream out = new ByteStringOutputStream();
       for (String value : values) {
         StringUtf8Coder.of().encode(value, out);
       }
@@ -153,7 +154,7 @@ public class DataStreamsTest {
     }
 
     private <T> void testDecoderWith(Coder<T> coder, T... expected) throws IOException {
-      ByteString.Output output = ByteString.newOutput();
+      ByteStringOutputStream output = new ByteStringOutputStream();
       for (T value : expected) {
         int size = output.size();
         coder.encode(value, output);
diff --git a/sdks/java/harness/jmh/build.gradle b/sdks/java/harness/jmh/build.gradle
index 4d50c717b55..17860f172d3 100644
--- a/sdks/java/harness/jmh/build.gradle
+++ b/sdks/java/harness/jmh/build.gradle
@@ -31,8 +31,14 @@ configurations {
 }
 
 dependencies {
+    implementation project(path: ":sdks:java:core", configuration: "shadow")
     implementation project(path: ":sdks:java:harness", configuration: "shadow")
-    implementation project(":runners:java-fn-execution")
+    implementation project(path: ":runners:java-fn-execution")
+    implementation project(path: ":model:pipeline", configuration: "shadow")
+    implementation library.java.vendored_grpc_1_43_2
+    implementation library.java.vendored_guava_26_0_jre
+    implementation library.java.slf4j_api
+    implementation library.java.joda_time
     runtimeOnly library.java.slf4j_jdk14
     jammAgent library.java.jamm
 }
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 24b6f8f73c7..50402e472c8 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -106,6 +106,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
@@ -728,7 +729,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
               public void reset() {}
 
               private ByteString encodeProgress(double value) throws IOException {
-                ByteString.Output output = ByteString.newOutput();
+                ByteStringOutputStream output = new ByteStringOutputStream();
                 IterableCoder.of(DoubleCoder.of()).encode(Arrays.asList(value), output);
                 return output.toByteString();
               }
@@ -1514,7 +1515,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
     // Encode window splits.
     if (windowedSplitResult != null
         && windowedSplitResult.getPrimaryInFullyProcessedWindowsRoot() != null) {
-      ByteString.Output primaryInOtherWindowsBytes = ByteString.newOutput();
+      ByteStringOutputStream primaryInOtherWindowsBytes = new ByteStringOutputStream();
       try {
         fullInputCoder.encode(
             windowedSplitResult.getPrimaryInFullyProcessedWindowsRoot(),
@@ -1531,7 +1532,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
     }
     if (windowedSplitResult != null
         && windowedSplitResult.getResidualInUnprocessedWindowsRoot() != null) {
-      ByteString.Output bytesOut = ByteString.newOutput();
+      ByteStringOutputStream bytesOut = new ByteStringOutputStream();
       try {
         fullInputCoder.encode(windowedSplitResult.getResidualInUnprocessedWindowsRoot(), bytesOut);
       } catch (IOException e) {
@@ -1564,8 +1565,8 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
               .build());
     }
 
-    ByteString.Output primaryBytes = ByteString.newOutput();
-    ByteString.Output residualBytes = ByteString.newOutput();
+    ByteStringOutputStream primaryBytes = new ByteStringOutputStream();
+    ByteStringOutputStream residualBytes = new ByteStringOutputStream();
     // Encode element split from windowedSplitResult or from downstream element split. It's possible
     // that there is no element split.
     if (windowedSplitResult != null && windowedSplitResult.getResidualSplitRoot() != null) {
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java
index 39181d9f57c..51d83b501d4 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java
@@ -32,7 +32,7 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
 import org.apache.beam.sdk.fn.stream.PrefetchableIterables;
-import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 
 /**
@@ -128,7 +128,7 @@ public class BagUserState<T> {
           request.toBuilder().setClear(StateClearRequest.getDefaultInstance()));
     }
     if (!newValues.isEmpty()) {
-      ByteString.Output out = ByteString.newOutput();
+      ByteStringOutputStream out = new ByteStringOutputStream();
       for (T newValue : newValues) {
         // TODO: Replace with chunking output stream
         valueCoder.encode(newValue, out);
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
index e3c850e7cec..1974b617934 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
@@ -56,6 +56,7 @@ import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
 import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.util.CombineFnUtil;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
@@ -118,7 +119,7 @@ public class FnApiStateAccessor<K> implements SideInputReader, StateBinder {
               checkState(
                   keyCoder != null, "Accessing state in unkeyed context, no key coder available");
 
-              ByteString.Output encodedKeyOut = ByteString.newOutput();
+              ByteStringOutputStream encodedKeyOut = new ByteStringOutputStream();
               try {
                 ((Coder) keyCoder).encode(key, encodedKeyOut, Coder.Context.NESTED);
               } catch (IOException e) {
@@ -131,7 +132,7 @@ public class FnApiStateAccessor<K> implements SideInputReader, StateBinder {
         memoizeFunction(
             currentWindowSupplier,
             window -> {
-              ByteString.Output encodedWindowOut = ByteString.newOutput();
+              ByteStringOutputStream encodedWindowOut = new ByteStringOutputStream();
               try {
                 windowCoder.encode(window, encodedWindowOut);
               } catch (IOException e) {
@@ -167,7 +168,7 @@ public class FnApiStateAccessor<K> implements SideInputReader, StateBinder {
     SideInputSpec sideInputSpec = sideInputSpecMap.get(tag);
     checkArgument(sideInputSpec != null, "Attempting to access unknown side input %s.", view);
 
-    ByteString.Output encodedWindowOut = ByteString.newOutput();
+    ByteStringOutputStream encodedWindowOut = new ByteStringOutputStream();
     try {
       sideInputSpec
           .getWindowCoder()
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiTimerBundleTracker.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiTimerBundleTracker.java
index 39f735d98ef..718b440b6a4 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiTimerBundleTracker.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiTimerBundleTracker.java
@@ -31,6 +31,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ComparisonChain;
@@ -42,7 +43,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Table.Ce
 public class FnApiTimerBundleTracker<K> {
   private final Supplier<ByteString> encodedCurrentKeySupplier;
   private final Supplier<ByteString> encodedCurrentWindowSupplier;
-  private Table<ByteString, ByteString, Modifications<K>> timerModifications;
+  private final Table<ByteString, ByteString, Modifications<K>> timerModifications;
 
   @AutoValue
   public abstract static class TimerInfo<K> {
@@ -116,7 +117,7 @@ public class FnApiTimerBundleTracker<K> {
           Sets.newTreeSet(comparator),
           HashBasedTable.create());
     }
-  };
+  }
 
   public FnApiTimerBundleTracker(
       Coder<K> keyCoder,
@@ -131,9 +132,9 @@ public class FnApiTimerBundleTracker<K> {
               checkState(
                   keyCoder != null, "Accessing state in unkeyed context, no key coder available");
 
-              ByteString.Output encodedKeyOut = ByteString.newOutput();
+              ByteStringOutputStream encodedKeyOut = new ByteStringOutputStream();
               try {
-                ((Coder) keyCoder).encode(key, encodedKeyOut, Coder.Context.NESTED);
+                keyCoder.encode(key, encodedKeyOut, Coder.Context.NESTED);
               } catch (IOException e) {
                 throw new IllegalStateException(e);
               }
@@ -143,7 +144,7 @@ public class FnApiTimerBundleTracker<K> {
         memoizeFunction(
             currentWindowSupplier,
             window -> {
-              ByteString.Output encodedWindowOut = ByteString.newOutput();
+              ByteStringOutputStream encodedWindowOut = new ByteStringOutputStream();
               try {
                 windowCoder.encode(window, encodedWindowOut);
               } catch (IOException e) {
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapSideInput.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapSideInput.java
index f36423a62a2..409a4831164 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapSideInput.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapSideInput.java
@@ -26,6 +26,7 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.Materializations.MultimapView;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
 
 /**
@@ -70,7 +71,7 @@ public class MultimapSideInput<K, V> implements MultimapView<K, V> {
 
   @Override
   public Iterable<V> get(K k) {
-    ByteString.Output output = ByteString.newOutput();
+    ByteStringOutputStream output = new ByteStringOutputStream();
     try {
       keyCoder.encode(k, output);
     } catch (IOException e) {
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
index 5757dc9062a..3078f69c3f1 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
@@ -41,6 +41,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
 import org.apache.beam.sdk.fn.stream.PrefetchableIterables;
 import org.apache.beam.sdk.fn.stream.PrefetchableIterator;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
@@ -357,7 +358,7 @@ public class MultimapUserState<K, V> {
 
   private ByteString encodeValues(Iterable<V> values) {
     try {
-      ByteString.Output output = ByteString.newOutput();
+      ByteStringOutputStream output = new ByteStringOutputStream();
       for (V value : values) {
         valueCoder.encode(value, output);
       }
@@ -373,7 +374,7 @@ public class MultimapUserState<K, V> {
 
   private StateRequest createUserStateRequest(K key) {
     try {
-      ByteString.Output output = ByteString.newOutput();
+      ByteStringOutputStream output = new ByteStringOutputStream();
       mapKeyCoder.encode(key, output);
       StateRequest.Builder request = userStateRequest.toBuilder();
       request.getStateKeyBuilder().getMultimapUserStateBuilder().setMapKey(output.toByteString());
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
index 27da531e53f..de8be4252da 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
@@ -132,6 +132,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -1337,7 +1338,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
     }
 
     private ByteString encode(String... values) throws IOException {
-      ByteString.Output out = ByteString.newOutput();
+      ByteStringOutputStream out = new ByteStringOutputStream();
       for (String value : values) {
         StringUtf8Coder.of().encode(value, out);
       }
@@ -2997,8 +2998,8 @@ public class FnApiDoFnRunnerTest implements Serializable {
     }
 
     private static SplitResult createSplitResult(double fractionOfRemainder) {
-      ByteString.Output primaryBytes = ByteString.newOutput();
-      ByteString.Output residualBytes = ByteString.newOutput();
+      ByteStringOutputStream primaryBytes = new ByteStringOutputStream();
+      ByteStringOutputStream residualBytes = new ByteStringOutputStream();
       try {
         DoubleCoder.of().encode(fractionOfRemainder, primaryBytes);
         DoubleCoder.of().encode(1 - fractionOfRemainder, residualBytes);
@@ -3181,7 +3182,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
       SplitResult expectedElementSplit = createSplitResult(0);
       BundleApplication expectedElementSplitPrimary =
           Iterables.getOnlyElement(expectedElementSplit.getPrimaryRoots());
-      ByteString.Output primaryBytes = ByteString.newOutput();
+      ByteStringOutputStream primaryBytes = new ByteStringOutputStream();
       inputCoder.encode(
           WindowedValue.of(
               KV.of(
@@ -3198,7 +3199,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
               .build();
       DelayedBundleApplication expectedElementSplitResidual =
           Iterables.getOnlyElement(expectedElementSplit.getResidualRoots());
-      ByteString.Output residualBytes = ByteString.newOutput();
+      ByteStringOutputStream residualBytes = new ByteStringOutputStream();
       inputCoder.encode(
           WindowedValue.of(
               KV.of(
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
index 450628a31a9..e2c0c3f0b26 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
@@ -137,6 +137,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerFamilyDeclarati
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.util.DoFnWithExecutionInformation;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.KV;
@@ -1095,9 +1096,9 @@ public class ProcessBundleHandlerTest {
     ProcessBundleHandler handler =
         setupProcessBundleHandlerForSimpleRecordingDoFn(dataOutput, timerOutput, false);
 
-    ByteString.Output encodedData = ByteString.newOutput();
+    ByteStringOutputStream encodedData = new ByteStringOutputStream();
     KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()).encode(KV.of("", "data"), encodedData);
-    ByteString.Output encodedTimer = ByteString.newOutput();
+    ByteStringOutputStream encodedTimer = new ByteStringOutputStream();
     Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)
         .encode(
             Timer.of(
@@ -1160,7 +1161,7 @@ public class ProcessBundleHandlerTest {
     ProcessBundleHandler handler =
         setupProcessBundleHandlerForSimpleRecordingDoFn(dataOutput, timerOutput, false);
 
-    ByteString.Output encodedData = ByteString.newOutput();
+    ByteStringOutputStream encodedData = new ByteStringOutputStream();
     KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()).encode(KV.of("", "data"), encodedData);
 
     assertThrows(
@@ -1216,7 +1217,7 @@ public class ProcessBundleHandlerTest {
     ProcessBundleHandler handler =
         setupProcessBundleHandlerForSimpleRecordingDoFn(dataOutput, timerOutput, false);
 
-    ByteString.Output encodedTimer = ByteString.newOutput();
+    ByteStringOutputStream encodedTimer = new ByteStringOutputStream();
     Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)
         .encode(
             Timer.of(
@@ -1310,7 +1311,7 @@ public class ProcessBundleHandlerTest {
     ProcessBundleHandler handler =
         setupProcessBundleHandlerForSimpleRecordingDoFn(dataOutput, timerOutput, true);
 
-    ByteString.Output encodedTimer = ByteString.newOutput();
+    ByteStringOutputStream encodedTimer = new ByteStringOutputStream();
     Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)
         .encode(
             Timer.of(
@@ -1446,7 +1447,7 @@ public class ProcessBundleHandlerTest {
 
     Mockito.doAnswer(
             (invocation) -> {
-              ByteString.Output encodedData = ByteString.newOutput();
+              ByteStringOutputStream encodedData = new ByteStringOutputStream();
               StringUtf8Coder.of().encode("A", encodedData);
               String instructionId = invocation.getArgument(0, String.class);
               CloseableFnDataReceiver<BeamFnApi.Elements> data =
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BagUserStateTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BagUserStateTest.java
index 7b1cef87369..7b2bce0641b 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BagUserStateTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BagUserStateTest.java
@@ -29,6 +29,7 @@ import org.apache.beam.fn.harness.Cache;
 import org.apache.beam.fn.harness.Caches;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
@@ -252,7 +253,7 @@ public class BagUserStateTest {
   }
 
   private ByteString encode(String... values) throws IOException {
-    ByteString.Output out = ByteString.newOutput();
+    ByteStringOutputStream out = new ByteStringOutputStream();
     for (String value : values) {
       StringUtf8Coder.of().encode(value, out);
     }
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java
index 954b37d294e..17ef134b8e5 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java
@@ -37,6 +37,7 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest.RequestCase;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
@@ -67,7 +68,7 @@ public class FakeBeamFnStateClient implements BeamFnStateClient {
                 initialData,
                 (KV<Coder<?>, List<?>> coderAndValues) -> {
                   List<ByteString> chunks = new ArrayList<>();
-                  ByteString.Output output = ByteString.newOutput();
+                  ByteStringOutputStream output = new ByteStringOutputStream();
                   for (Object value : coderAndValues.getValue()) {
                     try {
                       ((Coder<Object>) coderAndValues.getKey()).encode(value, output);
@@ -75,7 +76,7 @@ public class FakeBeamFnStateClient implements BeamFnStateClient {
                       throw new RuntimeException(e);
                     }
                     if (output.size() >= chunkSize) {
-                      ByteString chunk = output.toByteString();
+                      ByteString chunk = output.toByteStringAndReset();
                       int i = 0;
                       for (; i + chunkSize <= chunk.size(); i += chunkSize) {
                         // We specifically use a copy of the bytes instead of a proper substring
@@ -88,7 +89,6 @@ public class FakeBeamFnStateClient implements BeamFnStateClient {
                         chunks.add(
                             ByteString.copyFrom(chunk.substring(i, chunk.size()).toByteArray()));
                       }
-                      output.reset();
                     }
                   }
                   // Add the last chunk
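
Of all the call sites touched by this commit, the FakeBeamFnStateClient hunk above is the one place where the change is more than a type swap: the old toByteString() followed by reset() collapses into a single toByteStringAndReset(). Below is a minimal, self-contained sketch of that chunking pattern, using only the ByteStringOutputStream methods visible in this diff; the class and helper names are hypothetical and not part of the commit.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;

// Hypothetical standalone helper (not part of this commit) illustrating the
// chunked-encoding pattern from the FakeBeamFnStateClient hunk above.
public class ChunkedEncodeExample {
  static List<ByteString> encodeInChunks(Iterable<String> values, int chunkSize)
      throws IOException {
    List<ByteString> chunks = new ArrayList<>();
    ByteStringOutputStream output = new ByteStringOutputStream();
    for (String value : values) {
      StringUtf8Coder.of().encode(value, output);
      if (output.size() >= chunkSize) {
        // toByteStringAndReset() hands back the buffered bytes and clears the
        // stream in one call, replacing the old toByteString() + reset() pair.
        chunks.add(output.toByteStringAndReset());
      }
    }
    if (output.size() > 0) {
      // Flush whatever remains as the final (possibly short) chunk.
      chunks.add(output.toByteString());
    }
    return chunks;
  }
}
```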
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/MultimapSideInputTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/MultimapSideInputTest.java
index 6fea5e10617..f0412fdddc6 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/MultimapSideInputTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/MultimapSideInputTest.java
@@ -28,6 +28,7 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
@@ -142,7 +143,7 @@ public class MultimapSideInputTest {
   }
 
   private StateKey key(byte[] key) throws IOException {
-    ByteString.Output out = ByteString.newOutput();
+    ByteStringOutputStream out = new ByteStringOutputStream();
     ByteArrayCoder.of().encode(key, out);
     return StateKey.newBuilder()
         .setMultimapSideInput(
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/MultimapUserStateTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/MultimapUserStateTest.java
index fccd8e7c1bd..dbd2add8829 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/MultimapUserStateTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/MultimapUserStateTest.java
@@ -41,6 +41,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
@@ -1060,7 +1061,7 @@ public class MultimapUserStateTest {
   }
 
   private ByteString encode(String... values) throws IOException {
-    ByteString.Output out = ByteString.newOutput();
+    ByteStringOutputStream out = new ByteStringOutputStream();
     for (String value : values) {
       StringUtf8Coder.of().encode(value, out);
     }
@@ -1068,7 +1069,7 @@ public class MultimapUserStateTest {
   }
 
   private ByteString encode(byte[]... values) throws IOException {
-    ByteString.Output out = ByteString.newOutput();
+    ByteStringOutputStream out = new ByteStringOutputStream();
     for (byte[] value : values) {
       ByteArrayCoder.of().encode(value, out);
     }
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateBackedIterableTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateBackedIterableTest.java
index 8b05def2bfc..b6e8fe7962c 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateBackedIterableTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateBackedIterableTest.java
@@ -35,6 +35,7 @@ import org.apache.beam.fn.harness.Cache;
 import org.apache.beam.fn.harness.Caches;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
@@ -297,7 +298,7 @@ public class StateBackedIterableTest {
   }
 
   private static ByteString encode(String... values) throws IOException {
-    ByteString.Output out = ByteString.newOutput();
+    ByteStringOutputStream out = new ByteStringOutputStream();
     for (String value : values) {
       StringUtf8Coder.of().encode(value, out);
     }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/Uuid.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/Uuid.java
index 969233eda92..16e5b116432 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/Uuid.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/Uuid.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.Base64;
 import java.util.UUID;
 import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 
 /** A Uuid storable in a Pub/Sub Lite attribute. */
 @DefaultCoder(UuidCoder.class)
@@ -42,7 +43,7 @@ public abstract class Uuid {
 
   public static Uuid random() {
     UUID uuid = UUID.randomUUID();
-    ByteString.Output output = ByteString.newOutput(16);
+    ByteStringOutputStream output = new ByteStringOutputStream(16);
     DataOutputStream stream = new DataOutputStream(output);
     try {
       stream.writeLong(uuid.getMostSignificantBits());
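
The Uuid hunk above also exercises the sized constructor: when the encoded length is known up front (a UUID is two longs, 16 bytes), the stream can be created with that capacity and wrapped like any other OutputStream. A small sketch of the same pattern follows, written as an illustrative standalone class rather than Beam's actual Uuid.random(); the class and method names are assumptions.

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.UUID;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;

// Illustrative example (not Beam code): encode a UUID into a ByteString,
// sizing the stream up front since the output is exactly 16 bytes.
public class UuidBytesExample {
  public static ByteString encode(UUID uuid) {
    ByteStringOutputStream output = new ByteStringOutputStream(16);
    DataOutputStream stream = new DataOutputStream(output);
    try {
      stream.writeLong(uuid.getMostSignificantBits());
      stream.writeLong(uuid.getLeastSignificantBits());
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    return output.toByteString();
  }
}
```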
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
index bb8ddc4034a..39750f2c673 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
@@ -35,9 +35,9 @@ import org.apache.beam.sdk.schemas.SchemaCoder;
 import org.apache.beam.sdk.schemas.SchemaTranslation;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
-import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.StreamObserver;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.hamcrest.Matchers;
@@ -205,7 +205,7 @@ public class PubsubIOExternalTest {
   }
 
   private static ExternalTransforms.ExternalConfigurationPayload encodeRow(Row row) {
-    ByteString.Output outputStream = ByteString.newOutput();
+    ByteStringOutputStream outputStream = new ByteStringOutputStream();
     try {
       SchemaCoder.of(row.getSchema()).encode(row, outputStream);
     } catch (IOException e) {
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
index d9489a8fc7c..b62da988d42 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
@@ -43,8 +43,8 @@ import org.apache.beam.sdk.schemas.SchemaTranslation;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Impulse;
 import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.values.Row;
-import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.StreamObserver;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
@@ -375,7 +375,7 @@ public class KafkaIOExternalTest {
   }
 
   private static ExternalConfigurationPayload encodeRow(Row row) {
-    ByteString.Output outputStream = ByteString.newOutput();
+    ByteStringOutputStream outputStream = new ByteStringOutputStream();
     try {
       SchemaCoder.of(row.getSchema()).encode(row, outputStream);
     } catch (IOException e) {