You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/07/12 23:29:29 UTC

[1/2] beam git commit: [BEAM-2447] Reintroduces DoFn.ProcessContinuation

Repository: beam
Updated Branches:
  refs/heads/master 91c7d3d1f -> 66b4a1be0


[BEAM-2447] Reintroduces DoFn.ProcessContinuation


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1bff4a78
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1bff4a78
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1bff4a78

Branch: refs/heads/master
Commit: 1bff4a786536ff1a4ffe9904079c7a89058e6b4e
Parents: 91c7d3d
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Jun 13 16:50:35 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Wed Jul 12 16:10:07 2017 -0700

----------------------------------------------------------------------
 .../core/construction/SplittableParDoTest.java  |  10 +-
 ...eBoundedSplittableProcessElementInvoker.java |  35 ++++++-
 .../core/SplittableParDoViaKeyedWorkItems.java  |   9 +-
 .../core/SplittableProcessElementInvoker.java   |  25 ++++-
 ...ndedSplittableProcessElementInvokerTest.java |  45 +++++++--
 .../core/SplittableParDoProcessFnTest.java      |  99 ++++++++++++++++--
 .../org/apache/beam/sdk/transforms/DoFn.java    |  51 +++++++++-
 .../reflect/ByteBuddyDoFnInvokerFactory.java    |  19 +++-
 .../sdk/transforms/reflect/DoFnInvoker.java     |   4 +-
 .../sdk/transforms/reflect/DoFnSignature.java   |  10 +-
 .../sdk/transforms/reflect/DoFnSignatures.java  |  22 +++-
 .../splittabledofn/OffsetRangeTracker.java      |  10 ++
 .../splittabledofn/RestrictionTracker.java      |  11 +-
 .../beam/sdk/transforms/SplittableDoFnTest.java | 100 ++++++++-----------
 .../transforms/reflect/DoFnInvokersTest.java    |  93 +++++++++++++----
 .../DoFnSignaturesProcessElementTest.java       |   2 +-
 .../DoFnSignaturesSplittableDoFnTest.java       |  83 +++++++++++++--
 17 files changed, 487 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
index f4c596e..267232c 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.core.construction;
 
+import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
 import static org.junit.Assert.assertEquals;
 
 import java.io.Serializable;
@@ -24,8 +25,6 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
-import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
@@ -70,7 +69,6 @@ public class SplittableParDoTest {
     public void checkDone() {}
   }
 
-  @BoundedPerElement
   private static class BoundedFakeFn extends DoFn<Integer, String> {
     @ProcessElement
     public void processElement(ProcessContext context, SomeRestrictionTracker tracker) {}
@@ -81,10 +79,12 @@ public class SplittableParDoTest {
     }
   }
 
