You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/01/17 17:53:11 UTC
[5/7] beam git commit: Removes some OldDoFn code from DoFnRunners
Removes some OldDoFn code from DoFnRunners
DoFnRunners.createDefault() can be replaced with DoFnRunners.simpleRunner()
at all of its existing call sites, because none of them ever passes a
ReduceFnExecutor.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2b26ec89
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2b26ec89
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2b26ec89
Branch: refs/heads/master
Commit: 2b26ec8934725a600954ced9c4063766a582396a
Parents: 149d52b
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Jan 12 13:10:40 2017 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Jan 13 14:34:23 2017 -0800
----------------------------------------------------------------------
.../operators/ApexParDoOperator.java | 2 +-
.../apache/beam/runners/core/DoFnRunners.java | 137 +------------------
.../beam/runners/direct/ParDoEvaluator.java | 9 +-
.../wrappers/streaming/DoFnOperator.java | 2 +-
.../beam/runners/dataflow/util/DoFnInfo.java | 62 ++++-----
.../runners/spark/translation/DoFnFunction.java | 11 +-
.../spark/translation/MultiDoFnFunction.java | 9 +-
.../sdk/transforms/reflect/DoFnInvokers.java | 17 +--
8 files changed, 55 insertions(+), 194 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/2b26ec89/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 1a3387c..de4c15d 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -305,7 +305,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
sideOutputPortMapping.put(sideOutputTags.get(i), port);
}
- DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.createDefault(
+ DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.simpleRunner(
pipelineOptions.get(),
doFn,
sideInputReader,
http://git-wip-us.apache.org/repos/asf/beam/blob/2b26ec89/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index 820bfcd..2f3e93c 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -18,9 +18,7 @@
package org.apache.beam.runners.core;
import java.util.List;
-import org.apache.beam.runners.core.DoFnRunner.ReduceFnExecutor;
import org.apache.beam.runners.core.ExecutionContext.StepContext;
-import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.DoFn;
@@ -53,7 +51,7 @@ public class DoFnRunners {
* compressed {@link WindowedValue}. It is the responsibility of the runner to perform any key
* partitioning needed, etc.
*/
- static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner(
+ public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner(
PipelineOptions options,
DoFn<InputT, OutputT> fn,
SideInputReader sideInputReader,
@@ -119,137 +117,4 @@ public class DoFnRunners {
stepContext.timerInternals(),
droppedDueToLatenessAggregator);
}
-
- /**
- * Creates a {@link DoFnRunner} for the provided {@link DoFn}.
- */
- public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault(
- PipelineOptions options,
- DoFn<InputT, OutputT> doFn,
- SideInputReader sideInputReader,
- OutputManager outputManager,
- TupleTag<OutputT> mainOutputTag,
- List<TupleTag<?>> sideOutputTags,
- StepContext stepContext,
- AggregatorFactory aggregatorFactory,
- WindowingStrategy<?, ?> windowingStrategy) {
-
- // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn,
- // and window-exploded processing is achieved within the simple runner
- return simpleRunner(
- options,
- doFn,
- sideInputReader,
- outputManager,
- mainOutputTag,
- sideOutputTags,
- stepContext,
- aggregatorFactory,
- windowingStrategy);
- }
-
- /**
- * Creates a {@link DoFnRunner} for the provided {@link OldDoFn}.
- *
- * <p>In particular, if the {@link OldDoFn} is a {@link ReduceFnExecutor}, a specialized
- * implementation detail of streaming {@link GroupAlsoByWindow}, then it will create a special
- * runner that operates on {@link KeyedWorkItem KeyedWorkItems}, drops late data and counts
- * dropped elements.
- *
- * @deprecated please port uses of {@link OldDoFn} to use {@link DoFn}
- */
- @Deprecated
- public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault(
- PipelineOptions options,
- OldDoFn<InputT, OutputT> doFn,
- SideInputReader sideInputReader,
- OutputManager outputManager,
- TupleTag<OutputT> mainOutputTag,
- List<TupleTag<?>> sideOutputTags,
- StepContext stepContext,
- AggregatorFactory aggregatorFactory,
- WindowingStrategy<?, ?> windowingStrategy) {
-
- DoFnRunner<InputT, OutputT> doFnRunner = simpleRunner(
- options,
- doFn,
- sideInputReader,
- outputManager,
- mainOutputTag,
- sideOutputTags,
- stepContext,
- aggregatorFactory,
- windowingStrategy);
-
- if (!(doFn instanceof ReduceFnExecutor)) {
- return doFnRunner;
- } else {
- // When a DoFn is a ReduceFnExecutor, we know it has to have an aggregator for dropped
- // elements and we also learn that for some K and V,
- // InputT = KeyedWorkItem<K, V>
- // OutputT = KV<K, V>
-
- Aggregator<Long, Long> droppedDueToLatenessAggregator =
- ((ReduceFnExecutor<?, ?, ?, ?>) doFn).getDroppedDueToLatenessAggregator();
-
- @SuppressWarnings({"unchecked", "cast", "rawtypes"})
- DoFnRunner<InputT, OutputT> runner = (DoFnRunner<InputT, OutputT>) lateDataDroppingRunner(
- (DoFnRunner) doFnRunner,
- stepContext,
- (WindowingStrategy) windowingStrategy,
- droppedDueToLatenessAggregator);
-
- return runner;
- }
- }
-
- /**
- * Creates the right kind of {@link DoFnRunner} for an object that can be either a {@link DoFn} or
- * {@link OldDoFn}. This can be used so that the client need not explicitly reference either such
- * class, but merely deserialize a payload and pass it to this method.
- *
- * @deprecated for migration purposes only for services where users may still submit either {@link
- * OldDoFn} or {@link DoFn}. If you know that you have a {@link DoFn} then you should use the
- * variant for that instead.
- */
- @Deprecated
- public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault(
- PipelineOptions options,
- Object deserializedFn,
- SideInputReader sideInputReader,
- OutputManager outputManager,
- TupleTag<OutputT> mainOutputTag,
- List<TupleTag<?>> sideOutputTags,
- StepContext stepContext,
- AggregatorFactory aggregatorFactory,
- WindowingStrategy<?, ?> windowingStrategy) {
- if (deserializedFn instanceof DoFn) {
- return createDefault(
- options,
- (DoFn) deserializedFn,
- sideInputReader,
- outputManager,
- mainOutputTag,
- sideOutputTags,
- stepContext,
- aggregatorFactory,
- windowingStrategy);
- } else if (deserializedFn instanceof OldDoFn) {
- return createDefault(
- options,
- (OldDoFn) deserializedFn,
- sideInputReader,
- outputManager,
- mainOutputTag,
- sideOutputTags,
- stepContext,
- aggregatorFactory,
- windowingStrategy);
- } else {
- throw new IllegalArgumentException(String.format("Cannot create %s for %s of class %s",
- DoFnRunner.class.getSimpleName(),
- deserializedFn,
- deserializedFn.getClass()));
- }
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/2b26ec89/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index e146470..97d5360 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -18,7 +18,6 @@
package org.apache.beam.runners.direct;
import com.google.common.collect.ImmutableList;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -30,6 +29,7 @@ import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
import org.apache.beam.sdk.util.TimerInternals.TimerData;
@@ -47,7 +47,7 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
DirectStepContext stepContext,
AppliedPTransform<?, ?, ?> application,
WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy,
- Serializable fn, // may be OldDoFn or DoFn
+ DoFn<InputT, OutputT> fn,
StructuralKey<?> key,
List<PCollectionView<?>> sideInputs,
TupleTag<OutputT> mainOutputTag,
@@ -72,8 +72,11 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
ReadyCheckingSideInputReader sideInputReader =
evaluationContext.createSideInputReader(sideInputs);
+
+ // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn,
+ // and window-exploded processing is achieved within the simple runner
DoFnRunner<InputT, OutputT> underlying =
- DoFnRunners.createDefault(
+ DoFnRunners.simpleRunner(
evaluationContext.getPipelineOptions(),
fn,
sideInputReader,
http://git-wip-us.apache.org/repos/asf/beam/blob/2b26ec89/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 95f2bfd..90cdf4c 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -244,7 +244,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
sideInputReader = sideInputHandler;
}
- DoFnRunner<InputT, FnOutputT> doFnRunner = DoFnRunners.createDefault(
+ DoFnRunner<InputT, FnOutputT> doFnRunner = DoFnRunners.simpleRunner(
serializedOptions.getPipelineOptions(),
oldDoFn,
sideInputReader,
http://git-wip-us.apache.org/repos/asf/beam/blob/2b26ec89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
index b84def8..0c5be90 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
@@ -17,8 +17,6 @@
*/
package org.apache.beam.runners.dataflow.util;
-import static com.google.common.base.Preconditions.checkState;
-
import java.io.Serializable;
import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
@@ -29,14 +27,13 @@ import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
/**
- * Wrapper class holding the necessary information to serialize a {@link OldDoFn}
- * or {@link DoFn}.
+ * Wrapper class holding the necessary information to serialize a {@link DoFn}.
*
- * @param <InputT> the type of the (main) input elements of the {@link OldDoFn}
- * @param <OutputT> the type of the (main) output elements of the {@link OldDoFn}
+ * @param <InputT> the type of the (main) input elements of the {@link DoFn}
+ * @param <OutputT> the type of the (main) output elements of the {@link DoFn}
*/
public class DoFnInfo<InputT, OutputT> implements Serializable {
- private final Serializable doFn;
+ private final DoFn<InputT, OutputT> doFn;
private final WindowingStrategy<?, ?> windowingStrategy;
private final Iterable<PCollectionView<?>> sideInputViews;
private final Coder<InputT> inputCoder;
@@ -48,17 +45,37 @@ public class DoFnInfo<InputT, OutputT> implements Serializable {
* {@link DoFn} or {@link OldDoFn} or other context-appropriate UDF blob.
*/
public static <InputT, OutputT> DoFnInfo<InputT, OutputT> forFn(
+ DoFn<InputT, OutputT> doFn,
+ WindowingStrategy<?, ?> windowingStrategy,
+ Iterable<PCollectionView<?>> sideInputViews,
+ Coder<InputT> inputCoder,
+ long mainOutput,
+ Map<Long, TupleTag<?>> outputMap) {
+ return new DoFnInfo<>(
+ doFn, windowingStrategy, sideInputViews, inputCoder, mainOutput, outputMap);
+ }
+
+ /** TODO: remove this when Dataflow worker uses the DoFn overload. */
+ @Deprecated
+ @SuppressWarnings("unchecked")
+ public static <InputT, OutputT> DoFnInfo<InputT, OutputT> forFn(
Serializable doFn,
WindowingStrategy<?, ?> windowingStrategy,
Iterable<PCollectionView<?>> sideInputViews,
Coder<InputT> inputCoder,
long mainOutput,
Map<Long, TupleTag<?>> outputMap) {
- return new DoFnInfo(doFn, windowingStrategy, sideInputViews, inputCoder, mainOutput, outputMap);
+ return forFn(
+ (DoFn<InputT, OutputT>) doFn,
+ windowingStrategy,
+ sideInputViews,
+ inputCoder,
+ mainOutput,
+ outputMap);
}
private DoFnInfo(
- Serializable doFn,
+ DoFn<InputT, OutputT> doFn,
WindowingStrategy<?, ?> windowingStrategy,
Iterable<PCollectionView<?>> sideInputViews,
Coder<InputT> inputCoder,
@@ -72,34 +89,15 @@ public class DoFnInfo<InputT, OutputT> implements Serializable {
this.outputMap = outputMap;
}
- /**
- * @deprecated use {@link #forFn}.
- */
+ /** TODO: remove this when Dataflow worker uses {@link #getDoFn}. */
@Deprecated
- public DoFnInfo(
- OldDoFn doFn,
- WindowingStrategy<?, ?> windowingStrategy,
- Iterable<PCollectionView<?>> sideInputViews,
- Coder<InputT> inputCoder,
- long mainOutput,
- Map<Long, TupleTag<?>> outputMap) {
- this((Serializable) doFn, windowingStrategy, sideInputViews, inputCoder, mainOutput, outputMap);
- }
-
- /** Returns the embedded serialized function. It may be a {@code DoFn} or {@code OldDoFn}. */
public Serializable getFn() {
return doFn;
}
- /** @deprecated use {@link #getFn()} */
- @Deprecated
- public OldDoFn getDoFn() {
- checkState(
- doFn instanceof OldDoFn,
- "Deprecated %s.getDoFn() called when the payload was actually a new %s",
- DoFnInfo.class.getSimpleName(),
- DoFn.class.getSimpleName());
- return (OldDoFn) doFn;
+ /** Returns the embedded function. */
+ public DoFn<InputT, OutputT> getDoFn() {
+ return doFn;
}
public WindowingStrategy<?, ?> getWindowingStrategy() {
http://git-wip-us.apache.org/repos/asf/beam/blob/2b26ec89/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
index af8e089..bd6cfbe 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
@@ -80,18 +80,21 @@ public class DoFnFunction<InputT, OutputT>
Iterator<WindowedValue<InputT>> iter) throws Exception {
DoFnOutputManager outputManager = new DoFnOutputManager();
+
+ // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn,
+ // and window-exploded processing is achieved within the simple runner
DoFnRunner<InputT, OutputT> doFnRunner =
- DoFnRunners.createDefault(
+ DoFnRunners.simpleRunner(
runtimeContext.getPipelineOptions(),
doFn,
new SparkSideInputReader(sideInputs),
outputManager,
- new TupleTag<OutputT>() {},
+ new TupleTag<OutputT>() {
+ },
Collections.<TupleTag<?>>emptyList(),
new SparkProcessContext.NoOpStepContext(),
new SparkAggregators.Factory(runtimeContext, accumulator),
- windowingStrategy
- );
+ windowingStrategy);
return new SparkProcessContext<>(doFn, doFnRunner, outputManager).processPartition(iter);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/2b26ec89/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index 0f9417a..cceffc8 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -38,7 +38,6 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.function.PairFlatMapFunction;
-
import scala.Tuple2;
@@ -88,8 +87,11 @@ public class MultiDoFnFunction<InputT, OutputT>
Iterator<WindowedValue<InputT>> iter) throws Exception {
DoFnOutputManager outputManager = new DoFnOutputManager();
+
+ // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn,
+ // and window-exploded processing is achieved within the simple runner
DoFnRunner<InputT, OutputT> doFnRunner =
- DoFnRunners.createDefault(
+ DoFnRunners.simpleRunner(
runtimeContext.getPipelineOptions(),
doFn,
new SparkSideInputReader(sideInputs),
@@ -98,8 +100,7 @@ public class MultiDoFnFunction<InputT, OutputT>
Collections.<TupleTag<?>>emptyList(),
new SparkProcessContext.NoOpStepContext(),
new SparkAggregators.Factory(runtimeContext, accumulator),
- windowingStrategy
- );
+ windowingStrategy);
return new SparkProcessContext<>(doFn, doFnRunner, outputManager).processPartition(iter);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/2b26ec89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
index b141d51..33c5a6a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
@@ -36,21 +36,12 @@ public class DoFnInvokers {
return ByteBuddyDoFnInvokerFactory.only().newByteBuddyInvoker(fn);
}
- /**
- * Temporarily retained for compatibility with Dataflow worker.
- * TODO: delete this when Dataflow worker is fixed to call {@link #invokerFor(DoFn)}.
- *
- * @deprecated Use {@link #invokerFor(DoFn)}.
- */
- @SuppressWarnings("unchecked")
+ /** TODO: remove this when Dataflow worker uses the DoFn overload. */
@Deprecated
+ @SuppressWarnings({"unchecked"})
public static <InputT, OutputT> DoFnInvoker<InputT, OutputT> invokerFor(
- Serializable deserializedFn) {
- if (deserializedFn instanceof DoFn) {
- return invokerFor((DoFn<InputT, OutputT>) deserializedFn);
- }
- throw new UnsupportedOperationException(
- "Only DoFn supported, was: " + deserializedFn.getClass());
+ Serializable fn) {
+ return invokerFor((DoFn) fn);
}
private DoFnInvokers() {}