You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2023/02/20 00:57:28 UTC

[beam] 06/13: Use WindowedValue.withValue on hot paths #21250 (#25519)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch prism-jobservices
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 0dd240529d120c49155a0eb88274a0e9f2047091
Author: Luke Cwik <lu...@gmail.com>
AuthorDate: Fri Feb 17 16:58:29 2023 -0800

    Use WindowedValue.withValue on hot paths #21250 (#25519)
    
    * Use WindowedValue.withValue on hot paths #21250
    
    This removes about half of the overhead of outputting a value in the common scenario where we are already using a valid timestamp (the input timestamp) and where we can use the `withValue` hot path, which is optimized for certain use cases (e.g. the globally windowed value case).
    
    Before:
    ```
    Benchmark                                Mode  Cnt     Score     Error  Units
    ProcessBundleBenchmark.testLargeBundle  thrpt   15  3616.761 ± 157.844  ops/s
    ```
    
    After:
    ```
    Benchmark                                Mode  Cnt     Score     Error  Units
    ProcessBundleBenchmark.testLargeBundle  thrpt   15  3666.889 ± 151.448  ops/s
    ```
    
    This is for #21250.
---
 .../apache/beam/fn/harness/FnApiDoFnRunner.java    | 257 +++++++++++++++++----
 1 file changed, 209 insertions(+), 48 deletions(-)

diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 0cfcb0a84f2..561bb0f39fd 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -2165,7 +2165,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
   }
 
   /** Provides arguments for a {@link DoFnInvoker} for a window observing method. */
-  private class WindowObservingProcessBundleContext extends ProcessBundleContextBase {
+  private abstract class WindowObservingProcessBundleContextBase extends ProcessBundleContextBase {
     @Override
     public BoundedWindow window() {
       return currentWindow;
@@ -2180,6 +2180,53 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
     public <T> T sideInput(PCollectionView<T> view) {
       return stateAccessor.get(view, currentWindow);
     }
+  }
+
+  private class WindowObservingProcessBundleContext
+      extends WindowObservingProcessBundleContextBase {
+
+    @Override
+    public void output(OutputT output) {
+      // Don't need to check timestamp since we can always output using the input timestamp.
+      outputTo(
+          mainOutputConsumer,
+          WindowedValue.of(
+              output, currentElement.getTimestamp(), currentWindow, currentElement.getPane()));
+    }
+
+    @Override
+    public <T> void output(TupleTag<T> tag, T output) {
+      FnDataReceiver<WindowedValue<T>> consumer =
+          (FnDataReceiver) localNameToConsumer.get(tag.getId());
+      if (consumer == null) {
+        throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
+      }
+      // Don't need to check timestamp since we can always output using the input timestamp.
+      outputTo(
+          consumer,
+          WindowedValue.of(
+              output, currentElement.getTimestamp(), currentWindow, currentElement.getPane()));
+    }
+
+    @Override
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      // TODO: Check that timestamp is valid once all runners can provide proper timestamps.
+      outputTo(
+          mainOutputConsumer,
+          WindowedValue.of(output, timestamp, currentWindow, currentElement.getPane()));
+    }
+
+    @Override
+    public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      // TODO: Check that timestamp is valid once all runners can provide proper timestamps.
+      FnDataReceiver<WindowedValue<T>> consumer =
+          (FnDataReceiver) localNameToConsumer.get(tag.getId());
+      if (consumer == null) {
+        throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
+      }
+      outputTo(
+          consumer, WindowedValue.of(output, timestamp, currentWindow, currentElement.getPane()));
+    }
 
     @Override
     public State state(String stateId, boolean alwaysFetched) {
@@ -2232,37 +2279,62 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
           currentElement.getTimestamp(),
           currentElement.getPane());
     }
-
-    @Override
-    public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      // TODO: Check that timestamp is valid once all runners can provide proper timestamps.
-      outputTo(
-          mainOutputConsumer,
-          WindowedValue.of(output, timestamp, currentWindow, currentElement.getPane()));
-    }
-
-    @Override
-    public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      // TODO: Check that timestamp is valid once all runners can provide proper timestamps.
-      FnDataReceiver<WindowedValue<T>> consumer =
-          (FnDataReceiver) localNameToConsumer.get(tag.getId());
-      if (consumer == null) {
-        throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
-      }
-      outputTo(
-          consumer, WindowedValue.of(output, timestamp, currentWindow, currentElement.getPane()));
-    }
   }
 
   /** This context outputs KV<KV<Element, KV<Restriction, WatemarkEstimatorState>>, Size>. */
   private class SizedRestrictionWindowObservingProcessBundleContext
