Posted to commits@beam.apache.org by lc...@apache.org on 2022/07/20 06:22:22 UTC
[beam] branch master updated: [BEAM-13015, #21250] Optimize encoding to a ByteString (#22345)
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4821e035c14 [BEAM-13015, #21250] Optimize encoding to a ByteString (#22345)
4821e035c14 is described below
commit 4821e035c148df1ed7eb9e7054e47fe2a7003a1f
Author: Luke Cwik <lc...@google.com>
AuthorDate: Tue Jul 19 23:22:16 2022 -0700
[BEAM-13015, #21250] Optimize encoding to a ByteString (#22345)
* [BEAM-13015, #21250] Optimize encoding to a ByteString
This leverages the fact that all encoding is done in a thread-safe manner (a stream is never written to concurrently),
allowing us to drop the synchronization that ByteString.Output adds. It also
optimizes the max chunk size based upon performance measurements, along with
the ratio for how full a byte[] should be when deciding between a final copy
and a concatenation.
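A minimal sketch of the idea, assuming plain Java in place of the vendored protobuf classes. The class `ChunkedOutputStream`, its 128-byte initial capacity, and the single 50% threshold in `shouldCopy` are hypothetical simplifications, not the tuned per-size thresholds from this commit. Read-only `ByteBuffer` views stand in for `ByteString` concatenation. The class has no synchronized methods, so it is only safe when a single thread writes to a given stream.

```java
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/**
 * Hypothetical sketch, not Beam's ByteStringOutputStream: an unsynchronized, single-threaded
 * stream that collects bytes into chunks capped at MAX_CHUNK_SIZE. Read-only ByteBuffer
 * views stand in for protobuf ByteString concatenation.
 */
final class ChunkedOutputStream extends OutputStream {
  static final int MAX_CHUNK_SIZE = 256 * 1024; // cap suggested by the 256k measurements
  private static final int INITIAL_CAPACITY = 128; // assumed starting buffer size

  private final List<ByteBuffer> fullChunks = new ArrayList<>(); // sealed, never copied again
  private long fullChunkBytes;
  private byte[] buffer = new byte[INITIAL_CAPACITY];
  private int pos; // next write index into buffer

  @Override
  public void write(int b) {
    ensureSpace();
    buffer[pos++] = (byte) b;
  }

  @Override
  public void write(byte[] b, int off, int len) {
    Objects.checkFromIndexSize(off, len, b.length);
    while (len > 0) {
      ensureSpace();
      int n = Math.min(len, buffer.length - pos);
      System.arraycopy(b, off, buffer, pos, n);
      pos += n;
      off += n;
      len -= n;
    }
  }

  /** Seals a full buffer as a chunk and allocates a larger one, capped at MAX_CHUNK_SIZE. */
  private void ensureSpace() {
    if (pos < buffer.length) {
      return;
    }
    fullChunks.add(ByteBuffer.wrap(buffer).asReadOnlyBuffer());
    fullChunkBytes += buffer.length;
    // Roughly doubles total capacity on each seal until the cap is reached.
    buffer = new byte[(int) Math.min(fullChunkBytes, MAX_CHUNK_SIZE)];
    pos = 0;
  }

  long size() {
    return fullChunkBytes + pos;
  }

  /** Hypothetical single threshold: copy the tail when the buffer is at most half full. */
  static boolean shouldCopy(int bufferLength, int used) {
    return used <= bufferLength / 2;
  }

  /** Returns all written bytes as chunks, either copying or wrapping the partially filled tail. */
  List<ByteBuffer> toChunks() {
    List<ByteBuffer> out = new ArrayList<>(fullChunks);
    if (pos > 0) {
      byte[] tail = shouldCopy(buffer.length, pos) ? Arrays.copyOf(buffer, pos) : buffer;
      out.add(ByteBuffer.wrap(tail, 0, pos).asReadOnlyBuffer());
    }
    return Collections.unmodifiableList(out);
  }
}
```

Sealed chunks are never copied again; only the partially filled tail faces the copy-or-wrap decision, which is where measured thresholds would replace the 50% placeholder.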
Below are the results of several scenarios in which we compare the protobuf
implementation against the new solution:
```
Benchmark Mode Cnt Score Error Units
ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamFewLargeWrites thrpt 25 1149267.797 ± 15366.677 ops/s
ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamFewMixedWritesWithReuse thrpt 25 832816.697 ± 4236.341 ops/s
ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamFewMixedWritesWithoutReuse thrpt 25 916629.194 ± 5669.323 ops/s
ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamFewSmallWrites thrpt 25 14175167.566 ± 88540.030 ops/s
ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamFewTinyWrites thrpt 25 22471597.238 ± 186098.311 ops/s
ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamManyLargeWrites thrpt 25 610.218 ± 5.019 ops/s
ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamManyMixedWritesWithReuse thrpt 25 484.413 ± 35.194 ops/s
ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamManyMixedWritesWithoutReuse thrpt 25 559.983 ± 6.228 ops/s
ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamManySmallWrites thrpt 25 10969.839 ± 88.199 ops/s
ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamManyTinyWrites thrpt 25 40822.925 ± 191.402 ops/s
ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamFewLargeWrites thrpt 25 1167673.532 ± 9747.507 ops/s
ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamFewMixedWritesWithReuse thrpt 25 1576528.242 ± 15883.083 ops/s
ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamFewMixedWritesWithoutReuse thrpt 25 1009766.655 ± 8700.273 ops/s
ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamFewSmallWrites thrpt 25 33293140.679 ± 233693.771 ops/s
ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamFewTinyWrites thrpt 25 86841328.763 ± 729741.769 ops/s
ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamManyLargeWrites thrpt 25 1058.150 ± 15.192 ops/s
ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamManyMixedWritesWithReuse thrpt 25 937.249 ± 9.264 ops/s
ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamManyMixedWritesWithoutReuse thrpt 25 959.671 ± 13.989 ops/s
ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamManySmallWrites thrpt 25 12601.065 ± 92.375 ops/s
ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamManyTinyWrites thrpt 25 65277.229 ± 3795.676 ops/s
```
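To read the table above as relative gains, the mean scores can be turned into throughput ratios. This is a small hypothetical helper, `ThroughputRatios`, that copies the mean ops/s values from the table and ignores the error column:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Ratios of SdkCore to Protobuf mean scores (ops/s), copied from the table above. */
final class ThroughputRatios {
  /** Each value is {protobuf score, SdkCore score}; the error column is ignored. */
  static final Map<String, double[]> SCORES = new LinkedHashMap<>();

  static {
    SCORES.put("FewLargeWrites", new double[] {1149267.797, 1167673.532});
    SCORES.put("FewMixedWritesWithReuse", new double[] {832816.697, 1576528.242});
    SCORES.put("FewMixedWritesWithoutReuse", new double[] {916629.194, 1009766.655});
    SCORES.put("FewSmallWrites", new double[] {14175167.566, 33293140.679});
    SCORES.put("FewTinyWrites", new double[] {22471597.238, 86841328.763});
    SCORES.put("ManyLargeWrites", new double[] {610.218, 1058.150});
    SCORES.put("ManyMixedWritesWithReuse", new double[] {484.413, 937.249});
    SCORES.put("ManyMixedWritesWithoutReuse", new double[] {559.983, 959.671});
    SCORES.put("ManySmallWrites", new double[] {10969.839, 12601.065});
    SCORES.put("ManyTinyWrites", new double[] {40822.925, 65277.229});
  }

  /** New-stream throughput divided by protobuf throughput for one scenario. */
  static double ratio(String scenario) {
    double[] s = SCORES.get(scenario);
    return s[1] / s[0];
  }

  public static void main(String[] args) {
    for (String scenario : SCORES.keySet()) {
      System.out.printf("%-28s %.2fx%n", scenario, ratio(scenario));
    }
  }
}
```

With these numbers every scenario favors the new stream. The gains range from about 1.02x for FewLargeWrites, which is within the reported error, to about 3.86x for FewTinyWrites.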
The copy vs concatenate numbers come from the results below. They show that 256k seems to
be a good chunk size, since larger chunks seem to cost more per byte to allocate.
They also show the threshold at which we should currently copy the bytes versus concatenating
a partially full buffer and allocating a new one:
```
Benchmark newSize copyVsNew Mode Cnt Score Error Units
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 512/1024 thrpt 25 19744209.563 ± 148287.185 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 640/1024 thrpt 25 15738981.338 ± 103684.000 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 768/1024 thrpt 25 12778194.652 ± 202212.679 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 896/1024 thrpt 25 11053602.109 ± 103120.446 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 4096/8192 thrpt 25 2961435.128 ± 25895.802 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 5120/8192 thrpt 25 2498594.030 ± 26051.674 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 6144/8192 thrpt 25 2173161.031 ± 20014.569 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 7168/8192 thrpt 25 1917545.913 ± 21470.719 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 20480/65536 thrpt 25 537872.049 ± 5525.024 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 24576/65536 thrpt 25 371312.042 ± 4450.715 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 28672/65536 thrpt 25 306027.442 ± 2830.503 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 32768/65536 thrpt 25 263933.096 ± 1833.603 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 131072/262144 thrpt 25 80224.558 ± 1192.994 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 163840/262144 thrpt 25 65311.283 ± 775.920 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 196608/262144 thrpt 25 54510.877 ± 797.775 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 229376/262144 thrpt 25 46808.185 ± 515.039 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 524288/1048576 thrpt 25 17729.937 ± 301.199 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 655360/1048576 thrpt 25 12996.953 ± 228.552 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 786432/1048576 thrpt 25 11383.122 ± 384.086 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray N/A 917504/1048576 thrpt 25 9915.318 ± 285.995 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testNewArray 1024 N/A thrpt 25 10023631.563 ± 61317.055 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testNewArray 8192 N/A thrpt 25 2109120.041 ± 17482.682 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testNewArray 65536 N/A thrpt 25 318492.630 ± 3006.827 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testNewArray 262144 N/A thrpt 25 79228.892 ± 725.230 ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testNewArray 1048576 N/A thrpt 25 13089.221 ± 73.535 ops/s
```
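The thresholds can be read off the NewVsCopy table mechanically. This hypothetical helper, `CopyVsNewThreshold`, uses only the mean scores quoted above and finds, for each buffer size, the largest tested copy size at which `testCopyArray` is still at least as fast as `testNewArray` for that buffer size:

```java
/** Reads break-even points out of the NewVsCopy results quoted above (mean ops/s only). */
final class CopyVsNewThreshold {
  static final int[] BUFFER_SIZES = {1024, 8192, 65536, 262144, 1048576};
  // testNewArray score for each buffer size.
  static final double[] NEW_OPS = {10023631.563, 2109120.041, 318492.630, 79228.892, 13089.221};
  // Tested copy sizes (ascending) and their testCopyArray scores, one row per buffer size.
  static final int[][] COPY_SIZES = {
    {512, 640, 768, 896},
    {4096, 5120, 6144, 7168},
    {20480, 24576, 28672, 32768},
    {131072, 163840, 196608, 229376},
    {524288, 655360, 786432, 917504}
  };
  static final double[][] COPY_OPS = {
    {19744209.563, 15738981.338, 12778194.652, 11053602.109},
    {2961435.128, 2498594.030, 2173161.031, 1917545.913},
    {537872.049, 371312.042, 306027.442, 263933.096},
    {80224.558, 65311.283, 54510.877, 46808.185},
    {17729.937, 12996.953, 11383.122, 9915.318}
  };

  /** Largest tested copy size at which copying is at least as fast as a new array, or -1. */
  static int breakEven(int row) {
    int best = -1;
    for (int i = 0; i < COPY_SIZES[row].length; i++) {
      if (COPY_OPS[row][i] >= NEW_OPS[row]) {
        best = COPY_SIZES[row][i];
      }
    }
    return best;
  }

  public static void main(String[] args) {
    for (int row = 0; row < BUFFER_SIZES.length; row++) {
      int size = breakEven(row);
      System.out.printf(
          "%7d-byte buffer: copy up to %d bytes (%.1f%%)%n",
          BUFFER_SIZES[row], size, 100.0 * size / BUFFER_SIZES[row]);
    }
  }
}
```

With the quoted numbers this gives 896 of 1024 bytes (87.5%), 6144 of 8192 (75%), 24576 of 65536 (37.5%), and half the buffer for both 262144 and 1048576. For 1024-byte buffers, copying won at every tested size, so the true break-even may be higher. The 262144 margin (80224.558 vs 79228.892 ops/s) is within the reported error.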
The difference in the `ProcessBundleBenchmark` is minor, as not enough data is
passed around for the encoding change to have a major effect:
```
Before
Benchmark Mode Cnt Score Error Units
ProcessBundleBenchmark.testLargeBundle thrpt 25 1156.159 ± 9.001 ops/s
ProcessBundleBenchmark.testTinyBundle thrpt 25 29641.444 ± 125.041 ops/s
After
Benchmark Mode Cnt Score Error Units
ProcessBundleBenchmark.testLargeBundle thrpt 25 1168.977 ± 25.848 ops/s
ProcessBundleBenchmark.testTinyBundle thrpt 25 29664.783 ± 99.791 ops/s
```
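The "minor" difference can be quantified from the Before/After numbers above. This hypothetical helper, `BundleDelta`, computes the relative change of the mean score and checks whether the score ± error intervals overlap. Overlap is a rough heuristic, not a formal significance test.

```java
/** Compares the Before/After ProcessBundleBenchmark scores quoted above. */
final class BundleDelta {
  /** Relative change of the mean score, in percent. */
  static double percentChange(double before, double after) {
    return 100.0 * (after - before) / before;
  }

  /** True when [a - aErr, a + aErr] and [b - bErr, b + bErr] overlap. */
  static boolean overlaps(double a, double aErr, double b, double bErr) {
    return a - aErr <= b + bErr && b - bErr <= a + aErr;
  }

  public static void main(String[] args) {
    System.out.printf(
        "testLargeBundle: %+.2f%%, intervals overlap: %b%n",
        percentChange(1156.159, 1168.977), overlaps(1156.159, 9.001, 1168.977, 25.848));
    System.out.printf(
        "testTinyBundle: %+.2f%%, intervals overlap: %b%n",
        percentChange(29641.444, 29664.783), overlaps(29641.444, 125.041, 29664.783, 99.791));
  }
}
```

Both benchmarks move by about +1.11% and +0.08% respectively, and in both cases the error intervals overlap.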
* fixup comment and address analyzeClassDependencies failures
---
.../core/construction/ValidateRunnerXlangTest.java | 4 +-
.../core/metrics/MonitoringInfoEncodings.java | 11 +-
.../beam/runners/dataflow/worker/PubsubSink.java | 5 +-
.../beam/runners/dataflow/worker/StateFetcher.java | 4 +-
.../worker/StreamingModeExecutionContext.java | 7 +-
.../dataflow/worker/StreamingSideInputFetcher.java | 4 +-
.../beam/runners/dataflow/worker/WindmillSink.java | 5 +-
.../dataflow/worker/WindmillStateInternals.java | 19 +-
.../control/RegisterAndProcessBundleOperation.java | 3 +-
.../graph/CreateExecutableStageNodeFunction.java | 3 +-
.../worker/graph/RegisterNodeFunction.java | 3 +-
.../runners/dataflow/worker/StateFetcherTest.java | 10 +-
.../worker/StreamingDataflowWorkerTest.java | 4 +-
.../worker/StreamingGroupAlsoByWindowFnsTest.java | 3 +-
...reamingGroupAlsoByWindowsReshuffleDoFnTest.java | 3 +-
.../worker/WindmillStateInternalsTest.java | 3 +-
.../dataflow/worker/WindmillStateReaderTest.java | 4 +-
.../fnexecution/control/RemoteExecutionTest.java | 5 +-
sdks/java/core/jmh/build.gradle | 3 +
.../jmh/util/ByteStringOutputStreamBenchmark.java | 416 +++++++++++++++++++++
.../apache/beam/sdk/jmh/util/package-info.java} | 24 +-
.../util/ByteStringOutputStreamBenchmarkTest.java | 88 +++++
.../beam/sdk/util/ByteStringOutputStream.java | 171 +++++++++
.../beam/sdk/util/ByteStringOutputStreamTest.java | 115 ++++++
.../expansion/service/ExpansionServiceTest.java | 4 +-
.../JavaClassLookupTransformProviderTest.java | 3 +-
.../sdk/fn/data/BeamFnDataOutboundAggregator.java | 27 +-
.../org/apache/beam/sdk/fn/stream/DataStreams.java | 8 +-
.../fn/data/BeamFnDataInboundObserver2Test.java | 6 +-
.../sdk/fn/data/BeamFnDataInboundObserverTest.java | 3 +-
.../fn/data/BeamFnDataOutboundAggregatorTest.java | 8 +-
.../apache/beam/sdk/fn/stream/DataStreamsTest.java | 5 +-
sdks/java/harness/jmh/build.gradle | 8 +-
.../apache/beam/fn/harness/FnApiDoFnRunner.java | 11 +-
.../apache/beam/fn/harness/state/BagUserState.java | 4 +-
.../beam/fn/harness/state/FnApiStateAccessor.java | 7 +-
.../fn/harness/state/FnApiTimerBundleTracker.java | 11 +-
.../beam/fn/harness/state/MultimapSideInput.java | 3 +-
.../beam/fn/harness/state/MultimapUserState.java | 5 +-
.../beam/fn/harness/FnApiDoFnRunnerTest.java | 11 +-
.../harness/control/ProcessBundleHandlerTest.java | 13 +-
.../beam/fn/harness/state/BagUserStateTest.java | 3 +-
.../fn/harness/state/FakeBeamFnStateClient.java | 6 +-
.../fn/harness/state/MultimapSideInputTest.java | 3 +-
.../fn/harness/state/MultimapUserStateTest.java | 5 +-
.../fn/harness/state/StateBackedIterableTest.java | 3 +-
.../beam/sdk/io/gcp/pubsublite/internal/Uuid.java | 3 +-
.../sdk/io/gcp/pubsub/PubsubIOExternalTest.java | 4 +-
.../beam/sdk/io/kafka/KafkaIOExternalTest.java | 4 +-
49 files changed, 950 insertions(+), 135 deletions(-)
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ValidateRunnerXlangTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ValidateRunnerXlangTest.java
index 8a13df9abd4..1ecd9041a75 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ValidateRunnerXlangTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ValidateRunnerXlangTest.java
@@ -34,13 +34,13 @@ import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
-import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -93,7 +93,7 @@ public class ValidateRunnerXlangTest {
.withFieldValue("data", data)
.build();
- ByteString.Output outputStream = ByteString.newOutput();
+ ByteStringOutputStream outputStream = new ByteStringOutputStream();
try {
RowCoder.of(configRow.getSchema()).encode(configRow, outputStream);
} catch (IOException e) {
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java
index cfa082c6cb4..3d8cf2d941e 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java
@@ -22,6 +22,7 @@ import java.io.InputStream;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.joda.time.Instant;
@@ -32,7 +33,7 @@ public class MonitoringInfoEncodings {
/** Encodes to {@link MonitoringInfoConstants.TypeUrns#DISTRIBUTION_INT64_TYPE}. */
public static ByteString encodeInt64Distribution(DistributionData data) {
- ByteString.Output output = ByteString.newOutput();
+ ByteStringOutputStream output = new ByteStringOutputStream();
try {
VARINT_CODER.encode(data.count(), output);
VARINT_CODER.encode(data.sum(), output);
@@ -62,7 +63,7 @@ public class MonitoringInfoEncodings {
// TODO(BEAM-4374): Implement decodeDoubleDistribution(...)
public static ByteString encodeDoubleDistribution(
long count, double sum, double min, double max) {
- ByteString.Output output = ByteString.newOutput();
+ ByteStringOutputStream output = new ByteStringOutputStream();
try {
VARINT_CODER.encode(count, output);
DOUBLE_CODER.encode(sum, output);
@@ -76,7 +77,7 @@ public class MonitoringInfoEncodings {
/** Encodes to {@link MonitoringInfoConstants.TypeUrns#LATEST_INT64_TYPE}. */
public static ByteString encodeInt64Gauge(GaugeData data) {
- ByteString.Output output = ByteString.newOutput();
+ ByteStringOutputStream output = new ByteStringOutputStream();
try {
VARINT_CODER.encode(data.timestamp().getMillis(), output);
VARINT_CODER.encode(data.value(), output);
@@ -99,7 +100,7 @@ public class MonitoringInfoEncodings {
/** Encodes to {@link MonitoringInfoConstants.TypeUrns#SUM_INT64_TYPE}. */
public static ByteString encodeInt64Counter(long value) {
- ByteString.Output output = ByteString.newOutput();
+ ByteStringOutputStream output = new ByteStringOutputStream();
try {
VARINT_CODER.encode(value, output);
} catch (IOException e) {
@@ -119,7 +120,7 @@ public class MonitoringInfoEncodings {
/** Encodes to {@link MonitoringInfoConstants.TypeUrns#SUM_DOUBLE_TYPE}. */
public static ByteString encodeDoubleCounter(double value) {
- ByteString.Output output = ByteString.newOutput();
+ ByteStringOutputStream output = new ByteStringOutputStream();
try {
DOUBLE_CODER.encode(value, output);
} catch (IOException e) {
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java
index 6bd58fd9821..d8c670b7c55 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubSink.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
@@ -160,11 +161,11 @@ class PubsubSink<T> extends Sink<WindowedValue<T>> {
if (formatted.getAttributeMap() != null) {
pubsubMessageBuilder.putAllAttributes(formatted.getAttributeMap());
}
- ByteString.Output output = ByteString.newOutput();
+ ByteStringOutputStream output = new ByteStringOutputStream();
pubsubMessageBuilder.build().writeTo(output);
byteString = output.toByteString();
} else {
- ByteString.Output stream = ByteString.newOutput();
+ ByteStringOutputStream stream = new ByteStringOutputStream();
coder.encode(data.getValue(), stream, Coder.Context.OUTER);
byteString = stream.toByteString();
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StateFetcher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StateFetcher.java
index 883822f4fc7..82ca67c8fb3 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StateFetcher.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StateFetcher.java
@@ -35,10 +35,10 @@ import org.apache.beam.sdk.transforms.Materializations.IterableView;
import org.apache.beam.sdk.transforms.Materializations.MultimapView;
import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
@@ -124,7 +124,7 @@ class StateFetcher {
Coder<SideWindowT> windowCoder = sideWindowStrategy.getWindowFn().windowCoder();
- ByteString.Output windowStream = ByteString.newOutput();
+ ByteStringOutputStream windowStream = new ByteStringOutputStream();
windowCoder.encode(sideWindow, windowStream, Coder.Context.OUTER);
@SuppressWarnings("unchecked")
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
index da18f865500..601efcd5f01 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
@@ -54,6 +54,7 @@ import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
@@ -429,7 +430,7 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
((UnboundedSource<?, UnboundedSource.CheckpointMark>) activeReader.getCurrentSource())
.getCheckpointMarkCoder();
if (checkpointCoder != null) {
- ByteString.Output stream = ByteString.newOutput();
+ ByteStringOutputStream stream = new ByteStringOutputStream();
try {
checkpointCoder.encode(checkpointMark, stream, Coder.Context.OUTER);
} catch (IOException e) {
@@ -738,10 +739,10 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
throw new IllegalStateException("writePCollectionViewData must follow a Combine.globally");
}
- ByteString.Output dataStream = ByteString.newOutput();
+ ByteStringOutputStream dataStream = new ByteStringOutputStream();
dataCoder.encode(data, dataStream, Coder.Context.OUTER);
- ByteString.Output windowStream = ByteString.newOutput();
+ ByteStringOutputStream windowStream = new ByteStringOutputStream();
windowCoder.encode(window, windowStream, Coder.Context.OUTER);
ensureStateful("Tried to write view data");
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcher.java
index c8c7e04cad1..12d98a885bd 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcher.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcher.java
@@ -45,10 +45,10 @@ import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.Parser;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
@@ -305,7 +305,7 @@ public class StreamingSideInputFetcher<InputT, W extends BoundedWindow> {
SideWindowT sideInputWindow =
(SideWindowT) view.getWindowMappingFn().getSideInputWindow(mainWindow);
- ByteString.Output windowStream = ByteString.newOutput();
+ ByteStringOutputStream windowStream = new ByteStringOutputStream();
try {
sideInputWindowCoder.encode(sideInputWindow, windowStream, Coder.Context.OUTER);
} catch (IOException e) {
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
index 6696de6ca74..a755f318350 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.values.KV;
@@ -69,7 +70,7 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
Collection<? extends BoundedWindow> windows,
PaneInfo pane)
throws IOException {
- ByteString.Output stream = ByteString.newOutput();
+ ByteStringOutputStream stream = new ByteStringOutputStream();
PaneInfoCoder.INSTANCE.encode(pane, stream);
windowsCoder.encode(windows, stream, Coder.Context.OUTER);
return stream.toByteString();
@@ -135,7 +136,7 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
}
private <EncodeT> ByteString encode(Coder<EncodeT> coder, EncodeT object) throws IOException {
- ByteString.Output stream = ByteString.newOutput();
+ ByteStringOutputStream stream = new ByteStringOutputStream();
coder.encode(object, stream, Coder.Context.OUTER);
return stream.toByteString();
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java
index 5093b21c5a8..7eca9d3da39 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java
@@ -80,6 +80,7 @@ import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.Weighted;
import org.apache.beam.sdk.values.TimestampedValue;
@@ -351,7 +352,7 @@ class WindmillStateInternals<K> implements StateInternals {
try {
// Use ByteString.Output rather than concatenation and String.format. We build these keys
// a lot, and this leads to better performance results. See associated benchmarks.
- ByteString.Output stream = ByteString.newOutput();
+ ByteStringOutputStream stream = new ByteStringOutputStream();
OutputStreamWriter writer = new OutputStreamWriter(stream, StandardCharsets.UTF_8);
// stringKey starts and ends with a slash. We separate it from the
@@ -522,7 +523,7 @@ class WindmillStateInternals<K> implements StateInternals {
ByteString encoded = null;
if (cachedSize == -1 || modified) {
- ByteString.Output stream = ByteString.newOutput();
+ ByteStringOutputStream stream = new ByteStringOutputStream();
if (value != null) {
coder.encode(value, stream, Coder.Context.OUTER);
}
@@ -1047,7 +1048,7 @@ class WindmillStateInternals<K> implements StateInternals {
pendingAdds,
(elem, id) -> {
try {
- ByteString.Output elementStream = ByteString.newOutput();
+ ByteStringOutputStream elementStream = new ByteStringOutputStream();
elemCoder.encode(elem.getValue(), elementStream, Context.OUTER);
insertBuilder.addEntries(
SortedListEntry.newBuilder()
@@ -1249,7 +1250,7 @@ class WindmillStateInternals<K> implements StateInternals {
}
private ByteString protoKeyFromUserKey(K key) throws IOException {
- ByteString.Output keyStream = ByteString.newOutput();
+ ByteStringOutputStream keyStream = new ByteStringOutputStream();
stateKeyPrefix.writeTo(keyStream);
keyCoder.encode(key, keyStream, Context.OUTER);
return keyStream.toByteString();
@@ -1275,7 +1276,7 @@ class WindmillStateInternals<K> implements StateInternals {
for (K key : localAdditions) {
ByteString keyBytes = protoKeyFromUserKey(key);
- ByteString.Output valueStream = ByteString.newOutput();
+ ByteStringOutputStream valueStream = new ByteStringOutputStream();
valueCoder.encode(cachedValues.get(key), valueStream, Context.OUTER);
ByteString valueBytes = valueStream.toByteString();
@@ -1290,7 +1291,7 @@ class WindmillStateInternals<K> implements StateInternals {
localAdditions.clear();
for (K key : localRemovals) {
- ByteString.Output keyStream = ByteString.newOutput();
+ ByteStringOutputStream keyStream = new ByteStringOutputStream();
stateKeyPrefix.writeTo(keyStream);
keyCoder.encode(key, keyStream, Context.OUTER);
ByteString keyBytes = keyStream.toByteString();
@@ -1304,7 +1305,7 @@ class WindmillStateInternals<K> implements StateInternals {
V cachedValue = cachedValues.remove(key);
if (cachedValue != null) {
- ByteString.Output valueStream = ByteString.newOutput();
+ ByteStringOutputStream valueStream = new ByteStringOutputStream();
valueCoder.encode(cachedValues.get(key), valueStream, Context.OUTER);
}
}
@@ -1555,7 +1556,7 @@ class WindmillStateInternals<K> implements StateInternals {
private Future<V> getFutureForKey(K key) {
try {
- ByteString.Output keyStream = ByteString.newOutput();
+ ByteStringOutputStream keyStream = new ByteStringOutputStream();
stateKeyPrefix.writeTo(keyStream);
keyCoder.encode(key, keyStream, Context.OUTER);
return reader.valueFuture(keyStream.toByteString(), stateFamily, valueCoder);
@@ -1703,7 +1704,7 @@ class WindmillStateInternals<K> implements StateInternals {
bagUpdatesBuilder = commitBuilder.addBagUpdatesBuilder();
}
for (T value : localAdditions) {
- ByteString.Output stream = ByteString.newOutput();
+ ByteStringOutputStream stream = new ByteStringOutputStream();
// Encode the value
elemCoder.encode(value, stream, Coder.Context.OUTER);
ByteString encoded = stream.toByteString();
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
index 7be2820dee7..580450aed4d 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
@@ -71,6 +71,7 @@ import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.transforms.Materializations;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.MoreFutures;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
@@ -668,7 +669,7 @@ public class RegisterAndProcessBundleOperation extends Operation {
}
static ByteString encodeAndConcat(Iterable<Object> values, Coder valueCoder) throws IOException {
- ByteString.Output out = ByteString.newOutput();
+ ByteStringOutputStream out = new ByteStringOutputStream();
if (values != null) {
for (Object value : values) {
int size = out.size();
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
index ee79ee39a46..0e9b86beb9b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
@@ -79,6 +79,7 @@ import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
import org.apache.beam.sdk.values.PCollectionView;
@@ -216,7 +217,7 @@ public class CreateExecutableStageNodeFunction
String coderId = "generatedCoder" + idGenerator.getId();
String windowingStrategyId;
- try (ByteString.Output output = ByteString.newOutput()) {
+ try (ByteStringOutputStream output = new ByteStringOutputStream()) {
try {
Coder<?> javaCoder =
CloudObjects.coderFromCloudObject(CloudObject.fromSpec(instructionOutput.getCodec()));
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java
index befaccbd6ad..927bea9f55a 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java
@@ -72,6 +72,7 @@ import org.apache.beam.runners.fnexecution.wire.WireCoders;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
@@ -234,7 +235,7 @@ public class RegisterNodeFunction implements Function<MutableNetwork<Node, Edge>
String coderId = "generatedCoder" + idGenerator.getId();
instructionOutputNodeToCoderIdBuilder.put(node, coderId);
- try (ByteString.Output output = ByteString.newOutput()) {
+ try (ByteStringOutputStream output = new ByteStringOutputStream()) {
try {
Coder<?> javaCoder =
CloudObjects.coderFromCloudObject(CloudObject.fromSpec(instructionOutput.getCodec()));
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StateFetcherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StateFetcherTest.java
index f16ff49536f..e87ec6ae34c 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StateFetcherTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StateFetcherTest.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
@@ -73,7 +74,7 @@ public class StateFetcherTest {
public void testFetchGlobalDataBasic() throws Exception {
StateFetcher fetcher = new StateFetcher(server);
- ByteString.Output stream = ByteString.newOutput();
+ ByteStringOutputStream stream = new ByteStringOutputStream();
ListCoder.of(StringUtf8Coder.of()).encode(Arrays.asList("data"), stream, Coder.Context.OUTER);
ByteString encodedIterable = stream.toByteString();
@@ -126,7 +127,7 @@ public class StateFetcherTest {
public void testFetchGlobalDataNull() throws Exception {
StateFetcher fetcher = new StateFetcher(server);
- ByteString.Output stream = ByteString.newOutput();
+ ByteStringOutputStream stream = new ByteStringOutputStream();
ListCoder.of(VoidCoder.of()).encode(Arrays.asList((Void) null), stream, Coder.Context.OUTER);
ByteString encodedIterable = stream.toByteString();
@@ -179,10 +180,9 @@ public class StateFetcherTest {
public void testFetchGlobalDataCacheOverflow() throws Exception {
Coder<List<String>> coder = ListCoder.of(StringUtf8Coder.of());
- ByteString.Output stream = ByteString.newOutput();
+ ByteStringOutputStream stream = new ByteStringOutputStream();
coder.encode(Arrays.asList("data1"), stream, Coder.Context.OUTER);
- ByteString encodedIterable1 = stream.toByteString();
- stream = ByteString.newOutput();
+ ByteString encodedIterable1 = stream.toByteStringAndReset();
coder.encode(Arrays.asList("data2"), stream, Coder.Context.OUTER);
ByteString encodedIterable2 = stream.toByteString();
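The StateFetcherTest hunk above swaps the "snapshot, then allocate a new stream" idiom for a single `toByteStringAndReset()` call. The sketch below shows the same snapshot-then-reset pattern using only JDK classes, so it runs without Beam on the classpath.

Several things here are illustrative assumptions rather than Beam code:

- `ReuseStreamSketch`, `toBytesAndReset` and `encode` are hypothetical names.
- `ByteArrayOutputStream` stands in for Beam's `ByteStringOutputStream`.
- `encode` stands in for `coder.encode(...)`.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical JDK analogue of the snapshot-then-reset pattern; not Beam's API. */
class ReuseStreamSketch {
  // Returns everything written so far, then clears the stream so the same
  // instance (and its internal array) can be reused for the next value.
  static byte[] toBytesAndReset(ByteArrayOutputStream out) {
    byte[] bytes = out.toByteArray(); // copies the written prefix
    out.reset(); // keeps the internal array, sets the count back to zero
    return bytes;
  }

  // Hypothetical stand-in for coder.encode(value, stream): writes UTF-8 bytes.
  static void encode(String value, ByteArrayOutputStream out) {
    byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
    out.write(utf8, 0, utf8.length);
  }

  public static void main(String[] args) {
    ByteArrayOutputStream stream = new ByteArrayOutputStream();
    encode("data1", stream);
    byte[] encoded1 = toBytesAndReset(stream);
    encode("data2", stream);
    byte[] encoded2 = toBytesAndReset(stream);
    System.out.println(
        new String(encoded1, StandardCharsets.UTF_8)
            + " "
            + new String(encoded2, StandardCharsets.UTF_8)); // prints "data1 data2"
  }
}
```

Reusing one stream avoids allocating a fresh internal buffer per value. The reset must happen after the snapshot is taken; otherwise the first value would be lost.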
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
index 338a1d7eb14..d52680fcdda 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
@@ -131,6 +131,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.DoFnInfo;
import org.apache.beam.sdk.util.SerializableUtils;
@@ -144,7 +145,6 @@ import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
-import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString.Output;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.TextFormat;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheStats;
@@ -661,7 +661,7 @@ public class StreamingDataflowWorkerTest {
private ByteString addPaneTag(PaneInfo pane, byte[] windowBytes)
throws CoderException, IOException {
- Output output = ByteString.newOutput();
+ ByteStringOutputStream output = new ByteStringOutputStream();
PaneInfo.PaneInfoCoder.INSTANCE.encode(pane, output, Context.OUTER);
output.write(windowBytes);
return output.toByteString();
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java
index 0a3a997a2cf..7b8e8991a90 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java
@@ -71,6 +71,7 @@ import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.AppliedCombineFn;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
@@ -169,7 +170,7 @@ public class StreamingGroupAlsoByWindowFnsTest {
Coder<Collection<? extends BoundedWindow>> windowsCoder =
(Coder) CollectionCoder.of(windowCoder);
- ByteString.Output dataOutput = ByteString.newOutput();
+ ByteStringOutputStream dataOutput = new ByteStringOutputStream();
valueCoder.encode(value, dataOutput, Context.OUTER);
messageBundle
.addMessagesBuilder()
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowsReshuffleDoFnTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowsReshuffleDoFnTest.java
index 1b9f96e45cc..757e49d3233 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowsReshuffleDoFnTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowsReshuffleDoFnTest.java
@@ -43,6 +43,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
@@ -109,7 +110,7 @@ public class StreamingGroupAlsoByWindowsReshuffleDoFnTest {
Coder<Collection<? extends BoundedWindow>> windowsCoder =
(Coder) CollectionCoder.of(windowCoder);
- ByteString.Output dataOutput = ByteString.newOutput();
+ ByteStringOutputStream dataOutput = new ByteStringOutputStream();
valueCoder.encode(value, dataOutput, Context.OUTER);
messageBundle
.addMessagesBuilder()
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternalsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternalsTest.java
index 2abc4f0e3ba..157347c52ed 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternalsTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternalsTest.java
@@ -67,6 +67,7 @@ import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
@@ -198,7 +199,7 @@ public class WindmillStateInternalsTest {
private <K> ByteString protoKeyFromUserKey(@Nullable K tag, Coder<K> keyCoder)
throws IOException {
- ByteString.Output keyStream = ByteString.newOutput();
+ ByteStringOutputStream keyStream = new ByteStringOutputStream();
key(NAMESPACE, "map").writeTo(keyStream);
if (tag != null) {
keyCoder.encode(tag, keyStream, Context.OUTER);
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillStateReaderTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillStateReaderTest.java
index edcb1915c04..934487d291b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillStateReaderTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillStateReaderTest.java
@@ -33,9 +33,9 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill.SortedListRange
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
-import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString.Output;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Range;
import org.hamcrest.Matchers;
@@ -93,7 +93,7 @@ public class WindmillStateReaderTest {
}
private ByteString intData(int value) throws IOException {
- Output output = ByteString.newOutput();
+ ByteStringOutputStream output = new ByteStringOutputStream();
INT_CODER.encode(value, output, Coder.Context.OUTER);
return output.toByteString();
}
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 7e28a799c46..8f5dbfa4eb1 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -130,6 +130,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
@@ -819,7 +820,7 @@ public class RemoteExecutionTest implements Serializable {
}
private static ByteString encode(String value) throws Exception {
- ByteString.Output output = ByteString.newOutput();
+ ByteStringOutputStream output = new ByteStringOutputStream();
StringUtf8Coder.of().encode(value, output);
return output.toByteString();
}
@@ -1524,7 +1525,7 @@ public class RemoteExecutionTest implements Serializable {
// 3 Requests expected: state read, state2 read, and state2 clear
assertEquals(3, stateRequestHandler.getRequestCount());
- ByteString.Output out = ByteString.newOutput();
+ ByteStringOutputStream out = new ByteStringOutputStream();
StringUtf8Coder.of().encode("X", out);
assertEquals(
diff --git a/sdks/java/core/jmh/build.gradle b/sdks/java/core/jmh/build.gradle
index 7d9948ba517..06df6abf739 100644
--- a/sdks/java/core/jmh/build.gradle
+++ b/sdks/java/core/jmh/build.gradle
@@ -29,6 +29,9 @@ ext.summary = "This contains JMH benchmarks for the SDK Core for Beam Java"
dependencies {
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation library.java.joda_time
+ implementation library.java.vendored_grpc_1_43_2
implementation library.java.vendored_guava_26_0_jre
runtimeOnly library.java.slf4j_jdk14
+ testImplementation library.java.junit
+ testImplementation library.java.hamcrest
}
diff --git a/sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/util/ByteStringOutputStreamBenchmark.java b/sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/util/ByteStringOutputStreamBenchmark.java
new file mode 100644
index 00000000000..2a33c76ebaf
--- /dev/null
+++ b/sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/util/ByteStringOutputStreamBenchmark.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.jmh.util;
+
+import java.util.List;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.UnsafeByteOperations;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.infra.Blackhole;
+
+/** Benchmarks for {@link ByteStringOutputStream}. */
+public class ByteStringOutputStreamBenchmark {
+
+ private static final int MANY_WRITES = 10_000;
+ private static final int FEW_WRITES = 5;
+ private static final byte[] LARGE_BUFFER = new byte[1000];
+ private static final byte[] SMALL_BUFFER = new byte[20];
+
+ @State(Scope.Thread)
+ public static class ProtobufByteStringOutputStream {
+ final ByteString.Output output = ByteString.newOutput();
+
+ @TearDown
+ public void tearDown() throws Exception {
+ output.close();
+ }
+ }
+
+ @State(Scope.Thread)
+ public static class SdkCoreByteStringOutputStream {
+ final ByteStringOutputStream output = new ByteStringOutputStream();
+
+ @TearDown
+ public void tearDown() throws Exception {
+ output.close();
+ }
+ }
+
+ @Benchmark
+ public void testSdkCoreByteStringOutputStreamManyMixedWritesWithoutReuse() throws Exception {
+ ByteStringOutputStream output = new ByteStringOutputStream();
+ for (int i = 0; i < MANY_WRITES; i++) {
+ output.write(1);
+ output.write(SMALL_BUFFER);
+ output.write(1);
+ output.write(LARGE_BUFFER);
+ output.write(1);
+ output.write(SMALL_BUFFER);
+ output.write(1);
+ }
+ if (output.toByteString().size()
+ != (4 + 2 * SMALL_BUFFER.length + LARGE_BUFFER.length) * MANY_WRITES) {
+ throw new IllegalArgumentException();
+ }
+ output.close();
+ }
+
+ @Benchmark
+ public void testSdkCoreByteStringOutputStreamFewMixedWritesWithoutReuse() throws Exception {
+ ByteStringOutputStream output = new ByteStringOutputStream();
+ for (int i = 0; i < FEW_WRITES; i++) {
+ output.write(1);
+ output.write(SMALL_BUFFER);
+ output.write(1);
+ output.write(LARGE_BUFFER);
+ output.write(1);
+ output.write(SMALL_BUFFER);
+ output.write(1);
+ }
+ if (output.toByteString().size()
+ != (4 + 2 * SMALL_BUFFER.length + LARGE_BUFFER.length) * FEW_WRITES) {
+ throw new IllegalArgumentException();
+ }
+ output.close();
+ }
+
+ @Benchmark
+ public void testProtobufByteStringOutputStreamManyMixedWritesWithoutReuse() throws Exception {
+ ByteString.Output output = ByteString.newOutput();
+ for (int i = 0; i < MANY_WRITES; i++) {
+ output.write(1);
+ output.write(SMALL_BUFFER);
+ output.write(1);
+ output.write(LARGE_BUFFER);
+ output.write(1);
+ output.write(SMALL_BUFFER);
+ output.write(1);
+ }
+ if (output.toByteString().size()
+ != (4 + 2 * SMALL_BUFFER.length + LARGE_BUFFER.length) * MANY_WRITES) {
+ throw new IllegalArgumentException();
+ }
+ output.close();
+ }
+
+ @Benchmark
+ public void testProtobufByteStringOutputStreamFewMixedWritesWithoutReuse() throws Exception {
+ ByteString.Output output = ByteString.newOutput();
+ for (int i = 0; i < FEW_WRITES; i++) {
+ output.write(1);
+ output.write(SMALL_BUFFER);
+ output.write(1);
+ output.write(LARGE_BUFFER);
+ output.write(1);
+ output.write(SMALL_BUFFER);
+ output.write(1);
+ }
+ if (output.toByteString().size()
+ != (4 + 2 * SMALL_BUFFER.length + LARGE_BUFFER.length) * FEW_WRITES) {
+ throw new IllegalArgumentException();
+ }
+ output.close();
+ }
+
+ @Benchmark
+ public void testProtobufByteStringOutputStreamManyTinyWrites() throws Exception {
+ ByteString.Output output = ByteString.newOutput();
+ for (int i = 0; i < MANY_WRITES; ++i) {
+ output.write(1);
+ }
+ if (output.toByteString().size() != MANY_WRITES) {
+ throw new IllegalArgumentException();
+ }
+ output.close();
+ }
+
+ @Benchmark
+ public void testProtobufByteStringOutputStreamManySmallWrites() throws Exception {
+ ByteString.Output output = ByteString.newOutput();
+ for (int i = 0; i < MANY_WRITES; ++i) {
+ output.write(SMALL_BUFFER);
+ }
+ if (output.toByteString().size() != MANY_WRITES * SMALL_BUFFER.length) {
+ throw new IllegalArgumentException();
+ }
+ output.close();
+ }
+
+ @Benchmark
+ public void testProtobufByteStringOutputStreamManyLargeWrites() throws Exception {
+ ByteString.Output output = ByteString.newOutput();
+ for (int i = 0; i < MANY_WRITES; ++i) {
+ output.write(LARGE_BUFFER);
+ }
+ if (output.toByteString().size() != MANY_WRITES * LARGE_BUFFER.length) {
+ throw new IllegalArgumentException();
+ }
+ output.close();
+ }
+
+ @Benchmark
+ public void testProtobufByteStringOutputStreamFewTinyWrites() throws Exception {
+ ByteString.Output output = ByteString.newOutput();
+ for (int i = 0; i < FEW_WRITES; ++i) {
+ output.write(1);
+ }
+ if (output.toByteString().size() != FEW_WRITES) {
+ throw new IllegalArgumentException();
+ }
+ output.close();
+ }
+
+ @Benchmark
+ public void testProtobufByteStringOutputStreamFewSmallWrites() throws Exception {
+ ByteString.Output output = ByteString.newOutput();
+ for (int i = 0; i < FEW_WRITES; ++i) {
+ output.write(SMALL_BUFFER);
+ }
+ if (output.toByteString().size() != FEW_WRITES * SMALL_BUFFER.length) {
+ throw new IllegalArgumentException();
+ }
+ output.close();
+ }
+
+ @Benchmark
+ public void testProtobufByteStringOutputStreamFewLargeWrites() throws Exception {
+ ByteString.Output output = ByteString.newOutput();
+ for (int i = 0; i < FEW_WRITES; ++i) {
+ output.write(LARGE_BUFFER);
+ }
+ if (output.toByteString().size() != FEW_WRITES * LARGE_BUFFER.length) {
+ throw new IllegalArgumentException();
+ }
+ output.close();
+ }
+
+ @Benchmark
+ public void testProtobufByteStringOutputStreamManyMixedWritesWithReuse(
+ ProtobufByteStringOutputStream state) throws Exception {
+ ByteString.Output output = state.output;
+ for (int i = 0; i < 9850; i++) {
+ output.write(1);
+ output.write(SMALL_BUFFER);
+ output.write(1);
+ output.write(LARGE_BUFFER);
+ output.write(1);
+ output.write(SMALL_BUFFER);
+ output.write(1);
+ }
+ if (output.toByteString().size()
+ != (4 + 2 * SMALL_BUFFER.length + LARGE_BUFFER.length) * 9850) {
+ throw new IllegalArgumentException();
+ }
+ output.reset();
+ }
+
+ @Benchmark
+ public void testProtobufByteStringOutputStreamFewMixedWritesWithReuse(
+ ProtobufByteStringOutputStream state) throws Exception {
+ ByteString.Output output = state.output;
+ for (int i = 0; i < FEW_WRITES; i++) {
+ output.write(1);
+ output.write(SMALL_BUFFER);
+ output.write(1);
+ output.write(LARGE_BUFFER);
+ output.write(1);
+ output.write(SMALL_BUFFER);
+ output.write(1);
+ }
+ if (output.toByteString().size()
+ != (4 + 2 * SMALL_BUFFER.length + LARGE_BUFFER.length) * FEW_WRITES) {
+ throw new IllegalArgumentException();
+ }
+ output.reset();
+ }
+
+ @Benchmark
+ public void testSdkCoreByteStringOutputStreamManyTinyWrites() throws Exception {
+ ByteStringOutputStream output = new ByteStringOutputStream();
+ for (int i = 0; i < MANY_WRITES; ++i) {
+ output.write(1);
+ }
+ if (output.toByteString().size() != MANY_WRITES) {
+ throw new IllegalArgumentException();
+ }
+ output.close();
+ }
+
+ @Benchmark
+ public void testSdkCoreByteStringOutputStreamManySmallWrites() throws Exception {
+ ByteStringOutputStream output = new ByteStringOutputStream();
+ for (int i = 0; i < MANY_WRITES; ++i) {
+ output.write(SMALL_BUFFER);
+ }
+ if (output.toByteString().size() != MANY_WRITES * SMALL_BUFFER.length) {
+ throw new IllegalArgumentException();
+ }
+ output.close();
+ }
+
+ @Benchmark
+ public void testSdkCoreByteStringOutputStreamManyLargeWrites() throws Exception {
+ ByteStringOutputStream output = new ByteStringOutputStream();
+ for (int i = 0; i < MANY_WRITES; ++i) {
+ output.write(LARGE_BUFFER);
+ }
+ if (output.toByteString().size() != MANY_WRITES * LARGE_BUFFER.length) {
+ throw new IllegalArgumentException();
+ }
+ output.close();
+ }
+
+ @Benchmark
+ public void testSdkCoreByteStringOutputStreamFewTinyWrites() throws Exception {
+ ByteStringOutputStream output = new ByteStringOutputStream();
+ for (int i = 0; i < FEW_WRITES; ++i) {
+ output.write(1);
+ }
+ if (output.toByteString().size() != FEW_WRITES) {
+ throw new IllegalArgumentException();
+ }
+ output.close();
+ }
+
+ @Benchmark
+ public void testSdkCoreByteStringOutputStreamFewSmallWrites() throws Exception {
+ ByteStringOutputStream output = new ByteStringOutputStream();
+ for (int i = 0; i < FEW_WRITES; ++i) {
+ output.write(SMALL_BUFFER);
+ }
+ if (output.toByteString().size() != FEW_WRITES * SMALL_BUFFER.length) {
+ throw new IllegalArgumentException();
+ }
+ output.close();
+ }
+
+ @Benchmark
+ public void testSdkCoreByteStringOutputStreamFewLargeWrites() throws Exception {
+ ByteStringOutputStream output = new ByteStringOutputStream();
+ for (int i = 0; i < FEW_WRITES; ++i) {
+ output.write(LARGE_BUFFER);
+ }
+ if (output.toByteString().size() != FEW_WRITES * LARGE_BUFFER.length) {
+ throw new IllegalArgumentException();
+ }
+ output.close();
+ }
+
+ @Benchmark
+ public void testSdkCoreByteStringOutputStreamManyMixedWritesWithReuse(
+ SdkCoreByteStringOutputStream state) throws Exception {
+ ByteStringOutputStream output = state.output;
+ for (int i = 0; i < 9850; i++) {
+ output.write(1);
+ output.write(SMALL_BUFFER);
+ output.write(1);
+ output.write(LARGE_BUFFER);
+ output.write(1);
+ output.write(SMALL_BUFFER);
+ output.write(1);
+ }
+ if (output.toByteStringAndReset().size()
+ != (4 + 2 * SMALL_BUFFER.length + LARGE_BUFFER.length) * 9850) {
+ throw new IllegalArgumentException();
+ }
+ output.close();
+ }
+
+ @Benchmark
+ public void testSdkCoreByteStringOutputStreamFewMixedWritesWithReuse(
+ SdkCoreByteStringOutputStream state) throws Exception {
+ ByteStringOutputStream output = state.output;
+ for (int i = 0; i < FEW_WRITES; i++) {
+ output.write(1);
+ output.write(SMALL_BUFFER);
+ output.write(1);
+ output.write(LARGE_BUFFER);
+ output.write(1);
+ output.write(SMALL_BUFFER);
+ output.write(1);
+ }
+ if (output.toByteStringAndReset().size()
+ != (4 + 2 * SMALL_BUFFER.length + LARGE_BUFFER.length) * FEW_WRITES) {
+ throw new IllegalArgumentException();
+ }
+ output.close();
+ }
+
+ /**
+ * These benchmarks below provide good details as to the cost of creating a new buffer vs copying
+ * a subset of the existing one and re-using the larger one.
+ */
+ public static class NewVsCopy {
+ @State(Scope.Thread)
+ public static class ArrayCopyState {
+ @Param({
+ "512/1024", "640/1024", "768/1024", "896/1024",
+ "4096/8192", "5120/8192", "6144/8192", "7168/8192",
+ "20480/65536", "24576/65536", "28672/65536", "32768/65536",
+ "131072/262144", "163840/262144", "196608/262144", "229376/262144",
+ "524288/1048576", "655360/1048576", "786432/1048576", "917504/1048576"
+ })
+ String copyVsNew;
+
+ int copyThreshold;
+ int byteArraySize;
+ public byte[] src;
+
+ @Setup
+ public void setup() {
+ List<String> parts = Splitter.on('/').splitToList(copyVsNew);
+ copyThreshold = Integer.parseInt(parts.get(0));
+ byteArraySize = Integer.parseInt(parts.get(1));
+ src = new byte[byteArraySize];
+ }
+ }
+
+ @Benchmark
+ public void testCopyArray(ArrayCopyState state, Blackhole bh) {
+ byte[] dest = new byte[state.copyThreshold];
+ System.arraycopy(state.src, 0, dest, 0, state.copyThreshold);
+ bh.consume(UnsafeByteOperations.unsafeWrap(dest));
+ }
+
+ @State(Scope.Benchmark)
+ public static class ArrayNewState {
+ @Param({"1024", "8192", "65536", "262144", "1048576"})
+ int byteArraySize;
+
+ public byte[] src;
+
+ @Setup
+ public void setup() {
+ src = new byte[byteArraySize];
+ }
+ }
+
+ @Benchmark
+ public void testNewArray(ArrayNewState state, Blackhole bh) {
+ bh.consume(UnsafeByteOperations.unsafeWrap(state.src, 0, state.byteArraySize));
+ state.src = new byte[state.byteArraySize];
+ }
+ }
+}
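The `NewVsCopy` benchmarks above compare two ways of producing a result from a partially filled buffer:

- `testNewArray`: wrap the whole array without copying, then allocate a fresh one.
- `testCopyArray`: copy only the used prefix, then keep reusing the large array.

The minimal plain-Java sketch below shows how a stream might choose between those two paths based on how full the buffer is. Several parts are hypothetical and not Beam's `ByteStringOutputStream` implementation:

- The class name `CopyVsHandOff`.
- The fixed-size, non-growing buffer.
- The `handOffRatio` parameter and its 0.75 value.

`java.nio.ByteBuffer` stands in for `ByteString` and `UnsafeByteOperations.unsafeWrap`.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical sketch of a copy-vs-hand-off decision; not Beam's implementation. */
class CopyVsHandOff {
  private byte[] buffer;
  private int count;
  // Assumed tuning knob: fill fraction at which handing off beats copying.
  private final double handOffRatio;

  CopyVsHandOff(int capacity, double handOffRatio) {
    this.buffer = new byte[capacity];
    this.handOffRatio = handOffRatio;
  }

  void write(byte[] b) {
    // Simplification: a real stream would start a new chunk instead of failing.
    if (b.length > buffer.length - count) {
      throw new IllegalStateException("sketch does not grow its buffer");
    }
    System.arraycopy(b, 0, buffer, count, b.length);
    count += b.length;
  }

  /** Returns a view of the written bytes and resets the stream for reuse. */
  ByteBuffer toBufferAndReset() {
    ByteBuffer result;
    if (count >= buffer.length * handOffRatio) {
      // Mostly full: share the array without copying (like unsafeWrap) and
      // allocate a fresh buffer for later writes (cf. testNewArray).
      result = ByteBuffer.wrap(buffer, 0, count).slice();
      buffer = new byte[buffer.length];
    } else {
      // Mostly empty: copy only the used prefix and keep reusing the large
      // buffer (cf. testCopyArray).
      result = ByteBuffer.wrap(Arrays.copyOf(buffer, count));
    }
    count = 0;
    return result;
  }

  public static void main(String[] args) {
    CopyVsHandOff out = new CopyVsHandOff(1024, 0.75);
    out.write(new byte[100]);
    ByteBuffer small = out.toBufferAndReset(); // 100 < 768, so the prefix is copied
    out.write(new byte[900]);
    ByteBuffer large = out.toBufferAndReset(); // 900 >= 768, so the array is handed off
    // The backing array sizes show which path was taken.
    System.out.println(small.array().length + " " + large.array().length); // prints "100 1024"
  }
}
```

The handed-off array must never be written to again, because the returned view still shares it. That is why the sketch allocates a new buffer on that path. A benchmark sweep like the `copyVsNew` parameters above is one way to pick the ratio at which copying starts to cost more than a fresh allocation.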
diff --git a/sdks/java/core/jmh/build.gradle b/sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/util/package-info.java
similarity index 51%
copy from sdks/java/core/jmh/build.gradle
copy to sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/util/package-info.java
index 7d9948ba517..647f31b7276 100644
--- a/sdks/java/core/jmh/build.gradle
+++ b/sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/util/package-info.java
@@ -4,31 +4,21 @@
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
- * License); you may not use this file except in compliance
+ * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an AS IS BASIS,
+ * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-plugins { id 'org.apache.beam.module' }
+/** Benchmarks for core SDK utility classes. */
+@DefaultAnnotation(NonNull.class)
+package org.apache.beam.sdk.jmh.util;
-applyJavaNature(
- automaticModuleName: 'org.apache.beam.sdk.jmh',
- enableJmh: true,
- publish: false)
-
-description = "Apache Beam :: SDKs :: Java :: Core :: JMH"
-ext.summary = "This contains JMH benchmarks for the SDK Core for Beam Java"
-
-dependencies {
- implementation project(path: ":sdks:java:core", configuration: "shadow")
- implementation library.java.joda_time
- implementation library.java.vendored_guava_26_0_jre
- runtimeOnly library.java.slf4j_jdk14
-}
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import org.checkerframework.checker.nullness.qual.NonNull;
diff --git a/sdks/java/core/jmh/src/test/java/org/apache/beam/sdk/jmh/util/ByteStringOutputStreamBenchmarkTest.java b/sdks/java/core/jmh/src/test/java/org/apache/beam/sdk/jmh/util/ByteStringOutputStreamBenchmarkTest.java
new file mode 100644
index 00000000000..31e15fb4f08
--- /dev/null
+++ b/sdks/java/core/jmh/src/test/java/org/apache/beam/sdk/jmh/util/ByteStringOutputStreamBenchmarkTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.jmh.util;
+
+import org.apache.beam.sdk.jmh.util.ByteStringOutputStreamBenchmark.NewVsCopy.ArrayCopyState;
+import org.apache.beam.sdk.jmh.util.ByteStringOutputStreamBenchmark.NewVsCopy.ArrayNewState;
+import org.apache.beam.sdk.jmh.util.ByteStringOutputStreamBenchmark.ProtobufByteStringOutputStream;
+import org.apache.beam.sdk.jmh.util.ByteStringOutputStreamBenchmark.SdkCoreByteStringOutputStream;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.openjdk.jmh.infra.Blackhole;
+
+/** Tests for {@link ByteStringOutputStreamBenchmark}. */
+@RunWith(JUnit4.class)
+public class ByteStringOutputStreamBenchmarkTest {
+ @Test
+ public void testProtobufByteStringOutputStream() throws Exception {
+ new ByteStringOutputStreamBenchmark()
+ .testProtobufByteStringOutputStreamFewMixedWritesWithoutReuse();
+ new ByteStringOutputStreamBenchmark()
+ .testProtobufByteStringOutputStreamFewMixedWritesWithReuse(
+ new ProtobufByteStringOutputStream());
+ new ByteStringOutputStreamBenchmark().testProtobufByteStringOutputStreamFewLargeWrites();
+ new ByteStringOutputStreamBenchmark().testProtobufByteStringOutputStreamFewSmallWrites();
+ new ByteStringOutputStreamBenchmark().testProtobufByteStringOutputStreamFewTinyWrites();
+ new ByteStringOutputStreamBenchmark()
+ .testProtobufByteStringOutputStreamManyMixedWritesWithoutReuse();
+ new ByteStringOutputStreamBenchmark()
+ .testProtobufByteStringOutputStreamManyMixedWritesWithReuse(
+ new ProtobufByteStringOutputStream());
+ new ByteStringOutputStreamBenchmark().testProtobufByteStringOutputStreamManyLargeWrites();
+ new ByteStringOutputStreamBenchmark().testProtobufByteStringOutputStreamManySmallWrites();
+ new ByteStringOutputStreamBenchmark().testProtobufByteStringOutputStreamManyTinyWrites();
+ }
+
+ @Test
+ public void testSdkCoreByteStringOutputStream() throws Exception {
+ new ByteStringOutputStreamBenchmark()
+ .testSdkCoreByteStringOutputStreamFewMixedWritesWithoutReuse();
+ new ByteStringOutputStreamBenchmark()
+ .testSdkCoreByteStringOutputStreamFewMixedWritesWithReuse(
+ new SdkCoreByteStringOutputStream());
+ new ByteStringOutputStreamBenchmark().testSdkCoreByteStringOutputStreamFewLargeWrites();
+ new ByteStringOutputStreamBenchmark().testSdkCoreByteStringOutputStreamFewSmallWrites();
+ new ByteStringOutputStreamBenchmark().testSdkCoreByteStringOutputStreamFewTinyWrites();
+ new ByteStringOutputStreamBenchmark()
+ .testSdkCoreByteStringOutputStreamManyMixedWritesWithoutReuse();
+ new ByteStringOutputStreamBenchmark()
+ .testSdkCoreByteStringOutputStreamManyMixedWritesWithReuse(
+ new SdkCoreByteStringOutputStream());
+ new ByteStringOutputStreamBenchmark().testSdkCoreByteStringOutputStreamManyLargeWrites();
+ new ByteStringOutputStreamBenchmark().testSdkCoreByteStringOutputStreamManySmallWrites();
+ new ByteStringOutputStreamBenchmark().testSdkCoreByteStringOutputStreamManyTinyWrites();
+ }
+
+ @Test
+ public void testNewVsCopy() throws Exception {
+ Blackhole bh =
+ new Blackhole(
+ "Today's password is swordfish. I understand instantiating Blackholes directly is dangerous.");
+ ArrayCopyState copyState = new ArrayCopyState();
+ copyState.copyVsNew = "512/2048";
+ copyState.setup();
+
+ ArrayNewState newState = new ArrayNewState();
+ newState.byteArraySize = 2048;
+ newState.setup();
+
+ new ByteStringOutputStreamBenchmark.NewVsCopy().testCopyArray(copyState, bh);
+ new ByteStringOutputStreamBenchmark.NewVsCopy().testNewArray(newState, bh);
+ }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java
new file mode 100644
index 00000000000..112032e1af0
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.io.OutputStream;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.UnsafeByteOperations;
+
+/**
+ * An {@link OutputStream} that produces {@link ByteString}s.
+ *
+ * <p>Closing this output stream does nothing.
+ *
+ * <p>This class is not thread safe; callers that share an instance across threads must provide
+ * their own synchronization. This differs from {@link ByteString.Output}, which synchronizes its writes.
+ */
+@NotThreadSafe
+public final class ByteStringOutputStream extends OutputStream {
+
+ // This constant was chosen to match Protobuf's ByteString#CONCATENATE_BY_COPY threshold
+ // (which isn't public) so that concatenating a full buffer onto the result does not copy
+ // the bytes again.
+ private static final int DEFAULT_CAPACITY = 128;
+
+ // ByteStringOutputStreamBenchmark.NewVsCopy shows that allocating four new arrays
+ // of 256k each is faster than allocating a single 1024k array by almost a factor
+ // of 2.
+ //
+ // This number should be tuned periodically as hardware changes.
+ private static final int MAX_CHUNK_SIZE = 256 * 1024;
+
+ // ByteString to be concatenated to create the result
+ private ByteString result;
+
+ // Current buffer to which we are writing
+ private byte[] buffer;
+
+ // Location in buffer[] to which we write the next byte.
+ private int bufferPos;
+
+ /** Creates a new output stream with a default capacity. */
+ public ByteStringOutputStream() {
+ this(DEFAULT_CAPACITY);
+ }
+
+ /**
+ * Creates a new output stream with the specified initial capacity.
+ *
+ * @param initialCapacity the initial capacity of the output stream.
+ */
+ public ByteStringOutputStream(int initialCapacity) {
+ if (initialCapacity < 0) {
+ throw new IllegalArgumentException("Initial capacity < 0");
+ }
+ this.buffer = new byte[initialCapacity];
+ this.result = ByteString.EMPTY;
+ }
+
+ @Override
+ public void write(int b) {
+ if (bufferPos == buffer.length) {
+ // We want to roughly double our total capacity, but not exceed the max chunk size.
+ result = result.concat(UnsafeByteOperations.unsafeWrap(buffer));
+ buffer = new byte[Math.min(Math.max(1, result.size()), MAX_CHUNK_SIZE)];
+ bufferPos = 0;
+ }
+ buffer[bufferPos++] = (byte) b;
+ }
+
+ @Override
+ public void write(byte[] b, int offset, int length) {
+ int remainingSpaceInBuffer = buffer.length - bufferPos;
+ while (length > remainingSpaceInBuffer) {
+ // Use up the current buffer
+ System.arraycopy(b, offset, buffer, bufferPos, remainingSpaceInBuffer);
+ offset += remainingSpaceInBuffer;
+ length -= remainingSpaceInBuffer;
+
+ result = result.concat(UnsafeByteOperations.unsafeWrap(buffer));
+ // We want to increase our total capacity but not larger than the max chunk size.
+ remainingSpaceInBuffer = Math.min(Math.max(length, result.size()), MAX_CHUNK_SIZE);
+ buffer = new byte[remainingSpaceInBuffer];
+ bufferPos = 0;
+ }
+
+ System.arraycopy(b, offset, buffer, bufferPos, length);
+ bufferPos += length;
+ }
+
+ /**
+ * Creates a byte string with the size and contents of this output stream.
+ *
+ * <p>Note that the caller must not invoke {@link #toByteStringAndReset} after this method, as the
+ * internal buffer may then be reused and a future {@link #write} would mutate {@link ByteString}s
+ * returned in the past.
+ */
+ public ByteString toByteString() {
+ // We specifically choose to concatenate here since the user won't be re-using the buffer.
+ return result.concat(UnsafeByteOperations.unsafeWrap(buffer, 0, bufferPos));
+ }
+
+ /**
+ * Creates a byte string with the size and contents of this output stream and resets the output
+ * stream so that it can be reused, possibly reusing existing buffers.
+ */
+ public ByteString toByteStringAndReset() {
+ ByteString rval;
+ if (bufferPos > 0) {
+ final boolean copy;
+ // These thresholds are from the results of ByteStringOutputStreamBenchmark.NewVsCopy,
+ // which show that at or below these fill ratios it is cheaper to copy the bytes and re-use
+ // the existing buffer than to hand off the buffer and allocate a new one.
+ if (buffer.length <= 128) {
+ // Always copy small byte arrays to prevent large chunks of wasted space
+ // when dealing with very small amounts of data.
+ copy = true;
+ } else if (buffer.length <= 1024) {
+ copy = bufferPos <= buffer.length * 0.875;
+ } else if (buffer.length <= 8192) {
+ copy = bufferPos <= buffer.length * 0.75;
+ } else {
+ copy = bufferPos <= buffer.length * 0.4375;
+ }
+ if (copy) {
+ byte[] bufferCopy = new byte[bufferPos];
+ System.arraycopy(buffer, 0, bufferCopy, 0, bufferPos);
+ rval = result.concat(UnsafeByteOperations.unsafeWrap(bufferCopy));
+ } else {
+ rval = result.concat(UnsafeByteOperations.unsafeWrap(buffer, 0, bufferPos));
+ buffer = new byte[Math.min(rval.size(), MAX_CHUNK_SIZE)];
+ }
+ bufferPos = 0;
+ } else {
+ rval = result;
+ }
+ result = ByteString.EMPTY;
+ return rval;
+ }
+
+ /**
+ * Returns the current size of the output stream.
+ *
+ * @return the current size of the output stream
+ */
+ public int size() {
+ return result.size() + bufferPos;
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "<ByteStringOutputStream@%s size=%d>",
+ Integer.toHexString(System.identityHashCode(this)), size());
+ }
+}
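To make the sizing rules in ByteStringOutputStream easier to follow, here is a hypothetical, size-only model of two of its decisions: how large the next chunk is when write(int) fills the current buffer, and whether toByteStringAndReset() copies the written bytes or hands the buffer off. The class and method names are illustrative, not part of Beam; the thresholds and the MAX_CHUNK_SIZE value are copied from the code above. The write(byte[], int, int) path, which sizes the next chunk using max(length, result.size()), is deliberately not modeled.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical size-only model of two decisions in ByteStringOutputStream: the size of the next
 * chunk allocated by write(int) once the current buffer is full, and whether
 * toByteStringAndReset() copies the written bytes or hands the buffer off. No bytes are stored.
 */
public class ByteStringOutputStreamPolicySketch {
  static final int MAX_CHUNK_SIZE = 256 * 1024;

  /** Next buffer size; resultSize already includes the buffer that was just concatenated. */
  static int nextChunkSize(int resultSize) {
    return Math.min(Math.max(1, resultSize), MAX_CHUNK_SIZE);
  }

  /** Same thresholds as toByteStringAndReset(): true means copy the bytes and keep the buffer. */
  static boolean shouldCopy(int bufferLength, int bufferPos) {
    if (bufferLength <= 128) {
      return true;
    } else if (bufferLength <= 1024) {
      return bufferPos <= bufferLength * 0.875;
    } else if (bufferLength <= 8192) {
      return bufferPos <= bufferLength * 0.75;
    } else {
      return bufferPos <= bufferLength * 0.4375;
    }
  }

  /** Buffer sizes allocated (initial buffer first) while writing totalBytes one byte at a time. */
  static List<Integer> chunkSizes(int initialCapacity, long totalBytes) {
    List<Integer> sizes = new ArrayList<>();
    int bufferLength = initialCapacity;
    int bufferPos = 0;
    int resultSize = 0; // bytes already concatenated into the result
    sizes.add(bufferLength);
    for (long i = 0; i < totalBytes; i++) {
      if (bufferPos == bufferLength) {
        resultSize += bufferLength;
        bufferLength = nextChunkSize(resultSize);
        sizes.add(bufferLength);
        bufferPos = 0;
      }
      bufferPos++;
    }
    return sizes;
  }

  public static void main(String[] args) {
    System.out.println(chunkSizes(128, 1024)); // [128, 128, 256, 512]
    System.out.println(shouldCopy(4096, 2048)); // true: only half full, so copy and keep buffer
  }
}
```

Each new chunk equals everything written so far, so total capacity doubles until the 256k cap, after which growth becomes linear in 256k steps.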
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ByteStringOutputStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ByteStringOutputStreamTest.java
new file mode 100644
index 00000000000..faa77cf6467
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ByteStringOutputStreamTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.UnsafeByteOperations;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ByteStringOutputStreamTest {
+
+ @Test
+ public void testInvalidInitialCapacity() throws Exception {
+ assertThrows(
+ "Initial capacity < 0",
+ IllegalArgumentException.class,
+ () -> new ByteStringOutputStream(-1));
+ }
+
+ @Test
+ public void testWriteBytes() throws Exception {
+ ByteStringOutputStream out = new ByteStringOutputStream();
+ assertEquals(0, out.size());
+ for (int numElements = 0; numElements < 1024 * 1024; numElements = next(numElements)) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dataOut = new DataOutputStream(baos);
+ try {
+ for (int i = 0; i < numElements; ++i) {
+ dataOut.writeInt(i);
+ }
+ dataOut.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ dataOut.close();
+ byte[] testBuffer = baos.toByteArray();
+
+ for (int pos = 0; pos < testBuffer.length; ) {
+ if (testBuffer[pos] == 0) {
+ out.write(testBuffer[pos]);
+ pos += 1;
+ } else {
+ int len = Math.min(testBuffer.length - pos, Math.abs(testBuffer[pos]));
+ out.write(testBuffer, pos, len);
+ pos += len;
+ }
+ assertEquals(pos, out.size());
+ }
+ assertEquals(UnsafeByteOperations.unsafeWrap(testBuffer), out.toByteString());
+ assertEquals(UnsafeByteOperations.unsafeWrap(testBuffer), out.toByteStringAndReset());
+ }
+ }
+
+ @Test
+ public void testWriteBytesWithZeroInitialCapacity() throws Exception {
+ for (int numElements = 0; numElements < 1024 * 1024; numElements = next(numElements)) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dataOut = new DataOutputStream(baos);
+ try {
+ for (int i = 0; i < numElements; ++i) {
+ dataOut.writeInt(i);
+ }
+ dataOut.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ dataOut.close();
+ byte[] testBuffer = baos.toByteArray();
+
+ ByteStringOutputStream out = new ByteStringOutputStream(0);
+ assertEquals(0, out.size());
+
+ for (int pos = 0; pos < testBuffer.length; ) {
+ if (testBuffer[pos] == 0) {
+ out.write(testBuffer[pos]);
+ pos += 1;
+ } else {
+ int len = Math.min(testBuffer.length - pos, Math.abs(testBuffer[pos]));
+ out.write(testBuffer, pos, len);
+ pos += len;
+ }
+ assertEquals(pos, out.size());
+ }
+ assertEquals(UnsafeByteOperations.unsafeWrap(testBuffer), out.toByteString());
+ assertEquals(UnsafeByteOperations.unsafeWrap(testBuffer), out.toByteStringAndReset());
+ }
+ }
+
+ // Grow the number of elements following an approximation of the Fibonacci sequence.
+ private static int next(int current) {
+ double a = Math.max(1, current * (1 + Math.sqrt(5)) / 2.0);
+ return (int) Math.round(a);
+ }
+}
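The test helper next(int) grows the element count by multiplying by the golden ratio and rounding, which lands on successive Fibonacci numbers. The standalone sketch below reuses the same arithmetic (the FibonacciScheduleSketch and schedule names are hypothetical) to list the element counts the test loops visit, which is handy when checking which buffer sizes the tests exercise.

```java
import java.util.ArrayList;
import java.util.List;

/** Reproduces the element-count schedule used by the ByteStringOutputStream tests. */
public class FibonacciScheduleSketch {
  // Same arithmetic as the private next(int) helper in ByteStringOutputStreamTest.
  static int next(int current) {
    double a = Math.max(1, current * (1 + Math.sqrt(5)) / 2.0);
    return (int) Math.round(a);
  }

  /** All element counts below limit, in the order the test loop visits them. */
  static List<Integer> schedule(int limit) {
    List<Integer> counts = new ArrayList<>();
    for (int n = 0; n < limit; n = next(n)) {
      counts.add(n);
    }
    return counts;
  }

  public static void main(String[] args) {
    System.out.println(schedule(100)); // [0, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89]
  }
}
```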
diff --git a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceTest.java b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceTest.java
index b9cbdc779d4..39076d02268 100644
--- a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceTest.java
+++ b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceTest.java
@@ -53,8 +53,8 @@ import org.apache.beam.sdk.schemas.SchemaTranslation;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.values.Row;
-import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
@@ -353,7 +353,7 @@ public class ExpansionServiceTest {
private static ExternalTransforms.ExternalConfigurationPayload
encodeRowIntoExternalConfigurationPayload(Row row) {
- ByteString.Output outputStream = ByteString.newOutput();
+ ByteStringOutputStream outputStream = new ByteStringOutputStream();
try {
SchemaCoder.of(row.getSchema()).encode(row, outputStream);
} catch (IOException e) {
diff --git a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/JavaClassLookupTransformProviderTest.java b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/JavaClassLookupTransformProviderTest.java
index 631d208d6e8..26b6a591738 100644
--- a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/JavaClassLookupTransformProviderTest.java
+++ b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/JavaClassLookupTransformProviderTest.java
@@ -59,6 +59,7 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
@@ -1144,7 +1145,7 @@ public class JavaClassLookupTransformProviderTest {
}
private ByteString getProtoPayloadFromRow(Row row) {
- ByteString.Output outputStream = ByteString.newOutput();
+ ByteStringOutputStream outputStream = new ByteStringOutputStream();
try {
SchemaCoder.of(row.getSchema()).encode(row, outputStream);
} catch (IOException e) {
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
index d78191ba8b0..09422e07950 100644
--- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
@@ -35,6 +35,7 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
@@ -236,29 +237,27 @@ public class BeamFnDataOutboundAggregator {
private Elements.Builder convertBufferForTransmission() {
Elements.Builder bufferedElements = Elements.newBuilder();
for (Map.Entry<String, Receiver<?>> entry : outputDataReceivers.entrySet()) {
- if (entry.getValue().getOutput().size() == 0) {
+ if (entry.getValue().bufferedSize() == 0) {
continue;
}
- ByteString bytes = entry.getValue().getOutput().toByteString();
+ ByteString bytes = entry.getValue().toByteStringAndResetBuffer();
bufferedElements
.addDataBuilder()
.setInstructionId(processBundleRequestIdSupplier.get())
.setTransformId(entry.getKey())
.setData(bytes);
- entry.getValue().resetOutput();
}
for (Map.Entry<TimerEndpoint, Receiver<?>> entry : outputTimersReceivers.entrySet()) {
- if (entry.getValue().getOutput().size() == 0) {
+ if (entry.getValue().bufferedSize() == 0) {
continue;
}
- ByteString bytes = entry.getValue().getOutput().toByteString();
+ ByteString bytes = entry.getValue().toByteStringAndResetBuffer();
bufferedElements
.addTimersBuilder()
.setInstructionId(processBundleRequestIdSupplier.get())
.setTransformId(entry.getKey().pTransformId)
.setTimerFamilyId(entry.getKey().timerFamilyId)
.setTimers(bytes);
- entry.getValue().resetOutput();
}
bytesWrittenSinceFlush = 0L;
return bufferedElements;
@@ -323,13 +322,13 @@ public class BeamFnDataOutboundAggregator {
@VisibleForTesting
class Receiver<T> implements FnDataReceiver<T> {
- private final ByteString.Output output;
+ private final ByteStringOutputStream output;
private final Coder<T> coder;
private long perBundleByteCount;
private long perBundleElementCount;
public Receiver(Coder<T> coder) {
- this.output = ByteString.newOutput();
+ this.output = new ByteStringOutputStream();
this.coder = coder;
this.perBundleByteCount = 0L;
this.perBundleElementCount = 0L;
@@ -351,10 +350,6 @@ public class BeamFnDataOutboundAggregator {
}
}
- public ByteString.Output getOutput() {
- return output;
- }
-
public long getByteCount() {
return perBundleByteCount;
}
@@ -363,8 +358,12 @@ public class BeamFnDataOutboundAggregator {
return perBundleElementCount;
}
- public void resetOutput() {
- this.output.reset();
+ public int bufferedSize() {
+ return output.size();
+ }
+
+ public ByteString toByteStringAndResetBuffer() {
+ return this.output.toByteStringAndReset();
}
public void resetStats() {
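With this change, BeamFnDataOutboundAggregator.Receiver no longer exposes its output stream: the aggregator checks bufferedSize() and then takes the bytes and resets the buffer in a single toByteStringAndResetBuffer() call. The sketch below is a hypothetical, dependency-free illustration of that loop shape. It uses java.io.ByteArrayOutputStream (whose toByteArray() always copies) in place of ByteStringOutputStream, and the Receiver, accept, toBytesAndResetBuffer and drainNonEmpty names are illustrative only.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class DrainReceiversSketch {
  /** Hypothetical stand-in for Receiver: the buffer stays private. */
  static final class Receiver {
    private final ByteArrayOutputStream output = new ByteArrayOutputStream();

    void accept(String value) {
      byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
      output.write(bytes, 0, bytes.length);
    }

    int bufferedSize() {
      return output.size();
    }

    /** Snapshot and reset in one call, so callers cannot forget the reset. */
    byte[] toBytesAndResetBuffer() {
      byte[] bytes = output.toByteArray(); // ByteArrayOutputStream always copies here
      output.reset();
      return bytes;
    }
  }

  /** Same loop shape as convertBufferForTransmission(): skip receivers with nothing buffered. */
  static Map<String, byte[]> drainNonEmpty(Map<String, Receiver> receivers) {
    Map<String, byte[]> drained = new LinkedHashMap<>();
    for (Map.Entry<String, Receiver> entry : receivers.entrySet()) {
      if (entry.getValue().bufferedSize() == 0) {
        continue;
      }
      drained.put(entry.getKey(), entry.getValue().toBytesAndResetBuffer());
    }
    return drained;
  }

  public static void main(String[] args) {
    Map<String, Receiver> receivers = new LinkedHashMap<>();
    receivers.put("transformA", new Receiver());
    receivers.put("transformB", new Receiver());
    receivers.get("transformA").accept("hello");
    Map<String, byte[]> drained = drainNonEmpty(receivers);
    System.out.println(drained.keySet()); // [transformA]
    System.out.println(receivers.get("transformA").bufferedSize()); // 0
  }
}
```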
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java
index 9a0b15f5927..d55c09120ef 100644
--- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java
@@ -27,6 +27,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
/**
@@ -80,7 +81,7 @@ public class DataStreams {
*/
public static final class ElementDelimitedOutputStream extends OutputStream {
private final OutputChunkConsumer<ByteString> consumer;
- private final ByteString.Output output;
+ private final ByteStringOutputStream output;
private final int maximumChunkSize;
int previousPosition;
@@ -88,7 +89,7 @@ public class DataStreams {
OutputChunkConsumer<ByteString> consumer, int maximumChunkSize) {
this.consumer = consumer;
this.maximumChunkSize = maximumChunkSize;
- this.output = ByteString.newOutput(maximumChunkSize);
+ this.output = new ByteStringOutputStream(maximumChunkSize);
}
public void delimitElement() throws IOException {
@@ -139,8 +140,7 @@ public class DataStreams {
/** Can only be called if at least one byte has been written. */
private void internalFlush() throws IOException {
- consumer.read(output.toByteString());
- output.reset();
+ consumer.read(output.toByteStringAndReset());
// Set the previous position to an invalid position representing that a previous buffer
// was written to.
previousPosition = -1;
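ByteStringOutputStream.toByteString() wraps its live buffer with UnsafeByteOperations.unsafeWrap instead of copying it, and the class offers no separate reset(). That is why DataStreams.internalFlush() now calls toByteStringAndReset(), which either copies the tail bytes or switches to a fresh buffer before any further writes. The sketch below demonstrates the hazard this avoids, using a read-only java.nio.ByteBuffer view as a stand-in for a zero-copy ByteString; the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class ZeroCopyAliasingSketch {
  /**
   * Writes "ab", takes a snapshot, then rewinds the write position and writes 'X' into the same
   * array (a naive reset). Returns the first byte the snapshot sees afterwards.
   */
  static byte firstByteOfSnapshot(boolean copyBeforeReset) {
    byte[] buffer = new byte[8];
    int bufferPos = 0;
    buffer[bufferPos++] = 'a';
    buffer[bufferPos++] = 'b';

    ByteBuffer snapshot =
        copyBeforeReset
            // Copy the written bytes out, like the copy branch of toByteStringAndReset().
            ? ByteBuffer.wrap(Arrays.copyOf(buffer, bufferPos)).asReadOnlyBuffer()
            // Zero-copy view over the live array, analogous to unsafeWrap(buffer, 0, bufferPos).
            : ByteBuffer.wrap(buffer, 0, bufferPos).slice().asReadOnlyBuffer();

    bufferPos = 0; // naive reset: keep writing into the same array
    buffer[bufferPos] = 'X';
    return snapshot.get(0);
  }

  public static void main(String[] args) {
    System.out.println((char) firstByteOfSnapshot(false)); // X: the view aliases the buffer
    System.out.println((char) firstByteOfSnapshot(true)); // a: the copy is unaffected
  }
}
```

Read-only only prevents writes through the view; it does not protect the view from writes to the backing array, which is exactly the situation the Javadoc note on toByteString() warns about.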
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2Test.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2Test.java
index 82d35e5ab04..c1e8e4293d4 100644
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2Test.java
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver2Test.java
@@ -35,8 +35,8 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.fn.test.TestExecutors;
import org.apache.beam.sdk.fn.test.TestExecutors.TestExecutorService;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -203,7 +203,7 @@ public class BeamFnDataInboundObserver2Test {
}
private BeamFnApi.Elements dataWith(String... values) throws Exception {
- ByteString.Output output = ByteString.newOutput();
+ ByteStringOutputStream output = new ByteStringOutputStream();
for (String value : values) {
CODER.encode(valueInGlobalWindow(value), output);
}
@@ -222,7 +222,7 @@ public class BeamFnDataInboundObserver2Test {
}
private BeamFnApi.Elements timerWith(String... values) throws Exception {
- ByteString.Output output = ByteString.newOutput();
+ ByteStringOutputStream output = new ByteStringOutputStream();
for (String value : values) {
CODER.encode(valueInGlobalWindow(value), output);
}
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java
index 4f4cb082032..c0b1a6c24ab 100644
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ExecutionException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.junit.Rule;
@@ -98,7 +99,7 @@ public class BeamFnDataInboundObserverTest {
}
private ByteString dataWith(String... values) throws Exception {
- ByteString.Output output = ByteString.newOutput();
+ ByteStringOutputStream output = new ByteStringOutputStream();
for (String value : values) {
CODER.encode(valueInGlobalWindow(value), output);
}
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregatorTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregatorTest.java
index 9937fcafe12..8b2adc6f2e2 100644
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregatorTest.java
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregatorTest.java
@@ -40,7 +40,7 @@ import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.hamcrest.Matchers;
import org.junit.Test;
@@ -145,7 +145,7 @@ public class BeamFnDataOutboundAggregatorTest {
} else {
receiver = Iterables.getOnlyElement(aggregator.outputDataReceivers.values());
}
- assertEquals(0L, receiver.getOutput().size());
+ assertEquals(0L, receiver.bufferedSize());
assertEquals(102L, receiver.getByteCount());
assertEquals(2L, receiver.getElementCount());
@@ -155,7 +155,7 @@ public class BeamFnDataOutboundAggregatorTest {
aggregator.sendOrCollectBufferedDataAndFinishOutboundStreams();
// Test that receiver stats have been reset after
// sendOrCollectBufferedDataAndFinishOutboundStreams.
- assertEquals(0L, receiver.getOutput().size());
+ assertEquals(0L, receiver.bufferedSize());
assertEquals(0L, receiver.getByteCount());
assertEquals(0L, receiver.getElementCount());
@@ -344,7 +344,7 @@ public class BeamFnDataOutboundAggregatorTest {
BeamFnApi.Elements.Builder messageWithDataBuilder(LogicalEndpoint endpoint, byte[]... datum)
throws IOException {
- ByteString.Output output = ByteString.newOutput();
+ ByteStringOutputStream output = new ByteStringOutputStream();
for (byte[] data : datum) {
CODER.encode(data, output);
}
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DataStreamsTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DataStreamsTest.java
index 1e128747839..dfb10571a2b 100644
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DataStreamsTest.java
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DataStreamsTest.java
@@ -38,6 +38,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.fn.stream.DataStreams.DataStreamDecoder;
import org.apache.beam.sdk.fn.stream.DataStreams.ElementDelimitedOutputStream;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
@@ -145,7 +146,7 @@ public class DataStreamsTest {
}
private ByteString encode(String... values) throws IOException {
- ByteString.Output out = ByteString.newOutput();
+ ByteStringOutputStream out = new ByteStringOutputStream();
for (String value : values) {
StringUtf8Coder.of().encode(value, out);
}
@@ -153,7 +154,7 @@ public class DataStreamsTest {
}
private <T> void testDecoderWith(Coder<T> coder, T... expected) throws IOException {
- ByteString.Output output = ByteString.newOutput();
+ ByteStringOutputStream output = new ByteStringOutputStream();
for (T value : expected) {
int size = output.size();
coder.encode(value, output);
diff --git a/sdks/java/harness/jmh/build.gradle b/sdks/java/harness/jmh/build.gradle
index 4d50c717b55..17860f172d3 100644
--- a/sdks/java/harness/jmh/build.gradle
+++ b/sdks/java/harness/jmh/build.gradle
@@ -31,8 +31,14 @@ configurations {
}
dependencies {
+ implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":sdks:java:harness", configuration: "shadow")
- implementation project(":runners:java-fn-execution")
+ implementation project(path: ":runners:java-fn-execution")
+ implementation project(path: ":model:pipeline", configuration: "shadow")
+ implementation library.java.vendored_grpc_1_43_2
+ implementation library.java.vendored_guava_26_0_jre
+ implementation library.java.slf4j_api
+ implementation library.java.joda_time
runtimeOnly library.java.slf4j_jdk14
jammAgent library.java.jamm
}
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 24b6f8f73c7..50402e472c8 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -106,6 +106,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
@@ -728,7 +729,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
public void reset() {}
private ByteString encodeProgress(double value) throws IOException {
- ByteString.Output output = ByteString.newOutput();
+ ByteStringOutputStream output = new ByteStringOutputStream();
IterableCoder.of(DoubleCoder.of()).encode(Arrays.asList(value), output);
return output.toByteString();
}
@@ -1514,7 +1515,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
// Encode window splits.
if (windowedSplitResult != null
&& windowedSplitResult.getPrimaryInFullyProcessedWindowsRoot() != null) {
- ByteString.Output primaryInOtherWindowsBytes = ByteString.newOutput();
+ ByteStringOutputStream primaryInOtherWindowsBytes = new ByteStringOutputStream();
try {
fullInputCoder.encode(
windowedSplitResult.getPrimaryInFullyProcessedWindowsRoot(),
@@ -1531,7 +1532,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
}
if (windowedSplitResult != null
&& windowedSplitResult.getResidualInUnprocessedWindowsRoot() != null) {
- ByteString.Output bytesOut = ByteString.newOutput();
+ ByteStringOutputStream bytesOut = new ByteStringOutputStream();
try {
fullInputCoder.encode(windowedSplitResult.getResidualInUnprocessedWindowsRoot(), bytesOut);
} catch (IOException e) {
@@ -1564,8 +1565,8 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
.build());
}
- ByteString.Output primaryBytes = ByteString.newOutput();
- ByteString.Output residualBytes = ByteString.newOutput();
+ ByteStringOutputStream primaryBytes = new ByteStringOutputStream();
+ ByteStringOutputStream residualBytes = new ByteStringOutputStream();
// Encode element split from windowedSplitResult or from downstream element split. It's possible
// that there is no element split.
if (windowedSplitResult != null && windowedSplitResult.getResidualSplitRoot() != null) {
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java
index 39181d9f57c..51d83b501d4 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java
@@ -32,7 +32,7 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
import org.apache.beam.sdk.fn.stream.PrefetchableIterables;
-import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
/**
@@ -128,7 +128,7 @@ public class BagUserState<T> {
request.toBuilder().setClear(StateClearRequest.getDefaultInstance()));
}
if (!newValues.isEmpty()) {
- ByteString.Output out = ByteString.newOutput();
+ ByteStringOutputStream out = new ByteStringOutputStream();
for (T newValue : newValues) {
// TODO: Replace with chunking output stream
valueCoder.encode(newValue, out);
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
index e3c850e7cec..1974b617934 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
@@ -56,6 +56,7 @@ import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.Materializations;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
@@ -118,7 +119,7 @@ public class FnApiStateAccessor<K> implements SideInputReader, StateBinder {
checkState(
keyCoder != null, "Accessing state in unkeyed context, no key coder available");
- ByteString.Output encodedKeyOut = ByteString.newOutput();
+ ByteStringOutputStream encodedKeyOut = new ByteStringOutputStream();
try {
((Coder) keyCoder).encode(key, encodedKeyOut, Coder.Context.NESTED);
} catch (IOException e) {
@@ -131,7 +132,7 @@ public class FnApiStateAccessor<K> implements SideInputReader, StateBinder {
memoizeFunction(
currentWindowSupplier,
window -> {
- ByteString.Output encodedWindowOut = ByteString.newOutput();
+ ByteStringOutputStream encodedWindowOut = new ByteStringOutputStream();
try {
windowCoder.encode(window, encodedWindowOut);
} catch (IOException e) {
@@ -167,7 +168,7 @@ public class FnApiStateAccessor<K> implements SideInputReader, StateBinder {
SideInputSpec sideInputSpec = sideInputSpecMap.get(tag);
checkArgument(sideInputSpec != null, "Attempting to access unknown side input %s.", view);
- ByteString.Output encodedWindowOut = ByteString.newOutput();
+ ByteStringOutputStream encodedWindowOut = new ByteStringOutputStream();
try {
sideInputSpec
.getWindowCoder()
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiTimerBundleTracker.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiTimerBundleTracker.java
index 39f735d98ef..718b440b6a4 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiTimerBundleTracker.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiTimerBundleTracker.java
@@ -31,6 +31,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ComparisonChain;
@@ -42,7 +43,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Table.Ce
public class FnApiTimerBundleTracker<K> {
private final Supplier<ByteString> encodedCurrentKeySupplier;
private final Supplier<ByteString> encodedCurrentWindowSupplier;
- private Table<ByteString, ByteString, Modifications<K>> timerModifications;
+ private final Table<ByteString, ByteString, Modifications<K>> timerModifications;
@AutoValue
public abstract static class TimerInfo<K> {
@@ -116,7 +117,7 @@ public class FnApiTimerBundleTracker<K> {
Sets.newTreeSet(comparator),
HashBasedTable.create());
}
- };
+ }
public FnApiTimerBundleTracker(
Coder<K> keyCoder,
@@ -131,9 +132,9 @@ public class FnApiTimerBundleTracker<K> {
checkState(
keyCoder != null, "Accessing state in unkeyed context, no key coder available");
- ByteString.Output encodedKeyOut = ByteString.newOutput();
+ ByteStringOutputStream encodedKeyOut = new ByteStringOutputStream();
try {
- ((Coder) keyCoder).encode(key, encodedKeyOut, Coder.Context.NESTED);
+ keyCoder.encode(key, encodedKeyOut, Coder.Context.NESTED);
} catch (IOException e) {
throw new IllegalStateException(e);
}
@@ -143,7 +144,7 @@ public class FnApiTimerBundleTracker<K> {
memoizeFunction(
currentWindowSupplier,
window -> {
- ByteString.Output encodedWindowOut = ByteString.newOutput();
+ ByteStringOutputStream encodedWindowOut = new ByteStringOutputStream();
try {
windowCoder.encode(window, encodedWindowOut);
} catch (IOException e) {
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapSideInput.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapSideInput.java
index f36423a62a2..409a4831164 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapSideInput.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapSideInput.java
@@ -26,6 +26,7 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Materializations.MultimapView;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
/**
@@ -70,7 +71,7 @@ public class MultimapSideInput<K, V> implements MultimapView<K, V> {
@Override
public Iterable<V> get(K k) {
- ByteString.Output output = ByteString.newOutput();
+ ByteStringOutputStream output = new ByteStringOutputStream();
try {
keyCoder.encode(k, output);
} catch (IOException e) {
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
index 5757dc9062a..3078f69c3f1 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java
@@ -41,6 +41,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
import org.apache.beam.sdk.fn.stream.PrefetchableIterables;
import org.apache.beam.sdk.fn.stream.PrefetchableIterator;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
@@ -357,7 +358,7 @@ public class MultimapUserState<K, V> {
private ByteString encodeValues(Iterable<V> values) {
try {
- ByteString.Output output = ByteString.newOutput();
+ ByteStringOutputStream output = new ByteStringOutputStream();
for (V value : values) {
valueCoder.encode(value, output);
}
@@ -373,7 +374,7 @@ public class MultimapUserState<K, V> {
private StateRequest createUserStateRequest(K key) {
try {
- ByteString.Output output = ByteString.newOutput();
+ ByteStringOutputStream output = new ByteStringOutputStream();
mapKeyCoder.encode(key, output);
StateRequest.Builder request = userStateRequest.toBuilder();
request.getStateKeyBuilder().getMultimapUserStateBuilder().setMapKey(output.toByteString());
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
index 27da531e53f..de8be4252da 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
@@ -132,6 +132,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
@@ -1337,7 +1338,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
}
private ByteString encode(String... values) throws IOException {
- ByteString.Output out = ByteString.newOutput();
+ ByteStringOutputStream out = new ByteStringOutputStream();
for (String value : values) {
StringUtf8Coder.of().encode(value, out);
}
@@ -2997,8 +2998,8 @@ public class FnApiDoFnRunnerTest implements Serializable {
}
private static SplitResult createSplitResult(double fractionOfRemainder) {
- ByteString.Output primaryBytes = ByteString.newOutput();
- ByteString.Output residualBytes = ByteString.newOutput();
+ ByteStringOutputStream primaryBytes = new ByteStringOutputStream();
+ ByteStringOutputStream residualBytes = new ByteStringOutputStream();
try {
DoubleCoder.of().encode(fractionOfRemainder, primaryBytes);
DoubleCoder.of().encode(1 - fractionOfRemainder, residualBytes);
@@ -3181,7 +3182,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
SplitResult expectedElementSplit = createSplitResult(0);
BundleApplication expectedElementSplitPrimary =
Iterables.getOnlyElement(expectedElementSplit.getPrimaryRoots());
- ByteString.Output primaryBytes = ByteString.newOutput();
+ ByteStringOutputStream primaryBytes = new ByteStringOutputStream();
inputCoder.encode(
WindowedValue.of(
KV.of(
@@ -3198,7 +3199,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
.build();
DelayedBundleApplication expectedElementSplitResidual =
Iterables.getOnlyElement(expectedElementSplit.getResidualRoots());
- ByteString.Output residualBytes = ByteString.newOutput();
+ ByteStringOutputStream residualBytes = new ByteStringOutputStream();
inputCoder.encode(
WindowedValue.of(
KV.of(
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
index 450628a31a9..e2c0c3f0b26 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
@@ -137,6 +137,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerFamilyDeclarati
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.DoFnWithExecutionInformation;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.KV;
@@ -1095,9 +1096,9 @@ public class ProcessBundleHandlerTest {
ProcessBundleHandler handler =
setupProcessBundleHandlerForSimpleRecordingDoFn(dataOutput, timerOutput, false);
- ByteString.Output encodedData = ByteString.newOutput();
+ ByteStringOutputStream encodedData = new ByteStringOutputStream();
KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()).encode(KV.of("", "data"), encodedData);
- ByteString.Output encodedTimer = ByteString.newOutput();
+ ByteStringOutputStream encodedTimer = new ByteStringOutputStream();
Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)
.encode(
Timer.of(
@@ -1160,7 +1161,7 @@ public class ProcessBundleHandlerTest {
ProcessBundleHandler handler =
setupProcessBundleHandlerForSimpleRecordingDoFn(dataOutput, timerOutput, false);
- ByteString.Output encodedData = ByteString.newOutput();
+ ByteStringOutputStream encodedData = new ByteStringOutputStream();
KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()).encode(KV.of("", "data"), encodedData);
assertThrows(
@@ -1216,7 +1217,7 @@ public class ProcessBundleHandlerTest {
ProcessBundleHandler handler =
setupProcessBundleHandlerForSimpleRecordingDoFn(dataOutput, timerOutput, false);
- ByteString.Output encodedTimer = ByteString.newOutput();
+ ByteStringOutputStream encodedTimer = new ByteStringOutputStream();
Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)
.encode(
Timer.of(
@@ -1310,7 +1311,7 @@ public class ProcessBundleHandlerTest {
ProcessBundleHandler handler =
setupProcessBundleHandlerForSimpleRecordingDoFn(dataOutput, timerOutput, true);
- ByteString.Output encodedTimer = ByteString.newOutput();
+ ByteStringOutputStream encodedTimer = new ByteStringOutputStream();
Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)
.encode(
Timer.of(
@@ -1446,7 +1447,7 @@ public class ProcessBundleHandlerTest {
Mockito.doAnswer(
(invocation) -> {
- ByteString.Output encodedData = ByteString.newOutput();
+ ByteStringOutputStream encodedData = new ByteStringOutputStream();
StringUtf8Coder.of().encode("A", encodedData);
String instructionId = invocation.getArgument(0, String.class);
CloseableFnDataReceiver<BeamFnApi.Elements> data =
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BagUserStateTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BagUserStateTest.java
index 7b1cef87369..7b2bce0641b 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BagUserStateTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/BagUserStateTest.java
@@ -29,6 +29,7 @@ import org.apache.beam.fn.harness.Cache;
import org.apache.beam.fn.harness.Caches;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
@@ -252,7 +253,7 @@ public class BagUserStateTest {
}
private ByteString encode(String... values) throws IOException {
- ByteString.Output out = ByteString.newOutput();
+ ByteStringOutputStream out = new ByteStringOutputStream();
for (String value : values) {
StringUtf8Coder.of().encode(value, out);
}
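The `encode(String... values)` test helper appears in several of these test files, including BagUserStateTest just above. It writes each value with `StringUtf8Coder` into one buffer and returns the concatenated bytes. The sketch below approximates those bytes using only the JDK. It assumes the nested `StringUtf8Coder` encoding is an unsigned varint length prefix followed by the UTF-8 bytes. The class and method names are hypothetical, and `ByteArrayOutputStream` stands in for `ByteStringOutputStream`.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical JDK-only approximation of the encode(String...) test helper. */
final class NestedStringEncodingSketch {
  private NestedStringEncodingSketch() {}

  /** Writes an unsigned varint: 7 payload bits per byte, high bit set on every byte but the last. */
  static void writeVarInt(int value, ByteArrayOutputStream out) {
    while ((value & ~0x7F) != 0) {
      out.write((value & 0x7F) | 0x80);
      value >>>= 7;
    }
    out.write(value);
  }

  /** Length-prefixes each value's UTF-8 bytes (assumed nested layout) and concatenates them. */
  static byte[] encode(String... values) {
    // ByteArrayOutputStream stands in for ByteStringOutputStream in this sketch.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (String value : values) {
      byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
      writeVarInt(utf8.length, out);
      out.write(utf8, 0, utf8.length);
    }
    return out.toByteArray();
  }
}
```

For example, `encode("a", "bc")` yields the bytes `{1, 'a', 2, 'b', 'c'}`. Only strings of 128 or more UTF-8 bytes need a second prefix byte.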
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java
index 954b37d294e..17ef134b8e5 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java
@@ -37,6 +37,7 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest.RequestCase;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
@@ -67,7 +68,7 @@ public class FakeBeamFnStateClient implements BeamFnStateClient {
initialData,
(KV<Coder<?>, List<?>> coderAndValues) -> {
List<ByteString> chunks = new ArrayList<>();
- ByteString.Output output = ByteString.newOutput();
+ ByteStringOutputStream output = new ByteStringOutputStream();
for (Object value : coderAndValues.getValue()) {
try {
((Coder<Object>) coderAndValues.getKey()).encode(value, output);
@@ -75,7 +76,7 @@ public class FakeBeamFnStateClient implements BeamFnStateClient {
throw new RuntimeException(e);
}
if (output.size() >= chunkSize) {
- ByteString chunk = output.toByteString();
+ ByteString chunk = output.toByteStringAndReset();
int i = 0;
for (; i + chunkSize <= chunk.size(); i += chunkSize) {
// We specifically use a copy of the bytes instead of a proper substring
@@ -88,7 +89,6 @@ public class FakeBeamFnStateClient implements BeamFnStateClient {
chunks.add(
ByteString.copyFrom(chunk.substring(i, chunk.size()).toByteArray()));
}
- output.reset();
}
}
// Add the last chunk
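In `FakeBeamFnStateClient`, the separate `toByteString()` and `reset()` calls are replaced by a single `toByteStringAndReset()` call. The JDK-only sketch below mirrors that chunking loop, with `ByteArrayOutputStream.toByteArray()` followed by `reset()` standing in for the combined call. The class name is hypothetical. The sketch emits a trailing chunk only when bytes remain, which is an assumption because the diff cuts off after the `// Add the last chunk` comment.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of the chunking loop used by the fake state client. */
final class ChunkingSketch {
  private ChunkingSketch() {}

  static List<byte[]> chunk(List<byte[]> encodedValues, int chunkSize) {
    List<byte[]> chunks = new ArrayList<>();
    ByteArrayOutputStream output = new ByteArrayOutputStream();
    for (byte[] value : encodedValues) {
      output.write(value, 0, value.length);
      if (output.size() >= chunkSize) {
        // Drain and reset; toByteStringAndReset() does both in one call.
        byte[] drained = output.toByteArray();
        output.reset();
        int i = 0;
        // Emit full chunkSize pieces as independent copies.
        for (; i + chunkSize <= drained.length; i += chunkSize) {
          chunks.add(Arrays.copyOfRange(drained, i, i + chunkSize));
        }
        // Emit the shorter remainder, if any.
        if (i < drained.length) {
          chunks.add(Arrays.copyOfRange(drained, i, drained.length));
        }
      }
    }
    // Add the last chunk (assumption: only when bytes remain).
    if (output.size() > 0) {
      chunks.add(output.toByteArray());
    }
    return chunks;
  }
}
```

With values `{1, 2, 3}`, `{4, 5, 6}`, `{7}` and a `chunkSize` of 4, the sketch yields chunks of sizes 4, 2 and 1. Combining the drain and the reset in one call makes it harder to forget the reset, which the old code had to place after the copying loop.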
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/MultimapSideInputTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/MultimapSideInputTest.java
index 6fea5e10617..f0412fdddc6 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/MultimapSideInputTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/MultimapSideInputTest.java
@@ -28,6 +28,7 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
@@ -142,7 +143,7 @@ public class MultimapSideInputTest {
}
private StateKey key(byte[] key) throws IOException {
- ByteString.Output out = ByteString.newOutput();
+ ByteStringOutputStream out = new ByteStringOutputStream();
ByteArrayCoder.of().encode(key, out);
return StateKey.newBuilder()
.setMultimapSideInput(
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/MultimapUserStateTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/MultimapUserStateTest.java
index fccd8e7c1bd..dbd2add8829 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/MultimapUserStateTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/MultimapUserStateTest.java
@@ -41,6 +41,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
@@ -1060,7 +1061,7 @@ public class MultimapUserStateTest {
}
private ByteString encode(String... values) throws IOException {
- ByteString.Output out = ByteString.newOutput();
+ ByteStringOutputStream out = new ByteStringOutputStream();
for (String value : values) {
StringUtf8Coder.of().encode(value, out);
}
@@ -1068,7 +1069,7 @@ public class MultimapUserStateTest {
}
private ByteString encode(byte[]... values) throws IOException {
- ByteString.Output out = ByteString.newOutput();
+ ByteStringOutputStream out = new ByteStringOutputStream();
for (byte[] value : values) {
ByteArrayCoder.of().encode(value, out);
}
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateBackedIterableTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateBackedIterableTest.java
index 8b05def2bfc..b6e8fe7962c 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateBackedIterableTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateBackedIterableTest.java
@@ -35,6 +35,7 @@ import org.apache.beam.fn.harness.Cache;
import org.apache.beam.fn.harness.Caches;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
@@ -297,7 +298,7 @@ public class StateBackedIterableTest {
}
private static ByteString encode(String... values) throws IOException {
- ByteString.Output out = ByteString.newOutput();
+ ByteStringOutputStream out = new ByteStringOutputStream();
for (String value : values) {
StringUtf8Coder.of().encode(value, out);
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/Uuid.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/Uuid.java
index 969233eda92..16e5b116432 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/Uuid.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/Uuid.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.Base64;
import java.util.UUID;
import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
/** A Uuid storable in a Pub/Sub Lite attribute. */
@DefaultCoder(UuidCoder.class)
@@ -42,7 +43,7 @@ public abstract class Uuid {
public static Uuid random() {
UUID uuid = UUID.randomUUID();
- ByteString.Output output = ByteString.newOutput(16);
+ ByteStringOutputStream output = new ByteStringOutputStream(16);
DataOutputStream stream = new DataOutputStream(output);
try {
stream.writeLong(uuid.getMostSignificantBits());
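`Uuid.random()` keeps its initial capacity of 16 when switching to `new ByteStringOutputStream(16)`, because a UUID is exactly two 64-bit longs. The JDK-only sketch below shows that 16-byte layout, with `ByteArrayOutputStream` standing in for `ByteStringOutputStream`. It assumes the least significant bits are written right after the most significant ones, since the diff is cut off after the first `writeLong`. The class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.UUID;

/** Hypothetical sketch of packing a UUID into 16 bytes and back. */
final class UuidBytesSketch {
  private UuidBytesSketch() {}

  static byte[] toBytes(UUID uuid) {
    // 16 bytes = two longs, so the buffer never has to grow.
    ByteArrayOutputStream output = new ByteArrayOutputStream(16);
    DataOutputStream stream = new DataOutputStream(output);
    try {
      // DataOutputStream writes each long big-endian.
      stream.writeLong(uuid.getMostSignificantBits());
      stream.writeLong(uuid.getLeastSignificantBits());
      stream.flush();
    } catch (IOException e) {
      // Not expected for an in-memory stream.
      throw new UncheckedIOException(e);
    }
    return output.toByteArray();
  }

  static UUID fromBytes(byte[] bytes) {
    // ByteBuffer defaults to big-endian, matching DataOutputStream.
    ByteBuffer buffer = ByteBuffer.wrap(bytes);
    return new UUID(buffer.getLong(), buffer.getLong());
  }
}
```

Round-tripping any UUID through `toBytes` and `fromBytes` returns an equal UUID. The first byte of the encoding is the most significant byte of `getMostSignificantBits()`.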
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
index bb8ddc4034a..39750f2c673 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
@@ -35,9 +35,9 @@ import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.SchemaTranslation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
-import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.hamcrest.Matchers;
@@ -205,7 +205,7 @@ public class PubsubIOExternalTest {
}
private static ExternalTransforms.ExternalConfigurationPayload encodeRow(Row row) {
- ByteString.Output outputStream = ByteString.newOutput();
+ ByteStringOutputStream outputStream = new ByteStringOutputStream();
try {
SchemaCoder.of(row.getSchema()).encode(row, outputStream);
} catch (IOException e) {
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
index d9489a8fc7c..b62da988d42 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
@@ -43,8 +43,8 @@ import org.apache.beam.sdk.schemas.SchemaTranslation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.values.Row;
-import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
@@ -375,7 +375,7 @@ public class KafkaIOExternalTest {
}
private static ExternalConfigurationPayload encodeRow(Row row) {
- ByteString.Output outputStream = ByteString.newOutput();
+ ByteStringOutputStream outputStream = new ByteStringOutputStream();
try {
SchemaCoder.of(row.getSchema()).encode(row, outputStream);
} catch (IOException e) {