You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/19 19:14:37 UTC

[03/50] [abbrv] beam git commit: Minor cleanups in ParDoEvaluator

Minor cleanups in ParDoEvaluator


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1cc16b0d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1cc16b0d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1cc16b0d

Branch: refs/heads/DSL_SQL
Commit: 1cc16b0d6cea7b01b01427758eaf427cc29635b6
Parents: 3fd8890
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Apr 17 12:25:02 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 18:02:06 2017 -0700

----------------------------------------------------------------------
 ...oFnLifecycleManagerRemovingTransformEvaluator.java |  6 +++---
 .../apache/beam/runners/direct/ParDoEvaluator.java    | 14 +++++---------
 .../beam/runners/direct/ParDoEvaluatorFactory.java    |  2 +-
 .../SplittableProcessElementsEvaluatorFactory.java    |  2 +-
 ...ifecycleManagerRemovingTransformEvaluatorTest.java |  8 ++++----
 .../beam/runners/direct/ParDoEvaluatorTest.java       |  4 ++--
 6 files changed, 16 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
index 9bcd569..e537962 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
@@ -31,16 +31,16 @@ import org.slf4j.LoggerFactory;
 class DoFnLifecycleManagerRemovingTransformEvaluator<InputT> implements TransformEvaluator<InputT> {
   private static final Logger LOG =
       LoggerFactory.getLogger(DoFnLifecycleManagerRemovingTransformEvaluator.class);
-  private final ParDoEvaluator<InputT, ?> underlying;
+  private final ParDoEvaluator<InputT> underlying;
   private final DoFnLifecycleManager lifecycleManager;
 
   public static <InputT> DoFnLifecycleManagerRemovingTransformEvaluator<InputT> wrapping(
-      ParDoEvaluator<InputT, ?> underlying, DoFnLifecycleManager lifecycleManager) {
+      ParDoEvaluator<InputT> underlying, DoFnLifecycleManager lifecycleManager) {
     return new DoFnLifecycleManagerRemovingTransformEvaluator<>(underlying, lifecycleManager);
   }
 
   private DoFnLifecycleManagerRemovingTransformEvaluator(
-      ParDoEvaluator<InputT, ?> underlying, DoFnLifecycleManager lifecycleManager) {
+      ParDoEvaluator<InputT> underlying, DoFnLifecycleManager lifecycleManager) {
     this.underlying = underlying;
     this.lifecycleManager = lifecycleManager;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 49d0723..131716f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -40,9 +40,9 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 
-class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
+class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
 
-  public static <InputT, OutputT> ParDoEvaluator<InputT, OutputT> create(
+  public static <InputT, OutputT> ParDoEvaluator<InputT> create(
       EvaluationContext evaluationContext,
       DirectStepContext stepContext,
       AppliedPTransform<?, ?, ?> application,
@@ -93,13 +93,11 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
       throw UserCodeException.wrap(e);
     }
 
-    return new ParDoEvaluator<>(
-        evaluationContext, runner, application, aggregatorChanges, outputManager, stepContext);
+    return new ParDoEvaluator<>(runner, application, aggregatorChanges, outputManager, stepContext);
   }
 
   ////////////////////////////////////////////////////////////////////////////////////////////////
 
-  private final EvaluationContext evaluationContext;
   private final PushbackSideInputDoFnRunner<InputT, ?> fnRunner;
   private final AppliedPTransform<?, ?, ?> transform;
   private final AggregatorContainer.Mutator aggregatorChanges;
@@ -109,13 +107,11 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
   private final ImmutableList.Builder<WindowedValue<InputT>> unprocessedElements;
 
   private ParDoEvaluator(
-      EvaluationContext evaluationContext,
       PushbackSideInputDoFnRunner<InputT, ?> fnRunner,
       AppliedPTransform<?, ?, ?> transform,
       AggregatorContainer.Mutator aggregatorChanges,
       BundleOutputManager outputManager,
       DirectStepContext stepContext) {
-    this.evaluationContext = evaluationContext;
     this.fnRunner = fnRunner;
     this.transform = transform;
     this.outputManager = outputManager;
@@ -153,11 +149,11 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
     } catch (Exception e) {
       throw UserCodeException.wrap(e);
     }
-    StepTransformResult.Builder resultBuilder;
+    StepTransformResult.Builder<InputT> resultBuilder;
     CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState();
     if (state != null) {
       resultBuilder =
-          StepTransformResult.withHold(transform, state.getEarliestWatermarkHold())
+          StepTransformResult.<InputT>withHold(transform, state.getEarliestWatermarkHold())
               .withState(state);
     } else {
       resultBuilder = StepTransformResult.withoutHold(transform);

http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index 0372295..93f204a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -126,7 +126,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
         fnManager);
   }
 
-  ParDoEvaluator<InputT, OutputT> createParDoEvaluator(
+  ParDoEvaluator<InputT> createParDoEvaluator(
       AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> application,
       StructuralKey<?> key,
       List<PCollectionView<?>> sideInputs,

http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index 64cef35..00b16dd 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -98,7 +98,7 @@ class SplittableProcessElementsEvaluatorFactory<
             .getExecutionContext(application, inputBundle.getKey())
             .getOrCreateStepContext(stepName, stepName);
 
-    ParDoEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
+    ParDoEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
         parDoEvaluator =
             delegateFactory.createParDoEvaluator(
                 application,

http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
index d046ce5..1ac4d6d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
@@ -53,7 +53,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
 
   @Test
   public void delegatesToUnderlying() throws Exception {
-    ParDoEvaluator<Object, Object> underlying = mock(ParDoEvaluator.class);
+    ParDoEvaluator<Object> underlying = mock(ParDoEvaluator.class);
     DoFn<?, ?> original = lifecycleManager.get();
     TransformEvaluator<Object> evaluator =
         DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(underlying, lifecycleManager);
@@ -72,7 +72,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
 
   @Test
   public void removesOnExceptionInProcessElement() throws Exception {
-    ParDoEvaluator<Object, Object> underlying = mock(ParDoEvaluator.class);
+    ParDoEvaluator<Object> underlying = mock(ParDoEvaluator.class);
     doThrow(Exception.class).when(underlying).processElement(any(WindowedValue.class));
 
     DoFn<?, ?> original = lifecycleManager.get();
@@ -91,7 +91,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
 
   @Test
   public void removesOnExceptionInOnTimer() throws Exception {
-    ParDoEvaluator<Object, Object> underlying = mock(ParDoEvaluator.class);
+    ParDoEvaluator<Object> underlying = mock(ParDoEvaluator.class);
     doThrow(Exception.class)
         .when(underlying)
         .onTimer(any(TimerData.class), any(BoundedWindow.class));
@@ -114,7 +114,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
 
   @Test
   public void removesOnExceptionInFinishBundle() throws Exception {
-    ParDoEvaluator<Object, Object> underlying = mock(ParDoEvaluator.class);
+    ParDoEvaluator<Object> underlying = mock(ParDoEvaluator.class);
     doThrow(Exception.class).when(underlying).finishBundle();
 
     DoFn<?, ?> original = lifecycleManager.get();

http://git-wip-us.apache.org/repos/asf/beam/blob/1cc16b0d/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 65a1248..2be0f9d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -98,7 +98,7 @@ public class ParDoEvaluatorTest {
     UncommittedBundle<Integer> outputBundle = bundleFactory.createBundle(output);
     when(evaluationContext.createBundle(output)).thenReturn(outputBundle);
 
-    ParDoEvaluator<Integer, Integer> evaluator =
+    ParDoEvaluator<Integer> evaluator =
         createEvaluator(singletonView, fn, output);
 
     IntervalWindow nonGlobalWindow = new IntervalWindow(new Instant(0), new Instant(10_000L));
@@ -130,7 +130,7 @@ public class ParDoEvaluatorTest {
             WindowedValue.timestampedValueInGlobalWindow(6, new Instant(2468L))));
   }
 
-  private ParDoEvaluator<Integer, Integer> createEvaluator(
+  private ParDoEvaluator<Integer> createEvaluator(
       PCollectionView<Integer> singletonView,
       RecorderFn fn,
       PCollection<Integer> output) {