You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/06/08 01:34:57 UTC
[08/50] beam git commit: Remove the FnOutputT parameter from DoFnOperator
Remove the FnOutputT parameter from DoFnOperator
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e8f26085
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e8f26085
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e8f26085
Branch: refs/heads/DSL_SQL
Commit: e8f26085e889f8f618c0961a5458cbc42b432c01
Parents: b0601fd
Author: JingsongLi <lz...@aliyun.com>
Authored: Tue Jun 6 17:31:09 2017 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Jun 6 14:33:36 2017 +0200
----------------------------------------------------------------------
.../FlinkStreamingTransformTranslators.java | 10 +++++-----
.../wrappers/streaming/DoFnOperator.java | 20 ++++++++++----------
.../streaming/SplittableDoFnOperator.java | 12 ++++++------
.../wrappers/streaming/WindowDoFnOperator.java | 2 +-
.../beam/runners/flink/PipelineOptionsTest.java | 6 +++---
.../flink/streaming/DoFnOperatorTest.java | 11 +++++------
6 files changed, 30 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e8f26085/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index d8c3049..2a7c5d6 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -332,7 +332,7 @@ class FlinkStreamingTransformTranslators {
static class ParDoTranslationHelper {
interface DoFnOperatorFactory<InputT, OutputT> {
- DoFnOperator<InputT, OutputT, OutputT> createDoFnOperator(
+ DoFnOperator<InputT, OutputT> createDoFnOperator(
DoFn<InputT, OutputT> doFn,
String stepName,
List<PCollectionView<?>> sideInputs,
@@ -395,7 +395,7 @@ class FlinkStreamingTransformTranslators {
context.getCoder((PCollection<OutputT>) outputs.get(mainOutputTag)));
if (sideInputs.isEmpty()) {
- DoFnOperator<InputT, OutputT, OutputT> doFnOperator =
+ DoFnOperator<InputT, OutputT> doFnOperator =
doFnOperatorFactory.createDoFnOperator(
doFn,
context.getCurrentTransform().getFullName(),
@@ -416,7 +416,7 @@ class FlinkStreamingTransformTranslators {
Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformedSideInputs =
transformSideInputs(sideInputs, context);
- DoFnOperator<InputT, OutputT, OutputT> doFnOperator =
+ DoFnOperator<InputT, OutputT> doFnOperator =
doFnOperatorFactory.createDoFnOperator(
doFn,
context.getCurrentTransform().getFullName(),
@@ -493,7 +493,7 @@ class FlinkStreamingTransformTranslators {
context,
new ParDoTranslationHelper.DoFnOperatorFactory<InputT, OutputT>() {
@Override
- public DoFnOperator<InputT, OutputT, OutputT> createDoFnOperator(
+ public DoFnOperator<InputT, OutputT> createDoFnOperator(
DoFn<InputT, OutputT> doFn,
String stepName,
List<PCollectionView<?>> sideInputs,
@@ -547,7 +547,7 @@ class FlinkStreamingTransformTranslators {
@Override
public DoFnOperator<
KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>,
- OutputT, OutputT> createDoFnOperator(
+ OutputT> createDoFnOperator(
DoFn<
KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>,
OutputT> doFn,
http://git-wip-us.apache.org/repos/asf/beam/blob/e8f26085/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 8c27ed9..350f323 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -94,21 +94,21 @@ import org.joda.time.Instant;
* Flink operator for executing {@link DoFn DoFns}.
*
* @param <InputT> the input type of the {@link DoFn}
- * @param <FnOutputT> the output type of the {@link DoFn}
+ * @param <OutputT> the output type of the {@link DoFn}
* @param <OutputT> the output type of the operator, this can be different from the fn output
* type when we have additional tagged outputs
*/
-public class DoFnOperator<InputT, FnOutputT, OutputT>
+public class DoFnOperator<InputT, OutputT>
extends AbstractStreamOperator<WindowedValue<OutputT>>
implements OneInputStreamOperator<WindowedValue<InputT>, WindowedValue<OutputT>>,
TwoInputStreamOperator<WindowedValue<InputT>, RawUnionValue, WindowedValue<OutputT>>,
KeyGroupCheckpointedOperator, Triggerable<Object, TimerData> {
- protected DoFn<InputT, FnOutputT> doFn;
+ protected DoFn<InputT, OutputT> doFn;
protected final SerializedPipelineOptions serializedOptions;
- protected final TupleTag<FnOutputT> mainOutputTag;
+ protected final TupleTag<OutputT> mainOutputTag;
protected final List<TupleTag<?>> additionalOutputTags;
protected final Collection<PCollectionView<?>> sideInputs;
@@ -118,8 +118,8 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
protected final OutputManagerFactory<OutputT> outputManagerFactory;
- protected transient DoFnRunner<InputT, FnOutputT> doFnRunner;
- protected transient PushbackSideInputDoFnRunner<InputT, FnOutputT> pushbackDoFnRunner;
+ protected transient DoFnRunner<InputT, OutputT> doFnRunner;
+ protected transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackDoFnRunner;
protected transient SideInputHandler sideInputHandler;
@@ -127,7 +127,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
protected transient DoFnRunners.OutputManager outputManager;
- private transient DoFnInvoker<InputT, FnOutputT> doFnInvoker;
+ private transient DoFnInvoker<InputT, OutputT> doFnInvoker;
protected transient long currentInputWatermark;
@@ -156,10 +156,10 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
private transient Optional<Long> pushedBackWatermark;
public DoFnOperator(
- DoFn<InputT, FnOutputT> doFn,
+ DoFn<InputT, OutputT> doFn,
String stepName,
Coder<WindowedValue<InputT>> inputCoder,
- TupleTag<FnOutputT> mainOutputTag,
+ TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> additionalOutputTags,
OutputManagerFactory<OutputT> outputManagerFactory,
WindowingStrategy<?, ?> windowingStrategy,
@@ -192,7 +192,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
// allow overriding this in WindowDoFnOperator because this one dynamically creates
// the DoFn
- protected DoFn<InputT, FnOutputT> getDoFn() {
+ protected DoFn<InputT, OutputT> getDoFn() {
return doFn;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e8f26085/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
index 4ac2ff5..5d08eba 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -55,19 +55,19 @@ import org.joda.time.Instant;
* the {@code @ProcessElement} method of a splittable {@link DoFn}.
*/
public class SplittableDoFnOperator<
- InputT, FnOutputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
+ InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
extends DoFnOperator<
- KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, FnOutputT, OutputT> {
+ KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> {
private transient ScheduledExecutorService executorService;
public SplittableDoFnOperator(
- DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, FnOutputT> doFn,
+ DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> doFn,
String stepName,
Coder<
WindowedValue<
KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>> inputCoder,
- TupleTag<FnOutputT> mainOutputTag,
+ TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> additionalOutputTags,
OutputManagerFactory<OutputT> outputManagerFactory,
WindowingStrategy<?, ?> windowingStrategy,
@@ -120,10 +120,10 @@ public class SplittableDoFnOperator<
new OutputAndTimeBoundedSplittableProcessElementInvoker<>(
doFn,
serializedOptions.getPipelineOptions(),
- new OutputWindowedValue<FnOutputT>() {
+ new OutputWindowedValue<OutputT>() {
@Override
public void outputWindowedValue(
- FnOutputT output,
+ OutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
http://git-wip-us.apache.org/repos/asf/beam/blob/e8f26085/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index ea578b9..78d585e 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -46,7 +46,7 @@ import org.apache.flink.streaming.api.operators.InternalTimer;
* Flink operator for executing window {@link DoFn DoFns}.
*/
public class WindowDoFnOperator<K, InputT, OutputT>
- extends DoFnOperator<KeyedWorkItem<K, InputT>, KV<K, OutputT>, KV<K, OutputT>> {
+ extends DoFnOperator<KeyedWorkItem<K, InputT>, KV<K, OutputT>> {
private final SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn;
http://git-wip-us.apache.org/repos/asf/beam/blob/e8f26085/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index bc0b1c2..d0281ec 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -136,7 +136,7 @@ public class PipelineOptionsTest {
@Test(expected = Exception.class)
public void parDoBaseClassPipelineOptionsNullTest() {
- DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
+ DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
new TestDoFn(),
"stepName",
WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()),
@@ -157,7 +157,7 @@ public class PipelineOptionsTest {
@Test
public void parDoBaseClassPipelineOptionsSerializationTest() throws Exception {
- DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
+ DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
new TestDoFn(),
"stepName",
WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()),
@@ -173,7 +173,7 @@ public class PipelineOptionsTest {
final byte[] serialized = SerializationUtils.serialize(doFnOperator);
@SuppressWarnings("unchecked")
- DoFnOperator<Object, Object, Object> deserialized = SerializationUtils.deserialize(serialized);
+ DoFnOperator<Object, Object> deserialized = SerializationUtils.deserialize(serialized);
TypeInformation<WindowedValue<Object>> typeInformation = TypeInformation.of(
new TypeHint<WindowedValue<Object>>() {});
http://git-wip-us.apache.org/repos/asf/beam/blob/e8f26085/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index 132242e..ad9d236 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -111,7 +111,7 @@ public class DoFnOperatorTest {
TupleTag<String> outputTag = new TupleTag<>("main-output");
- DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
+ DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
new IdentityDoFn<String>(),
"stepName",
windowedValueCoder,
@@ -155,7 +155,7 @@ public class DoFnOperatorTest {
.put(additionalOutput2, new OutputTag<String>(additionalOutput2.getId()){})
.build();
- DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
+ DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
new MultiOutputDoFn(additionalOutput1, additionalOutput2),
"stepName",
windowedValueCoder,
@@ -223,7 +223,7 @@ public class DoFnOperatorTest {
TupleTag<String> outputTag = new TupleTag<>("main-output");
- DoFnOperator<Integer, String, String> doFnOperator = new DoFnOperator<>(
+ DoFnOperator<Integer, String> doFnOperator = new DoFnOperator<>(
fn,
"stepName",
windowedValueCoder,
@@ -335,8 +335,7 @@ public class DoFnOperatorTest {
TupleTag<KV<String, Integer>> outputTag = new TupleTag<>("main-output");
- DoFnOperator<
- KV<String, Integer>, KV<String, Integer>, KV<String, Integer>> doFnOperator =
+ DoFnOperator<KV<String, Integer>, KV<String, Integer>> doFnOperator =
new DoFnOperator<>(
fn,
"stepName",
@@ -433,7 +432,7 @@ public class DoFnOperatorTest {
keyCoder = StringUtf8Coder.of();
}
- DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
+ DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
new IdentityDoFn<String>(),
"stepName",
windowedValueCoder,