-  @UnboundedPerElement
   private static class UnboundedFakeFn extends DoFn<Integer, String> {
     @ProcessElement
-    public void processElement(ProcessContext context, SomeRestrictionTracker tracker) {}
+    public ProcessContinuation processElement(
+        ProcessContext context, SomeRestrictionTracker tracker) {
+      return stop();
+    }
 
     @GetInitialRestriction
     public SomeRestriction getInitialRestriction(Integer element) {

http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
index 475abf2..0c956d5 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
@@ -96,7 +96,7 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
       final WindowedValue<InputT> element,
       final TrackerT tracker) {
     final ProcessContext processContext = new ProcessContext(element, tracker);
-    invoker.invokeProcessElement(
+    DoFn.ProcessContinuation cont = invoker.invokeProcessElement(
         new DoFnInvoker.ArgumentProvider<InputT, OutputT>() {
           @Override
           public DoFn<InputT, OutputT>.ProcessContext processContext(
@@ -155,10 +155,37 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
                 "Access to timers not supported in Splittable DoFn");
           }
         });
-
+    // TODO: verify that if there was a failed tryClaim() call, then cont.shouldResume() is false.
+    // Currently we can't verify this because there are no hooks into tryClaim().
+    // See https://issues.apache.org/jira/browse/BEAM-2607
+    RestrictionT residual = processContext.extractCheckpoint();
+    if (cont.shouldResume()) {
+      if (residual == null) {
+        // No checkpoint had been taken by the runner while the ProcessElement call ran, however
+        // the call says that not the whole restriction has been processed. So we need to take
+        // a checkpoint now: checkpoint() guarantees that the primary restriction describes exactly
+        // the work that was done in the current ProcessElement call, and returns a residual
+        // restriction that describes exactly the work that wasn't done in the current call.
+        residual = tracker.checkpoint();
+      } else {
+        // A checkpoint was taken by the runner, and then the ProcessElement call returned resume()
+        // without making more tryClaim() calls (since no tryClaim() calls can succeed after
+        // checkpoint(), and since if it had made a failed tryClaim() call, it should have returned
+        // stop()).
+        // This means that the resulting primary restriction and the taken checkpoint already
+        // accurately describe respectively the work that was and wasn't done in the current
+        // ProcessElement call.
+        // In other words, if we took a checkpoint *after* ProcessElement completed (like in the
+        // branch above), it would have been equivalent to this one.
+      }
+    } else {
+      // The ProcessElement call returned stop() - that means the tracker's current restriction
+      // has been fully processed by the call. A checkpoint may or may not have been taken in
+      // "residual"; if it was, then we'll need to process it; if not, then we don't - nothing
+      // special needs to be done.
+    }
     tracker.checkDone();
-    return new Result(
-        processContext.extractCheckpoint(), processContext.getLastReportedWatermark());
+    return new Result(residual, cont, processContext.getLastReportedWatermark());
   }
 
   private class ProcessContext extends DoFn<InputT, OutputT>.ProcessContext {

http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index 09f3b15..6e97645 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -200,8 +200,8 @@ public class SplittableParDoViaKeyedWorkItems {
     /**
      * The state cell containing a watermark hold for the output of this {@link DoFn}. The hold is
      * acquired during the first {@link DoFn.ProcessElement} call for each element and restriction,
-     * and is released when the {@link DoFn.ProcessElement} call returns and there is no residual
-     * restriction captured by the {@link SplittableProcessElementInvoker}.
+     * and is released when the {@link DoFn.ProcessElement} call returns {@link
+     * ProcessContinuation#stop()}.
      *
      * <p>A hold is needed to avoid letting the output watermark immediately progress together with
      * the input watermark when the first {@link DoFn.ProcessElement} call for this element
@@ -365,11 +365,12 @@ public class SplittableParDoViaKeyedWorkItems {
       if (futureOutputWatermark == null) {
         futureOutputWatermark = elementAndRestriction.getKey().getTimestamp();
       }
+      Instant wakeupTime =
+          timerInternals.currentProcessingTime().plus(result.getContinuation().resumeDelay());
       holdState.add(futureOutputWatermark);
       // Set a timer to continue processing this element.
       timerInternals.setTimer(
-          TimerInternals.TimerData.of(
-              stateNamespace, timerInternals.currentProcessingTime(), TimeDomain.PROCESSING_TIME));
+          TimerInternals.TimerData.of(stateNamespace, wakeupTime, TimeDomain.PROCESSING_TIME));
     }
 
     private DoFn<InputT, OutputT>.StartBundleContext wrapContextAsStartBundle(

http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
index ced6c01..7732df3 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.core;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
@@ -34,20 +36,35 @@ public abstract class SplittableProcessElementInvoker<
   public class Result {
     @Nullable
     private final RestrictionT residualRestriction;
+    private final DoFn.ProcessContinuation continuation;
     private final Instant futureOutputWatermark;
 
     public Result(
-        @Nullable RestrictionT residualRestriction, Instant futureOutputWatermark) {
+        @Nullable RestrictionT residualRestriction,
+        DoFn.ProcessContinuation continuation,
+        Instant futureOutputWatermark) {
+      this.continuation = checkNotNull(continuation);
+      if (continuation.shouldResume()) {
+        checkNotNull(residualRestriction);
+      }
       this.residualRestriction = residualRestriction;
       this.futureOutputWatermark = futureOutputWatermark;
     }
 
-    /** If {@code null}, means the call should not resume. */
+    /**
+     * Can be {@code null} only if {@link #getContinuation} specifies the call should not resume.
+     * However, the converse is not true: this can be non-null even if {@link #getContinuation}
+     * is {@link DoFn.ProcessContinuation#stop()}.
+     */
     @Nullable
     public RestrictionT getResidualRestriction() {
       return residualRestriction;
     }
 
+    public DoFn.ProcessContinuation getContinuation() {
+      return continuation;
+    }
+
     public Instant getFutureOutputWatermark() {
       return futureOutputWatermark;
     }
@@ -57,8 +74,8 @@ public abstract class SplittableProcessElementInvoker<
    * Invokes the {@link DoFn.ProcessElement} method using the given {@link DoFnInvoker} for the
    * original {@link DoFn}, on the given element and with the given {@link RestrictionTracker}.
    *
-   * @return Information on how to resume the call: residual restriction and a
-   * future output watermark.
+   * @return Information on how to resume the call: residual restriction, a {@link
+   *     DoFn.ProcessContinuation}, and a future output watermark.
    */
   public abstract Result invokeProcessElement(
       DoFnInvoker<InputT, OutputT> invoker, WindowedValue<InputT> element, TrackerT tracker);

http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
index b80a632..959909e 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
@@ -17,11 +17,15 @@
  */
 package org.apache.beam.runners.core;
 
+import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume;
+import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.lessThan;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 import java.util.Collection;
 import java.util.concurrent.Executors;
@@ -42,19 +46,27 @@ import org.junit.Test;
 /** Tests for {@link OutputAndTimeBoundedSplittableProcessElementInvoker}. */
 public class OutputAndTimeBoundedSplittableProcessElementInvokerTest {
   private static class SomeFn extends DoFn<Integer, String> {
+    private final int numOutputsPerProcessCall;
     private final Duration sleepBeforeEachOutput;
 
-    private SomeFn(Duration sleepBeforeEachOutput) {
+    private SomeFn(int numOutputsPerProcessCall, Duration sleepBeforeEachOutput) {
+      this.numOutputsPerProcessCall = numOutputsPerProcessCall;
       this.sleepBeforeEachOutput = sleepBeforeEachOutput;
     }
 
     @ProcessElement
-    public void process(ProcessContext context, OffsetRangeTracker tracker)
+    public ProcessContinuation process(ProcessContext context, OffsetRangeTracker tracker)
         throws Exception {
-      for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
+      for (long i = tracker.currentRestriction().getFrom(), numIterations = 1;
+          tracker.tryClaim(i);
+          ++i, ++numIterations) {
         Thread.sleep(sleepBeforeEachOutput.getMillis());
         context.output("" + i);
+        if (numIterations == numOutputsPerProcessCall) {
+          return resume();
+        }
       }
+      return stop();
     }
 
     @GetInitialRestriction
@@ -64,8 +76,8 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest {
   }
 
   private SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker>.Result
-      runTest(int count, Duration sleepPerElement) {
-    SomeFn fn = new SomeFn(sleepPerElement);
+      runTest(int totalNumOutputs, int numOutputsPerProcessCall, Duration sleepPerElement) {
+    SomeFn fn = new SomeFn(numOutputsPerProcessCall, sleepPerElement);
     SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker> invoker =
         new OutputAndTimeBoundedSplittableProcessElementInvoker<>(
             fn,
@@ -93,14 +105,15 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest {
 
     return invoker.invokeProcessElement(
         DoFnInvokers.invokerFor(fn),
-        WindowedValue.of(count, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING),
-        new OffsetRangeTracker(new OffsetRange(0, count)));
+        WindowedValue.of(totalNumOutputs, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING),
+        new OffsetRangeTracker(new OffsetRange(0, totalNumOutputs)));
   }
 
   @Test
   public void testInvokeProcessElementOutputBounded() throws Exception {
     SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker>.Result res =
-        runTest(10000, Duration.ZERO);
+        runTest(10000, Integer.MAX_VALUE, Duration.ZERO);
+    assertFalse(res.getContinuation().shouldResume());
     OffsetRange residualRange = res.getResidualRestriction();
     // Should process the first 1000 elements.
     assertEquals(1000, residualRange.getFrom());
@@ -110,7 +123,8 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest {
   @Test
   public void testInvokeProcessElementTimeBounded() throws Exception {
     SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker>.Result res =
-        runTest(10000, Duration.millis(100));
+        runTest(10000, Integer.MAX_VALUE, Duration.millis(100));
+    assertFalse(res.getContinuation().shouldResume());
     OffsetRange residualRange = res.getResidualRestriction();
     // Should process ideally around 30 elements - but due to timing flakiness, we can't enforce
     // that precisely. Just test that it's not egregiously off.
@@ -120,9 +134,18 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest {
   }
 
   @Test
-  public void testInvokeProcessElementVoluntaryReturn() throws Exception {
+  public void testInvokeProcessElementVoluntaryReturnStop() throws Exception {
     SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker>.Result res =
-        runTest(5, Duration.millis(100));
+        runTest(5, Integer.MAX_VALUE, Duration.millis(100));
+    assertFalse(res.getContinuation().shouldResume());
     assertNull(res.getResidualRestriction());
   }
+
+  @Test
+  public void testInvokeProcessElementVoluntaryReturnResume() throws Exception {
+    SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker>.Result res =
+        runTest(10, 5, Duration.millis(100));
+    assertTrue(res.getContinuation().shouldResume());
+    assertEquals(new OffsetRange(5, 10), res.getResidualRestriction());
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
index 1cd1275..7449af3 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.runners.core;
 
+import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume;
+import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.hasItems;
@@ -365,16 +368,71 @@ public class SplittableParDoProcessFnTest {
     assertEquals(null, tester.getWatermarkHold());
   }
 
-  /**
-   * A splittable {@link DoFn} that generates the sequence [init, init + total).
-   */
+  /** A simple splittable {@link DoFn} that outputs the given element every 5 seconds forever. */
+  private static class SelfInitiatedResumeFn extends DoFn<Integer, String> {
+    @ProcessElement
+    public ProcessContinuation process(ProcessContext c, SomeRestrictionTracker tracker) {
+      c.output(c.element().toString());
+      return resume().withResumeDelay(Duration.standardSeconds(5));
+    }
+
+    @GetInitialRestriction
+    public SomeRestriction getInitialRestriction(Integer elem) {
+      return new SomeRestriction();
+    }
+  }
+
+  @Test
+  public void testResumeSetsTimer() throws Exception {
+    DoFn<Integer, String> fn = new SelfInitiatedResumeFn();
+    Instant base = Instant.now();
+    ProcessFnTester<Integer, String, SomeRestriction, SomeRestrictionTracker> tester =
+        new ProcessFnTester<>(
+            base,
+            fn,
+            BigEndianIntegerCoder.of(),
+            SerializableCoder.of(SomeRestriction.class),
+            MAX_OUTPUTS_PER_BUNDLE,
+            MAX_BUNDLE_DURATION);
+
+    tester.startElement(42, new SomeRestriction());
+    assertThat(tester.takeOutputElements(), contains("42"));
+
+    // Should resume after 5 seconds: advancing by 3 seconds should have no effect.
+    assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
+    assertTrue(tester.takeOutputElements().isEmpty());
+
+    // 6 seconds should be enough - should invoke the fn again.
+    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
+    assertThat(tester.takeOutputElements(), contains("42"));
+
+    // Should again resume after 5 seconds: advancing by 3 seconds should again have no effect.
+    assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
+    assertTrue(tester.takeOutputElements().isEmpty());
+
+    // 6 seconds should again be enough.
+    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
+    assertThat(tester.takeOutputElements(), contains("42"));
+  }
+
+  /** A splittable {@link DoFn} that generates the sequence [init, init + total). */
   private static class CounterFn extends DoFn<Integer, String> {
+    private final int numOutputsPerCall;
+
+    public CounterFn(int numOutputsPerCall) {
+      this.numOutputsPerCall = numOutputsPerCall;
+    }
+
     @ProcessElement
-    public void process(ProcessContext c, OffsetRangeTracker tracker) {
-      for (long i = tracker.currentRestriction().getFrom();
-          tracker.tryClaim(i); ++i) {
+    public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) {
+      for (long i = tracker.currentRestriction().getFrom(), numIterations = 0;
+          tracker.tryClaim(i); ++i, ++numIterations) {
         c.output(String.valueOf(c.element() + i));
+        if (numIterations == numOutputsPerCall) {
+          return resume();
+        }
       }
+      return stop();
     }
 
     @GetInitialRestriction
@@ -383,10 +441,35 @@ public class SplittableParDoProcessFnTest {
     }
   }
 
+  public void testResumeCarriesOverState() throws Exception {
+    DoFn<Integer, String> fn = new CounterFn(1);
+    Instant base = Instant.now();
+    ProcessFnTester<Integer, String, OffsetRange, OffsetRangeTracker> tester =
+        new ProcessFnTester<>(
+            base,
+            fn,
+            BigEndianIntegerCoder.of(),
+            SerializableCoder.of(OffsetRange.class),
+            MAX_OUTPUTS_PER_BUNDLE,
+            MAX_BUNDLE_DURATION);
+
+    tester.startElement(42, new OffsetRange(0, 3));
+    assertThat(tester.takeOutputElements(), contains("42"));
+    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
+    assertThat(tester.takeOutputElements(), contains("43"));
+    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
+    assertThat(tester.takeOutputElements(), contains("44"));
+    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
+    // After outputting all 3 items, should not output anything more.
+    assertEquals(0, tester.takeOutputElements().size());
+    // Should also not ask to resume.
+    assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
+  }
+
   @Test
   public void testCheckpointsAfterNumOutputs() throws Exception {
     int max = 100;
-    DoFn<Integer, String> fn = new CounterFn();
+    DoFn<Integer, String> fn = new CounterFn(Integer.MAX_VALUE);
     Instant base = Instant.now();
     int baseIndex = 42;
 
@@ -428,7 +511,7 @@ public class SplittableParDoProcessFnTest {
     // But bound bundle duration - the bundle should terminate.
     Duration maxBundleDuration = Duration.standardSeconds(1);
     // Create an fn that attempts to output 2x more than checkpointing allows.
-    DoFn<Integer, String> fn = new CounterFn();
+    DoFn<Integer, String> fn = new CounterFn(Integer.MAX_VALUE);
     Instant base = Instant.now();
     int baseIndex = 42;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index a2e5c16..1b809c2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import com.google.auto.value.AutoValue;
 import java.io.Serializable;
 import java.lang.annotation.Documented;
 import java.lang.annotation.ElementType;
@@ -545,11 +546,15 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
    *     returned by {@link GetInitialRestriction} implements {@link HasDefaultTracker}.
    * <li>It <i>may</i> define a {@link GetRestrictionCoder} method.
    * <li>The type of restrictions used by all of these methods must be the same.
+   * <li>Its {@link ProcessElement} method <i>may</i> return a {@link ProcessContinuation} to
+   *     indicate whether there is more work to be done for the current element.
    * <li>Its {@link ProcessElement} method <i>must not</i> use any extra context parameters, such as
    *     {@link BoundedWindow}.
    * <li>The {@link DoFn} itself <i>may</i> be annotated with {@link BoundedPerElement} or
    *     {@link UnboundedPerElement}, but not both at the same time. If it's not annotated with
-   *     either of these, it's assumed to be {@link BoundedPerElement}.
+   *     either of these, it's assumed to be {@link BoundedPerElement} if its {@link
+   *     ProcessElement} method returns {@code void} and {@link UnboundedPerElement} if it
+   *     returns a {@link ProcessContinuation}.
    * </ul>
    *
    * <p>A non-splittable {@link DoFn} <i>must not</i> define any of these methods.
@@ -677,8 +682,48 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
   @Experimental(Kind.SPLITTABLE_DO_FN)
   public @interface UnboundedPerElement {}
 
-  /** Temporary, do not use. See https://issues.apache.org/jira/browse/BEAM-1904 */
-  public class ProcessContinuation {}
+  // This can't be put into ProcessContinuation itself due to the following problem:
+  // http://ternarysearch.blogspot.com/2013/07/static-initialization-deadlock.html
+  private static final ProcessContinuation PROCESS_CONTINUATION_STOP =
+      new AutoValue_DoFn_ProcessContinuation(false, Duration.ZERO);
+
+  /**
+   * When used as a return value of {@link ProcessElement}, indicates whether there is more work to
+   * be done for the current element.
+   *
+   * <p>If the {@link ProcessElement} call completes because of a failed {@code tryClaim()} call
+   * on the {@link RestrictionTracker}, then the call MUST return {@link #stop()}.
+   */
+  @Experimental(Kind.SPLITTABLE_DO_FN)
+  @AutoValue
+  public abstract static class ProcessContinuation {
+    /** Indicates that there is no more work to be done for the current element. */
+    public static ProcessContinuation stop() {
+      return PROCESS_CONTINUATION_STOP;
+    }
+
+    /** Indicates that there is more work to be done for the current element. */
+    public static ProcessContinuation resume() {
+      return new AutoValue_DoFn_ProcessContinuation(true, Duration.ZERO);
+    }
+
+    /**
+     * If false, the {@link DoFn} promises that there is no more work remaining for the current
+     * element, so the runner should not resume the {@link ProcessElement} call.
+     */
+    public abstract boolean shouldResume();
+
+    /**
+     * A minimum duration that should elapse between the end of this {@link ProcessElement} call and
+     * the {@link ProcessElement} call continuing processing of the same element. By default, zero.
+     */
+    public abstract Duration resumeDelay();
+
+    /** Builder method to set the value of {@link #resumeDelay()}. */
+    public ProcessContinuation withResumeDelay(Duration resumeDelay) {
+      return new AutoValue_DoFn_ProcessContinuation(shouldResume(), resumeDelay);
+    }
+  }
 
   /**
    * Finalize the {@link DoFn} construction to prepare for processing.

http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index 8378204..cf96c9b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -49,7 +49,6 @@ import net.bytebuddy.implementation.bytecode.Throw;
 import net.bytebuddy.implementation.bytecode.assign.Assigner;
 import net.bytebuddy.implementation.bytecode.assign.Assigner.Typing;
 import net.bytebuddy.implementation.bytecode.assign.TypeCasting;
-import net.bytebuddy.implementation.bytecode.constant.NullConstant;
 import net.bytebuddy.implementation.bytecode.constant.TextConstant;
 import net.bytebuddy.implementation.bytecode.member.FieldAccess;
 import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
@@ -641,6 +640,17 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
    * {@link ProcessElement} method.
    */
   private static final class ProcessElementDelegation extends DoFnMethodDelegation {
+    private static final MethodDescription PROCESS_CONTINUATION_STOP_METHOD;
+
+    static {
+      try {
+        PROCESS_CONTINUATION_STOP_METHOD =
+            new MethodDescription.ForLoadedMethod(DoFn.ProcessContinuation.class.getMethod("stop"));
+      } catch (NoSuchMethodException e) {
+        throw new RuntimeException("Failed to locate ProcessContinuation.stop()");
+      }
+    }
+
     private final DoFnSignature.ProcessElementMethod signature;
 
     /** Implementation of {@link MethodDelegation} for the {@link ProcessElement} method. */
@@ -677,7 +687,12 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
 
     @Override
     protected StackManipulation afterDelegation(MethodDescription instrumentedMethod) {
-      return new StackManipulation.Compound(NullConstant.INSTANCE, MethodReturn.REFERENCE);
+      if (TypeDescription.VOID.equals(targetMethod.getReturnType().asErasure())) {
+        return new StackManipulation.Compound(
+            MethodInvocation.invoke(PROCESS_CONTINUATION_STOP_METHOD), MethodReturn.REFERENCE);
+      } else {
+        return MethodReturn.of(targetMethod.getReturnType().asErasure());
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index 3b22fda..8b41fee 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -54,8 +54,8 @@ public interface DoFnInvoker<InputT, OutputT> {
    * Invoke the {@link DoFn.ProcessElement} method on the bound {@link DoFn}.
    *
    * @param extra Factory for producing extra parameter objects (such as window), if necessary.
-   * @return {@code null} - see <a href="https://issues.apache.org/jira/browse/BEAM-1904">JIRA</a>
-   *     tracking the complete removal of {@link DoFn.ProcessContinuation}.
+   * @return The {@link DoFn.ProcessContinuation} returned by the underlying method, or {@link
+   *     DoFn.ProcessContinuation#stop()} if it returns {@code void}.
    */
   DoFn.ProcessContinuation invokeProcessElement(ArgumentProvider<InputT, OutputT> extra);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index 6eeed8e..bfad69e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.Timer;
 import org.apache.beam.sdk.state.TimerSpec;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.DoFn.StateId;
 import org.apache.beam.sdk.transforms.DoFn.TimerId;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter;
@@ -433,16 +434,21 @@ public abstract class DoFnSignature {
     @Nullable
     public abstract TypeDescriptor<? extends BoundedWindow> windowT();
 
+    /** Whether the {@link DoFn.ProcessElement} method returns {@link ProcessContinuation}. */
+    public abstract boolean hasReturnValue();
+
     static ProcessElementMethod create(
         Method targetMethod,
         List<Parameter> extraParameters,
         TypeDescriptor<?> trackerT,
-        @Nullable TypeDescriptor<? extends BoundedWindow> windowT) {
+        @Nullable TypeDescriptor<? extends BoundedWindow> windowT,
+        boolean hasReturnValue) {
       return new AutoValue_DoFnSignature_ProcessElementMethod(
           targetMethod,
           Collections.unmodifiableList(extraParameters),
           trackerT,
-          windowT);
+          windowT,
+          hasReturnValue);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 1b27e66..de57c3b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.transforms.reflect;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicates;
@@ -440,6 +442,8 @@ public class DoFnSignatures {
    * <li>If the {@link DoFn} (or any of its supertypes) is annotated as {@link
    *     DoFn.BoundedPerElement} or {@link DoFn.UnboundedPerElement}, use that. Only one of
    *     these must be specified.
+   * <li>If {@link DoFn.ProcessElement} returns {@link DoFn.ProcessContinuation}, assume it is
+   *     unbounded. Otherwise (if it returns {@code void}), assume it is bounded.
    * <li>If {@link DoFn.ProcessElement} returns {@code void}, but the {@link DoFn} is annotated
    *     {@link DoFn.UnboundedPerElement}, this is an error.
    * </ol>
@@ -465,7 +469,10 @@ public class DoFnSignatures {
     }
     if (processElement.isSplittable()) {
       if (isBounded == null) {
-        isBounded = PCollection.IsBounded.BOUNDED;
+        isBounded =
+            processElement.hasReturnValue()
+                ? PCollection.IsBounded.UNBOUNDED
+                : PCollection.IsBounded.BOUNDED;
       }
     } else {
       errors.checkArgument(
@@ -474,6 +481,7 @@ public class DoFnSignatures {
               + ((isBounded == PCollection.IsBounded.BOUNDED)
                   ? DoFn.BoundedPerElement.class.getSimpleName()
                   : DoFn.UnboundedPerElement.class.getSimpleName()));
+      checkState(!processElement.hasReturnValue(), "Should have been inferred splittable");
       isBounded = PCollection.IsBounded.BOUNDED;
     }
     return isBounded;
@@ -710,8 +718,10 @@ public class DoFnSignatures {
       TypeDescriptor<?> outputT,
       FnAnalysisContext fnContext) {
     errors.checkArgument(
-        void.class.equals(m.getReturnType()),
-        "Must return void");
+        void.class.equals(m.getReturnType())
+            || DoFn.ProcessContinuation.class.equals(m.getReturnType()),
+        "Must return void or %s",
+        DoFn.ProcessContinuation.class.getSimpleName());
 
 
     MethodAnalysisContext methodContext = MethodAnalysisContext.create();
@@ -751,7 +761,11 @@ public class DoFnSignatures {
     }
 
     return DoFnSignature.ProcessElementMethod.create(
-        m, methodContext.getExtraParameters(), trackerT, windowT);
+        m,
+        methodContext.getExtraParameters(),
+        trackerT,
+        windowT,
+        DoFn.ProcessContinuation.class.equals(m.getReturnType()));
   }
 
   private static void checkParameterOneOf(

http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
index 62c10a7..4987409 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.common.base.MoreObjects;
 import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.transforms.DoFn;
 
@@ -100,4 +101,13 @@ public class OffsetRangeTracker implements RestrictionTracker<OffsetRange> {
         lastAttemptedOffset + 1,
         range.getTo());
   }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("range", range)
+        .add("lastClaimedOffset", lastClaimedOffset)
+        .add("lastAttemptedOffset", lastAttemptedOffset)
+        .toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
index 27ef68f..8cb0a6b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
@@ -31,10 +31,13 @@ public interface RestrictionTracker<RestrictionT> {
   RestrictionT currentRestriction();
 
   /**
-   * Signals that the current {@link DoFn.ProcessElement} call should terminate as soon as possible.
-   * Modifies {@link #currentRestriction}. Returns a restriction representing the rest of the work:
-   * the old value of {@link #currentRestriction} is equivalent to the new value and the return
-   * value of this method combined. Must be called at most once on a given object.
+   * Signals that the current {@link DoFn.ProcessElement} call should terminate as soon as possible:
+   * after this method returns, the tracker MUST refuse all future claim calls, and {@link
+   * #checkDone} MUST succeed.
+   *
+   * <p>Modifies {@link #currentRestriction}. Returns a restriction representing the rest of the
+   * work: the old value of {@link #currentRestriction} is equivalent to the new value and the
+   * return value of this method combined. Must be called at most once on a given object.
    */
   RestrictionT checkpoint();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index cb60f9a..d2d2529 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -19,10 +19,10 @@ package org.apache.beam.sdk.transforms;
 
 import static com.google.common.base.Preconditions.checkState;
 import static org.apache.beam.sdk.testing.TestPipeline.testingPipelineOptions;
-import static org.hamcrest.Matchers.greaterThan;
+import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume;
+import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.Ordering;
@@ -33,7 +33,6 @@ import java.util.List;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.StreamingOptions;
@@ -74,10 +73,16 @@ public class SplittableDoFnTest implements Serializable {
 
   static class PairStringWithIndexToLength extends DoFn<String, KV<String, Integer>> {
     @ProcessElement
-    public void process(ProcessContext c, OffsetRangeTracker tracker) {
-      for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
+    public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) {
+      for (long i = tracker.currentRestriction().getFrom(), numIterations = 0;
+          tracker.tryClaim(i);
+          ++i, ++numIterations) {
         c.output(KV.of(c.element(), (int) i));
+        if (numIterations % 3 == 0) {
+          return resume();
+        }
       }
+      return stop();
     }
 
     @GetInitialRestriction
@@ -206,10 +211,10 @@ public class SplittableDoFnTest implements Serializable {
   private static class SDFWithMultipleOutputsPerBlock extends DoFn<String, Integer> {
     private static final int MAX_INDEX = 98765;
 
-    private final TupleTag<Integer> numProcessCalls;
+    private final int numClaimsPerCall;
 
-    private SDFWithMultipleOutputsPerBlock(TupleTag<Integer> numProcessCalls) {
-      this.numProcessCalls = numProcessCalls;
+    private SDFWithMultipleOutputsPerBlock(int numClaimsPerCall) {
+      this.numClaimsPerCall = numClaimsPerCall;
     }
 
     private static int snapToNextBlock(int index, int[] blockStarts) {
@@ -222,15 +227,20 @@ public class SplittableDoFnTest implements Serializable {
     }
 
     @ProcessElement
-    public void processElement(ProcessContext c, OffsetRangeTracker tracker) {
+    public ProcessContinuation processElement(ProcessContext c, OffsetRangeTracker tracker) {
       int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX};
       int trueStart = snapToNextBlock((int) tracker.currentRestriction().getFrom(), blockStarts);
-      c.output(numProcessCalls, 1);
-      for (int i = trueStart; tracker.tryClaim(blockStarts[i]); ++i) {
+      for (int i = trueStart, numIterations = 1;
+          tracker.tryClaim(blockStarts[i]);
+          ++i, ++numIterations) {
         for (int index = blockStarts[i]; index < blockStarts[i + 1]; ++index) {
           c.output(index);
         }
+        if (numIterations == numClaimsPerCall) {
+          return resume();
+        }
       }
+      return stop();
     }
 
     @GetInitialRestriction
@@ -242,26 +252,10 @@ public class SplittableDoFnTest implements Serializable {
   @Test
   @Category({ValidatesRunner.class, UsesSplittableParDo.class})
   public void testOutputAfterCheckpoint() throws Exception {
-    TupleTag<Integer> main = new TupleTag<>();
-    TupleTag<Integer> numProcessCalls = new TupleTag<>();
-    PCollectionTuple outputs =
-        p.apply(Create.of("foo"))
-            .apply(
-                ParDo.of(new SDFWithMultipleOutputsPerBlock(numProcessCalls))
-                    .withOutputTags(main, TupleTagList.of(numProcessCalls)));
-    PAssert.thatSingleton(outputs.get(main).apply(Count.<Integer>globally()))
+    PCollection<Integer> outputs = p.apply(Create.of("foo"))
+        .apply(ParDo.of(new SDFWithMultipleOutputsPerBlock(3)));
+    PAssert.thatSingleton(outputs.apply(Count.<Integer>globally()))
         .isEqualTo((long) SDFWithMultipleOutputsPerBlock.MAX_INDEX);
-    // Verify that more than 1 process() call was involved, i.e. that there was checkpointing.
-    PAssert.thatSingleton(
-            outputs.get(numProcessCalls).setCoder(VarIntCoder.of()).apply(Sum.integersGlobally()))
-        .satisfies(
-            new SerializableFunction<Integer, Void>() {
-              @Override
-              public Void apply(Integer input) {
-                assertThat(input, greaterThan(1));
-                return null;
-              }
-            });
     p.run();
   }
 
@@ -341,12 +335,12 @@ public class SplittableDoFnTest implements Serializable {
       extends DoFn<Integer, KV<String, Integer>> {
     private static final int MAX_INDEX = 98765;
     private final PCollectionView<String> sideInput;
-    private final TupleTag<Integer> numProcessCalls;
+    private final int numClaimsPerCall;
 
     public SDFWithMultipleOutputsPerBlockAndSideInput(
-        PCollectionView<String> sideInput, TupleTag<Integer> numProcessCalls) {
+        PCollectionView<String> sideInput, int numClaimsPerCall) {
       this.sideInput = sideInput;
-      this.numProcessCalls = numProcessCalls;
+      this.numClaimsPerCall = numClaimsPerCall;
     }
 
     private static int snapToNextBlock(int index, int[] blockStarts) {
@@ -359,15 +353,20 @@ public class SplittableDoFnTest implements Serializable {
     }
 
     @ProcessElement
-    public void processElement(ProcessContext c, OffsetRangeTracker tracker) {
+    public ProcessContinuation processElement(ProcessContext c, OffsetRangeTracker tracker) {
       int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX};
       int trueStart = snapToNextBlock((int) tracker.currentRestriction().getFrom(), blockStarts);
-      c.output(numProcessCalls, 1);
-      for (int i = trueStart; tracker.tryClaim(blockStarts[i]); ++i) {
+      for (int i = trueStart, numIterations = 1;
+          tracker.tryClaim(blockStarts[i]);
+          ++i, ++numIterations) {
         for (int index = blockStarts[i]; index < blockStarts[i + 1]; ++index) {
           c.output(KV.of(c.sideInput(sideInput) + ":" + c.element(), index));
         }
+        if (numIterations == numClaimsPerCall) {
+          return resume();
+        }
       }
+      return stop();
     }
 
     @GetInitialRestriction
@@ -400,15 +399,14 @@ public class SplittableDoFnTest implements Serializable {
             .apply("window 2", Window.<String>into(FixedWindows.of(Duration.millis(2))))
             .apply("singleton", View.<String>asSingleton());
 
-    TupleTag<KV<String, Integer>> main = new TupleTag<>();
-    TupleTag<Integer> numProcessCalls = new TupleTag<>();
-    PCollectionTuple res =
+    PCollection<KV<String, Integer>> res =
         mainInput.apply(
-            ParDo.of(new SDFWithMultipleOutputsPerBlockAndSideInput(sideInput, numProcessCalls))
-                .withSideInputs(sideInput)
-                .withOutputTags(main, TupleTagList.of(numProcessCalls)));
+            ParDo.of(
+                    new SDFWithMultipleOutputsPerBlockAndSideInput(
+                        sideInput, 3 /* numClaimsPerCall */))
+                .withSideInputs(sideInput));
     PCollection<KV<String, Iterable<Integer>>> grouped =
-        res.get(main).apply(GroupByKey.<String, Integer>create());
+        res.apply(GroupByKey.<String, Integer>create());
 
     PAssert.that(grouped.apply(Keys.<String>create()))
         .containsInAnyOrder("a:0", "a:1", "b:2", "b:3");
@@ -427,22 +425,6 @@ public class SplittableDoFnTest implements Serializable {
                 return null;
               }
             });
-
-    // Verify that more than 1 process() call was involved, i.e. that there was checkpointing.
-    PAssert.thatSingleton(
-            res.get(numProcessCalls)
-                .setCoder(VarIntCoder.of())
-                .apply(Sum.integersGlobally().withoutDefaults()))
-        // This should hold in all windows, but verifying a particular window is sufficient.
-        .inOnlyPane(new IntervalWindow(new Instant(0), new Instant(1)))
-        .satisfies(
-            new SerializableFunction<Integer, Void>() {
-              @Override
-              public Void apply(Integer input) {
-                assertThat(input, greaterThan(1));
-                return null;
-              }
-            });
     p.run();
 
     // TODO: also test coverage when some of the windows of the side input are not ready.

http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 3edb194..2098c66 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.transforms.reflect;
 
+import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume;
+import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
@@ -89,8 +91,8 @@ public class DoFnInvokersTest {
     when(mockArgumentProvider.processContext(Matchers.<DoFn>any())).thenReturn(mockProcessContext);
   }
 
-  private void invokeProcessElement(DoFn<String, String> fn) {
-    DoFnInvokers.invokerFor(fn).invokeProcessElement(mockArgumentProvider);
+  private DoFn.ProcessContinuation invokeProcessElement(DoFn<String, String> fn) {
+    return DoFnInvokers.invokerFor(fn).invokeProcessElement(mockArgumentProvider);
   }
 
   private void invokeOnTimer(String timerId, DoFn<String, String> fn) {
@@ -119,7 +121,7 @@ public class DoFnInvokersTest {
       public void processElement(ProcessContext c) throws Exception {}
     }
     MockFn mockFn = mock(MockFn.class);
-    invokeProcessElement(mockFn);
+    assertEquals(stop(), invokeProcessElement(mockFn));
     verify(mockFn).processElement(mockProcessContext);
   }
 
@@ -140,7 +142,7 @@ public class DoFnInvokersTest {
   public void testDoFnWithProcessElementInterface() throws Exception {
     IdentityUsingInterfaceWithProcessElement fn =
         mock(IdentityUsingInterfaceWithProcessElement.class);
-    invokeProcessElement(fn);
+    assertEquals(stop(), invokeProcessElement(fn));
     verify(fn).processElement(mockProcessContext);
   }
 
@@ -161,14 +163,14 @@ public class DoFnInvokersTest {
   @Test
   public void testDoFnWithMethodInSuperclass() throws Exception {
     IdentityChildWithoutOverride fn = mock(IdentityChildWithoutOverride.class);
-    invokeProcessElement(fn);
+    assertEquals(stop(), invokeProcessElement(fn));
     verify(fn).process(mockProcessContext);
   }
 
   @Test
   public void testDoFnWithMethodInSubclass() throws Exception {
     IdentityChildWithOverride fn = mock(IdentityChildWithOverride.class);
-    invokeProcessElement(fn);
+    assertEquals(stop(), invokeProcessElement(fn));
     verify(fn).process(mockProcessContext);
   }
 
@@ -179,7 +181,7 @@ public class DoFnInvokersTest {
       public void processElement(ProcessContext c, IntervalWindow w) throws Exception {}
     }
     MockFn fn = mock(MockFn.class);
-    invokeProcessElement(fn);
+    assertEquals(stop(), invokeProcessElement(fn));
     verify(fn).processElement(mockProcessContext, mockWindow);
   }
 
@@ -203,7 +205,7 @@ public class DoFnInvokersTest {
           throws Exception {}
     }
     MockFn fn = mock(MockFn.class);
-    invokeProcessElement(fn);
+    assertEquals(stop(), invokeProcessElement(fn));
     verify(fn).processElement(mockProcessContext, mockState);
   }
 
@@ -229,11 +231,35 @@ public class DoFnInvokersTest {
       public void onTimer() {}
     }
     MockFn fn = mock(MockFn.class);
-    invokeProcessElement(fn);
+    assertEquals(stop(), invokeProcessElement(fn));
     verify(fn).processElement(mockProcessContext, mockTimer);
   }
 
   @Test
+  public void testDoFnWithReturn() throws Exception {
+    class MockFn extends DoFn<String, String> {
+      @DoFn.ProcessElement
+      public ProcessContinuation processElement(ProcessContext c, SomeRestrictionTracker tracker)
+          throws Exception {
+        return null;
+      }
+
+      @GetInitialRestriction
+      public SomeRestriction getInitialRestriction(String element) {
+        return null;
+      }
+
+      @NewTracker
+      public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+        return null;
+      }
+    }
+    MockFn fn = mock(MockFn.class);
+    when(fn.processElement(mockProcessContext, null)).thenReturn(resume());
+    assertEquals(resume(), invokeProcessElement(fn));
+  }
+
+  @Test
   public void testDoFnWithStartBundleSetupTeardown() throws Exception {
     class MockFn extends DoFn<String, String> {
       @ProcessElement
@@ -288,7 +314,9 @@ public class DoFnInvokersTest {
   /** Public so Mockito can do "delegatesTo()" in the test below. */
   public static class MockFn extends DoFn<String, String> {
     @ProcessElement
-    public void processElement(ProcessContext c, SomeRestrictionTracker tracker) {}
+    public ProcessContinuation processElement(ProcessContext c, SomeRestrictionTracker tracker) {
+      return null;
+    }
 
     @GetInitialRestriction
     public SomeRestriction getInitialRestriction(String element) {
@@ -340,7 +368,7 @@ public class DoFnInvokersTest {
         .splitRestriction(
             eq("blah"), same(restriction), Mockito.<DoFn.OutputReceiver<SomeRestriction>>any());
     when(fn.newTracker(restriction)).thenReturn(tracker);
-    fn.processElement(mockProcessContext, tracker);
+    when(fn.processElement(mockProcessContext, tracker)).thenReturn(resume());
 
     assertEquals(coder, invoker.invokeGetRestrictionCoder(CoderRegistry.createDefault()));
     assertEquals(restriction, invoker.invokeGetInitialRestriction("blah"));
@@ -356,6 +384,8 @@ public class DoFnInvokersTest {
         });
     assertEquals(Arrays.asList(part1, part2, part3), outputs);
     assertEquals(tracker, invoker.invokeNewTracker(restriction));
+    assertEquals(
+        resume(),
         invoker.invokeProcessElement(
             new FakeArgumentProvider<String, String>() {
               @Override
@@ -367,7 +397,7 @@ public class DoFnInvokersTest {
               public RestrictionTracker<?> restrictionTracker() {
                 return tracker;
               }
-            });
+            }));
   }
 
   private static class RestrictionWithDefaultTracker
@@ -441,7 +471,7 @@ public class DoFnInvokersTest {
             assertEquals("foo", output);
           }
         });
-    invoker.invokeProcessElement(mockArgumentProvider);
+    assertEquals(stop(), invoker.invokeProcessElement(mockArgumentProvider));
     assertThat(
         invoker.invokeNewTracker(new RestrictionWithDefaultTracker()),
         instanceOf(DefaultTracker.class));
@@ -531,14 +561,14 @@ public class DoFnInvokersTest {
   @Test
   public void testLocalPrivateDoFnClass() throws Exception {
     PrivateDoFnClass fn = mock(PrivateDoFnClass.class);
-    invokeProcessElement(fn);
+    assertEquals(stop(), invokeProcessElement(fn));
     verify(fn).processThis(mockProcessContext);
   }
 
   @Test
   public void testStaticPackagePrivateDoFnClass() throws Exception {
     DoFn<String, String> fn = mock(DoFnInvokersTestHelper.newStaticPackagePrivateDoFn().getClass());
-    invokeProcessElement(fn);
+    assertEquals(stop(), invokeProcessElement(fn));
     DoFnInvokersTestHelper.verifyStaticPackagePrivateDoFn(fn, mockProcessContext);
   }
 
@@ -546,28 +576,28 @@ public class DoFnInvokersTest {
   public void testInnerPackagePrivateDoFnClass() throws Exception {
     DoFn<String, String> fn =
         mock(new DoFnInvokersTestHelper().newInnerPackagePrivateDoFn().getClass());
-    invokeProcessElement(fn);
+    assertEquals(stop(), invokeProcessElement(fn));
     DoFnInvokersTestHelper.verifyInnerPackagePrivateDoFn(fn, mockProcessContext);
   }
 
   @Test
   public void testStaticPrivateDoFnClass() throws Exception {
     DoFn<String, String> fn = mock(DoFnInvokersTestHelper.newStaticPrivateDoFn().getClass());
-    invokeProcessElement(fn);
+    assertEquals(stop(), invokeProcessElement(fn));
     DoFnInvokersTestHelper.verifyStaticPrivateDoFn(fn, mockProcessContext);
   }
 
   @Test
   public void testInnerPrivateDoFnClass() throws Exception {
     DoFn<String, String> fn = mock(new DoFnInvokersTestHelper().newInnerPrivateDoFn().getClass());
-    invokeProcessElement(fn);
+    assertEquals(stop(), invokeProcessElement(fn));
     DoFnInvokersTestHelper.verifyInnerPrivateDoFn(fn, mockProcessContext);
   }
 
   @Test
   public void testAnonymousInnerDoFn() throws Exception {
     DoFn<String, String> fn = mock(new DoFnInvokersTestHelper().newInnerAnonymousDoFn().getClass());
-    invokeProcessElement(fn);
+    assertEquals(stop(), invokeProcessElement(fn));
     DoFnInvokersTestHelper.verifyInnerAnonymousDoFn(fn, mockProcessContext);
   }
 
@@ -604,6 +634,31 @@ public class DoFnInvokersTest {
   }
 
   @Test
+  public void testProcessElementExceptionWithReturn() throws Exception {
+    thrown.expect(UserCodeException.class);
+    thrown.expectMessage("bogus");
+    DoFnInvokers.invokerFor(
+            new DoFn<Integer, Integer>() {
+              @ProcessElement
+              public ProcessContinuation processElement(
+                  @SuppressWarnings("unused") ProcessContext c, SomeRestrictionTracker tracker) {
+                throw new IllegalArgumentException("bogus");
+              }
+
+              @GetInitialRestriction
+              public SomeRestriction getInitialRestriction(Integer element) {
+                return null;
+              }
+
+              @NewTracker
+              public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+                return null;
+              }
+            })
+        .invokeProcessElement(new FakeArgumentProvider<Integer, Integer>());
+  }
+
+  @Test
   public void testStartBundleException() throws Exception {
     DoFnInvoker<Integer, Integer> invoker =
         DoFnInvokers.invokerFor(

http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
index d321f54..44ae5c4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
@@ -50,7 +50,7 @@ public class DoFnSignaturesProcessElementTest {
   @Test
   public void testBadReturnType() throws Exception {
     thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Must return void");
+    thrown.expectMessage("Must return void or ProcessContinuation");
 
     analyzeProcessElementMethod(
         new AnonymousMethod() {

http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
index 07b3348..08af65e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
@@ -52,7 +52,8 @@ import org.junit.runners.JUnit4;
 public class DoFnSignaturesSplittableDoFnTest {
   @Rule public ExpectedException thrown = ExpectedException.none();
 
-  private static class SomeRestriction {}
+  private abstract static class SomeRestriction
+      implements HasDefaultTracker<SomeRestriction, SomeRestrictionTracker> {}
 
   private abstract static class SomeRestrictionTracker
       implements RestrictionTracker<SomeRestriction> {}
@@ -60,6 +61,20 @@ public class DoFnSignaturesSplittableDoFnTest {
   private abstract static class SomeRestrictionCoder extends StructuredCoder<SomeRestriction> {}
 
   @Test
+  public void testReturnsProcessContinuation() throws Exception {
+    DoFnSignature.ProcessElementMethod signature =
+        analyzeProcessElementMethod(
+            new AnonymousMethod() {
+              private DoFn.ProcessContinuation method(
+                  DoFn<Integer, String>.ProcessContext context) {
+                return null;
+              }
+            });
+
+    assertTrue(signature.hasReturnValue());
+  }
+
+  @Test
   public void testHasRestrictionTracker() throws Exception {
     DoFnSignature.ProcessElementMethod signature =
         analyzeProcessElementMethod(
@@ -100,11 +115,6 @@ public class DoFnSignaturesSplittableDoFnTest {
       public SomeRestriction getInitialRestriction(Integer element) {
         return null;
       }
-
-      @NewTracker
-      public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
-        return null;
-      }
     }
 
     @BoundedPerElement
@@ -130,6 +140,55 @@ public class DoFnSignaturesSplittableDoFnTest {
             .isBoundedPerElement());
   }
 
+  private static class BaseFnWithoutContinuation extends DoFn<Integer, String> {
+    @ProcessElement
+    public void processElement(ProcessContext context, SomeRestrictionTracker tracker) {}
+
+    @GetInitialRestriction
+    public SomeRestriction getInitialRestriction(Integer element) {
+      return null;
+    }
+  }
+
+  private static class BaseFnWithContinuation extends DoFn<Integer, String> {
+    @ProcessElement
+    public ProcessContinuation processElement(
+        ProcessContext context, SomeRestrictionTracker tracker) {
+      return null;
+    }
+
+    @GetInitialRestriction
+    public SomeRestriction getInitialRestriction(Integer element) {
+      return null;
+    }
+  }
+
+  @Test
+  public void testSplittableBoundednessInferredFromReturnValue() throws Exception {
+    assertEquals(
+        PCollection.IsBounded.BOUNDED,
+        DoFnSignatures.getSignature(BaseFnWithoutContinuation.class).isBoundedPerElement());
+    assertEquals(
+        PCollection.IsBounded.UNBOUNDED,
+        DoFnSignatures.getSignature(BaseFnWithContinuation.class).isBoundedPerElement());
+  }
+
+  @Test
+  public void testSplittableRespectsBoundednessAnnotation() throws Exception {
+    @BoundedPerElement
+    class BoundedFnWithContinuation extends BaseFnWithContinuation {}
+
+    assertEquals(
+        PCollection.IsBounded.BOUNDED,
+        DoFnSignatures.getSignature(BoundedFnWithContinuation.class).isBoundedPerElement());
+
+    @UnboundedPerElement
+    class UnboundedFnWithContinuation extends BaseFnWithContinuation {}
+
+    assertEquals(
+        PCollection.IsBounded.UNBOUNDED,
+        DoFnSignatures.getSignature(UnboundedFnWithContinuation.class).isBoundedPerElement());
+  }
   @Test
   public void testUnsplittableIsBounded() throws Exception {
     class UnsplittableFn extends DoFn<Integer, String> {
@@ -172,8 +231,10 @@ public class DoFnSignaturesSplittableDoFnTest {
   public void testSplittableWithAllFunctions() throws Exception {
     class GoodSplittableDoFn extends DoFn<Integer, String> {
       @ProcessElement
-      public void processElement(
-          ProcessContext context, SomeRestrictionTracker tracker) {}
+      public ProcessContinuation processElement(
+          ProcessContext context, SomeRestrictionTracker tracker) {
+        return null;
+      }
 
       @GetInitialRestriction
       public SomeRestriction getInitialRestriction(Integer element) {
@@ -198,6 +259,7 @@ public class DoFnSignaturesSplittableDoFnTest {
     DoFnSignature signature = DoFnSignatures.getSignature(GoodSplittableDoFn.class);
     assertEquals(SomeRestrictionTracker.class, signature.processElement().trackerT().getRawType());
     assertTrue(signature.processElement().isSplittable());
+    assertTrue(signature.processElement().hasReturnValue());
     assertEquals(
         SomeRestriction.class, signature.getInitialRestriction().restrictionT().getRawType());
     assertEquals(SomeRestriction.class, signature.splitRestriction().restrictionT().getRawType());
@@ -214,7 +276,9 @@ public class DoFnSignaturesSplittableDoFnTest {
   public void testSplittableWithAllFunctionsGeneric() throws Exception {
     class GoodGenericSplittableDoFn<RestrictionT, TrackerT, CoderT> extends DoFn<Integer, String> {
       @ProcessElement
-      public void processElement(ProcessContext context, TrackerT tracker) {}
+      public ProcessContinuation processElement(ProcessContext context, TrackerT tracker) {
+        return null;
+      }
 
       @GetInitialRestriction
       public RestrictionT getInitialRestriction(Integer element) {
@@ -242,6 +306,7 @@ public class DoFnSignaturesSplittableDoFnTest {
                 SomeRestriction, SomeRestrictionTracker, SomeRestrictionCoder>() {}.getClass());
     assertEquals(SomeRestrictionTracker.class, signature.processElement().trackerT().getRawType());
     assertTrue(signature.processElement().isSplittable());
+    assertTrue(signature.processElement().hasReturnValue());
     assertEquals(
         SomeRestriction.class, signature.getInitialRestriction().restrictionT().getRawType());
     assertEquals(SomeRestriction.class, signature.splitRestriction().restrictionT().getRawType());


[2/2] beam git commit: This closes #3360: [BEAM-2447] Reintroduces DoFn.ProcessContinuation

Posted by jk...@apache.org.
This closes #3360: [BEAM-2447] Reintroduces DoFn.ProcessContinuation


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/66b4a1be
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/66b4a1be
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/66b4a1be

Branch: refs/heads/master
Commit: 66b4a1be09b58a761cf49cc18a04eaaff555e376
Parents: 91c7d3d 1bff4a7
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Jul 12 16:16:05 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Wed Jul 12 16:16:05 2017 -0700

----------------------------------------------------------------------
 .../core/construction/SplittableParDoTest.java  |  10 +-
 ...eBoundedSplittableProcessElementInvoker.java |  35 ++++++-
 .../core/SplittableParDoViaKeyedWorkItems.java  |   9 +-
 .../core/SplittableProcessElementInvoker.java   |  25 ++++-
 ...ndedSplittableProcessElementInvokerTest.java |  45 +++++++--
 .../core/SplittableParDoProcessFnTest.java      |  99 ++++++++++++++++--
 .../org/apache/beam/sdk/transforms/DoFn.java    |  51 +++++++++-
 .../reflect/ByteBuddyDoFnInvokerFactory.java    |  19 +++-
 .../sdk/transforms/reflect/DoFnInvoker.java     |   4 +-
 .../sdk/transforms/reflect/DoFnSignature.java   |  10 +-
 .../sdk/transforms/reflect/DoFnSignatures.java  |  22 +++-
 .../splittabledofn/OffsetRangeTracker.java      |  10 ++
 .../splittabledofn/RestrictionTracker.java      |  11 +-
 .../beam/sdk/transforms/SplittableDoFnTest.java | 100 ++++++++-----------
 .../transforms/reflect/DoFnInvokersTest.java    |  93 +++++++++++++----
 .../DoFnSignaturesProcessElementTest.java       |   2 +-
 .../DoFnSignaturesSplittableDoFnTest.java       |  83 +++++++++++++--
 17 files changed, 487 insertions(+), 141 deletions(-)
----------------------------------------------------------------------