-      extends WindowObservingProcessBundleContext {
+      extends WindowObservingProcessBundleContextBase {
     private final String errorContextPrefix;
 
     SizedRestrictionWindowObservingProcessBundleContext(String errorContextPrefix) {
       this.errorContextPrefix = errorContextPrefix;
     }
 
+    @Override
+    // OutputT == RestrictionT
+    public void output(OutputT output) {
+      double size =
+          doFnInvoker.invokeGetSize(
+              new DelegatingArgumentProvider<InputT, OutputT>(
+                  this, this.errorContextPrefix + "/GetSize") {
+                @Override
+                public Object restriction() {
+                  return output;
+                }
+
+                @Override
+                public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+                  return currentElement.getTimestamp();
+                }
+
+                @Override
+                public RestrictionTracker<?, ?> restrictionTracker() {
+                  return doFnInvoker.invokeNewTracker(this);
+                }
+              });
+
+      // Don't need to check timestamp since we can always output using the input timestamp.
+      outputTo(
+          mainOutputConsumer,
+          (WindowedValue<OutputT>)
+              WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(), KV.of(output, currentWatermarkEstimatorState)),
+                      size),
+                  currentElement.getTimestamp(),
+                  currentWindow,
+                  currentElement.getPane()));
+    }
+
+    @Override
+    public <T> void output(TupleTag<T> tag, T output) {
+      // Note that the OutputReceiver/RowOutputReceiver specifically will use the non-tag versions
+      // of these methods when producing output.
+      throw new UnsupportedOperationException(
+          String.format("Non-main output %s unsupported in %s", tag, errorContextPrefix));
+    }
+
     @Override
     // OutputT == RestrictionT
     public void outputWithTimestamp(OutputT output, Instant timestamp) {
@@ -2299,17 +2371,85 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
                   currentWindow,
                   currentElement.getPane()));
     }
+
+    @Override
+    public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      // Note that the OutputReceiver/RowOutputReceiver specifically will use the non-tag versions
+      // of these methods when producing output.
+      throw new UnsupportedOperationException(
+          String.format("Non-main output %s unsupported in %s", tag, errorContextPrefix));
+    }
+
+    @Override
+    public State state(String stateId, boolean alwaysFetched) {
+      throw new UnsupportedOperationException(
+          String.format("State unsupported in %s", errorContextPrefix));
+    }
+
+    @Override
+    public org.apache.beam.sdk.state.Timer timer(String timerId) {
+      throw new UnsupportedOperationException(
+          String.format("Timer unsupported in %s", errorContextPrefix));
+    }
+
+    @Override
+    public TimerMap timerFamily(String tagId) {
+      throw new UnsupportedOperationException(
+          String.format("Timer unsupported in %s", errorContextPrefix));
+    }
   }
 
   /** This context outputs KV<KV<Element, KV<Restriction, WatermarkEstimatorState>>, Size>. */
   private class SizedRestrictionNonWindowObservingProcessBundleContext
-      extends NonWindowObservingProcessBundleContext {
+      extends NonWindowObservingProcessBundleContextBase {
     private final String errorContextPrefix;
 
     SizedRestrictionNonWindowObservingProcessBundleContext(String errorContextPrefix) {
       this.errorContextPrefix = errorContextPrefix;
     }
 
+    @Override
+    // OutputT == RestrictionT
+    public void output(OutputT output) {
+      double size =
+          doFnInvoker.invokeGetSize(
+              new DelegatingArgumentProvider<InputT, OutputT>(
+                  this, errorContextPrefix + "/GetSize") {
+                @Override
+                public Object restriction() {
+                  return output;
+                }
+
+                @Override
+                public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+                  return currentElement.getTimestamp();
+                }
+
+                @Override
+                public RestrictionTracker<?, ?> restrictionTracker() {
+                  return doFnInvoker.invokeNewTracker(this);
+                }
+              });
+
+      // Don't need to check timestamp since we can always output using the input timestamp.
+      outputTo(
+          mainOutputConsumer,
+          (WindowedValue<OutputT>)
+              currentElement.withValue(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(), KV.of(output, currentWatermarkEstimatorState)),
+                      size)));
+    }
+
+    @Override
+    public <T> void output(TupleTag<T> tag, T output) {
+      // Note that the OutputReceiver/RowOutputReceiver specifically will use the non-tag versions
+      // of these methods when producing output.
+      throw new UnsupportedOperationException(
+          String.format("Non-main output %s unsupported in %s", tag, errorContextPrefix));
+    }
+
     @Override
     // OutputT == RestrictionT
     public void outputWithTimestamp(OutputT output, Instant timestamp) {
@@ -2346,10 +2486,37 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
                   currentElement.getWindows(),
                   currentElement.getPane()));
     }
+
+    @Override
+    public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      // Note that the OutputReceiver/RowOutputReceiver specifically will use the non-tag versions
+      // of these methods when producing output.
+      throw new UnsupportedOperationException(
+          String.format("Non-main output %s unsupported in %s", tag, errorContextPrefix));
+    }
   }
 
   /** Provides arguments for a {@link DoFnInvoker} for a non-window observing method. */
