You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/03/13 17:16:51 UTC

[beam] branch master updated: [BEAM-2939] Implement interfaces and concrete watermark estimators

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new feea2ba  [BEAM-2939] Implement interfaces and concrete watermark estimators
     new feefaca  Merge pull request #10992 from lukecwik/splittabledofn2
feea2ba is described below

commit feea2ba610aa35344d30bae23a76d18e7b6afe93
Author: Luke Cwik <lc...@google.com>
AuthorDate: Thu Feb 27 13:00:30 2020 -0800

    [BEAM-2939] Implement interfaces and concrete watermark estimators
    
    Plumb watermark estimator, watermark estimator state, and watermark estimator state coder methods through ByteBuddy DoFnInvoker adding method definitions to DoFnSignature and appropriate validation.
    
    This does not update any of the existing Splittable Dofns to use these watermark estimators or any existing Splittable DoFn "runners" (beyond adding throws unsupported operations exception). This is marked as work under BEAM-9430.
---
 .../construction/SplittableParDoNaiveBounded.java  |  12 +
 ...TimeBoundedSplittableProcessElementInvoker.java |   7 +-
 .../apache/beam/runners/core/SimpleDoFnRunner.java |  23 ++
 .../java/org/apache/beam/sdk/transforms/DoFn.java  | 143 +++++++-
 .../org/apache/beam/sdk/transforms/DoFnTester.java |  54 +--
 .../reflect/ByteBuddyDoFnInvokerFactory.java       | 114 +++++-
 .../beam/sdk/transforms/reflect/DoFnInvoker.java   |  50 +++
 .../beam/sdk/transforms/reflect/DoFnSignature.java | 162 ++++++++-
 .../sdk/transforms/reflect/DoFnSignatures.java     | 394 ++++++++++++++++++---
 .../HasDefaultWatermarkEstimator.java              |  34 ++
 .../splittabledofn/ManualWatermarkEstimator.java   |  46 +++
 .../TimestampObservingWatermarkEstimator.java      |  38 ++
 .../splittabledofn/WatermarkEstimator.java         |  47 +++
 .../splittabledofn/WatermarkEstimators.java        | 153 ++++++++
 .../sdk/transforms/reflect/DoFnInvokersTest.java   | 120 ++++++-
 .../reflect/DoFnSignaturesSplittableDoFnTest.java  | 321 ++++++++++++++++-
 .../splittabledofn/WatermarkEstimatorsTest.java    | 102 ++++++
 .../apache/beam/fn/harness/FnApiDoFnRunner.java    |  12 +
 18 files changed, 1706 insertions(+), 126 deletions(-)

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
index 92a443a..17d435c 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.BaseArgumentProvider;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.KV;
@@ -416,6 +417,17 @@ public class SplittableParDoNaiveBounded {
         // Ignore watermark updates
       }
 
+      @Override
+      public Object watermarkEstimatorState() {
+        throw new UnsupportedOperationException(
+            "@WatermarkEstimatorState parameters are not supported.");
+      }
+
+      @Override
+      public WatermarkEstimator<?> watermarkEstimator() {
+        throw new UnsupportedOperationException("WatermarkEstimator parameters are not supported.");
+      }
+
       // ----------- Unsupported methods --------------------
 
       @Override
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
index 010eb11..9c0e79a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
@@ -111,7 +111,12 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
 
     DoFn.ProcessContinuation cont =
         invoker.invokeProcessElement(
-            new DoFnInvoker.ArgumentProvider<InputT, OutputT>() {
+            new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() {
+              @Override
+              public String getErrorContext() {
+                return OutputAndTimeBoundedSplittableProcessElementInvoker.class.getSimpleName();
+              }
+
               @Override
               public DoFn<InputT, OutputT>.ProcessContext processContext(
                   DoFn<InputT, OutputT> doFn) {
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index a37644a..aec8365 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -49,6 +49,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
@@ -541,6 +542,17 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     }
 
     @Override
+    public Object watermarkEstimatorState() {
+      throw new UnsupportedOperationException(
+          "@WatermarkEstimatorState parameters are not supported.");
+    }
+
+    @Override
+    public WatermarkEstimator<?> watermarkEstimator() {
+      throw new UnsupportedOperationException("WatermarkEstimator parameters are not supported.");
+    }
+
+    @Override
     public State state(String stateId, boolean alwaysFetched) {
       try {
         StateSpec<?> spec =
@@ -745,6 +757,17 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     }
 
     @Override
+    public Object watermarkEstimatorState() {
+      throw new UnsupportedOperationException(
+          "@WatermarkEstimatorState parameters are not supported.");
+    }
+
+    @Override
+    public WatermarkEstimator<?> watermarkEstimator() {
+      throw new UnsupportedOperationException("WatermarkEstimator parameters are not supported.");
+    }
+
+    @Override
     public State state(String stateId, boolean alwaysFetched) {
       try {
         StateSpec<?> spec =
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index d00950d..1790ecc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -39,8 +39,12 @@ import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.Sizes;
+import org.apache.beam.sdk.transforms.splittabledofn.TimestampObservingWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -670,6 +674,14 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
    *       GetInitialRestriction}. This method is optional only if the restriction type returned by
    *       {@link GetInitialRestriction} implements {@link HasDefaultTracker}.
    *   <li>It <i>may</i> define a {@link GetRestrictionCoder} method.
+   *   <li>It <i>may</i> define a {@link GetInitialWatermarkEstimatorState} method. If none is
+   *       defined then the watermark estimator state is of type {@link Void}.
+   *   <li>It <i>may</i> define a {@link GetWatermarkEstimatorStateCoder} method.
+   *   <li>It <i>may</i> define a {@link NewWatermarkEstimator} method returning a subtype of {@code
+   *       WatermarkEstimator<W>} where {@code W} is the watermark estimator state type returned by
+   *       {@link GetInitialWatermarkEstimatorState}. This method is optional only if {@link
+   *       GetInitialWatermarkEstimatorState} has not been defined or {@code W} implements {@link
+   *       HasDefaultWatermarkEstimator}.
    *   <li>The {@link DoFn} itself <i>may</i> be annotated with {@link BoundedPerElement} or {@link
    *       UnboundedPerElement}, but not both at the same time. If it's not annotated with either of
    *       these, it's assumed to be {@link BoundedPerElement} if its {@link ProcessElement} method
@@ -692,6 +704,12 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
    *   <li>If one of its arguments is tagged with the {@link Timestamp} annotation, then it will be
    *       passed the timestamp of the current element being processed; the argument must be of type
    *       {@link Instant}.
+   *   <li>If one of its arguments is of the type {@link WatermarkEstimator}, then it will be passed
+   *       the watermark estimator.
+   *   <li>If one of its arguments is of the type {@link ManualWatermarkEstimator}, then it will be
+   *       passed a watermark estimator that can be updated manually. This parameter can only be
+   *       supplied if the method annotated with {@link GetInitialWatermarkEstimatorState} returns a
+   *       sub-type of {@link ManualWatermarkEstimator}.
    *   <li>If one of its arguments is a subtype of {@link BoundedWindow}, then it will be passed the
    *       window of the current element. When applied by {@link ParDo} the subtype of {@link
    *       BoundedWindow} must match the type of windows on the input {@link PCollection}. If the
@@ -720,7 +738,8 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
 
   /**
    * Parameter annotation for the input element for {@link ProcessElement}, {@link
-   * GetInitialRestriction}, {@link GetSize}, {@link SplitRestriction}, and {@link NewTracker}
+   * GetInitialRestriction}, {@link GetSize}, {@link SplitRestriction}, {@link
+   * GetInitialWatermarkEstimatorState}, {@link NewWatermarkEstimator}, and {@link NewTracker}
    * methods.
    */
   @Documented
@@ -729,8 +748,9 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
   public @interface Element {}
 
   /**
-   * Parameter annotation for the restriction for {@link GetSize}, {@link SplitRestriction}, and
-   * {@link NewTracker} methods. Must match the return type used on the method annotated with {@link
+   * Parameter annotation for the restriction for {@link GetSize}, {@link SplitRestriction}, {@link
+   * GetInitialWatermarkEstimatorState}, {@link NewWatermarkEstimator}, and {@link NewTracker}
+   * methods. Must match the return type used on the method annotated with {@link
    * GetInitialRestriction}.
    */
   @Documented
@@ -740,7 +760,8 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
 
   /**
    * Parameter annotation for the input element timestamp for {@link ProcessElement}, {@link
-   * GetInitialRestriction}, {@link GetSize}, {@link SplitRestriction}, and {@link NewTracker}
+   * GetInitialRestriction}, {@link GetSize}, {@link SplitRestriction}, {@link
+   * GetInitialWatermarkEstimatorState}, {@link NewWatermarkEstimator}, and {@link NewTracker}
    * methods.
    */
   @Documented
@@ -1071,6 +1092,120 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
   public @interface NewTracker {}
 
   /**
+   * Annotation for the method that maps an element and restriction to initial watermark estimator
+   * state for a <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
+   *
+   * <p>Signature: {@code WatermarkEstimatorStateT getInitialWatermarkState(<arguments>);}
+   *
+   * <p>This method must satisfy the following constraints:
+   *
+   * <ul>
+   *   <li>The return type {@code WatermarkEstimatorStateT} defines the watermark state type used
+   *       within this splittable DoFn. All other methods that use a {@link
+   *       WatermarkEstimatorState @WatermarkEstimatorState} parameter must use the same type that
+   *       is used here. It is suggested to use as narrow of a return type definition as possible
+   *       (for example prefer to use a square type over a shape type as a square is a type of a
+   *       shape).
+   *   <li>If one of its arguments is tagged with the {@link Element} annotation, then it will be
+   *       passed the current element being processed; the argument must be of type {@code InputT}.
+   *       Note that automatic conversion of {@link Row}s and {@link FieldAccess} parameters are
+   *       currently unsupported.
+   *   <li>If one of its arguments is tagged with the {@link Restriction} annotation, then it will
+   *       be passed the current restriction being processed; the argument must be of type {@code
+   *       RestrictionT}.
+   *   <li>If one of its arguments is tagged with the {@link Timestamp} annotation, then it will be
+   *       passed the timestamp of the current element being processed; the argument must be of type
+   *       {@link Instant}.
+   *   <li>If one of its arguments is a subtype of {@link BoundedWindow}, then it will be passed the
+   *       window of the current element. When applied by {@link ParDo} the subtype of {@link
+   *       BoundedWindow} must match the type of windows on the input {@link PCollection}. If the
+   *       window is not accessed a runner may perform additional optimizations.
+   *   <li>If one of its arguments is of type {@link PaneInfo}, then it will be passed information
+   *       about the current triggering pane.
+   *   <li>If one of the parameters is of type {@link PipelineOptions}, then it will be passed the
+   *       options for the current pipeline.
+   * </ul>
+   */
+  @Documented
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.METHOD)
+  @Experimental(Kind.SPLITTABLE_DO_FN)
+  public @interface GetInitialWatermarkEstimatorState {}
+
+  /**
+   * Annotation for the method that returns the coder to use for the watermark estimator state of a
+   * <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
+   *
+   * <p>If not defined, a coder will be inferred using standard coder inference rules and the
+   * pipeline's {@link Pipeline#getCoderRegistry coder registry}.
+   *
+   * <p>This method will be called only at pipeline construction time.
+   *
+   * <p>Signature: {@code Coder<WatermarkEstimatorStateT> getWatermarkEstimatorStateCoder();}
+   */
+  @Documented
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.METHOD)
+  @Experimental(Kind.SPLITTABLE_DO_FN)
+  public @interface GetWatermarkEstimatorStateCoder {}
+
+  /**
+   * Annotation for the method that creates a new {@link WatermarkEstimator} for the watermark state
+   * of a <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
+   *
+   * <p>Signature: {@code MyWatermarkEstimator newWatermarkEstimator(<optional arguments>);}
+   *
+   * <p>If the return type is a subtype of {@link TimestampObservingWatermarkEstimator} then the
+   * timestamp of each element output from this DoFn is provided to the watermark estimator.
+   *
+   * <p>This method must satisfy the following constraints:
+   *
+   * <ul>
+   *   <li>The return type must be a subtype of {@code
+   *       WatermarkEstimator<WatermarkEstimatorStateT>}. It is suggested to use as narrow of a
+   *       return type definition as possible (for example prefer to use a square type over a shape
+   *       type as a square is a type of a shape).
+   *   <li>If one of its arguments is tagged with the {@link WatermarkEstimatorState} annotation,
+   *       then it will be passed the current watermark estimator state; the argument must be of
+   *       type {@code WatermarkEstimatorStateT}.
+   *   <li>If one of its arguments is tagged with the {@link Element} annotation, then it will be
+   *       passed the current element being processed; the argument must be of type {@code InputT}.
+   *       Note that automatic conversion of {@link Row}s and {@link FieldAccess} parameters are
+   *       currently unsupported.
+   *   <li>If one of its arguments is tagged with the {@link Restriction} annotation, then it will
+   *       be passed the current restriction being processed; the argument must be of type {@code
+   *       RestrictionT}.
+   *   <li>If one of its arguments is tagged with the {@link Timestamp} annotation, then it will be
+   *       passed the timestamp of the current element being processed; the argument must be of type
+   *       {@link Instant}.
+   *   <li>If one of its arguments is a subtype of {@link BoundedWindow}, then it will be passed the
+   *       window of the current element. When applied by {@link ParDo} the subtype of {@link
+   *       BoundedWindow} must match the type of windows on the input {@link PCollection}. If the
+   *       window is not accessed a runner may perform additional optimizations.
+   *   <li>If one of its arguments is of type {@link PaneInfo}, then it will be passed information
+   *       about the current triggering pane.
+   *   <li>If one of the parameters is of type {@link PipelineOptions}, then it will be passed the
+   *       options for the current pipeline.
+   * </ul>
+   */
+  @Documented
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.METHOD)
+  @Experimental(Kind.SPLITTABLE_DO_FN)
+  public @interface NewWatermarkEstimator {}
+
+  /**
+   * Parameter annotation for the watermark estimator state for the {@link NewWatermarkEstimator}
+   * method. Must match the return type on the method annotated with {@link
+   * GetInitialWatermarkEstimatorState}.
+   */
+  @Documented
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.PARAMETER)
+  @Experimental(Kind.SPLITTABLE_DO_FN)
+  public @interface WatermarkEstimatorState {}
+
+  /**
    * Annotation on a <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}
    * specifying that the {@link DoFn} performs a bounded amount of work per input element, so
    * applying it to a bounded {@link PCollection} will produce also a bounded {@link PCollection}.
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 112046a..6c445ec 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -33,13 +33,9 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.state.State;
 import org.apache.beam.sdk.state.TimeDomain;
-import org.apache.beam.sdk.state.Timer;
-import org.apache.beam.sdk.state.TimerMap;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
 import org.apache.beam.sdk.transforms.DoFn.FinishBundleContext;
 import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
-import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.StartBundleContext;
 import org.apache.beam.sdk.transforms.Materializations.MultimapView;
@@ -55,7 +51,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
@@ -217,7 +212,13 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
           createProcessContext(
               ValueInSingleWindow.of(element, timestamp, window, PaneInfo.NO_FIRING));
       fnInvoker.invokeProcessElement(
-          new DoFnInvoker.ArgumentProvider<InputT, OutputT>() {
+          new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() {
+
+            @Override
+            public String getErrorContext() {
+              return "DoFnTester";
+            }
+
             @Override
             public BoundedWindow window() {
               return window;
@@ -258,16 +259,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
             }
 
             @Override
-            public InputT sideInput(String sideInputTag) {
-              throw new UnsupportedOperationException("SideInputs are not supported by DoFnTester");
-            }
-
-            @Override
-            public InputT schemaElement(int index) {
-              throw new UnsupportedOperationException("Schemas are not supported by DoFnTester");
-            }
-
-            @Override
             public Instant timestamp(DoFn<InputT, OutputT> doFn) {
               return processContext.timestamp();
             }
@@ -290,21 +281,11 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
             }
 
             @Override
-            public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
-              throw new UnsupportedOperationException("Schemas are not supported by DoFnTester");
-            }
-
-            @Override
             public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {
               return DoFnOutputReceivers.windowedMultiReceiver(processContext, null);
             }
 
             @Override
-            public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
-              throw new UnsupportedOperationException("DoFnTester doesn't support timers yet.");
-            }
-
-            @Override
             public Object restriction() {
               throw new UnsupportedOperationException(
                   "Not expected to access Restriction from a regular DoFn in DoFnTester");
@@ -315,27 +296,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
               throw new UnsupportedOperationException(
                   "Not expected to access RestrictionTracker from a regular DoFn in DoFnTester");
             }
-
-            @Override
-            public org.apache.beam.sdk.state.State state(String stateId, boolean alwaysFetched) {
-              throw new UnsupportedOperationException("DoFnTester doesn't support state yet");
-            }
-
-            @Override
-            public Timer timer(String timerId) {
-              throw new UnsupportedOperationException("DoFnTester doesn't support timers yet");
-            }
-
-            @Override
-            public TimerMap timerFamily(String tagId) {
-              throw new UnsupportedOperationException("DoFnTester doesn't support timerFamily yet");
-            }
-
-            @Override
-            public BundleFinalizer bundleFinalizer() {
-              throw new UnsupportedOperationException(
-                  "DoFnTester doesn't support bundleFinalizer yet");
-            }
           });
     } catch (UserCodeException e) {
       unwrapUserCodeException(e);
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index f8aa338..014e6dd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -55,13 +55,19 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimeDomain
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerFamilyParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimestampParameter;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WatermarkEstimatorParameter;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WatermarkEstimatorStateParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
 import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultWatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.Sizes;
 import org.apache.beam.sdk.transforms.splittabledofn.Sizes.HasSize;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.ByteBuddy;
 import org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.description.field.FieldDescription;
 import org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.description.method.MethodDescription;
@@ -98,6 +104,7 @@ import org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.jar.asm.Type;
 import org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.matcher.ElementMatchers;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Primitives;
+import org.joda.time.Instant;
 
 /** Dynamically generates a {@link DoFnInvoker} instances for invoking a {@link DoFn}. */
 class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
@@ -119,6 +126,8 @@ class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
   public static final String PIPELINE_OPTIONS_PARAMETER_METHOD = "pipelineOptions";
   public static final String RESTRICTION_PARAMETER_METHOD = "restriction";
   public static final String RESTRICTION_TRACKER_PARAMETER_METHOD = "restrictionTracker";
+  public static final String WATERMARK_ESTIMATOR_PARAMETER_METHOD = "watermarkEstimator";
+  public static final String WATERMARK_ESTIMATOR_STATE_PARAMETER_METHOD = "watermarkEstimatorState";
   public static final String STATE_PARAMETER_METHOD = "state";
   public static final String TIMER_PARAMETER_METHOD = "timer";
   public static final String SIDE_INPUT_PARAMETER_METHOD = "sideInput";
@@ -313,6 +322,51 @@ class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
     }
   }
 
