You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/08 00:34:49 UTC

[1/4] beam git commit: Adds tests for the watermark hold (previously untested)

Repository: beam
Updated Branches:
  refs/heads/master 4a694cebb -> 1594849da


Adds tests for the watermark hold (previously untested)


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/29c28021
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/29c28021
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/29c28021

Branch: refs/heads/master
Commit: 29c280211c2431f29c5552c35bd3435c65e4975b
Parents: dad7ace
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 7 14:00:05 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Apr 7 17:06:13 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/SplittableParDoTest.java  | 56 +++++++++++++++++++-
 1 file changed, 54 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/29c28021/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index f8d6095..d301113 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.core;
 
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasItems;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -36,6 +37,7 @@ import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
@@ -208,7 +210,7 @@ public class SplittableParDoTest {
     private Instant currentProcessingTime;
 
     private InMemoryTimerInternals timerInternals;
-    private InMemoryStateInternals<String> stateInternals;
+    private TestInMemoryStateInternals<String> stateInternals;
 
     ProcessFnTester(
         Instant currentProcessingTime,
@@ -223,7 +225,7 @@ public class SplittableParDoTest {
               fn, inputCoder, restrictionCoder, IntervalWindow.getCoder());
       this.tester = DoFnTester.of(processFn);
       this.timerInternals = new InMemoryTimerInternals();
-      this.stateInternals = InMemoryStateInternals.forKey("dummy");
+      this.stateInternals = new TestInMemoryStateInternals<>("dummy");
       processFn.setStateInternalsFactory(
           new StateInternalsFactory<String>() {
             @Override
@@ -335,6 +337,9 @@ public class SplittableParDoTest {
       return tester.takeOutputElements();
     }
 
+    public Instant getWatermarkHold() {
+      return stateInternals.earliestWatermarkHold();
+    }
   }
 
   private static class OutputWindowedValueToDoFnTester<OutputT>
@@ -425,6 +430,53 @@ public class SplittableParDoTest {
     }
   }
 
+  private static class WatermarkUpdateFn extends DoFn<Instant, String> {
+    @ProcessElement
+    public void process(ProcessContext c, OffsetRangeTracker tracker) {
+      for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
+        c.updateWatermark(c.element().plus(Duration.standardSeconds(i)));
+        c.output(String.valueOf(i));
+      }
+    }
+
+    @GetInitialRestriction
+    public OffsetRange getInitialRestriction(Instant elem) {
+      throw new IllegalStateException("Expected to be supplied explicitly in this test");
+    }
+
+    @NewTracker
+    public OffsetRangeTracker newTracker(OffsetRange range) {
+      return new OffsetRangeTracker(range);
+    }
+  }
+
+  @Test
+  public void testUpdatesWatermark() throws Exception {
+    DoFn<Instant, String> fn = new WatermarkUpdateFn();
+    Instant base = Instant.now();
+
+    ProcessFnTester<Instant, String, OffsetRange, OffsetRangeTracker> tester =
+        new ProcessFnTester<>(
+            base,
+            fn,
+            InstantCoder.of(),
+            SerializableCoder.of(OffsetRange.class),
+            3,
+            MAX_BUNDLE_DURATION);
+
+    tester.startElement(base, new OffsetRange(0, 8));
+    assertThat(tester.takeOutputElements(), hasItems("0", "1", "2"));
+    assertEquals(base.plus(Duration.standardSeconds(2)), tester.getWatermarkHold());
+
+    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
+    assertThat(tester.takeOutputElements(), hasItems("3", "4", "5"));
+    assertEquals(base.plus(Duration.standardSeconds(5)), tester.getWatermarkHold());
+
+    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
+    assertThat(tester.takeOutputElements(), hasItems("6", "7"));
+    assertEquals(null, tester.getWatermarkHold());
+  }
+
   /**
    * A splittable {@link DoFn} that generates the sequence [init, init + total).
    */


[3/4] beam git commit: Clarifies doc of ProcessElement re: HasDefaultTracker

Posted by jk...@apache.org.
Clarifies doc of ProcessElement re: HasDefaultTracker


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/19f407c9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/19f407c9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/19f407c9

Branch: refs/heads/master
Commit: 19f407c9497c911ca3cb61d989aa5a78c84896cf
Parents: 4a694ce
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 7 17:00:39 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Apr 7 17:06:13 2017 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/beam/sdk/transforms/DoFn.java     | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/19f407c9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index de33612..5139290 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -40,6 +40,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -548,10 +549,11 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
    * <ul>
    * <li>It <i>must</i> define a {@link GetInitialRestriction} method.
    * <li>It <i>may</i> define a {@link SplitRestriction} method.
-   * <li>It <i>must</i> define a {@link NewTracker} method returning the same type as the type of
+   * <li>It <i>may</i> define a {@link NewTracker} method returning the same type as the type of
    *     the {@link RestrictionTracker} argument of {@link ProcessElement}, which in turn must be a
    *     subtype of {@code RestrictionTracker<R>} where {@code R} is the restriction type returned
-   *     by {@link GetInitialRestriction}.
+   *     by {@link GetInitialRestriction}. This method is optional in case the restriction type
+   *     returned by {@link GetInitialRestriction} implements {@link HasDefaultTracker}.
    * <li>It <i>may</i> define a {@link GetRestrictionCoder} method.
    * <li>The type of restrictions used by all of these methods must be the same.
    * <li>Its {@link ProcessElement} method <i>may</i> return a {@link ProcessContinuation} to


[4/4] beam git commit: This closes #2455

Posted by jk...@apache.org.
This closes #2455


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1594849d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1594849d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1594849d

Branch: refs/heads/master
Commit: 1594849da972f5b4dcfc52eb3991d1df083d6719
Parents: 4a694ce 29c2802
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 7 17:08:54 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Apr 7 17:08:54 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/core/DoFnAdapters.java  |   5 +
 ...eBoundedSplittableProcessElementInvoker.java | 125 ++++++------
 .../beam/runners/core/SimpleDoFnRunner.java     |   5 +
 .../beam/runners/core/SplittableParDo.java      |  15 +-
 .../core/SplittableProcessElementInvoker.java   |  22 +--
 ...ndedSplittableProcessElementInvokerTest.java |  16 +-
 .../beam/runners/core/SplittableParDoTest.java  | 197 ++++++-------------
 .../org/apache/beam/sdk/transforms/DoFn.java    |  84 ++------
 .../apache/beam/sdk/transforms/DoFnTester.java  |   5 +
 .../reflect/ByteBuddyDoFnInvokerFactory.java    |  20 +-
 .../sdk/transforms/reflect/DoFnInvoker.java     |   4 +-
 .../sdk/transforms/reflect/DoFnSignature.java   |  10 +-
 .../sdk/transforms/reflect/DoFnSignatures.java  |  22 +--
 .../splittabledofn/OffsetRangeTracker.java      |  33 +++-
 .../splittabledofn/RestrictionTracker.java      |   8 +
 .../beam/sdk/transforms/SplittableDoFnTest.java |  17 +-
 .../transforms/reflect/DoFnInvokersTest.java    |  99 +++-------
 .../DoFnSignaturesProcessElementTest.java       |   2 +-
 .../DoFnSignaturesSplittableDoFnTest.java       |  74 +------
 .../splittabledofn/OffsetRangeTrackerTest.java  |  49 ++++-
 20 files changed, 306 insertions(+), 506 deletions(-)
----------------------------------------------------------------------



[2/4] beam git commit: Fixes SDF issues re: watermarks and stop/resume

Posted by jk...@apache.org.
Fixes SDF issues re: watermarks and stop/resume

See detailed discussion in document:
https://docs.google.com/document/d/1BGc8pM1GOvZhwR9SARSVte-20XEoBUxrGJ5gTWXdv3c/edit


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dad7ace2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dad7ace2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dad7ace2

Branch: refs/heads/master
Commit: dad7ace2b6143850fac0ca5359d9f56f5f0df2c1
Parents: 19f407c
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Apr 6 15:07:05 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Apr 7 17:06:13 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/core/DoFnAdapters.java  |   5 +
 ...eBoundedSplittableProcessElementInvoker.java | 125 +++++++-------
 .../beam/runners/core/SimpleDoFnRunner.java     |   5 +
 .../beam/runners/core/SplittableParDo.java      |  15 +-
 .../core/SplittableProcessElementInvoker.java   |  22 +--
 ...ndedSplittableProcessElementInvokerTest.java |  16 +-
 .../beam/runners/core/SplittableParDoTest.java  | 171 +++----------------
 .../org/apache/beam/sdk/transforms/DoFn.java    |  78 ++-------
 .../apache/beam/sdk/transforms/DoFnTester.java  |   5 +
 .../reflect/ByteBuddyDoFnInvokerFactory.java    |  20 +--
 .../sdk/transforms/reflect/DoFnInvoker.java     |   4 +-
 .../sdk/transforms/reflect/DoFnSignature.java   |  10 +-
 .../sdk/transforms/reflect/DoFnSignatures.java  |  22 +--
 .../splittabledofn/OffsetRangeTracker.java      |  33 +++-
 .../splittabledofn/RestrictionTracker.java      |   8 +
 .../beam/sdk/transforms/SplittableDoFnTest.java |  17 +-
 .../transforms/reflect/DoFnInvokersTest.java    |  99 +++--------
 .../DoFnSignaturesProcessElementTest.java       |   2 +-
 .../DoFnSignaturesSplittableDoFnTest.java       |  74 +-------
 .../splittabledofn/OffsetRangeTrackerTest.java  |  49 +++++-
 20 files changed, 263 insertions(+), 517 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
index 693cb2f..deb3b7e 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
@@ -286,6 +286,11 @@ public class DoFnAdapters {
     }
 
     @Override
+    public void updateWatermark(Instant watermark) {
+      throw new UnsupportedOperationException("Only splittable DoFn's can use updateWatermark()");
+    }
+
+    @Override
     public BoundedWindow window() {
       return context.window();
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
index 357094c..27fd0a3 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
@@ -97,70 +97,57 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
       final WindowedValue<InputT> element,
       final TrackerT tracker) {
     final ProcessContext processContext = new ProcessContext(element, tracker);
-    DoFn.ProcessContinuation cont =
-        invoker.invokeProcessElement(
-            new DoFnInvoker.ArgumentProvider<InputT, OutputT>() {
-              @Override
-              public DoFn<InputT, OutputT>.ProcessContext processContext(
-                  DoFn<InputT, OutputT> doFn) {
-                return processContext;
-              }
-
-              @Override
-              public RestrictionTracker<?> restrictionTracker() {
-                return tracker;
-              }
-
-              // Unsupported methods below.
-
-              @Override
-              public BoundedWindow window() {
-                throw new UnsupportedOperationException(
-                    "Access to window of the element not supported in Splittable DoFn");
-              }
-
-              @Override
-              public DoFn<InputT, OutputT>.Context context(DoFn<InputT, OutputT> doFn) {
-                throw new IllegalStateException(
-                    "Should not access context() from @"
-                        + DoFn.ProcessElement.class.getSimpleName());
-              }
-
-              @Override
-              public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(
-                  DoFn<InputT, OutputT> doFn) {
-                throw new UnsupportedOperationException(
-                    "Access to timers not supported in Splittable DoFn");
-              }
-
-              @Override
-              public State state(String stateId) {
-                throw new UnsupportedOperationException(
-                    "Access to state not supported in Splittable DoFn");
-              }
-
-              @Override
-              public Timer timer(String timerId) {
-                throw new UnsupportedOperationException(
-                    "Access to timers not supported in Splittable DoFn");
-              }
-            });
-    RestrictionT residual;
-    RestrictionT forcedCheckpoint = processContext.extractCheckpoint();
-    if (cont.shouldResume()) {
-      if (forcedCheckpoint == null) {
-        // If no checkpoint was forced, the call returned voluntarily (i.e. all tryClaim() calls
-        // succeeded) - but we still need to have a checkpoint to resume from.
-        residual = tracker.checkpoint();
-      } else {
-        // A checkpoint was forced - i.e. the call probably (but not guaranteed) returned because of
-        // a failed tryClaim() call.
-        residual = forcedCheckpoint;
-      }
-    } else {
-      residual = null;
-    }
-    return new Result(residual, cont);
+    invoker.invokeProcessElement(
+        new DoFnInvoker.ArgumentProvider<InputT, OutputT>() {
+          @Override
+          public DoFn<InputT, OutputT>.ProcessContext processContext(
+              DoFn<InputT, OutputT> doFn) {
+            return processContext;
+          }
+
+          @Override
+          public RestrictionTracker<?> restrictionTracker() {
+            return tracker;
+          }
+
+          // Unsupported methods below.
+
+          @Override
+          public BoundedWindow window() {
+            throw new UnsupportedOperationException(
+                "Access to window of the element not supported in Splittable DoFn");
+          }
+
+          @Override
+          public DoFn<InputT, OutputT>.Context context(DoFn<InputT, OutputT> doFn) {
+            throw new IllegalStateException(
+                "Should not access context() from @"
+                    + DoFn.ProcessElement.class.getSimpleName());
+          }
+
+          @Override
+          public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(
+              DoFn<InputT, OutputT> doFn) {
+            throw new UnsupportedOperationException(
+                "Access to timers not supported in Splittable DoFn");
+          }
+
+          @Override
+          public State state(String stateId) {
+            throw new UnsupportedOperationException(
+                "Access to state not supported in Splittable DoFn");
+          }
+
+          @Override
+          public Timer timer(String timerId) {
+            throw new UnsupportedOperationException(
+                "Access to timers not supported in Splittable DoFn");
+          }
+        });
+
+    tracker.checkDone();
+    return new Result(
+        processContext.extractCheckpoint(), processContext.getLastReportedWatermark());
   }
 
   private class ProcessContext extends DoFn<InputT, OutputT>.ProcessContext {
@@ -176,6 +163,7 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
     private RestrictionT checkpoint;
     // A handle on the scheduled action to take a checkpoint.
     private Future<?> scheduledCheckpoint;
+    private Instant lastReportedWatermark;
 
     public ProcessContext(WindowedValue<InputT> element, TrackerT tracker) {
       fn.super();
@@ -241,6 +229,15 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
     }
 
     @Override
+    public synchronized void updateWatermark(Instant watermark) {
+      lastReportedWatermark = watermark;
+    }
+
+    public synchronized Instant getLastReportedWatermark() {
+      return lastReportedWatermark;
+    }
+
+    @Override
     public PipelineOptions getPipelineOptions() {
       return pipelineOptions;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 77286b2..98d88b6 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -542,6 +542,11 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     }
 
     @Override
+    public void updateWatermark(Instant watermark) {
+      throw new UnsupportedOperationException("Only splittable DoFn's can use updateWatermark()");
+    }
+
+    @Override
     public void output(OutputT output) {
       context.outputWindowedValue(windowedValue.withValue(output));
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 0b311c7..c16bf44 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -324,14 +324,12 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
     /**
      * The state cell containing a watermark hold for the output of this {@link DoFn}. The hold is
      * acquired during the first {@link DoFn.ProcessElement} call for each element and restriction,
-     * and is released when the {@link DoFn.ProcessElement} call returns {@link
-     * DoFn.ProcessContinuation#stop}.
+     * and is released when the {@link DoFn.ProcessElement} call returns and there is no residual
+     * restriction captured by the {@link SplittableProcessElementInvoker}.
      *
      * <p>A hold is needed to avoid letting the output watermark immediately progress together with
      * the input watermark when the first {@link DoFn.ProcessElement} call for this element
      * completes.
-     *
-     * <p>The hold is updated with the future output watermark reported by ProcessContinuation.
      */
     private static final StateTag<Object, WatermarkHoldState<GlobalWindow>> watermarkHoldTag =
         StateTags.makeSystemTagInternal(
@@ -461,7 +459,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
               invoker, elementAndRestriction.element(), tracker);
 
       // Save state for resuming.
-      if (!result.getContinuation().shouldResume()) {
+      if (result.getResidualRestriction() == null) {
         // All work for this element/restriction is completed. Clear state and release hold.
         elementState.clear();
         restrictionState.clear();
@@ -469,16 +467,15 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
         return;
       }
       restrictionState.write(result.getResidualRestriction());
-      Instant futureOutputWatermark = result.getContinuation().getWatermark();
+      Instant futureOutputWatermark = result.getFutureOutputWatermark();
       if (futureOutputWatermark == null) {
         futureOutputWatermark = elementAndRestriction.element().getTimestamp();
       }
-      Instant wakeupTime =
-          timerInternals.currentProcessingTime().plus(result.getContinuation().resumeDelay());
       holdState.add(futureOutputWatermark);
       // Set a timer to continue processing this element.
       timerInternals.setTimer(
-          TimerInternals.TimerData.of(stateNamespace, wakeupTime, TimeDomain.PROCESSING_TIME));
+          TimerInternals.TimerData.of(
+              stateNamespace, timerInternals.currentProcessingTime(), TimeDomain.PROCESSING_TIME));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
index cfc39e7..ced6c01 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
@@ -22,6 +22,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
 
 /**
  * A runner-specific hook for invoking a {@link DoFn.ProcessElement} method for a splittable {@link
@@ -31,25 +32,24 @@ public abstract class SplittableProcessElementInvoker<
     InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>> {
   /** Specifies how to resume a splittable {@link DoFn.ProcessElement} call. */
   public class Result {
-    @Nullable private final RestrictionT residualRestriction;
-    private final DoFn.ProcessContinuation continuation;
+    @Nullable
+    private final RestrictionT residualRestriction;
+    private final Instant futureOutputWatermark;
 
     public Result(
-        @Nullable RestrictionT residualRestriction, DoFn.ProcessContinuation continuation) {
+        @Nullable RestrictionT residualRestriction, Instant futureOutputWatermark) {
       this.residualRestriction = residualRestriction;
-      this.continuation = continuation;
+      this.futureOutputWatermark = futureOutputWatermark;
     }
 
-    /**
-     * Can be {@code null} only if {@link #getContinuation} specifies the call should not resume.
-     */
+    /** If {@code null}, means the call should not resume. */
     @Nullable
     public RestrictionT getResidualRestriction() {
       return residualRestriction;
     }
 
-    public DoFn.ProcessContinuation getContinuation() {
-      return continuation;
+    public Instant getFutureOutputWatermark() {
+      return futureOutputWatermark;
     }
   }
 
@@ -57,8 +57,8 @@ public abstract class SplittableProcessElementInvoker<
    * Invokes the {@link DoFn.ProcessElement} method using the given {@link DoFnInvoker} for the
    * original {@link DoFn}, on the given element and with the given {@link RestrictionTracker}.
    *
-   * @return Information on how to resume the call: residual restriction and a {@link
-   *     DoFn.ProcessContinuation}.
+   * @return Information on how to resume the call: residual restriction and a
+   * future output watermark.
    */
   public abstract Result invokeProcessElement(
       DoFnInvoker<InputT, OutputT> invoker, WindowedValue<InputT> element, TrackerT tracker);

http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
index b85f481..965380b 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
@@ -17,15 +17,11 @@
  */
 package org.apache.beam.runners.core;
 
-import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume;
-import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.lessThan;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
 
 import java.util.Collection;
 import java.util.concurrent.Executors;
@@ -54,17 +50,12 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest {
     }
 
     @ProcessElement
-    public ProcessContinuation process(ProcessContext context, OffsetRangeTracker tracker)
+    public void process(ProcessContext context, OffsetRangeTracker tracker)
         throws Exception {
-      OffsetRange range = tracker.currentRestriction();
-      for (int i = (int) range.getFrom(); i < range.getTo(); ++i) {
-        if (!tracker.tryClaim(i)) {
-          return resume();
-        }
+      for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
         Thread.sleep(sleepBeforeEachOutput.getMillis());
         context.output("" + i);
       }
-      return stop();
     }
 
     @GetInitialRestriction
@@ -111,7 +102,6 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest {
   public void testInvokeProcessElementOutputBounded() throws Exception {
     SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker>.Result res =
         runTest(10000, Duration.ZERO);
-    assertTrue(res.getContinuation().shouldResume());
     OffsetRange residualRange = res.getResidualRestriction();
     // Should process the first 100 elements.
     assertEquals(1000, residualRange.getFrom());
@@ -122,7 +112,6 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest {
   public void testInvokeProcessElementTimeBounded() throws Exception {
     SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker>.Result res =
         runTest(10000, Duration.millis(100));
-    assertTrue(res.getContinuation().shouldResume());
     OffsetRange residualRange = res.getResidualRestriction();
     // Should process ideally around 30 elements - but due to timing flakiness, we can't enforce
     // that precisely. Just test that it's not egregiously off.
@@ -135,7 +124,6 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest {
   public void testInvokeProcessElementVoluntaryReturn() throws Exception {
     SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker>.Result res =
         runTest(5, Duration.millis(100));
-    assertFalse(res.getContinuation().shouldResume());
     assertNull(res.getResidualRestriction());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index 6205777..f8d6095 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -17,9 +17,6 @@
  */
 package org.apache.beam.runners.core;
 
-import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume;
-import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
-import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.not;
@@ -43,9 +40,13 @@ import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
+import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
 import org.apache.beam.sdk.transforms.DoFnTester;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -97,8 +98,12 @@ public class SplittableParDoTest {
     public SomeRestriction checkpoint() {
       return someRestriction;
     }
+
+    @Override
+    public void checkDone() {}
   }
 
+  @BoundedPerElement
   private static class BoundedFakeFn extends DoFn<Integer, String> {
     @ProcessElement
     public void processElement(ProcessContext context, SomeRestrictionTracker tracker) {}
@@ -109,12 +114,10 @@ public class SplittableParDoTest {
     }
   }
 
+  @UnboundedPerElement
   private static class UnboundedFakeFn extends DoFn<Integer, String> {
     @ProcessElement
-    public ProcessContinuation processElement(
-        ProcessContext context, SomeRestrictionTracker tracker) {
-      return stop();
-    }
+    public void processElement(ProcessContext context, SomeRestrictionTracker tracker) {}
 
     @GetInitialRestriction
     public SomeRestriction getInitialRestriction(Integer element) {
@@ -422,164 +425,40 @@ public class SplittableParDoTest {
     }
   }
 
-  /** A simple splittable {@link DoFn} that outputs the given element every 5 seconds forever. */
-  private static class SelfInitiatedResumeFn extends DoFn<Integer, String> {
-    @ProcessElement
-    public ProcessContinuation process(ProcessContext c, SomeRestrictionTracker tracker) {
-      c.output(c.element().toString());
-      return resume().withResumeDelay(Duration.standardSeconds(5)).withWatermark(c.timestamp());
-    }
-
-    @GetInitialRestriction
-    public SomeRestriction getInitialRestriction(Integer elem) {
-      return new SomeRestriction();
-    }
-  }
-
-  @Test
-  public void testResumeSetsTimer() throws Exception {
-    DoFn<Integer, String> fn = new SelfInitiatedResumeFn();
-    Instant base = Instant.now();
-    ProcessFnTester<Integer, String, SomeRestriction, SomeRestrictionTracker> tester =
-        new ProcessFnTester<>(
-            base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeRestriction.class),
-            MAX_OUTPUTS_PER_BUNDLE, MAX_BUNDLE_DURATION);
-
-    tester.startElement(42, new SomeRestriction());
-    assertThat(tester.takeOutputElements(), contains("42"));
-
-    // Should resume after 5 seconds: advancing by 3 seconds should have no effect.
-    assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
-    assertTrue(tester.takeOutputElements().isEmpty());
-
-    // 6 seconds should be enough - should invoke the fn again.
-    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
-    assertThat(tester.takeOutputElements(), contains("42"));
-
-    // Should again resume after 5 seconds: advancing by 3 seconds should again have no effect.
-    assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
-    assertTrue(tester.takeOutputElements().isEmpty());
-
-    // 6 seconds should again be enough.
-    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
-    assertThat(tester.takeOutputElements(), contains("42"));
-  }
-
-  private static class SomeCheckpoint
-      implements Serializable, HasDefaultTracker<SomeCheckpoint, SomeCheckpointTracker> {
-    private int firstUnprocessedIndex;
-
-    private SomeCheckpoint(int firstUnprocessedIndex) {
-      this.firstUnprocessedIndex = firstUnprocessedIndex;
-    }
-
-    @Override
-    public SomeCheckpointTracker newTracker() {
-      return new SomeCheckpointTracker(this);
-    }
-  }
-
-  private static class SomeCheckpointTracker implements RestrictionTracker<SomeCheckpoint> {
-    private SomeCheckpoint current;
-    private boolean isActive = true;
-
-    private SomeCheckpointTracker(SomeCheckpoint current) {
-      this.current = current;
-    }
-
-    @Override
-    public SomeCheckpoint currentRestriction() {
-      return current;
-    }
-
-    public boolean tryUpdateCheckpoint(int firstUnprocessedIndex) {
-      if (!isActive) {
-        return false;
-      }
-      current = new SomeCheckpoint(firstUnprocessedIndex);
-      return true;
-    }
-
-    @Override
-    public SomeCheckpoint checkpoint() {
-      isActive = false;
-      return current;
-    }
-  }
-
   /**
-   * A splittable {@link DoFn} that generates the sequence [init, init + total) in batches of given
-   * size.
+   * A splittable {@link DoFn} that generates the sequence [init, init + total).
    */
   private static class CounterFn extends DoFn<Integer, String> {
-    private final int numTotalOutputs;
-    private final int numOutputsPerCall;
-
-    private CounterFn(int numTotalOutputs, int numOutputsPerCall) {
-      this.numTotalOutputs = numTotalOutputs;
-      this.numOutputsPerCall = numOutputsPerCall;
-    }
-
     @ProcessElement
-    public ProcessContinuation process(ProcessContext c, SomeCheckpointTracker tracker) {
-      int start = tracker.currentRestriction().firstUnprocessedIndex;
-      for (int i = 0; i < numOutputsPerCall; ++i) {
-        int index = start + i;
-        if (!tracker.tryUpdateCheckpoint(index + 1)) {
-          return resume();
-        }
-        if (index >= numTotalOutputs) {
-          return stop();
-        }
-        c.output(String.valueOf(c.element() + index));
+    public void process(ProcessContext c, OffsetRangeTracker tracker) {
+      for (long i = tracker.currentRestriction().getFrom();
+          tracker.tryClaim(i); ++i) {
+        c.output(String.valueOf(c.element() + i));
       }
-      return resume();
     }
 
     @GetInitialRestriction
-    public SomeCheckpoint getInitialRestriction(Integer elem) {
+    public OffsetRange getInitialRestriction(Integer elem) {
       throw new UnsupportedOperationException("Expected to be supplied explicitly in this test");
     }
   }
 
   @Test
-  public void testResumeCarriesOverState() throws Exception {
-    DoFn<Integer, String> fn = new CounterFn(3, 1);
-    Instant base = Instant.now();
-    ProcessFnTester<Integer, String, SomeCheckpoint, SomeCheckpointTracker> tester =
-        new ProcessFnTester<>(
-            base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeCheckpoint.class),
-            MAX_OUTPUTS_PER_BUNDLE, MAX_BUNDLE_DURATION);
-
-    tester.startElement(42, new SomeCheckpoint(0));
-    assertThat(tester.takeOutputElements(), contains("42"));
-    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
-    assertThat(tester.takeOutputElements(), contains("43"));
-    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
-    assertThat(tester.takeOutputElements(), contains("44"));
-    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
-    // After outputting all 3 items, should not output anything more.
-    assertEquals(0, tester.takeOutputElements().size());
-    // Should also not ask to resume.
-    assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
-  }
-
-  @Test
   public void testCheckpointsAfterNumOutputs() throws Exception {
     int max = 100;
-    // Create an fn that attempts to 2x output more than checkpointing allows.
-    DoFn<Integer, String> fn = new CounterFn(2 * max + max / 2, 2 * max);
+    DoFn<Integer, String> fn = new CounterFn();
     Instant base = Instant.now();
     int baseIndex = 42;
 
-    ProcessFnTester<Integer, String, SomeCheckpoint, SomeCheckpointTracker> tester =
+    ProcessFnTester<Integer, String, OffsetRange, OffsetRangeTracker> tester =
         new ProcessFnTester<>(
-            base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeCheckpoint.class),
+            base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(OffsetRange.class),
             max, MAX_BUNDLE_DURATION);
 
     List<String> elements;
 
-    tester.startElement(baseIndex, new SomeCheckpoint(0));
+    // Create an fn that attempts to 2x output more than checkpointing allows.
+    tester.startElement(baseIndex, new OffsetRange(0, 2 * max + max / 2));
     elements = tester.takeOutputElements();
     assertEquals(max, elements.size());
     // Should output the range [0, max)
@@ -609,18 +488,18 @@ public class SplittableParDoTest {
     // But bound bundle duration - the bundle should terminate.
     Duration maxBundleDuration = Duration.standardSeconds(1);
     // Create an fn that attempts to 2x output more than checkpointing allows.
-    DoFn<Integer, String> fn = new CounterFn(max, max);
+    DoFn<Integer, String> fn = new CounterFn();
     Instant base = Instant.now();
     int baseIndex = 42;
 
-    ProcessFnTester<Integer, String, SomeCheckpoint, SomeCheckpointTracker> tester =
+    ProcessFnTester<Integer, String, OffsetRange, OffsetRangeTracker> tester =
         new ProcessFnTester<>(
-            base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeCheckpoint.class),
+            base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(OffsetRange.class),
             max, maxBundleDuration);
 
     List<String> elements;
 
-    tester.startElement(baseIndex, new SomeCheckpoint(0));
+    tester.startElement(baseIndex, new OffsetRange(0, Long.MAX_VALUE));
     // Bundle should terminate, and should do at least some processing.
     elements = tester.takeOutputElements();
     assertFalse(elements.isEmpty());

http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 5139290..e35457c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -21,7 +21,6 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
-import com.google.auto.value.AutoValue;
 import java.io.Serializable;
 import java.lang.annotation.Documented;
 import java.lang.annotation.ElementType;
@@ -32,7 +31,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
@@ -293,6 +291,18 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
      * See {@link Window} for more information.
      */
     public abstract PaneInfo pane();
+
+    /**
+     * Gives the runner a (best-effort) lower bound about the timestamps of future output associated
+     * with the current element.
+     *
+     * <p>If the {@link DoFn} has multiple outputs, the watermark applies to all of them.
+     *
+     * <p>Only splittable {@link DoFn DoFns} are allowed to call this method. It is safe to call
+     * this method from a different thread than the one running {@link ProcessElement}, but
+     * all calls must finish before {@link ProcessElement} returns.
+     */
+    public abstract void updateWatermark(Instant watermark);
   }
 
   /**
@@ -556,15 +566,11 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
    *     returned by {@link GetInitialRestriction} implements {@link HasDefaultTracker}.
    * <li>It <i>may</i> define a {@link GetRestrictionCoder} method.
    * <li>The type of restrictions used by all of these methods must be the same.
-   * <li>Its {@link ProcessElement} method <i>may</i> return a {@link ProcessContinuation} to
-   *     indicate whether there is more work to be done for the current element.
    * <li>Its {@link ProcessElement} method <i>must not</i> use any extra context parameters, such as
    *     {@link BoundedWindow}.
    * <li>The {@link DoFn} itself <i>may</i> be annotated with {@link BoundedPerElement} or
    *     {@link UnboundedPerElement}, but not both at the same time. If it's not annotated with
-   *     either of these, it's assumed to be {@link BoundedPerElement} if its {@link
-   *     ProcessElement} method returns {@code void} and {@link UnboundedPerElement} if it
-   *     returns a {@link ProcessContinuation}.
+   *     either of these, it's assumed to be {@link BoundedPerElement}.
    * </ul>
    *
    * <p>A non-splittable {@link DoFn} <i>must not</i> define any of these methods.
@@ -692,61 +698,9 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
   @Experimental(Kind.SPLITTABLE_DO_FN)
   public @interface UnboundedPerElement {}
 
-  // This can't be put into ProcessContinuation itself due to the following problem:
-  // http://ternarysearch.blogspot.com/2013/07/static-initialization-deadlock.html
-  private static final ProcessContinuation PROCESS_CONTINUATION_STOP =
-      new AutoValue_DoFn_ProcessContinuation(false, Duration.ZERO, null);
-
-  /**
-   * When used as a return value of {@link ProcessElement}, indicates whether there is more work to
-   * be done for the current element.
-   */
-  @Experimental(Kind.SPLITTABLE_DO_FN)
-  @AutoValue
-  public abstract static class ProcessContinuation {
-    /** Indicates that there is no more work to be done for the current element. */
-    public static ProcessContinuation stop() {
-      return PROCESS_CONTINUATION_STOP;
-    }
-
-    /** Indicates that there is more work to be done for the current element. */
-    public static ProcessContinuation resume() {
-      return new AutoValue_DoFn_ProcessContinuation(true, Duration.ZERO, null);
-    }
-
-    /**
-     * If false, the {@link DoFn} promises that there is no more work remaining for the current
-     * element, so the runner should not resume the {@link ProcessElement} call.
-     */
-    public abstract boolean shouldResume();
-
-    /**
-     * A minimum duration that should elapse between the end of this {@link ProcessElement} call and
-     * the {@link ProcessElement} call continuing processing of the same element. By default, zero.
-     */
-    public abstract Duration resumeDelay();
-
-    /**
-     * A lower bound provided by the {@link DoFn} on timestamps of the output that will be emitted
-     * by future {@link ProcessElement} calls continuing processing of the current element.
-     *
-     * <p>A runner should treat an absent value as equivalent to the timestamp of the input element.
-     */
-    @Nullable
-    public abstract Instant getWatermark();
-
-    /** Builder method to set the value of {@link #resumeDelay()}. */
-    public ProcessContinuation withResumeDelay(Duration resumeDelay) {
-      return new AutoValue_DoFn_ProcessContinuation(
-          shouldResume(), resumeDelay, getWatermark());
-    }
-
-    /** Builder method to set the value of {@link #getWatermark()}. */
-    public ProcessContinuation withWatermark(Instant watermark) {
-      return new AutoValue_DoFn_ProcessContinuation(
-          shouldResume(), resumeDelay(), watermark);
-    }
-  }
+  /** Do not use. See https://issues.apache.org/jira/browse/BEAM-1904 */
+  @Deprecated
+  public class ProcessContinuation {}
 
   /**
    * Returns an {@link Aggregator} with aggregation logic specified by the {@link CombineFn}

http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 01f0291..88f4035 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -672,6 +672,11 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     }
 
     @Override
+    public void updateWatermark(Instant watermark) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
     public PipelineOptions getPipelineOptions() {
       return context.getPipelineOptions();
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index 6746d3a..4b0cbf7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -50,6 +50,7 @@ import net.bytebuddy.implementation.bytecode.Throw;
 import net.bytebuddy.implementation.bytecode.assign.Assigner;
 import net.bytebuddy.implementation.bytecode.assign.Assigner.Typing;
 import net.bytebuddy.implementation.bytecode.assign.TypeCasting;
+import net.bytebuddy.implementation.bytecode.constant.NullConstant;
 import net.bytebuddy.implementation.bytecode.constant.TextConstant;
 import net.bytebuddy.implementation.bytecode.member.FieldAccess;
 import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
@@ -625,17 +626,6 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
    * {@link ProcessElement} method.
    */
   private static final class ProcessElementDelegation extends DoFnMethodDelegation {
-    private static final MethodDescription PROCESS_CONTINUATION_STOP_METHOD;
-
-    static {
-      try {
-        PROCESS_CONTINUATION_STOP_METHOD =
-            new MethodDescription.ForLoadedMethod(DoFn.ProcessContinuation.class.getMethod("stop"));
-      } catch (NoSuchMethodException e) {
-        throw new RuntimeException("Failed to locate ProcessContinuation.stop()");
-      }
-    }
-
     private final DoFnSignature.ProcessElementMethod signature;
 
     /** Implementation of {@link MethodDelegation} for the {@link ProcessElement} method. */
@@ -672,12 +662,8 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
 
     @Override
     protected StackManipulation afterDelegation(MethodDescription instrumentedMethod) {
-      if (TypeDescription.VOID.equals(targetMethod.getReturnType().asErasure())) {
-        return new StackManipulation.Compound(
-            MethodInvocation.invoke(PROCESS_CONTINUATION_STOP_METHOD), MethodReturn.REFERENCE);
-      } else {
-        return MethodReturn.of(targetMethod.getReturnType().asErasure());
-      }
+      return new StackManipulation.Compound(
+          NullConstant.INSTANCE, MethodReturn.REFERENCE);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index 85831a7..cc06e70 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -53,8 +53,8 @@ public interface DoFnInvoker<InputT, OutputT> {
    * Invoke the {@link DoFn.ProcessElement} method on the bound {@link DoFn}.
    *
    * @param extra Factory for producing extra parameter objects (such as window), if necessary.
-   * @return The {@link DoFn.ProcessContinuation} returned by the underlying method, or {@link
-   *     DoFn.ProcessContinuation#stop()} if it returns {@code void}.
+   * @return {@code null} - see <a href="https://issues.apache.org/jira/browse/BEAM-1904">JIRA</a>
+   *     tracking the complete removal of {@link DoFn.ProcessContinuation}.
    */
   DoFn.ProcessContinuation invokeProcessElement(ArgumentProvider<InputT, OutputT> extra);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index 007d8be..1be741f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -28,7 +28,6 @@ import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.DoFn.StateId;
 import org.apache.beam.sdk.transforms.DoFn.TimerId;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter;
@@ -397,21 +396,16 @@ public abstract class DoFnSignature {
     @Nullable
     public abstract TypeDescriptor<? extends BoundedWindow> windowT();
 
-    /** Whether this {@link DoFn} returns a {@link ProcessContinuation} or void. */
-    public abstract boolean hasReturnValue();
-
     static ProcessElementMethod create(
         Method targetMethod,
         List<Parameter> extraParameters,
         TypeDescriptor<?> trackerT,
-        @Nullable TypeDescriptor<? extends BoundedWindow> windowT,
-        boolean hasReturnValue) {
+        @Nullable TypeDescriptor<? extends BoundedWindow> windowT) {
       return new AutoValue_DoFnSignature_ProcessElementMethod(
           targetMethod,
           Collections.unmodifiableList(extraParameters),
           trackerT,
-          windowT,
-          hasReturnValue);
+          windowT);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 006d012..80dbe10 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.sdk.transforms.reflect;
 
-import static com.google.common.base.Preconditions.checkState;
-
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicates;
@@ -428,8 +426,6 @@ public class DoFnSignatures {
    * <li>If the {@link DoFn} (or any of its supertypes) is annotated as {@link
    *     DoFn.BoundedPerElement} or {@link DoFn.UnboundedPerElement}, use that. Only one of
    *     these must be specified.
-   * <li>If {@link DoFn.ProcessElement} returns {@link DoFn.ProcessContinuation}, assume it is
-   *     unbounded. Otherwise (if it returns {@code void}), assume it is bounded.
    * <li>If {@link DoFn.ProcessElement} returns {@code void}, but the {@link DoFn} is annotated
    *     {@link DoFn.UnboundedPerElement}, this is an error.
    * </ol>
@@ -455,10 +451,7 @@ public class DoFnSignatures {
     }
     if (processElement.isSplittable()) {
       if (isBounded == null) {
-        isBounded =
-            processElement.hasReturnValue()
-                ? PCollection.IsBounded.UNBOUNDED
-                : PCollection.IsBounded.BOUNDED;
+        isBounded = PCollection.IsBounded.BOUNDED;
       }
     } else {
       errors.checkArgument(
@@ -467,7 +460,6 @@ public class DoFnSignatures {
               + ((isBounded == PCollection.IsBounded.BOUNDED)
                   ? DoFn.BoundedPerElement.class.getSimpleName()
                   : DoFn.UnboundedPerElement.class.getSimpleName()));
-      checkState(!processElement.hasReturnValue(), "Should have been inferred splittable");
       isBounded = PCollection.IsBounded.BOUNDED;
     }
     return isBounded;
@@ -691,10 +683,8 @@ public class DoFnSignatures {
       TypeDescriptor<?> outputT,
       FnAnalysisContext fnContext) {
     errors.checkArgument(
-        void.class.equals(m.getReturnType())
-            || DoFn.ProcessContinuation.class.equals(m.getReturnType()),
-        "Must return void or %s",
-        DoFn.ProcessContinuation.class.getSimpleName());
+        void.class.equals(m.getReturnType()),
+        "Must return void");
 
 
     MethodAnalysisContext methodContext = MethodAnalysisContext.create();
@@ -734,11 +724,7 @@ public class DoFnSignatures {
     }
 
     return DoFnSignature.ProcessElementMethod.create(
-        m,
-        methodContext.getExtraParameters(),
-        trackerT,
-        windowT,
-        DoFn.ProcessContinuation.class.equals(m.getReturnType()));
+        m, methodContext.getExtraParameters(), trackerT, windowT);
   }
 
   private static void checkParameterOneOf(

http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
index 87c7bfd..0271a0d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
@@ -19,6 +19,9 @@ package org.apache.beam.sdk.transforms.splittabledofn;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.transforms.DoFn;
 
 /**
  * A {@link RestrictionTracker} for claiming offsets in an {@link OffsetRange} in a monotonically
@@ -27,6 +30,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 public class OffsetRangeTracker implements RestrictionTracker<OffsetRange> {
   private OffsetRange range;
   private Long lastClaimedOffset = null;
+  private Long lastAttemptedOffset = null;
 
   public OffsetRangeTracker(OffsetRange range) {
     this.range = checkNotNull(range);
@@ -59,12 +63,13 @@ public class OffsetRangeTracker implements RestrictionTracker<OffsetRange> {
    */
   public synchronized boolean tryClaim(long i) {
     checkArgument(
-        lastClaimedOffset == null || i > lastClaimedOffset,
-        "Trying to claim offset %s while last claimed was %s",
+        lastAttemptedOffset == null || i > lastAttemptedOffset,
+        "Trying to claim offset %s while last attempted was %s",
         i,
-        lastClaimedOffset);
+        lastAttemptedOffset);
     checkArgument(
         i >= range.getFrom(), "Trying to claim offset %s before start of the range %s", i, range);
+    lastAttemptedOffset = i;
     // No respective checkArgument for i < range.to() - it's ok to try claiming offsets beyond it.
     if (i >= range.getTo()) {
       return false;
@@ -72,4 +77,26 @@ public class OffsetRangeTracker implements RestrictionTracker<OffsetRange> {
     lastClaimedOffset = i;
     return true;
   }
+
+  /**
+   * Marks that there are no more offsets to be claimed in the range.
+   *
+   * <p>E.g., a {@link DoFn} reading a file and claiming the offset of each record in the file might
+   * call this if it hits EOF - even though the last attempted claim was before the end of the
+   * range, there are no more offsets to claim.
+   */
+  public synchronized void markDone() {
+    lastAttemptedOffset = Long.MAX_VALUE;
+  }
+
+  @Override
+  public synchronized void checkDone() throws IllegalStateException {
+    checkState(
+        lastAttemptedOffset >= range.getTo() - 1,
+        "Last attempted offset was %s in range %s, claiming work in [%s, %s) was not attempted",
+        lastAttemptedOffset,
+        range,
+        lastAttemptedOffset + 1,
+        range.getTo());
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
index e9b718e..27ef68f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
@@ -38,5 +38,13 @@ public interface RestrictionTracker<RestrictionT> {
    */
   RestrictionT checkpoint();
 
+  /**
+   * Called by the runner after {@link DoFn.ProcessElement} returns.
+   *
+   * <p>Must throw an exception with an informative error message, if there is still any unclaimed
+   * work remaining in the restriction.
+   */
+  void checkDone() throws IllegalStateException;
+
   // TODO: Add the more general splitRemainderAfterFraction() and other methods.
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index 154a088..a122f67 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -18,8 +18,6 @@
 package org.apache.beam.sdk.transforms;
 
 import static com.google.common.base.Preconditions.checkState;
-import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume;
-import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -67,14 +65,10 @@ public class SplittableDoFnTest {
 
   static class PairStringWithIndexToLength extends DoFn<String, KV<String, Integer>> {
     @ProcessElement
-    public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) {
+    public void process(ProcessContext c, OffsetRangeTracker tracker) {
       for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
         c.output(KV.of(c.element(), (int) i));
-        if (i % 3 == 0) {
-          return resume();
-        }
       }
-      return stop();
     }
 
     @GetInitialRestriction
@@ -196,19 +190,14 @@ public class SplittableDoFnTest {
     }
 
     @ProcessElement
-    public ProcessContinuation processElement(ProcessContext c, OffsetRangeTracker tracker) {
+    public void processElement(ProcessContext c, OffsetRangeTracker tracker) {
       int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX};
       int trueStart = snapToNextBlock((int) tracker.currentRestriction().getFrom(), blockStarts);
-      int trueEnd = snapToNextBlock((int) tracker.currentRestriction().getTo(), blockStarts);
-      for (int i = trueStart; i < trueEnd; ++i) {
-        if (!tracker.tryClaim(blockStarts[i])) {
-          return resume();
-        }
+      for (int i = trueStart; tracker.tryClaim(blockStarts[i]); ++i) {
         for (int index = blockStarts[i]; index < blockStarts[i + 1]; ++index) {
           c.output(index);
         }
       }
-      return stop();
     }
 
     @GetInitialRestriction

http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 425f453..8b4df4c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -39,7 +39,6 @@ import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.FakeArgumentProvider;
 import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper;
 import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
@@ -83,8 +82,8 @@ public class DoFnInvokersTest {
     when(mockArgumentProvider.processContext(Matchers.<DoFn>any())).thenReturn(mockProcessContext);
   }
 
-  private ProcessContinuation invokeProcessElement(DoFn<String, String> fn) {
-    return DoFnInvokers.invokerFor(fn).invokeProcessElement(mockArgumentProvider);
+  private void invokeProcessElement(DoFn<String, String> fn) {
+    DoFnInvokers.invokerFor(fn).invokeProcessElement(mockArgumentProvider);
   }
 
   private void invokeOnTimer(String timerId, DoFn<String, String> fn) {
@@ -113,7 +112,7 @@ public class DoFnInvokersTest {
       public void processElement(ProcessContext c) throws Exception {}
     }
     MockFn mockFn = mock(MockFn.class);
-    assertEquals(ProcessContinuation.stop(), invokeProcessElement(mockFn));
+    invokeProcessElement(mockFn);
     verify(mockFn).processElement(mockProcessContext);
   }
 
@@ -134,7 +133,7 @@ public class DoFnInvokersTest {
   public void testDoFnWithProcessElementInterface() throws Exception {
     IdentityUsingInterfaceWithProcessElement fn =
         mock(IdentityUsingInterfaceWithProcessElement.class);
-    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
+    invokeProcessElement(fn);
     verify(fn).processElement(mockProcessContext);
   }
 
@@ -155,14 +154,14 @@ public class DoFnInvokersTest {
   @Test
   public void testDoFnWithMethodInSuperclass() throws Exception {
     IdentityChildWithoutOverride fn = mock(IdentityChildWithoutOverride.class);
-    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
+    invokeProcessElement(fn);
     verify(fn).process(mockProcessContext);
   }
 
   @Test
   public void testDoFnWithMethodInSubclass() throws Exception {
     IdentityChildWithOverride fn = mock(IdentityChildWithOverride.class);
-    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
+    invokeProcessElement(fn);
     verify(fn).process(mockProcessContext);
   }
 
@@ -173,7 +172,7 @@ public class DoFnInvokersTest {
       public void processElement(ProcessContext c, IntervalWindow w) throws Exception {}
     }
     MockFn fn = mock(MockFn.class);
-    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
+    invokeProcessElement(fn);
     verify(fn).processElement(mockProcessContext, mockWindow);
   }
 
@@ -197,7 +196,7 @@ public class DoFnInvokersTest {
           throws Exception {}
     }
     MockFn fn = mock(MockFn.class);
-    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
+    invokeProcessElement(fn);
     verify(fn).processElement(mockProcessContext, mockState);
   }
 
@@ -223,35 +222,11 @@ public class DoFnInvokersTest {
       public void onTimer() {}
     }
     MockFn fn = mock(MockFn.class);
-    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
+    invokeProcessElement(fn);
     verify(fn).processElement(mockProcessContext, mockTimer);
   }
 
   @Test
-  public void testDoFnWithReturn() throws Exception {
-    class MockFn extends DoFn<String, String> {
-      @DoFn.ProcessElement
-      public ProcessContinuation processElement(ProcessContext c, SomeRestrictionTracker tracker)
-          throws Exception {
-        return null;
-      }
-
-      @GetInitialRestriction
-      public SomeRestriction getInitialRestriction(String element) {
-        return null;
-      }
-
-      @NewTracker
-      public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
-        return null;
-      }
-    }
-    MockFn fn = mock(MockFn.class);
-    when(fn.processElement(mockProcessContext, null)).thenReturn(ProcessContinuation.resume());
-    assertEquals(ProcessContinuation.resume(), invokeProcessElement(fn));
-  }
-
-  @Test
   public void testDoFnWithStartBundleSetupTeardown() throws Exception {
     class MockFn extends DoFn<String, String> {
       @ProcessElement
@@ -306,9 +281,7 @@ public class DoFnInvokersTest {
   /** Public so Mockito can do "delegatesTo()" in the test below. */
   public static class MockFn extends DoFn<String, String> {
     @ProcessElement
-    public ProcessContinuation processElement(ProcessContext c, SomeRestrictionTracker tracker) {
-      return null;
-    }
+    public void processElement(ProcessContext c, SomeRestrictionTracker tracker) {}
 
     @GetInitialRestriction
     public SomeRestriction getInitialRestriction(String element) {
@@ -360,7 +333,7 @@ public class DoFnInvokersTest {
         .splitRestriction(
             eq("blah"), same(restriction), Mockito.<DoFn.OutputReceiver<SomeRestriction>>any());
     when(fn.newTracker(restriction)).thenReturn(tracker);
-    when(fn.processElement(mockProcessContext, tracker)).thenReturn(ProcessContinuation.resume());
+    fn.processElement(mockProcessContext, tracker);
 
     assertEquals(coder, invoker.invokeGetRestrictionCoder(new CoderRegistry()));
     assertEquals(restriction, invoker.invokeGetInitialRestriction("blah"));
@@ -376,8 +349,6 @@ public class DoFnInvokersTest {
         });
     assertEquals(Arrays.asList(part1, part2, part3), outputs);
     assertEquals(tracker, invoker.invokeNewTracker(restriction));
-    assertEquals(
-        ProcessContinuation.resume(),
         invoker.invokeProcessElement(
             new FakeArgumentProvider<String, String>() {
               @Override
@@ -389,7 +360,7 @@ public class DoFnInvokersTest {
               public RestrictionTracker<?> restrictionTracker() {
                 return tracker;
               }
-            }));
+            });
   }
 
   private static class RestrictionWithDefaultTracker
@@ -410,6 +381,9 @@ public class DoFnInvokersTest {
     public RestrictionWithDefaultTracker checkpoint() {
       throw new UnsupportedOperationException();
     }
+
+    @Override
+    public void checkDone() throws IllegalStateException {}
   }
 
   private static class CoderForDefaultTracker extends CustomCoder<RestrictionWithDefaultTracker> {
@@ -459,8 +433,7 @@ public class DoFnInvokersTest {
             assertEquals("foo", output);
           }
         });
-    assertEquals(
-        ProcessContinuation.stop(), invoker.invokeProcessElement(mockArgumentProvider));
+    invoker.invokeProcessElement(mockArgumentProvider);
     assertThat(
         invoker.invokeNewTracker(new RestrictionWithDefaultTracker()),
         instanceOf(DefaultTracker.class));
@@ -550,14 +523,14 @@ public class DoFnInvokersTest {
   @Test
   public void testLocalPrivateDoFnClass() throws Exception {
     PrivateDoFnClass fn = mock(PrivateDoFnClass.class);
-    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
+    invokeProcessElement(fn);
     verify(fn).processThis(mockProcessContext);
   }
 
   @Test
   public void testStaticPackagePrivateDoFnClass() throws Exception {
     DoFn<String, String> fn = mock(DoFnInvokersTestHelper.newStaticPackagePrivateDoFn().getClass());
-    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
+    invokeProcessElement(fn);
     DoFnInvokersTestHelper.verifyStaticPackagePrivateDoFn(fn, mockProcessContext);
   }
 
@@ -565,28 +538,28 @@ public class DoFnInvokersTest {
   public void testInnerPackagePrivateDoFnClass() throws Exception {
     DoFn<String, String> fn =
         mock(new DoFnInvokersTestHelper().newInnerPackagePrivateDoFn().getClass());
-    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
+    invokeProcessElement(fn);
     DoFnInvokersTestHelper.verifyInnerPackagePrivateDoFn(fn, mockProcessContext);
   }
 
   @Test
   public void testStaticPrivateDoFnClass() throws Exception {
     DoFn<String, String> fn = mock(DoFnInvokersTestHelper.newStaticPrivateDoFn().getClass());
-    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
+    invokeProcessElement(fn);
     DoFnInvokersTestHelper.verifyStaticPrivateDoFn(fn, mockProcessContext);
   }
 
   @Test
   public void testInnerPrivateDoFnClass() throws Exception {
     DoFn<String, String> fn = mock(new DoFnInvokersTestHelper().newInnerPrivateDoFn().getClass());
-    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
+    invokeProcessElement(fn);
     DoFnInvokersTestHelper.verifyInnerPrivateDoFn(fn, mockProcessContext);
   }
 
   @Test
   public void testAnonymousInnerDoFn() throws Exception {
     DoFn<String, String> fn = mock(new DoFnInvokersTestHelper().newInnerAnonymousDoFn().getClass());
-    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
+    invokeProcessElement(fn);
     DoFnInvokersTestHelper.verifyInnerAnonymousDoFn(fn, mockProcessContext);
   }
 
@@ -594,7 +567,7 @@ public class DoFnInvokersTest {
   public void testStaticAnonymousDoFnInOtherPackage() throws Exception {
     // Can't use mockito for this one - the anonymous class is final and can't be mocked.
     DoFn<String, String> fn = DoFnInvokersTestHelper.newStaticAnonymousDoFn();
-    assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
+    invokeProcessElement(fn);
     DoFnInvokersTestHelper.verifyStaticAnonymousDoFnInvoked(fn, mockProcessContext);
   }
 
@@ -623,32 +596,6 @@ public class DoFnInvokersTest {
   }
 
   @Test
-  public void testProcessElementExceptionWithReturn() throws Exception {
-    thrown.expect(UserCodeException.class);
-    thrown.expectMessage("bogus");
-    DoFnInvokers
-        .invokerFor(
-            new DoFn<Integer, Integer>() {
-              @ProcessElement
-              public ProcessContinuation processElement(
-                  @SuppressWarnings("unused") ProcessContext c, SomeRestrictionTracker tracker) {
-                throw new IllegalArgumentException("bogus");
-              }
-
-              @GetInitialRestriction
-              public SomeRestriction getInitialRestriction(Integer element) {
-                return null;
-              }
-
-              @NewTracker
-              public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
-                return null;
-              }
-            })
-        .invokeProcessElement(new FakeArgumentProvider<Integer, Integer>());
-  }
-
-  @Test
   public void testStartBundleException() throws Exception {
     DoFnInvoker<Integer, Integer> invoker =
         DoFnInvokers.invokerFor(

http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
index 44ae5c4..d321f54 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
@@ -50,7 +50,7 @@ public class DoFnSignaturesProcessElementTest {
   @Test
   public void testBadReturnType() throws Exception {
     thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Must return void or ProcessContinuation");
+    thrown.expectMessage("Must return void");
 
     analyzeProcessElementMethod(
         new AnonymousMethod() {

http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
index 052feb8..b937e84 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
@@ -60,20 +60,6 @@ public class DoFnSignaturesSplittableDoFnTest {
   private abstract static class SomeRestrictionCoder implements Coder<SomeRestriction> {}
 
   @Test
-  public void testReturnsProcessContinuation() throws Exception {
-    DoFnSignature.ProcessElementMethod signature =
-        analyzeProcessElementMethod(
-            new AnonymousMethod() {
-              private DoFn.ProcessContinuation method(
-                  DoFn<Integer, String>.ProcessContext context) {
-                return null;
-              }
-            });
-
-    assertTrue(signature.hasReturnValue());
-  }
-
-  @Test
   public void testHasRestrictionTracker() throws Exception {
     DoFnSignature.ProcessElementMethod signature =
         analyzeProcessElementMethod(
@@ -157,54 +143,6 @@ public class DoFnSignaturesSplittableDoFnTest {
             .isBoundedPerElement());
   }
 
-  private static class BaseFnWithContinuation extends DoFn<Integer, String> {
-    @ProcessElement
-    public ProcessContinuation processElement(
-        ProcessContext context, SomeRestrictionTracker tracker) {
-      return null;
-    }
-
-    @GetInitialRestriction
-    public SomeRestriction getInitialRestriction(Integer element) {
-      return null;
-    }
-
-    @NewTracker
-    public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
-      return null;
-    }
-  }
-
-  @Test
-  public void testSplittableIsBoundedByDefault() throws Exception {
-    assertEquals(
-        PCollection.IsBounded.UNBOUNDED,
-        DoFnSignatures
-            .getSignature(BaseFnWithContinuation.class)
-            .isBoundedPerElement());
-  }
-
-  @Test
-  public void testSplittableRespectsBoundednessAnnotation() throws Exception {
-    @BoundedPerElement
-    class BoundedFnWithContinuation extends BaseFnWithContinuation {}
-
-    assertEquals(
-        PCollection.IsBounded.BOUNDED,
-        DoFnSignatures
-            .getSignature(BoundedFnWithContinuation.class)
-            .isBoundedPerElement());
-
-    @UnboundedPerElement
-    class UnboundedFnWithContinuation extends BaseFnWithContinuation {}
-
-    assertEquals(
-        PCollection.IsBounded.UNBOUNDED,
-        DoFnSignatures
-            .getSignature(UnboundedFnWithContinuation.class)
-            .isBoundedPerElement());
-  }
-
   @Test
   public void testUnsplittableButDeclaresBounded() throws Exception {
     @BoundedPerElement
@@ -234,10 +172,8 @@ public class DoFnSignaturesSplittableDoFnTest {
   public void testSplittableWithAllFunctions() throws Exception {
     class GoodSplittableDoFn extends DoFn<Integer, String> {
       @ProcessElement
-      public ProcessContinuation processElement(
-          ProcessContext context, SomeRestrictionTracker tracker) {
-        return null;
-      }
+      public void processElement(
+          ProcessContext context, SomeRestrictionTracker tracker) {}
 
       @GetInitialRestriction
       public SomeRestriction getInitialRestriction(Integer element) {
@@ -262,7 +198,6 @@ public class DoFnSignaturesSplittableDoFnTest {
     DoFnSignature signature = DoFnSignatures.getSignature(GoodSplittableDoFn.class);
     assertEquals(SomeRestrictionTracker.class, signature.processElement().trackerT().getRawType());
     assertTrue(signature.processElement().isSplittable());
-    assertTrue(signature.processElement().hasReturnValue());
     assertEquals(
         SomeRestriction.class, signature.getInitialRestriction().restrictionT().getRawType());
     assertEquals(SomeRestriction.class, signature.splitRestriction().restrictionT().getRawType());
@@ -279,9 +214,7 @@ public class DoFnSignaturesSplittableDoFnTest {
   public void testSplittableWithAllFunctionsGeneric() throws Exception {
     class GoodGenericSplittableDoFn<RestrictionT, TrackerT, CoderT> extends DoFn<Integer, String> {
       @ProcessElement
-      public ProcessContinuation processElement(ProcessContext context, TrackerT tracker) {
-        return null;
-      }
+      public void processElement(ProcessContext context, TrackerT tracker) {}
 
       @GetInitialRestriction
       public RestrictionT getInitialRestriction(Integer element) {
@@ -309,7 +242,6 @@ public class DoFnSignaturesSplittableDoFnTest {
                 SomeRestriction, SomeRestrictionTracker, SomeRestrictionCoder>() {}.getClass());
     assertEquals(SomeRestrictionTracker.class, signature.processElement().trackerT().getRawType());
     assertTrue(signature.processElement().isSplittable());
-    assertTrue(signature.processElement().hasReturnValue());
     assertEquals(
         SomeRestriction.class, signature.getInitialRestriction().restrictionT().getRawType());
     assertEquals(SomeRestriction.class, signature.splitRestriction().restrictionT().getRawType());

http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
index c8a530c..831894c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
@@ -95,7 +95,7 @@ public class OffsetRangeTrackerTest {
 
   @Test
   public void testNonMonotonicClaim() throws Exception {
-    expected.expectMessage("Trying to claim offset 103 while last claimed was 110");
+    expected.expectMessage("Trying to claim offset 103 while last attempted was 110");
     OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200));
     assertTrue(tracker.tryClaim(105));
     assertTrue(tracker.tryClaim(110));
@@ -108,4 +108,51 @@ public class OffsetRangeTrackerTest {
     OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200));
     tracker.tryClaim(90);
   }
+
+  @Test
+  public void testCheckDoneAfterTryClaimPastEndOfRange() {
+    OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200));
+    assertTrue(tracker.tryClaim(150));
+    assertTrue(tracker.tryClaim(175));
+    assertFalse(tracker.tryClaim(220));
+    tracker.checkDone();
+  }
+
+  @Test
+  public void testCheckDoneAfterTryClaimAtEndOfRange() {
+    OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200));
+    assertTrue(tracker.tryClaim(150));
+    assertTrue(tracker.tryClaim(175));
+    assertFalse(tracker.tryClaim(200));
+    tracker.checkDone();
+  }
+
+  @Test
+  public void testCheckDoneAfterTryClaimRightBeforeEndOfRange() {
+    OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200));
+    assertTrue(tracker.tryClaim(150));
+    assertTrue(tracker.tryClaim(175));
+    assertTrue(tracker.tryClaim(199));
+    tracker.checkDone();
+  }
+
+  @Test
+  public void testCheckDoneWhenNotDone() {
+    OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200));
+    assertTrue(tracker.tryClaim(150));
+    assertTrue(tracker.tryClaim(175));
+    expected.expectMessage(
+        "Last attempted offset was 175 in range [100, 200), "
+            + "claiming work in [176, 200) was not attempted");
+    tracker.checkDone();
+  }
+
+  @Test
+  public void testCheckDoneWhenExplicitlyMarkedDone() {
+    OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200));
+    assertTrue(tracker.tryClaim(150));
+    assertTrue(tracker.tryClaim(175));
+    tracker.markDone();
+    tracker.checkDone();
+  }
 }