You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/15 22:29:02 UTC
[02/10] incubator-beam git commit: Pushes uses of OldDoFn deeper
inside Flink runner
Pushes uses of OldDoFn deeper inside Flink runner
In particular, various DoFnOperators now take a regular DoFn
rather than an OldDoFn, and convert it to an OldDoFn internally.
This allows removing uses of ParDo.getFn() returning OldDoFn.
The only case where the OldDoFn inside a DoFnOperator is actually an
OldDoFn rather than DoFn in disguise is now WindowDoFnOperator, which
overrides getDoFn to return an actual GABW OldDoFn.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8330bfa7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8330bfa7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8330bfa7
Branch: refs/heads/master
Commit: 8330bfa74cd72e51a29649745e87a4f1a6e5ffa1
Parents: af616d9
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Dec 9 16:47:01 2016 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Dec 15 13:55:24 2016 -0800
----------------------------------------------------------------------
.../FlinkBatchTransformTranslators.java | 9 +---
.../FlinkStreamingTransformTranslators.java | 8 ++--
.../functions/FlinkDoFnFunction.java | 10 +++--
.../functions/FlinkMultiOutputDoFnFunction.java | 10 +++--
.../wrappers/streaming/DoFnOperator.java | 43 ++++++++++++++++----
.../wrappers/streaming/WindowDoFnOperator.java | 8 ++--
.../beam/runners/flink/PipelineOptionsTest.java | 5 +--
.../flink/streaming/DoFnOperatorTest.java | 8 ++--
8 files changed, 63 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8330bfa7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
index 9ac907f..497b293 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
@@ -50,7 +50,6 @@ import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
@@ -523,8 +522,6 @@ class FlinkBatchTransformTranslators {
DataSet<WindowedValue<InputT>> inputDataSet =
context.getInputDataSet(context.getInput(transform));
- final OldDoFn<InputT, OutputT> oldDoFn = transform.getFn();
-
TypeInformation<WindowedValue<OutputT>> typeInformation =
context.getTypeInfo(context.getOutput(transform));
@@ -539,7 +536,7 @@ class FlinkBatchTransformTranslators {
FlinkDoFnFunction<InputT, OutputT> doFnWrapper =
new FlinkDoFnFunction<>(
- oldDoFn,
+ doFn,
context.getOutput(transform).getWindowingStrategy(),
sideInputStrategies,
context.getPipelineOptions());
@@ -570,8 +567,6 @@ class FlinkBatchTransformTranslators {
DataSet<WindowedValue<InputT>> inputDataSet =
context.getInputDataSet(context.getInput(transform));
- final OldDoFn<InputT, OutputT> oldDoFn = transform.getFn();
-
Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll();
Map<TupleTag<?>, Integer> outputMap = Maps.newHashMap();
@@ -618,7 +613,7 @@ class FlinkBatchTransformTranslators {
@SuppressWarnings("unchecked")
FlinkMultiOutputDoFnFunction<InputT, OutputT> doFnWrapper =
new FlinkMultiOutputDoFnFunction(
- oldDoFn,
+ doFn,
windowingStrategy,
sideInputStrategies,
context.getPipelineOptions(),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8330bfa7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 042f8df..42ef630 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -358,7 +358,7 @@ public class FlinkStreamingTransformTranslators {
if (sideInputs.isEmpty()) {
DoFnOperator<InputT, OutputT, WindowedValue<OutputT>> doFnOperator =
new DoFnOperator<>(
- transform.getFn(),
+ transform.getNewFn(),
inputTypeInfo,
new TupleTag<OutputT>("main output"),
Collections.<TupleTag<?>>emptyList(),
@@ -381,7 +381,7 @@ public class FlinkStreamingTransformTranslators {
DoFnOperator<InputT, OutputT, WindowedValue<OutputT>> doFnOperator =
new DoFnOperator<>(
- transform.getFn(),
+ transform.getNewFn(),
inputTypeInfo,
new TupleTag<OutputT>("main output"),
Collections.<TupleTag<?>>emptyList(),
@@ -515,7 +515,7 @@ public class FlinkStreamingTransformTranslators {
if (sideInputs.isEmpty()) {
DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
new DoFnOperator<>(
- transform.getFn(),
+ transform.getNewFn(),
inputTypeInfo,
transform.getMainOutputTag(),
transform.getSideOutputTags().getAll(),
@@ -542,7 +542,7 @@ public class FlinkStreamingTransformTranslators {
DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
new DoFnOperator<>(
- transform.getFn(),
+ transform.getNewFn(),
inputTypeInfo,
transform.getMainOutputTag(),
transform.getSideOutputTags().getAll(),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8330bfa7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index db045f5..ed200d5 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -20,7 +20,10 @@ package org.apache.beam.runners.flink.translation.functions;
import java.util.Map;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollectionView;
@@ -46,16 +49,17 @@ public class FlinkDoFnFunction<InputT, OutputT>
private final WindowingStrategy<?, ?> windowingStrategy;
public FlinkDoFnFunction(
- OldDoFn<InputT, OutputT> doFn,
+ DoFn<InputT, OutputT> doFn,
WindowingStrategy<?, ?> windowingStrategy,
Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
PipelineOptions options) {
- this.doFn = doFn;
+ this.doFn = DoFnAdapters.toOldDoFn(doFn);
this.sideInputs = sideInputs;
this.serializedOptions = new SerializedPipelineOptions(options);
this.windowingStrategy = windowingStrategy;
- this.requiresWindowAccess = doFn instanceof OldDoFn.RequiresWindowAccess;
+ this.requiresWindowAccess =
+ DoFnSignatures.signatureForDoFn(doFn).processElement().observesWindow();
this.hasSideInputs = !sideInputs.isEmpty();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8330bfa7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
index 7be4bb4..7f6a436 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
@@ -20,8 +20,11 @@ package org.apache.beam.runners.flink.translation.functions;
import java.util.Map;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollectionView;
@@ -54,16 +57,17 @@ public class FlinkMultiOutputDoFnFunction<InputT, OutputT>
private final WindowingStrategy<?, ?> windowingStrategy;
public FlinkMultiOutputDoFnFunction(
- OldDoFn<InputT, OutputT> doFn,
+ DoFn<InputT, OutputT> doFn,
WindowingStrategy<?, ?> windowingStrategy,
Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
PipelineOptions options,
Map<TupleTag<?>, Integer> outputMap) {
- this.doFn = doFn;
+ this.doFn = DoFnAdapters.toOldDoFn(doFn);
this.serializedOptions = new SerializedPipelineOptions(options);
this.outputMap = outputMap;
- this.requiresWindowAccess = doFn instanceof OldDoFn.RequiresWindowAccess;
+ this.requiresWindowAccess =
+ DoFnSignatures.signatureForDoFn(doFn).processElement().observesWindow();
this.hasSideInputs = !sideInputs.isEmpty();
this.windowingStrategy = windowingStrategy;
this.sideInputs = sideInputs;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8330bfa7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index a29664b..6729aaa 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -40,6 +40,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -88,7 +89,8 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
implements OneInputStreamOperator<WindowedValue<InputT>, OutputT>,
TwoInputStreamOperator<WindowedValue<InputT>, RawUnionValue, OutputT> {
- protected OldDoFn<InputT, FnOutputT> doFn;
+ protected OldDoFn<InputT, FnOutputT> oldDoFn;
+
protected final SerializedPipelineOptions serializedOptions;
protected final TupleTag<FnOutputT> mainOutputTag;
@@ -117,8 +119,9 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
private transient Map<String, KvStateSnapshot<?, ?, ?, ?, ?>> restoredSideInputState;
+ @Deprecated
public DoFnOperator(
- OldDoFn<InputT, FnOutputT> doFn,
+ OldDoFn<InputT, FnOutputT> oldDoFn,
TypeInformation<WindowedValue<InputT>> inputType,
TupleTag<FnOutputT> mainOutputTag,
List<TupleTag<?>> sideOutputTags,
@@ -127,7 +130,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
Map<Integer, PCollectionView<?>> sideInputTagMapping,
Collection<PCollectionView<?>> sideInputs,
PipelineOptions options) {
- this.doFn = doFn;
+ this.oldDoFn = oldDoFn;
this.mainOutputTag = mainOutputTag;
this.sideOutputTags = sideOutputTags;
this.sideInputTagMapping = sideInputTagMapping;
@@ -148,21 +151,43 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
setChainingStrategy(ChainingStrategy.ALWAYS);
}
+ public DoFnOperator(
+ DoFn<InputT, FnOutputT> doFn,
+ TypeInformation<WindowedValue<InputT>> inputType,
+ TupleTag<FnOutputT> mainOutputTag,
+ List<TupleTag<?>> sideOutputTags,
+ OutputManagerFactory<OutputT> outputManagerFactory,
+ WindowingStrategy<?, ?> windowingStrategy,
+ Map<Integer, PCollectionView<?>> sideInputTagMapping,
+ Collection<PCollectionView<?>> sideInputs,
+ PipelineOptions options) {
+ this(
+ DoFnAdapters.toOldDoFn(doFn),
+ inputType,
+ mainOutputTag,
+ sideOutputTags,
+ outputManagerFactory,
+ windowingStrategy,
+ sideInputTagMapping,
+ sideInputs,
+ options);
+ }
+
protected ExecutionContext.StepContext createStepContext() {
return new StepContext();
}
// allow overriding this in WindowDoFnOperator because this one dynamically creates
// the DoFn
- protected OldDoFn<InputT, FnOutputT> getDoFn() {
- return doFn;
+ protected OldDoFn<InputT, FnOutputT> getOldDoFn() {
+ return oldDoFn;
}
@Override
public void open() throws Exception {
super.open();
- this.doFn = getDoFn();
+ this.oldDoFn = getOldDoFn();
currentInputWatermark = Long.MIN_VALUE;
currentOutputWatermark = currentInputWatermark;
@@ -220,7 +245,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
DoFnRunner<InputT, FnOutputT> doFnRunner = DoFnRunners.createDefault(
serializedOptions.getPipelineOptions(),
- doFn,
+ oldDoFn,
sideInputReader,
outputManagerFactory.create(output),
mainOutputTag,
@@ -232,13 +257,13 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
pushbackDoFnRunner =
PushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
- doFn.setup();
+ oldDoFn.setup();
}
@Override
public void close() throws Exception {
super.close();
- doFn.teardown();
+ oldDoFn.teardown();
}
protected final long getPushbackWatermarkHold() {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8330bfa7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index f2d7f1c..9cea529 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -106,7 +106,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
PipelineOptions options,
Coder<K> keyCoder) {
super(
- null,
+ (OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>>) null,
inputType,
mainOutputTag,
sideOutputTags,
@@ -124,7 +124,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
}
@Override
- protected OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getDoFn() {
+ protected OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getOldDoFn() {
StateInternalsFactory<K> stateInternalsFactory = new StateInternalsFactory<K>() {
@Override
public StateInternals<K> stateInternalsForKey(K key) {
@@ -138,10 +138,10 @@ public class WindowDoFnOperator<K, InputT, OutputT>
// has the window type as generic parameter while WindowingStrategy is almost always
// untyped.
@SuppressWarnings("unchecked")
- OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFn =
+ OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> oldDoFn =
GroupAlsoByWindowViaWindowSetDoFn.create(
windowingStrategy, stateInternalsFactory, (SystemReduceFn) systemReduceFn);
- return doFn;
+ return oldDoFn;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8330bfa7/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index e44a705..4c97cc7 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -30,7 +30,6 @@ import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFnAdapters;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
@@ -99,7 +98,7 @@ public class PipelineOptionsTest {
@Test(expected = Exception.class)
public void parDoBaseClassPipelineOptionsNullTest() {
DoFnOperator<Object, Object, Object> doFnOperator = new DoFnOperator<>(
- DoFnAdapters.toOldDoFn(new TestDoFn()),
+ new TestDoFn(),
TypeInformation.of(new TypeHint<WindowedValue<Object>>() {}),
new TupleTag<>("main-output"),
Collections.<TupleTag<?>>emptyList(),
@@ -118,7 +117,7 @@ public class PipelineOptionsTest {
public void parDoBaseClassPipelineOptionsSerializationTest() throws Exception {
DoFnOperator<Object, Object, Object> doFnOperator = new DoFnOperator<>(
- DoFnAdapters.toOldDoFn(new TestDoFn()),
+ new TestDoFn(),
TypeInformation.of(new TypeHint<WindowedValue<Object>>() {}),
new TupleTag<>("main-output"),
Collections.<TupleTag<?>>emptyList(),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8330bfa7/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index 65e244a..113802d 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -35,8 +35,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PCollectionViewTesting;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFnAdapters;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
@@ -97,7 +95,7 @@ public class DoFnOperatorTest {
TupleTag<String> outputTag = new TupleTag<>("main-output");
DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
- DoFnAdapters.toOldDoFn(new IdentityDoFn<String>()),
+ new IdentityDoFn<String>(),
coderTypeInfo,
outputTag,
Collections.<TupleTag<?>>emptyList(),
@@ -141,7 +139,7 @@ public class DoFnOperatorTest {
.build();
DoFnOperator<String, String, RawUnionValue> doFnOperator = new DoFnOperator<>(
- DoFnAdapters.toOldDoFn(new MultiOutputDoFn(sideOutput1, sideOutput2)),
+ new MultiOutputDoFn(sideOutput1, sideOutput2),
coderTypeInfo,
mainOutput,
ImmutableList.<TupleTag<?>>of(sideOutput1, sideOutput2),
@@ -201,7 +199,7 @@ public class DoFnOperatorTest {
.build();
DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
- DoFnAdapters.toOldDoFn(new IdentityDoFn<String>()),
+ new IdentityDoFn<String>(),
coderTypeInfo,
outputTag,
Collections.<TupleTag<?>>emptyList(),