You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/08 00:34:49 UTC
[1/4] beam git commit: Adds tests for the watermark hold (previously untested)
Repository: beam
Updated Branches:
refs/heads/master 4a694cebb -> 1594849da
Adds tests for the watermark hold (previously untested)
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/29c28021
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/29c28021
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/29c28021
Branch: refs/heads/master
Commit: 29c280211c2431f29c5552c35bd3435c65e4975b
Parents: dad7ace
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 7 14:00:05 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Apr 7 17:06:13 2017 -0700
----------------------------------------------------------------------
.../beam/runners/core/SplittableParDoTest.java | 56 +++++++++++++++++++-
1 file changed, 54 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/29c28021/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index f8d6095..d301113 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.core;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -36,6 +37,7 @@ import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
@@ -208,7 +210,7 @@ public class SplittableParDoTest {
private Instant currentProcessingTime;
private InMemoryTimerInternals timerInternals;
- private InMemoryStateInternals<String> stateInternals;
+ private TestInMemoryStateInternals<String> stateInternals;
ProcessFnTester(
Instant currentProcessingTime,
@@ -223,7 +225,7 @@ public class SplittableParDoTest {
fn, inputCoder, restrictionCoder, IntervalWindow.getCoder());
this.tester = DoFnTester.of(processFn);
this.timerInternals = new InMemoryTimerInternals();
- this.stateInternals = InMemoryStateInternals.forKey("dummy");
+ this.stateInternals = new TestInMemoryStateInternals<>("dummy");
processFn.setStateInternalsFactory(
new StateInternalsFactory<String>() {
@Override
@@ -335,6 +337,9 @@ public class SplittableParDoTest {
return tester.takeOutputElements();
}
+ public Instant getWatermarkHold() {
+ return stateInternals.earliestWatermarkHold();
+ }
}
private static class OutputWindowedValueToDoFnTester<OutputT>
@@ -425,6 +430,53 @@ public class SplittableParDoTest {
}
}
+ private static class WatermarkUpdateFn extends DoFn<Instant, String> {
+ @ProcessElement
+ public void process(ProcessContext c, OffsetRangeTracker tracker) {
+ for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
+ c.updateWatermark(c.element().plus(Duration.standardSeconds(i)));
+ c.output(String.valueOf(i));
+ }
+ }
+
+ @GetInitialRestriction
+ public OffsetRange getInitialRestriction(Instant elem) {
+ throw new IllegalStateException("Expected to be supplied explicitly in this test");
+ }
+
+ @NewTracker
+ public OffsetRangeTracker newTracker(OffsetRange range) {
+ return new OffsetRangeTracker(range);
+ }
+ }
+
+ @Test
+ public void testUpdatesWatermark() throws Exception {
+ DoFn<Instant, String> fn = new WatermarkUpdateFn();
+ Instant base = Instant.now();
+
+ ProcessFnTester<Instant, String, OffsetRange, OffsetRangeTracker> tester =
+ new ProcessFnTester<>(
+ base,
+ fn,
+ InstantCoder.of(),
+ SerializableCoder.of(OffsetRange.class),
+ 3,
+ MAX_BUNDLE_DURATION);
+
+ tester.startElement(base, new OffsetRange(0, 8));
+ assertThat(tester.takeOutputElements(), hasItems("0", "1", "2"));
+ assertEquals(base.plus(Duration.standardSeconds(2)), tester.getWatermarkHold());
+
+ assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
+ assertThat(tester.takeOutputElements(), hasItems("3", "4", "5"));
+ assertEquals(base.plus(Duration.standardSeconds(5)), tester.getWatermarkHold());
+
+ assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
+ assertThat(tester.takeOutputElements(), hasItems("6", "7"));
+ assertEquals(null, tester.getWatermarkHold());
+ }
+
/**
* A splittable {@link DoFn} that generates the sequence [init, init + total).
*/
[3/4] beam git commit: Clarifies doc of ProcessElement re: HasDefaultTracker
Posted by jk...@apache.org.
Clarifies doc of ProcessElement re: HasDefaultTracker
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/19f407c9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/19f407c9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/19f407c9
Branch: refs/heads/master
Commit: 19f407c9497c911ca3cb61d989aa5a78c84896cf
Parents: 4a694ce
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 7 17:00:39 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Apr 7 17:06:13 2017 -0700
----------------------------------------------------------------------
.../src/main/java/org/apache/beam/sdk/transforms/DoFn.java | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/19f407c9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index de33612..5139290 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -40,6 +40,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -548,10 +549,11 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
* <ul>
* <li>It <i>must</i> define a {@link GetInitialRestriction} method.
* <li>It <i>may</i> define a {@link SplitRestriction} method.
- * <li>It <i>must</i> define a {@link NewTracker} method returning the same type as the type of
+ * <li>It <i>may</i> define a {@link NewTracker} method returning the same type as the type of
* the {@link RestrictionTracker} argument of {@link ProcessElement}, which in turn must be a
* subtype of {@code RestrictionTracker<R>} where {@code R} is the restriction type returned
- * by {@link GetInitialRestriction}.
+ * by {@link GetInitialRestriction}. This method is optional in case the restriction type
+ * returned by {@link GetInitialRestriction} implements {@link HasDefaultTracker}.
* <li>It <i>may</i> define a {@link GetRestrictionCoder} method.
* <li>The type of restrictions used by all of these methods must be the same.
* <li>Its {@link ProcessElement} method <i>may</i> return a {@link ProcessContinuation} to
[4/4] beam git commit: This closes #2455
Posted by jk...@apache.org.
This closes #2455
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1594849d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1594849d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1594849d
Branch: refs/heads/master
Commit: 1594849da972f5b4dcfc52eb3991d1df083d6719
Parents: 4a694ce 29c2802
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 7 17:08:54 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Apr 7 17:08:54 2017 -0700
----------------------------------------------------------------------
.../apache/beam/runners/core/DoFnAdapters.java | 5 +
...eBoundedSplittableProcessElementInvoker.java | 125 ++++++------
.../beam/runners/core/SimpleDoFnRunner.java | 5 +
.../beam/runners/core/SplittableParDo.java | 15 +-
.../core/SplittableProcessElementInvoker.java | 22 +--
...ndedSplittableProcessElementInvokerTest.java | 16 +-
.../beam/runners/core/SplittableParDoTest.java | 197 ++++++-------------
.../org/apache/beam/sdk/transforms/DoFn.java | 84 ++------
.../apache/beam/sdk/transforms/DoFnTester.java | 5 +
.../reflect/ByteBuddyDoFnInvokerFactory.java | 20 +-
.../sdk/transforms/reflect/DoFnInvoker.java | 4 +-
.../sdk/transforms/reflect/DoFnSignature.java | 10 +-
.../sdk/transforms/reflect/DoFnSignatures.java | 22 +--
.../splittabledofn/OffsetRangeTracker.java | 33 +++-
.../splittabledofn/RestrictionTracker.java | 8 +
.../beam/sdk/transforms/SplittableDoFnTest.java | 17 +-
.../transforms/reflect/DoFnInvokersTest.java | 99 +++-------
.../DoFnSignaturesProcessElementTest.java | 2 +-
.../DoFnSignaturesSplittableDoFnTest.java | 74 +------
.../splittabledofn/OffsetRangeTrackerTest.java | 49 ++++-
20 files changed, 306 insertions(+), 506 deletions(-)
----------------------------------------------------------------------
[2/4] beam git commit: Fixes SDF issues re: watermarks and stop/resume
Posted by jk...@apache.org.
Fixes SDF issues re: watermarks and stop/resume
See detailed discussion in document:
https://docs.google.com/document/d/1BGc8pM1GOvZhwR9SARSVte-20XEoBUxrGJ5gTWXdv3c/edit
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dad7ace2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dad7ace2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dad7ace2
Branch: refs/heads/master
Commit: dad7ace2b6143850fac0ca5359d9f56f5f0df2c1
Parents: 19f407c
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Apr 6 15:07:05 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Apr 7 17:06:13 2017 -0700
----------------------------------------------------------------------
.../apache/beam/runners/core/DoFnAdapters.java | 5 +
...eBoundedSplittableProcessElementInvoker.java | 125 +++++++-------
.../beam/runners/core/SimpleDoFnRunner.java | 5 +
.../beam/runners/core/SplittableParDo.java | 15 +-
.../core/SplittableProcessElementInvoker.java | 22 +--
...ndedSplittableProcessElementInvokerTest.java | 16 +-
.../beam/runners/core/SplittableParDoTest.java | 171 +++----------------
.../org/apache/beam/sdk/transforms/DoFn.java | 78 ++-------
.../apache/beam/sdk/transforms/DoFnTester.java | 5 +
.../reflect/ByteBuddyDoFnInvokerFactory.java | 20 +--
.../sdk/transforms/reflect/DoFnInvoker.java | 4 +-
.../sdk/transforms/reflect/DoFnSignature.java | 10 +-
.../sdk/transforms/reflect/DoFnSignatures.java | 22 +--
.../splittabledofn/OffsetRangeTracker.java | 33 +++-
.../splittabledofn/RestrictionTracker.java | 8 +
.../beam/sdk/transforms/SplittableDoFnTest.java | 17 +-
.../transforms/reflect/DoFnInvokersTest.java | 99 +++--------
.../DoFnSignaturesProcessElementTest.java | 2 +-
.../DoFnSignaturesSplittableDoFnTest.java | 74 +-------
.../splittabledofn/OffsetRangeTrackerTest.java | 49 +++++-
20 files changed, 263 insertions(+), 517 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
index 693cb2f..deb3b7e 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
@@ -286,6 +286,11 @@ public class DoFnAdapters {
}
@Override
+ public void updateWatermark(Instant watermark) {
+ throw new UnsupportedOperationException("Only splittable DoFn's can use updateWatermark()");
+ }
+
+ @Override
public BoundedWindow window() {
return context.window();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
index 357094c..27fd0a3 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
@@ -97,70 +97,57 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
final WindowedValue<InputT> element,
final TrackerT tracker) {
final ProcessContext processContext = new ProcessContext(element, tracker);
- DoFn.ProcessContinuation cont =
- invoker.invokeProcessElement(
- new DoFnInvoker.ArgumentProvider<InputT, OutputT>() {
- @Override
- public DoFn<InputT, OutputT>.ProcessContext processContext(
- DoFn<InputT, OutputT> doFn) {
- return processContext;
- }
-
- @Override
- public RestrictionTracker<?> restrictionTracker() {
- return tracker;
- }
-
- // Unsupported methods below.
-
- @Override
- public BoundedWindow window() {
- throw new UnsupportedOperationException(
- "Access to window of the element not supported in Splittable DoFn");
- }
-
- @Override
- public DoFn<InputT, OutputT>.Context context(DoFn<InputT, OutputT> doFn) {
- throw new IllegalStateException(
- "Should not access context() from @"
- + DoFn.ProcessElement.class.getSimpleName());
- }
-
- @Override
- public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(
- DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException(
- "Access to timers not supported in Splittable DoFn");
- }
-
- @Override
- public State state(String stateId) {
- throw new UnsupportedOperationException(
- "Access to state not supported in Splittable DoFn");
- }
-
- @Override
- public Timer timer(String timerId) {
- throw new UnsupportedOperationException(
- "Access to timers not supported in Splittable DoFn");
- }
- });
- RestrictionT residual;
- RestrictionT forcedCheckpoint = processContext.extractCheckpoint();
- if (cont.shouldResume()) {
- if (forcedCheckpoint == null) {
- // If no checkpoint was forced, the call returned voluntarily (i.e. all tryClaim() calls
- // succeeded) - but we still need to have a checkpoint to resume from.
- residual = tracker.checkpoint();
- } else {
- // A checkpoint was forced - i.e. the call probably (but not guaranteed) returned because of
- // a failed tryClaim() call.
- residual = forcedCheckpoint;
- }
- } else {
- residual = null;
- }
- return new Result(residual, cont);
+ invoker.invokeProcessElement(
+ new DoFnInvoker.ArgumentProvider<InputT, OutputT>() {
+ @Override
+ public DoFn<InputT, OutputT>.ProcessContext processContext(
+ DoFn<InputT, OutputT> doFn) {
+ return processContext;
+ }
+
+ @Override
+ public RestrictionTracker<?> restrictionTracker() {
+ return tracker;
+ }
+
+ // Unsupported methods below.
+
+ @Override
+ public BoundedWindow window() {
+ throw new UnsupportedOperationException(
+ "Access to window of the element not supported in Splittable DoFn");
+ }
+
+ @Override
+ public DoFn<InputT, OutputT>.Context context(DoFn<InputT, OutputT> doFn) {
+ throw new IllegalStateException(
+ "Should not access context() from @"
+ + DoFn.ProcessElement.class.getSimpleName());
+ }
+
+ @Override
+ public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(
+ DoFn<InputT, OutputT> doFn) {
+ throw new UnsupportedOperationException(
+ "Access to timers not supported in Splittable DoFn");
+ }
+
+ @Override
+ public State state(String stateId) {
+ throw new UnsupportedOperationException(
+ "Access to state not supported in Splittable DoFn");
+ }
+
+ @Override
+ public Timer timer(String timerId) {
+ throw new UnsupportedOperationException(
+ "Access to timers not supported in Splittable DoFn");
+ }
+ });
+
+ tracker.checkDone();
+ return new Result(
+ processContext.extractCheckpoint(), processContext.getLastReportedWatermark());
}
private class ProcessContext extends DoFn<InputT, OutputT>.ProcessContext {
@@ -176,6 +163,7 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
private RestrictionT checkpoint;
// A handle on the scheduled action to take a checkpoint.
private Future<?> scheduledCheckpoint;
+ private Instant lastReportedWatermark;
public ProcessContext(WindowedValue<InputT> element, TrackerT tracker) {
fn.super();
@@ -241,6 +229,15 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
}
@Override
+ public synchronized void updateWatermark(Instant watermark) {
+ lastReportedWatermark = watermark;
+ }
+
+ public synchronized Instant getLastReportedWatermark() {
+ return lastReportedWatermark;
+ }
+
+ @Override
public PipelineOptions getPipelineOptions() {
return pipelineOptions;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 77286b2..98d88b6 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -542,6 +542,11 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
}
@Override
+ public void updateWatermark(Instant watermark) {
+ throw new UnsupportedOperationException("Only splittable DoFn's can use updateWatermark()");
+ }
+
+ @Override
public void output(OutputT output) {
context.outputWindowedValue(windowedValue.withValue(output));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 0b311c7..c16bf44 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -324,14 +324,12 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
/**
* The state cell containing a watermark hold for the output of this {@link DoFn}. The hold is
* acquired during the first {@link DoFn.ProcessElement} call for each element and restriction,
- * and is released when the {@link DoFn.ProcessElement} call returns {@link
- * DoFn.ProcessContinuation#stop}.
+ * and is released when the {@link DoFn.ProcessElement} call returns and there is no residual
+ * restriction captured by the {@link SplittableProcessElementInvoker}.
*
* <p>A hold is needed to avoid letting the output watermark immediately progress together with
* the input watermark when the first {@link DoFn.ProcessElement} call for this element
* completes.
- *
- * <p>The hold is updated with the future output watermark reported by ProcessContinuation.
*/
private static final StateTag<Object, WatermarkHoldState<GlobalWindow>> watermarkHoldTag =
StateTags.makeSystemTagInternal(
@@ -461,7 +459,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
invoker, elementAndRestriction.element(), tracker);
// Save state for resuming.
- if (!result.getContinuation().shouldResume()) {
+ if (result.getResidualRestriction() == null) {
// All work for this element/restriction is completed. Clear state and release hold.
elementState.clear();
restrictionState.clear();
@@ -469,16 +467,15 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
return;
}
restrictionState.write(result.getResidualRestriction());
- Instant futureOutputWatermark = result.getContinuation().getWatermark();
+ Instant futureOutputWatermark = result.getFutureOutputWatermark();
if (futureOutputWatermark == null) {
futureOutputWatermark = elementAndRestriction.element().getTimestamp();
}
- Instant wakeupTime =
- timerInternals.currentProcessingTime().plus(result.getContinuation().resumeDelay());
holdState.add(futureOutputWatermark);
// Set a timer to continue processing this element.
timerInternals.setTimer(
- TimerInternals.TimerData.of(stateNamespace, wakeupTime, TimeDomain.PROCESSING_TIME));
+ TimerInternals.TimerData.of(
+ stateNamespace, timerInternals.currentProcessingTime(), TimeDomain.PROCESSING_TIME));
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
index cfc39e7..ced6c01 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
@@ -22,6 +22,7 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
/**
* A runner-specific hook for invoking a {@link DoFn.ProcessElement} method for a splittable {@link
@@ -31,25 +32,24 @@ public abstract class SplittableProcessElementInvoker<
InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>> {
/** Specifies how to resume a splittable {@link DoFn.ProcessElement} call. */
public class Result {
- @Nullable private final RestrictionT residualRestriction;
- private final DoFn.ProcessContinuation continuation;
+ @Nullable
+ private final RestrictionT residualRestriction;
+ private final Instant futureOutputWatermark;
public Result(
- @Nullable RestrictionT residualRestriction, DoFn.ProcessContinuation continuation) {
+ @Nullable RestrictionT residualRestriction, Instant futureOutputWatermark) {
this.residualRestriction = residualRestriction;
- this.continuation = continuation;
+ this.futureOutputWatermark = futureOutputWatermark;
}
- /**
- * Can be {@code null} only if {@link #getContinuation} specifies the call should not resume.
- */
+ /** If {@code null}, means the call should not resume. */
@Nullable
public RestrictionT getResidualRestriction() {
return residualRestriction;
}
- public DoFn.ProcessContinuation getContinuation() {
- return continuation;
+ public Instant getFutureOutputWatermark() {
+ return futureOutputWatermark;
}
}
@@ -57,8 +57,8 @@ public abstract class SplittableProcessElementInvoker<
* Invokes the {@link DoFn.ProcessElement} method using the given {@link DoFnInvoker} for the
* original {@link DoFn}, on the given element and with the given {@link RestrictionTracker}.
*
- * @return Information on how to resume the call: residual restriction and a {@link
- * DoFn.ProcessContinuation}.
+ * @return Information on how to resume the call: residual restriction and a
+ * future output watermark.
*/
public abstract Result invokeProcessElement(
DoFnInvoker<InputT, OutputT> invoker, WindowedValue<InputT> element, TrackerT tracker);
http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
index b85f481..965380b 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
@@ -17,15 +17,11 @@
*/
package org.apache.beam.runners.core;
-import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume;
-import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
import java.util.Collection;
import java.util.concurrent.Executors;
@@ -54,17 +50,12 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest {
}
@ProcessElement
- public ProcessContinuation process(ProcessContext context, OffsetRangeTracker tracker)
+ public void process(ProcessContext context, OffsetRangeTracker tracker)
throws Exception {
- OffsetRange range = tracker.currentRestriction();
- for (int i = (int) range.getFrom(); i < range.getTo(); ++i) {
- if (!tracker.tryClaim(i)) {
- return resume();
- }
+ for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
Thread.sleep(sleepBeforeEachOutput.getMillis());
context.output("" + i);
}
- return stop();
}
@GetInitialRestriction
@@ -111,7 +102,6 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest {
public void testInvokeProcessElementOutputBounded() throws Exception {
SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker>.Result res =
runTest(10000, Duration.ZERO);
- assertTrue(res.getContinuation().shouldResume());
OffsetRange residualRange = res.getResidualRestriction();
// Should process the first 100 elements.
assertEquals(1000, residualRange.getFrom());
@@ -122,7 +112,6 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest {
public void testInvokeProcessElementTimeBounded() throws Exception {
SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker>.Result res =
runTest(10000, Duration.millis(100));
- assertTrue(res.getContinuation().shouldResume());
OffsetRange residualRange = res.getResidualRestriction();
// Should process ideally around 30 elements - but due to timing flakiness, we can't enforce
// that precisely. Just test that it's not egregiously off.
@@ -135,7 +124,6 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest {
public void testInvokeProcessElementVoluntaryReturn() throws Exception {
SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker>.Result res =
runTest(5, Duration.millis(100));
- assertFalse(res.getContinuation().shouldResume());
assertNull(res.getResidualRestriction());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index 6205777..f8d6095 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -17,9 +17,6 @@
*/
package org.apache.beam.runners.core;
-import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume;
-import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
-import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.not;
@@ -43,9 +40,13 @@ import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
+import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -97,8 +98,12 @@ public class SplittableParDoTest {
public SomeRestriction checkpoint() {
return someRestriction;
}
+
+ @Override
+ public void checkDone() {}
}
+ @BoundedPerElement
private static class BoundedFakeFn extends DoFn<Integer, String> {
@ProcessElement
public void processElement(ProcessContext context, SomeRestrictionTracker tracker) {}
@@ -109,12 +114,10 @@ public class SplittableParDoTest {
}
}
+ @UnboundedPerElement
private static class UnboundedFakeFn extends DoFn<Integer, String> {
@ProcessElement
- public ProcessContinuation processElement(
- ProcessContext context, SomeRestrictionTracker tracker) {
- return stop();
- }
+ public void processElement(ProcessContext context, SomeRestrictionTracker tracker) {}
@GetInitialRestriction
public SomeRestriction getInitialRestriction(Integer element) {
@@ -422,164 +425,40 @@ public class SplittableParDoTest {
}
}
- /** A simple splittable {@link DoFn} that outputs the given element every 5 seconds forever. */
- private static class SelfInitiatedResumeFn extends DoFn<Integer, String> {
- @ProcessElement
- public ProcessContinuation process(ProcessContext c, SomeRestrictionTracker tracker) {
- c.output(c.element().toString());
- return resume().withResumeDelay(Duration.standardSeconds(5)).withWatermark(c.timestamp());
- }
-
- @GetInitialRestriction
- public SomeRestriction getInitialRestriction(Integer elem) {
- return new SomeRestriction();
- }
- }
-
- @Test
- public void testResumeSetsTimer() throws Exception {
- DoFn<Integer, String> fn = new SelfInitiatedResumeFn();
- Instant base = Instant.now();
- ProcessFnTester<Integer, String, SomeRestriction, SomeRestrictionTracker> tester =
- new ProcessFnTester<>(
- base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeRestriction.class),
- MAX_OUTPUTS_PER_BUNDLE, MAX_BUNDLE_DURATION);
-
- tester.startElement(42, new SomeRestriction());
- assertThat(tester.takeOutputElements(), contains("42"));
-
- // Should resume after 5 seconds: advancing by 3 seconds should have no effect.
- assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
- assertTrue(tester.takeOutputElements().isEmpty());
-
- // 6 seconds should be enough - should invoke the fn again.
- assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
- assertThat(tester.takeOutputElements(), contains("42"));
-
- // Should again resume after 5 seconds: advancing by 3 seconds should again have no effect.
- assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
- assertTrue(tester.takeOutputElements().isEmpty());
-
- // 6 seconds should again be enough.
- assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
- assertThat(tester.takeOutputElements(), contains("42"));
- }
-
- private static class SomeCheckpoint
- implements Serializable, HasDefaultTracker<SomeCheckpoint, SomeCheckpointTracker> {
- private int firstUnprocessedIndex;
-
- private SomeCheckpoint(int firstUnprocessedIndex) {
- this.firstUnprocessedIndex = firstUnprocessedIndex;
- }
-
- @Override
- public SomeCheckpointTracker newTracker() {
- return new SomeCheckpointTracker(this);
- }
- }
-
- private static class SomeCheckpointTracker implements RestrictionTracker<SomeCheckpoint> {
- private SomeCheckpoint current;
- private boolean isActive = true;
-
- private SomeCheckpointTracker(SomeCheckpoint current) {
- this.current = current;
- }
-
- @Override
- public SomeCheckpoint currentRestriction() {
- return current;
- }
-
- public boolean tryUpdateCheckpoint(int firstUnprocessedIndex) {
- if (!isActive) {
- return false;
- }
- current = new SomeCheckpoint(firstUnprocessedIndex);
- return true;
- }
-
- @Override
- public SomeCheckpoint checkpoint() {
- isActive = false;
- return current;
- }
- }
-
/**
- * A splittable {@link DoFn} that generates the sequence [init, init + total) in batches of given
- * size.
+ * A splittable {@link DoFn} that generates the sequence [init, init + total).
*/
private static class CounterFn extends DoFn<Integer, String> {
- private final int numTotalOutputs;
- private final int numOutputsPerCall;
-
- private CounterFn(int numTotalOutputs, int numOutputsPerCall) {
- this.numTotalOutputs = numTotalOutputs;
- this.numOutputsPerCall = numOutputsPerCall;
- }
-
@ProcessElement
- public ProcessContinuation process(ProcessContext c, SomeCheckpointTracker tracker) {
- int start = tracker.currentRestriction().firstUnprocessedIndex;
- for (int i = 0; i < numOutputsPerCall; ++i) {
- int index = start + i;
- if (!tracker.tryUpdateCheckpoint(index + 1)) {
- return resume();
- }
- if (index >= numTotalOutputs) {
- return stop();
- }
- c.output(String.valueOf(c.element() + index));
+ public void process(ProcessContext c, OffsetRangeTracker tracker) {
+ for (long i = tracker.currentRestriction().getFrom();
+ tracker.tryClaim(i); ++i) {
+ c.output(String.valueOf(c.element() + i));
}
- return resume();
}
@GetInitialRestriction
- public SomeCheckpoint getInitialRestriction(Integer elem) {
+ public OffsetRange getInitialRestriction(Integer elem) {
throw new UnsupportedOperationException("Expected to be supplied explicitly in this test");
}
}
@Test
- public void testResumeCarriesOverState() throws Exception {
- DoFn<Integer, String> fn = new CounterFn(3, 1);
- Instant base = Instant.now();
- ProcessFnTester<Integer, String, SomeCheckpoint, SomeCheckpointTracker> tester =
- new ProcessFnTester<>(
- base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeCheckpoint.class),
- MAX_OUTPUTS_PER_BUNDLE, MAX_BUNDLE_DURATION);
-
- tester.startElement(42, new SomeCheckpoint(0));
- assertThat(tester.takeOutputElements(), contains("42"));
- assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
- assertThat(tester.takeOutputElements(), contains("43"));
- assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
- assertThat(tester.takeOutputElements(), contains("44"));
- assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
- // After outputting all 3 items, should not output anything more.
- assertEquals(0, tester.takeOutputElements().size());
- // Should also not ask to resume.
- assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
- }
-
- @Test
public void testCheckpointsAfterNumOutputs() throws Exception {
int max = 100;
- // Create an fn that attempts to 2x output more than checkpointing allows.
- DoFn<Integer, String> fn = new CounterFn(2 * max + max / 2, 2 * max);
+ DoFn<Integer, String> fn = new CounterFn();
Instant base = Instant.now();
int baseIndex = 42;
- ProcessFnTester<Integer, String, SomeCheckpoint, SomeCheckpointTracker> tester =
+ ProcessFnTester<Integer, String, OffsetRange, OffsetRangeTracker> tester =
new ProcessFnTester<>(
- base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeCheckpoint.class),
+ base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(OffsetRange.class),
max, MAX_BUNDLE_DURATION);
List<String> elements;
- tester.startElement(baseIndex, new SomeCheckpoint(0));
+ // Create an fn that attempts to 2x output more than checkpointing allows.
+ tester.startElement(baseIndex, new OffsetRange(0, 2 * max + max / 2));
elements = tester.takeOutputElements();
assertEquals(max, elements.size());
// Should output the range [0, max)
@@ -609,18 +488,18 @@ public class SplittableParDoTest {
// But bound bundle duration - the bundle should terminate.
Duration maxBundleDuration = Duration.standardSeconds(1);
// Create an fn that attempts to 2x output more than checkpointing allows.
- DoFn<Integer, String> fn = new CounterFn(max, max);
+ DoFn<Integer, String> fn = new CounterFn();
Instant base = Instant.now();
int baseIndex = 42;
- ProcessFnTester<Integer, String, SomeCheckpoint, SomeCheckpointTracker> tester =
+ ProcessFnTester<Integer, String, OffsetRange, OffsetRangeTracker> tester =
new ProcessFnTester<>(
- base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeCheckpoint.class),
+ base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(OffsetRange.class),
max, maxBundleDuration);
List<String> elements;
- tester.startElement(baseIndex, new SomeCheckpoint(0));
+ tester.startElement(baseIndex, new OffsetRange(0, Long.MAX_VALUE));
// Bundle should terminate, and should do at least some processing.
elements = tester.takeOutputElements();
assertFalse(elements.isEmpty());
http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 5139290..e35457c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -21,7 +21,6 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
-import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
@@ -32,7 +31,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
@@ -293,6 +291,18 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
* See {@link Window} for more information.
*/
public abstract PaneInfo pane();
+
+ /**
+ * Gives the runner a (best-effort) lower bound about the timestamps of future output associated
+ * with the current element.
+ *
+ * <p>If the {@link DoFn} has multiple outputs, the watermark applies to all of them.
+ *
+ * <p>Only splittable {@link DoFn DoFns} are allowed to call this method. It is safe to call
+ * this method from a different thread than the one running {@link ProcessElement}, but
+ * all calls must finish before {@link ProcessElement} returns.
+ */
+ public abstract void updateWatermark(Instant watermark);
}
/**
@@ -556,15 +566,11 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
* returned by {@link GetInitialRestriction} implements {@link HasDefaultTracker}.
* <li>It <i>may</i> define a {@link GetRestrictionCoder} method.
* <li>The type of restrictions used by all of these methods must be the same.
- * <li>Its {@link ProcessElement} method <i>may</i> return a {@link ProcessContinuation} to
- * indicate whether there is more work to be done for the current element.
* <li>Its {@link ProcessElement} method <i>must not</i> use any extra context parameters, such as
* {@link BoundedWindow}.
* <li>The {@link DoFn} itself <i>may</i> be annotated with {@link BoundedPerElement} or
* {@link UnboundedPerElement}, but not both at the same time. If it's not annotated with
- * either of these, it's assumed to be {@link BoundedPerElement} if its {@link
- * ProcessElement} method returns {@code void} and {@link UnboundedPerElement} if it
- * returns a {@link ProcessContinuation}.
+ * either of these, it's assumed to be {@link BoundedPerElement}.
* </ul>
*
* <p>A non-splittable {@link DoFn} <i>must not</i> define any of these methods.
@@ -692,61 +698,9 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
@Experimental(Kind.SPLITTABLE_DO_FN)
public @interface UnboundedPerElement {}
- // This can't be put into ProcessContinuation itself due to the following problem:
- // http://ternarysearch.blogspot.com/2013/07/static-initialization-deadlock.html
- private static final ProcessContinuation PROCESS_CONTINUATION_STOP =
- new AutoValue_DoFn_ProcessContinuation(false, Duration.ZERO, null);
-
- /**
- * When used as a return value of {@link ProcessElement}, indicates whether there is more work to
- * be done for the current element.
- */
- @Experimental(Kind.SPLITTABLE_DO_FN)
- @AutoValue
- public abstract static class ProcessContinuation {
- /** Indicates that there is no more work to be done for the current element. */
- public static ProcessContinuation stop() {
- return PROCESS_CONTINUATION_STOP;
- }
-
- /** Indicates that there is more work to be done for the current element. */
- public static ProcessContinuation resume() {
- return new AutoValue_DoFn_ProcessContinuation(true, Duration.ZERO, null);
- }
-
- /**
- * If false, the {@link DoFn} promises that there is no more work remaining for the current
- * element, so the runner should not resume the {@link ProcessElement} call.
- */
- public abstract boolean shouldResume();
-
- /**
- * A minimum duration that should elapse between the end of this {@link ProcessElement} call and
- * the {@link ProcessElement} call continuing processing of the same element. By default, zero.
- */
- public abstract Duration resumeDelay();
-
- /**
- * A lower bound provided by the {@link DoFn} on timestamps of the output that will be emitted
- * by future {@link ProcessElement} calls continuing processing of the current element.
- *
- * <p>A runner should treat an absent value as equivalent to the timestamp of the input element.
- */
- @Nullable
- public abstract Instant getWatermark();
-
- /** Builder method to set the value of {@link #resumeDelay()}. */
- public ProcessContinuation withResumeDelay(Duration resumeDelay) {
- return new AutoValue_DoFn_ProcessContinuation(
- shouldResume(), resumeDelay, getWatermark());
- }
-
- /** Builder method to set the value of {@link #getWatermark()}. */
- public ProcessContinuation withWatermark(Instant watermark) {
- return new AutoValue_DoFn_ProcessContinuation(
- shouldResume(), resumeDelay(), watermark);
- }
- }
+ /** Do not use. See https://issues.apache.org/jira/browse/BEAM-1904 */
+ @Deprecated
+ public class ProcessContinuation {}
/**
* Returns an {@link Aggregator} with aggregation logic specified by the {@link CombineFn}
http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 01f0291..88f4035 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -672,6 +672,11 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
}
@Override
+ public void updateWatermark(Instant watermark) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public PipelineOptions getPipelineOptions() {
return context.getPipelineOptions();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index 6746d3a..4b0cbf7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -50,6 +50,7 @@ import net.bytebuddy.implementation.bytecode.Throw;
import net.bytebuddy.implementation.bytecode.assign.Assigner;
import net.bytebuddy.implementation.bytecode.assign.Assigner.Typing;
import net.bytebuddy.implementation.bytecode.assign.TypeCasting;
+import net.bytebuddy.implementation.bytecode.constant.NullConstant;
import net.bytebuddy.implementation.bytecode.constant.TextConstant;
import net.bytebuddy.implementation.bytecode.member.FieldAccess;
import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
@@ -625,17 +626,6 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
* {@link ProcessElement} method.
*/
private static final class ProcessElementDelegation extends DoFnMethodDelegation {
- private static final MethodDescription PROCESS_CONTINUATION_STOP_METHOD;
-
- static {
- try {
- PROCESS_CONTINUATION_STOP_METHOD =
- new MethodDescription.ForLoadedMethod(DoFn.ProcessContinuation.class.getMethod("stop"));
- } catch (NoSuchMethodException e) {
- throw new RuntimeException("Failed to locate ProcessContinuation.stop()");
- }
- }
-
private final DoFnSignature.ProcessElementMethod signature;
/** Implementation of {@link MethodDelegation} for the {@link ProcessElement} method. */
@@ -672,12 +662,8 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
@Override
protected StackManipulation afterDelegation(MethodDescription instrumentedMethod) {
- if (TypeDescription.VOID.equals(targetMethod.getReturnType().asErasure())) {
- return new StackManipulation.Compound(
- MethodInvocation.invoke(PROCESS_CONTINUATION_STOP_METHOD), MethodReturn.REFERENCE);
- } else {
- return MethodReturn.of(targetMethod.getReturnType().asErasure());
- }
+ return new StackManipulation.Compound(
+ NullConstant.INSTANCE, MethodReturn.REFERENCE);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index 85831a7..cc06e70 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -53,8 +53,8 @@ public interface DoFnInvoker<InputT, OutputT> {
* Invoke the {@link DoFn.ProcessElement} method on the bound {@link DoFn}.
*
* @param extra Factory for producing extra parameter objects (such as window), if necessary.
- * @return The {@link DoFn.ProcessContinuation} returned by the underlying method, or {@link
- * DoFn.ProcessContinuation#stop()} if it returns {@code void}.
+ * @return {@code null} - see <a href="https://issues.apache.org/jira/browse/BEAM-1904">JIRA</a>
+ * tracking the complete removal of {@link DoFn.ProcessContinuation}.
*/
DoFn.ProcessContinuation invokeProcessElement(ArgumentProvider<InputT, OutputT> extra);
http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index 007d8be..1be741f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -28,7 +28,6 @@ import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.DoFn.StateId;
import org.apache.beam.sdk.transforms.DoFn.TimerId;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter;
@@ -397,21 +396,16 @@ public abstract class DoFnSignature {
@Nullable
public abstract TypeDescriptor<? extends BoundedWindow> windowT();
- /** Whether this {@link DoFn} returns a {@link ProcessContinuation} or void. */
- public abstract boolean hasReturnValue();
-
static ProcessElementMethod create(
Method targetMethod,
List<Parameter> extraParameters,
TypeDescriptor<?> trackerT,
- @Nullable TypeDescriptor<? extends BoundedWindow> windowT,
- boolean hasReturnValue) {
+ @Nullable TypeDescriptor<? extends BoundedWindow> windowT) {
return new AutoValue_DoFnSignature_ProcessElementMethod(
targetMethod,
Collections.unmodifiableList(extraParameters),
trackerT,
- windowT,
- hasReturnValue);
+ windowT);
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 006d012..80dbe10 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -17,8 +17,6 @@
*/
package org.apache.beam.sdk.transforms.reflect;
-import static com.google.common.base.Preconditions.checkState;
-
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicates;
@@ -428,8 +426,6 @@ public class DoFnSignatures {
* <li>If the {@link DoFn} (or any of its supertypes) is annotated as {@link
* DoFn.BoundedPerElement} or {@link DoFn.UnboundedPerElement}, use that. Only one of
* these must be specified.
- * <li>If {@link DoFn.ProcessElement} returns {@link DoFn.ProcessContinuation}, assume it is
- * unbounded. Otherwise (if it returns {@code void}), assume it is bounded.
* <li>If {@link DoFn.ProcessElement} returns {@code void}, but the {@link DoFn} is annotated
* {@link DoFn.UnboundedPerElement}, this is an error.
* </ol>
@@ -455,10 +451,7 @@ public class DoFnSignatures {
}
if (processElement.isSplittable()) {
if (isBounded == null) {
- isBounded =
- processElement.hasReturnValue()
- ? PCollection.IsBounded.UNBOUNDED
- : PCollection.IsBounded.BOUNDED;
+ isBounded = PCollection.IsBounded.BOUNDED;
}
} else {
errors.checkArgument(
@@ -467,7 +460,6 @@ public class DoFnSignatures {
+ ((isBounded == PCollection.IsBounded.BOUNDED)
? DoFn.BoundedPerElement.class.getSimpleName()
: DoFn.UnboundedPerElement.class.getSimpleName()));
- checkState(!processElement.hasReturnValue(), "Should have been inferred splittable");
isBounded = PCollection.IsBounded.BOUNDED;
}
return isBounded;
@@ -691,10 +683,8 @@ public class DoFnSignatures {
TypeDescriptor<?> outputT,
FnAnalysisContext fnContext) {
errors.checkArgument(
- void.class.equals(m.getReturnType())
- || DoFn.ProcessContinuation.class.equals(m.getReturnType()),
- "Must return void or %s",
- DoFn.ProcessContinuation.class.getSimpleName());
+ void.class.equals(m.getReturnType()),
+ "Must return void");
MethodAnalysisContext methodContext = MethodAnalysisContext.create();
@@ -734,11 +724,7 @@ public class DoFnSignatures {
}
return DoFnSignature.ProcessElementMethod.create(
- m,
- methodContext.getExtraParameters(),
- trackerT,
- windowT,
- DoFn.ProcessContinuation.class.equals(m.getReturnType()));
+ m, methodContext.getExtraParameters(), trackerT, windowT);
}
private static void checkParameterOneOf(
http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
index 87c7bfd..0271a0d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
@@ -19,6 +19,9 @@ package org.apache.beam.sdk.transforms.splittabledofn;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.transforms.DoFn;
/**
* A {@link RestrictionTracker} for claiming offsets in an {@link OffsetRange} in a monotonically
@@ -27,6 +30,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
public class OffsetRangeTracker implements RestrictionTracker<OffsetRange> {
private OffsetRange range;
private Long lastClaimedOffset = null;
+ private Long lastAttemptedOffset = null;
public OffsetRangeTracker(OffsetRange range) {
this.range = checkNotNull(range);
@@ -59,12 +63,13 @@ public class OffsetRangeTracker implements RestrictionTracker<OffsetRange> {
*/
public synchronized boolean tryClaim(long i) {
checkArgument(
- lastClaimedOffset == null || i > lastClaimedOffset,
- "Trying to claim offset %s while last claimed was %s",
+ lastAttemptedOffset == null || i > lastAttemptedOffset,
+ "Trying to claim offset %s while last attempted was %s",
i,
- lastClaimedOffset);
+ lastAttemptedOffset);
checkArgument(
i >= range.getFrom(), "Trying to claim offset %s before start of the range %s", i, range);
+ lastAttemptedOffset = i;
// No respective checkArgument for i < range.to() - it's ok to try claiming offsets beyond it.
if (i >= range.getTo()) {
return false;
@@ -72,4 +77,26 @@ public class OffsetRangeTracker implements RestrictionTracker<OffsetRange> {
lastClaimedOffset = i;
return true;
}
+
+ /**
+ * Marks that there are no more offsets to be claimed in the range.
+ *
+ * <p>E.g., a {@link DoFn} reading a file and claiming the offset of each record in the file might
+ * call this if it hits EOF - even though the last attempted claim was before the end of the
+ * range, there are no more offsets to claim.
+ */
+ public synchronized void markDone() {
+ lastAttemptedOffset = Long.MAX_VALUE;
+ }
+
+ @Override
+ public synchronized void checkDone() throws IllegalStateException {
+ checkState(
+ lastAttemptedOffset >= range.getTo() - 1,
+ "Last attempted offset was %s in range %s, claiming work in [%s, %s) was not attempted",
+ lastAttemptedOffset,
+ range,
+ lastAttemptedOffset + 1,
+ range.getTo());
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
index e9b718e..27ef68f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
@@ -38,5 +38,13 @@ public interface RestrictionTracker<RestrictionT> {
*/
RestrictionT checkpoint();
+ /**
+ * Called by the runner after {@link DoFn.ProcessElement} returns.
+ *
+ * <p>Must throw an exception with an informative error message, if there is still any unclaimed
+ * work remaining in the restriction.
+ */
+ void checkDone() throws IllegalStateException;
+
// TODO: Add the more general splitRemainderAfterFraction() and other methods.
}
http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index 154a088..a122f67 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -18,8 +18,6 @@
package org.apache.beam.sdk.transforms;
import static com.google.common.base.Preconditions.checkState;
-import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume;
-import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -67,14 +65,10 @@ public class SplittableDoFnTest {
static class PairStringWithIndexToLength extends DoFn<String, KV<String, Integer>> {
@ProcessElement
- public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) {
+ public void process(ProcessContext c, OffsetRangeTracker tracker) {
for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
c.output(KV.of(c.element(), (int) i));
- if (i % 3 == 0) {
- return resume();
- }
}
- return stop();
}
@GetInitialRestriction
@@ -196,19 +190,14 @@ public class SplittableDoFnTest {
}
@ProcessElement
- public ProcessContinuation processElement(ProcessContext c, OffsetRangeTracker tracker) {
+ public void processElement(ProcessContext c, OffsetRangeTracker tracker) {
int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX};
int trueStart = snapToNextBlock((int) tracker.currentRestriction().getFrom(), blockStarts);
- int trueEnd = snapToNextBlock((int) tracker.currentRestriction().getTo(), blockStarts);
- for (int i = trueStart; i < trueEnd; ++i) {
- if (!tracker.tryClaim(blockStarts[i])) {
- return resume();
- }
+ for (int i = trueStart; tracker.tryClaim(blockStarts[i]); ++i) {
for (int index = blockStarts[i]; index < blockStarts[i + 1]; ++index) {
c.output(index);
}
}
- return stop();
}
@GetInitialRestriction
http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 425f453..8b4df4c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -39,7 +39,6 @@ import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.FakeArgumentProvider;
import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper;
import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
@@ -83,8 +82,8 @@ public class DoFnInvokersTest {
when(mockArgumentProvider.processContext(Matchers.<DoFn>any())).thenReturn(mockProcessContext);
}
- private ProcessContinuation invokeProcessElement(DoFn<String, String> fn) {
- return DoFnInvokers.invokerFor(fn).invokeProcessElement(mockArgumentProvider);
+ private void invokeProcessElement(DoFn<String, String> fn) {
+ DoFnInvokers.invokerFor(fn).invokeProcessElement(mockArgumentProvider);
}
private void invokeOnTimer(String timerId, DoFn<String, String> fn) {
@@ -113,7 +112,7 @@ public class DoFnInvokersTest {
public void processElement(ProcessContext c) throws Exception {}
}
MockFn mockFn = mock(MockFn.class);
- assertEquals(ProcessContinuation.stop(), invokeProcessElement(mockFn));
+ invokeProcessElement(mockFn);
verify(mockFn).processElement(mockProcessContext);
}
@@ -134,7 +133,7 @@ public class DoFnInvokersTest {
public void testDoFnWithProcessElementInterface() throws Exception {
IdentityUsingInterfaceWithProcessElement fn =
mock(IdentityUsingInterfaceWithProcessElement.class);
- assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
+ invokeProcessElement(fn);
verify(fn).processElement(mockProcessContext);
}
@@ -155,14 +154,14 @@ public class DoFnInvokersTest {
@Test
public void testDoFnWithMethodInSuperclass() throws Exception {
IdentityChildWithoutOverride fn = mock(IdentityChildWithoutOverride.class);
- assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
+ invokeProcessElement(fn);
verify(fn).process(mockProcessContext);
}
@Test
public void testDoFnWithMethodInSubclass() throws Exception {
IdentityChildWithOverride fn = mock(IdentityChildWithOverride.class);
- assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
+ invokeProcessElement(fn);
verify(fn).process(mockProcessContext);
}
@@ -173,7 +172,7 @@ public class DoFnInvokersTest {
public void processElement(ProcessContext c, IntervalWindow w) throws Exception {}
}
MockFn fn = mock(MockFn.class);
- assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
+ invokeProcessElement(fn);
verify(fn).processElement(mockProcessContext, mockWindow);
}
@@ -197,7 +196,7 @@ public class DoFnInvokersTest {
throws Exception {}
}
MockFn fn = mock(MockFn.class);
- assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
+ invokeProcessElement(fn);
verify(fn).processElement(mockProcessContext, mockState);
}
@@ -223,35 +222,11 @@ public class DoFnInvokersTest {
public void onTimer() {}
}
MockFn fn = mock(MockFn.class);
- assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
+ invokeProcessElement(fn);
verify(fn).processElement(mockProcessContext, mockTimer);
}
@Test
- public void testDoFnWithReturn() throws Exception {
- class MockFn extends DoFn<String, String> {
- @DoFn.ProcessElement
- public ProcessContinuation processElement(ProcessContext c, SomeRestrictionTracker tracker)
- throws Exception {
- return null;
- }
-
- @GetInitialRestriction
- public SomeRestriction getInitialRestriction(String element) {
- return null;
- }
-
- @NewTracker
- public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
- return null;
- }
- }
- MockFn fn = mock(MockFn.class);
- when(fn.processElement(mockProcessContext, null)).thenReturn(ProcessContinuation.resume());
- assertEquals(ProcessContinuation.resume(), invokeProcessElement(fn));
- }
-
- @Test
public void testDoFnWithStartBundleSetupTeardown() throws Exception {
class MockFn extends DoFn<String, String> {
@ProcessElement
@@ -306,9 +281,7 @@ public class DoFnInvokersTest {
/** Public so Mockito can do "delegatesTo()" in the test below. */
public static class MockFn extends DoFn<String, String> {
@ProcessElement
- public ProcessContinuation processElement(ProcessContext c, SomeRestrictionTracker tracker) {
- return null;
- }
+ public void processElement(ProcessContext c, SomeRestrictionTracker tracker) {}
@GetInitialRestriction
public SomeRestriction getInitialRestriction(String element) {
@@ -360,7 +333,7 @@ public class DoFnInvokersTest {
.splitRestriction(
eq("blah"), same(restriction), Mockito.<DoFn.OutputReceiver<SomeRestriction>>any());
when(fn.newTracker(restriction)).thenReturn(tracker);
- when(fn.processElement(mockProcessContext, tracker)).thenReturn(ProcessContinuation.resume());
+ fn.processElement(mockProcessContext, tracker);
assertEquals(coder, invoker.invokeGetRestrictionCoder(new CoderRegistry()));
assertEquals(restriction, invoker.invokeGetInitialRestriction("blah"));
@@ -376,8 +349,6 @@ public class DoFnInvokersTest {
});
assertEquals(Arrays.asList(part1, part2, part3), outputs);
assertEquals(tracker, invoker.invokeNewTracker(restriction));
- assertEquals(
- ProcessContinuation.resume(),
invoker.invokeProcessElement(
new FakeArgumentProvider<String, String>() {
@Override
@@ -389,7 +360,7 @@ public class DoFnInvokersTest {
public RestrictionTracker<?> restrictionTracker() {
return tracker;
}
- }));
+ });
}
private static class RestrictionWithDefaultTracker
@@ -410,6 +381,9 @@ public class DoFnInvokersTest {
public RestrictionWithDefaultTracker checkpoint() {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public void checkDone() throws IllegalStateException {}
}
private static class CoderForDefaultTracker extends CustomCoder<RestrictionWithDefaultTracker> {
@@ -459,8 +433,7 @@ public class DoFnInvokersTest {
assertEquals("foo", output);
}
});
- assertEquals(
- ProcessContinuation.stop(), invoker.invokeProcessElement(mockArgumentProvider));
+ invoker.invokeProcessElement(mockArgumentProvider);
assertThat(
invoker.invokeNewTracker(new RestrictionWithDefaultTracker()),
instanceOf(DefaultTracker.class));
@@ -550,14 +523,14 @@ public class DoFnInvokersTest {
@Test
public void testLocalPrivateDoFnClass() throws Exception {
PrivateDoFnClass fn = mock(PrivateDoFnClass.class);
- assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
+ invokeProcessElement(fn);
verify(fn).processThis(mockProcessContext);
}
@Test
public void testStaticPackagePrivateDoFnClass() throws Exception {
DoFn<String, String> fn = mock(DoFnInvokersTestHelper.newStaticPackagePrivateDoFn().getClass());
- assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
+ invokeProcessElement(fn);
DoFnInvokersTestHelper.verifyStaticPackagePrivateDoFn(fn, mockProcessContext);
}
@@ -565,28 +538,28 @@ public class DoFnInvokersTest {
public void testInnerPackagePrivateDoFnClass() throws Exception {
DoFn<String, String> fn =
mock(new DoFnInvokersTestHelper().newInnerPackagePrivateDoFn().getClass());
- assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
+ invokeProcessElement(fn);
DoFnInvokersTestHelper.verifyInnerPackagePrivateDoFn(fn, mockProcessContext);
}
@Test
public void testStaticPrivateDoFnClass() throws Exception {
DoFn<String, String> fn = mock(DoFnInvokersTestHelper.newStaticPrivateDoFn().getClass());
- assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
+ invokeProcessElement(fn);
DoFnInvokersTestHelper.verifyStaticPrivateDoFn(fn, mockProcessContext);
}
@Test
public void testInnerPrivateDoFnClass() throws Exception {
DoFn<String, String> fn = mock(new DoFnInvokersTestHelper().newInnerPrivateDoFn().getClass());
- assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
+ invokeProcessElement(fn);
DoFnInvokersTestHelper.verifyInnerPrivateDoFn(fn, mockProcessContext);
}
@Test
public void testAnonymousInnerDoFn() throws Exception {
DoFn<String, String> fn = mock(new DoFnInvokersTestHelper().newInnerAnonymousDoFn().getClass());
- assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
+ invokeProcessElement(fn);
DoFnInvokersTestHelper.verifyInnerAnonymousDoFn(fn, mockProcessContext);
}
@@ -594,7 +567,7 @@ public class DoFnInvokersTest {
public void testStaticAnonymousDoFnInOtherPackage() throws Exception {
// Can't use mockito for this one - the anonymous class is final and can't be mocked.
DoFn<String, String> fn = DoFnInvokersTestHelper.newStaticAnonymousDoFn();
- assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
+ invokeProcessElement(fn);
DoFnInvokersTestHelper.verifyStaticAnonymousDoFnInvoked(fn, mockProcessContext);
}
@@ -623,32 +596,6 @@ public class DoFnInvokersTest {
}
@Test
- public void testProcessElementExceptionWithReturn() throws Exception {
- thrown.expect(UserCodeException.class);
- thrown.expectMessage("bogus");
- DoFnInvokers
- .invokerFor(
- new DoFn<Integer, Integer>() {
- @ProcessElement
- public ProcessContinuation processElement(
- @SuppressWarnings("unused") ProcessContext c, SomeRestrictionTracker tracker) {
- throw new IllegalArgumentException("bogus");
- }
-
- @GetInitialRestriction
- public SomeRestriction getInitialRestriction(Integer element) {
- return null;
- }
-
- @NewTracker
- public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
- return null;
- }
- })
- .invokeProcessElement(new FakeArgumentProvider<Integer, Integer>());
- }
-
- @Test
public void testStartBundleException() throws Exception {
DoFnInvoker<Integer, Integer> invoker =
DoFnInvokers.invokerFor(
http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
index 44ae5c4..d321f54 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
@@ -50,7 +50,7 @@ public class DoFnSignaturesProcessElementTest {
@Test
public void testBadReturnType() throws Exception {
thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Must return void or ProcessContinuation");
+ thrown.expectMessage("Must return void");
analyzeProcessElementMethod(
new AnonymousMethod() {
http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
index 052feb8..b937e84 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
@@ -60,20 +60,6 @@ public class DoFnSignaturesSplittableDoFnTest {
private abstract static class SomeRestrictionCoder implements Coder<SomeRestriction> {}
@Test
- public void testReturnsProcessContinuation() throws Exception {
- DoFnSignature.ProcessElementMethod signature =
- analyzeProcessElementMethod(
- new AnonymousMethod() {
- private DoFn.ProcessContinuation method(
- DoFn<Integer, String>.ProcessContext context) {
- return null;
- }
- });
-
- assertTrue(signature.hasReturnValue());
- }
-
- @Test
public void testHasRestrictionTracker() throws Exception {
DoFnSignature.ProcessElementMethod signature =
analyzeProcessElementMethod(
@@ -157,54 +143,6 @@ public class DoFnSignaturesSplittableDoFnTest {
.isBoundedPerElement());
}
- private static class BaseFnWithContinuation extends DoFn<Integer, String> {
- @ProcessElement
- public ProcessContinuation processElement(
- ProcessContext context, SomeRestrictionTracker tracker) {
- return null;
- }
-
- @GetInitialRestriction
- public SomeRestriction getInitialRestriction(Integer element) {
- return null;
- }
-
- @NewTracker
- public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
- return null;
- }
- }
-
- @Test
- public void testSplittableIsBoundedByDefault() throws Exception {
- assertEquals(
- PCollection.IsBounded.UNBOUNDED,
- DoFnSignatures
- .getSignature(BaseFnWithContinuation.class)
- .isBoundedPerElement());
- }
-
- @Test
- public void testSplittableRespectsBoundednessAnnotation() throws Exception {
- @BoundedPerElement
- class BoundedFnWithContinuation extends BaseFnWithContinuation {}
-
- assertEquals(
- PCollection.IsBounded.BOUNDED,
- DoFnSignatures
- .getSignature(BoundedFnWithContinuation.class)
- .isBoundedPerElement());
-
- @UnboundedPerElement
- class UnboundedFnWithContinuation extends BaseFnWithContinuation {}
-
- assertEquals(
- PCollection.IsBounded.UNBOUNDED,
- DoFnSignatures
- .getSignature(UnboundedFnWithContinuation.class)
- .isBoundedPerElement());
- }
-
@Test
public void testUnsplittableButDeclaresBounded() throws Exception {
@BoundedPerElement
@@ -234,10 +172,8 @@ public class DoFnSignaturesSplittableDoFnTest {
public void testSplittableWithAllFunctions() throws Exception {
class GoodSplittableDoFn extends DoFn<Integer, String> {
@ProcessElement
- public ProcessContinuation processElement(
- ProcessContext context, SomeRestrictionTracker tracker) {
- return null;
- }
+ public void processElement(
+ ProcessContext context, SomeRestrictionTracker tracker) {}
@GetInitialRestriction
public SomeRestriction getInitialRestriction(Integer element) {
@@ -262,7 +198,6 @@ public class DoFnSignaturesSplittableDoFnTest {
DoFnSignature signature = DoFnSignatures.getSignature(GoodSplittableDoFn.class);
assertEquals(SomeRestrictionTracker.class, signature.processElement().trackerT().getRawType());
assertTrue(signature.processElement().isSplittable());
- assertTrue(signature.processElement().hasReturnValue());
assertEquals(
SomeRestriction.class, signature.getInitialRestriction().restrictionT().getRawType());
assertEquals(SomeRestriction.class, signature.splitRestriction().restrictionT().getRawType());
@@ -279,9 +214,7 @@ public class DoFnSignaturesSplittableDoFnTest {
public void testSplittableWithAllFunctionsGeneric() throws Exception {
class GoodGenericSplittableDoFn<RestrictionT, TrackerT, CoderT> extends DoFn<Integer, String> {
@ProcessElement
- public ProcessContinuation processElement(ProcessContext context, TrackerT tracker) {
- return null;
- }
+ public void processElement(ProcessContext context, TrackerT tracker) {}
@GetInitialRestriction
public RestrictionT getInitialRestriction(Integer element) {
@@ -309,7 +242,6 @@ public class DoFnSignaturesSplittableDoFnTest {
SomeRestriction, SomeRestrictionTracker, SomeRestrictionCoder>() {}.getClass());
assertEquals(SomeRestrictionTracker.class, signature.processElement().trackerT().getRawType());
assertTrue(signature.processElement().isSplittable());
- assertTrue(signature.processElement().hasReturnValue());
assertEquals(
SomeRestriction.class, signature.getInitialRestriction().restrictionT().getRawType());
assertEquals(SomeRestriction.class, signature.splitRestriction().restrictionT().getRawType());
http://git-wip-us.apache.org/repos/asf/beam/blob/dad7ace2/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
index c8a530c..831894c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
@@ -95,7 +95,7 @@ public class OffsetRangeTrackerTest {
@Test
public void testNonMonotonicClaim() throws Exception {
- expected.expectMessage("Trying to claim offset 103 while last claimed was 110");
+ expected.expectMessage("Trying to claim offset 103 while last attempted was 110");
OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200));
assertTrue(tracker.tryClaim(105));
assertTrue(tracker.tryClaim(110));
@@ -108,4 +108,51 @@ public class OffsetRangeTrackerTest {
OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200));
tracker.tryClaim(90);
}
+
+ @Test
+ public void testCheckDoneAfterTryClaimPastEndOfRange() {
+ OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200));
+ assertTrue(tracker.tryClaim(150));
+ assertTrue(tracker.tryClaim(175));
+ assertFalse(tracker.tryClaim(220));
+ tracker.checkDone();
+ }
+
+ @Test
+ public void testCheckDoneAfterTryClaimAtEndOfRange() {
+ OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200));
+ assertTrue(tracker.tryClaim(150));
+ assertTrue(tracker.tryClaim(175));
+ assertFalse(tracker.tryClaim(200));
+ tracker.checkDone();
+ }
+
+ @Test
+ public void testCheckDoneAfterTryClaimRightBeforeEndOfRange() {
+ OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200));
+ assertTrue(tracker.tryClaim(150));
+ assertTrue(tracker.tryClaim(175));
+ assertTrue(tracker.tryClaim(199));
+ tracker.checkDone();
+ }
+
+ @Test
+ public void testCheckDoneWhenNotDone() {
+ OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200));
+ assertTrue(tracker.tryClaim(150));
+ assertTrue(tracker.tryClaim(175));
+ expected.expectMessage(
+ "Last attempted offset was 175 in range [100, 200), "
+ + "claiming work in [176, 200) was not attempted");
+ tracker.checkDone();
+ }
+
+ @Test
+ public void testCheckDoneWhenExplicitlyMarkedDone() {
+ OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200));
+ assertTrue(tracker.tryClaim(150));
+ assertTrue(tracker.tryClaim(175));
+ tracker.markDone();
+ tracker.checkDone();
+ }
}