You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/01/24 22:02:26 UTC
[1/2] beam git commit: Removes ReduceFnExecutor interface
Repository: beam
Updated Branches:
refs/heads/master b3334879f -> 11c3cd70b
Removes ReduceFnExecutor interface
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8989473b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8989473b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8989473b
Branch: refs/heads/master
Commit: 8989473b8e379a40b888565aadead001379c9398
Parents: b333487
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Jan 24 13:32:24 2017 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Jan 24 13:32:24 2017 -0800
----------------------------------------------------------------------
.../apache/beam/runners/core/DoFnRunner.java | 20 --------------------
.../core/GroupAlsoByWindowViaWindowSetDoFn.java | 5 +----
.../beam/runners/direct/ParDoEvaluator.java | 2 --
.../runners/spark/translation/DoFnFunction.java | 2 --
.../spark/translation/MultiDoFnFunction.java | 2 --
5 files changed, 1 insertion(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/8989473b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
index 66f95db..b29adcc 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
@@ -17,12 +17,10 @@
*/
package org.apache.beam.runners.core;
-import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;
/**
@@ -51,22 +49,4 @@ public interface DoFnRunner<InputT, OutputT> {
* additional tasks, such as flushing in-memory states.
*/
void finishBundle();
-
- /**
- * An internal interface for signaling that a {@link OldDoFn} requires late data dropping.
- */
- public interface ReduceFnExecutor<K, InputT, OutputT, W> {
- /**
- * Gets this object as a {@link OldDoFn}.
- *
- * <p>Most implementors of this interface are expected to be {@link OldDoFn} instances, and will
- * return themselves.
- */
- OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn();
-
- /**
- * Returns an aggregator that tracks elements that are dropped due to being late.
- */
- Aggregator<Long, Long> getDroppedDueToLatenessAggregator();
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8989473b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index ecce4fc..d0387cf 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.runners.core;
-import org.apache.beam.runners.core.DoFnRunner.ReduceFnExecutor;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.sdk.transforms.Aggregator;
@@ -37,7 +36,7 @@ import org.apache.beam.sdk.values.KV;
@SystemDoFnInternal
public class GroupAlsoByWindowViaWindowSetDoFn<
K, InputT, OutputT, W extends BoundedWindow, RinT extends KeyedWorkItem<K, InputT>>
- extends OldDoFn<RinT, KV<K, OutputT>> implements ReduceFnExecutor<K, InputT, OutputT, W> {
+ extends OldDoFn<RinT, KV<K, OutputT>> {
public static <K, InputT, OutputT, W extends BoundedWindow>
OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create(
@@ -95,7 +94,6 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
reduceFnRunner.persist();
}
- @Override
public OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn() {
// Safe contravariant cast
@SuppressWarnings("unchecked")
@@ -104,7 +102,6 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
return asFn;
}
- @Override
public Aggregator<Long, Long> getDroppedDueToLatenessAggregator() {
return droppedDueToLateness;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8989473b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 97d5360..48f0f8d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -73,8 +73,6 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
ReadyCheckingSideInputReader sideInputReader =
evaluationContext.createSideInputReader(sideInputs);
- // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn,
- // and window-exploded processing is achieved within the simple runner
DoFnRunner<InputT, OutputT> underlying =
DoFnRunners.simpleRunner(
evaluationContext.getPipelineOptions(),
http://git-wip-us.apache.org/repos/asf/beam/blob/8989473b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
index bd6cfbe..4fd5e51 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
@@ -81,8 +81,6 @@ public class DoFnFunction<InputT, OutputT>
DoFnOutputManager outputManager = new DoFnOutputManager();
- // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn,
- // and window-exploded processing is achieved within the simple runner
DoFnRunner<InputT, OutputT> doFnRunner =
DoFnRunners.simpleRunner(
runtimeContext.getPipelineOptions(),
http://git-wip-us.apache.org/repos/asf/beam/blob/8989473b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index cceffc8..911e6c5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -88,8 +88,6 @@ public class MultiDoFnFunction<InputT, OutputT>
DoFnOutputManager outputManager = new DoFnOutputManager();
- // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn,
- // and window-exploded processing is achieved within the simple runner
DoFnRunner<InputT, OutputT> doFnRunner =
DoFnRunners.simpleRunner(
runtimeContext.getPipelineOptions(),
[2/2] beam git commit: This closes #1833: Removes ReduceFnExecutor interface
Posted by ke...@apache.org.
This closes #1833: Removes ReduceFnExecutor interface
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/11c3cd70
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/11c3cd70
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/11c3cd70
Branch: refs/heads/master
Commit: 11c3cd70b784650e8b60a5660449cfafdba84bbf
Parents: b333487 8989473
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jan 24 13:48:23 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jan 24 13:48:23 2017 -0800
----------------------------------------------------------------------
.../apache/beam/runners/core/DoFnRunner.java | 20 --------------------
.../core/GroupAlsoByWindowViaWindowSetDoFn.java | 5 +----
.../beam/runners/direct/ParDoEvaluator.java | 2 --
.../runners/spark/translation/DoFnFunction.java | 2 --
.../spark/translation/MultiDoFnFunction.java | 2 --
5 files changed, 1 insertion(+), 30 deletions(-)
----------------------------------------------------------------------