You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/05/09 18:46:54 UTC
[1/4] incubator-beam git commit: Refactor CompletionCallbacks
Repository: incubator-beam
Updated Branches:
refs/heads/master da1b7556b -> 272493ed7
Refactor CompletionCallbacks
The default and timer completion callbacks are identical except for
their calls to evaluationContext.handleResult; factor the shared code
into a common base class.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e7df160a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e7df160a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e7df160a
Branch: refs/heads/master
Commit: e7df160a2cde6dead6c4f7e0ec0aaa5e4808239d
Parents: 9b9d73f
Author: Thomas Groh <tg...@google.com>
Authored: Tue May 3 13:22:13 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue May 3 14:27:22 2016 -0700
----------------------------------------------------------------------
.../direct/ExecutorServiceParallelExecutor.java | 49 ++++++++++++--------
1 file changed, 29 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e7df160a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 18af363..6f26b6b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -210,16 +210,20 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
}
/**
- * The default {@link CompletionCallback}. The default completion callback is used to complete
- * transform evaluations that are triggered due to the arrival of elements from an upstream
- * transform, or for a source transform.
+ * The base implementation of {@link CompletionCallback} that provides implementations for
+ * {@link #handleResult(CommittedBundle, InProcessTransformResult)} and
+ * {@link #handleThrowable(CommittedBundle, Throwable)}, given an implementation of
+ * {@link #getCommittedResult(CommittedBundle, InProcessTransformResult)}.
*/
- private class DefaultCompletionCallback implements CompletionCallback {
+ private abstract class CompletionCallbackBase implements CompletionCallback {
+ protected abstract CommittedResult getCommittedResult(
+ CommittedBundle<?> inputBundle,
+ InProcessTransformResult result);
+
@Override
- public CommittedResult handleResult(
+ public final CommittedResult handleResult(
CommittedBundle<?> inputBundle, InProcessTransformResult result) {
- CommittedResult committedResult =
- evaluationContext.handleResult(inputBundle, Collections.<TimerData>emptyList(), result);
+ CommittedResult committedResult = getCommittedResult(inputBundle, result);
for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) {
allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle));
}
@@ -233,12 +237,27 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
}
/**
+ * The default {@link CompletionCallback}. The default completion callback is used to complete
+ * transform evaluations that are triggered due to the arrival of elements from an upstream
+ * transform, or for a source transform.
+ */
+ private class DefaultCompletionCallback extends CompletionCallbackBase {
+ @Override
+ public CommittedResult getCommittedResult(
+ CommittedBundle<?> inputBundle, InProcessTransformResult result) {
+ return evaluationContext.handleResult(inputBundle,
+ Collections.<TimerData>emptyList(),
+ result);
+ }
+ }
+
+ /**
* A {@link CompletionCallback} where the completed bundle was produced to deliver some collection
* of {@link TimerData timers}. When the evaluator completes successfully, reports all of the
* timers used to create the input to the {@link InProcessEvaluationContext evaluation context}
* as part of the result.
*/
- private class TimerCompletionCallback implements CompletionCallback {
+ private class TimerCompletionCallback extends CompletionCallbackBase {
private final Iterable<TimerData> timers;
private TimerCompletionCallback(Iterable<TimerData> timers) {
@@ -246,19 +265,9 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
}
@Override
- public CommittedResult handleResult(
+ public CommittedResult getCommittedResult(
CommittedBundle<?> inputBundle, InProcessTransformResult result) {
- CommittedResult committedResult =
- evaluationContext.handleResult(inputBundle, timers, result);
- for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) {
- allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle));
- }
- return committedResult;
- }
-
- @Override
- public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) {
- allUpdates.offer(ExecutorUpdate.fromThrowable(t));
+ return evaluationContext.handleResult(inputBundle, timers, result);
}
}
[4/4] incubator-beam git commit: This closes #283
Posted by ke...@apache.org.
This closes #283
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/272493ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/272493ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/272493ed
Branch: refs/heads/master
Commit: 272493ed7c86b81f3573ea341f6a2ff138da9800
Parents: da1b755 0518fc6
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon May 9 11:46:21 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon May 9 11:46:21 2016 -0700
----------------------------------------------------------------------
.../beam/runners/direct/CommittedResult.java | 16 ++-
.../direct/ExecutorServiceParallelExecutor.java | 65 +++++-----
.../direct/InMemoryWatermarkManager.java | 7 +-
.../direct/InProcessEvaluationContext.java | 6 +-
.../direct/InProcessTransformResult.java | 7 +
.../runners/direct/StepTransformResult.java | 17 +++
.../runners/direct/CommittedResultTest.java | 34 ++++-
.../direct/InMemoryWatermarkManagerTest.java | 127 ++++++++++++++++---
.../runners/direct/TransformExecutorTest.java | 11 +-
9 files changed, 239 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/272493ed/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/272493ed/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
----------------------------------------------------------------------
[3/4] incubator-beam git commit: Allow InProcess TransformEvaluators to refuse inputs
Posted by ke...@apache.org.
Allow InProcess TransformEvaluators to refuse inputs
Inputs that cannot be processed (generally because a side input is not
yet ready) can be added to a list of unprocessed elements; the executor
then reschedules those elements to be processed at a later point.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0518fc68
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0518fc68
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0518fc68
Branch: refs/heads/master
Commit: 0518fc687febca6dca75159924265fdc6196a572
Parents: 59cca8d
Author: Thomas Groh <tg...@google.com>
Authored: Tue May 3 13:24:25 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri May 6 11:17:37 2016 -0700
----------------------------------------------------------------------
.../beam/runners/direct/CommittedResult.java | 16 ++-
.../direct/ExecutorServiceParallelExecutor.java | 6 +
.../direct/InMemoryWatermarkManager.java | 7 +-
.../direct/InProcessEvaluationContext.java | 6 +-
.../direct/InProcessTransformResult.java | 7 +
.../runners/direct/StepTransformResult.java | 17 +++
.../runners/direct/CommittedResultTest.java | 34 ++++-
.../direct/InMemoryWatermarkManagerTest.java | 127 ++++++++++++++++---
.../runners/direct/TransformExecutorTest.java | 11 +-
9 files changed, 209 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
index d15e012..4a42e34 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
@@ -23,6 +23,8 @@ import org.apache.beam.sdk.transforms.AppliedPTransform;
import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+
/**
* A {@link InProcessTransformResult} that has been committed.
*/
@@ -34,13 +36,25 @@ abstract class CommittedResult {
public abstract AppliedPTransform<?, ?, ?> getTransform();
/**
+ * Returns the {@link CommittedBundle} that contains the input elements that could not be
+ * processed by the evaluation.
+ *
+ * <p>{@code null} if the input bundle was null.
+ */
+ @Nullable
+ public abstract CommittedBundle<?> getUnprocessedInputs();
+
+ /**
* Returns the outputs produced by the transform.
*/
public abstract Iterable<? extends CommittedBundle<?>> getOutputs();
public static CommittedResult create(
- InProcessTransformResult original, Iterable<? extends CommittedBundle<?>> outputs) {
+ InProcessTransformResult original,
+ CommittedBundle<?> unprocessedElements,
+ Iterable<? extends CommittedBundle<?>> outputs) {
return new AutoValue_CommittedResult(original.getTransform(),
+ unprocessedElements,
outputs);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index fd4cc2c..570ddc4 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -37,6 +37,7 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
import org.joda.time.Instant;
import org.slf4j.Logger;
@@ -230,6 +231,11 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle,
valueToConsumers.get(outputBundle.getPCollection())));
}
+ CommittedBundle<?> unprocessedInputs = committedResult.getUnprocessedInputs();
+ if (unprocessedInputs != null && !Iterables.isEmpty(unprocessedInputs.getElements())) {
+ allUpdates.offer(ExecutorUpdate.fromBundle(unprocessedInputs,
+ Collections.<AppliedPTransform<?, ?, ?>>singleton(committedResult.getTransform())));
+ }
return committedResult;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
index 4d5a3a1..87ea4d5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
@@ -849,8 +849,6 @@ public class InMemoryWatermarkManager {
CommittedBundle<?> input,
TimerUpdate timerUpdate,
CommittedResult result) {
- TransformWatermarks completedTransform = transformToWatermarks.get(result.getTransform());
-
// Newly pending elements must be added before completed elements are removed, as the two
// do not share a Mutex within this call and thus can be interleaved with external calls to
// refresh.
@@ -861,6 +859,11 @@ public class InMemoryWatermarkManager {
}
}
+ TransformWatermarks completedTransform = transformToWatermarks.get(result.getTransform());
+ if (input != null) {
+ // Add the unprocessed inputs
+ completedTransform.addPending(result.getUnprocessedInputs());
+ }
completedTransform.updateTimers(timerUpdate);
if (input != null) {
completedTransform.removePending(input);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
index d4f891e..5c19287 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
@@ -152,7 +152,11 @@ class InProcessEvaluationContext {
Iterable<? extends CommittedBundle<?>> committedBundles =
commitBundles(result.getOutputBundles());
// Update watermarks and timers
- CommittedResult committedResult = CommittedResult.create(result, committedBundles);
+ CommittedResult committedResult = CommittedResult.create(result,
+ completedBundle == null ?
+ null :
+ completedBundle.withElements((Iterable) result.getUnprocessedElements()),
+ committedBundles);
watermarkManager.updateWatermarks(
completedBundle,
result.getTimerUpdate().withCompletedTimers(completedTimers),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java
index a132c33..0bc3ea1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java
@@ -22,6 +22,7 @@ import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.common.CounterSet;
import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
@@ -45,6 +46,12 @@ public interface InProcessTransformResult {
Iterable<? extends UncommittedBundle<?>> getOutputBundles();
/**
+ * Returns elements that were provided to the {@link TransformEvaluator} as input but were not
+ * processed.
+ */
+ Iterable<? extends WindowedValue<?>> getUnprocessedElements();
+
+ /**
* Returns the {@link CounterSet} used by this {@link PTransform}, or null if this transform did
* not use a {@link CounterSet}.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
index 46e7d04..b2e3897 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
@@ -23,6 +23,7 @@ import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.common.CounterSet;
import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
@@ -41,6 +42,7 @@ import javax.annotation.Nullable;
public class StepTransformResult implements InProcessTransformResult {
private final AppliedPTransform<?, ?, ?> transform;
private final Iterable<? extends UncommittedBundle<?>> bundles;
+ private final Iterable<? extends WindowedValue<?>> unprocessedElements;
@Nullable private final CopyOnAccessInMemoryStateInternals<?> state;
private final TimerUpdate timerUpdate;
@Nullable private final CounterSet counters;
@@ -49,12 +51,14 @@ public class StepTransformResult implements InProcessTransformResult {
private StepTransformResult(
AppliedPTransform<?, ?, ?> transform,
Iterable<? extends UncommittedBundle<?>> outputBundles,
+ Iterable<? extends WindowedValue<?>> unprocessedElements,
CopyOnAccessInMemoryStateInternals<?> state,
TimerUpdate timerUpdate,
CounterSet counters,
Instant watermarkHold) {
this.transform = checkNotNull(transform);
this.bundles = checkNotNull(outputBundles);
+ this.unprocessedElements = checkNotNull(unprocessedElements);
this.state = state;
this.timerUpdate = checkNotNull(timerUpdate);
this.counters = counters;
@@ -67,6 +71,11 @@ public class StepTransformResult implements InProcessTransformResult {
}
@Override
+ public Iterable<? extends WindowedValue<?>> getUnprocessedElements() {
+ return unprocessedElements;
+ }
+
+ @Override
public CounterSet getCounters() {
return counters;
}
@@ -113,6 +122,7 @@ public class StepTransformResult implements InProcessTransformResult {
public static class Builder {
private final AppliedPTransform<?, ?, ?> transform;
private final ImmutableList.Builder<UncommittedBundle<?>> bundlesBuilder;
+ private final ImmutableList.Builder<WindowedValue<?>> unprocessedElementsBuilder;
private CopyOnAccessInMemoryStateInternals<?> state;
private TimerUpdate timerUpdate;
private CounterSet counters;
@@ -122,6 +132,7 @@ public class StepTransformResult implements InProcessTransformResult {
this.transform = transform;
this.watermarkHold = watermarkHold;
this.bundlesBuilder = ImmutableList.builder();
+ this.unprocessedElementsBuilder = ImmutableList.builder();
this.timerUpdate = TimerUpdate.builder(null).build();
}
@@ -129,6 +140,7 @@ public class StepTransformResult implements InProcessTransformResult {
return new StepTransformResult(
transform,
bundlesBuilder.build(),
+ unprocessedElementsBuilder.build(),
state,
timerUpdate,
counters,
@@ -150,6 +162,11 @@ public class StepTransformResult implements InProcessTransformResult {
return this;
}
+ public Builder addUnprocessedElements(Iterable<? extends WindowedValue<?>> unprocessed) {
+ unprocessedElementsBuilder.addAll(unprocessed);
+ return this;
+ }
+
public Builder addOutput(
UncommittedBundle<?> outputBundle, UncommittedBundle<?>... outputBundles) {
bundlesBuilder.add(outputBundle);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
index b30e005..0d1b464 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
@@ -18,11 +18,14 @@
package org.apache.beam.runners.direct;
+import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
@@ -46,6 +49,7 @@ import java.util.List;
@RunWith(JUnit4.class)
public class CommittedResultTest implements Serializable {
private transient TestPipeline p = TestPipeline.create();
+ private transient PCollection<Integer> created = p.apply(Create.of(1, 2));
private transient AppliedPTransform<?, ?, ?> transform =
AppliedPTransform.of("foo", p.begin(), PDone.in(p), new PTransform<PBegin, PDone>() {
});
@@ -55,12 +59,38 @@ public class CommittedResultTest implements Serializable {
public void getTransformExtractsFromResult() {
CommittedResult result =
CommittedResult.create(StepTransformResult.withoutHold(transform).build(),
+ bundleFactory.createRootBundle(created).commit(Instant.now()),
Collections.<InProcessPipelineRunner.CommittedBundle<?>>emptyList());
assertThat(result.getTransform(), Matchers.<AppliedPTransform<?, ?, ?>>equalTo(transform));
}
@Test
+ public void getUncommittedElementsEqualInput() {
+ InProcessPipelineRunner.CommittedBundle<Integer> bundle =
+ bundleFactory.createRootBundle(created)
+ .add(WindowedValue.valueInGlobalWindow(2))
+ .commit(Instant.now());
+ CommittedResult result =
+ CommittedResult.create(StepTransformResult.withoutHold(transform).build(),
+ bundle,
+ Collections.<InProcessPipelineRunner.CommittedBundle<?>>emptyList());
+
+ assertThat(result.getUnprocessedInputs(),
+ Matchers.<InProcessPipelineRunner.CommittedBundle<?>>equalTo(bundle));
+ }
+
+ @Test
+ public void getUncommittedElementsNull() {
+ CommittedResult result =
+ CommittedResult.create(StepTransformResult.withoutHold(transform).build(),
+ null,
+ Collections.<InProcessPipelineRunner.CommittedBundle<?>>emptyList());
+
+ assertThat(result.getUnprocessedInputs(), nullValue());
+ }
+
+ @Test
public void getOutputsEqualInput() {
List<? extends InProcessPipelineRunner.CommittedBundle<?>> outputs =
ImmutableList.of(bundleFactory.createRootBundle(PCollection.createPrimitiveOutputInternal(p,
@@ -70,7 +100,9 @@ public class CommittedResultTest implements Serializable {
WindowingStrategy.globalDefault(),
PCollection.IsBounded.UNBOUNDED)).commit(Instant.now()));
CommittedResult result =
- CommittedResult.create(StepTransformResult.withoutHold(transform).build(), outputs);
+ CommittedResult.create(StepTransformResult.withoutHold(transform).build(),
+ bundleFactory.createRootBundle(created).commit(Instant.now()),
+ outputs);
assertThat(result.getOutputs(), Matchers.containsInAnyOrder(outputs.toArray()));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
index 15cdf8a..b45440d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
@@ -71,6 +71,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import javax.annotation.Nullable;
+
/**
* Tests for {@link InMemoryWatermarkManager}.
*/
@@ -162,6 +164,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(null,
TimerUpdate.empty(),
result(createdInts.getProducingTransformInternal(),
+ null,
Collections.<CommittedBundle<?>>singleton(output)),
new Instant(8000L));
TransformWatermarks updatedSourceWatermark =
@@ -181,6 +184,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(null,
TimerUpdate.empty(),
result(intsToFlatten.getProducingTransformInternal(),
+ null,
Collections.<CommittedBundle<?>>singleton(secondPcollectionBundle)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -213,6 +217,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(secondPcollectionBundle,
TimerUpdate.empty(),
result(flattened.getProducingTransformInternal(),
+ secondPcollectionBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate)),
null);
TransformWatermarks transformAfterProcessing =
@@ -220,6 +225,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(secondPcollectionBundle,
TimerUpdate.empty(),
result(flattened.getProducingTransformInternal(),
+ secondPcollectionBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate)),
null);
assertThat(
@@ -237,6 +243,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(null,
TimerUpdate.empty(),
result(createdInts.getProducingTransformInternal(),
+ null,
Collections.<CommittedBundle<?>>singleton(firstPcollectionBundle)),
new Instant(Long.MAX_VALUE));
TransformWatermarks firstSourceWatermarks =
@@ -267,6 +274,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(firstPcollectionBundle,
TimerUpdate.empty(),
result(flattened.getProducingTransformInternal(),
+ firstPcollectionBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(completedFlattenBundle)),
null);
TransformWatermarks afterConsumingAllInput =
@@ -291,6 +299,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(null,
TimerUpdate.empty(),
result(createdInts.getProducingTransformInternal(),
+ null,
Collections.<CommittedBundle<?>>singleton(createdBundle)),
new Instant(Long.MAX_VALUE));
TransformWatermarks createdAfterProducing =
@@ -306,6 +315,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(createdBundle,
TimerUpdate.empty(),
result(keyed.getProducingTransformInternal(),
+ createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(keyBundle)),
null);
TransformWatermarks keyedWatermarks =
@@ -325,6 +335,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(createdBundle,
TimerUpdate.empty(),
result(filtered.getProducingTransformInternal(),
+ createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(filteredBundle)),
null);
TransformWatermarks filteredProcessedWatermarks =
@@ -350,6 +361,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(null,
TimerUpdate.empty(),
result(createdInts.getProducingTransformInternal(),
+ null,
Collections.<CommittedBundle<?>>singleton(createdBundle)),
new Instant(Long.MAX_VALUE));
@@ -360,6 +372,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(createdBundle,
TimerUpdate.empty(),
result(keyed.getProducingTransformInternal(),
+ createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(keyBundle)),
new Instant(500L));
TransformWatermarks keyedWatermarks =
@@ -389,17 +402,20 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(null,
TimerUpdate.empty(),
result(createdInts.getProducingTransformInternal(),
+ null,
ImmutableList.of(firstKeyBundle, secondKeyBundle)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.updateWatermarks(firstKeyBundle,
TimerUpdate.empty(),
result(filtered.getProducingTransformInternal(),
+ firstKeyBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>emptyList()),
new Instant(-1000L));
manager.updateWatermarks(secondKeyBundle,
TimerUpdate.empty(),
result(filtered.getProducingTransformInternal(),
+ secondKeyBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>emptyList()),
new Instant(1234L));
@@ -414,6 +430,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(fauxFirstKeyTimerBundle,
TimerUpdate.empty(),
result(filtered.getProducingTransformInternal(),
+ fauxFirstKeyTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>emptyList()),
BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -424,6 +441,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(fauxSecondKeyTimerBundle,
TimerUpdate.empty(),
result(filtered.getProducingTransformInternal(),
+ fauxSecondKeyTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>emptyList()),
new Instant(5678L));
assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(5678L)));
@@ -431,6 +449,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(fauxSecondKeyTimerBundle,
TimerUpdate.empty(),
result(filtered.getProducingTransformInternal(),
+ fauxSecondKeyTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>emptyList()),
BoundedWindow.TIMESTAMP_MAX_VALUE);
assertThat(filteredWatermarks.getOutputWatermark(),
@@ -447,6 +466,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
bundleFactory.createRootBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
manager.updateWatermarks(null, TimerUpdate.empty(),
result(createdInts.getProducingTransformInternal(),
+ null,
Collections.<CommittedBundle<?>>singleton(firstInput)),
new Instant(0L));
TransformWatermarks firstWatermarks =
@@ -458,6 +478,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(null,
TimerUpdate.empty(),
result(createdInts.getProducingTransformInternal(),
+ null,
Collections.<CommittedBundle<?>>singleton(secondInput)),
new Instant(-250L));
TransformWatermarks secondWatermarks =
@@ -473,10 +494,12 @@ public class InMemoryWatermarkManagerTest implements Serializable {
public void updateWatermarkWithHoldsShouldBeMonotonic() {
CommittedBundle<Integer> createdBundle = timestampedBundle(createdInts,
TimestampedValue.of(1, new Instant(1_000_000L)),
- TimestampedValue.of(2, new Instant(1234L)),
- TimestampedValue.of(3, new Instant(-1000L))); manager.updateWatermarks(null,
+ TimestampedValue.of(2, new Instant(1234L)),
+ TimestampedValue.of(3, new Instant(-1000L)));
+ manager.updateWatermarks(null,
TimerUpdate.empty(),
result(createdInts.getProducingTransformInternal(),
+ null,
Collections.<CommittedBundle<?>>singleton(createdBundle)),
new Instant(Long.MAX_VALUE));
@@ -487,6 +510,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(createdBundle,
TimerUpdate.empty(),
result(keyed.getProducingTransformInternal(),
+ createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(keyBundle)),
new Instant(500L));
TransformWatermarks keyedWatermarks =
@@ -505,6 +529,40 @@ public class InMemoryWatermarkManagerTest implements Serializable {
assertThat(updatedWatermarks.getOutputWatermark(), equalTo(oldOutputWatermark));
}
+ @Test
+ public void updateWatermarkWithUnprocessedElements() {
+ WindowedValue<Integer> first = WindowedValue.valueInGlobalWindow(1);
+ WindowedValue<Integer> second =
+ WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-1000L));
+ WindowedValue<Integer> third =
+ WindowedValue.timestampedValueInGlobalWindow(3, new Instant(1234L));
+ CommittedBundle<Integer> createdBundle = bundleFactory.createRootBundle(createdInts)
+ .add(first)
+ .add(second)
+ .add(third)
+ .commit(clock.now());
+ manager.updateWatermarks(null,
+ TimerUpdate.empty(),
+ result(createdInts.getProducingTransformInternal(),
+ null,
+ Collections.<CommittedBundle<?>>singleton(createdBundle)),
+ BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+ CommittedBundle<KV<String, Integer>> keyBundle = timestampedBundle(keyed,
+ TimestampedValue.of(KV.of("MyKey", 1), BoundedWindow.TIMESTAMP_MIN_VALUE));
+ manager.updateWatermarks(createdBundle,
+ TimerUpdate.empty(),
+ result(keyed.getProducingTransformInternal(),
+ createdBundle.withElements(ImmutableList.of(second, third)),
+ Collections.<CommittedBundle<?>>singleton(keyBundle)),
+ BoundedWindow.TIMESTAMP_MAX_VALUE);
+ TransformWatermarks keyedWatermarks =
+ manager.getWatermarks(keyed.getProducingTransformInternal());
+ // the unprocessed second and third are re-added to pending
+ assertThat(
+ keyedWatermarks.getInputWatermark(), not(laterThan(new Instant(-1000L))));
+ }
+
/**
* Demonstrates that updateWatermarks in the presence of late data is monotonic.
*/
@@ -517,6 +575,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(null,
TimerUpdate.empty(),
result(createdInts.getProducingTransformInternal(),
+ null,
Collections.<CommittedBundle<?>>singleton(createdBundle)),
sourceWatermark);
@@ -528,6 +587,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(createdBundle,
TimerUpdate.empty(),
result(keyed.getProducingTransformInternal(),
+ createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(keyBundle)),
null);
TransformWatermarks onTimeWatermarks =
@@ -542,6 +602,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(null,
TimerUpdate.empty(),
result(createdInts.getProducingTransformInternal(),
+ createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(lateDataBundle)),
new Instant(2_000_000L));
TransformWatermarks bufferedLateWm =
@@ -560,6 +621,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(lateDataBundle,
TimerUpdate.empty(),
result(keyed.getProducingTransformInternal(),
+ lateDataBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(lateKeyedBundle)),
null);
}
@@ -567,8 +629,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
public void updateWatermarkWithDifferentWindowedValueInstances() {
manager.updateWatermarks(
null,
- TimerUpdate.empty(),
- result(createdInts.getProducingTransformInternal(),
+ TimerUpdate.empty(), result(createdInts.getProducingTransformInternal(), null,
Collections.<CommittedBundle<?>>singleton(
bundleFactory
.createRootBundle(createdInts)
@@ -576,12 +637,14 @@ public class InMemoryWatermarkManagerTest implements Serializable {
.commit(Instant.now()))),
BoundedWindow.TIMESTAMP_MAX_VALUE);
- manager.updateWatermarks(
- bundleFactory.createRootBundle(createdInts)
- .add(WindowedValue.valueInGlobalWindow(1))
- .commit(Instant.now()),
+ CommittedBundle<Integer> createdBundle = bundleFactory.createRootBundle(createdInts)
+ .add(WindowedValue.valueInGlobalWindow(1))
+ .commit(Instant.now());
+ manager.updateWatermarks(createdBundle,
TimerUpdate.empty(),
- result(keyed.getProducingTransformInternal(), Collections.<CommittedBundle<?>>emptyList()),
+ result(keyed.getProducingTransformInternal(),
+ createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
+ Collections.<CommittedBundle<?>>emptyList()),
null);
TransformWatermarks onTimeWatermarks =
manager.getWatermarks(keyed.getProducingTransformInternal());
@@ -598,6 +661,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(null,
TimerUpdate.empty(),
result(createdInts.getProducingTransformInternal(),
+ null,
Collections.<CommittedBundle<?>>singleton(emptyCreateOutput)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
TransformWatermarks updatedSourceWatermarks =
@@ -627,6 +691,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(null,
TimerUpdate.empty(),
result(createdInts.getProducingTransformInternal(),
+ null,
Collections.<CommittedBundle<?>>singleton(firstCreateOutput)),
new Instant(12_000L));
@@ -634,6 +699,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(firstCreateOutput,
TimerUpdate.empty(),
result(filtered.getProducingTransformInternal(),
+ firstCreateOutput.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(firstFilterOutput)),
new Instant(10_000L));
TransformWatermarks firstFilterWatermarks =
@@ -645,6 +711,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(null,
TimerUpdate.empty(),
result(createdInts.getProducingTransformInternal(),
+ null,
Collections.<CommittedBundle<?>>singleton(emptyCreateOutput)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
TransformWatermarks updatedSourceWatermarks =
@@ -687,6 +754,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(null,
TimerUpdate.empty(),
result(createdInts.getProducingTransformInternal(),
+ null,
Collections.<CommittedBundle<?>>singleton(createOutput)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
TransformWatermarks createAfterUpdate =
@@ -716,6 +784,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(createOutput,
TimerUpdate.empty(),
result(filtered.getProducingTransformInternal(),
+ createOutput.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(filterOutputBundle)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
TransformWatermarks filterAfterConsumed =
@@ -737,8 +806,10 @@ public class InMemoryWatermarkManagerTest implements Serializable {
// @Test
public void getSynchronizedProcessingTimeOutputHeldToPendingTimers() {
CommittedBundle<Integer> createdBundle = multiWindowedBundle(createdInts, 1, 2, 4, 8);
- manager.updateWatermarks(null, TimerUpdate.empty(),
+ manager.updateWatermarks(null,
+ TimerUpdate.empty(),
result(createdInts.getProducingTransformInternal(),
+ null,
Collections.<CommittedBundle<?>>singleton(createdBundle)),
new Instant(1248L));
@@ -759,6 +830,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(createdBundle,
timers,
result(filtered.getProducingTransformInternal(),
+ createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(filteredBundle)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
Instant startTime = clock.now();
@@ -796,6 +868,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
TimerUpdate.builder("key")
.withCompletedTimers(Collections.<TimerData>singleton(pastTimer)).build(),
result(filtered.getProducingTransformInternal(),
+ filteredTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(filteredTimerResult)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -809,6 +882,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(filteredTimerResult,
TimerUpdate.empty(),
result(filteredTimesTwo.getProducingTransformInternal(),
+ filteredTimerResult.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>emptyList()),
BoundedWindow.TIMESTAMP_MAX_VALUE);
assertThat(filteredDoubledWms.getSynchronizedProcessingOutputTime(), equalTo(clock.now()));
@@ -846,6 +920,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(null,
TimerUpdate.empty(),
result(createdInts.getProducingTransformInternal(),
+ null,
Collections.<CommittedBundle<?>>singleton(createOutput)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
TransformWatermarks createAfterUpdate =
@@ -859,6 +934,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(null,
TimerUpdate.empty(),
result(createdInts.getProducingTransformInternal(),
+ null,
Collections.<CommittedBundle<?>>singleton(createSecondOutput)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -871,6 +947,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(null,
TimerUpdate.empty(),
result(createdInts.getProducingTransformInternal(),
+ null,
Collections.<CommittedBundle<?>>singleton(created)),
new Instant(40_900L));
@@ -881,6 +958,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(created,
TimerUpdate.builder("key").setTimer(upstreamProcessingTimer).build(),
result(filtered.getProducingTransformInternal(),
+ created.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(filteredBundle)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -901,6 +979,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
TimerUpdate.builder("key")
.withCompletedTimers(Collections.singleton(upstreamProcessingTimer)).build(),
result(filtered.getProducingTransformInternal(),
+ otherCreated.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>emptyList()),
BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -914,6 +993,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
null,
TimerUpdate.empty(),
result(createdInts.getProducingTransformInternal(),
+ null,
Collections.<CommittedBundle<?>>singleton(created)),
new Instant(29_919_235L));
@@ -924,6 +1004,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
created,
TimerUpdate.empty(),
result(filtered.getProducingTransformInternal(),
+ created.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(filteredBundle)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -946,7 +1027,9 @@ public class InMemoryWatermarkManagerTest implements Serializable {
CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
manager.updateWatermarks(null,
TimerUpdate.empty(),
- result(createdInts.getProducingTransformInternal(), Collections.singleton(createdBundle)),
+ result(createdInts.getProducingTransformInternal(),
+ null,
+ Collections.singleton(createdBundle)),
new Instant(1500L));
TimerData earliestTimer =
@@ -966,6 +1049,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(createdBundle,
update,
result(filtered.getProducingTransformInternal(),
+ createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
new Instant(1000L));
@@ -982,6 +1066,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(null,
TimerUpdate.empty(),
result(createdInts.getProducingTransformInternal(),
+ null,
Collections.<CommittedBundle<?>>emptyList()),
new Instant(50_000L));
Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers =
@@ -1007,7 +1092,9 @@ public class InMemoryWatermarkManagerTest implements Serializable {
CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
manager.updateWatermarks(null,
TimerUpdate.empty(),
- result(createdInts.getProducingTransformInternal(), Collections.singleton(createdBundle)),
+ result(createdInts.getProducingTransformInternal(),
+ null,
+ Collections.singleton(createdBundle)),
new Instant(1500L));
TimerData earliestTimer =
@@ -1028,7 +1115,8 @@ public class InMemoryWatermarkManagerTest implements Serializable {
createdBundle,
update,
result(filtered.getProducingTransformInternal(),
- Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
+ createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
+ Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
new Instant(1000L));
Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firstTransformFiredTimers =
@@ -1045,6 +1133,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(null,
TimerUpdate.empty(),
result(createdInts.getProducingTransformInternal(),
+ null,
Collections.<CommittedBundle<?>>emptyList()),
new Instant(50_000L));
Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers =
@@ -1070,7 +1159,9 @@ public class InMemoryWatermarkManagerTest implements Serializable {
CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
manager.updateWatermarks(null,
TimerUpdate.empty(),
- result(createdInts.getProducingTransformInternal(), Collections.singleton(createdBundle)),
+ result(createdInts.getProducingTransformInternal(),
+ null,
+ Collections.singleton(createdBundle)),
new Instant(1500L));
TimerData earliestTimer = TimerData.of(
@@ -1091,6 +1182,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
createdBundle,
update,
result(filtered.getProducingTransformInternal(),
+ createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
new Instant(1000L));
@@ -1109,6 +1201,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
manager.updateWatermarks(null,
TimerUpdate.empty(),
result(createdInts.getProducingTransformInternal(),
+ null,
Collections.<CommittedBundle<?>>emptyList()),
new Instant(50_000L));
Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers =
@@ -1273,8 +1366,10 @@ public class InMemoryWatermarkManagerTest implements Serializable {
private final CommittedResult result(
AppliedPTransform<?, ?, ?> transform,
+ @Nullable CommittedBundle<?> unprocessedBundle,
Iterable<? extends CommittedBundle<?>> bundles) {
- return CommittedResult.create(StepTransformResult.withoutHold(transform)
- .build(), bundles);
+ return CommittedResult.create(StepTransformResult.withoutHold(transform).build(),
+ unprocessedBundle,
+ bundles);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0518fc68/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index 959e9d3..8b6053e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -491,7 +491,16 @@ public class TransformExecutorTest {
CommittedBundle<?> inputBundle, InProcessTransformResult result) {
handledResult = result;
onMethod.countDown();
- return CommittedResult.create(result, Collections.<CommittedBundle<?>>emptyList());
+ @SuppressWarnings("rawtypes") Iterable unprocessedElements =
+ result.getUnprocessedElements() == null ?
+ Collections.emptyList() :
+ result.getUnprocessedElements();
+
+ CommittedBundle<?> unprocessedBundle =
+ inputBundle == null ? null : inputBundle.withElements(unprocessedElements);
+ return CommittedResult.create(result,
+ unprocessedBundle,
+ Collections.<CommittedBundle<?>>emptyList());
}
@Override
[2/4] incubator-beam git commit: Use AutoValue for ExecutorUpdate
Posted by ke...@apache.org.
Use AutoValue for ExecutorUpdate
Explicitly provide the collections the Bundle should be consumed by in
the update.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/59cca8dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/59cca8dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/59cca8dd
Branch: refs/heads/master
Commit: 59cca8ddae3d544beea9684719409efe3acbe634
Parents: e7df160
Author: Thomas Groh <tg...@google.com>
Authored: Fri May 6 10:33:32 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri May 6 10:55:53 2016 -0700
----------------------------------------------------------------------
.../direct/ExecutorServiceParallelExecutor.java | 59 ++++++++++----------
1 file changed, 30 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59cca8dd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 6f26b6b..fd4cc2c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
+import com.google.auto.value.AutoValue;
import com.google.common.base.MoreObjects;
import com.google.common.base.Optional;
import com.google.common.cache.CacheBuilder;
@@ -191,8 +192,9 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
return keyedPValues.contains(pvalue);
}
- private void scheduleConsumers(CommittedBundle<?> bundle) {
- for (AppliedPTransform<?, ?, ?> consumer : valueToConsumers.get(bundle.getPCollection())) {
+ private void scheduleConsumers(ExecutorUpdate update) {
+ CommittedBundle<?> bundle = update.getBundle().get();
+ for (AppliedPTransform<?, ?, ?> consumer : update.getConsumers()) {
scheduleConsumption(consumer, bundle, defaultCompletionCallback);
}
}
@@ -225,7 +227,8 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
CommittedBundle<?> inputBundle, InProcessTransformResult result) {
CommittedResult committedResult = getCommittedResult(inputBundle, result);
for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) {
- allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle));
+ allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle,
+ valueToConsumers.get(outputBundle.getPCollection())));
}
return committedResult;
}
@@ -276,38 +279,36 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
*
* Used to signal when the executor should be shut down (due to an exception).
*/
- private static class ExecutorUpdate {
- private final Optional<? extends CommittedBundle<?>> bundle;
- private final Optional<? extends Throwable> throwable;
-
- public static ExecutorUpdate fromBundle(CommittedBundle<?> bundle) {
- return new ExecutorUpdate(bundle, null);
+ @AutoValue
+ abstract static class ExecutorUpdate {
+ public static ExecutorUpdate fromBundle(
+ CommittedBundle<?> bundle,
+ Collection<AppliedPTransform<?, ?, ?>> consumers) {
+ return new AutoValue_ExecutorServiceParallelExecutor_ExecutorUpdate(
+ Optional.of(bundle),
+ consumers,
+ Optional.<Throwable>absent());
}
public static ExecutorUpdate fromThrowable(Throwable t) {
- return new ExecutorUpdate(null, t);
- }
-
- private ExecutorUpdate(CommittedBundle<?> producedBundle, Throwable throwable) {
- this.bundle = Optional.fromNullable(producedBundle);
- this.throwable = Optional.fromNullable(throwable);
+ return new AutoValue_ExecutorServiceParallelExecutor_ExecutorUpdate(
+ Optional.<CommittedBundle<?>>absent(),
+ Collections.<AppliedPTransform<?, ?, ?>>emptyList(),
+ Optional.of(t));
}
- public Optional<? extends CommittedBundle<?>> getBundle() {
- return bundle;
- }
+ /**
+ * Returns the bundle that produced this update.
+ */
+ public abstract Optional<? extends CommittedBundle<?>> getBundle();
- public Optional<? extends Throwable> getException() {
- return throwable;
- }
+ /**
+ * Returns the transforms to process the bundle. If nonempty, {@link #getBundle()} will return
+ * a present {@link Optional}.
+ */
+ public abstract Collection<AppliedPTransform<?, ?, ?>> getConsumers();
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(ExecutorUpdate.class)
- .add("bundle", bundle)
- .add("exception", throwable)
- .toString();
- }
+ public abstract Optional<? extends Throwable> getException();
}
/**
@@ -353,7 +354,7 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
while (update != null) {
LOG.debug("Executor Update: {}", update);
if (update.getBundle().isPresent()) {
- scheduleConsumers(update.getBundle().get());
+ scheduleConsumers(update);
} else if (update.getException().isPresent()) {
visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get()));
}