+  /**
+   * Default implementation of {@link DoFn.GetWatermarkEstimatorStateCoder}, for delegation by
+   * bytebuddy.
+   */
+  public static class DefaultWatermarkEstimatorStateCoder {
+    private final TypeDescriptor<?> watermarkEstimatorStateType;
+
+    DefaultWatermarkEstimatorStateCoder(TypeDescriptor<?> watermarkEstimatorStateType) {
+      this.watermarkEstimatorStateType = watermarkEstimatorStateType;
+    }
+
+    @SuppressWarnings({"unused", "unchecked"})
+    public <WatermarkEstimatorStateT>
+        Coder<WatermarkEstimatorStateT> invokeGetWatermarkEstimatorStateCoder(
+            CoderRegistry registry) throws CannotProvideCoderException {
+      return (Coder) registry.getCoder(watermarkEstimatorStateType);
+    }
+  }
+
+  /** Default implementation of {@link DoFn.NewWatermarkEstimator}, for delegation by bytebuddy. */
+  public static class DefaultNewWatermarkEstimator {
+
+    /** Returns a watermark estimator that always reports the minimum watermark. */
+    @SuppressWarnings("unused")
+    public static <InputT, OutputT, WatermarkEstimatorStateT>
+        WatermarkEstimator<WatermarkEstimatorStateT> invokeNewTracker(
+            DoFnInvoker.ArgumentProvider<InputT, OutputT> argumentProvider) {
+      if (argumentProvider.watermarkEstimatorState() instanceof HasDefaultWatermarkEstimator) {
+        return ((HasDefaultWatermarkEstimator) argumentProvider.watermarkEstimatorState())
+            .newWatermarkEstimator();
+      }
+      return new WatermarkEstimator<WatermarkEstimatorStateT>() {
+        @Override
+        public Instant currentWatermark() {
+          return GlobalWindow.TIMESTAMP_MIN_VALUE;
+        }
+
+        @Override
+        public WatermarkEstimatorStateT getState() {
+          return null;
+        }
+      };
+    }
+  }
+
   /** Default implementation of {@link DoFn.NewTracker}, for delegation by bytebuddy. */
   public static class DefaultNewTracker {
     /** Uses {@link HasDefaultTracker} to produce the tracker. */
@@ -392,7 +446,17 @@ class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
             .method(ElementMatchers.named("invokeNewTracker"))
             .intercept(newTrackerDelegation(clazzDescription, signature.newTracker()))
             .method(ElementMatchers.named("invokeGetSize"))
-            .intercept(getSizeDelegation(clazzDescription, signature.getSize()));
+            .intercept(getSizeDelegation(clazzDescription, signature.getSize()))
+            .method(ElementMatchers.named("invokeGetWatermarkEstimatorStateCoder"))
+            .intercept(getWatermarkEstimatorStateCoderDelegation(clazzDescription, signature))
+            .method(ElementMatchers.named("invokeGetInitialWatermarkEstimatorState"))
+            .intercept(
+                delegateMethodWithExtraParametersOrNoop(
+                    clazzDescription, signature.getInitialWatermarkEstimatorState()))
+            .method(ElementMatchers.named("invokeNewWatermarkEstimator"))
+            .intercept(
+                newWatermarkEstimatorDelegation(
+                    clazzDescription, signature.newWatermarkEstimator()));
 
     DynamicType.Unloaded<?> unloaded = builder.make();
     @SuppressWarnings("unchecked")
@@ -421,6 +485,24 @@ class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
     }
   }
 
+  private static Implementation getWatermarkEstimatorStateCoderDelegation(
+      TypeDescription doFnType, DoFnSignature signature) {
+    if (signature.processElement().isSplittable()) {
+      if (signature.getWatermarkEstimatorStateCoder() == null) {
+        return MethodDelegation.to(
+            new DefaultWatermarkEstimatorStateCoder(
+                signature.getInitialWatermarkEstimatorState() == null
+                    ? TypeDescriptors.voids()
+                    : signature.getInitialWatermarkEstimatorState().watermarkEstimatorStateT()));
+      } else {
+        return new DowncastingParametersMethodDelegation(
+            doFnType, signature.getWatermarkEstimatorStateCoder().targetMethod());
+      }
+    } else {
+      return ExceptionMethod.throwing(UnsupportedOperationException.class);
+    }
+  }
+
   private static Implementation splitRestrictionDelegation(
       TypeDescription doFnType, DoFnSignature.SplitRestrictionMethod signature) {
     if (signature == null) {
@@ -430,6 +512,15 @@ class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
     }
   }
 
