You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/06/08 01:34:57 UTC

[08/50] beam git commit: Remove the FnOutputT parameter from DoFnOperator

Remove the FnOutputT parameter from DoFnOperator


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e8f26085
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e8f26085
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e8f26085

Branch: refs/heads/DSL_SQL
Commit: e8f26085e889f8f618c0961a5458cbc42b432c01
Parents: b0601fd
Author: JingsongLi <lz...@aliyun.com>
Authored: Tue Jun 6 17:31:09 2017 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Jun 6 14:33:36 2017 +0200

----------------------------------------------------------------------
 .../FlinkStreamingTransformTranslators.java     | 10 +++++-----
 .../wrappers/streaming/DoFnOperator.java        | 20 ++++++++++----------
 .../streaming/SplittableDoFnOperator.java       | 12 ++++++------
 .../wrappers/streaming/WindowDoFnOperator.java  |  2 +-
 .../beam/runners/flink/PipelineOptionsTest.java |  6 +++---
 .../flink/streaming/DoFnOperatorTest.java       | 11 +++++------
 6 files changed, 30 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e8f26085/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index d8c3049..2a7c5d6 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -332,7 +332,7 @@ class FlinkStreamingTransformTranslators {
   static class ParDoTranslationHelper {
 
     interface DoFnOperatorFactory<InputT, OutputT> {
-      DoFnOperator<InputT, OutputT, OutputT> createDoFnOperator(
+      DoFnOperator<InputT, OutputT> createDoFnOperator(
           DoFn<InputT, OutputT> doFn,
           String stepName,
           List<PCollectionView<?>> sideInputs,
@@ -395,7 +395,7 @@ class FlinkStreamingTransformTranslators {
               context.getCoder((PCollection<OutputT>) outputs.get(mainOutputTag)));
 
       if (sideInputs.isEmpty()) {
-        DoFnOperator<InputT, OutputT, OutputT> doFnOperator =
+        DoFnOperator<InputT, OutputT> doFnOperator =
             doFnOperatorFactory.createDoFnOperator(
                 doFn,
                 context.getCurrentTransform().getFullName(),
@@ -416,7 +416,7 @@ class FlinkStreamingTransformTranslators {
         Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformedSideInputs =
             transformSideInputs(sideInputs, context);
 
-        DoFnOperator<InputT, OutputT, OutputT> doFnOperator =
+        DoFnOperator<InputT, OutputT> doFnOperator =
             doFnOperatorFactory.createDoFnOperator(
                 doFn,
                 context.getCurrentTransform().getFullName(),
@@ -493,7 +493,7 @@ class FlinkStreamingTransformTranslators {
           context,
           new ParDoTranslationHelper.DoFnOperatorFactory<InputT, OutputT>() {
             @Override
-            public DoFnOperator<InputT, OutputT, OutputT> createDoFnOperator(
+            public DoFnOperator<InputT, OutputT> createDoFnOperator(
                 DoFn<InputT, OutputT> doFn,
                 String stepName,
                 List<PCollectionView<?>> sideInputs,
@@ -547,7 +547,7 @@ class FlinkStreamingTransformTranslators {
             @Override
             public DoFnOperator<
                 KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>,
-                OutputT, OutputT> createDoFnOperator(
+                OutputT> createDoFnOperator(
                     DoFn<
                         KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>,
                         OutputT> doFn,

http://git-wip-us.apache.org/repos/asf/beam/blob/e8f26085/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 8c27ed9..350f323 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -94,21 +94,21 @@ import org.joda.time.Instant;
  * Flink operator for executing {@link DoFn DoFns}.
  *
  * @param <InputT> the input type of the {@link DoFn}
- * @param <FnOutputT> the output type of the {@link DoFn}
+ * @param <OutputT> the output type of the {@link DoFn}
  * @param <OutputT> the output type of the operator, this can be different from the fn output
  *                 type when we have additional tagged outputs
  */
-public class DoFnOperator<InputT, FnOutputT, OutputT>
+public class DoFnOperator<InputT, OutputT>
     extends AbstractStreamOperator<WindowedValue<OutputT>>
     implements OneInputStreamOperator<WindowedValue<InputT>, WindowedValue<OutputT>>,
       TwoInputStreamOperator<WindowedValue<InputT>, RawUnionValue, WindowedValue<OutputT>>,
     KeyGroupCheckpointedOperator, Triggerable<Object, TimerData> {
 
-  protected DoFn<InputT, FnOutputT> doFn;
+  protected DoFn<InputT, OutputT> doFn;
 
   protected final SerializedPipelineOptions serializedOptions;
 
-  protected final TupleTag<FnOutputT> mainOutputTag;
+  protected final TupleTag<OutputT> mainOutputTag;
   protected final List<TupleTag<?>> additionalOutputTags;
 
   protected final Collection<PCollectionView<?>> sideInputs;
@@ -118,8 +118,8 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
   protected final OutputManagerFactory<OutputT> outputManagerFactory;
 
-  protected transient DoFnRunner<InputT, FnOutputT> doFnRunner;
-  protected transient PushbackSideInputDoFnRunner<InputT, FnOutputT> pushbackDoFnRunner;
+  protected transient DoFnRunner<InputT, OutputT> doFnRunner;
+  protected transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackDoFnRunner;
 
   protected transient SideInputHandler sideInputHandler;
 
@@ -127,7 +127,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
   protected transient DoFnRunners.OutputManager outputManager;
 
-  private transient DoFnInvoker<InputT, FnOutputT> doFnInvoker;
+  private transient DoFnInvoker<InputT, OutputT> doFnInvoker;
 
   protected transient long currentInputWatermark;
 
@@ -156,10 +156,10 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
   private transient Optional<Long> pushedBackWatermark;
 
   public DoFnOperator(
-      DoFn<InputT, FnOutputT> doFn,
+      DoFn<InputT, OutputT> doFn,
       String stepName,
       Coder<WindowedValue<InputT>> inputCoder,
-      TupleTag<FnOutputT> mainOutputTag,
+      TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> additionalOutputTags,
       OutputManagerFactory<OutputT> outputManagerFactory,
       WindowingStrategy<?, ?> windowingStrategy,
@@ -192,7 +192,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
   // allow overriding this in WindowDoFnOperator because this one dynamically creates
   // the DoFn
-  protected DoFn<InputT, FnOutputT> getDoFn() {
+  protected DoFn<InputT, OutputT> getDoFn() {
     return doFn;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e8f26085/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
index 4ac2ff5..5d08eba 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -55,19 +55,19 @@ import org.joda.time.Instant;
  * the {@code @ProcessElement} method of a splittable {@link DoFn}.
  */
 public class SplittableDoFnOperator<
-    InputT, FnOutputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
+    InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
     extends DoFnOperator<
-    KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, FnOutputT, OutputT> {
+    KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> {
 
   private transient ScheduledExecutorService executorService;
 
   public SplittableDoFnOperator(
-      DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, FnOutputT> doFn,
+      DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> doFn,
       String stepName,
       Coder<
           WindowedValue<
               KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>> inputCoder,
-      TupleTag<FnOutputT> mainOutputTag,
+      TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> additionalOutputTags,
       OutputManagerFactory<OutputT> outputManagerFactory,
       WindowingStrategy<?, ?> windowingStrategy,
@@ -120,10 +120,10 @@ public class SplittableDoFnOperator<
         new OutputAndTimeBoundedSplittableProcessElementInvoker<>(
             doFn,
             serializedOptions.getPipelineOptions(),
-            new OutputWindowedValue<FnOutputT>() {
+            new OutputWindowedValue<OutputT>() {
               @Override
               public void outputWindowedValue(
-                  FnOutputT output,
+                  OutputT output,
                   Instant timestamp,
                   Collection<? extends BoundedWindow> windows,
                   PaneInfo pane) {

http://git-wip-us.apache.org/repos/asf/beam/blob/e8f26085/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index ea578b9..78d585e 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -46,7 +46,7 @@ import org.apache.flink.streaming.api.operators.InternalTimer;
  * Flink operator for executing window {@link DoFn DoFns}.
  */
 public class WindowDoFnOperator<K, InputT, OutputT>
-    extends DoFnOperator<KeyedWorkItem<K, InputT>, KV<K, OutputT>, KV<K, OutputT>> {
+    extends DoFnOperator<KeyedWorkItem<K, InputT>, KV<K, OutputT>> {
 
   private final SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e8f26085/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index bc0b1c2..d0281ec 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -136,7 +136,7 @@ public class PipelineOptionsTest {
 
   @Test(expected = Exception.class)
   public void parDoBaseClassPipelineOptionsNullTest() {
-    DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
+    DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
         new TestDoFn(),
         "stepName",
         WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()),
@@ -157,7 +157,7 @@ public class PipelineOptionsTest {
   @Test
   public void parDoBaseClassPipelineOptionsSerializationTest() throws Exception {
 
-    DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
+    DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
         new TestDoFn(),
         "stepName",
         WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()),
@@ -173,7 +173,7 @@ public class PipelineOptionsTest {
     final byte[] serialized = SerializationUtils.serialize(doFnOperator);
 
     @SuppressWarnings("unchecked")
-    DoFnOperator<Object, Object, Object> deserialized = SerializationUtils.deserialize(serialized);
+    DoFnOperator<Object, Object> deserialized = SerializationUtils.deserialize(serialized);
 
     TypeInformation<WindowedValue<Object>> typeInformation = TypeInformation.of(
         new TypeHint<WindowedValue<Object>>() {});

http://git-wip-us.apache.org/repos/asf/beam/blob/e8f26085/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index 132242e..ad9d236 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -111,7 +111,7 @@ public class DoFnOperatorTest {
 
     TupleTag<String> outputTag = new TupleTag<>("main-output");
 
-    DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
+    DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
         new IdentityDoFn<String>(),
         "stepName",
         windowedValueCoder,
@@ -155,7 +155,7 @@ public class DoFnOperatorTest {
             .put(additionalOutput2, new OutputTag<String>(additionalOutput2.getId()){})
             .build();
 
-    DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
+    DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
         new MultiOutputDoFn(additionalOutput1, additionalOutput2),
         "stepName",
         windowedValueCoder,
@@ -223,7 +223,7 @@ public class DoFnOperatorTest {
 
     TupleTag<String> outputTag = new TupleTag<>("main-output");
 
-    DoFnOperator<Integer, String, String> doFnOperator = new DoFnOperator<>(
+    DoFnOperator<Integer, String> doFnOperator = new DoFnOperator<>(
         fn,
         "stepName",
         windowedValueCoder,
@@ -335,8 +335,7 @@ public class DoFnOperatorTest {
 
     TupleTag<KV<String, Integer>> outputTag = new TupleTag<>("main-output");
 
-    DoFnOperator<
-        KV<String, Integer>, KV<String, Integer>, KV<String, Integer>> doFnOperator =
+    DoFnOperator<KV<String, Integer>, KV<String, Integer>> doFnOperator =
         new DoFnOperator<>(
             fn,
             "stepName",
@@ -433,7 +432,7 @@ public class DoFnOperatorTest {
       keyCoder = StringUtf8Coder.of();
     }
 
-    DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
+    DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
         new IdentityDoFn<String>(),
         "stepName",
         windowedValueCoder,