Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/07/01 03:18:20 UTC

[GitHub] [beam] lukecwik commented on a change in pull request #12016: [BEAM-10341] Support drain in python and java SDF

lukecwik commented on a change in pull request #12016:
URL: https://github.com/apache/beam/pull/12016#discussion_r448079892



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
##########
@@ -306,6 +308,18 @@ public static void invokeSplitRestriction(DoFnInvoker.ArgumentProvider argumentP
     }
   }
 
+  /** Default implementation of {@link TruncateRestriction}, for delegation by bytebuddy. */
+  public static class DefaultTruncateRestriction {
+
+    /** Return the current restriction if it's bounded.Otherwise, return null. */

Review comment:
       ```suggestion
       /** Output the current restriction if it is bounded. */
   ```

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
##########
@@ -140,4 +140,15 @@ public Progress getProgress() {
         totalWork.subtract(workRemaining, MathContext.DECIMAL128).doubleValue(),
         workRemaining.doubleValue());
   }
+
+  @Override
+  public RestrictionBoundness isBounded() {
+    // If current range has been done, the range should be bounded.
+    if (lastAttemptedOffset != null && lastAttemptedOffset == Long.MAX_VALUE) {

Review comment:
       Note that for any restriction that is done, it won't matter whether we return bounded or unbounded, since processing it to completion should be a no-op.
   
   I would drop this logic and keep only the `range.getTo() == Long.MAX_VALUE` check you have below.

##########
File path: model/pipeline/src/main/proto/beam_runner_api.proto
##########
@@ -383,6 +383,14 @@ message StandardPTransforms {
     //
     // Input: KV(KV(element, restriction), size); output: DoFn's output.
     PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS = 2 [(beam_urn) = "beam:transform:sdf_process_sized_element_and_restrictions:v1"];
+
+    // Truncates the restriction of each element/restriction pair and returns
+    // the finite restriction which will be processed when a pipeline is
+    // drained: https://docs.google.com/document/d/1NExwHlj-2q2WUGhSO4jTu8XGhDPmm3cllSN8IMmWci8/edit#.

Review comment:
       ```suggestion
       // drained. See https://docs.google.com/document/d/1NExwHlj-2q2WUGhSO4jTu8XGhDPmm3cllSN8IMmWci8/edit# for additional details about drain.
   ```

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
##########
@@ -1053,6 +1056,51 @@ public Duration getAllowedTimestampSkew() {
   @Experimental(Kind.SPLITTABLE_DO_FN)
   public @interface SplitRestriction {}
 
+  /**
+   * Annotation for the method that truncates the restriction of a <a
+   * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} into a bounded one to
+   * be processed when pipeline starts to drain.
+   *
+   * <p>This method is used to perform truncation of the restriction while it is not actively being
+   * processed.
+   *
+   * <p>Signature: {@code void truncateRestriction(<arguments>);}
+   *
+   * <p>This method must satisfy the following constraints:
+   *
+   * <ul>
+   *   <li>If one of the arguments is of type {@link OutputReceiver}, then it will be passed an
+   *       output receiver for outputting the truncated restrictions. All truncated restrictions
+   *       must be output through this parameter.
+   *   <li>If one of its arguments is tagged with the {@link Element} annotation, then it will be
+   *       passed the current element being processed; the argument must be of type {@code InputT}.
+   *       Note that automatic conversion of {@link Row}s and {@link FieldAccess} parameters are
+   *       currently unsupported.
+   *   <li>If one of its arguments is tagged with the {@link Restriction} annotation, then it will
+   *       be passed the current restriction being processed; the argument must be of type {@code
+   *       RestrictionT}.
+   *   <li>If one of its arguments is tagged with the {@link Timestamp} annotation, then it will be
+   *       passed the timestamp of the current element being processed; the argument must be of type
+   *       {@link Instant}.
+   *   <li>If one of its arguments is a {@link RestrictionTracker}, then it will be passed a tracker
+   *       that is initialized for the current {@link Restriction}. The argument must be of the
+   *       exact type {@code RestrictionTracker<RestrictionT, PositionT>}.
+   *   <li>If one of its arguments is a subtype of {@link BoundedWindow}, then it will be passed the
+   *       window of the current element. When applied by {@link ParDo} the subtype of {@link
+   *       BoundedWindow} must match the type of windows on the input {@link PCollection}. If the
+   *       window is not accessed a runner may perform additional optimizations.
+   *   <li>If one of its arguments is of type {@link PaneInfo}, then it will be passed information
+   *       about the current triggering pane.
+   *   <li>If one of the parameters is of type {@link PipelineOptions}, then it will be passed the
+   *       options for the current pipeline.
+   * </ul>

Review comment:
       ```suggestion
      * </ul>
      *
      * <p>The default behavior when a pipeline is being drained is that {@link IS_BOUNDED} restrictions are processed entirely while {@link IS_UNBOUNDED} restrictions are processed until a checkpoint is possible. Splittable {@link DoFn}s should only provide this method if they want to change this default behavior.
   ```

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
##########
@@ -99,6 +99,24 @@
    */
   public abstract void checkDone() throws IllegalStateException;
 
+  public enum RestrictionBoundness {
+    IS_BOUNDED,
+    IS_UNBOUNDED
+  }
+
+  /**
+   * Return the boundedness of current tracking restriction. If the current restriction produces
+   * finite output, it should return {@link RestrictionBoundness#IS_BOUNDED}. Otherwise, it should
+   * return {@link RestrictionBoundness#IS_UNBOUNDED}.
+   *
+   * <p>The API is called by the system when the pipeline starts to drain and there is no
+   * implementation of {@link DoFn.TruncateRestriction}. Based on the boundness of the restriction,
+   * the system will give the default behavior of truncating restrictions.
+   *
+   * <p>The API is required to be implemented.
+   */

Review comment:
       ```suggestion
      * Returns the boundedness of the current restriction. If the current restriction represents
      * a finite amount of work, it should return {@link RestrictionBoundedness#IS_BOUNDED}. Otherwise, it should
      * return {@link RestrictionBoundedness#IS_UNBOUNDED}.
      */
   ```
   
   Users have to override the implementation, so saying it is required is not useful. I know the other docs say things like this, but before we declare that splittable DoFns are no longer `@Experimental`, we'll want to review the documentation and fix things like this.

##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
##########
@@ -742,10 +785,202 @@ public void outputWithTimestamp(String output, Instant instant) {
             new FakeArgumentProvider<String, String>() {
               @Override
               public Object restriction() {
-                return new RestrictionWithDefaultTracker();
+                return new RestrictionWithBoundedDefaultTracker();
               }
             }),
-        instanceOf(DefaultTracker.class));
+        instanceOf(BoundedDefaultTracker.class));
+    assertThat(
+        invoker.invokeNewWatermarkEstimator(
+            new FakeArgumentProvider<String, String>() {
+              @Override
+              public Object watermarkEstimatorState() {
+                return new WatermarkEstimatorStateWithDefaultWatermarkEstimator();
+              }
+            }),
+        instanceOf(DefaultWatermarkEstimator.class));
+  }
+
+  @Test
+  public void testTruncateFnWithHasDefaultMethodsWhenBounded() throws Exception {
+    class BoundedMockFn extends DoFn<String, String> {
+      @ProcessElement
+      public void processElement(
+          ProcessContext c,
+          RestrictionTracker<RestrictionWithBoundedDefaultTracker, Void> tracker,
+          WatermarkEstimator<WatermarkEstimatorStateWithDefaultWatermarkEstimator>
+              watermarkEstimator) {}
+
+      @GetInitialRestriction
+      public RestrictionWithBoundedDefaultTracker getInitialRestriction(@Element String element) {
+        return null;
+      }
+
+      @GetInitialWatermarkEstimatorState
+      public WatermarkEstimatorStateWithDefaultWatermarkEstimator
+          getInitialWatermarkEstimatorState() {
+        return null;
+      }
+    }
+
+    BoundedMockFn fn = mock(BoundedMockFn.class);
+    DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn);
+
+    CoderRegistry coderRegistry = CoderRegistry.createDefault();
+    coderRegistry.registerCoderProvider(
+        CoderProviders.fromStaticMethods(
+            RestrictionWithBoundedDefaultTracker.class, CoderForDefaultTracker.class));
+    coderRegistry.registerCoderForClass(
+        WatermarkEstimatorStateWithDefaultWatermarkEstimator.class,
+        new CoderForWatermarkEstimatorStateWithDefaultWatermarkEstimator());
+    assertThat(
+        invoker.<RestrictionWithBoundedDefaultTracker>invokeGetRestrictionCoder(coderRegistry),
+        instanceOf(CoderForDefaultTracker.class));
+    assertThat(
+        invoker.invokeGetWatermarkEstimatorStateCoder(coderRegistry),
+        instanceOf(CoderForWatermarkEstimatorStateWithDefaultWatermarkEstimator.class));
+    RestrictionTracker tracker =
+        invoker.invokeNewTracker(
+            new FakeArgumentProvider<String, String>() {
+              @Override
+              public Object restriction() {
+                return new RestrictionWithBoundedDefaultTracker();
+              }
+            });
+    assertThat(tracker, instanceOf(BoundedDefaultTracker.class));
+    invoker.invokeTruncateRestriction(
+        new FakeArgumentProvider<String, String>() {
+          @Override
+          public RestrictionTracker restrictionTracker() {
+            return tracker;
+          }
+
+          @Override
+          public String element(DoFn<String, String> doFn) {
+            return "blah";
+          }
+
+          @Override
+          public Object restriction() {
+            return "foo";
+          }
+
+          @Override
+          public OutputReceiver<String> outputReceiver(DoFn<String, String> doFn) {
+            return new DoFn.OutputReceiver<String>() {
+              private boolean invoked;
+
+              @Override
+              public void output(String output) {
+                assertFalse(invoked);

Review comment:
       This validation only happens within this method, so if the method is never invoked, no validation occurs. We have to perform the validation outside of this method or otherwise verify that the method was invoked.
   
   I know that other tests did this incorrectly as well. We can fix this now or in a follow-up PR.
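
A minimal sketch of the pitfall and one way around it, written in Python for brevity rather than in the Java of the test under review. `RecordingReceiver` and `invoke_truncate` are hypothetical stand-ins, not Beam APIs. The idea is to record outputs inside the callback and assert on them after the invocation returns, so that a callback which is never called fails the test instead of passing silently.

```python
class RecordingReceiver:
    """Hypothetical output receiver that records values instead of asserting."""

    def __init__(self):
        self.outputs = []

    def output(self, value):
        self.outputs.append(value)


def invoke_truncate(restriction, is_bounded, receiver):
    """Hypothetical stand-in for the invoker: outputs bounded restrictions only."""
    if is_bounded:
        receiver.output(restriction)


def check_bounded_case():
    receiver = RecordingReceiver()
    invoke_truncate("foo", True, receiver)
    # Asserting outside the callback fails if output() was never called.
    assert receiver.outputs == ["foo"]
    return receiver.outputs


def check_unbounded_case():
    receiver = RecordingReceiver()
    invoke_truncate("foo", False, receiver)
    # Nothing should have been output for an unbounded restriction.
    assert receiver.outputs == []
    return receiver.outputs
```

In the Java test the equivalent would be to collect outputs in a list held by the test and assert on that list after `invokeTruncateRestriction` returns.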

##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
##########
@@ -956,6 +983,96 @@ public void splitRestriction(
     DoFnSignatures.getSignature(BadFn.class);
   }
 
+  @Test
+  public void testTruncateRestrictionReturnsWrongType() throws Exception {

Review comment:
       nit: rename this test and the others like it, since the problem is not a wrong return type but an output receiver with the wrong generic type
   ```suggestion
     public void testTruncateRestrictionWithOutputReceiverWithWrongGenericType() throws Exception {
   ```

##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -320,6 +320,22 @@ def split_and_size(self, element, restriction):
     for part in self.split(element, restriction):
       yield part, self.restriction_size(element, part)
 
+  def truncate(self, element, restriction):
+    """Truncate the given restriction into finite amount of work when the
+    pipeline starts to drain.
+
+    By default, if the restriction is bounded, it will return the entire current
+    restriction. If the restriction is unbounded, it will return None.
+
+    The method throws NotImplementError when RestrictionTracker.is_bounded() is
+    not implemented.
+
+    It's recommended to implement this API if more granularity is required.
+    """
+    restriction_tracker = self.create_tracker(restriction)
+    if restriction_tracker.is_bounded():
+      return restriction

Review comment:
       We expect 0 or 1, which is why the yield is important. It also wouldn't matter if they returned more than 1. It might even help the drain complete sooner.
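
A minimal sketch of why `yield` matters here, assuming the caller iterates over whatever `truncate` produces. It does not import Beam: the `(start, stop)` tuple restriction and the helpers `is_bounded`, `truncate_in_halves` and `truncated_restrictions` are hypothetical names used only for illustration.

```python
import math


def is_bounded(restriction):
    """Hypothetical check: a (start, stop) pair is bounded if stop is finite."""
    _, stop = restriction
    return not math.isinf(stop)


def truncate(element, restriction):
    """Yield the restriction if it is bounded; yield nothing otherwise."""
    if is_bounded(restriction):
        yield restriction


def truncate_in_halves(element, restriction):
    """Hypothetical override that yields two bounded pieces instead of one."""
    start, stop = restriction
    if is_bounded(restriction):
        mid = (start + stop) // 2
        yield (start, mid)
        yield (mid, stop)


def truncated_restrictions(truncate_fn, element, restriction):
    """Stand-in for the caller: it iterates over whatever truncate produced."""
    return list(truncate_fn(element, restriction))
```

With `return restriction` instead of `yield restriction`, the unbounded case would hand the caller `None`, which cannot be iterated. The generator form produces an empty sequence for "0" and a one-element sequence for "1", and a version yielding several pieces works with the same caller.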

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
##########
@@ -99,6 +99,24 @@
    */
   public abstract void checkDone() throws IllegalStateException;
 
+  public enum RestrictionBoundness {
+    IS_BOUNDED,
+    IS_UNBOUNDED

Review comment:
       ```suggestion
       /** Indicates that a {@link Restriction} represents a bounded amount of work. */
       IS_BOUNDED,
       /** Indicates that a {@link Restriction} represents an unbounded amount of work. */
       IS_UNBOUNDED
   ```

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
##########
@@ -99,6 +99,24 @@
    */
   public abstract void checkDone() throws IllegalStateException;
 
+  public enum RestrictionBoundness {

Review comment:
       
   ```suggestion
     public enum RestrictionBoundedness {
   ```
   
   Alternatively we could go with `IsBounded` just like PCollection.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
##########
@@ -672,6 +672,9 @@ public Duration getAllowedTimestampSkew() {
    *       perform bulk splitting initially allowing for a rapid increase in parallelism. See {@link
    *       RestrictionTracker#trySplit} for details about splitting when the current element and
    *       restriction are actively being processed.
+   *   <li>It <i>may</i> define a {@link TruncateRestriction} method to override the default
+   *       implementation {@code DefaultTruncateRestriction}, This method truncates a given
+   *       restriction into a bounded restriction when pipeline is draining.

Review comment:
       ```suggestion
      *   <li>It <i>may</i> define a {@link TruncateRestriction} method to choose how to truncate a restriction such that it represents a finite amount of work when the pipeline is draining. See {@link TruncateRestriction} and {@link RestrictionTracker#isBounded} for additional details.
   ```

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
##########
@@ -1053,6 +1056,51 @@ public Duration getAllowedTimestampSkew() {
   @Experimental(Kind.SPLITTABLE_DO_FN)
   public @interface SplitRestriction {}
 
+  /**
+   * Annotation for the method that truncates the restriction of a <a
+   * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} into a bounded one to
+   * be processed when pipeline starts to drain.

Review comment:
       ```suggestion
      * Annotation for the method that truncates the restriction of a <a
      * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} into a bounded one. This method is invoked when a pipeline is being <a href="https://docs.google.com/document/d/1NExwHlj-2q2WUGhSO4jTu8XGhDPmm3cllSN8IMmWci8/edit#">drained</a>.
   ```

##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
##########
@@ -742,10 +785,202 @@ public void outputWithTimestamp(String output, Instant instant) {
             new FakeArgumentProvider<String, String>() {
               @Override
               public Object restriction() {
-                return new RestrictionWithDefaultTracker();
+                return new RestrictionWithBoundedDefaultTracker();
               }
             }),
-        instanceOf(DefaultTracker.class));
+        instanceOf(BoundedDefaultTracker.class));
+    assertThat(
+        invoker.invokeNewWatermarkEstimator(
+            new FakeArgumentProvider<String, String>() {
+              @Override
+              public Object watermarkEstimatorState() {
+                return new WatermarkEstimatorStateWithDefaultWatermarkEstimator();
+              }
+            }),
+        instanceOf(DefaultWatermarkEstimator.class));
+  }
+
+  @Test
+  public void testTruncateFnWithHasDefaultMethodsWhenBounded() throws Exception {
+    class BoundedMockFn extends DoFn<String, String> {
+      @ProcessElement
+      public void processElement(
+          ProcessContext c,
+          RestrictionTracker<RestrictionWithBoundedDefaultTracker, Void> tracker,
+          WatermarkEstimator<WatermarkEstimatorStateWithDefaultWatermarkEstimator>
+              watermarkEstimator) {}
+
+      @GetInitialRestriction
+      public RestrictionWithBoundedDefaultTracker getInitialRestriction(@Element String element) {
+        return null;
+      }
+
+      @GetInitialWatermarkEstimatorState
+      public WatermarkEstimatorStateWithDefaultWatermarkEstimator
+          getInitialWatermarkEstimatorState() {
+        return null;
+      }
+    }
+
+    BoundedMockFn fn = mock(BoundedMockFn.class);
+    DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn);
+
+    CoderRegistry coderRegistry = CoderRegistry.createDefault();
+    coderRegistry.registerCoderProvider(
+        CoderProviders.fromStaticMethods(
+            RestrictionWithBoundedDefaultTracker.class, CoderForDefaultTracker.class));
+    coderRegistry.registerCoderForClass(
+        WatermarkEstimatorStateWithDefaultWatermarkEstimator.class,
+        new CoderForWatermarkEstimatorStateWithDefaultWatermarkEstimator());
+    assertThat(
+        invoker.<RestrictionWithBoundedDefaultTracker>invokeGetRestrictionCoder(coderRegistry),
+        instanceOf(CoderForDefaultTracker.class));
+    assertThat(
+        invoker.invokeGetWatermarkEstimatorStateCoder(coderRegistry),
+        instanceOf(CoderForWatermarkEstimatorStateWithDefaultWatermarkEstimator.class));
+    RestrictionTracker tracker =
+        invoker.invokeNewTracker(
+            new FakeArgumentProvider<String, String>() {
+              @Override
+              public Object restriction() {
+                return new RestrictionWithBoundedDefaultTracker();
+              }
+            });
+    assertThat(tracker, instanceOf(BoundedDefaultTracker.class));
+    invoker.invokeTruncateRestriction(
+        new FakeArgumentProvider<String, String>() {
+          @Override
+          public RestrictionTracker restrictionTracker() {
+            return tracker;
+          }
+
+          @Override
+          public String element(DoFn<String, String> doFn) {
+            return "blah";
+          }
+
+          @Override
+          public Object restriction() {
+            return "foo";
+          }
+
+          @Override
+          public OutputReceiver<String> outputReceiver(DoFn<String, String> doFn) {
+            return new DoFn.OutputReceiver<String>() {
+              private boolean invoked;
+
+              @Override
+              public void output(String output) {
+                assertFalse(invoked);
+                invoked = true;
+                assertEquals("foo", output);
+              }
+
+              @Override
+              public void outputWithTimestamp(String output, Instant instant) {
+                assertFalse(invoked);
+                invoked = true;
+                assertEquals("foo", output);
+              }
+            };
+          }
+        });
+    assertEquals(stop(), invoker.invokeProcessElement(mockArgumentProvider));
+    assertThat(
+        invoker.invokeNewWatermarkEstimator(
+            new FakeArgumentProvider<String, String>() {
+              @Override
+              public Object watermarkEstimatorState() {
+                return new WatermarkEstimatorStateWithDefaultWatermarkEstimator();
+              }
+            }),
+        instanceOf(DefaultWatermarkEstimator.class));
+  }
+
+  @Test
+  public void testTruncateFnWithHasDefaultMethodsWhenUnbounded() throws Exception {
+    class UnboundedMockFn extends DoFn<String, String> {
+      @ProcessElement
+      public void processElement(
+          ProcessContext c,
+          RestrictionTracker<RestrictionWithUnboundedDefaultTracker, Void> tracker,
+          WatermarkEstimator<WatermarkEstimatorStateWithDefaultWatermarkEstimator>
+              watermarkEstimator) {}
+
+      @GetInitialRestriction
+      public RestrictionWithUnboundedDefaultTracker getInitialRestriction(@Element String element) {
+        return null;
+      }
+
+      @GetInitialWatermarkEstimatorState
+      public WatermarkEstimatorStateWithDefaultWatermarkEstimator
+          getInitialWatermarkEstimatorState() {
+        return null;
+      }
+    }
+
+    UnboundedMockFn fn = mock(UnboundedMockFn.class);
+    DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn);
+
+    CoderRegistry coderRegistry = CoderRegistry.createDefault();
+    coderRegistry.registerCoderProvider(
+        CoderProviders.fromStaticMethods(
+            RestrictionWithUnboundedDefaultTracker.class, CoderForDefaultTracker.class));
+    coderRegistry.registerCoderForClass(
+        WatermarkEstimatorStateWithDefaultWatermarkEstimator.class,
+        new CoderForWatermarkEstimatorStateWithDefaultWatermarkEstimator());
+    assertThat(
+        invoker.<RestrictionWithBoundedDefaultTracker>invokeGetRestrictionCoder(coderRegistry),
+        instanceOf(CoderForDefaultTracker.class));
+    assertThat(
+        invoker.invokeGetWatermarkEstimatorStateCoder(coderRegistry),
+        instanceOf(CoderForWatermarkEstimatorStateWithDefaultWatermarkEstimator.class));
+    RestrictionTracker tracker =
+        invoker.invokeNewTracker(
+            new FakeArgumentProvider<String, String>() {
+              @Override
+              public Object restriction() {
+                return new RestrictionWithUnboundedDefaultTracker();
+              }
+            });
+    assertThat(tracker, instanceOf(UnboundedDefaultTracker.class));
+    invoker.invokeTruncateRestriction(
+        new FakeArgumentProvider<String, String>() {
+          @Override
+          public RestrictionTracker restrictionTracker() {
+            return tracker;
+          }
+
+          @Override
+          public String element(DoFn<String, String> doFn) {
+            return "blah";
+          }
+
+          @Override
+          public Object restriction() {
+            return "foo";
+          }
+
+          @Override
+          public OutputReceiver<String> outputReceiver(DoFn<String, String> doFn) {
+            return new DoFn.OutputReceiver<String>() {
+              private final boolean shouldInvoked = false;
+
+              // This should not be invoked.
+              @Override
+              public void output(String output) {
+                assertTrue(shouldInvoked);
+              }
+
+              // This should not be invoked.
+              @Override
+              public void outputWithTimestamp(String output, Instant instant) {
+                assertTrue(shouldInvoked);
+              }

Review comment:
       ```suggestion
   
                 // This should not be invoked.
                 @Override
                 public void output(String output) {
                   fail("Never expected truncated restriction to be output: " + output);
                 }
   
                 // This should not be invoked.
                 @Override
                 public void outputWithTimestamp(String output, Instant instant) {
                   fail("Never expected truncated restriction to be output: " + output + " timestamp: " + instant);
                 }
   ```

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1636,6 +1657,98 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) {
     }
   }
 
+  /** This context outputs KV<KV<Element, <Restriction, WatemarkEstimatorState>>, size> */

Review comment:
       ```suggestion
     /** This context outputs KV<KV<Element, KV<Restriction, WatermarkEstimatorState>>, Size> */
   ```

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1636,6 +1657,98 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) {
     }
   }
 
+  /** This context outputs KV<KV<Element, <Restriction, WatemarkEstimatorState>>, size> */
+  private class SizedRestrictionWindowObservingProcessBundleContext
+      extends WindowObservingProcessBundleContext {
+    private final String errorContextPrefix;
+
+    SizedRestrictionWindowObservingProcessBundleContext(String errorContextPrefix) {
+      this.errorContextPrefix = errorContextPrefix;
+    }
+
+    @Override
+    // OutputT == RestrictionT
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      double size =
+          doFnInvoker.invokeGetSize(
+              new DelegatingArgumentProvider<InputT, OutputT>(
+                  this, this.errorContextPrefix + "/GetSize") {
+                @Override
+                public Object restriction() {
+                  return output;
+                }
+
+                @Override
+                public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+                  return timestamp;
+                }
+
+                @Override
+                public RestrictionTracker<?, ?> restrictionTracker() {
+                  return doFnInvoker.invokeNewTracker(this);
+                }
+              });
+
+      outputTo(
+          mainOutputConsumers,
+          (WindowedValue<OutputT>)
+              WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(), KV.of(output, currentWatermarkEstimatorState)),
+                      size),
+                  timestamp,
+                  currentWindow,
+                  currentElement.getPane()));
+    }
+  }
+
+  /** This context outputs KV<KV<Element, <Restriction, WatemarkEstimatorState>>, size> */

Review comment:
       ```suggestion
     /** This context outputs KV<KV<Element, KV<Restriction, WatermarkEstimatorState>>, Size> */
   ```



