You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/11/28 18:06:46 UTC
[2/2] incubator-beam git commit: Add input type to TransformResult
Add input type to TransformResult
This would likely have caught some hard-to-diagnose type safety errors
during the development of StatefulParDoEvaluatorFactory, so adding it
should hopefully catch similar bugs in the future.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7502adda
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7502adda
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7502adda
Branch: refs/heads/master
Commit: 7502adda3262bce9d6d4fe4499bde8d8b5273029
Parents: 9fbd2d2
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Nov 22 16:01:45 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Nov 28 10:06:31 2016 -0800
----------------------------------------------------------------------
.../direct/AbstractModelEnforcement.java | 2 +-
.../direct/BoundedReadEvaluatorFactory.java | 2 +-
.../beam/runners/direct/CommittedResult.java | 2 +-
.../beam/runners/direct/CompletionCallback.java | 2 +-
...ecycleManagerRemovingTransformEvaluator.java | 2 +-
.../runners/direct/EmptyTransformEvaluator.java | 4 +-
.../beam/runners/direct/EvaluationContext.java | 2 +-
.../direct/ExecutorServiceParallelExecutor.java | 2 +-
.../runners/direct/FlattenEvaluatorFactory.java | 10 ++---
.../GroupAlsoByWindowEvaluatorFactory.java | 5 ++-
.../direct/GroupByKeyOnlyEvaluatorFactory.java | 2 +-
.../direct/ImmutabilityEnforcementFactory.java | 2 +-
.../beam/runners/direct/ModelEnforcement.java | 2 +-
.../beam/runners/direct/ParDoEvaluator.java | 2 +-
.../direct/PassthroughTransformEvaluator.java | 4 +-
.../runners/direct/StepTransformResult.java | 38 +++++++++--------
.../direct/TestStreamEvaluatorFactory.java | 2 +-
.../beam/runners/direct/TransformEvaluator.java | 2 +-
.../beam/runners/direct/TransformExecutor.java | 4 +-
.../beam/runners/direct/TransformResult.java | 16 +++++--
.../direct/UnboundedReadEvaluatorFactory.java | 3 +-
.../runners/direct/ViewEvaluatorFactory.java | 2 +-
.../runners/direct/WindowEvaluatorFactory.java | 6 ++-
.../direct/BoundedReadEvaluatorFactoryTest.java | 10 ++---
...leManagerRemovingTransformEvaluatorTest.java | 4 +-
.../runners/direct/EvaluationContextTest.java | 20 ++++-----
.../direct/FlattenEvaluatorFactoryTest.java | 6 +--
.../ImmutabilityEnforcementFactoryTest.java | 6 +--
.../beam/runners/direct/ParDoEvaluatorTest.java | 2 +-
.../runners/direct/StepTransformResultTest.java | 25 ++++++-----
.../direct/TestStreamEvaluatorFactoryTest.java | 10 ++---
.../runners/direct/TransformExecutorTest.java | 45 ++++++++++----------
.../UnboundedReadEvaluatorFactoryTest.java | 20 ++++++---
.../direct/WindowEvaluatorFactoryTest.java | 12 +++---
34 files changed, 152 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
index 81f0f5f..f09164b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
@@ -33,6 +33,6 @@ abstract class AbstractModelEnforcement<T> implements ModelEnforcement<T> {
@Override
public void afterFinish(
CommittedBundle<T> input,
- TransformResult result,
+ TransformResult<T> result,
Iterable<? extends CommittedBundle<?>> outputs) {}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index 66c55cd..65b622f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -161,7 +161,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
}
@Override
- public TransformResult finishBundle() {
+ public TransformResult<BoundedSourceShard<OutputT>> finishBundle() {
return resultBuilder.build();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
index 5fcf7b3..4db7e18 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
@@ -59,7 +59,7 @@ abstract class CommittedResult {
public abstract Set<OutputType> getProducedOutputTypes();
public static CommittedResult create(
- TransformResult original,
+ TransformResult<?> original,
CommittedBundle<?> unprocessedElements,
Iterable<? extends CommittedBundle<?>> outputs,
Set<OutputType> producedOutputs) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
index 2986df1..766259d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
@@ -28,7 +28,7 @@ interface CompletionCallback {
* Handle a successful result, returning the committed outputs of the result.
*/
CommittedResult handleResult(
- CommittedBundle<?> inputBundle, TransformResult result);
+ CommittedBundle<?> inputBundle, TransformResult<?> result);
/**
* Handle an input bundle that did not require processing.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
index faa0615..fb13b0f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
@@ -54,7 +54,7 @@ class DoFnLifecycleManagerRemovingTransformEvaluator<InputT> implements Transfor
}
@Override
- public TransformResult finishBundle() throws Exception {
+ public TransformResult<InputT> finishBundle() throws Exception {
try {
return underlying.finishBundle();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java
index 778c5aa..85e5e70 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java
@@ -43,8 +43,8 @@ final class EmptyTransformEvaluator<T> implements TransformEvaluator<T> {
public void processElement(WindowedValue<T> element) throws Exception {}
@Override
- public TransformResult finishBundle() throws Exception {
- return StepTransformResult.withHold(transform, BoundedWindow.TIMESTAMP_MIN_VALUE)
+ public TransformResult<T> finishBundle() throws Exception {
+ return StepTransformResult.<T>withHold(transform, BoundedWindow.TIMESTAMP_MIN_VALUE)
.build();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index b814def..c1225f6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -161,7 +161,7 @@ class EvaluationContext {
public CommittedResult handleResult(
@Nullable CommittedBundle<?> completedBundle,
Iterable<TimerData> completedTimers,
- TransformResult result) {
+ TransformResult<?> result) {
Iterable<? extends CommittedBundle<?>> committedBundles =
commitBundles(result.getOutputBundles());
metrics.commitLogical(completedBundle, result.getLogicalMetricUpdates());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 05cdd34..b7908c5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -270,7 +270,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
@Override
public final CommittedResult handleResult(
- CommittedBundle<?> inputBundle, TransformResult result) {
+ CommittedBundle<?> inputBundle, TransformResult<?> result) {
CommittedResult committedResult = evaluationContext.handleResult(inputBundle, timers, result);
for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) {
allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
index 57d5628..817e736 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
@@ -56,17 +56,17 @@ class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
application) {
final UncommittedBundle<InputT> outputBundle =
evaluationContext.createBundle(application.getOutput());
- final TransformResult result =
- StepTransformResult.withoutHold(application).addOutput(outputBundle).build();
+ final TransformResult<InputT> result =
+ StepTransformResult.<InputT>withoutHold(application).addOutput(outputBundle).build();
return new FlattenEvaluator<>(outputBundle, result);
}
private static class FlattenEvaluator<InputT> implements TransformEvaluator<InputT> {
private final UncommittedBundle<InputT> outputBundle;
- private final TransformResult result;
+ private final TransformResult<InputT> result;
public FlattenEvaluator(
- UncommittedBundle<InputT> outputBundle, TransformResult result) {
+ UncommittedBundle<InputT> outputBundle, TransformResult<InputT> result) {
this.outputBundle = outputBundle;
this.result = result;
}
@@ -77,7 +77,7 @@ class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
}
@Override
- public TransformResult finishBundle() {
+ public TransformResult<InputT> finishBundle() {
return result;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 36c742b..9d25bc6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -208,10 +208,11 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
}
@Override
- public TransformResult finishBundle() throws Exception {
+ public TransformResult<KeyedWorkItem<K, V>> finishBundle() throws Exception {
// State is initialized within the constructor. It can never be null.
CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState();
- return StepTransformResult.withHold(application, state.getEarliestWatermarkHold())
+ return StepTransformResult.<KeyedWorkItem<K, V>>withHold(
+ application, state.getEarliestWatermarkHold())
.withState(state)
.addOutput(outputBundles)
.withTimerUpdate(stepContext.getTimerUpdate())
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
index 0fa7ebd..4d691ea 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
@@ -143,7 +143,7 @@ class GroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory {
}
@Override
- public TransformResult finishBundle() {
+ public TransformResult<KV<K, V>> finishBundle() {
Builder resultBuilder = StepTransformResult.withoutHold(application);
for (Map.Entry<GroupingKey<K>, List<WindowedValue<V>>> groupedEntry :
groupingMap.entrySet()) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
index 612922a..85fc374 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
@@ -74,7 +74,7 @@ class ImmutabilityEnforcementFactory implements ModelEnforcementFactory {
@Override
public void afterFinish(
CommittedBundle<T> input,
- TransformResult result,
+ TransformResult<T> result,
Iterable<? extends CommittedBundle<?>> outputs) {
for (MutationDetector detector : mutationElements.values()) {
verifyUnmodified(detector);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
index 074619a..25226f7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
@@ -58,6 +58,6 @@ public interface ModelEnforcement<T> {
*/
void afterFinish(
CommittedBundle<T> input,
- TransformResult result,
+ TransformResult<T> result,
Iterable<? extends CommittedBundle<?>> outputs);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 6f91319..254fa44 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -122,7 +122,7 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
}
@Override
- public TransformResult finishBundle() {
+ public TransformResult<InputT> finishBundle() {
try {
fnRunner.finishBundle();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
index c6e10e5..153af65 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
@@ -42,8 +42,8 @@ class PassthroughTransformEvaluator<InputT> implements TransformEvaluator<InputT
}
@Override
- public TransformResult finishBundle() throws Exception {
- return StepTransformResult.withoutHold(transform).addOutput(output).build();
+ public TransformResult<InputT> finishBundle() throws Exception {
+ return StepTransformResult.<InputT>withoutHold(transform).addOutput(output).build();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
index 5719e44..d58b027 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
@@ -37,18 +37,20 @@ import org.joda.time.Instant;
* An immutable {@link TransformResult}.
*/
@AutoValue
-public abstract class StepTransformResult implements TransformResult {
+public abstract class StepTransformResult<InputT> implements TransformResult<InputT> {
- public static Builder withHold(AppliedPTransform<?, ?, ?> transform, Instant watermarkHold) {
+ public static <InputT> Builder<InputT> withHold(
+ AppliedPTransform<?, ?, ?> transform, Instant watermarkHold) {
return new Builder(transform, watermarkHold);
}
- public static Builder withoutHold(AppliedPTransform<?, ?, ?> transform) {
+ public static <InputT> Builder<InputT> withoutHold(
+ AppliedPTransform<?, ?, ?> transform) {
return new Builder(transform, BoundedWindow.TIMESTAMP_MAX_VALUE);
}
@Override
- public TransformResult withLogicalMetricUpdates(MetricUpdates metricUpdates) {
+ public TransformResult<InputT> withLogicalMetricUpdates(MetricUpdates metricUpdates) {
return new AutoValue_StepTransformResult(
getTransform(),
getOutputBundles(),
@@ -64,10 +66,10 @@ public abstract class StepTransformResult implements TransformResult {
/**
* A builder for creating instances of {@link StepTransformResult}.
*/
- public static class Builder {
+ public static class Builder<InputT> {
private final AppliedPTransform<?, ?, ?> transform;
private final ImmutableList.Builder<UncommittedBundle<?>> bundlesBuilder;
- private final ImmutableList.Builder<WindowedValue<?>> unprocessedElementsBuilder;
+ private final ImmutableList.Builder<WindowedValue<InputT>> unprocessedElementsBuilder;
private MetricUpdates metricUpdates;
private CopyOnAccessInMemoryStateInternals<?> state;
private TimerUpdate timerUpdate;
@@ -85,8 +87,8 @@ public abstract class StepTransformResult implements TransformResult {
this.metricUpdates = MetricUpdates.EMPTY;
}
- public StepTransformResult build() {
- return new AutoValue_StepTransformResult(
+ public StepTransformResult<InputT> build() {
+ return new AutoValue_StepTransformResult<>(
transform,
bundlesBuilder.build(),
unprocessedElementsBuilder.build(),
@@ -98,49 +100,51 @@ public abstract class StepTransformResult implements TransformResult {
producedOutputs);
}
- public Builder withAggregatorChanges(AggregatorContainer.Mutator aggregatorChanges) {
+ public Builder<InputT> withAggregatorChanges(AggregatorContainer.Mutator aggregatorChanges) {
this.aggregatorChanges = aggregatorChanges;
return this;
}
- public Builder withMetricUpdates(MetricUpdates metricUpdates) {
+ public Builder<InputT> withMetricUpdates(MetricUpdates metricUpdates) {
this.metricUpdates = metricUpdates;
return this;
}
- public Builder withState(CopyOnAccessInMemoryStateInternals<?> state) {
+ public Builder<InputT> withState(CopyOnAccessInMemoryStateInternals<?> state) {
this.state = state;
return this;
}
- public Builder withTimerUpdate(TimerUpdate timerUpdate) {
+ public Builder<InputT> withTimerUpdate(TimerUpdate timerUpdate) {
this.timerUpdate = timerUpdate;
return this;
}
- public Builder addUnprocessedElements(WindowedValue<?>... unprocessed) {
+ public Builder<InputT> addUnprocessedElements(WindowedValue<InputT>... unprocessed) {
unprocessedElementsBuilder.addAll(Arrays.asList(unprocessed));
return this;
}
- public Builder addUnprocessedElements(Iterable<? extends WindowedValue<?>> unprocessed) {
+ public Builder<InputT> addUnprocessedElements(
+ Iterable<? extends WindowedValue<InputT>> unprocessed) {
unprocessedElementsBuilder.addAll(unprocessed);
return this;
}
- public Builder addOutput(
+ public Builder<InputT> addOutput(
UncommittedBundle<?> outputBundle, UncommittedBundle<?>... outputBundles) {
bundlesBuilder.add(outputBundle);
bundlesBuilder.add(outputBundles);
return this;
}
- public Builder addOutput(Collection<UncommittedBundle<?>> outputBundles) {
+ public Builder<InputT> addOutput(
+ Collection<UncommittedBundle<?>> outputBundles) {
bundlesBuilder.addAll(outputBundles);
return this;
}
- public Builder withAdditionalOutput(OutputType producedAdditionalOutput) {
+ public Builder<InputT> withAdditionalOutput(OutputType producedAdditionalOutput) {
producedOutputs.add(producedAdditionalOutput);
return this;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index 2ab6adf..9df7cdc 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -127,7 +127,7 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
}
@Override
- public TransformResult finishBundle() throws Exception {
+ public TransformResult<TestStreamIndex<T>> finishBundle() throws Exception {
return resultBuilder.build();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
index 1624fcb..79c942b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
@@ -42,5 +42,5 @@ public interface TransformEvaluator<InputT> {
*
* @return an {@link TransformResult} containing the results of this bundle evaluation.
*/
- TransformResult finishBundle() throws Exception;
+ TransformResult<InputT> finishBundle() throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
index fb31cc9..bbc0aae 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
@@ -159,11 +159,11 @@ class TransformExecutor<T> implements Runnable {
* @return the {@link TransformResult} produced by
* {@link TransformEvaluator#finishBundle()}
*/
- private TransformResult finishBundle(
+ private TransformResult<T> finishBundle(
TransformEvaluator<T> evaluator, MetricsContainer metricsContainer,
Collection<ModelEnforcement<T>> enforcements)
throws Exception {
- TransformResult result = evaluator.finishBundle()
+ TransformResult<T> result = evaluator.finishBundle()
.withLogicalMetricUpdates(metricsContainer.getCumulative());
CommittedResult outputs = onComplete.handleResult(inputBundle, result);
for (ModelEnforcement<T> enforcement : enforcements) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
index ac1e395..b4797b0 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
@@ -25,6 +25,7 @@ import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
import org.apache.beam.sdk.metrics.MetricUpdates;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
@@ -32,16 +33,25 @@ import org.joda.time.Instant;
/**
* The result of evaluating an {@link AppliedPTransform} with a {@link TransformEvaluator}.
+ *
+ * <p>Every transform evaluator has a defined input type, but {@link ParDo} has multiple outputs
+ * so there is not necessarily a defined output type.
*/
-public interface TransformResult {
+public interface TransformResult<InputT> {
/**
* Returns the {@link AppliedPTransform} that produced this result.
+ *
+ * <p>This is treated as an opaque identifier so evaluators can delegate to other evaluators
+ * that may not have compatible types.
*/
AppliedPTransform<?, ?, ?> getTransform();
/**
* Returns the {@link UncommittedBundle (uncommitted) Bundles} output by this transform. These
* will be committed by the evaluation context as part of completing this result.
+ *
+ * <p>Note that the bundles need not have a uniform type, for example in the case of multi-output
+ * {@link ParDo}.
*/
Iterable<? extends UncommittedBundle<?>> getOutputBundles();
@@ -49,7 +59,7 @@ public interface TransformResult {
* Returns elements that were provided to the {@link TransformEvaluator} as input but were not
* processed.
*/
- Iterable<? extends WindowedValue<?>> getUnprocessedElements();
+ Iterable<? extends WindowedValue<InputT>> getUnprocessedElements();
/**
* Returns the {@link AggregatorContainer.Mutator} used by this {@link PTransform}, or null if
@@ -97,5 +107,5 @@ public interface TransformResult {
* Returns a new TransformResult based on this one but overwriting any existing logical metric
* updates with {@code metricUpdates}.
*/
- TransformResult withLogicalMetricUpdates(MetricUpdates metricUpdates);
+ TransformResult<InputT> withLogicalMetricUpdates(MetricUpdates metricUpdates);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index 24a91cb..a4aebc9 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -229,7 +229,8 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
}
@Override
- public TransformResult finishBundle() throws IOException {
+ public TransformResult<UnboundedSourceShard<OutputT, CheckpointMarkT>> finishBundle()
+ throws IOException {
return resultBuilder.build();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
index 2dd280a..b92ade1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
@@ -81,7 +81,7 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
}
@Override
- public TransformResult finishBundle() {
+ public TransformResult<Iterable<InT>> finishBundle() {
writer.add(elements);
Builder resultBuilder = StepTransformResult.withoutHold(application);
if (!elements.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
index eb53b7f..991addf 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
@@ -103,8 +103,10 @@ class WindowEvaluatorFactory implements TransformEvaluatorFactory {
}
@Override
- public TransformResult finishBundle() throws Exception {
- return StepTransformResult.withoutHold(transform).addOutput(outputBundle).build();
+ public TransformResult<InputT> finishBundle() throws Exception {
+ return StepTransformResult.<InputT>withoutHold(transform)
+ .addOutput(outputBundle)
+ .build();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
index e956c34..dee95a7 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
@@ -110,7 +110,7 @@ public class BoundedReadEvaluatorFactoryTest {
for (WindowedValue<?> shard : shardBundle.getElements()) {
evaluator.processElement((WindowedValue) shard);
}
- TransformResult result = evaluator.finishBundle();
+ TransformResult<?> result = evaluator.finishBundle();
assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
assertThat(
Iterables.size(result.getOutputBundles()),
@@ -154,11 +154,11 @@ public class BoundedReadEvaluatorFactoryTest {
Collection<CommittedBundle<?>> newUnreadInputs = new ArrayList<>();
for (CommittedBundle<?> shardBundle : unreadInputs) {
- TransformEvaluator<?> evaluator = factory.forApplication(transform, null);
+ TransformEvaluator<Long> evaluator = factory.forApplication(transform, null);
for (WindowedValue<?> shard : shardBundle.getElements()) {
evaluator.processElement((WindowedValue) shard);
}
- TransformResult result = evaluator.finishBundle();
+ TransformResult<Long> result = evaluator.finishBundle();
assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
assertThat(
Iterables.size(result.getOutputBundles()),
@@ -207,7 +207,7 @@ public class BoundedReadEvaluatorFactoryTest {
for (WindowedValue<?> shard : shardBundle.getElements()) {
evaluator.processElement((WindowedValue) shard);
}
- TransformResult result = evaluator.finishBundle();
+ TransformResult<?> result = evaluator.finishBundle();
assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
assertThat(
Iterables.size(result.getOutputBundles()),
@@ -277,7 +277,7 @@ public class BoundedReadEvaluatorFactoryTest {
when(context.createBundle(longs)).thenReturn(outputBundle);
evaluator.processElement(shard);
}
- TransformResult result = evaluator.finishBundle();
+ TransformResult<?> result = evaluator.finishBundle();
assertThat(Iterables.size(result.getOutputBundles()), equalTo(splits.size()));
List<WindowedValue<?>> outputElems = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
index 9e2732e..b5eec63 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
@@ -115,7 +115,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
}
@Override
- public TransformResult finishBundle() throws Exception {
+ public TransformResult<Object> finishBundle() throws Exception {
finishBundleCalled = true;
return null;
}
@@ -128,7 +128,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
}
@Override
- public TransformResult finishBundle() throws Exception {
+ public TransformResult<Object> finishBundle() throws Exception {
throw new Exception();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index e1277ac..9a3959d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -250,7 +250,7 @@ public class EvaluationContextTest {
AggregatorContainer.Mutator mutator = container.createMutator();
mutator.createAggregatorForDoFn(fn, stepContext, "foo", new SumLongFn()).addValue(4L);
- TransformResult result =
+ TransformResult<?> result =
StepTransformResult.withoutHold(created.getProducingTransformInternal())
.withAggregatorChanges(mutator)
.build();
@@ -260,7 +260,7 @@ public class EvaluationContextTest {
AggregatorContainer.Mutator mutatorAgain = container.createMutator();
mutatorAgain.createAggregatorForDoFn(fn, stepContext, "foo", new SumLongFn()).addValue(12L);
- TransformResult secondResult =
+ TransformResult<?> secondResult =
StepTransformResult.withoutHold(downstream.getProducingTransformInternal())
.withAggregatorChanges(mutatorAgain)
.build();
@@ -286,7 +286,7 @@ public class EvaluationContextTest {
bag.add(2);
bag.add(4);
- TransformResult stateResult =
+ TransformResult<?> stateResult =
StepTransformResult.withoutHold(downstream.getProducingTransformInternal())
.withState(state)
.build();
@@ -319,7 +319,7 @@ public class EvaluationContextTest {
context.scheduleAfterOutputWouldBeProduced(
downstream, GlobalWindow.INSTANCE, WindowingStrategy.globalDefault(), callback);
- TransformResult result =
+ TransformResult<?> result =
StepTransformResult.withHold(created.getProducingTransformInternal(), new Instant(0))
.build();
@@ -328,7 +328,7 @@ public class EvaluationContextTest {
// will likely be flaky if this logic is broken
assertThat(callLatch.await(500L, TimeUnit.MILLISECONDS), is(false));
- TransformResult finishedResult =
+ TransformResult<?> finishedResult =
StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
context.handleResult(null, ImmutableList.<TimerData>of(), finishedResult);
context.forceRefresh();
@@ -338,7 +338,7 @@ public class EvaluationContextTest {
@Test
public void callAfterOutputMustHaveBeenProducedAlreadyAfterCallsImmediately() throws Exception {
- TransformResult finishedResult =
+ TransformResult<?> finishedResult =
StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
context.handleResult(null, ImmutableList.<TimerData>of(), finishedResult);
@@ -358,7 +358,7 @@ public class EvaluationContextTest {
@Test
public void extractFiredTimersExtractsTimers() {
- TransformResult holdResult =
+ TransformResult<?> holdResult =
StepTransformResult.withHold(created.getProducingTransformInternal(), new Instant(0))
.build();
context.handleResult(null, ImmutableList.<TimerData>of(), holdResult);
@@ -366,7 +366,7 @@ public class EvaluationContextTest {
StructuralKey<?> key = StructuralKey.of("foo".length(), VarIntCoder.of());
TimerData toFire =
TimerData.of(StateNamespaces.global(), new Instant(100L), TimeDomain.EVENT_TIME);
- TransformResult timerResult =
+ TransformResult<?> timerResult =
StepTransformResult.withoutHold(downstream.getProducingTransformInternal())
.withState(CopyOnAccessInMemoryStateInternals.withUnderlying(key, null))
.withTimerUpdate(TimerUpdate.builder(key).setTimer(toFire).build())
@@ -382,7 +382,7 @@ public class EvaluationContextTest {
// timer hasn't fired
assertThat(context.extractFiredTimers(), emptyIterable());
- TransformResult advanceResult =
+ TransformResult<?> advanceResult =
StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
// Should cause the downstream timer to fire
context.handleResult(null, ImmutableList.<TimerData>of(), advanceResult);
@@ -460,7 +460,7 @@ public class EvaluationContextTest {
context.handleResult(
null,
ImmutableList.<TimerData>of(),
- StepTransformResult.withoutHold(created.getProducingTransformInternal())
+ StepTransformResult.<Integer>withoutHold(created.getProducingTransformInternal())
.addOutput(rootBundle)
.build());
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
index 417aa64..cb27fbc 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
@@ -84,8 +84,8 @@ public class FlattenEvaluatorFactoryTest {
rightSideEvaluator.processElement(
WindowedValue.timestampedValueInGlobalWindow(-4, new Instant(-4096)));
- TransformResult rightSideResult = rightSideEvaluator.finishBundle();
- TransformResult leftSideResult = leftSideEvaluator.finishBundle();
+ TransformResult<Integer> rightSideResult = rightSideEvaluator.finishBundle();
+ TransformResult<Integer> leftSideResult = leftSideEvaluator.finishBundle();
assertThat(
rightSideResult.getOutputBundles(),
@@ -131,7 +131,7 @@ public class FlattenEvaluatorFactoryTest {
flattened.getProducingTransformInternal(),
bundleFactory.createRootBundle().commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
- TransformResult leftSideResult = emptyEvaluator.finishBundle();
+ TransformResult<Integer> leftSideResult = emptyEvaluator.finishBundle();
CommittedBundle<?> outputBundle =
Iterables.getOnlyElement(leftSideResult.getOutputBundles()).commit(Instant.now());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
index a7277fe..a65cd30 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
@@ -78,7 +78,7 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable {
enforcement.afterElement(element);
enforcement.afterFinish(
elements,
- StepTransformResult.withoutHold(consumer).build(),
+ StepTransformResult.<byte[]>withoutHold(consumer).build(),
Collections.<CommittedBundle<?>>emptyList());
}
@@ -98,7 +98,7 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable {
enforcement.afterElement(element);
enforcement.afterFinish(
elements,
- StepTransformResult.withoutHold(consumer).build(),
+ StepTransformResult.<byte[]>withoutHold(consumer).build(),
Collections.<CommittedBundle<?>>emptyList());
}
@@ -120,7 +120,7 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable {
thrown.expectMessage("Input values must not be mutated");
enforcement.afterFinish(
elements,
- StepTransformResult.withoutHold(consumer).build(),
+ StepTransformResult.<byte[]>withoutHold(consumer).build(),
Collections.<CommittedBundle<?>>emptyList());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index eab92f4..85e99c5 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -112,7 +112,7 @@ public class ParDoEvaluatorTest {
evaluator.processElement(first);
evaluator.processElement(second);
evaluator.processElement(third);
- TransformResult result = evaluator.finishBundle();
+ TransformResult<Integer> result = evaluator.finishBundle();
assertThat(
result.getUnprocessedElements(),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
index 61f5812..a21d8f7 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
@@ -56,34 +56,37 @@ public class StepTransformResultTest {
@Test
public void producedBundlesProducedOutputs() {
UncommittedBundle<Integer> bundle = bundleFactory.createBundle(pc);
- TransformResult result = StepTransformResult.withoutHold(transform).addOutput(bundle)
- .build();
+ TransformResult<Integer> result =
+ StepTransformResult.<Integer>withoutHold(transform).addOutput(bundle).build();
- assertThat(result.getOutputBundles(), Matchers.<UncommittedBundle>containsInAnyOrder(bundle));
+ assertThat(
+ result.getOutputBundles(), Matchers.<UncommittedBundle<?>>containsInAnyOrder(bundle));
}
@Test
public void withAdditionalOutputProducedOutputs() {
- TransformResult result = StepTransformResult.withoutHold(transform)
- .withAdditionalOutput(OutputType.PCOLLECTION_VIEW)
- .build();
+ TransformResult<Integer> result =
+ StepTransformResult.<Integer>withoutHold(transform)
+ .withAdditionalOutput(OutputType.PCOLLECTION_VIEW)
+ .build();
assertThat(result.getOutputTypes(), containsInAnyOrder(OutputType.PCOLLECTION_VIEW));
}
@Test
public void producedBundlesAndAdditionalOutputProducedOutputs() {
- TransformResult result = StepTransformResult.withoutHold(transform)
- .addOutput(bundleFactory.createBundle(pc))
- .withAdditionalOutput(OutputType.PCOLLECTION_VIEW)
- .build();
+ TransformResult<Integer> result =
+ StepTransformResult.<Integer>withoutHold(transform)
+ .addOutput(bundleFactory.createBundle(pc))
+ .withAdditionalOutput(OutputType.PCOLLECTION_VIEW)
+ .build();
assertThat(result.getOutputTypes(), hasItem(OutputType.PCOLLECTION_VIEW));
}
@Test
public void noBundlesNoAdditionalOutputProducedOutputsFalse() {
- TransformResult result = StepTransformResult.withoutHold(transform).build();
+ TransformResult<Integer> result = StepTransformResult.<Integer>withoutHold(transform).build();
assertThat(result.getOutputTypes(), emptyIterable());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
index 94a0d41..3d31df6 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
@@ -90,7 +90,7 @@ public class TestStreamEvaluatorFactoryTest {
TransformEvaluator<TestStreamIndex<Integer>> firstEvaluator =
factory.forApplication(streamVals.getProducingTransformInternal(), initialBundle);
firstEvaluator.processElement(Iterables.getOnlyElement(initialBundle.getElements()));
- TransformResult firstResult = firstEvaluator.finishBundle();
+ TransformResult<TestStreamIndex<Integer>> firstResult = firstEvaluator.finishBundle();
WindowedValue<TestStreamIndex<Integer>> firstResidual =
(WindowedValue<TestStreamIndex<Integer>>)
@@ -103,7 +103,7 @@ public class TestStreamEvaluatorFactoryTest {
TransformEvaluator<TestStreamIndex<Integer>> secondEvaluator =
factory.forApplication(streamVals.getProducingTransformInternal(), secondBundle);
secondEvaluator.processElement(firstResidual);
- TransformResult secondResult = secondEvaluator.finishBundle();
+ TransformResult<TestStreamIndex<Integer>> secondResult = secondEvaluator.finishBundle();
WindowedValue<TestStreamIndex<Integer>> secondResidual =
(WindowedValue<TestStreamIndex<Integer>>)
@@ -116,7 +116,7 @@ public class TestStreamEvaluatorFactoryTest {
TransformEvaluator<TestStreamIndex<Integer>> thirdEvaluator =
factory.forApplication(streamVals.getProducingTransformInternal(), thirdBundle);
thirdEvaluator.processElement(secondResidual);
- TransformResult thirdResult = thirdEvaluator.finishBundle();
+ TransformResult<TestStreamIndex<Integer>> thirdResult = thirdEvaluator.finishBundle();
WindowedValue<TestStreamIndex<Integer>> thirdResidual =
(WindowedValue<TestStreamIndex<Integer>>)
@@ -130,7 +130,7 @@ public class TestStreamEvaluatorFactoryTest {
TransformEvaluator<TestStreamIndex<Integer>> fourthEvaluator =
factory.forApplication(streamVals.getProducingTransformInternal(), fourthBundle);
fourthEvaluator.processElement(thirdResidual);
- TransformResult fourthResult = fourthEvaluator.finishBundle();
+ TransformResult<TestStreamIndex<Integer>> fourthResult = fourthEvaluator.finishBundle();
assertThat(clock.now(), equalTo(start.plus(Duration.standardMinutes(10))));
WindowedValue<TestStreamIndex<Integer>> fourthResidual =
@@ -144,7 +144,7 @@ public class TestStreamEvaluatorFactoryTest {
TransformEvaluator<TestStreamIndex<Integer>> fifthEvaluator =
factory.forApplication(streamVals.getProducingTransformInternal(), fifthBundle);
fifthEvaluator.processElement(fourthResidual);
- TransformResult fifthResult = fifthEvaluator.finishBundle();
+ TransformResult<TestStreamIndex<Integer>> fifthResult = fifthEvaluator.finishBundle();
assertThat(
Iterables.getOnlyElement(firstResult.getOutputBundles())
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index 0b7b882..85eff65 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.runners.direct;
-import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
@@ -96,7 +95,7 @@ public class TransformExecutorTest {
@Test
public void callWithNullInputBundleFinishesBundleAndCompletes() throws Exception {
- final TransformResult result =
+ final TransformResult<Object> result =
StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
final AtomicBoolean finishCalled = new AtomicBoolean(false);
TransformEvaluator<Object> evaluator =
@@ -107,7 +106,7 @@ public class TransformExecutorTest {
}
@Override
- public TransformResult finishBundle() throws Exception {
+ public TransformResult<Object> finishBundle() throws Exception {
finishCalled.set(true);
return result;
}
@@ -128,7 +127,7 @@ public class TransformExecutorTest {
executor.run();
assertThat(finishCalled.get(), is(true));
- assertThat(completionCallback.handledResult, equalTo(result));
+ assertThat(completionCallback.handledResult, Matchers.<TransformResult<?>>equalTo(result));
assertThat(completionCallback.handledException, is(nullValue()));
}
@@ -154,8 +153,8 @@ public class TransformExecutorTest {
@Test
public void inputBundleProcessesEachElementFinishesAndCompletes() throws Exception {
- final TransformResult result =
- StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
+ final TransformResult<String> result =
+ StepTransformResult.<String>withoutHold(downstream.getProducingTransformInternal()).build();
final Collection<WindowedValue<String>> elementsProcessed = new ArrayList<>();
TransformEvaluator<String> evaluator =
new TransformEvaluator<String>() {
@@ -166,7 +165,7 @@ public class TransformExecutorTest {
}
@Override
- public TransformResult finishBundle() throws Exception {
+ public TransformResult<String> finishBundle() throws Exception {
return result;
}
};
@@ -194,14 +193,14 @@ public class TransformExecutorTest {
evaluatorCompleted.await();
assertThat(elementsProcessed, containsInAnyOrder(spam, third, foo));
- assertThat(completionCallback.handledResult, equalTo(result));
+ assertThat(completionCallback.handledResult, Matchers.<TransformResult<?>>equalTo(result));
assertThat(completionCallback.handledException, is(nullValue()));
}
@Test
public void processElementThrowsExceptionCallsback() throws Exception {
- final TransformResult result =
- StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
+ final TransformResult<String> result =
+ StepTransformResult.<String>withoutHold(downstream.getProducingTransformInternal()).build();
final Exception exception = new Exception();
TransformEvaluator<String> evaluator =
new TransformEvaluator<String>() {
@@ -211,7 +210,7 @@ public class TransformExecutorTest {
}
@Override
- public TransformResult finishBundle() throws Exception {
+ public TransformResult<String> finishBundle() throws Exception {
return result;
}
};
@@ -248,7 +247,7 @@ public class TransformExecutorTest {
public void processElement(WindowedValue<String> element) throws Exception {}
@Override
- public TransformResult finishBundle() throws Exception {
+ public TransformResult<String> finishBundle() throws Exception {
throw exception;
}
};
@@ -277,7 +276,7 @@ public class TransformExecutorTest {
@Test
public void callWithEnforcementAppliesEnforcement() throws Exception {
- final TransformResult result =
+ final TransformResult<Object> result =
StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
TransformEvaluator<Object> evaluator =
@@ -286,7 +285,7 @@ public class TransformExecutorTest {
public void processElement(WindowedValue<Object> element) throws Exception {}
@Override
- public TransformResult finishBundle() throws Exception {
+ public TransformResult<Object> finishBundle() throws Exception {
return result;
}
};
@@ -317,7 +316,7 @@ public class TransformExecutorTest {
assertThat(
testEnforcement.afterElements,
Matchers.<WindowedValue<?>>containsInAnyOrder(barElem, fooElem));
- assertThat(testEnforcement.finishedBundles, contains(result));
+ assertThat(testEnforcement.finishedBundles, Matchers.<TransformResult<?>>contains(result));
}
@Test
@@ -333,7 +332,7 @@ public class TransformExecutorTest {
}
});
- final TransformResult result =
+ final TransformResult<Object> result =
StepTransformResult.withoutHold(pcBytes.getProducingTransformInternal()).build();
final CountDownLatch testLatch = new CountDownLatch(1);
final CountDownLatch evaluatorLatch = new CountDownLatch(1);
@@ -344,7 +343,7 @@ public class TransformExecutorTest {
public void processElement(WindowedValue<Object> element) throws Exception {}
@Override
- public TransformResult finishBundle() throws Exception {
+ public TransformResult<Object> finishBundle() throws Exception {
testLatch.countDown();
evaluatorLatch.await();
return result;
@@ -389,7 +388,7 @@ public class TransformExecutorTest {
}
});
- final TransformResult result =
+ final TransformResult<Object> result =
StepTransformResult.withoutHold(pcBytes.getProducingTransformInternal()).build();
final CountDownLatch testLatch = new CountDownLatch(1);
final CountDownLatch evaluatorLatch = new CountDownLatch(1);
@@ -403,7 +402,7 @@ public class TransformExecutorTest {
}
@Override
- public TransformResult finishBundle() throws Exception {
+ public TransformResult<Object> finishBundle() throws Exception {
return result;
}
};
@@ -434,7 +433,7 @@ public class TransformExecutorTest {
}
private static class RegisteringCompletionCallback implements CompletionCallback {
- private TransformResult handledResult = null;
+ private TransformResult<?> handledResult = null;
private boolean handledEmpty = false;
private Exception handledException = null;
private final CountDownLatch onMethod;
@@ -444,7 +443,7 @@ public class TransformExecutorTest {
}
@Override
- public CommittedResult handleResult(CommittedBundle<?> inputBundle, TransformResult result) {
+ public CommittedResult handleResult(CommittedBundle<?> inputBundle, TransformResult<?> result) {
handledResult = result;
onMethod.countDown();
@SuppressWarnings("rawtypes")
@@ -490,7 +489,7 @@ public class TransformExecutorTest {
private static class TestEnforcement<T> implements ModelEnforcement<T> {
private final List<WindowedValue<T>> beforeElements = new ArrayList<>();
private final List<WindowedValue<T>> afterElements = new ArrayList<>();
- private final List<TransformResult> finishedBundles = new ArrayList<>();
+ private final List<TransformResult<?>> finishedBundles = new ArrayList<>();
@Override
public void beforeElement(WindowedValue<T> element) {
@@ -505,7 +504,7 @@ public class TransformExecutorTest {
@Override
public void afterFinish(
CommittedBundle<T> input,
- TransformResult result,
+ TransformResult<T> result,
Iterable<? extends CommittedBundle<?>> outputs) {
finishedBundles.add(result);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 8d38275..5a10134 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -159,9 +159,10 @@ public class UnboundedReadEvaluatorFactoryTest {
longs.getProducingTransformInternal(), inputShards);
evaluator.processElement((WindowedValue) Iterables.getOnlyElement(inputShards.getElements()));
- TransformResult result = evaluator.finishBundle();
+ TransformResult<? super UnboundedSourceShard<Long, ?>> result = evaluator.finishBundle();
- WindowedValue<?> residual = Iterables.getOnlyElement(result.getUnprocessedElements());
+ WindowedValue<? super UnboundedSourceShard<Long, ?>> residual =
+ Iterables.getOnlyElement(result.getUnprocessedElements());
assertThat(
residual.getTimestamp(), Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant()));
UnboundedSourceShard<Long, ?> residualShard =
@@ -206,7 +207,8 @@ public class UnboundedReadEvaluatorFactoryTest {
evaluator.processElement(
(WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>) value);
}
- TransformResult result = evaluator.finishBundle();
+ TransformResult<UnboundedSourceShard<Long, TestCheckpointMark>> result =
+ evaluator.finishBundle();
assertThat(
output.commit(Instant.now()).getElements(),
containsInAnyOrder(tgw(1L), tgw(2L), tgw(4L), tgw(3L), tgw(0L)));
@@ -248,7 +250,8 @@ public class UnboundedReadEvaluatorFactoryTest {
evaluator.processElement(
(WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>) value);
}
- TransformResult result = evaluator.finishBundle();
+ TransformResult<UnboundedSourceShard<Long, TestCheckpointMark>> result =
+ evaluator.finishBundle();
// Read from the residual of the first read. This should not produce any output, but should
// include a residual shard in the result.
@@ -261,7 +264,8 @@ public class UnboundedReadEvaluatorFactoryTest {
Iterables.getOnlyElement(result.getUnprocessedElements());
secondEvaluator.processElement(residual);
- TransformResult secondResult = secondEvaluator.finishBundle();
+ TransformResult<UnboundedSourceShard<Long, TestCheckpointMark>> secondResult =
+ secondEvaluator.finishBundle();
// Sanity check that nothing was output (The test would have to run for more than a day to do
// so correctly.)
@@ -308,7 +312,8 @@ public class UnboundedReadEvaluatorFactoryTest {
TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> evaluator =
factory.forApplication(sourceTransform, inputBundle);
evaluator.processElement(shard);
- TransformResult result = evaluator.finishBundle();
+ TransformResult<UnboundedSourceShard<Long, TestCheckpointMark>> result =
+ evaluator.finishBundle();
CommittedBundle<UnboundedSourceShard<Long, TestCheckpointMark>> residual =
inputBundle.withElements(
@@ -350,7 +355,8 @@ public class UnboundedReadEvaluatorFactoryTest {
TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> evaluator =
factory.forApplication(sourceTransform, inputBundle);
evaluator.processElement(shard);
- TransformResult result = evaluator.finishBundle();
+ TransformResult<UnboundedSourceShard<Long, TestCheckpointMark>> result =
+ evaluator.finishBundle();
CommittedBundle<UnboundedSourceShard<Long, TestCheckpointMark>> residual =
inputBundle.withElements(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
index 741f8f2..e2f987c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
@@ -118,7 +118,7 @@ public class WindowEvaluatorFactoryTest {
UncommittedBundle<Long> outputBundle = createOutputBundle(triggering, inputBundle);
- TransformResult result = runEvaluator(triggering, inputBundle, transform);
+ TransformResult<Long> result = runEvaluator(triggering, inputBundle, transform);
assertThat(
Iterables.getOnlyElement(result.getOutputBundles()),
@@ -143,7 +143,7 @@ public class WindowEvaluatorFactoryTest {
BoundedWindow firstSecondWindow = new IntervalWindow(EPOCH, EPOCH.plus(windowDuration));
BoundedWindow thirdWindow = new IntervalWindow(EPOCH.minus(windowDuration), EPOCH);
- TransformResult result = runEvaluator(windowed, inputBundle, transform);
+ TransformResult<Long> result = runEvaluator(windowed, inputBundle, transform);
assertThat(
Iterables.getOnlyElement(result.getOutputBundles()),
@@ -178,7 +178,7 @@ public class WindowEvaluatorFactoryTest {
CommittedBundle<Long> inputBundle = createInputBundle();
UncommittedBundle<Long> outputBundle = createOutputBundle(windowed, inputBundle);
- TransformResult result = runEvaluator(windowed, inputBundle, transform);
+ TransformResult<Long> result = runEvaluator(windowed, inputBundle, transform);
assertThat(
Iterables.getOnlyElement(result.getOutputBundles()),
@@ -235,7 +235,7 @@ public class WindowEvaluatorFactoryTest {
CommittedBundle<Long> inputBundle = createInputBundle();
UncommittedBundle<Long> outputBundle = createOutputBundle(windowed, inputBundle);
- TransformResult result = runEvaluator(windowed, inputBundle, transform);
+ TransformResult<Long> result = runEvaluator(windowed, inputBundle, transform);
assertThat(
Iterables.getOnlyElement(result.getOutputBundles()),
@@ -301,7 +301,7 @@ public class WindowEvaluatorFactoryTest {
return outputBundle;
}
- private TransformResult runEvaluator(
+ private TransformResult<Long> runEvaluator(
PCollection<Long> windowed,
CommittedBundle<Long> inputBundle,
Window.Bound<Long> windowTransform /* Required while Window.Bound is a composite */)
@@ -313,7 +313,7 @@ public class WindowEvaluatorFactoryTest {
evaluator.processElement(valueInGlobalWindow);
evaluator.processElement(valueInGlobalAndTwoIntervalWindows);
evaluator.processElement(valueInIntervalWindow);
- TransformResult result = evaluator.finishBundle();
+ TransformResult<Long> result = evaluator.finishBundle();
return result;
}