You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/15 22:29:02 UTC

[02/10] incubator-beam git commit: Pushes uses of OldDoFn deeper inside Flink runner

Pushes uses of OldDoFn deeper inside Flink runner

In particular, the various DoFnOperators now take a regular DoFn
rather than an OldDoFn, and convert it to an OldDoFn internally.

This allows removing uses of ParDo.getFn() returning OldDoFn.

The only case where the OldDoFn inside a DoFnOperator is actually an
OldDoFn rather than a DoFn in disguise is now WindowDoFnOperator, which
overrides getOldDoFn (renamed from getDoFn in this commit) to return an
actual GABW (GroupAlsoByWindow) OldDoFn.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8330bfa7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8330bfa7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8330bfa7

Branch: refs/heads/master
Commit: 8330bfa74cd72e51a29649745e87a4f1a6e5ffa1
Parents: af616d9
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Dec 9 16:47:01 2016 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Dec 15 13:55:24 2016 -0800

----------------------------------------------------------------------
 .../FlinkBatchTransformTranslators.java         |  9 +---
 .../FlinkStreamingTransformTranslators.java     |  8 ++--
 .../functions/FlinkDoFnFunction.java            | 10 +++--
 .../functions/FlinkMultiOutputDoFnFunction.java | 10 +++--
 .../wrappers/streaming/DoFnOperator.java        | 43 ++++++++++++++++----
 .../wrappers/streaming/WindowDoFnOperator.java  |  8 ++--
 .../beam/runners/flink/PipelineOptionsTest.java |  5 +--
 .../flink/streaming/DoFnOperatorTest.java       |  8 ++--
 8 files changed, 63 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8330bfa7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