-  private class NonWindowObservingProcessBundleContext extends ProcessBundleContextBase {
+  private class NonWindowObservingProcessBundleContext
+      extends NonWindowObservingProcessBundleContextBase {
+
+    @Override
+    public void output(OutputT output) {
+      // Don't need to check timestamp since we can always output using the input timestamp.
+      outputTo(mainOutputConsumer, currentElement.withValue(output));
+    }
+
+    @Override
+    public <T> void output(TupleTag<T> tag, T output) {
+      FnDataReceiver<WindowedValue<T>> consumer =
+          (FnDataReceiver) localNameToConsumer.get(tag.getId());
+      if (consumer == null) {
+        throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
+      }
+      // Don't need to check timestamp since we can always output using the input timestamp.
+      outputTo(consumer, currentElement.withValue(output));
+    }
+
     @Override
     public void outputWithTimestamp(OutputT output, Instant timestamp) {
       checkTimestamp(timestamp);
@@ -2372,7 +2539,11 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
           WindowedValue.of(
               output, timestamp, currentElement.getWindows(), currentElement.getPane()));
     }
+  }
 
+  /** Provides base arguments for a {@link DoFnInvoker} for a non-window observing method. */
+  private abstract class NonWindowObservingProcessBundleContextBase
+      extends ProcessBundleContextBase {
     @Override
     public BoundedWindow window() {
       throw new UnsupportedOperationException(
@@ -2489,8 +2660,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
 
               @Override
               public void output(Row output) {
-                ProcessBundleContextBase.this.outputWithTimestamp(
-                    fromRowFunction.apply(output), currentElement.getTimestamp());
+                ProcessBundleContextBase.this.output(fromRowFunction.apply(output));
               }
 
               @Override
@@ -2517,14 +2687,16 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
           private final Map<TupleTag<?>, OutputReceiver<Row>> taggedRowReceivers = new HashMap<>();
 
           private <T> OutputReceiver<T> createTaggedOutputReceiver(TupleTag<T> tag) {
+            // Note that it is important that we use the non-tag versions here when using the main
+            // output tag for performance reasons and we also rely on it for the splittable DoFn
+            // context objects as well.
             if (tag == null || mainOutputTag.equals(tag)) {
               return (OutputReceiver<T>) ProcessBundleContextBase.this;
             }
             return new OutputReceiver<T>() {
               @Override
               public void output(T output) {
-                ProcessBundleContextBase.this.outputWithTimestamp(
-                    tag, output, currentElement.getTimestamp());
+                ProcessBundleContextBase.this.output(tag, output);
               }
 
               @Override
@@ -2535,6 +2707,9 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
           }
 
           private <T> OutputReceiver<Row> createTaggedRowReceiver(TupleTag<T> tag) {
+            // Note that it is important that we use the non-tag versions here when using the main
+            // output tag for performance reasons and we also rely on it for the splittable DoFn
+            // context objects as well.
             if (tag == null || mainOutputTag.equals(tag)) {
               checkState(
                   mainOutputSchemaCoder != null,
@@ -2555,8 +2730,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
 
               @Override
               public void output(Row output) {
-                ProcessBundleContextBase.this.outputWithTimestamp(
-                    tag, fromRowFunction.apply(output), currentElement.getTimestamp());
+                ProcessBundleContextBase.this.output(tag, fromRowFunction.apply(output));
               }
 
               @Override
@@ -2615,16 +2789,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
       return pipelineOptions;
     }
 
-    @Override
-    public void output(OutputT output) {
-      outputWithTimestamp(output, currentElement.getTimestamp());
-    }
-
-    @Override
-    public <T> void output(TupleTag<T> tag, T output) {
-      outputWithTimestamp(tag, output, currentElement.getTimestamp());
-    }
-
     @Override
     public InputT element() {
       return currentElement.getValue();
@@ -2777,8 +2941,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
 
               @Override
               public void output(Row output) {
-                context.outputWithTimestamp(
-                    fromRowFunction.apply(output), currentElement.getTimestamp());
+                context.output(fromRowFunction.apply(output));
               }
 
               @Override
@@ -2810,7 +2973,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
             return new OutputReceiver<T>() {
               @Override
               public void output(T output) {
-                context.outputWithTimestamp(tag, output, currentElement.getTimestamp());
+                context.output(tag, output);
               }
 
               @Override
@@ -2841,8 +3004,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
 
               @Override
               public void output(Row output) {
-                context.outputWithTimestamp(
-                    tag, fromRowFunction.apply(output), currentElement.getTimestamp());
+                context.output(tag, fromRowFunction.apply(output));
               }
 
               @Override
@@ -3071,7 +3233,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
             return new OutputReceiver<T>() {
               @Override
               public void output(T output) {
-                context.outputWithTimestamp(tag, output, currentElement.getTimestamp());
+                context.output(tag, output);
               }
 
               @Override
@@ -3102,8 +3264,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
 
               @Override
               public void output(Row output) {
-                context.outputWithTimestamp(
-                    tag, fromRowFunction.apply(output), currentElement.getTimestamp());
+                context.output(tag, fromRowFunction.apply(output));
               }
 
               @Override