+  private static Implementation newWatermarkEstimatorDelegation(
+      TypeDescription doFnType, @Nullable DoFnSignature.NewWatermarkEstimatorMethod signature) {
+    if (signature == null) {
+      return MethodDelegation.to(DefaultNewWatermarkEstimator.class);
+    } else {
+      return new DoFnMethodWithExtraParametersDelegation(doFnType, signature);
+    }
+  }
+
   private static Implementation newTrackerDelegation(
       TypeDescription doFnType, @Nullable DoFnSignature.NewTrackerMethod signature) {
     if (signature == null) {
@@ -862,6 +953,27 @@ class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
           }
 
           @Override
+          public StackManipulation dispatch(WatermarkEstimatorParameter p) {
+            // DoFnInvoker.ArgumentProvider.watermarkEstimator() returns a WatermarkEstimator,
+            // but the methods expect a concrete subtype of it.
+            // Insert a downcast.
+            return new StackManipulation.Compound(
+                simpleExtraContextParameter(WATERMARK_ESTIMATOR_PARAMETER_METHOD),
+                TypeCasting.to(new TypeDescription.ForLoadedType(p.estimatorT().getRawType())));
+          }
+
+          @Override
+          public StackManipulation dispatch(WatermarkEstimatorStateParameter p) {
+            // DoFnInvoker.ArgumentProvider.watermarkEstimatorState() returns an Object,
+            // but the methods expect a concrete subtype of it.
+            // Insert a downcast.
+            return new StackManipulation.Compound(
+                simpleExtraContextParameter(WATERMARK_ESTIMATOR_STATE_PARAMETER_METHOD),
+                TypeCasting.to(
+                    new TypeDescription.ForLoadedType(p.estimatorStateT().getRawType())));
+          }
+
+          @Override
           public StackManipulation dispatch(StateParameter p) {
             return new StackManipulation.Compound(
                 new TextConstant(p.referent().id()),
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index d52872e..412d5f7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.transforms.DoFn.StateId;
 import org.apache.beam.sdk.transforms.DoFn.TimerId;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.Sizes;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.Row;
@@ -104,6 +105,21 @@ public interface DoFnInvoker<InputT, OutputT> {
   <RestrictionT, PositionT> RestrictionTracker<RestrictionT, PositionT> invokeNewTracker(
       ArgumentProvider<InputT, OutputT> arguments);
 
+  /** Invoke the {@link DoFn.NewWatermarkEstimator} method on the bound {@link DoFn}. */
+  @SuppressWarnings("TypeParameterUnusedInFormals")
+  <WatermarkEstimatorStateT>
+      WatermarkEstimator<WatermarkEstimatorStateT> invokeNewWatermarkEstimator(
+          ArgumentProvider<InputT, OutputT> arguments);
+
+  /** Invoke the {@link DoFn.GetInitialWatermarkEstimatorState} method on the bound {@link DoFn}. */
+  Object invokeGetInitialWatermarkEstimatorState(ArgumentProvider<InputT, OutputT> arguments);
+
+  /**
+   * Invoke the {@link DoFn.GetWatermarkEstimatorStateCoder} method on the bound {@link DoFn}.
+   * Called only during pipeline construction time.
+   */
+  Coder<?> invokeGetWatermarkEstimatorStateCoder(CoderRegistry coderRegistry);
+
   /** Get the bound {@link DoFn}. */
   DoFn<InputT, OutputT> getFn();
 
@@ -190,6 +206,18 @@ public interface DoFnInvoker<InputT, OutputT> {
      */
     RestrictionTracker<?, ?> restrictionTracker();
 
+    /**
+     * If this is a splittable {@link DoFn}, returns the associated watermark estimator state with
+     * the current call.
+     */
+    Object watermarkEstimatorState();
+
+    /**
+     * If this is a splittable {@link DoFn}, returns the associated {@link WatermarkEstimator} with
+     * the current call.
+     */
+    WatermarkEstimator<?> watermarkEstimator();
+
     /** Returns the state cell for the given {@link StateId}. */
     State state(String stateId, boolean alwaysFetched);
 
@@ -338,6 +366,18 @@ public interface DoFnInvoker<InputT, OutputT> {
     }
 
     @Override
+    public Object watermarkEstimatorState() {
+      throw new UnsupportedOperationException(
+          String.format("WatermarkEstimatorState unsupported in %s", getErrorContext()));
+    }
+
+    @Override
+    public WatermarkEstimator<?> watermarkEstimator() {
+      throw new UnsupportedOperationException(
+          String.format("WatermarkEstimator unsupported in %s", getErrorContext()));
+    }
+
+    @Override
     public BundleFinalizer bundleFinalizer() {
       throw new UnsupportedOperationException(
           String.format("BundleFinalizer unsupported in %s", getErrorContext()));
@@ -449,6 +489,16 @@ public interface DoFnInvoker<InputT, OutputT> {
     }
 
     @Override
+    public Object watermarkEstimatorState() {
+      return delegate.watermarkEstimatorState();
+    }
+
+    @Override
+    public WatermarkEstimator<?> watermarkEstimator() {
+      return delegate.watermarkEstimator();
+    }
+
+    @Override
     public State state(String stateId, boolean alwaysFetch) {
       return delegate.state(stateId, alwaysFetch);
     }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index b675596..1df70db 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -46,6 +46,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParam
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -114,6 +115,18 @@ public abstract class DoFnSignature {
   @Nullable
   public abstract GetRestrictionCoderMethod getRestrictionCoder();
 
+  /** Details about this {@link DoFn}'s {@link DoFn.GetWatermarkEstimatorStateCoder} method. */
+  @Nullable
+  public abstract GetWatermarkEstimatorStateCoderMethod getWatermarkEstimatorStateCoder();
+
+  /** Details about this {@link DoFn}'s {@link DoFn.GetInitialWatermarkEstimatorState} method. */
+  @Nullable
+  public abstract GetInitialWatermarkEstimatorStateMethod getInitialWatermarkEstimatorState();
+
+  /** Details about this {@link DoFn}'s {@link DoFn.NewWatermarkEstimator} method. */
+  @Nullable
+  public abstract NewWatermarkEstimatorMethod newWatermarkEstimator();
+
   /** Details about this {@link DoFn}'s {@link DoFn.NewTracker} method. */
   @Nullable
   public abstract NewTrackerMethod newTracker();
@@ -178,6 +191,14 @@ public abstract class DoFnSignature {
 
     abstract Builder setGetSize(GetSizeMethod getSize);
 
+    abstract Builder setGetInitialWatermarkEstimatorState(
+        GetInitialWatermarkEstimatorStateMethod getInitialWatermarkEstimatorState);
+
+    abstract Builder setNewWatermarkEstimator(NewWatermarkEstimatorMethod newWatermarkEstimator);
+
+    abstract Builder setGetWatermarkEstimatorStateCoder(
+        GetWatermarkEstimatorStateCoderMethod getWatermarkEstimatorStateCoder);
+
     abstract Builder setStateDeclarations(Map<String, StateDeclaration> stateDeclarations);
 
     abstract Builder setTimerDeclarations(Map<String, TimerDeclaration> timerDeclarations);
@@ -247,6 +268,10 @@ public abstract class DoFnSignature {
         return cases.dispatch((RestrictionParameter) this);
       } else if (this instanceof RestrictionTrackerParameter) {
         return cases.dispatch((RestrictionTrackerParameter) this);
+      } else if (this instanceof WatermarkEstimatorParameter) {
+        return cases.dispatch((WatermarkEstimatorParameter) this);
+      } else if (this instanceof WatermarkEstimatorStateParameter) {
+        return cases.dispatch((WatermarkEstimatorStateParameter) this);
       } else if (this instanceof StateParameter) {
         return cases.dispatch((StateParameter) this);
       } else if (this instanceof TimerParameter) {
@@ -311,6 +336,10 @@ public abstract class DoFnSignature {
 
       ResultT dispatch(RestrictionTrackerParameter p);
 
+      ResultT dispatch(WatermarkEstimatorParameter p);
+
+      ResultT dispatch(WatermarkEstimatorStateParameter p);
+
       ResultT dispatch(StateParameter p);
 
       ResultT dispatch(TimerParameter p);
@@ -406,6 +435,16 @@ public abstract class DoFnSignature {
         }
 
         @Override
+        public ResultT dispatch(WatermarkEstimatorParameter p) {
+          return dispatchDefault(p);
+        }
+
+        @Override
+        public ResultT dispatch(WatermarkEstimatorStateParameter p) {
+          return dispatchDefault(p);
+        }
+
+        @Override
         public ResultT dispatch(BundleFinalizerParameter p) {
           return dispatchDefault(p);
         }
@@ -551,6 +590,19 @@ public abstract class DoFnSignature {
       return new AutoValue_DoFnSignature_Parameter_RestrictionTrackerParameter(trackerT);
     }
 
+    /** Returns a {@link WatermarkEstimatorParameter}. */
+    public static WatermarkEstimatorParameter watermarkEstimator(
+        TypeDescriptor<?> watermarkEstimatorT) {
+      return new AutoValue_DoFnSignature_Parameter_WatermarkEstimatorParameter(watermarkEstimatorT);
+    }
+
+    /** Returns a {@link WatermarkEstimatorStateParameter}. */
+    public static WatermarkEstimatorStateParameter watermarkEstimatorState(
+        TypeDescriptor<?> watermarkEstimatorStateT) {
+      return new AutoValue_DoFnSignature_Parameter_WatermarkEstimatorStateParameter(
+          watermarkEstimatorStateT);
+    }
+
     /** Returns a {@link StateParameter} referring to the given {@link StateDeclaration}. */
     public static StateParameter stateParameter(StateDeclaration decl, boolean alwaysFetched) {
       return new AutoValue_DoFnSignature_Parameter_StateParameter(decl, alwaysFetched);
@@ -768,6 +820,32 @@ public abstract class DoFnSignature {
     }
 
     /**
+     * Descriptor for a {@link Parameter} of type {@link DoFn.WatermarkEstimatorState}.
+     *
+     * <p>All such descriptors are equal.
+     */
+    @AutoValue
+    public abstract static class WatermarkEstimatorStateParameter extends Parameter {
+      // Package visible for AutoValue
+      WatermarkEstimatorStateParameter() {}
+
+      public abstract TypeDescriptor<?> estimatorStateT();
+    }
+
+    /**
+     * Descriptor for a {@link Parameter} of type {@link DoFn.WatermarkEstimatorState}.
+     *
+     * <p>All such descriptors are equal.
+     */
+    @AutoValue
+    public abstract static class WatermarkEstimatorParameter extends Parameter {
+      // Package visible for AutoValue
+      WatermarkEstimatorParameter() {}
+
+      public abstract TypeDescriptor<?> estimatorT();
+    }
+
+    /**
      * Descriptor for a {@link Parameter} of a subclass of {@link RestrictionTracker}.
      *
      * <p>All such descriptors are equal.
@@ -845,6 +923,10 @@ public abstract class DoFnSignature {
     @Nullable
     public abstract TypeDescriptor<?> trackerT();
 
+    /** Concrete type of the {@link WatermarkEstimator} parameter, if present. */
+    @Nullable
+    public abstract TypeDescriptor<?> watermarkEstimatorT();
+
     /** The window type used by this method, if any. */
     @Nullable
     @Override
@@ -859,6 +941,7 @@ public abstract class DoFnSignature {
         boolean requiresStableInput,
         boolean requiresTimeSortedInput,
         TypeDescriptor<?> trackerT,
+        TypeDescriptor<?> watermarkEstimatorT,
         @Nullable TypeDescriptor<? extends BoundedWindow> windowT,
         boolean hasReturnValue) {
       return new AutoValue_DoFnSignature_ProcessElementMethod(
@@ -867,6 +950,7 @@ public abstract class DoFnSignature {
           requiresStableInput,
           requiresTimeSortedInput,
           trackerT,
+          watermarkEstimatorT,
           windowT,
           hasReturnValue);
     }
@@ -1228,7 +1312,7 @@ public abstract class DoFnSignature {
     }
   }
 
-  /** Describes a {@link DoFn.NewTracker} method. */
+  /** Describes a {@link DoFn.GetSize} method. */
   @AutoValue
   public abstract static class GetSizeMethod implements MethodWithExtraParameters {
     /** The annotated method itself. */
@@ -1266,4 +1350,80 @@ public abstract class DoFnSignature {
       return new AutoValue_DoFnSignature_GetRestrictionCoderMethod(targetMethod, coderT);
     }
   }
+
+  /** Describes a {@link DoFn.GetInitialWatermarkEstimatorState} method. */
+  @AutoValue
+  public abstract static class GetInitialWatermarkEstimatorStateMethod
+      implements MethodWithExtraParameters {
+    /** The annotated method itself. */
+    @Override
+    public abstract Method targetMethod();
+
+    /** Type of the returned watermark estimator state. */
+    public abstract TypeDescriptor<?> watermarkEstimatorStateT();
+
+    /** The window type used by this method, if any. */
+    @Nullable
+    @Override
+    public abstract TypeDescriptor<? extends BoundedWindow> windowT();
+
+    /** Types of optional parameters of the annotated method, in the order they appear. */
+    @Override
+    public abstract List<Parameter> extraParameters();
+
+    static GetInitialWatermarkEstimatorStateMethod create(
+        Method targetMethod,
+        TypeDescriptor<?> watermarkEstimatorStateT,
+        TypeDescriptor<? extends BoundedWindow> windowT,
+        List<Parameter> extraParameters) {
+      return new AutoValue_DoFnSignature_GetInitialWatermarkEstimatorStateMethod(
+          targetMethod, watermarkEstimatorStateT, windowT, extraParameters);
+    }
+  }
+
+  /** Describes a {@link DoFn.NewWatermarkEstimator} method. */
+  @AutoValue
+  public abstract static class NewWatermarkEstimatorMethod implements MethodWithExtraParameters {
+    /** The annotated method itself. */
+    @Override
+    public abstract Method targetMethod();
+
+    /** Type of the returned {@link WatermarkEstimator}. */
+    public abstract TypeDescriptor<?> watermarkEstimatorT();
+
+    /** The window type used by this method, if any. */
+    @Nullable
+    @Override
+    public abstract TypeDescriptor<? extends BoundedWindow> windowT();
+
+    /** Types of optional parameters of the annotated method, in the order they appear. */
+    @Override
+    public abstract List<Parameter> extraParameters();
+
+    static NewWatermarkEstimatorMethod create(
+        Method targetMethod,
+        TypeDescriptor<?> watermarkEstimatorT,
+        TypeDescriptor<? extends BoundedWindow> windowT,
+        List<Parameter> extraParameters) {
+      return new AutoValue_DoFnSignature_NewWatermarkEstimatorMethod(
+          targetMethod, watermarkEstimatorT, windowT, extraParameters);
+    }
+  }
+
+  /** Describes a {@link DoFn.GetRestrictionCoder} method. */
+  @AutoValue
+  public abstract static class GetWatermarkEstimatorStateCoderMethod implements DoFnMethod {
+    /** The annotated method itself. */
+    @Override
+    public abstract Method targetMethod();
+
+    /** Type of the returned {@link Coder}. */
+    public abstract TypeDescriptor<?> coderT();
+
+    static GetWatermarkEstimatorStateCoderMethod create(
+        Method targetMethod, TypeDescriptor<?> coderT) {
+      return new AutoValue_DoFnSignature_GetWatermarkEstimatorStateCoderMethod(
+          targetMethod, coderT);
+    }
+  }
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index fb76330..cea704b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -37,6 +37,7 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.Coder;
@@ -62,25 +63,33 @@ import org.apache.beam.sdk.transforms.DoFn.StateId;
 import org.apache.beam.sdk.transforms.DoFn.TimerId;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.FieldAccessDeclaration;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.GetInitialRestrictionMethod;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.GetInitialWatermarkEstimatorStateMethod;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.PipelineOptionsParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.SchemaElementParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerFamilyParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WatermarkEstimatorParameter;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WatermarkEstimatorStateParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerFamilyDeclaration;
 import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.beam.sdk.values.TypeParameter;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Predicates;
@@ -127,6 +136,7 @@ public class DoFnSignatures {
               Parameter.TaggedOutputReceiverParameter.class,
               Parameter.ProcessContextParameter.class,
               Parameter.RestrictionTrackerParameter.class,
+              Parameter.WatermarkEstimatorParameter.class,
               Parameter.SideInputParameter.class,
               Parameter.BundleFinalizerParameter.class);
 
@@ -213,12 +223,32 @@ public class DoFnSignatures {
       ImmutableList.of(
           Parameter.ElementParameter.class,
           Parameter.RestrictionParameter.class,
-          Parameter.RestrictionTrackerParameter.class,
           Parameter.WindowParameter.class,
           Parameter.TimestampParameter.class,
           Parameter.PaneInfoParameter.class,
           Parameter.PipelineOptionsParameter.class);
 
+  private static final Collection<Class<? extends Parameter>>
+      ALLOWED_GET_INITIAL_WATERMARK_ESTIMATOR_STATE_PARAMETERS =
+          ImmutableList.of(
+              Parameter.ElementParameter.class,
+              Parameter.RestrictionParameter.class,
+              Parameter.WindowParameter.class,
+              Parameter.TimestampParameter.class,
+              Parameter.PaneInfoParameter.class,
+              Parameter.PipelineOptionsParameter.class);
+
+  private static final Collection<Class<? extends Parameter>>
+      ALLOWED_NEW_WATERMARK_ESTIMATOR_PARAMETERS =
+          ImmutableList.of(
+              Parameter.WatermarkEstimatorStateParameter.class,
+              Parameter.ElementParameter.class,
+              Parameter.RestrictionParameter.class,
+              Parameter.WindowParameter.class,
+              Parameter.TimestampParameter.class,
+              Parameter.PaneInfoParameter.class,
+              Parameter.PipelineOptionsParameter.class);
+
   /** @return the {@link DoFnSignature} for the given {@link DoFn} instance. */
   public static <FnT extends DoFn<?, ?>> DoFnSignature signatureForDoFn(FnT fn) {
     return getSignature(fn.getClass());
@@ -332,21 +362,34 @@ public class DoFnSignatures {
 
     private MethodAnalysisContext() {}
 
-    /** Indicates whether a {@link RestrictionTrackerParameter} is known in this context. */
-    public boolean hasRestrictionTrackerParameter() {
-      return extraParameters.stream()
-          .anyMatch(Predicates.instanceOf(RestrictionTrackerParameter.class)::apply);
+    /** Indicates whether the specified {@link Parameter} is known in this context. */
+    public boolean hasParameter(Class<? extends Parameter> type) {
+      return extraParameters.stream().anyMatch(Predicates.instanceOf(type)::apply);
     }
 
-    /** Indicates whether a {@link WindowParameter} is known in this context. */
-    public boolean hasWindowParameter() {
-      return extraParameters.stream().anyMatch(Predicates.instanceOf(WindowParameter.class)::apply);
+    /**
+     * Returns the specified {@link Parameter} if it is known in this context. Throws {@link
+     * IllegalStateException} if there is more than one instance of the parameter.
+     */
+    @Nullable
+    public <T extends Parameter> Optional<T> findParameter(Class<T> type) {
+      List<T> parameters = findParameters(type);
+      switch (parameters.size()) {
+        case 0:
+          return Optional.empty();
+        case 1:
+          return Optional.of(parameters.get(0));
+        default:
+          throw new IllegalStateException(
+              String.format(
+                  "Expected to have found at most one parameter of type %s but found %s.",
+                  type, parameters));
+      }
     }
 
-    /** Indicates whether a {@link Parameter.PipelineOptionsParameter} is known in this context. */
-    public boolean hasPipelineOptionsParamter() {
-      return extraParameters.stream()
-          .anyMatch(Predicates.instanceOf(Parameter.PipelineOptionsParameter.class)::apply);
+    public <T extends Parameter> List<T> findParameters(Class<T> type) {
+      return (List<T>)
+          extraParameters.stream().filter(Predicates.instanceOf(type)).collect(Collectors.toList());
     }
 
     /** The window type, if any, used by this method. */
@@ -477,6 +520,12 @@ public class DoFnSignatures {
         findAnnotatedMethod(errors, DoFn.GetRestrictionCoder.class, fnClass, false);
     Method newTrackerMethod = findAnnotatedMethod(errors, DoFn.NewTracker.class, fnClass, false);
     Method getSizeMethod = findAnnotatedMethod(errors, DoFn.GetSize.class, fnClass, false);
+    Method getWatermarkEstimatorStateCoderMethod =
+        findAnnotatedMethod(errors, DoFn.GetWatermarkEstimatorStateCoder.class, fnClass, false);
+    Method getInitialWatermarkEstimatorStateMethod =
+        findAnnotatedMethod(errors, DoFn.GetInitialWatermarkEstimatorState.class, fnClass, false);
+    Method newWatermarkEstimatorMethod =
+        findAnnotatedMethod(errors, DoFn.NewWatermarkEstimator.class, fnClass, false);
 
     Collection<Method> onTimerMethods =
         declaredMethodsWithAnnotation(DoFn.OnTimer.class, fnClass, DoFn.class);
@@ -601,7 +650,7 @@ public class DoFnSignatures {
           "Splittable, but does not define the required @%s method.",
           DoFnSignatures.format(DoFn.GetInitialRestriction.class));
 
-      GetInitialRestrictionMethod method =
+      GetInitialRestrictionMethod initialRestrictionMethod =
           analyzeGetInitialRestrictionMethod(
               errors.forMethod(DoFn.GetInitialRestriction.class, getInitialRestrictionMethod),
               fnT,
@@ -610,8 +659,24 @@ public class DoFnSignatures {
               outputT,
               fnContext);
 
-      signatureBuilder.setGetInitialRestriction(method);
-      TypeDescriptor<?> restrictionT = method.restrictionT();
+      signatureBuilder.setGetInitialRestriction(initialRestrictionMethod);
+      TypeDescriptor<?> restrictionT = initialRestrictionMethod.restrictionT();
+
+      TypeDescriptor<?> watermarkEstimatorStateT = TypeDescriptors.voids();
+      if (getInitialWatermarkEstimatorStateMethod != null) {
+        GetInitialWatermarkEstimatorStateMethod initialWatermarkEstimatorStateMethod =
+            analyzeGetInitialWatermarkEstimatorStateMethod(
+                errors.forMethod(
+                    DoFn.GetInitialWatermarkEstimatorState.class,
+                    getInitialWatermarkEstimatorStateMethod),
+                fnT,
+                getInitialWatermarkEstimatorStateMethod,
+                inputT,
+                outputT,
+                fnContext);
+        watermarkEstimatorStateT = initialWatermarkEstimatorStateMethod.watermarkEstimatorStateT();
+        signatureBuilder.setGetInitialWatermarkEstimatorState(initialWatermarkEstimatorStateMethod);
+      }
 
       if (newTrackerMethod != null) {
         signatureBuilder.setNewTracker(
@@ -665,6 +730,39 @@ public class DoFnSignatures {
                 fnT,
                 getRestrictionCoderMethod));
       }
+
+      if (getWatermarkEstimatorStateCoderMethod != null) {
+        signatureBuilder.setGetWatermarkEstimatorStateCoder(
+            analyzeGetWatermarkEstimatorStateCoderMethod(
+                errors.forMethod(
+                    DoFn.GetWatermarkEstimatorStateCoder.class,
+                    getWatermarkEstimatorStateCoderMethod),
+                fnT,
+                getWatermarkEstimatorStateCoderMethod));
+      }
+
+      if (newWatermarkEstimatorMethod != null) {
+        signatureBuilder.setNewWatermarkEstimator(
+            analyzeNewWatermarkEstimatorMethod(
+                errors.forMethod(DoFn.NewWatermarkEstimator.class, newWatermarkEstimatorMethod),
+                fnT,
+                newWatermarkEstimatorMethod,
+                inputT,
+                outputT,
+                restrictionT,
+                watermarkEstimatorStateT,
+                fnContext));
+      } else if (getInitialWatermarkEstimatorStateMethod != null) {
+        errors
+            .forMethod(DoFn.NewWatermarkEstimator.class, null)
+            .checkArgument(
+                watermarkEstimatorStateT.isSubtypeOf(
+                    TypeDescriptor.of(HasDefaultWatermarkEstimator.class)),
+                "Splittable, either @%s method must be defined or %s must implement %s.",
+                format(DoFn.NewWatermarkEstimator.class),
+                format(watermarkEstimatorStateT),
+                format(HasDefaultWatermarkEstimator.class));
+      }
     } else {
       // Validate that none of the splittable DoFn only methods have been declared.
       List<String> forbiddenMethods = new ArrayList<>();
@@ -683,6 +781,15 @@ public class DoFnSignatures {
       if (getSizeMethod != null) {
         forbiddenMethods.add("@" + format(DoFn.GetSize.class));
       }
+      if (getInitialWatermarkEstimatorStateMethod != null) {
+        forbiddenMethods.add("@" + format(DoFn.GetInitialWatermarkEstimatorState.class));
+      }
+      if (getWatermarkEstimatorStateCoderMethod != null) {
+        forbiddenMethods.add("@" + format(DoFn.GetWatermarkEstimatorStateCoder.class));
+      }
+      if (newWatermarkEstimatorMethod != null) {
+        forbiddenMethods.add("@" + format(DoFn.NewWatermarkEstimator.class));
+      }
       errors.checkArgument(
           forbiddenMethods.isEmpty(), "Non-splittable, but defines methods: %s", forbiddenMethods);
     }
@@ -785,11 +892,19 @@ public class DoFnSignatures {
         signature.getInitialRestriction();
     DoFnSignature.NewTrackerMethod newTracker = signature.newTracker();
     DoFnSignature.GetRestrictionCoderMethod getRestrictionCoder = signature.getRestrictionCoder();
+    DoFnSignature.GetInitialWatermarkEstimatorStateMethod getInitialWatermarkEstimatorState =
+        signature.getInitialWatermarkEstimatorState();
+    DoFnSignature.GetWatermarkEstimatorStateCoderMethod getWatermarkEstimatorStateCoder =
+        signature.getWatermarkEstimatorStateCoder();
 
     ErrorReporter processElementErrors =
         errors.forMethod(DoFn.ProcessElement.class, processElement.targetMethod());
 
     TypeDescriptor<?> restrictionT = getInitialRestriction.restrictionT();
+    TypeDescriptor<?> watermarkEstimatorStateT =
+        getInitialWatermarkEstimatorState == null
+            ? TypeDescriptors.voids()
+            : getInitialWatermarkEstimatorState.watermarkEstimatorStateT();
 
     if (newTracker == null) {
       ErrorReporter newTrackerErrors = errors.forMethod(DoFn.NewTracker.class, null);
@@ -806,6 +921,17 @@ public class DoFnSignatures {
         "Has tracker type %s, but the DoFn's tracker type must be of type RestrictionTracker.",
         format(processElement.trackerT()));
 
+    if (processElement.watermarkEstimatorT() != null) {
+      processElementErrors.checkArgument(
+          processElement.watermarkEstimatorT().getRawType().equals(WatermarkEstimator.class)
+              || processElement
+                  .watermarkEstimatorT()
+                  .getRawType()
+                  .equals(ManualWatermarkEstimator.class),
+          "Has watermark estimator type %s, but the DoFn's watermark estimator type must be one of [WatermarkEstimator, ManualWatermarkEstimator] types.",
+          format(processElement.watermarkEstimatorT()));
+    }
+
     if (getRestrictionCoder != null) {
       ErrorReporter getInitialRestrictionErrors =
           errors.forMethod(DoFn.GetInitialRestriction.class, getInitialRestriction.targetMethod());
@@ -819,6 +945,26 @@ public class DoFnSignatures {
           format(getRestrictionCoder.coderT()),
           format(coderTypeOf(restrictionT)));
     }
+
+    if (getWatermarkEstimatorStateCoder != null) {
+      ErrorReporter getInitialWatermarkEstimatorStateReporter =
+          errors.forMethod(
+              DoFn.GetInitialWatermarkEstimatorState.class,
+              getInitialWatermarkEstimatorState == null
+                  ? null
+                  : getInitialWatermarkEstimatorState.targetMethod());
+      getInitialWatermarkEstimatorStateReporter.checkArgument(
+          getWatermarkEstimatorStateCoder
+              .coderT()
+              .isSubtypeOf(coderTypeOf(watermarkEstimatorStateT)),
+          "Uses watermark estimator state type %s, but @%s method %s returns %s "
+              + "which is not a subtype of %s",
+          format(watermarkEstimatorStateT),
+          format(DoFn.GetInitialWatermarkEstimatorState.class),
+          format(getWatermarkEstimatorStateCoder.targetMethod()),
+          format(getWatermarkEstimatorStateCoder.coderT()),
+          format(coderTypeOf(watermarkEstimatorStateT)));
+    }
   }
 
   /**
@@ -1022,11 +1168,9 @@ public class DoFnSignatures {
     boolean requiresStableInput = m.isAnnotationPresent(DoFn.RequiresStableInput.class);
     boolean requiresTimeSortedInput = m.isAnnotationPresent(DoFn.RequiresTimeSortedInput.class);
 
-    Type[] params = m.getGenericParameterTypes();
-
-    TypeDescriptor<?> trackerT = getTrackerType(fnClass, m);
     TypeDescriptor<? extends BoundedWindow> windowT = getWindowType(fnClass, m);
 
+    Type[] params = m.getGenericParameterTypes();
     for (int i = 0; i < params.length; ++i) {
       Parameter extraParam =
           analyzeExtraParameter(
@@ -1055,8 +1199,19 @@ public class DoFnSignatures {
       }
     }
 
+    TypeDescriptor<?> trackerT =
+        methodContext
+            .findParameter(RestrictionTrackerParameter.class)
+            .map(p -> p.trackerT())
+            .orElse(null);
+    TypeDescriptor<?> watermarkEstimatorT =
+        methodContext
+            .findParameter(WatermarkEstimatorParameter.class)
+            .map(p -> p.estimatorT())
+            .orElse(null);
+
     // The allowed parameters depend on whether this DoFn is splittable
-    if (methodContext.hasRestrictionTrackerParameter()) {
+    if (trackerT != null) {
       for (Parameter parameter : methodContext.getExtraParameters()) {
         checkParameterOneOf(errors, parameter, ALLOWED_SPLITTABLE_PROCESS_ELEMENT_PARAMETERS);
       }
@@ -1072,6 +1227,7 @@ public class DoFnSignatures {
         requiresStableInput,
         requiresTimeSortedInput,
         trackerT,
+        watermarkEstimatorT,
         windowT,
         DoFn.ProcessContinuation.class.equals(m.getReturnType()));
   }
@@ -1113,20 +1269,22 @@ public class DoFnSignatures {
     String fieldAccessString = getFieldAccessId(param.getAnnotations());
     if (fieldAccessString != null) {
       return Parameter.schemaElementParameter(paramT, fieldAccessString, param.getIndex());
-    } else if (hasElementAnnotation(param.getAnnotations())) {
+    } else if (hasAnnotation(DoFn.Element.class, param.getAnnotations())) {
       return (paramT.equals(inputT))
           ? Parameter.elementParameter(paramT)
           : Parameter.schemaElementParameter(paramT, null, param.getIndex());
-    } else if (hasRestrictionAnnotation(param.getAnnotations())) {
+    } else if (hasAnnotation(DoFn.Restriction.class, param.getAnnotations())) {
       return Parameter.restrictionParameter(paramT);
-    } else if (hasTimestampAnnotation(param.getAnnotations())) {
+    } else if (hasAnnotation(DoFn.WatermarkEstimatorState.class, param.getAnnotations())) {
+      return Parameter.watermarkEstimatorState(paramT);
+    } else if (hasAnnotation(DoFn.Timestamp.class, param.getAnnotations())) {
       methodErrors.checkArgument(
           rawType.equals(Instant.class),
           "@Timestamp argument must have type org.joda.time.Instant.");
       return Parameter.timestampParameter();
     } else if (rawType.equals(TimeDomain.class)) {
       return Parameter.timeDomainParameter();
-    } else if (hasSideInputAnnotation(param.getAnnotations())) {
+    } else if (hasAnnotation(DoFn.SideInput.class, param.getAnnotations())) {
       String sideInputId = getSideInputId(param.getAnnotations());
       paramErrors.checkArgument(
           sideInputId != null, "%s missing %s annotation", format(SideInput.class));
@@ -1161,7 +1319,7 @@ public class DoFnSignatures {
       return Parameter.onTimerContext();
     } else if (BoundedWindow.class.isAssignableFrom(rawType)) {
       methodErrors.checkArgument(
-          !methodContext.hasWindowParameter(),
+          !methodContext.hasParameter(WindowParameter.class),
           "Multiple %s parameters",
           format(BoundedWindow.class));
       return Parameter.boundedWindow((TypeDescriptor<? extends BoundedWindow>) paramT);
@@ -1183,17 +1341,22 @@ public class DoFnSignatures {
       return Parameter.taggedOutputReceiverParameter();
     } else if (PipelineOptions.class.equals(rawType)) {
       methodErrors.checkArgument(
-          !methodContext.hasPipelineOptionsParamter(),
+          !methodContext.hasParameter(PipelineOptionsParameter.class),
           "Multiple %s parameters",
           format(PipelineOptions.class));
       return Parameter.pipelineOptions();
     } else if (RestrictionTracker.class.isAssignableFrom(rawType)) {
       methodErrors.checkArgument(
-          !methodContext.hasRestrictionTrackerParameter(),
+          !methodContext.hasParameter(RestrictionTrackerParameter.class),
           "Multiple %s parameters",
           format(RestrictionTracker.class));
       return Parameter.restrictionTracker(paramT);
-
+    } else if (WatermarkEstimator.class.isAssignableFrom(rawType)) {
+      methodErrors.checkArgument(
+          !methodContext.hasParameter(WatermarkEstimatorParameter.class),
+          "Multiple %s parameters",
+          format(WatermarkEstimator.class));
+      return Parameter.watermarkEstimator(paramT);
     } else if (rawType.equals(Timer.class)) {
       // m.getParameters() is not available until Java 8
       String id = getTimerId(param.getAnnotations());
@@ -1221,7 +1384,7 @@ public class DoFnSignatures {
 
       return Parameter.timerParameter(timerDecl);
 
-    } else if (hasTimerIdAnnotation(param.getAnnotations())) {
+    } else if (hasAnnotation(DoFn.TimerId.class, param.getAnnotations())) {
       boolean isValidTimerIdForTimerFamily =
           fnContext.getTimerFamilyDeclarations().size() > 0 && rawType.equals(String.class);
       paramErrors.checkArgument(
@@ -1348,36 +1511,8 @@ public class DoFnSignatures {
     return annotation.isPresent() ? (T) annotation.get() : null;
   }
 
-  private static boolean hasElementAnnotation(List<Annotation> annotations) {
-    return annotations.stream().anyMatch(a -> a.annotationType().equals(DoFn.Element.class));
-  }
-
-  private static boolean hasRestrictionAnnotation(List<Annotation> annotations) {
-    return annotations.stream().anyMatch(a -> a.annotationType().equals(DoFn.Restriction.class));
-  }
-
-  private static boolean hasTimestampAnnotation(List<Annotation> annotations) {
-    return annotations.stream().anyMatch(a -> a.annotationType().equals(DoFn.Timestamp.class));
-  }
-
-  private static boolean hasSideInputAnnotation(List<Annotation> annotations) {
-    return annotations.stream().anyMatch(a -> a.annotationType().equals(DoFn.SideInput.class));
-  }
-
-  private static boolean hasTimerIdAnnotation(List<Annotation> annotations) {
-    return annotations.stream().anyMatch(a -> a.annotationType().equals(DoFn.TimerId.class));
-  }
-
-  @Nullable
-  private static TypeDescriptor<?> getTrackerType(TypeDescriptor<?> fnClass, Method method) {
-    Type[] params = method.getGenericParameterTypes();
-    for (Type param : params) {
-      TypeDescriptor<?> paramT = fnClass.resolveType(param);
-      if (RestrictionTracker.class.isAssignableFrom(paramT.getRawType())) {
-        return paramT;
-      }
-    }
-    return null;
+  private static boolean hasAnnotation(Class<?> annotation, List<Annotation> annotations) {
+    return annotations.stream().anyMatch(a -> a.annotationType().equals(annotation));
   }
 
   @Nullable
@@ -1509,6 +1644,53 @@ public class DoFnSignatures {
         m, fnT.resolveType(m.getGenericReturnType()), windowT, methodContext.extraParameters);
   }
 
+  @VisibleForTesting
+  static DoFnSignature.GetInitialWatermarkEstimatorStateMethod
+      analyzeGetInitialWatermarkEstimatorStateMethod(
+          ErrorReporter errors,
+          TypeDescriptor<? extends DoFn<?, ?>> fnT,
+          Method m,
+          TypeDescriptor<?> inputT,
+          TypeDescriptor<?> outputT,
+          FnAnalysisContext fnContext) {
+    // Method is of the form:
+    // @GetInitialWatermarkEstimatorState
+    // WatermarkEstimatorStateT getInitialWatermarkEstimatorState(... parameters ...);
+
+    Type[] params = m.getGenericParameterTypes();
+    MethodAnalysisContext methodContext = MethodAnalysisContext.create();
+    TypeDescriptor<? extends BoundedWindow> windowT = getWindowType(fnT, m);
+    for (int i = 0; i < params.length; ++i) {
+      Parameter extraParam =
+          analyzeExtraParameter(
+              errors,
+              fnContext,
+              methodContext,
+              fnT,
+              ParameterDescription.of(
+                  m, i, fnT.resolveType(params[i]), Arrays.asList(m.getParameterAnnotations()[i])),
+              inputT,
+              outputT);
+      if (extraParam instanceof SchemaElementParameter) {
+        errors.throwIllegalArgument(
+            "Schema @%s are not supported for @%s method. Found %s, did you mean to use %s?",
+            format(DoFn.Element.class),
+            format(DoFn.GetInitialWatermarkEstimatorState.class),
+            format(((SchemaElementParameter) extraParam).elementT()),
+            format(inputT));
+      }
+      methodContext.addParameter(extraParam);
+    }
+
+    for (Parameter parameter : methodContext.getExtraParameters()) {
+      checkParameterOneOf(
+          errors, parameter, ALLOWED_GET_INITIAL_WATERMARK_ESTIMATOR_STATE_PARAMETERS);
+    }
+
+    return DoFnSignature.GetInitialWatermarkEstimatorStateMethod.create(
+        m, fnT.resolveType(m.getGenericReturnType()), windowT, methodContext.extraParameters);
+  }
+
   /**
    * Generates a {@link TypeDescriptor} for {@code DoFn.OutputReceiver<OutputT>} given {@code
    * OutputT}.
@@ -1683,8 +1865,21 @@ public class DoFnSignatures {
     return DoFnSignature.GetRestrictionCoderMethod.create(m, resT);
   }
 
+  @VisibleForTesting
+  static DoFnSignature.GetWatermarkEstimatorStateCoderMethod
+      analyzeGetWatermarkEstimatorStateCoderMethod(
+          ErrorReporter errors, TypeDescriptor<? extends DoFn> fnT, Method m) {
+    errors.checkArgument(m.getParameterTypes().length == 0, "Must have zero arguments");
+    TypeDescriptor<?> resT = fnT.resolveType(m.getGenericReturnType());
+    errors.checkArgument(
+        resT.isSubtypeOf(TypeDescriptor.of(Coder.class)),
+        "Must return a Coder, but returns %s",
+        format(resT));
+    return DoFnSignature.GetWatermarkEstimatorStateCoderMethod.create(m, resT);
+  }
+
   /**
-   * Generates a {@link TypeDescriptor} for {@code RestrictionTracker<RestrictionT>} given {@code
+   * Generates a {@link TypeDescriptor} for {@code RestrictionTracker<RestrictionT, ?>} given {@code
    * RestrictionT}.
    */
   private static <RestrictionT>
@@ -1694,6 +1889,17 @@ public class DoFnSignatures {
         new TypeParameter<RestrictionT>() {}, restrictionT);
   }
 
+  /**
+   * Generates a {@link TypeDescriptor} for {@code WatermarkEstimator<WatermarkEstimatorStateT>}
+   * given {@code WatermarkEstimatorStateT}.
+   */
+  private static <WatermarkEstimatorStateT>
+      TypeDescriptor<WatermarkEstimator<WatermarkEstimatorStateT>> watermarkEstimatorTypeOf(
+          TypeDescriptor<WatermarkEstimatorStateT> watermarkEstimatorStateT) {
+    return new TypeDescriptor<WatermarkEstimator<WatermarkEstimatorStateT>>() {}.where(
+        new TypeParameter<WatermarkEstimatorStateT>() {}, watermarkEstimatorStateT);
+  }
+
   @VisibleForTesting
   static DoFnSignature.NewTrackerMethod analyzeNewTrackerMethod(
       ErrorReporter errors,
@@ -1755,6 +1961,76 @@ public class DoFnSignatures {
   }
 
   @VisibleForTesting
+  static DoFnSignature.NewWatermarkEstimatorMethod analyzeNewWatermarkEstimatorMethod(
+      ErrorReporter errors,
+      TypeDescriptor<? extends DoFn<?, ?>> fnT,
+      Method m,
+      TypeDescriptor<?> inputT,
+      TypeDescriptor<?> outputT,
+      TypeDescriptor<?> restrictionT,
+      TypeDescriptor<?> watermarkEstimatorStateT,
+      FnAnalysisContext fnContext) {
+    // Method is of the form:
+    // @NewWatermarkEstimator
+    // WatermarkEstimatorT newWatermarkEstimator(... parameters ...);
+    Type[] params = m.getGenericParameterTypes();
+    TypeDescriptor<?> watermarkEstimatorT = fnT.resolveType(m.getGenericReturnType());
+    TypeDescriptor<?> expectedWatermarkEstimatorT =
+        watermarkEstimatorTypeOf(watermarkEstimatorStateT);
+    errors.checkArgument(
+        watermarkEstimatorT.isSubtypeOf(expectedWatermarkEstimatorT),
+        "Returns %s, but must return a subtype of %s",
+        format(watermarkEstimatorT),
+        format(expectedWatermarkEstimatorT));
+
+    MethodAnalysisContext methodContext = MethodAnalysisContext.create();
+    TypeDescriptor<? extends BoundedWindow> windowT = getWindowType(fnT, m);
+    for (int i = 0; i < params.length; ++i) {
+      Parameter extraParam =
+          analyzeExtraParameter(
+              errors,
+              fnContext,
+              methodContext,
+              fnT,
+              ParameterDescription.of(
+                  m, i, fnT.resolveType(params[i]), Arrays.asList(m.getParameterAnnotations()[i])),
+              inputT,
+              outputT);
+      if (extraParam instanceof SchemaElementParameter) {
+        errors.throwIllegalArgument(
+            "Schema @%s are not supported for @%s method. Found %s, did you mean to use %s?",
+            format(DoFn.Element.class),
+            format(DoFn.NewWatermarkEstimator.class),
+            format(((SchemaElementParameter) extraParam).elementT()),
+            format(inputT));
+      } else if (extraParam instanceof RestrictionParameter) {
+        errors.checkArgument(
+            restrictionT.equals(((RestrictionParameter) extraParam).restrictionT()),
+            "Uses restriction type %s, but @%s method uses restriction type %s",
+            format(((RestrictionParameter) extraParam).restrictionT()),
+            format(DoFn.GetInitialWatermarkEstimatorState.class),
+            format(restrictionT));
+      } else if (extraParam instanceof WatermarkEstimatorStateParameter) {
+        errors.checkArgument(
+            watermarkEstimatorStateT.equals(
+                ((WatermarkEstimatorStateParameter) extraParam).estimatorStateT()),
+            "Uses watermark estimator state type %s, but @%s method uses watermark estimator state type %s",
+            format(((WatermarkEstimatorStateParameter) extraParam).estimatorStateT()),
+            format(DoFn.GetInitialWatermarkEstimatorState.class),
+            format(watermarkEstimatorStateT));
+      }
+      methodContext.addParameter(extraParam);
+    }
+
+    for (Parameter parameter : methodContext.getExtraParameters()) {
+      checkParameterOneOf(errors, parameter, ALLOWED_NEW_WATERMARK_ESTIMATOR_PARAMETERS);
+    }
+
+    return DoFnSignature.NewWatermarkEstimatorMethod.create(
+        m, fnT.resolveType(m.getGenericReturnType()), windowT, methodContext.getExtraParameters());
+  }
+
+  @VisibleForTesting
   static DoFnSignature.GetSizeMethod analyzeGetSizeMethod(
       ErrorReporter errors,
       TypeDescriptor<? extends DoFn<?, ?>> fnT,
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/HasDefaultWatermarkEstimator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/HasDefaultWatermarkEstimator.java
new file mode 100644
index 0000000..fac64ad
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/HasDefaultWatermarkEstimator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * Interface for watermark estimator state for which a default implementation of {@link
+ * DoFn.NewWatermarkEstimator} is available, depending only on the watermark estimator state itself.
+ */
+@Experimental(Kind.SPLITTABLE_DO_FN)
+public interface HasDefaultWatermarkEstimator<
+    WatermarkEstimatorStateT,
+    WatermarkEstimatorT extends WatermarkEstimator<WatermarkEstimatorStateT>> {
+  /** Creates a new watermark estimator for {@code this}. */
+  WatermarkEstimatorT newWatermarkEstimator();
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ManualWatermarkEstimator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ManualWatermarkEstimator.java
new file mode 100644
index 0000000..a6e2797
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ManualWatermarkEstimator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.joda.time.Instant;
+
+/**
+ * A {@link WatermarkEstimator} which is controlled manually from within a {@link DoFn}. The {@link
+ * DoFn} must invoke {@link #setWatermark} to advance the watermark.
+ */
+@Experimental(Kind.SPLITTABLE_DO_FN)
+public interface ManualWatermarkEstimator<WatermarkEstimatorStateT>
+    extends WatermarkEstimator<WatermarkEstimatorStateT> {
+
+  /**
+   * Sets a timestamp before or at the timestamps of all future elements produced by the associated
+   * DoFn.
+   *
+   * <p>This can be approximate. If records are output that violate this guarantee, they will be
+   * considered late, which will affect how they will be processed. See <a
+   * href="https://beam.apache.org/documentation/programming-guide/#watermarks-and-late-data">watermarks
+   * and late data</a> for more information on late data and how to handle it.
+   *
+   * <p>However, this value should be as late as possible. Downstream windows may not be able to
+   * close until this watermark passes their end.
+   */
+  void setWatermark(Instant watermark);
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/TimestampObservingWatermarkEstimator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/TimestampObservingWatermarkEstimator.java
new file mode 100644
index 0000000..b217690
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/TimestampObservingWatermarkEstimator.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.joda.time.Instant;
+
+/**
+ * A {@link WatermarkEstimator} that observes the timestamps of all records output from a {@link
+ * DoFn}.
+ */
+@Experimental(Kind.SPLITTABLE_DO_FN)
+public interface TimestampObservingWatermarkEstimator<WatermarkEstimatorStateT>
+    extends WatermarkEstimator<WatermarkEstimatorStateT> {
+
+  /**
+   * Update watermark estimate with latest output timestamp. This is called with the timestamp of
+   * every element output from the DoFn.
+   */
+  void observeTimestamp(Instant timestamp);
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimator.java
new file mode 100644
index 0000000..343fea6
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.joda.time.Instant;
+
+/**
+ * A {@link WatermarkEstimator} which is used for estimating output watermarks of a splittable
+ * {@link DoFn}.
+ */
+@Experimental(Kind.SPLITTABLE_DO_FN)
+public interface WatermarkEstimator<WatermarkEstimatorStateT> {
+  /**
+   * Return estimated output watermark. This method must return monotonically increasing watermarks
+   * across instances that are constructed from prior state.
+   */
+  Instant currentWatermark();
+
+  /**
+   * Get current state of the {@link WatermarkEstimator} instance, which can be used to recreate the
+   * {@link WatermarkEstimator} when processing the restriction. See {@link
+   * DoFn.NewWatermarkEstimator} for additional details.
+   *
+   * <p>The internal state of the estimator must not be mutated by this method.
+   *
+   * <p>The state returned must not be mutated.
+   */
+  WatermarkEstimatorStateT getState();
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators.java
new file mode 100644
index 0000000..40992f1
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.joda.time.Instant;
+
+/**
+ * A set of {@link WatermarkEstimator}s that users can use to advance the output watermark for their
+ * associated {@link DoFn splittable DoFn}s.
+ */
+@Experimental(Kind.SPLITTABLE_DO_FN)
+public class WatermarkEstimators {
+  /** Concrete implementation of a {@link ManualWatermarkEstimator}. */
+  public static class Manual implements ManualWatermarkEstimator<Instant> {
+    private Instant watermark;
+
+    public Manual(Instant watermark) {
+      this.watermark = checkNotNull(watermark, "watermark must not be null.");
+      if (watermark.isBefore(GlobalWindow.TIMESTAMP_MIN_VALUE)
+          || watermark.isAfter(GlobalWindow.TIMESTAMP_MAX_VALUE)) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Provided watermark %s must be within bounds [%s, %s].",
+                watermark, GlobalWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.TIMESTAMP_MAX_VALUE));
+      }
+    }
+
+    @Override
+    public void setWatermark(Instant watermark) {
+      if (watermark.isBefore(GlobalWindow.TIMESTAMP_MIN_VALUE)
+          || watermark.isAfter(GlobalWindow.TIMESTAMP_MAX_VALUE)) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Provided watermark %s must be within bounds [%s, %s].",
+                watermark, GlobalWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.TIMESTAMP_MAX_VALUE));
+      }
+      if (watermark.isBefore(this.watermark)) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Watermark must be monotonically increasing. Provided watermark %s is less then "
+                    + "current watermark %s.",
+                watermark, this.watermark));
+      }
+      this.watermark = watermark;
+    }
+
+    @Override
+    public Instant currentWatermark() {
+      return watermark;
+    }
+
+    @Override
+    public Instant getState() {
+      return watermark;
+    }
+  }
+
+  /** A watermark estimator that tracks wall time. */
+  public static class WallTime implements WatermarkEstimator<Instant> {
+    private Instant watermark;
+
+    public WallTime(Instant watermark) {
+      this.watermark = checkNotNull(watermark, "watermark must not be null.");
+      if (watermark.isBefore(GlobalWindow.TIMESTAMP_MIN_VALUE)
+          || watermark.isAfter(GlobalWindow.TIMESTAMP_MAX_VALUE)) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Provided watermark %s must be within bounds [%s, %s].",
+                watermark, GlobalWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.TIMESTAMP_MAX_VALUE));
+      }
+    }
+
+    @Override
+    public Instant currentWatermark() {
+      Instant now = Instant.now();
+      this.watermark = now.isAfter(watermark) ? now : watermark;
+      return watermark;
+    }
+
+    @Override
+    public Instant getState() {
+      return watermark;
+    }
+  }
+
+  /**
+   * A watermark estimator that observes and timestamps of records output from a DoFn reporting the
+   * timestamp of the last element seen as the current watermark.
+   *
+   * <p>Note that this watermark estimator requires output timestamps in monotonically increasing
+   * order.
+   */
+  public static class MonotonicallyIncreasing
+      implements TimestampObservingWatermarkEstimator<Instant> {
+    private Instant watermark;
+
+    public MonotonicallyIncreasing(Instant watermark) {
+      this.watermark = checkNotNull(watermark, "timestamp must not be null.");
+      if (watermark.isBefore(GlobalWindow.TIMESTAMP_MIN_VALUE)
+          || watermark.isAfter(GlobalWindow.TIMESTAMP_MAX_VALUE)) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Provided watermark %s must be within bounds [%s, %s].",
+                watermark, GlobalWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.TIMESTAMP_MAX_VALUE));
+      }
+    }
+
+    @Override
+    public void observeTimestamp(Instant timestamp) {
+      // Beyond bounds error checking isn't important since the system is expected to perform output
+      // timestamp bounds checking already.
+      if (timestamp.isBefore(this.watermark)) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Timestamp must be monotonically increasing. Provided timestamp %s is less then "
+                    + "previously provided timestamp %s.",
+                timestamp, this.watermark));
+      }
+      this.watermark = timestamp;
+    }
+
+    @Override
+    public Instant currentWatermark() {
+      return watermark;
+    }
+
+    @Override
+    public Instant getState() {
+      return watermark;
+    }
+  }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index f2e4676..9dcc86a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -34,14 +34,17 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderProviders;
 import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.StateSpecs;
@@ -57,8 +60,11 @@ import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.FakeArgumentProvider;
 import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper;
 import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultWatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.util.UserCodeException;
@@ -402,7 +408,9 @@ public class DoFnInvokersTest {
   public static class MockFn extends DoFn<String, String> {
     @ProcessElement
     public ProcessContinuation processElement(
-        ProcessContext c, RestrictionTracker<SomeRestriction, Void> tracker) {
+        ProcessContext c,
+        RestrictionTracker<SomeRestriction, Void> tracker,
+        WatermarkEstimator<Instant> watermarkEstimator) {
       return null;
     }
 
@@ -426,6 +434,22 @@ public class DoFnInvokersTest {
     public SomeRestrictionCoder getRestrictionCoder() {
       return null;
     }
+
+    @GetInitialWatermarkEstimatorState
+    public Instant getInitialWatermarkEstimatorState() {
+      return null;
+    }
+
+    @GetWatermarkEstimatorStateCoder
+    public InstantCoder getWatermarkEstimatorStateCoder() {
+      return null;
+    }
+
+    @NewWatermarkEstimator
+    public WatermarkEstimator<Instant> newWatermarkEstimator(
+        @WatermarkEstimatorState Instant watermarkEstimatorState) {
+      return null;
+    }
   }
 
   @Test
@@ -434,11 +458,16 @@ public class DoFnInvokersTest {
     DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn);
     final SomeRestrictionTracker tracker = mock(SomeRestrictionTracker.class);
     final SomeRestrictionCoder coder = mock(SomeRestrictionCoder.class);
+    final InstantCoder watermarkEstimatorStateCoder = InstantCoder.of();
+    final Instant watermarkEstimatorState = Instant.now();
+    final WatermarkEstimator<Instant> watermarkEstimator =
+        new WatermarkEstimators.Manual(watermarkEstimatorState);
     SomeRestriction restriction = new SomeRestriction();
     final SomeRestriction part1 = new SomeRestriction();
     final SomeRestriction part2 = new SomeRestriction();
     final SomeRestriction part3 = new SomeRestriction();
     when(fn.getRestrictionCoder()).thenReturn(coder);
+    when(fn.getWatermarkEstimatorStateCoder()).thenReturn(watermarkEstimatorStateCoder);
     when(fn.getInitialRestriction(mockElement)).thenReturn(restriction);
     doAnswer(
             AdditionalAnswers.delegatesTo(
@@ -456,11 +485,15 @@ public class DoFnInvokersTest {
                 }))
         .when(fn)
         .splitRestriction(eq(mockElement), same(restriction), any());
+    when(fn.getInitialWatermarkEstimatorState()).thenReturn(watermarkEstimatorState);
     when(fn.newTracker(restriction)).thenReturn(tracker);
-    when(fn.processElement(mockProcessContext, tracker)).thenReturn(resume());
+    when(fn.newWatermarkEstimator(watermarkEstimatorState)).thenReturn(watermarkEstimator);
+    when(fn.processElement(mockProcessContext, tracker, watermarkEstimator)).thenReturn(resume());
 
     assertEquals(coder, invoker.invokeGetRestrictionCoder(CoderRegistry.createDefault()));
-
+    assertEquals(
+        watermarkEstimatorStateCoder,
+        invoker.invokeGetWatermarkEstimatorStateCoder(CoderRegistry.createDefault()));
     assertEquals(
         restriction,
         invoker.invokeGetInitialRestriction(
@@ -501,6 +534,9 @@ public class DoFnInvokersTest {
 
     assertEquals(Arrays.asList(part1, part2, part3), outputs);
     assertEquals(
+        watermarkEstimatorState,
+        invoker.invokeGetInitialWatermarkEstimatorState(new FakeArgumentProvider<>()));
+    assertEquals(
         tracker,
         invoker.invokeNewTracker(
             new FakeArgumentProvider<String, String>() {
@@ -515,6 +551,15 @@ public class DoFnInvokersTest {
               }
             }));
     assertEquals(
+        watermarkEstimator,
+        invoker.invokeNewWatermarkEstimator(
+            new FakeArgumentProvider<String, String>() {
+              @Override
+              public Object watermarkEstimatorState() {
+                return watermarkEstimatorState;
+              }
+            }));
+    assertEquals(
         resume(),
         invoker.invokeProcessElement(
             new FakeArgumentProvider<String, String>() {
@@ -527,6 +572,11 @@ public class DoFnInvokersTest {
               public RestrictionTracker<?, ?> restrictionTracker() {
                 return tracker;
               }
+
+              @Override
+              public WatermarkEstimator<?> watermarkEstimator() {
+                return watermarkEstimator;
+              }
             }));
   }
 
@@ -573,17 +623,64 @@ public class DoFnInvokersTest {
     }
   }
 
+  private static class WatermarkEstimatorStateWithDefaultWatermarkEstimator
+      implements HasDefaultWatermarkEstimator<
+          WatermarkEstimatorStateWithDefaultWatermarkEstimator, DefaultWatermarkEstimator> {
+
+    @Override
+    public DefaultWatermarkEstimator newWatermarkEstimator() {
+      return new DefaultWatermarkEstimator();
+    }
+  }
+
+  private static class DefaultWatermarkEstimator
+      implements WatermarkEstimator<WatermarkEstimatorStateWithDefaultWatermarkEstimator> {
+    @Override
+    public Instant currentWatermark() {
+      return null;
+    }
+
+    @Override
+    public WatermarkEstimatorStateWithDefaultWatermarkEstimator getState() {
+      return null;
+    }
+  }
+
+  private static class CoderForWatermarkEstimatorStateWithDefaultWatermarkEstimator
+      extends AtomicCoder<WatermarkEstimatorStateWithDefaultWatermarkEstimator> {
+
+    @Override
+    public void encode(
+        WatermarkEstimatorStateWithDefaultWatermarkEstimator value, OutputStream outStream)
+        throws CoderException, IOException {}
+
+    @Override
+    public WatermarkEstimatorStateWithDefaultWatermarkEstimator decode(InputStream inStream)
+        throws CoderException, IOException {
+      return null;
+    }
+  }
+
   @Test
   public void testSplittableDoFnDefaultMethods() throws Exception {
     class MockFn extends DoFn<String, String> {
       @ProcessElement
       public void processElement(
-          ProcessContext c, RestrictionTracker<RestrictionWithDefaultTracker, Void> tracker) {}
+          ProcessContext c,
+          RestrictionTracker<RestrictionWithDefaultTracker, Void> tracker,
+          WatermarkEstimator<WatermarkEstimatorStateWithDefaultWatermarkEstimator>
+              watermarkEstimator) {}
 
       @GetInitialRestriction
       public RestrictionWithDefaultTracker getInitialRestriction(@Element String element) {
         return null;
       }
+
+      @GetInitialWatermarkEstimatorState
+      public WatermarkEstimatorStateWithDefaultWatermarkEstimator
+          getInitialWatermarkEstimatorState() {
+        return null;
+      }
     }
 
     MockFn fn = mock(MockFn.class);
@@ -593,9 +690,15 @@ public class DoFnInvokersTest {
     coderRegistry.registerCoderProvider(
         CoderProviders.fromStaticMethods(
             RestrictionWithDefaultTracker.class, CoderForDefaultTracker.class));
+    coderRegistry.registerCoderForClass(
+        WatermarkEstimatorStateWithDefaultWatermarkEstimator.class,
+        new CoderForWatermarkEstimatorStateWithDefaultWatermarkEstimator());
     assertThat(
         invoker.<RestrictionWithDefaultTracker>invokeGetRestrictionCoder(coderRegistry),
         instanceOf(CoderForDefaultTracker.class));
+    assertThat(
+        invoker.invokeGetWatermarkEstimatorStateCoder(coderRegistry),
+        instanceOf(CoderForWatermarkEstimatorStateWithDefaultWatermarkEstimator.class));
     invoker.invokeSplitRestriction(
         new FakeArgumentProvider<String, String>() {
           @Override
@@ -639,6 +742,15 @@ public class DoFnInvokersTest {
               }
             }),
         instanceOf(DefaultTracker.class));
+    assertThat(
+        invoker.invokeNewWatermarkEstimator(
+            new FakeArgumentProvider<String, String>() {
+              @Override
+              public Object watermarkEstimatorState() {
+                return new WatermarkEstimatorStateWithDefaultWatermarkEstimator();
+              }
+            }),
+        instanceOf(DefaultWatermarkEstimator.class));
   }
 
   // ---------------------------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
index 2a2a08e..e030d99 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.lang.reflect.Method;
 import java.util.List;
+import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.StructuredCoder;
@@ -36,15 +37,19 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
 import org.apache.beam.sdk.transforms.DoFn.Element;
 import org.apache.beam.sdk.transforms.DoFn.Restriction;
-import org.apache.beam.sdk.transforms.DoFn.StateId;
 import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionParameter;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WatermarkEstimatorStateParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures.FnAnalysisContext;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.AnonymousMethod;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.FakeDoFn;
 import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.PCollection;
@@ -272,7 +277,9 @@ public class DoFnSignaturesSplittableDoFnTest {
     class GoodSplittableDoFn extends DoFn<Integer, String> {
       @ProcessElement
       public ProcessContinuation processElement(
-          ProcessContext context, RestrictionTracker<SomeRestriction, Void> tracker) {
+          ProcessContext context,
+          RestrictionTracker<SomeRestriction, Void> tracker,
+          ManualWatermarkEstimator<Instant> watermarkEstimator) {
         return null;
       }
 
@@ -312,7 +319,6 @@ public class DoFnSignaturesSplittableDoFnTest {
       public double getSize(
           @Element Integer element,
           @Restriction SomeRestriction restriction,
-          RestrictionTracker<SomeRestriction, Void> restrictionTracker,
           PipelineOptions pipelineOptions,
           BoundedWindow boundedWindow,
           PaneInfo paneInfo,
@@ -324,10 +330,41 @@ public class DoFnSignaturesSplittableDoFnTest {
       public SomeRestrictionCoder getRestrictionCoder() {
         return null;
       }
+
+      @GetInitialWatermarkEstimatorState
+      public Instant getInitialWatermarkEstimatorState(
+          @Element Integer element,
+          @Restriction SomeRestriction restriction,
+          PipelineOptions pipelineOptions,
+          BoundedWindow boundedWindow,
+          PaneInfo paneInfo,
+          @Timestamp Instant timestamp) {
+        return null;
+      }
+
+      @GetWatermarkEstimatorStateCoder
+      public InstantCoder getWatermarkEstimatorStateCoder() {
+        return null;
+      }
+
+      @NewWatermarkEstimator
+      public WatermarkEstimators.Manual newWatermarkEstimator(
+          @WatermarkEstimatorState Instant watermarkEstimatorState,
+          @Element Integer element,
+          @Restriction SomeRestriction restriction,
+          PipelineOptions pipelineOptions,
+          BoundedWindow boundedWindow,
+          PaneInfo paneInfo,
+          @Timestamp Instant timestamp) {
+        return null;
+      }
     }
 
     DoFnSignature signature = DoFnSignatures.getSignature(GoodSplittableDoFn.class);
     assertEquals(RestrictionTracker.class, signature.processElement().trackerT().getRawType());
+    assertEquals(
+        ManualWatermarkEstimator.class,
+        signature.processElement().watermarkEstimatorT().getRawType());
     assertTrue(signature.processElement().isSplittable());
     assertTrue(signature.processElement().hasReturnValue());
     assertEquals(
@@ -350,6 +387,18 @@ public class DoFnSignaturesSplittableDoFnTest {
         getParameterOfType(signature.getSize().extraParameters(), RestrictionParameter.class)
             .restrictionT()
             .getRawType());
+    assertEquals(
+        Instant.class,
+        signature.getInitialWatermarkEstimatorState().watermarkEstimatorStateT().getRawType());
+    assertEquals(
+        Instant.class,
+        getParameterOfType(
+                signature.newWatermarkEstimator().extraParameters(),
+                WatermarkEstimatorStateParameter.class)
+            .estimatorStateT()
+            .getRawType());
+    assertEquals(
+        InstantCoder.class, signature.getWatermarkEstimatorStateCoder().coderT().getRawType());
   }
 
   /**
@@ -358,9 +407,17 @@ public class DoFnSignaturesSplittableDoFnTest {
    */
   @Test
   public void testSplittableWithAllFunctionsGeneric() throws Exception {
-    class GoodGenericSplittableDoFn<RestrictionT, TrackerT, CoderT> extends DoFn<Integer, String> {
+    class GoodGenericSplittableDoFn<
+            RestrictionT,
+            TrackerT,
+            RestrictionCoderT,
+            WatermarkEstimatorStateT,
+            WatermarkEstimatorStateCoderT,
+            WatermarkEstimatorT>
+        extends DoFn<Integer, String> {
       @ProcessElement
-      public ProcessContinuation processElement(ProcessContext context, TrackerT tracker) {
+      public ProcessContinuation processElement(
+          ProcessContext context, TrackerT tracker, WatermarkEstimatorT watermarkEstimatorT) {
         return null;
       }
 
@@ -379,14 +436,30 @@ public class DoFnSignaturesSplittableDoFnTest {
       }
 
       @GetRestrictionCoder
-      public CoderT getRestrictionCoder() {
+      public RestrictionCoderT getRestrictionCoder() {
         return null;
       }
 
       @GetSize
-      public double getSize(@Restriction RestrictionT restriction, TrackerT restrictionTracker) {
+      public double getSize(@Restriction RestrictionT restriction) {
         return 1.0;
       }
+
+      @GetInitialWatermarkEstimatorState
+      public WatermarkEstimatorStateT getInitialWatermarkEstimatorState() {
+        return null;
+      }
+
+      @GetWatermarkEstimatorStateCoder
+      public WatermarkEstimatorStateCoderT getWatermarkEstimatorStateCoder() {
+        return null;
+      }
+
+      @NewWatermarkEstimator
+      public WatermarkEstimatorT newWatermarkEstimator(
+          @WatermarkEstimatorState WatermarkEstimatorStateT watermarkEstimatorState) {
+        return null;
+      }
     }
 
     DoFnSignature signature =
@@ -394,8 +467,14 @@ public class DoFnSignaturesSplittableDoFnTest {
             new GoodGenericSplittableDoFn<
                 SomeRestriction,
                 RestrictionTracker<SomeRestriction, ?>,
-                SomeRestrictionCoder>() {}.getClass());
+                SomeRestrictionCoder,
+                Instant,
+                InstantCoder,
+                ManualWatermarkEstimator<Instant>>() {}.getClass());
     assertEquals(RestrictionTracker.class, signature.processElement().trackerT().getRawType());
+    assertEquals(
+        ManualWatermarkEstimator.class,
+        signature.processElement().watermarkEstimatorT().getRawType());
     assertTrue(signature.processElement().isSplittable());
     assertTrue(signature.processElement().hasReturnValue());
     assertEquals(
@@ -413,6 +492,18 @@ public class DoFnSignaturesSplittableDoFnTest {
             .restrictionT()
             .getRawType());
     assertEquals(SomeRestrictionCoder.class, signature.getRestrictionCoder().coderT().getRawType());
+    assertEquals(
+        Instant.class,
+        signature.getInitialWatermarkEstimatorState().watermarkEstimatorStateT().getRawType());
+    assertEquals(
+        Instant.class,
+        getParameterOfType(
+                signature.newWatermarkEstimator().extraParameters(),
+                WatermarkEstimatorStateParameter.class)
+            .estimatorStateT()
+            .getRawType());
+    assertEquals(
+        InstantCoder.class, signature.getWatermarkEstimatorStateCoder().coderT().getRawType());
   }
 
   @Test
@@ -446,6 +537,68 @@ public class DoFnSignaturesSplittableDoFnTest {
   }
 
   @Test
+  public void testGetInitialWatermarkEstimatorStateUnsupportedSchemaElementArgument()
+      throws Exception {
+    thrown.expectMessage(
+        "Schema @Element are not supported for @GetInitialWatermarkEstimatorState method. Found String, did you mean to use Integer?");
+    DoFnSignatures.analyzeGetInitialWatermarkEstimatorStateMethod(
+        errors(),
+        TypeDescriptor.of(FakeDoFn.class),
+        new AnonymousMethod() {
+          SomeRestriction method(@Element String element) {
+            return null;
+          }
+        }.getMethod(),
+        TypeDescriptor.of(Integer.class),
+        TypeDescriptor.of(String.class),
+        FnAnalysisContext.create());
+  }
+
+  @Test
+  public void testNewWatermarkEstimatorUnsupportedSchemaElementArgument() throws Exception {
+    thrown.expectMessage(
+        "Schema @Element are not supported for @NewWatermarkEstimator method. Found String, did you mean to use Integer?");
+    DoFnSignatures.analyzeNewWatermarkEstimatorMethod(
+        errors(),
+        TypeDescriptor.of(FakeDoFn.class),
+        new AnonymousMethod() {
+          WatermarkEstimator<Instant> method(@Element String element) {
+            return null;
+          }
+        }.getMethod(),
+        TypeDescriptor.of(Integer.class),
+        TypeDescriptor.of(String.class),
+        TypeDescriptor.of(SomeRestriction.class),
+        TypeDescriptor.of(Instant.class),
+        FnAnalysisContext.create());
+  }
+
+  @Test
+  public void testMissingNewWatermarkEstimatorMethod() throws Exception {
+    class BadFn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void process(
+          ProcessContext context,
+          RestrictionTracker<SomeRestriction, Void> tracker,
+          ManualWatermarkEstimator<Instant> watermarkEstimator) {}
+
+      @GetInitialRestriction
+      public SomeRestriction getInitialRestriction() {
+        return null;
+      }
+
+      @GetInitialWatermarkEstimatorState
+      public Instant getInitialWatermarkEstimatorState() {
+        return null;
+      }
+    }
+
+    thrown.expectMessage(
+        "Splittable, either @NewWatermarkEstimator method must be defined or Instant must implement HasDefaultWatermarkEstimator.");
+    DoFnSignatures.getSignature(BadFn.class);
+  }
+
+  @Test
   public void testSplittableMissingNewTrackerMethod() throws Exception {
     class OtherRestriction {}
 
@@ -488,6 +641,40 @@ public class DoFnSignaturesSplittableDoFnTest {
     assertEquals(RestrictionTracker.class, signature.processElement().trackerT().getRawType());
   }
 
+  abstract static class SomeDefaultWatermarkEstimator
+      implements WatermarkEstimator<WatermarkEstimatorStateWithDefaultWatermarkEstimator> {}
+
+  abstract static class WatermarkEstimatorStateWithDefaultWatermarkEstimator
+      implements HasDefaultWatermarkEstimator<
+          WatermarkEstimatorStateWithDefaultWatermarkEstimator, SomeDefaultWatermarkEstimator> {}
+
+  @Test
+  public void testHasDefaultWatermarkEstimator() throws Exception {
+    class Fn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void process(
+          ProcessContext c,
+          RestrictionTracker<SomeRestriction, Void> tracker,
+          WatermarkEstimator<WatermarkEstimatorStateWithDefaultWatermarkEstimator>
+              watermarkEstimator) {}
+
+      @GetInitialRestriction
+      public SomeRestriction getInitialRestriction(@Element Integer element) {
+        return null;
+      }
+
+      @GetInitialWatermarkEstimatorState
+      public WatermarkEstimatorStateWithDefaultWatermarkEstimator
+          getInitialWatermarkEstimatorState() {
+        return null;
+      }
+    }
+
+    DoFnSignature signature = DoFnSignatures.getSignature(Fn.class);
+    assertEquals(
+        WatermarkEstimator.class, signature.processElement().watermarkEstimatorT().getRawType());
+  }
+
   @Test
   public void testRestrictionHasDefaultTrackerProcessUsesWrongTracker() throws Exception {
     class Fn extends DoFn<Integer, String> {
@@ -507,6 +694,34 @@ public class DoFnSignaturesSplittableDoFnTest {
   }
 
   @Test
+  public void
+      testWatermarkEstimatorStateHasDefaultWatermarkEstimatorProcessUsesWrongWatermarkEstimator()
+          throws Exception {
+    class Fn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void process(
+          ProcessContext c,
+          RestrictionTracker<SomeRestriction, Void> tracker,
+          SomeDefaultWatermarkEstimator watermarkEstimator) {}
+
+      @GetInitialRestriction
+      public SomeRestriction getInitialRestriction(@Element Integer element) {
+        return null;
+      }
+
+      @GetInitialWatermarkEstimatorState
+      public WatermarkEstimatorStateWithDefaultWatermarkEstimator
+          getInitialWatermarkEstimatorState() {
+        return null;
+      }
+    }
+
+    thrown.expectMessage(
+        "Has watermark estimator type SomeDefaultWatermarkEstimator, but the DoFn's watermark estimator type must be one of [WatermarkEstimator, ManualWatermarkEstimator] types.");
+    DoFnSignature signature = DoFnSignatures.getSignature(Fn.class);
+  }
+
+  @Test
   public void testNewTrackerReturnsWrongType() throws Exception {
     class BadFn extends DoFn<Integer, String> {
       @ProcessElement
@@ -528,6 +743,26 @@ public class DoFnSignaturesSplittableDoFnTest {
   }
 
   @Test
+  public void testNewWatermarkEstimatorReturnsWrongType() throws Exception {
+    class BadFn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void process(
+          ProcessContext context, RestrictionTracker<SomeRestriction, Void> tracker) {}
+
+      @GetInitialRestriction
+      public SomeRestriction getInitialRestriction(@Element Integer element) {
+        return null;
+      }
+
+      @NewWatermarkEstimator
+      public void newWatermarkEstimator() {}
+    }
+
+    thrown.expectMessage("Returns void, but must return a subtype of WatermarkEstimator<Void>");
+    DoFnSignatures.getSignature(BadFn.class);
+  }
+
+  @Test
   public void testGetInitialRestrictionMismatchesNewTracker() throws Exception {
     class BadFn extends DoFn<Integer, String> {
       @ProcessElement
@@ -551,6 +786,36 @@ public class DoFnSignaturesSplittableDoFnTest {
   }
 
   @Test
+  public void testGetInitialWatermarkEstimatorStateMismatchesNewWatermarkEstimator()
+      throws Exception {
+    class BadFn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void process(
+          ProcessContext context, RestrictionTracker<SomeRestriction, Void> tracker) {}
+
+      @GetInitialRestriction
+      public SomeRestriction getInitialRestriction(@Element Integer element) {
+        return null;
+      }
+
+      @GetInitialWatermarkEstimatorState
+      public Instant getInitalWatermarkEstimatorState() {
+        return null;
+      }
+
+      @NewWatermarkEstimator
+      public WatermarkEstimator<Void> newWatermarkEstimator(
+          @WatermarkEstimatorState Instant watermarkEstimatorState) {
+        return null;
+      }
+    }
+
+    thrown.expectMessage("but must return a subtype of WatermarkEstimator<Instant>");
+    thrown.expectMessage("newWatermarkEstimator(Instant): Returns WatermarkEstimator<Void>");
+    DoFnSignatures.getSignature(BadFn.class);
+  }
+
+  @Test
   public void testGetRestrictionCoderReturnsWrongType() throws Exception {
     class BadFn extends DoFn<Integer, String> {
       @ProcessElement
@@ -579,6 +844,29 @@ public class DoFnSignaturesSplittableDoFnTest {
   }
 
   @Test
+  public void testGetWatermarkEstimatorStateCoderReturnsWrongType() throws Exception {
+    class BadFn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void process(
+          ProcessContext context, RestrictionTracker<SomeRestriction, Void> tracker) {}
+
+      @GetInitialRestriction
+      public SomeRestriction getInitialRestriction(@Element Integer element) {
+        return null;
+      }
+
+      @GetWatermarkEstimatorStateCoder
+      public KvCoder getWatermarkEstimatorStateCoder() {
+        return null;
+      }
+    }
+
+    thrown.expectMessage(
+        "getWatermarkEstimatorStateCoder() returns KvCoder which is not a subtype of Coder<Void>");
+    DoFnSignatures.getSignature(BadFn.class);
+  }
+
+  @Test
   public void testSplitRestrictionReturnsWrongType() throws Exception {
     thrown.expectMessage(
         "OutputReceiver should be parameterized by "
@@ -696,11 +984,26 @@ public class DoFnSignaturesSplittableDoFnTest {
       public double getSize() {
         return 1.0;
       }
+
+      @GetInitialWatermarkEstimatorState
+      public Instant getInitialWatermarkEstimatorState() {
+        return null;
+      }
+
+      @GetWatermarkEstimatorStateCoder
+      public InstantCoder getWatermarkEstimatorStateCoder() {
+        return null;
+      }
+
+      @NewWatermarkEstimator
+      public WatermarkEstimator<Instant> newWatermarkEstimator() {
+        return null;
+      }
     }
 
     thrown.expectMessage(
         "Non-splittable, but defines methods: "
-            + "[@GetInitialRestriction, @SplitRestriction, @NewTracker, @GetRestrictionCoder, @GetSize]");
+            + "[@GetInitialRestriction, @SplitRestriction, @NewTracker, @GetRestrictionCoder, @GetSize, @GetInitialWatermarkEstimatorState, @GetWatermarkEstimatorStateCoder, @NewWatermarkEstimator]");
     DoFnSignatures.getSignature(BadFn.class);
   }
 
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimatorsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimatorsTest.java
new file mode 100644
index 0000000..3fd0dec
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimatorsTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import org.apache.beam.sdk.testing.ResetDateTimeProvider;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.joda.time.DateTimeUtils;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class WatermarkEstimatorsTest {
+  public @Rule ResetDateTimeProvider resetDateTimeProvider = new ResetDateTimeProvider();
+
+  @Test
+  public void testManualWatermarkEstimator() throws Exception {
+    ManualWatermarkEstimator<Instant> watermarkEstimator =
+        new WatermarkEstimators.Manual(GlobalWindow.TIMESTAMP_MIN_VALUE);
+    assertEquals(GlobalWindow.TIMESTAMP_MIN_VALUE, watermarkEstimator.currentWatermark());
+    watermarkEstimator.setWatermark(GlobalWindow.TIMESTAMP_MIN_VALUE);
+    watermarkEstimator.setWatermark(
+        watermarkEstimator.currentWatermark().plus(Duration.standardHours(1)));
+    assertEquals(
+        GlobalWindow.TIMESTAMP_MIN_VALUE.plus(Duration.standardHours(1)),
+        watermarkEstimator.currentWatermark());
+    assertThrows(
+        "must be within bounds",
+        IllegalArgumentException.class,
+        () -> watermarkEstimator.setWatermark(GlobalWindow.TIMESTAMP_MIN_VALUE.minus(1)));
+    assertThrows(
+        "must be within bounds",
+        IllegalArgumentException.class,
+        () -> watermarkEstimator.setWatermark(GlobalWindow.TIMESTAMP_MAX_VALUE.plus(1)));
+    assertThrows(
+        "monotonically increasing",
+        IllegalArgumentException.class,
+        () -> watermarkEstimator.setWatermark(GlobalWindow.TIMESTAMP_MIN_VALUE));
+    watermarkEstimator.setWatermark(GlobalWindow.TIMESTAMP_MAX_VALUE);
+    assertEquals(GlobalWindow.TIMESTAMP_MAX_VALUE, watermarkEstimator.currentWatermark());
+  }
+
+  @Test
+  public void testWallTimeWatermarkEstimator() throws Exception {
+    DateTimeUtils.setCurrentMillisFixed(GlobalWindow.TIMESTAMP_MIN_VALUE.getMillis());
+    WatermarkEstimator<Instant> watermarkEstimator =
+        new WatermarkEstimators.WallTime(new Instant());
+    DateTimeUtils.setCurrentMillisFixed(GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1).getMillis());
+    assertEquals(GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1), watermarkEstimator.currentWatermark());
+
+    DateTimeUtils.setCurrentMillisFixed(GlobalWindow.TIMESTAMP_MIN_VALUE.plus(2).getMillis());
+    // Make sure that we don't mutate state even if the clock advanced
+    assertEquals(GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1), watermarkEstimator.getState());
+    assertEquals(GlobalWindow.TIMESTAMP_MIN_VALUE.plus(2), watermarkEstimator.currentWatermark());
+    assertEquals(GlobalWindow.TIMESTAMP_MIN_VALUE.plus(2), watermarkEstimator.getState());
+
+    // Handle the case if the clock ever goes backwards. Could happen if we resumed processing
+    // on a machine that had misconfigured clock or due to clock skew.
+    DateTimeUtils.setCurrentMillisFixed(GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1).getMillis());
+    assertEquals(GlobalWindow.TIMESTAMP_MIN_VALUE.plus(2), watermarkEstimator.currentWatermark());
+  }
+
+  @Test
+  public void testMonotonicallyIncreasingWatermarkEstimator() throws Exception {
+    TimestampObservingWatermarkEstimator<Instant> watermarkEstimator =
+        new WatermarkEstimators.MonotonicallyIncreasing(GlobalWindow.TIMESTAMP_MIN_VALUE);
+    assertEquals(GlobalWindow.TIMESTAMP_MIN_VALUE, watermarkEstimator.currentWatermark());
+    watermarkEstimator.observeTimestamp(GlobalWindow.TIMESTAMP_MIN_VALUE);
+    watermarkEstimator.observeTimestamp(
+        watermarkEstimator.currentWatermark().plus(Duration.standardHours(1)));
+    assertEquals(
+        GlobalWindow.TIMESTAMP_MIN_VALUE.plus(Duration.standardHours(1)),
+        watermarkEstimator.currentWatermark());
+    assertThrows(
+        "monotonically increasing",
+        IllegalArgumentException.class,
+        () -> watermarkEstimator.observeTimestamp(GlobalWindow.TIMESTAMP_MIN_VALUE));
+    watermarkEstimator.observeTimestamp(GlobalWindow.TIMESTAMP_MAX_VALUE);
+    assertEquals(GlobalWindow.TIMESTAMP_MAX_VALUE, watermarkEstimator.currentWatermark());
+  }
+}
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 9fb5d3e..681d041 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -81,6 +81,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -1242,6 +1243,17 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, OutputT> {
           currentOutputWatermark);
       currentOutputWatermark = watermark;
     }
+
+    @Override
+    public Object watermarkEstimatorState() {
+      throw new UnsupportedOperationException(
+          "@WatermarkEstimatorState parameters are not supported.");
+    }
+
+    @Override
+    public WatermarkEstimator<?> watermarkEstimator() {
+      throw new UnsupportedOperationException("WatermarkEstimator parameters are not supported.");
+    }
   }
 
   /** Provides arguments for a {@link DoFnInvoker} for {@link DoFn.OnTimer @OnTimer}. */