You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/07/12 23:29:29 UTC
[1/2] beam git commit: [BEAM-2447] Reintroduces DoFn.ProcessContinuation
Repository: beam
Updated Branches:
refs/heads/master 91c7d3d1f -> 66b4a1be0
[BEAM-2447] Reintroduces DoFn.ProcessContinuation
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1bff4a78
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1bff4a78
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1bff4a78
Branch: refs/heads/master
Commit: 1bff4a786536ff1a4ffe9904079c7a89058e6b4e
Parents: 91c7d3d
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Jun 13 16:50:35 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Wed Jul 12 16:10:07 2017 -0700
----------------------------------------------------------------------
.../core/construction/SplittableParDoTest.java | 10 +-
...eBoundedSplittableProcessElementInvoker.java | 35 ++++++-
.../core/SplittableParDoViaKeyedWorkItems.java | 9 +-
.../core/SplittableProcessElementInvoker.java | 25 ++++-
...ndedSplittableProcessElementInvokerTest.java | 45 +++++++--
.../core/SplittableParDoProcessFnTest.java | 99 ++++++++++++++++--
.../org/apache/beam/sdk/transforms/DoFn.java | 51 +++++++++-
.../reflect/ByteBuddyDoFnInvokerFactory.java | 19 +++-
.../sdk/transforms/reflect/DoFnInvoker.java | 4 +-
.../sdk/transforms/reflect/DoFnSignature.java | 10 +-
.../sdk/transforms/reflect/DoFnSignatures.java | 22 +++-
.../splittabledofn/OffsetRangeTracker.java | 10 ++
.../splittabledofn/RestrictionTracker.java | 11 +-
.../beam/sdk/transforms/SplittableDoFnTest.java | 100 ++++++++-----------
.../transforms/reflect/DoFnInvokersTest.java | 93 +++++++++++++----
.../DoFnSignaturesProcessElementTest.java | 2 +-
.../DoFnSignaturesSplittableDoFnTest.java | 83 +++++++++++++--
17 files changed, 487 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
index f4c596e..267232c 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.core.construction;
+import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
import static org.junit.Assert.assertEquals;
import java.io.Serializable;
@@ -24,8 +25,6 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
-import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
@@ -70,7 +69,6 @@ public class SplittableParDoTest {
public void checkDone() {}
}
- @BoundedPerElement
private static class BoundedFakeFn extends DoFn<Integer, String> {
@ProcessElement
public void processElement(ProcessContext context, SomeRestrictionTracker tracker) {}
@@ -81,10 +79,12 @@ public class SplittableParDoTest {
}
}
- @UnboundedPerElement
private static class UnboundedFakeFn extends DoFn<Integer, String> {
@ProcessElement
- public void processElement(ProcessContext context, SomeRestrictionTracker tracker) {}
+ public ProcessContinuation processElement(
+ ProcessContext context, SomeRestrictionTracker tracker) {
+ return stop();
+ }
@GetInitialRestriction
public SomeRestriction getInitialRestriction(Integer element) {
http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
index 475abf2..0c956d5 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
@@ -96,7 +96,7 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
final WindowedValue<InputT> element,
final TrackerT tracker) {
final ProcessContext processContext = new ProcessContext(element, tracker);
- invoker.invokeProcessElement(
+ DoFn.ProcessContinuation cont = invoker.invokeProcessElement(
new DoFnInvoker.ArgumentProvider<InputT, OutputT>() {
@Override
public DoFn<InputT, OutputT>.ProcessContext processContext(
@@ -155,10 +155,37 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
"Access to timers not supported in Splittable DoFn");
}
});
-
+ // TODO: verify that if there was a failed tryClaim() call, then cont.shouldResume() is false.
+ // Currently we can't verify this because there are no hooks into tryClaim().
+ // See https://issues.apache.org/jira/browse/BEAM-2607
+ RestrictionT residual = processContext.extractCheckpoint();
+ if (cont.shouldResume()) {
+ if (residual == null) {
+ // No checkpoint was taken by the runner while the ProcessElement call ran; however,
+ // the call indicates that the restriction has not been fully processed. So we need to take
+ // a checkpoint now: checkpoint() guarantees that the primary restriction describes exactly
+ // the work that was done in the current ProcessElement call, and returns a residual
+ // restriction that describes exactly the work that wasn't done in the current call.
+ residual = tracker.checkpoint();
+ } else {
+ // A checkpoint was taken by the runner, and then the ProcessElement call returned resume()
+ // without making more tryClaim() calls (since no tryClaim() calls can succeed after
+ // checkpoint(), and since if it had made a failed tryClaim() call, it should have returned
+ // stop()).
+ // This means that the resulting primary restriction and the taken checkpoint already
+ // accurately describe respectively the work that was and wasn't done in the current
+ // ProcessElement call.
+ // In other words, if we took a checkpoint *after* ProcessElement completed (like in the
+ // branch above), it would have been equivalent to this one.
+ }
+ } else {
+ // The ProcessElement call returned stop() - that means the tracker's current restriction
+ // has been fully processed by the call. A checkpoint may or may not have been taken in
+ // "residual"; if it was, then we'll need to process it; if not, then we don't - nothing
+ // special needs to be done.
+ }
tracker.checkDone();
- return new Result(
- processContext.extractCheckpoint(), processContext.getLastReportedWatermark());
+ return new Result(residual, cont, processContext.getLastReportedWatermark());
}
private class ProcessContext extends DoFn<InputT, OutputT>.ProcessContext {
http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index 09f3b15..6e97645 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -200,8 +200,8 @@ public class SplittableParDoViaKeyedWorkItems {
/**
* The state cell containing a watermark hold for the output of this {@link DoFn}. The hold is
* acquired during the first {@link DoFn.ProcessElement} call for each element and restriction,
- * and is released when the {@link DoFn.ProcessElement} call returns and there is no residual
- * restriction captured by the {@link SplittableProcessElementInvoker}.
+ * and is released when the {@link DoFn.ProcessElement} call returns {@link
+ * ProcessContinuation#stop()}.
*
* <p>A hold is needed to avoid letting the output watermark immediately progress together with
* the input watermark when the first {@link DoFn.ProcessElement} call for this element
@@ -365,11 +365,12 @@ public class SplittableParDoViaKeyedWorkItems {
if (futureOutputWatermark == null) {
futureOutputWatermark = elementAndRestriction.getKey().getTimestamp();
}
+ Instant wakeupTime =
+ timerInternals.currentProcessingTime().plus(result.getContinuation().resumeDelay());
holdState.add(futureOutputWatermark);
// Set a timer to continue processing this element.
timerInternals.setTimer(
- TimerInternals.TimerData.of(
- stateNamespace, timerInternals.currentProcessingTime(), TimeDomain.PROCESSING_TIME));
+ TimerInternals.TimerData.of(stateNamespace, wakeupTime, TimeDomain.PROCESSING_TIME));
}
private DoFn<InputT, OutputT>.StartBundleContext wrapContextAsStartBundle(
http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
index ced6c01..7732df3 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.core;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import javax.annotation.Nullable;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
@@ -34,20 +36,35 @@ public abstract class SplittableProcessElementInvoker<
public class Result {
@Nullable
private final RestrictionT residualRestriction;
+ private final DoFn.ProcessContinuation continuation;
private final Instant futureOutputWatermark;
public Result(
- @Nullable RestrictionT residualRestriction, Instant futureOutputWatermark) {
+ @Nullable RestrictionT residualRestriction,
+ DoFn.ProcessContinuation continuation,
+ Instant futureOutputWatermark) {
+ this.continuation = checkNotNull(continuation);
+ if (continuation.shouldResume()) {
+ checkNotNull(residualRestriction);
+ }
this.residualRestriction = residualRestriction;
this.futureOutputWatermark = futureOutputWatermark;
}
- /** If {@code null}, means the call should not resume. */
+ /**
+ * Can be {@code null} only if {@link #getContinuation} specifies the call should not resume.
+ * However, the converse is not true: this can be non-null even if {@link #getContinuation}
+ * is {@link DoFn.ProcessContinuation#stop()}.
+ */
@Nullable
public RestrictionT getResidualRestriction() {
return residualRestriction;
}
+ public DoFn.ProcessContinuation getContinuation() {
+ return continuation;
+ }
+
public Instant getFutureOutputWatermark() {
return futureOutputWatermark;
}
@@ -57,8 +74,8 @@ public abstract class SplittableProcessElementInvoker<
* Invokes the {@link DoFn.ProcessElement} method using the given {@link DoFnInvoker} for the
* original {@link DoFn}, on the given element and with the given {@link RestrictionTracker}.
*
- * @return Information on how to resume the call: residual restriction and a
- * future output watermark.
+ * @return Information on how to resume the call: residual restriction, a {@link
+ * DoFn.ProcessContinuation}, and a future output watermark.
*/
public abstract Result invokeProcessElement(
DoFnInvoker<InputT, OutputT> invoker, WindowedValue<InputT> element, TrackerT tracker);
http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
index b80a632..959909e 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
@@ -17,11 +17,15 @@
*/
package org.apache.beam.runners.core;
+import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume;
+import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import java.util.Collection;
import java.util.concurrent.Executors;
@@ -42,19 +46,27 @@ import org.junit.Test;
/** Tests for {@link OutputAndTimeBoundedSplittableProcessElementInvoker}. */
public class OutputAndTimeBoundedSplittableProcessElementInvokerTest {
private static class SomeFn extends DoFn<Integer, String> {
+ private final int numOutputsPerProcessCall;
private final Duration sleepBeforeEachOutput;
- private SomeFn(Duration sleepBeforeEachOutput) {
+ private SomeFn(int numOutputsPerProcessCall, Duration sleepBeforeEachOutput) {
+ this.numOutputsPerProcessCall = numOutputsPerProcessCall;
this.sleepBeforeEachOutput = sleepBeforeEachOutput;
}
@ProcessElement
- public void process(ProcessContext context, OffsetRangeTracker tracker)
+ public ProcessContinuation process(ProcessContext context, OffsetRangeTracker tracker)
throws Exception {
- for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
+ for (long i = tracker.currentRestriction().getFrom(), numIterations = 1;
+ tracker.tryClaim(i);
+ ++i, ++numIterations) {
Thread.sleep(sleepBeforeEachOutput.getMillis());
context.output("" + i);
+ if (numIterations == numOutputsPerProcessCall) {
+ return resume();
+ }
}
+ return stop();
}
@GetInitialRestriction
@@ -64,8 +76,8 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest {
}
private SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker>.Result
- runTest(int count, Duration sleepPerElement) {
- SomeFn fn = new SomeFn(sleepPerElement);
+ runTest(int totalNumOutputs, int numOutputsPerProcessCall, Duration sleepPerElement) {
+ SomeFn fn = new SomeFn(numOutputsPerProcessCall, sleepPerElement);
SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker> invoker =
new OutputAndTimeBoundedSplittableProcessElementInvoker<>(
fn,
@@ -93,14 +105,15 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest {
return invoker.invokeProcessElement(
DoFnInvokers.invokerFor(fn),
- WindowedValue.of(count, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING),
- new OffsetRangeTracker(new OffsetRange(0, count)));
+ WindowedValue.of(totalNumOutputs, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING),
+ new OffsetRangeTracker(new OffsetRange(0, totalNumOutputs)));
}
@Test
public void testInvokeProcessElementOutputBounded() throws Exception {
SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker>.Result res =
- runTest(10000, Duration.ZERO);
+ runTest(10000, Integer.MAX_VALUE, Duration.ZERO);
+ assertFalse(res.getContinuation().shouldResume());
OffsetRange residualRange = res.getResidualRestriction();
// Should process the first 100 elements.
assertEquals(1000, residualRange.getFrom());
@@ -110,7 +123,8 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest {
@Test
public void testInvokeProcessElementTimeBounded() throws Exception {
SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker>.Result res =
- runTest(10000, Duration.millis(100));
+ runTest(10000, Integer.MAX_VALUE, Duration.millis(100));
+ assertFalse(res.getContinuation().shouldResume());
OffsetRange residualRange = res.getResidualRestriction();
// Should process ideally around 30 elements - but due to timing flakiness, we can't enforce
// that precisely. Just test that it's not egregiously off.
@@ -120,9 +134,18 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest {
}
@Test
- public void testInvokeProcessElementVoluntaryReturn() throws Exception {
+ public void testInvokeProcessElementVoluntaryReturnStop() throws Exception {
SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker>.Result res =
- runTest(5, Duration.millis(100));
+ runTest(5, Integer.MAX_VALUE, Duration.millis(100));
+ assertFalse(res.getContinuation().shouldResume());
assertNull(res.getResidualRestriction());
}
+
+ @Test
+ public void testInvokeProcessElementVoluntaryReturnResume() throws Exception {
+ SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker>.Result res =
+ runTest(10, 5, Duration.millis(100));
+ assertTrue(res.getContinuation().shouldResume());
+ assertEquals(new OffsetRange(5, 10), res.getResidualRestriction());
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
index 1cd1275..7449af3 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
@@ -17,6 +17,9 @@
*/
package org.apache.beam.runners.core;
+import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume;
+import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
+import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasItems;
@@ -365,16 +368,71 @@ public class SplittableParDoProcessFnTest {
assertEquals(null, tester.getWatermarkHold());
}
- /**
- * A splittable {@link DoFn} that generates the sequence [init, init + total).
- */
+ /** A simple splittable {@link DoFn} that outputs the given element every 5 seconds forever. */
+ private static class SelfInitiatedResumeFn extends DoFn<Integer, String> {
+ @ProcessElement
+ public ProcessContinuation process(ProcessContext c, SomeRestrictionTracker tracker) {
+ c.output(c.element().toString());
+ return resume().withResumeDelay(Duration.standardSeconds(5));
+ }
+
+ @GetInitialRestriction
+ public SomeRestriction getInitialRestriction(Integer elem) {
+ return new SomeRestriction();
+ }
+ }
+
+ @Test
+ public void testResumeSetsTimer() throws Exception {
+ DoFn<Integer, String> fn = new SelfInitiatedResumeFn();
+ Instant base = Instant.now();
+ ProcessFnTester<Integer, String, SomeRestriction, SomeRestrictionTracker> tester =
+ new ProcessFnTester<>(
+ base,
+ fn,
+ BigEndianIntegerCoder.of(),
+ SerializableCoder.of(SomeRestriction.class),
+ MAX_OUTPUTS_PER_BUNDLE,
+ MAX_BUNDLE_DURATION);
+
+ tester.startElement(42, new SomeRestriction());
+ assertThat(tester.takeOutputElements(), contains("42"));
+
+ // Should resume after 5 seconds: advancing by 3 seconds should have no effect.
+ assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
+ assertTrue(tester.takeOutputElements().isEmpty());
+
+ // 6 seconds should be enough to invoke the fn again.
+ assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
+ assertThat(tester.takeOutputElements(), contains("42"));
+
+ // Should again resume after 5 seconds: advancing by 3 seconds should again have no effect.
+ assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
+ assertTrue(tester.takeOutputElements().isEmpty());
+
+ // 6 seconds should again be enough.
+ assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
+ assertThat(tester.takeOutputElements(), contains("42"));
+ }
+
+ /** A splittable {@link DoFn} that generates the sequence [init, init + total). */
private static class CounterFn extends DoFn<Integer, String> {
+ private final int numOutputsPerCall;
+
+ public CounterFn(int numOutputsPerCall) {
+ this.numOutputsPerCall = numOutputsPerCall;
+ }
+
@ProcessElement
- public void process(ProcessContext c, OffsetRangeTracker tracker) {
- for (long i = tracker.currentRestriction().getFrom();
- tracker.tryClaim(i); ++i) {
+ public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) {
+ for (long i = tracker.currentRestriction().getFrom(), numIterations = 0;
+ tracker.tryClaim(i); ++i, ++numIterations) {
c.output(String.valueOf(c.element() + i));
+ if (numIterations == numOutputsPerCall) {
+ return resume();
+ }
}
+ return stop();
}
@GetInitialRestriction
@@ -383,10 +441,35 @@ public class SplittableParDoProcessFnTest {
}
}
+ public void testResumeCarriesOverState() throws Exception {
+ DoFn<Integer, String> fn = new CounterFn(1);
+ Instant base = Instant.now();
+ ProcessFnTester<Integer, String, OffsetRange, OffsetRangeTracker> tester =
+ new ProcessFnTester<>(
+ base,
+ fn,
+ BigEndianIntegerCoder.of(),
+ SerializableCoder.of(OffsetRange.class),
+ MAX_OUTPUTS_PER_BUNDLE,
+ MAX_BUNDLE_DURATION);
+
+ tester.startElement(42, new OffsetRange(0, 3));
+ assertThat(tester.takeOutputElements(), contains("42"));
+ assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
+ assertThat(tester.takeOutputElements(), contains("43"));
+ assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
+ assertThat(tester.takeOutputElements(), contains("44"));
+ assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
+ // After outputting all 3 items, should not output anything more.
+ assertEquals(0, tester.takeOutputElements().size());
+ // Should also not ask to resume.
+ assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
+ }
+
@Test
public void testCheckpointsAfterNumOutputs() throws Exception {
int max = 100;
- DoFn<Integer, String> fn = new CounterFn();
+ DoFn<Integer, String> fn = new CounterFn(Integer.MAX_VALUE);
Instant base = Instant.now();
int baseIndex = 42;
@@ -428,7 +511,7 @@ public class SplittableParDoProcessFnTest {
// But bound bundle duration - the bundle should terminate.
Duration maxBundleDuration = Duration.standardSeconds(1);
// Create an fn that attempts to 2x output more than checkpointing allows.
- DoFn<Integer, String> fn = new CounterFn();
+ DoFn<Integer, String> fn = new CounterFn(Integer.MAX_VALUE);
Instant base = Instant.now();
int baseIndex = 42;
http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index a2e5c16..1b809c2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.transforms;
+import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
@@ -545,11 +546,15 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
* returned by {@link GetInitialRestriction} implements {@link HasDefaultTracker}.
* <li>It <i>may</i> define a {@link GetRestrictionCoder} method.
* <li>The type of restrictions used by all of these methods must be the same.
+ * <li>Its {@link ProcessElement} method <i>may</i> return a {@link ProcessContinuation} to
+ * indicate whether there is more work to be done for the current element.
* <li>Its {@link ProcessElement} method <i>must not</i> use any extra context parameters, such as
* {@link BoundedWindow}.
* <li>The {@link DoFn} itself <i>may</i> be annotated with {@link BoundedPerElement} or
* {@link UnboundedPerElement}, but not both at the same time. If it's not annotated with
- * either of these, it's assumed to be {@link BoundedPerElement}.
+ * either of these, it's assumed to be {@link BoundedPerElement} if its {@link
+ * ProcessElement} method returns {@code void} and {@link UnboundedPerElement} if it
+ * returns a {@link ProcessContinuation}.
* </ul>
*
* <p>A non-splittable {@link DoFn} <i>must not</i> define any of these methods.
@@ -677,8 +682,48 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
@Experimental(Kind.SPLITTABLE_DO_FN)
public @interface UnboundedPerElement {}
- /** Temporary, do not use. See https://issues.apache.org/jira/browse/BEAM-1904 */
- public class ProcessContinuation {}
+ // This can't be put into ProcessContinuation itself due to the following problem:
+ // http://ternarysearch.blogspot.com/2013/07/static-initialization-deadlock.html
+ private static final ProcessContinuation PROCESS_CONTINUATION_STOP =
+ new AutoValue_DoFn_ProcessContinuation(false, Duration.ZERO);
+
+ /**
+ * When used as a return value of {@link ProcessElement}, indicates whether there is more work to
+ * be done for the current element.
+ *
+ * <p>If the {@link ProcessElement} call completes because of a failed {@code tryClaim()} call
+ * on the {@link RestrictionTracker}, then the call MUST return {@link #stop()}.
+ */
+ @Experimental(Kind.SPLITTABLE_DO_FN)
+ @AutoValue
+ public abstract static class ProcessContinuation {
+ /** Indicates that there is no more work to be done for the current element. */
+ public static ProcessContinuation stop() {
+ return PROCESS_CONTINUATION_STOP;
+ }
+
+ /** Indicates that there is more work to be done for the current element. */
+ public static ProcessContinuation resume() {
+ return new AutoValue_DoFn_ProcessContinuation(true, Duration.ZERO);
+ }
+
+ /**
+ * If false, the {@link DoFn} promises that there is no more work remaining for the current
+ * element, so the runner should not resume the {@link ProcessElement} call.
+ */
+ public abstract boolean shouldResume();
+
+ /**
+ * A minimum duration that should elapse between the end of this {@link ProcessElement} call and
+ * the {@link ProcessElement} call continuing processing of the same element. By default, zero.
+ */
+ public abstract Duration resumeDelay();
+
+ /** Builder method to set the value of {@link #resumeDelay()}. */
+ public ProcessContinuation withResumeDelay(Duration resumeDelay) {
+ return new AutoValue_DoFn_ProcessContinuation(shouldResume(), resumeDelay);
+ }
+ }
/**
* Finalize the {@link DoFn} construction to prepare for processing.
http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index 8378204..cf96c9b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -49,7 +49,6 @@ import net.bytebuddy.implementation.bytecode.Throw;
import net.bytebuddy.implementation.bytecode.assign.Assigner;
import net.bytebuddy.implementation.bytecode.assign.Assigner.Typing;
import net.bytebuddy.implementation.bytecode.assign.TypeCasting;
-import net.bytebuddy.implementation.bytecode.constant.NullConstant;
import net.bytebuddy.implementation.bytecode.constant.TextConstant;
import net.bytebuddy.implementation.bytecode.member.FieldAccess;
import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
@@ -641,6 +640,17 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
* {@link ProcessElement} method.
*/
private static final class ProcessElementDelegation extends DoFnMethodDelegation {
+ private static final MethodDescription PROCESS_CONTINUATION_STOP_METHOD;
+
+ static {
+ try {
+ PROCESS_CONTINUATION_STOP_METHOD =
+ new MethodDescription.ForLoadedMethod(DoFn.ProcessContinuation.class.getMethod("stop"));
+ } catch (NoSuchMethodException e) {
+ throw new RuntimeException("Failed to locate ProcessContinuation.stop()");
+ }
+ }
+
private final DoFnSignature.ProcessElementMethod signature;
/** Implementation of {@link MethodDelegation} for the {@link ProcessElement} method. */
@@ -677,7 +687,12 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
@Override
protected StackManipulation afterDelegation(MethodDescription instrumentedMethod) {
- return new StackManipulation.Compound(NullConstant.INSTANCE, MethodReturn.REFERENCE);
+ if (TypeDescription.VOID.equals(targetMethod.getReturnType().asErasure())) {
+ return new StackManipulation.Compound(
+ MethodInvocation.invoke(PROCESS_CONTINUATION_STOP_METHOD), MethodReturn.REFERENCE);
+ } else {
+ return MethodReturn.of(targetMethod.getReturnType().asErasure());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index 3b22fda..8b41fee 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -54,8 +54,8 @@ public interface DoFnInvoker<InputT, OutputT> {
* Invoke the {@link DoFn.ProcessElement} method on the bound {@link DoFn}.
*
* @param extra Factory for producing extra parameter objects (such as window), if necessary.
- * @return {@code null} - see <a href="https://issues.apache.org/jira/browse/BEAM-1904">JIRA</a>
- * tracking the complete removal of {@link DoFn.ProcessContinuation}.
+ * @return The {@link DoFn.ProcessContinuation} returned by the underlying method, or {@link
+ * DoFn.ProcessContinuation#stop()} if it returns {@code void}.
*/
DoFn.ProcessContinuation invokeProcessElement(ArgumentProvider<InputT, OutputT> extra);
http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index 6eeed8e..bfad69e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.DoFn.StateId;
import org.apache.beam.sdk.transforms.DoFn.TimerId;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter;
@@ -433,16 +434,21 @@ public abstract class DoFnSignature {
@Nullable
public abstract TypeDescriptor<? extends BoundedWindow> windowT();
+ /** Whether this {@link DoFn} returns a {@link ProcessContinuation} or void. */
+ public abstract boolean hasReturnValue();
+
static ProcessElementMethod create(
Method targetMethod,
List<Parameter> extraParameters,
TypeDescriptor<?> trackerT,
- @Nullable TypeDescriptor<? extends BoundedWindow> windowT) {
+ @Nullable TypeDescriptor<? extends BoundedWindow> windowT,
+ boolean hasReturnValue) {
return new AutoValue_DoFnSignature_ProcessElementMethod(
targetMethod,
Collections.unmodifiableList(extraParameters),
trackerT,
- windowT);
+ windowT,
+ hasReturnValue);
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 1b27e66..de57c3b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.transforms.reflect;
+import static com.google.common.base.Preconditions.checkState;
+
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicates;
@@ -440,6 +442,8 @@ public class DoFnSignatures {
* <li>If the {@link DoFn} (or any of its supertypes) is annotated as {@link
* DoFn.BoundedPerElement} or {@link DoFn.UnboundedPerElement}, use that. Only one of
* these must be specified.
+ * <li>If {@link DoFn.ProcessElement} returns {@link DoFn.ProcessContinuation}, assume it is
+ * unbounded. Otherwise (if it returns {@code void}), assume it is bounded.
* <li>If {@link DoFn.ProcessElement} returns {@code void}, but the {@link DoFn} is annotated
* {@link DoFn.UnboundedPerElement}, this is an error.
* </ol>
@@ -465,7 +469,10 @@ public class DoFnSignatures {
}
if (processElement.isSplittable()) {
if (isBounded == null) {
- isBounded = PCollection.IsBounded.BOUNDED;
+ isBounded =
+ processElement.hasReturnValue()
+ ? PCollection.IsBounded.UNBOUNDED
+ : PCollection.IsBounded.BOUNDED;
}
} else {
errors.checkArgument(
@@ -474,6 +481,7 @@ public class DoFnSignatures {
+ ((isBounded == PCollection.IsBounded.BOUNDED)
? DoFn.BoundedPerElement.class.getSimpleName()
: DoFn.UnboundedPerElement.class.getSimpleName()));
+ checkState(!processElement.hasReturnValue(), "Should have been inferred splittable");
isBounded = PCollection.IsBounded.BOUNDED;
}
return isBounded;
@@ -710,8 +718,10 @@ public class DoFnSignatures {
TypeDescriptor<?> outputT,
FnAnalysisContext fnContext) {
errors.checkArgument(
- void.class.equals(m.getReturnType()),
- "Must return void");
+ void.class.equals(m.getReturnType())
+ || DoFn.ProcessContinuation.class.equals(m.getReturnType()),
+ "Must return void or %s",
+ DoFn.ProcessContinuation.class.getSimpleName());
MethodAnalysisContext methodContext = MethodAnalysisContext.create();
@@ -751,7 +761,11 @@ public class DoFnSignatures {
}
return DoFnSignature.ProcessElementMethod.create(
- m, methodContext.getExtraParameters(), trackerT, windowT);
+ m,
+ methodContext.getExtraParameters(),
+ trackerT,
+ windowT,
+ DoFn.ProcessContinuation.class.equals(m.getReturnType()));
}
private static void checkParameterOneOf(
http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
index 62c10a7..4987409 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
+import com.google.common.base.MoreObjects;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
@@ -100,4 +101,13 @@ public class OffsetRangeTracker implements RestrictionTracker<OffsetRange> {
lastAttemptedOffset + 1,
range.getTo());
}
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("range", range)
+ .add("lastClaimedOffset", lastClaimedOffset)
+ .add("lastAttemptedOffset", lastAttemptedOffset)
+ .toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
index 27ef68f..8cb0a6b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
@@ -31,10 +31,13 @@ public interface RestrictionTracker<RestrictionT> {
RestrictionT currentRestriction();
/**
- * Signals that the current {@link DoFn.ProcessElement} call should terminate as soon as possible.
- * Modifies {@link #currentRestriction}. Returns a restriction representing the rest of the work:
- * the old value of {@link #currentRestriction} is equivalent to the new value and the return
- * value of this method combined. Must be called at most once on a given object.
+ * Signals that the current {@link DoFn.ProcessElement} call should terminate as soon as possible:
+ * after this method returns, the tracker MUST refuse all future claim calls, and {@link
+ * #checkDone} MUST succeed.
+ *
+ * <p>Modifies {@link #currentRestriction}. Returns a restriction representing the rest of the
+ * work: the old value of {@link #currentRestriction} is equivalent to the new value and the
+ * return value of this method combined. Must be called at most once on a given object.
*/
RestrictionT checkpoint();
http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index cb60f9a..d2d2529 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -19,10 +19,10 @@ package org.apache.beam.sdk.transforms;
import static com.google.common.base.Preconditions.checkState;
import static org.apache.beam.sdk.testing.TestPipeline.testingPipelineOptions;
-import static org.hamcrest.Matchers.greaterThan;
+import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume;
+import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import com.google.common.collect.Ordering;
@@ -33,7 +33,6 @@ import java.util.List;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
@@ -74,10 +73,16 @@ public class SplittableDoFnTest implements Serializable {
static class PairStringWithIndexToLength extends DoFn<String, KV<String, Integer>> {
@ProcessElement
- public void process(ProcessContext c, OffsetRangeTracker tracker) {
- for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
+ public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) {
+ for (long i = tracker.currentRestriction().getFrom(), numIterations = 0;
+ tracker.tryClaim(i);
+ ++i, ++numIterations) {
c.output(KV.of(c.element(), (int) i));
+ if (numIterations % 3 == 0) {
+ return resume();
+ }
}
+ return stop();
}
@GetInitialRestriction
@@ -206,10 +211,10 @@ public class SplittableDoFnTest implements Serializable {
private static class SDFWithMultipleOutputsPerBlock extends DoFn<String, Integer> {
private static final int MAX_INDEX = 98765;
- private final TupleTag<Integer> numProcessCalls;
+ private final int numClaimsPerCall;
- private SDFWithMultipleOutputsPerBlock(TupleTag<Integer> numProcessCalls) {
- this.numProcessCalls = numProcessCalls;
+ private SDFWithMultipleOutputsPerBlock(int numClaimsPerCall) {
+ this.numClaimsPerCall = numClaimsPerCall;
}
private static int snapToNextBlock(int index, int[] blockStarts) {
@@ -222,15 +227,20 @@ public class SplittableDoFnTest implements Serializable {
}
@ProcessElement
- public void processElement(ProcessContext c, OffsetRangeTracker tracker) {
+ public ProcessContinuation processElement(ProcessContext c, OffsetRangeTracker tracker) {
int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX};
int trueStart = snapToNextBlock((int) tracker.currentRestriction().getFrom(), blockStarts);
- c.output(numProcessCalls, 1);
- for (int i = trueStart; tracker.tryClaim(blockStarts[i]); ++i) {
+ for (int i = trueStart, numIterations = 1;
+ tracker.tryClaim(blockStarts[i]);
+ ++i, ++numIterations) {
for (int index = blockStarts[i]; index < blockStarts[i + 1]; ++index) {
c.output(index);
}
+ if (numIterations == numClaimsPerCall) {
+ return resume();
+ }
}
+ return stop();
}
@GetInitialRestriction
@@ -242,26 +252,10 @@ public class SplittableDoFnTest implements Serializable {
@Test
@Category({ValidatesRunner.class, UsesSplittableParDo.class})
public void testOutputAfterCheckpoint() throws Exception {
- TupleTag<Integer> main = new TupleTag<>();
- TupleTag<Integer> numProcessCalls = new TupleTag<>();
- PCollectionTuple outputs =
- p.apply(Create.of("foo"))
- .apply(
- ParDo.of(new SDFWithMultipleOutputsPerBlock(numProcessCalls))
- .withOutputTags(main, TupleTagList.of(numProcessCalls)));
- PAssert.thatSingleton(outputs.get(main).apply(Count.<Integer>globally()))
+ PCollection<Integer> outputs = p.apply(Create.of("foo"))
+ .apply(ParDo.of(new SDFWithMultipleOutputsPerBlock(3)));
+ PAssert.thatSingleton(outputs.apply(Count.<Integer>globally()))
.isEqualTo((long) SDFWithMultipleOutputsPerBlock.MAX_INDEX);
- // Verify that more than 1 process() call was involved, i.e. that there was checkpointing.
- PAssert.thatSingleton(
- outputs.get(numProcessCalls).setCoder(VarIntCoder.of()).apply(Sum.integersGlobally()))
- .satisfies(
- new SerializableFunction<Integer, Void>() {
- @Override
- public Void apply(Integer input) {
- assertThat(input, greaterThan(1));
- return null;
- }
- });
p.run();
}
@@ -341,12 +335,12 @@ public class SplittableDoFnTest implements Serializable {
extends DoFn<Integer, KV<String, Integer>> {
private static final int MAX_INDEX = 98765;
private final PCollectionView<String> sideInput;
- private final TupleTag<Integer> numProcessCalls;
+ private final int numClaimsPerCall;
public SDFWithMultipleOutputsPerBlockAndSideInput(
- PCollectionView<String> sideInput, TupleTag<Integer> numProcessCalls) {
+ PCollectionView<String> sideInput, int numClaimsPerCall) {
this.sideInput = sideInput;
- this.numProcessCalls = numProcessCalls;
+ this.numClaimsPerCall = numClaimsPerCall;
}
private static int snapToNextBlock(int index, int[] blockStarts) {
@@ -359,15 +353,20 @@ public class SplittableDoFnTest implements Serializable {
}
@ProcessElement
- public void processElement(ProcessContext c, OffsetRangeTracker tracker) {
+ public ProcessContinuation processElement(ProcessContext c, OffsetRangeTracker tracker) {
int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX};
int trueStart = snapToNextBlock((int) tracker.currentRestriction().getFrom(), blockStarts);
- c.output(numProcessCalls, 1);
- for (int i = trueStart; tracker.tryClaim(blockStarts[i]); ++i) {
+ for (int i = trueStart, numIterations = 1;
+ tracker.tryClaim(blockStarts[i]);
+ ++i, ++numIterations) {
for (int index = blockStarts[i]; index < blockStarts[i + 1]; ++index) {
c.output(KV.of(c.sideInput(sideInput) + ":" + c.element(), index));
}
+ if (numIterations == numClaimsPerCall) {
+ return resume();
+ }
}
+ return stop();
}
@GetInitialRestriction
@@ -400,15 +399,14 @@ public class SplittableDoFnTest implements Serializable {
.apply("window 2", Window.<String>into(FixedWindows.of(Duration.millis(2))))
.apply("singleton", View.<String>asSingleton());
- TupleTag<KV<String, Integer>> main = new TupleTag<>();
- TupleTag<Integer> numProcessCalls = new TupleTag<>();
- PCollectionTuple res =
+ PCollection<KV<String, Integer>> res =
mainInput.apply(
- ParDo.of(new SDFWithMultipleOutputsPerBlockAndSideInput(sideInput, numProcessCalls))
- .withSideInputs(sideInput)
- .withOutputTags(main, TupleTagList.of(numProcessCalls)));
+ ParDo.of(
+ new SDFWithMultipleOutputsPerBlockAndSideInput(
+ sideInput, 3 /* numClaimsPerCall */))
+ .withSideInputs(sideInput));
PCollection<KV<String, Iterable<Integer>>> grouped =
- res.get(main).apply(GroupByKey.<String, Integer>create());
+ res.apply(GroupByKey.<String, Integer>create());
PAssert.that(grouped.apply(Keys.<String>create()))
.containsInAnyOrder("a:0", "a:1", "b:2", "b:3");
@@ -427,22 +425,6 @@ public class SplittableDoFnTest implements Serializable {
return null;
}
});
-
- // Verify that more than 1 process() call was involved, i.e. that there was checkpointing.
- PAssert.thatSingleton(
- res.get(numProcessCalls)
- .setCoder(VarIntCoder.of())
- .apply(Sum.integersGlobally().withoutDefaults()))
- // This should hold in all windows, but verifying a particular window is sufficient.
- .inOnlyPane(new IntervalWindow(new Instant(0), new Instant(1)))
- .satisfies(
- new SerializableFunction<Integer, Void>() {
- @Override
- public Void apply(Integer input) {
- assertThat(input, greaterThan(1));
- return null;
- }
- });
p.run();
// TODO: also test coverage when some of the windows of the side input are not ready.
http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 3edb194..2098c66 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.transforms.reflect;
+import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume;
+import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
@@ -89,8 +91,8 @@ public class DoFnInvokersTest {
when(mockArgumentProvider.processContext(Matchers.<DoFn>any())).thenReturn(mockProcessContext);
}
- private void invokeProcessElement(DoFn<String, String> fn) {
- DoFnInvokers.invokerFor(fn).invokeProcessElement(mockArgumentProvider);
+ private DoFn.ProcessContinuation invokeProcessElement(DoFn<String, String> fn) {
+ return DoFnInvokers.invokerFor(fn).invokeProcessElement(mockArgumentProvider);
}
private void invokeOnTimer(String timerId, DoFn<String, String> fn) {
@@ -119,7 +121,7 @@ public class DoFnInvokersTest {
public void processElement(ProcessContext c) throws Exception {}
}
MockFn mockFn = mock(MockFn.class);
- invokeProcessElement(mockFn);
+ assertEquals(stop(), invokeProcessElement(mockFn));
verify(mockFn).processElement(mockProcessContext);
}
@@ -140,7 +142,7 @@ public class DoFnInvokersTest {
public void testDoFnWithProcessElementInterface() throws Exception {
IdentityUsingInterfaceWithProcessElement fn =
mock(IdentityUsingInterfaceWithProcessElement.class);
- invokeProcessElement(fn);
+ assertEquals(stop(), invokeProcessElement(fn));
verify(fn).processElement(mockProcessContext);
}
@@ -161,14 +163,14 @@ public class DoFnInvokersTest {
@Test
public void testDoFnWithMethodInSuperclass() throws Exception {
IdentityChildWithoutOverride fn = mock(IdentityChildWithoutOverride.class);
- invokeProcessElement(fn);
+ assertEquals(stop(), invokeProcessElement(fn));
verify(fn).process(mockProcessContext);
}
@Test
public void testDoFnWithMethodInSubclass() throws Exception {
IdentityChildWithOverride fn = mock(IdentityChildWithOverride.class);
- invokeProcessElement(fn);
+ assertEquals(stop(), invokeProcessElement(fn));
verify(fn).process(mockProcessContext);
}
@@ -179,7 +181,7 @@ public class DoFnInvokersTest {
public void processElement(ProcessContext c, IntervalWindow w) throws Exception {}
}
MockFn fn = mock(MockFn.class);
- invokeProcessElement(fn);
+ assertEquals(stop(), invokeProcessElement(fn));
verify(fn).processElement(mockProcessContext, mockWindow);
}
@@ -203,7 +205,7 @@ public class DoFnInvokersTest {
throws Exception {}
}
MockFn fn = mock(MockFn.class);
- invokeProcessElement(fn);
+ assertEquals(stop(), invokeProcessElement(fn));
verify(fn).processElement(mockProcessContext, mockState);
}
@@ -229,11 +231,35 @@ public class DoFnInvokersTest {
public void onTimer() {}
}
MockFn fn = mock(MockFn.class);
- invokeProcessElement(fn);
+ assertEquals(stop(), invokeProcessElement(fn));
verify(fn).processElement(mockProcessContext, mockTimer);
}
@Test
+ public void testDoFnWithReturn() throws Exception {
+ class MockFn extends DoFn<String, String> {
+ @DoFn.ProcessElement
+ public ProcessContinuation processElement(ProcessContext c, SomeRestrictionTracker tracker)
+ throws Exception {
+ return null;
+ }
+
+ @GetInitialRestriction
+ public SomeRestriction getInitialRestriction(String element) {
+ return null;
+ }
+
+ @NewTracker
+ public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+ return null;
+ }
+ }
+ MockFn fn = mock(MockFn.class);
+ when(fn.processElement(mockProcessContext, null)).thenReturn(resume());
+ assertEquals(resume(), invokeProcessElement(fn));
+ }
+
+ @Test
public void testDoFnWithStartBundleSetupTeardown() throws Exception {
class MockFn extends DoFn<String, String> {
@ProcessElement
@@ -288,7 +314,9 @@ public class DoFnInvokersTest {
/** Public so Mockito can do "delegatesTo()" in the test below. */
public static class MockFn extends DoFn<String, String> {
@ProcessElement
- public void processElement(ProcessContext c, SomeRestrictionTracker tracker) {}
+ public ProcessContinuation processElement(ProcessContext c, SomeRestrictionTracker tracker) {
+ return null;
+ }
@GetInitialRestriction
public SomeRestriction getInitialRestriction(String element) {
@@ -340,7 +368,7 @@ public class DoFnInvokersTest {
.splitRestriction(
eq("blah"), same(restriction), Mockito.<DoFn.OutputReceiver<SomeRestriction>>any());
when(fn.newTracker(restriction)).thenReturn(tracker);
- fn.processElement(mockProcessContext, tracker);
+ when(fn.processElement(mockProcessContext, tracker)).thenReturn(resume());
assertEquals(coder, invoker.invokeGetRestrictionCoder(CoderRegistry.createDefault()));
assertEquals(restriction, invoker.invokeGetInitialRestriction("blah"));
@@ -356,6 +384,8 @@ public class DoFnInvokersTest {
});
assertEquals(Arrays.asList(part1, part2, part3), outputs);
assertEquals(tracker, invoker.invokeNewTracker(restriction));
+ assertEquals(
+ resume(),
invoker.invokeProcessElement(
new FakeArgumentProvider<String, String>() {
@Override
@@ -367,7 +397,7 @@ public class DoFnInvokersTest {
public RestrictionTracker<?> restrictionTracker() {
return tracker;
}
- });
+ }));
}
private static class RestrictionWithDefaultTracker
@@ -441,7 +471,7 @@ public class DoFnInvokersTest {
assertEquals("foo", output);
}
});
- invoker.invokeProcessElement(mockArgumentProvider);
+ assertEquals(stop(), invoker.invokeProcessElement(mockArgumentProvider));
assertThat(
invoker.invokeNewTracker(new RestrictionWithDefaultTracker()),
instanceOf(DefaultTracker.class));
@@ -531,14 +561,14 @@ public class DoFnInvokersTest {
@Test
public void testLocalPrivateDoFnClass() throws Exception {
PrivateDoFnClass fn = mock(PrivateDoFnClass.class);
- invokeProcessElement(fn);
+ assertEquals(stop(), invokeProcessElement(fn));
verify(fn).processThis(mockProcessContext);
}
@Test
public void testStaticPackagePrivateDoFnClass() throws Exception {
DoFn<String, String> fn = mock(DoFnInvokersTestHelper.newStaticPackagePrivateDoFn().getClass());
- invokeProcessElement(fn);
+ assertEquals(stop(), invokeProcessElement(fn));
DoFnInvokersTestHelper.verifyStaticPackagePrivateDoFn(fn, mockProcessContext);
}
@@ -546,28 +576,28 @@ public class DoFnInvokersTest {
public void testInnerPackagePrivateDoFnClass() throws Exception {
DoFn<String, String> fn =
mock(new DoFnInvokersTestHelper().newInnerPackagePrivateDoFn().getClass());
- invokeProcessElement(fn);
+ assertEquals(stop(), invokeProcessElement(fn));
DoFnInvokersTestHelper.verifyInnerPackagePrivateDoFn(fn, mockProcessContext);
}
@Test
public void testStaticPrivateDoFnClass() throws Exception {
DoFn<String, String> fn = mock(DoFnInvokersTestHelper.newStaticPrivateDoFn().getClass());
- invokeProcessElement(fn);
+ assertEquals(stop(), invokeProcessElement(fn));
DoFnInvokersTestHelper.verifyStaticPrivateDoFn(fn, mockProcessContext);
}
@Test
public void testInnerPrivateDoFnClass() throws Exception {
DoFn<String, String> fn = mock(new DoFnInvokersTestHelper().newInnerPrivateDoFn().getClass());
- invokeProcessElement(fn);
+ assertEquals(stop(), invokeProcessElement(fn));
DoFnInvokersTestHelper.verifyInnerPrivateDoFn(fn, mockProcessContext);
}
@Test
public void testAnonymousInnerDoFn() throws Exception {
DoFn<String, String> fn = mock(new DoFnInvokersTestHelper().newInnerAnonymousDoFn().getClass());
- invokeProcessElement(fn);
+ assertEquals(stop(), invokeProcessElement(fn));
DoFnInvokersTestHelper.verifyInnerAnonymousDoFn(fn, mockProcessContext);
}
@@ -604,6 +634,31 @@ public class DoFnInvokersTest {
}
@Test
+ public void testProcessElementExceptionWithReturn() throws Exception {
+ thrown.expect(UserCodeException.class);
+ thrown.expectMessage("bogus");
+ DoFnInvokers.invokerFor(
+ new DoFn<Integer, Integer>() {
+ @ProcessElement
+ public ProcessContinuation processElement(
+ @SuppressWarnings("unused") ProcessContext c, SomeRestrictionTracker tracker) {
+ throw new IllegalArgumentException("bogus");
+ }
+
+ @GetInitialRestriction
+ public SomeRestriction getInitialRestriction(Integer element) {
+ return null;
+ }
+
+ @NewTracker
+ public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+ return null;
+ }
+ })
+ .invokeProcessElement(new FakeArgumentProvider<Integer, Integer>());
+ }
+
+ @Test
public void testStartBundleException() throws Exception {
DoFnInvoker<Integer, Integer> invoker =
DoFnInvokers.invokerFor(
http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
index d321f54..44ae5c4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
@@ -50,7 +50,7 @@ public class DoFnSignaturesProcessElementTest {
@Test
public void testBadReturnType() throws Exception {
thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Must return void");
+ thrown.expectMessage("Must return void or ProcessContinuation");
analyzeProcessElementMethod(
new AnonymousMethod() {
http://git-wip-us.apache.org/repos/asf/beam/blob/1bff4a78/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
index 07b3348..08af65e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
@@ -52,7 +52,8 @@ import org.junit.runners.JUnit4;
public class DoFnSignaturesSplittableDoFnTest {
@Rule public ExpectedException thrown = ExpectedException.none();
- private static class SomeRestriction {}
+ private abstract static class SomeRestriction
+ implements HasDefaultTracker<SomeRestriction, SomeRestrictionTracker> {}
private abstract static class SomeRestrictionTracker
implements RestrictionTracker<SomeRestriction> {}
@@ -60,6 +61,20 @@ public class DoFnSignaturesSplittableDoFnTest {
private abstract static class SomeRestrictionCoder extends StructuredCoder<SomeRestriction> {}
@Test
+ public void testReturnsProcessContinuation() throws Exception {
+ DoFnSignature.ProcessElementMethod signature =
+ analyzeProcessElementMethod(
+ new AnonymousMethod() {
+ private DoFn.ProcessContinuation method(
+ DoFn<Integer, String>.ProcessContext context) {
+ return null;
+ }
+ });
+
+ assertTrue(signature.hasReturnValue());
+ }
+
+ @Test
public void testHasRestrictionTracker() throws Exception {
DoFnSignature.ProcessElementMethod signature =
analyzeProcessElementMethod(
@@ -100,11 +115,6 @@ public class DoFnSignaturesSplittableDoFnTest {
public SomeRestriction getInitialRestriction(Integer element) {
return null;
}
-
- @NewTracker
- public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
- return null;
- }
}
@BoundedPerElement
@@ -130,6 +140,55 @@ public class DoFnSignaturesSplittableDoFnTest {
.isBoundedPerElement());
}
+ private static class BaseFnWithoutContinuation extends DoFn<Integer, String> {
+ @ProcessElement
+ public void processElement(ProcessContext context, SomeRestrictionTracker tracker) {}
+
+ @GetInitialRestriction
+ public SomeRestriction getInitialRestriction(Integer element) {
+ return null;
+ }
+ }
+
+ private static class BaseFnWithContinuation extends DoFn<Integer, String> {
+ @ProcessElement
+ public ProcessContinuation processElement(
+ ProcessContext context, SomeRestrictionTracker tracker) {
+ return null;
+ }
+
+ @GetInitialRestriction
+ public SomeRestriction getInitialRestriction(Integer element) {
+ return null;
+ }
+ }
+
+ @Test
+ public void testSplittableBoundednessInferredFromReturnValue() throws Exception {
+ assertEquals(
+ PCollection.IsBounded.BOUNDED,
+ DoFnSignatures.getSignature(BaseFnWithoutContinuation.class).isBoundedPerElement());
+ assertEquals(
+ PCollection.IsBounded.UNBOUNDED,
+ DoFnSignatures.getSignature(BaseFnWithContinuation.class).isBoundedPerElement());
+ }
+
+ @Test
+ public void testSplittableRespectsBoundednessAnnotation() throws Exception {
+ @BoundedPerElement
+ class BoundedFnWithContinuation extends BaseFnWithContinuation {}
+
+ assertEquals(
+ PCollection.IsBounded.BOUNDED,
+ DoFnSignatures.getSignature(BoundedFnWithContinuation.class).isBoundedPerElement());
+
+ @UnboundedPerElement
+ class UnboundedFnWithContinuation extends BaseFnWithContinuation {}
+
+ assertEquals(
+ PCollection.IsBounded.UNBOUNDED,
+ DoFnSignatures.getSignature(UnboundedFnWithContinuation.class).isBoundedPerElement());
+ }
@Test
public void testUnsplittableIsBounded() throws Exception {
class UnsplittableFn extends DoFn<Integer, String> {
@@ -172,8 +231,10 @@ public class DoFnSignaturesSplittableDoFnTest {
public void testSplittableWithAllFunctions() throws Exception {
class GoodSplittableDoFn extends DoFn<Integer, String> {
@ProcessElement
- public void processElement(
- ProcessContext context, SomeRestrictionTracker tracker) {}
+ public ProcessContinuation processElement(
+ ProcessContext context, SomeRestrictionTracker tracker) {
+ return null;
+ }
@GetInitialRestriction
public SomeRestriction getInitialRestriction(Integer element) {
@@ -198,6 +259,7 @@ public class DoFnSignaturesSplittableDoFnTest {
DoFnSignature signature = DoFnSignatures.getSignature(GoodSplittableDoFn.class);
assertEquals(SomeRestrictionTracker.class, signature.processElement().trackerT().getRawType());
assertTrue(signature.processElement().isSplittable());
+ assertTrue(signature.processElement().hasReturnValue());
assertEquals(
SomeRestriction.class, signature.getInitialRestriction().restrictionT().getRawType());
assertEquals(SomeRestriction.class, signature.splitRestriction().restrictionT().getRawType());
@@ -214,7 +276,9 @@ public class DoFnSignaturesSplittableDoFnTest {
public void testSplittableWithAllFunctionsGeneric() throws Exception {
class GoodGenericSplittableDoFn<RestrictionT, TrackerT, CoderT> extends DoFn<Integer, String> {
@ProcessElement
- public void processElement(ProcessContext context, TrackerT tracker) {}
+ public ProcessContinuation processElement(ProcessContext context, TrackerT tracker) {
+ return null;
+ }
@GetInitialRestriction
public RestrictionT getInitialRestriction(Integer element) {
@@ -242,6 +306,7 @@ public class DoFnSignaturesSplittableDoFnTest {
SomeRestriction, SomeRestrictionTracker, SomeRestrictionCoder>() {}.getClass());
assertEquals(SomeRestrictionTracker.class, signature.processElement().trackerT().getRawType());
assertTrue(signature.processElement().isSplittable());
+ assertTrue(signature.processElement().hasReturnValue());
assertEquals(
SomeRestriction.class, signature.getInitialRestriction().restrictionT().getRawType());
assertEquals(SomeRestriction.class, signature.splitRestriction().restrictionT().getRawType());
[2/2] beam git commit: This closes #3360: [BEAM-2447] Reintroduces
DoFn.ProcessContinuation
Posted by jk...@apache.org.
This closes #3360: [BEAM-2447] Reintroduces DoFn.ProcessContinuation
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/66b4a1be
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/66b4a1be
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/66b4a1be
Branch: refs/heads/master
Commit: 66b4a1be09b58a761cf49cc18a04eaaff555e376
Parents: 91c7d3d 1bff4a7
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Jul 12 16:16:05 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Wed Jul 12 16:16:05 2017 -0700
----------------------------------------------------------------------
.../core/construction/SplittableParDoTest.java | 10 +-
...eBoundedSplittableProcessElementInvoker.java | 35 ++++++-
.../core/SplittableParDoViaKeyedWorkItems.java | 9 +-
.../core/SplittableProcessElementInvoker.java | 25 ++++-
...ndedSplittableProcessElementInvokerTest.java | 45 +++++++--
.../core/SplittableParDoProcessFnTest.java | 99 ++++++++++++++++--
.../org/apache/beam/sdk/transforms/DoFn.java | 51 +++++++++-
.../reflect/ByteBuddyDoFnInvokerFactory.java | 19 +++-
.../sdk/transforms/reflect/DoFnInvoker.java | 4 +-
.../sdk/transforms/reflect/DoFnSignature.java | 10 +-
.../sdk/transforms/reflect/DoFnSignatures.java | 22 +++-
.../splittabledofn/OffsetRangeTracker.java | 10 ++
.../splittabledofn/RestrictionTracker.java | 11 +-
.../beam/sdk/transforms/SplittableDoFnTest.java | 100 ++++++++-----------
.../transforms/reflect/DoFnInvokersTest.java | 93 +++++++++++++----
.../DoFnSignaturesProcessElementTest.java | 2 +-
.../DoFnSignaturesSplittableDoFnTest.java | 83 +++++++++++++--
17 files changed, 487 insertions(+), 141 deletions(-)
----------------------------------------------------------------------