index 9ac907f..497b293 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
@@ -50,7 +50,6 @@ import org.apache.beam.sdk.transforms.CombineFnBase;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
@@ -523,8 +522,6 @@ class FlinkBatchTransformTranslators {
       DataSet<WindowedValue<InputT>> inputDataSet =
           context.getInputDataSet(context.getInput(transform));
 
-      final OldDoFn<InputT, OutputT> oldDoFn = transform.getFn();
-
       TypeInformation<WindowedValue<OutputT>> typeInformation =
           context.getTypeInfo(context.getOutput(transform));
 
@@ -539,7 +536,7 @@ class FlinkBatchTransformTranslators {
 
       FlinkDoFnFunction<InputT, OutputT> doFnWrapper =
           new FlinkDoFnFunction<>(
-              oldDoFn,
+              doFn,
               context.getOutput(transform).getWindowingStrategy(),
               sideInputStrategies,
               context.getPipelineOptions());
@@ -570,8 +567,6 @@ class FlinkBatchTransformTranslators {
       DataSet<WindowedValue<InputT>> inputDataSet =
           context.getInputDataSet(context.getInput(transform));
 
-      final OldDoFn<InputT, OutputT> oldDoFn = transform.getFn();
-
       Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll();
 
       Map<TupleTag<?>, Integer> outputMap = Maps.newHashMap();
@@ -618,7 +613,7 @@ class FlinkBatchTransformTranslators {
       @SuppressWarnings("unchecked")
       FlinkMultiOutputDoFnFunction<InputT, OutputT> doFnWrapper =
           new FlinkMultiOutputDoFnFunction(
-              oldDoFn,
+              doFn,
               windowingStrategy,
               sideInputStrategies,
               context.getPipelineOptions(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8330bfa7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 042f8df..42ef630 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -358,7 +358,7 @@ public class FlinkStreamingTransformTranslators {
       if (sideInputs.isEmpty()) {
         DoFnOperator<InputT, OutputT, WindowedValue<OutputT>> doFnOperator =
             new DoFnOperator<>(
-                transform.getFn(),
+                transform.getNewFn(),
                 inputTypeInfo,
                 new TupleTag<OutputT>("main output"),
                 Collections.<TupleTag<?>>emptyList(),
@@ -381,7 +381,7 @@ public class FlinkStreamingTransformTranslators {
 
         DoFnOperator<InputT, OutputT, WindowedValue<OutputT>> doFnOperator =
             new DoFnOperator<>(
-                transform.getFn(),
+                transform.getNewFn(),
                 inputTypeInfo,
                 new TupleTag<OutputT>("main output"),
                 Collections.<TupleTag<?>>emptyList(),
@@ -515,7 +515,7 @@ public class FlinkStreamingTransformTranslators {
       if (sideInputs.isEmpty()) {
         DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
             new DoFnOperator<>(
-                transform.getFn(),
+                transform.getNewFn(),
                 inputTypeInfo,
                 transform.getMainOutputTag(),
                 transform.getSideOutputTags().getAll(),
@@ -542,7 +542,7 @@ public class FlinkStreamingTransformTranslators {
 
         DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
             new DoFnOperator<>(
-                transform.getFn(),
+                transform.getNewFn(),
                 inputTypeInfo,
                 transform.getMainOutputTag(),
                 transform.getSideOutputTags().getAll(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8330bfa7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index db045f5..ed200d5 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -20,7 +20,10 @@ package org.apache.beam.runners.flink.translation.functions;
 import java.util.Map;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -46,16 +49,17 @@ public class FlinkDoFnFunction<InputT, OutputT>
   private final WindowingStrategy<?, ?> windowingStrategy;
 
   public FlinkDoFnFunction(
-      OldDoFn<InputT, OutputT> doFn,
+      DoFn<InputT, OutputT> doFn,
       WindowingStrategy<?, ?> windowingStrategy,
       Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
       PipelineOptions options) {
-    this.doFn = doFn;
+    this.doFn = DoFnAdapters.toOldDoFn(doFn);
     this.sideInputs = sideInputs;
     this.serializedOptions = new SerializedPipelineOptions(options);
     this.windowingStrategy = windowingStrategy;
 
-    this.requiresWindowAccess = doFn instanceof OldDoFn.RequiresWindowAccess;
+    this.requiresWindowAccess =
+        DoFnSignatures.signatureForDoFn(doFn).processElement().observesWindow();
     this.hasSideInputs = !sideInputs.isEmpty();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8330bfa7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
index 7be4bb4..7f6a436 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
@@ -20,8 +20,11 @@ package org.apache.beam.runners.flink.translation.functions;
 import java.util.Map;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -54,16 +57,17 @@ public class FlinkMultiOutputDoFnFunction<InputT, OutputT>
   private final WindowingStrategy<?, ?> windowingStrategy;
 
   public FlinkMultiOutputDoFnFunction(
-      OldDoFn<InputT, OutputT> doFn,
+      DoFn<InputT, OutputT> doFn,
       WindowingStrategy<?, ?> windowingStrategy,
       Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
       PipelineOptions options,
       Map<TupleTag<?>, Integer> outputMap) {
-    this.doFn = doFn;
+    this.doFn = DoFnAdapters.toOldDoFn(doFn);
     this.serializedOptions = new SerializedPipelineOptions(options);
     this.outputMap = outputMap;
 
-    this.requiresWindowAccess = doFn instanceof OldDoFn.RequiresWindowAccess;
+    this.requiresWindowAccess =
+        DoFnSignatures.signatureForDoFn(doFn).processElement().observesWindow();
     this.hasSideInputs = !sideInputs.isEmpty();
     this.windowingStrategy = windowingStrategy;
     this.sideInputs = sideInputs;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8330bfa7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index a29664b..6729aaa 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -40,6 +40,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -88,7 +89,8 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     implements OneInputStreamOperator<WindowedValue<InputT>, OutputT>,
       TwoInputStreamOperator<WindowedValue<InputT>, RawUnionValue, OutputT> {
 
-  protected OldDoFn<InputT, FnOutputT> doFn;
+  protected OldDoFn<InputT, FnOutputT> oldDoFn;
+
   protected final SerializedPipelineOptions serializedOptions;
 
   protected final TupleTag<FnOutputT> mainOutputTag;
@@ -117,8 +119,9 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
   private transient Map<String, KvStateSnapshot<?, ?, ?, ?, ?>> restoredSideInputState;
 
+  @Deprecated
   public DoFnOperator(
-      OldDoFn<InputT, FnOutputT> doFn,
+      OldDoFn<InputT, FnOutputT> oldDoFn,
       TypeInformation<WindowedValue<InputT>> inputType,
       TupleTag<FnOutputT> mainOutputTag,
       List<TupleTag<?>> sideOutputTags,
@@ -127,7 +130,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
       Map<Integer, PCollectionView<?>> sideInputTagMapping,
       Collection<PCollectionView<?>> sideInputs,
       PipelineOptions options) {
-    this.doFn = doFn;
+    this.oldDoFn = oldDoFn;
     this.mainOutputTag = mainOutputTag;
     this.sideOutputTags = sideOutputTags;
     this.sideInputTagMapping = sideInputTagMapping;
@@ -148,21 +151,43 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     setChainingStrategy(ChainingStrategy.ALWAYS);
   }
 
+  public DoFnOperator(
+      DoFn<InputT, FnOutputT> doFn,
+      TypeInformation<WindowedValue<InputT>> inputType,
+      TupleTag<FnOutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      OutputManagerFactory<OutputT> outputManagerFactory,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Map<Integer, PCollectionView<?>> sideInputTagMapping,
+      Collection<PCollectionView<?>> sideInputs,
+      PipelineOptions options) {
+    this(
+        DoFnAdapters.toOldDoFn(doFn),
+        inputType,
+        mainOutputTag,
+        sideOutputTags,
+        outputManagerFactory,
+        windowingStrategy,
+        sideInputTagMapping,
+        sideInputs,
+        options);
+  }
+
   protected ExecutionContext.StepContext createStepContext() {
     return new StepContext();
   }
 
   // allow overriding this in WindowDoFnOperator because this one dynamically creates
   // the DoFn
-  protected OldDoFn<InputT, FnOutputT> getDoFn() {
-    return doFn;
+  protected OldDoFn<InputT, FnOutputT> getOldDoFn() {
+    return oldDoFn;
   }
 
   @Override
   public void open() throws Exception {
     super.open();
 
-    this.doFn = getDoFn();
+    this.oldDoFn = getOldDoFn();
 
     currentInputWatermark = Long.MIN_VALUE;
     currentOutputWatermark = currentInputWatermark;
@@ -220,7 +245,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
     DoFnRunner<InputT, FnOutputT> doFnRunner = DoFnRunners.createDefault(
         serializedOptions.getPipelineOptions(),
-        doFn,
+        oldDoFn,
         sideInputReader,
         outputManagerFactory.create(output),
         mainOutputTag,
@@ -232,13 +257,13 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     pushbackDoFnRunner =
         PushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
 
-    doFn.setup();
+    oldDoFn.setup();
   }
 
   @Override
   public void close() throws Exception {
     super.close();
-    doFn.teardown();
+    oldDoFn.teardown();
   }
 
   protected final long getPushbackWatermarkHold() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8330bfa7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index f2d7f1c..9cea529 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -106,7 +106,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
       PipelineOptions options,
       Coder<K> keyCoder) {
     super(
-        null,
+        (OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>>) null,
         inputType,
         mainOutputTag,
         sideOutputTags,
@@ -124,7 +124,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
   }
 
   @Override
-  protected OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getDoFn() {
+  protected OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getOldDoFn() {
     StateInternalsFactory<K> stateInternalsFactory = new StateInternalsFactory<K>() {
       @Override
       public StateInternals<K> stateInternalsForKey(K key) {
@@ -138,10 +138,10 @@ public class WindowDoFnOperator<K, InputT, OutputT>
     // has the window type as generic parameter while WindowingStrategy is almost always
     // untyped.
     @SuppressWarnings("unchecked")
-    OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFn =
+    OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> oldDoFn =
         GroupAlsoByWindowViaWindowSetDoFn.create(
             windowingStrategy, stateInternalsFactory, (SystemReduceFn) systemReduceFn);
-    return doFn;
+    return oldDoFn;
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8330bfa7/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index e44a705..4c97cc7 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -30,7 +30,6 @@ import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -99,7 +98,7 @@ public class PipelineOptionsTest {
   @Test(expected = Exception.class)
   public void parDoBaseClassPipelineOptionsNullTest() {
     DoFnOperator<Object, Object, Object> doFnOperator = new DoFnOperator<>(
-        DoFnAdapters.toOldDoFn(new TestDoFn()),
+        new TestDoFn(),
         TypeInformation.of(new TypeHint<WindowedValue<Object>>() {}),
         new TupleTag<>("main-output"),
         Collections.<TupleTag<?>>emptyList(),
@@ -118,7 +117,7 @@ public class PipelineOptionsTest {
   public void parDoBaseClassPipelineOptionsSerializationTest() throws Exception {
 
     DoFnOperator<Object, Object, Object> doFnOperator = new DoFnOperator<>(
-        DoFnAdapters.toOldDoFn(new TestDoFn()),
+        new TestDoFn(),
         TypeInformation.of(new TypeHint<WindowedValue<Object>>() {}),
         new TupleTag<>("main-output"),
         Collections.<TupleTag<?>>emptyList(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8330bfa7/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index 65e244a..113802d 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -35,8 +35,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PCollectionViewTesting;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFnAdapters;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
@@ -97,7 +95,7 @@ public class DoFnOperatorTest {
     TupleTag<String> outputTag = new TupleTag<>("main-output");
 
     DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
-        DoFnAdapters.toOldDoFn(new IdentityDoFn<String>()),
+        new IdentityDoFn<String>(),
         coderTypeInfo,
         outputTag,
         Collections.<TupleTag<?>>emptyList(),
@@ -141,7 +139,7 @@ public class DoFnOperatorTest {
         .build();
 
     DoFnOperator<String, String, RawUnionValue> doFnOperator = new DoFnOperator<>(
-        DoFnAdapters.toOldDoFn(new MultiOutputDoFn(sideOutput1, sideOutput2)),
+        new MultiOutputDoFn(sideOutput1, sideOutput2),
         coderTypeInfo,
         mainOutput,
         ImmutableList.<TupleTag<?>>of(sideOutput1, sideOutput2),
@@ -201,7 +199,7 @@ public class DoFnOperatorTest {
             .build();
 
     DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
-        DoFnAdapters.toOldDoFn(new IdentityDoFn<String>()),
+        new IdentityDoFn<String>(),
         coderTypeInfo,
         outputTag,
         Collections.<TupleTag<?>>emptyList(),