You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2023/02/20 00:57:28 UTC
[beam] 06/13: Use WindowedValue.withValue on hot paths #21250 (#25519)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch prism-jobservices
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 0dd240529d120c49155a0eb88274a0e9f2047091
Author: Luke Cwik <lu...@gmail.com>
AuthorDate: Fri Feb 17 16:58:29 2023 -0800
Use WindowedValue.withValue on hot paths #21250 (#25519)
* Use WindowedValue.withValue on hot paths #21250
This removed about half of the overhead of outputting a value in the common scenario where the output already uses a valid timestamp (the input timestamp), and it also lets us use the `withValue` hot path, which is optimized for certain use cases (e.g. the globally windowed value case).
Before:
```
Benchmark Mode Cnt Score Error Units
ProcessBundleBenchmark.testLargeBundle thrpt 15 3616.761 ± 157.844 ops/s
```
After:
```
Benchmark Mode Cnt Score Error Units
ProcessBundleBenchmark.testLargeBundle thrpt 15 3666.889 ± 151.448 ops/s
```
This is for #21250.
---
.../apache/beam/fn/harness/FnApiDoFnRunner.java | 257 +++++++++++++++++----
1 file changed, 209 insertions(+), 48 deletions(-)
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 0cfcb0a84f2..561bb0f39fd 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -2165,7 +2165,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
}
/** Provides arguments for a {@link DoFnInvoker} for a window observing method. */
- private class WindowObservingProcessBundleContext extends ProcessBundleContextBase {
+ private abstract class WindowObservingProcessBundleContextBase extends ProcessBundleContextBase {
@Override
public BoundedWindow window() {
return currentWindow;
@@ -2180,6 +2180,53 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
public <T> T sideInput(PCollectionView<T> view) {
return stateAccessor.get(view, currentWindow);
}
+ }
+
+ private class WindowObservingProcessBundleContext
+ extends WindowObservingProcessBundleContextBase {
+
+ @Override
+ public void output(OutputT output) {
+ // Don't need to check timestamp since we can always output using the input timestamp.
+ outputTo(
+ mainOutputConsumer,
+ WindowedValue.of(
+ output, currentElement.getTimestamp(), currentWindow, currentElement.getPane()));
+ }
+
+ @Override
+ public <T> void output(TupleTag<T> tag, T output) {
+ FnDataReceiver<WindowedValue<T>> consumer =
+ (FnDataReceiver) localNameToConsumer.get(tag.getId());
+ if (consumer == null) {
+ throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
+ }
+ // Don't need to check timestamp since we can always output using the input timestamp.
+ outputTo(
+ consumer,
+ WindowedValue.of(
+ output, currentElement.getTimestamp(), currentWindow, currentElement.getPane()));
+ }
+
+ @Override
+ public void outputWithTimestamp(OutputT output, Instant timestamp) {
+ // TODO: Check that timestamp is valid once all runners can provide proper timestamps.
+ outputTo(
+ mainOutputConsumer,
+ WindowedValue.of(output, timestamp, currentWindow, currentElement.getPane()));
+ }
+
+ @Override
+ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ // TODO: Check that timestamp is valid once all runners can provide proper timestamps.
+ FnDataReceiver<WindowedValue<T>> consumer =
+ (FnDataReceiver) localNameToConsumer.get(tag.getId());
+ if (consumer == null) {
+ throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
+ }
+ outputTo(
+ consumer, WindowedValue.of(output, timestamp, currentWindow, currentElement.getPane()));
+ }
@Override
public State state(String stateId, boolean alwaysFetched) {
@@ -2232,37 +2279,62 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
currentElement.getTimestamp(),
currentElement.getPane());
}
-
- @Override
- public void outputWithTimestamp(OutputT output, Instant timestamp) {
- // TODO: Check that timestamp is valid once all runners can provide proper timestamps.
- outputTo(
- mainOutputConsumer,
- WindowedValue.of(output, timestamp, currentWindow, currentElement.getPane()));
- }
-
- @Override
- public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- // TODO: Check that timestamp is valid once all runners can provide proper timestamps.
- FnDataReceiver<WindowedValue<T>> consumer =
- (FnDataReceiver) localNameToConsumer.get(tag.getId());
- if (consumer == null) {
- throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
- }
- outputTo(
- consumer, WindowedValue.of(output, timestamp, currentWindow, currentElement.getPane()));
- }
}
/** This context outputs KV<KV<Element, KV<Restriction, WatemarkEstimatorState>>, Size>. */
private class SizedRestrictionWindowObservingProcessBundleContext
- extends WindowObservingProcessBundleContext {
+ extends WindowObservingProcessBundleContextBase {
private final String errorContextPrefix;
SizedRestrictionWindowObservingProcessBundleContext(String errorContextPrefix) {
this.errorContextPrefix = errorContextPrefix;
}
+ @Override
+ // OutputT == RestrictionT
+ public void output(OutputT output) {
+ double size =
+ doFnInvoker.invokeGetSize(
+ new DelegatingArgumentProvider<InputT, OutputT>(
+ this, this.errorContextPrefix + "/GetSize") {
+ @Override
+ public Object restriction() {
+ return output;
+ }
+
+ @Override
+ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+ return currentElement.getTimestamp();
+ }
+
+ @Override
+ public RestrictionTracker<?, ?> restrictionTracker() {
+ return doFnInvoker.invokeNewTracker(this);
+ }
+ });
+
+ // Don't need to check timestamp since we can always output using the input timestamp.
+ outputTo(
+ mainOutputConsumer,
+ (WindowedValue<OutputT>)
+ WindowedValue.of(
+ KV.of(
+ KV.of(
+ currentElement.getValue(), KV.of(output, currentWatermarkEstimatorState)),
+ size),
+ currentElement.getTimestamp(),
+ currentWindow,
+ currentElement.getPane()));
+ }
+
+ @Override
+ public <T> void output(TupleTag<T> tag, T output) {
+ // Note that the OutputReceiver/RowOutputReceiver specifically will use the non-tag versions
+ // of these methods when producing output.
+ throw new UnsupportedOperationException(
+ String.format("Non-main output %s unsupported in %s", tag, errorContextPrefix));
+ }
+
@Override
// OutputT == RestrictionT
public void outputWithTimestamp(OutputT output, Instant timestamp) {
@@ -2299,17 +2371,85 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
currentWindow,
currentElement.getPane()));
}
+
+ @Override
+ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ // Note that the OutputReceiver/RowOutputReceiver specifically will use the non-tag versions
+ // of these methods when producing output.
+ throw new UnsupportedOperationException(
+ String.format("Non-main output %s unsupported in %s", tag, errorContextPrefix));
+ }
+
+ @Override
+ public State state(String stateId, boolean alwaysFetched) {
+ throw new UnsupportedOperationException(
+ String.format("State unsupported in %s", errorContextPrefix));
+ }
+
+ @Override
+ public org.apache.beam.sdk.state.Timer timer(String timerId) {
+ throw new UnsupportedOperationException(
+ String.format("Timer unsupported in %s", errorContextPrefix));
+ }
+
+ @Override
+ public TimerMap timerFamily(String tagId) {
+ throw new UnsupportedOperationException(
+ String.format("Timer unsupported in %s", errorContextPrefix));
+ }
}
/** This context outputs KV<KV<Element, KV<Restriction, WatermarkEstimatorState>>, Size>. */
private class SizedRestrictionNonWindowObservingProcessBundleContext
- extends NonWindowObservingProcessBundleContext {
+ extends NonWindowObservingProcessBundleContextBase {
private final String errorContextPrefix;
SizedRestrictionNonWindowObservingProcessBundleContext(String errorContextPrefix) {
this.errorContextPrefix = errorContextPrefix;
}
+ @Override
+ // OutputT == RestrictionT
+ public void output(OutputT output) {
+ double size =
+ doFnInvoker.invokeGetSize(
+ new DelegatingArgumentProvider<InputT, OutputT>(
+ this, errorContextPrefix + "/GetSize") {
+ @Override
+ public Object restriction() {
+ return output;
+ }
+
+ @Override
+ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+ return currentElement.getTimestamp();
+ }
+
+ @Override
+ public RestrictionTracker<?, ?> restrictionTracker() {
+ return doFnInvoker.invokeNewTracker(this);
+ }
+ });
+
+ // Don't need to check timestamp since we can always output using the input timestamp.
+ outputTo(
+ mainOutputConsumer,
+ (WindowedValue<OutputT>)
+ currentElement.withValue(
+ KV.of(
+ KV.of(
+ currentElement.getValue(), KV.of(output, currentWatermarkEstimatorState)),
+ size)));
+ }
+
+ @Override
+ public <T> void output(TupleTag<T> tag, T output) {
+ // Note that the OutputReceiver/RowOutputReceiver specifically will use the non-tag versions
+ // of these methods when producing output.
+ throw new UnsupportedOperationException(
+ String.format("Non-main output %s unsupported in %s", tag, errorContextPrefix));
+ }
+
@Override
// OutputT == RestrictionT
public void outputWithTimestamp(OutputT output, Instant timestamp) {
@@ -2346,10 +2486,37 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
currentElement.getWindows(),
currentElement.getPane()));
}
+
+ @Override
+ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ // Note that the OutputReceiver/RowOutputReceiver specifically will use the non-tag versions
+ // of these methods when producing output.
+ throw new UnsupportedOperationException(
+ String.format("Non-main output %s unsupported in %s", tag, errorContextPrefix));
+ }
}
/** Provides arguments for a {@link DoFnInvoker} for a non-window observing method. */
- private class NonWindowObservingProcessBundleContext extends ProcessBundleContextBase {
+ private class NonWindowObservingProcessBundleContext
+ extends NonWindowObservingProcessBundleContextBase {
+
+ @Override
+ public void output(OutputT output) {
+ // Don't need to check timestamp since we can always output using the input timestamp.
+ outputTo(mainOutputConsumer, currentElement.withValue(output));
+ }
+
+ @Override
+ public <T> void output(TupleTag<T> tag, T output) {
+ FnDataReceiver<WindowedValue<T>> consumer =
+ (FnDataReceiver) localNameToConsumer.get(tag.getId());
+ if (consumer == null) {
+ throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
+ }
+ // Don't need to check timestamp since we can always output using the input timestamp.
+ outputTo(consumer, currentElement.withValue(output));
+ }
+
@Override
public void outputWithTimestamp(OutputT output, Instant timestamp) {
checkTimestamp(timestamp);
@@ -2372,7 +2539,11 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
WindowedValue.of(
output, timestamp, currentElement.getWindows(), currentElement.getPane()));
}
+ }
+ /** Provides base arguments for a {@link DoFnInvoker} for a non-window observing method. */
+ private abstract class NonWindowObservingProcessBundleContextBase
+ extends ProcessBundleContextBase {
@Override
public BoundedWindow window() {
throw new UnsupportedOperationException(
@@ -2489,8 +2660,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
@Override
public void output(Row output) {
- ProcessBundleContextBase.this.outputWithTimestamp(
- fromRowFunction.apply(output), currentElement.getTimestamp());
+ ProcessBundleContextBase.this.output(fromRowFunction.apply(output));
}
@Override
@@ -2517,14 +2687,16 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
private final Map<TupleTag<?>, OutputReceiver<Row>> taggedRowReceivers = new HashMap<>();
private <T> OutputReceiver<T> createTaggedOutputReceiver(TupleTag<T> tag) {
+ // Note that it is important that we use the non-tag versions here when using the main
+ // output tag for performance reasons and we also rely on it for the splittable DoFn
+ // context objects as well.
if (tag == null || mainOutputTag.equals(tag)) {
return (OutputReceiver<T>) ProcessBundleContextBase.this;
}
return new OutputReceiver<T>() {
@Override
public void output(T output) {
- ProcessBundleContextBase.this.outputWithTimestamp(
- tag, output, currentElement.getTimestamp());
+ ProcessBundleContextBase.this.output(tag, output);
}
@Override
@@ -2535,6 +2707,9 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
}
private <T> OutputReceiver<Row> createTaggedRowReceiver(TupleTag<T> tag) {
+ // Note that it is important that we use the non-tag versions here when using the main
+ // output tag for performance reasons and we also rely on it for the splittable DoFn
+ // context objects as well.
if (tag == null || mainOutputTag.equals(tag)) {
checkState(
mainOutputSchemaCoder != null,
@@ -2555,8 +2730,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
@Override
public void output(Row output) {
- ProcessBundleContextBase.this.outputWithTimestamp(
- tag, fromRowFunction.apply(output), currentElement.getTimestamp());
+ ProcessBundleContextBase.this.output(tag, fromRowFunction.apply(output));
}
@Override
@@ -2615,16 +2789,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
return pipelineOptions;
}
- @Override
- public void output(OutputT output) {
- outputWithTimestamp(output, currentElement.getTimestamp());
- }
-
- @Override
- public <T> void output(TupleTag<T> tag, T output) {
- outputWithTimestamp(tag, output, currentElement.getTimestamp());
- }
-
@Override
public InputT element() {
return currentElement.getValue();
@@ -2777,8 +2941,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
@Override
public void output(Row output) {
- context.outputWithTimestamp(
- fromRowFunction.apply(output), currentElement.getTimestamp());
+ context.output(fromRowFunction.apply(output));
}
@Override
@@ -2810,7 +2973,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
return new OutputReceiver<T>() {
@Override
public void output(T output) {
- context.outputWithTimestamp(tag, output, currentElement.getTimestamp());
+ context.output(tag, output);
}
@Override
@@ -2841,8 +3004,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
@Override
public void output(Row output) {
- context.outputWithTimestamp(
- tag, fromRowFunction.apply(output), currentElement.getTimestamp());
+ context.output(tag, fromRowFunction.apply(output));
}
@Override
@@ -3071,7 +3233,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
return new OutputReceiver<T>() {
@Override
public void output(T output) {
- context.outputWithTimestamp(tag, output, currentElement.getTimestamp());
+ context.output(tag, output);
}
@Override
@@ -3102,8 +3264,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
@Override
public void output(Row output) {
- context.outputWithTimestamp(
- tag, fromRowFunction.apply(output), currentElement.getTimestamp());
+ context.output(tag, fromRowFunction.apply(output));
}
@Override