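You are viewing a plain-text version of this content. The canonical (hyperlinked) version is available in the original mailing-list archive; the link target was not preserved in this text rendering.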
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/19 01:12:28 UTC
[4/7] beam git commit: Minor cleanups in ParDoEvaluator
Minor cleanups in ParDoEvaluator
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1cc16b0d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1cc16b0d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1cc16b0d
Branch: refs/heads/master
Commit: 1cc16b0d6cea7b01b01427758eaf427cc29635b6
Parents: 3fd8890
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Apr 17 12:25:02 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 18:02:06 2017 -0700
----------------------------------------------------------------------
...oFnLifecycleManagerRemovingTransformEvaluator.java | 6 +++---
.../apache/beam/runners/direct/ParDoEvaluator.java | 14 +++++---------
.../beam/runners/direct/ParDoEvaluatorFactory.java | 2 +-
.../SplittableProcessElementsEvaluatorFactory.java | 2 +-
...ifecycleManagerRemovingTransformEvaluatorTest.java | 8 ++++----
.../beam/runners/direct/ParDoEvaluatorTest.java | 4 ++--
6 files changed, 16 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
index 9bcd569..e537962 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
@@ -31,16 +31,16 @@ import org.slf4j.LoggerFactory;
class DoFnLifecycleManagerRemovingTransformEvaluator<InputT> implements TransformEvaluator<InputT> {
private static final Logger LOG =
LoggerFactory.getLogger(DoFnLifecycleManagerRemovingTransformEvaluator.class);
- private final ParDoEvaluator<InputT, ?> underlying;
+ private final ParDoEvaluator<InputT> underlying;
private final DoFnLifecycleManager lifecycleManager;
public static <InputT> DoFnLifecycleManagerRemovingTransformEvaluator<InputT> wrapping(
- ParDoEvaluator<InputT, ?> underlying, DoFnLifecycleManager lifecycleManager) {
+ ParDoEvaluator<InputT> underlying, DoFnLifecycleManager lifecycleManager) {
return new DoFnLifecycleManagerRemovingTransformEvaluator<>(underlying, lifecycleManager);
}
private DoFnLifecycleManagerRemovingTransformEvaluator(
- ParDoEvaluator<InputT, ?> underlying, DoFnLifecycleManager lifecycleManager) {
+ ParDoEvaluator<InputT> underlying, DoFnLifecycleManager lifecycleManager) {
this.underlying = underlying;
this.lifecycleManager = lifecycleManager;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 49d0723..131716f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -40,9 +40,9 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
-class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
+class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
- public static <InputT, OutputT> ParDoEvaluator<InputT, OutputT> create(
+ public static <InputT, OutputT> ParDoEvaluator<InputT> create(
EvaluationContext evaluationContext,
DirectStepContext stepContext,
AppliedPTransform<?, ?, ?> application,
@@ -93,13 +93,11 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
throw UserCodeException.wrap(e);
}
- return new ParDoEvaluator<>(
- evaluationContext, runner, application, aggregatorChanges, outputManager, stepContext);
+ return new ParDoEvaluator<>(runner, application, aggregatorChanges, outputManager, stepContext);
}
////////////////////////////////////////////////////////////////////////////////////////////////
- private final EvaluationContext evaluationContext;
private final PushbackSideInputDoFnRunner<InputT, ?> fnRunner;
private final AppliedPTransform<?, ?, ?> transform;
private final AggregatorContainer.Mutator aggregatorChanges;
@@ -109,13 +107,11 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
private final ImmutableList.Builder<WindowedValue<InputT>> unprocessedElements;
private ParDoEvaluator(
- EvaluationContext evaluationContext,
PushbackSideInputDoFnRunner<InputT, ?> fnRunner,
AppliedPTransform<?, ?, ?> transform,
AggregatorContainer.Mutator aggregatorChanges,
BundleOutputManager outputManager,
DirectStepContext stepContext) {
- this.evaluationContext = evaluationContext;
this.fnRunner = fnRunner;
this.transform = transform;
this.outputManager = outputManager;
@@ -153,11 +149,11 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
} catch (Exception e) {
throw UserCodeException.wrap(e);
}
- StepTransformResult.Builder resultBuilder;
+ StepTransformResult.Builder<InputT> resultBuilder;
CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState();
if (state != null) {
resultBuilder =
- StepTransformResult.withHold(transform, state.getEarliestWatermarkHold())
+ StepTransformResult.<InputT>withHold(transform, state.getEarliestWatermarkHold())
.withState(state);
} else {
resultBuilder = StepTransformResult.withoutHold(transform);
http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index 0372295..93f204a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -126,7 +126,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
fnManager);
}
- ParDoEvaluator<InputT, OutputT> createParDoEvaluator(
+ ParDoEvaluator<InputT> createParDoEvaluator(
AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> application,
StructuralKey<?> key,
List<PCollectionView<?>> sideInputs,
http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index 64cef35..00b16dd 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -98,7 +98,7 @@ class SplittableProcessElementsEvaluatorFactory<
.getExecutionContext(application, inputBundle.getKey())
.getOrCreateStepContext(stepName, stepName);
- ParDoEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
+ ParDoEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
parDoEvaluator =
delegateFactory.createParDoEvaluator(
application,
http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
index d046ce5..1ac4d6d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
@@ -53,7 +53,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
@Test
public void delegatesToUnderlying() throws Exception {
- ParDoEvaluator<Object, Object> underlying = mock(ParDoEvaluator.class);
+ ParDoEvaluator<Object> underlying = mock(ParDoEvaluator.class);
DoFn<?, ?> original = lifecycleManager.get();
TransformEvaluator<Object> evaluator =
DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(underlying, lifecycleManager);
@@ -72,7 +72,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
@Test
public void removesOnExceptionInProcessElement() throws Exception {
- ParDoEvaluator<Object, Object> underlying = mock(ParDoEvaluator.class);
+ ParDoEvaluator<Object> underlying = mock(ParDoEvaluator.class);
doThrow(Exception.class).when(underlying).processElement(any(WindowedValue.class));
DoFn<?, ?> original = lifecycleManager.get();
@@ -91,7 +91,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
@Test
public void removesOnExceptionInOnTimer() throws Exception {
- ParDoEvaluator<Object, Object> underlying = mock(ParDoEvaluator.class);
+ ParDoEvaluator<Object> underlying = mock(ParDoEvaluator.class);
doThrow(Exception.class)
.when(underlying)
.onTimer(any(TimerData.class), any(BoundedWindow.class));
@@ -114,7 +114,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
@Test
public void removesOnExceptionInFinishBundle() throws Exception {
- ParDoEvaluator<Object, Object> underlying = mock(ParDoEvaluator.class);
+ ParDoEvaluator<Object> underlying = mock(ParDoEvaluator.class);
doThrow(Exception.class).when(underlying).finishBundle();
DoFn<?, ?> original = lifecycleManager.get();
http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 65a1248..2be0f9d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -98,7 +98,7 @@ public class ParDoEvaluatorTest {
UncommittedBundle<Integer> outputBundle = bundleFactory.createBundle(output);
when(evaluationContext.createBundle(output)).thenReturn(outputBundle);
- ParDoEvaluator<Integer, Integer> evaluator =
+ ParDoEvaluator<Integer> evaluator =
createEvaluator(singletonView, fn, output);
IntervalWindow nonGlobalWindow = new IntervalWindow(new Instant(0), new Instant(10_000L));
@@ -130,7 +130,7 @@ public class ParDoEvaluatorTest {
WindowedValue.timestampedValueInGlobalWindow(6, new Instant(2468L))));
}
- private ParDoEvaluator<Integer, Integer> createEvaluator(
+ private ParDoEvaluator<Integer> createEvaluator(
PCollectionView<Integer> singletonView,
RecorderFn fn,
PCollection<Integer> output) {