You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/01/24 22:02:26 UTC

[1/2] beam git commit: Removes ReduceFnExecutor interface

Repository: beam
Updated Branches:
  refs/heads/master b3334879f -> 11c3cd70b


Removes ReduceFnExecutor interface


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8989473b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8989473b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8989473b

Branch: refs/heads/master
Commit: 8989473b8e379a40b888565aadead001379c9398
Parents: b333487
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Jan 24 13:32:24 2017 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Jan 24 13:32:24 2017 -0800

----------------------------------------------------------------------
 .../apache/beam/runners/core/DoFnRunner.java    | 20 --------------------
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |  5 +----
 .../beam/runners/direct/ParDoEvaluator.java     |  2 --
 .../runners/spark/translation/DoFnFunction.java |  2 --
 .../spark/translation/MultiDoFnFunction.java    |  2 --
 5 files changed, 1 insertion(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8989473b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
index 66f95db..b29adcc 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
@@ -17,12 +17,10 @@
  */
 package org.apache.beam.runners.core;
 
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
 import org.joda.time.Instant;
 
 /**
@@ -51,22 +49,4 @@ public interface DoFnRunner<InputT, OutputT> {
    * additional tasks, such as flushing in-memory states.
    */
   void finishBundle();
-
-  /**
-   * An internal interface for signaling that a {@link OldDoFn} requires late data dropping.
-   */
-  public interface ReduceFnExecutor<K, InputT, OutputT, W> {
-    /**
-     * Gets this object as a {@link OldDoFn}.
-     *
-     * <p>Most implementors of this interface are expected to be {@link OldDoFn} instances, and will
-     * return themselves.
-     */
-    OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn();
-
-    /**
-     * Returns an aggregator that tracks elements that are dropped due to being late.
-     */
-    Aggregator<Long, Long> getDroppedDueToLatenessAggregator();
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8989473b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index ecce4fc..d0387cf 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.core;
 
-import org.apache.beam.runners.core.DoFnRunner.ReduceFnExecutor;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.sdk.transforms.Aggregator;
@@ -37,7 +36,7 @@ import org.apache.beam.sdk.values.KV;
 @SystemDoFnInternal
 public class GroupAlsoByWindowViaWindowSetDoFn<
         K, InputT, OutputT, W extends BoundedWindow, RinT extends KeyedWorkItem<K, InputT>>
-    extends OldDoFn<RinT, KV<K, OutputT>> implements ReduceFnExecutor<K, InputT, OutputT, W> {
+    extends OldDoFn<RinT, KV<K, OutputT>> {
 
   public static <K, InputT, OutputT, W extends BoundedWindow>
       OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create(
@@ -95,7 +94,6 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
     reduceFnRunner.persist();
   }
 
-  @Override
   public OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn() {
     // Safe contravariant cast
     @SuppressWarnings("unchecked")
@@ -104,7 +102,6 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
     return asFn;
   }
 
-  @Override
   public Aggregator<Long, Long> getDroppedDueToLatenessAggregator() {
     return droppedDueToLateness;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/8989473b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 97d5360..48f0f8d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -73,8 +73,6 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
     ReadyCheckingSideInputReader sideInputReader =
         evaluationContext.createSideInputReader(sideInputs);
 
-    // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn,
-    // and window-exploded processing is achieved within the simple runner
     DoFnRunner<InputT, OutputT> underlying =
         DoFnRunners.simpleRunner(
             evaluationContext.getPipelineOptions(),

http://git-wip-us.apache.org/repos/asf/beam/blob/8989473b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
index bd6cfbe..4fd5e51 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
@@ -81,8 +81,6 @@ public class DoFnFunction<InputT, OutputT>
 
     DoFnOutputManager outputManager = new DoFnOutputManager();
 
-    // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn,
-    // and window-exploded processing is achieved within the simple runner
     DoFnRunner<InputT, OutputT> doFnRunner =
         DoFnRunners.simpleRunner(
             runtimeContext.getPipelineOptions(),

http://git-wip-us.apache.org/repos/asf/beam/blob/8989473b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index cceffc8..911e6c5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -88,8 +88,6 @@ public class MultiDoFnFunction<InputT, OutputT>
 
     DoFnOutputManager outputManager = new DoFnOutputManager();
 
-    // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn,
-    // and window-exploded processing is achieved within the simple runner
     DoFnRunner<InputT, OutputT> doFnRunner =
         DoFnRunners.simpleRunner(
             runtimeContext.getPipelineOptions(),


[2/2] beam git commit: This closes #1833: Removes ReduceFnExecutor interface

Posted by ke...@apache.org.
This closes #1833: Removes ReduceFnExecutor interface


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/11c3cd70
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/11c3cd70
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/11c3cd70

Branch: refs/heads/master
Commit: 11c3cd70b784650e8b60a5660449cfafdba84bbf
Parents: b333487 8989473
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jan 24 13:48:23 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jan 24 13:48:23 2017 -0800

----------------------------------------------------------------------
 .../apache/beam/runners/core/DoFnRunner.java    | 20 --------------------
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |  5 +----
 .../beam/runners/direct/ParDoEvaluator.java     |  2 --
 .../runners/spark/translation/DoFnFunction.java |  2 --
 .../spark/translation/MultiDoFnFunction.java    |  2 --
 5 files changed, 1 insertion(+), 30 deletions(-)
----------------------------------------------------------------------