You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/07/15 00:14:56 UTC

[1/4] incubator-beam git commit: Fix DoFnTester side inputs

Repository: incubator-beam
Updated Branches:
  refs/heads/master 79c26d9c1 -> cb0356932


Fix DoFnTester side inputs

The side inputs were being stored as iterables, but being returned as
the raw type.

Store the side input values directly instead.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1c1af625
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1c1af625
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1c1af625

Branch: refs/heads/master
Commit: 1c1af62586db36212ebf76eb8307d1993666afa5
Parents: f0119b2
Author: Thomas Groh <tg...@google.com>
Authored: Thu Jul 14 10:33:22 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jul 14 17:13:10 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/DoFnTester.java  | 70 ++++++++----------
 .../beam/sdk/transforms/DoFnTesterTest.java     | 74 ++++++++++++++++----
 2 files changed, 91 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c1af625/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 8cfb550..a638feb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -103,50 +103,35 @@ public class DoFnTester<InputT, OutputT> {
    * Registers the tuple of values of the side input {@link PCollectionView}s to
    * pass to the {@link DoFn} under test.
    *
-   * <p>If needed, first creates a fresh instance of the {@link DoFn}
-   * under test.
+   * <p>Resets the state of this {@link DoFnTester}.
    *
    * <p>If this isn't called, {@code DoFnTester} assumes the
    * {@link DoFn} takes no side inputs.
    */
-  public void setSideInputs(Map<PCollectionView<?>, Iterable<WindowedValue<?>>> sideInputs) {
+  public void setSideInputs(Map<PCollectionView<?>, Map<BoundedWindow, ?>> sideInputs) {
     this.sideInputs = sideInputs;
     resetState();
   }
 
   /**
-   * Registers the values of a side input {@link PCollectionView} to
-   * pass to the {@link DoFn} under test.
+   * Registers the values of a side input {@link PCollectionView} to pass to the {@link DoFn} under
+   * test.
    *
-   * <p>If needed, first creates a fresh instance of the {@code DoFn}
-   * under test.
+   * <p>The provided value is the final value of the side input in the specified window, not
+   * the value of the input PCollection in that window.
    *
-   * <p>If this isn't called, {@code DoFnTester} assumes the
-   * {@code DoFn} takes no side inputs.
+   * <p>If this isn't called, {@code DoFnTester} will return the default value for any side input
+   * that is used.
    */
-  public void setSideInput(PCollectionView<?> sideInput, Iterable<WindowedValue<?>> value) {
-    sideInputs.put(sideInput, value);
-  }
-
-  /**
-   * Registers the values for a side input {@link PCollectionView} to
-   * pass to the {@link DoFn} under test. All values are placed
-   * in the global window.
-   */
-  public void setSideInputInGlobalWindow(
-      PCollectionView<?> sideInput,
-      Iterable<?> value) {
-    sideInputs.put(
-        sideInput,
-        Iterables.transform(value, new Function<Object, WindowedValue<?>>() {
-          @Override
-          public WindowedValue<?> apply(Object input) {
-            return WindowedValue.valueInGlobalWindow(input);
-          }
-        }));
+  public <T> void setSideInput(PCollectionView<T> sideInput, BoundedWindow window, T value) {
+    Map<BoundedWindow, T> windowValues = (Map<BoundedWindow, T>) sideInputs.get(sideInput);
+    if (windowValues == null) {
+      windowValues = new HashMap<>();
+      sideInputs.put(sideInput, windowValues);
+    }
+    windowValues.put(window, value);
   }
 
-
   /**
    * Registers the list of {@code TupleTag}s that can be used by the
    * {@code DoFn} under test to output to side output
@@ -523,14 +508,14 @@ public class DoFnTester<InputT, OutputT> {
     private final TestContext<InT, OutT> context;
     private final TupleTag<OutT> mainOutputTag;
     private final WindowedValue<InT> element;
-    private final Map<PCollectionView<?>, ?> sideInputs;
+    private final Map<PCollectionView<?>, Map<BoundedWindow, ?>> sideInputs;
 
     private TestProcessContext(
         DoFn<InT, OutT> fn,
         TestContext<InT, OutT> context,
         WindowedValue<InT> element,
         TupleTag<OutT> mainOutputTag,
-        Map<PCollectionView<?>, ?> sideInputs) {
+        Map<PCollectionView<?>, Map<BoundedWindow, ?>> sideInputs) {
       fn.super();
       this.context = context;
       this.element = element;
@@ -545,9 +530,17 @@ public class DoFnTester<InputT, OutputT> {
 
     @Override
     public <T> T sideInput(PCollectionView<T> view) {
-      @SuppressWarnings("unchecked")
-      T sideInput = (T) sideInputs.get(view);
-      return sideInput;
+      Map<BoundedWindow, ?> viewValues = sideInputs.get(view);
+      if (viewValues != null) {
+        BoundedWindow sideInputWindow =
+            view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(window());
+        @SuppressWarnings("unchecked")
+        T windowValue = (T) viewValues.get(sideInputWindow);
+        if (windowValue != null) {
+          return windowValue;
+        }
+      }
+      return view.fromIterableInternal(Collections.<WindowedValue<?>>emptyList());
     }
 
     @Override
@@ -668,7 +661,7 @@ public class DoFnTester<InputT, OutputT> {
   final DoFn<InputT, OutputT> origFn;
 
   /** The side input values to provide to the DoFn under test. */
-  private Map<PCollectionView<?>, Iterable<WindowedValue<?>>> sideInputs =
+  private Map<PCollectionView<?>, Map<BoundedWindow, ?>> sideInputs =
       new HashMap<>();
 
   private Map<String, Object> accumulators;
@@ -703,11 +696,6 @@ public class DoFnTester<InputT, OutputT> {
         SerializableUtils.deserializeFromByteArray(
             SerializableUtils.serializeToByteArray(origFn),
             origFn.toString());
-    PTuple runnerSideInputs = PTuple.empty();
-    for (Map.Entry<PCollectionView<?>, Iterable<WindowedValue<?>>> entry
-        : sideInputs.entrySet()) {
-      runnerSideInputs = runnerSideInputs.and(entry.getKey().getTagInternal(), entry.getValue());
-    }
     outputs = new HashMap<>();
     accumulators = new HashMap<>();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c1af625/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
index b391671..8460a7c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
@@ -24,8 +24,13 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.util.PCollectionViews;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
 
 import org.hamcrest.Matchers;
@@ -150,19 +155,15 @@ public class DoFnTesterTest {
     tester.processElement(2L);
 
     List<TimestampedValue<String>> peek = tester.peekOutputElementsWithTimestamp();
-    TimestampedValue<String> one =
-        TimestampedValue.of("1", new Instant(1000L));
-    TimestampedValue<String> two =
-        TimestampedValue.of("2", new Instant(2000L));
+    TimestampedValue<String> one = TimestampedValue.of("1", new Instant(1000L));
+    TimestampedValue<String> two = TimestampedValue.of("2", new Instant(2000L));
     assertThat(peek, hasItems(one, two));
 
     tester.processElement(3L);
     tester.processElement(4L);
 
-    TimestampedValue<String> three =
-        TimestampedValue.of("3", new Instant(3000L));
-    TimestampedValue<String> four =
-        TimestampedValue.of("4", new Instant(4000L));
+    TimestampedValue<String> three = TimestampedValue.of("3", new Instant(3000L));
+    TimestampedValue<String> four = TimestampedValue.of("4", new Instant(4000L));
     peek = tester.peekOutputElementsWithTimestamp();
     assertThat(peek, hasItems(one, two, three, four));
     List<TimestampedValue<String>> take = tester.takeOutputElementsWithTimestamp();
@@ -219,14 +220,63 @@ public class DoFnTesterTest {
     tester.processElement(2L);
     tester.finishBundle();
 
-    assertThat(tester.peekOutputElementsInWindow(GlobalWindow.INSTANCE),
-        containsInAnyOrder(TimestampedValue.of("1", new Instant(1000L)),
+    assertThat(
+        tester.peekOutputElementsInWindow(GlobalWindow.INSTANCE),
+        containsInAnyOrder(
+            TimestampedValue.of("1", new Instant(1000L)),
             TimestampedValue.of("2", new Instant(2000L))));
-    assertThat(tester.peekOutputElementsInWindow(
-        new IntervalWindow(new Instant(0L), new Instant(10L))),
+    assertThat(
+        tester.peekOutputElementsInWindow(new IntervalWindow(new Instant(0L), new Instant(10L))),
         Matchers.<TimestampedValue<String>>emptyIterable());
   }
 
+  @Test
+  public void fnWithSideInputDefault() throws Exception {
+    final PCollectionView<Integer> value =
+        PCollectionViews.singletonView(
+            TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of());
+    DoFn<Integer, Integer> fn = new SideInputDoFn(value);
+
+    DoFnTester<Integer, Integer> tester = DoFnTester.of(fn);
+
+    tester.processElement(1);
+    tester.processElement(2);
+    tester.processElement(4);
+    tester.processElement(8);
+    assertThat(tester.peekOutputElements(), containsInAnyOrder(0, 0, 0, 0));
+  }
+
+  @Test
+  public void fnWithSideInputExplicit() throws Exception {
+    final PCollectionView<Integer> value =
+        PCollectionViews.singletonView(
+            TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of());
+    DoFn<Integer, Integer> fn = new SideInputDoFn(value);
+
+    DoFnTester<Integer, Integer> tester = DoFnTester.of(fn);
+    tester.setSideInput(value, GlobalWindow.INSTANCE, -2);
+    tester.processElement(16);
+    tester.processElement(32);
+    tester.processElement(64);
+    tester.processElement(128);
+    tester.finishBundle();
+
+    assertThat(tester.peekOutputElements(), containsInAnyOrder(-2, -2, -2, -2));
+  }
+
+  private static class SideInputDoFn extends DoFn<Integer, Integer> {
+    private final PCollectionView<Integer> value;
+
+    private SideInputDoFn(PCollectionView<Integer> value) {
+      this.value = value;
+    }
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      c.output(c.sideInput(value));
+    }
+  }
+
   /**
    * A DoFn that adds values to an aggregator and converts input to String in processElement.
    */


[3/4] incubator-beam git commit: Increase visibility Restrictions in DoFnTester

Posted by ke...@apache.org.
Increase visibility Restrictions in DoFnTester

No in-package representation refers to the fields of DoFnTester
directly.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/60443524
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/60443524
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/60443524

Branch: refs/heads/master
Commit: 604435249750729b223d90f22300571ff9f4bcfc
Parents: b8cd573
Author: Thomas Groh <tg...@google.com>
Authored: Thu Jul 14 15:34:36 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jul 14 17:13:11 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/DoFnTester.java  | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/60443524/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index f8479de..c38f0ab 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -639,10 +639,10 @@ public class DoFnTester<InputT, OutputT> {
     FINISHED
   }
 
-  final PipelineOptions options = PipelineOptionsFactory.create();
+  private final PipelineOptions options = PipelineOptionsFactory.create();
 
   /** The original DoFn under test. */
-  final DoFn<InputT, OutputT> origFn;
+  private final DoFn<InputT, OutputT> origFn;
 
   /** The side input values to provide to the DoFn under test. */
   private Map<PCollectionView<?>, Map<BoundedWindow, ?>> sideInputs =
@@ -651,23 +651,23 @@ public class DoFnTester<InputT, OutputT> {
   private Map<String, Object> accumulators;
 
   /** The output tags used by the DoFn under test. */
-  TupleTag<OutputT> mainOutputTag = new TupleTag<>();
+  private TupleTag<OutputT> mainOutputTag = new TupleTag<>();
 
   /** The original DoFn under test, if started. */
   DoFn<InputT, OutputT> fn;
 
   /** The ListOutputManager to examine the outputs. */
-  Map<TupleTag<?>, List<WindowedValue<?>>> outputs;
+  private Map<TupleTag<?>, List<WindowedValue<?>>> outputs;
 
   /** The state of processing of the DoFn under test. */
-  State state;
+  private State state;
 
-  DoFnTester(DoFn<InputT, OutputT> origFn) {
+  private DoFnTester(DoFn<InputT, OutputT> origFn) {
     this.origFn = origFn;
     resetState();
   }
 
-  void resetState() {
+  private void resetState() {
     fn = null;
     outputs = null;
     accumulators = null;
@@ -675,7 +675,7 @@ public class DoFnTester<InputT, OutputT> {
   }
 
   @SuppressWarnings("unchecked")
-  void initializeState() {
+  private void initializeState() {
     fn = (DoFn<InputT, OutputT>)
         SerializableUtils.deserializeFromByteArray(
             SerializableUtils.serializeToByteArray(origFn),


[4/4] incubator-beam git commit: This closes #659

Posted by ke...@apache.org.
This closes #659


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cb035693
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cb035693
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cb035693

Branch: refs/heads/master
Commit: cb03569326f29816d622329e2c3b1e4aad1751d1
Parents: 79c26d9 6044352
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jul 14 17:14:00 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jul 14 17:14:00 2016 -0700

----------------------------------------------------------------------
 .../runners/dataflow/DataflowRunnerTest.java    |  10 --
 .../apache/beam/sdk/transforms/DoFnTester.java  | 102 +++++++------------
 .../beam/sdk/transforms/DoFnTesterTest.java     |  74 +++++++++++---
 3 files changed, 99 insertions(+), 87 deletions(-)
----------------------------------------------------------------------



[2/4] incubator-beam git commit: Remove DoFnTester#setSideOutputTags

Posted by ke...@apache.org.
Remove DoFnTester#setSideOutputTags

Side Outputs are appended to the map of outputs on-demand.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b8cd5739
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b8cd5739
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b8cd5739

Branch: refs/heads/master
Commit: b8cd5739492b16c3481281074586f891f4554999
Parents: 1c1af62
Author: Thomas Groh <tg...@google.com>
Authored: Thu Jul 14 15:34:02 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jul 14 17:13:11 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunnerTest.java   | 10 ----------
 .../org/apache/beam/sdk/transforms/DoFnTester.java  | 16 ----------------
 2 files changed, 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b8cd5739/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index f3cbb38..fe288ad 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -18,7 +18,6 @@
 package org.apache.beam.runners.dataflow;
 
 import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
-
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
@@ -84,7 +83,6 @@ import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
 
 import com.google.api.services.dataflow.Dataflow;
 import com.google.api.services.dataflow.model.DataflowPackage;
@@ -1060,8 +1058,6 @@ public class DataflowRunnerTest {
             keyCoder,
             ismCoder,
             false /* unique keys */));
-    doFnTester.setSideOutputTags(TupleTagList.of(
-        ImmutableList.<TupleTag<?>>of(outputForSizeTag, outputForEntrySetTag)));
 
     IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10));
     IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20));
@@ -1162,8 +1158,6 @@ public class DataflowRunnerTest {
             keyCoder,
             ismCoder,
             true /* unique keys */));
-    doFnTester.setSideOutputTags(TupleTagList.of(
-        ImmutableList.<TupleTag<?>>of(outputForSizeTag, outputForEntrySetTag)));
 
     IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10));
 
@@ -1203,8 +1197,6 @@ public class DataflowRunnerTest {
                IsmRecord<WindowedValue<Long>>> doFnTester = DoFnTester.of(
         new BatchViewAsMultimap.ToIsmMetadataRecordForSizeDoFn<Long, Long, IntervalWindow>(
             windowCoder));
-    doFnTester.setSideOutputTags(TupleTagList.of(
-        ImmutableList.<TupleTag<?>>of(outputForSizeTag, outputForEntrySetTag)));
 
     IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10));
     IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20));
@@ -1256,8 +1248,6 @@ public class DataflowRunnerTest {
                IsmRecord<WindowedValue<Long>>> doFnTester = DoFnTester.of(
         new BatchViewAsMultimap.ToIsmMetadataRecordForKeyDoFn<Long, Long, IntervalWindow>(
             keyCoder, windowCoder));
-    doFnTester.setSideOutputTags(TupleTagList.of(
-        ImmutableList.<TupleTag<?>>of(outputForSizeTag, outputForEntrySetTag)));
 
     IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10));
     IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b8cd5739/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index a638feb..f8479de 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -24,7 +24,6 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.PTuple;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -34,7 +33,6 @@ import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
 
 import com.google.common.base.Function;
 import com.google.common.base.MoreObjects;
@@ -133,20 +131,6 @@ public class DoFnTester<InputT, OutputT> {
   }
 
   /**
-   * Registers the list of {@code TupleTag}s that can be used by the
-   * {@code DoFn} under test to output to side output
-   * {@code PCollection}s.
-   *
-   * <p>If needed, first creates a fresh instance of the DoFn under test.
-   *
-   * <p>If this isn't called, {@code DoFnTester} assumes the
-   * {@code DoFn} doesn't emit to any side outputs.
-   */
-  public void setSideOutputTags(TupleTagList sideOutputTags) {
-    resetState();
-  }
-
-  /**
    * A convenience operation that first calls {@link #startBundle},
    * then calls {@link #processElement} on each of the input elements, then
    * calls {@link #finishBundle}, then returns the result of