You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/06 16:41:23 UTC

[33/50] [abbrv] incubator-beam git commit: Makes DoFnTester use new DoFn internally.

Makes DoFnTester use new DoFn internally.

There were 2 remaining users of DoFnTester.of(OldDoFn):
- SplittableParDo.ProcessElements: this is fixed in
  https://github.com/apache/incubator-beam/pull/1261
- GroupAlsoByWindowsProperties: this one is harder.
  Various GABWDoFns use OldDoFn.windowingInternals,
  and we can't pass that through a new DoFn.
  So instead I removed usage of DoFnTester from
  GroupAlsoByWindowsProperties in favor of a tiny
  hand-coded solution.

So after #1261 is merged, DoFnTester.of(OldDoFn) can be deleted.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/96455768
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/96455768
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/96455768

Branch: refs/heads/gearpump-runner
Commit: 96455768568616141a95833380f37c478a989397
Parents: e04cd47
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Nov 18 13:10:22 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Dec 2 15:42:33 2016 -0800

----------------------------------------------------------------------
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  |   6 +-
 .../core/GroupByKeyViaGroupByKeyOnly.java       |  22 +-
 .../core/GroupAlsoByWindowsProperties.java      | 590 +++++++++++--------
 .../beam/sdk/transforms/DoFnAdapters.java       |   2 +
 .../apache/beam/sdk/transforms/DoFnTester.java  | 130 ++--
 .../sdk/transforms/reflect/DoFnInvokers.java    |  11 -
 .../beam/sdk/transforms/DoFnTesterTest.java     |   4 +-
 7 files changed, 394 insertions(+), 371 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96455768/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
index f8f6207..b4b366c 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -21,7 +21,6 @@ import com.google.common.collect.Iterables;
 import java.util.List;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -30,7 +29,6 @@ import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.apache.beam.sdk.util.state.TimerCallback;
-import org.apache.beam.sdk.values.KV;
 import org.joda.time.Instant;
 
 /**
@@ -55,9 +53,7 @@ public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends
   }
 
   @Override
-  public void processElement(
-      OldDoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>>.ProcessContext c)
-          throws Exception {
+  public void processElement(ProcessContext c) throws Exception {
     K key = c.element().getKey();
     // Used with Batch, we know that all the data is available for this key. We can't use the
     // timer manager from the context because it doesn't exist. So we create one and emulate the

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96455768/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
index 79d2252..43047ca 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
@@ -26,15 +26,13 @@ import java.util.List;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -135,10 +133,9 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
       return input
           .apply(
               ParDo.of(
-                  new OldDoFn<
-                      KV<K, Iterable<WindowedValue<V>>>,
-                      KV<K, Iterable<WindowedValue<V>>>>() {
-                    @Override
+                  new DoFn<KV<K, Iterable<WindowedValue<V>>>,
+                           KV<K, Iterable<WindowedValue<V>>>>() {
+                    @ProcessElement
                     public void processElement(ProcessContext c) {
                       KV<K, Iterable<WindowedValue<V>>> kvs = c.element();
                       K key = kvs.getKey();
@@ -251,16 +248,5 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
           input.getPipeline(), windowingStrategy, input.isBounded())
           .setCoder(outputKvCoder);
     }
-
-    private <W extends BoundedWindow>
-        GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W> groupAlsoByWindowsFn(
-            WindowingStrategy<?, W> strategy,
-            StateInternalsFactory<K> stateInternalsFactory,
-            Coder<V> inputIterableElementValueCoder) {
-      return new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>(
-          strategy,
-          stateInternalsFactory,
-          SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder));
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96455768/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
index d1e0c68..97b67c6 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
@@ -23,50 +23,60 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertThat;
 
+import com.google.common.base.Predicate;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DoFnTester;
-import org.apache.beam.sdk.transforms.DoFnTester.CloningBehavior;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.InMemoryStateInternals;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
 /**
  * Properties of {@link GroupAlsoByWindowsDoFn}.
  *
- * <p>Some properties may not hold of some implementations, due to restrictions on the context
- * in which the implementation is applicable. For example, some {@code GroupAlsoByWindows} may not
+ * <p>Some properties may not hold of some implementations, due to restrictions on the context in
+ * which the implementation is applicable. For example, some {@code GroupAlsoByWindows} may not
  * support merging windows.
  */
 public class GroupAlsoByWindowsProperties {
 
   /**
-   * A factory of {@link GroupAlsoByWindowsDoFn} so that the various properties can provide
-   * the appropriate windowing strategy under test.
+   * A factory of {@link GroupAlsoByWindowsDoFn} so that the various properties can provide the
+   * appropriate windowing strategy under test.
    */
   public interface GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> {
-    <W extends BoundedWindow> GroupAlsoByWindowsDoFn<K, InputT, OutputT, W>
-    forStrategy(WindowingStrategy<?, W> strategy, StateInternalsFactory<K> stateInternalsFactory);
+    <W extends BoundedWindow> GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> forStrategy(
+        WindowingStrategy<?, W> strategy, StateInternalsFactory<K> stateInternalsFactory);
   }
 
   /**
@@ -76,8 +86,7 @@ public class GroupAlsoByWindowsProperties {
    * <p>The input type is deliberately left as a wildcard, since it is not relevant.
    */
   public static <K, InputT, OutputT> void emptyInputEmptyOutput(
-      GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> gabwFactory)
-          throws Exception {
+      GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> gabwFactory) throws Exception {
 
     WindowingStrategy<?, IntervalWindow> windowingStrategy =
         WindowingStrategy.of(FixedWindows.of(Duration.millis(10)));
@@ -87,13 +96,14 @@ public class GroupAlsoByWindowsProperties {
     @SuppressWarnings("unchecked")
     K fakeKey = (K) "this key should never be used";
 
-    DoFnTester<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> result = runGABW(
-        gabwFactory,
-        windowingStrategy,
-        fakeKey,
-        Collections.<WindowedValue<InputT>>emptyList());
+    List<WindowedValue<KV<K, OutputT>>> result =
+        runGABW(
+            gabwFactory,
+            windowingStrategy,
+            fakeKey,
+            Collections.<WindowedValue<InputT>>emptyList());
 
-    assertThat(result.peekOutputElements(), hasSize(0));
+    assertThat(result, hasSize(0));
   }
 
   /**
@@ -102,38 +112,32 @@ public class GroupAlsoByWindowsProperties {
    */
   public static void groupsElementsIntoFixedWindows(
       GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
-          throws Exception {
+      throws Exception {
 
     WindowingStrategy<?, IntervalWindow> windowingStrategy =
         WindowingStrategy.of(FixedWindows.of(Duration.millis(10)));
 
-    DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
-        runGABW(gabwFactory, windowingStrategy, "key",
+    List<WindowedValue<KV<String, Iterable<String>>>> result =
+        runGABW(
+            gabwFactory,
+            windowingStrategy,
+            "key",
             WindowedValue.of(
-                "v1",
-                new Instant(1),
-                Arrays.asList(window(0, 10)),
-                PaneInfo.NO_FIRING),
+                "v1", new Instant(1), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
             WindowedValue.of(
-                "v2",
-                new Instant(2),
-                Arrays.asList(window(0, 10)),
-                PaneInfo.NO_FIRING),
+                "v2", new Instant(2), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
             WindowedValue.of(
-                "v3",
-                new Instant(13),
-                Arrays.asList(window(10, 20)),
-                PaneInfo.NO_FIRING));
+                "v3", new Instant(13), Arrays.asList(window(10, 20)), PaneInfo.NO_FIRING));
 
-    assertThat(result.peekOutputElements(), hasSize(2));
+    assertThat(result, hasSize(2));
 
     TimestampedValue<KV<String, Iterable<String>>> item0 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 10)));
+        getOnlyElementInWindow(result, window(0, 10));
     assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
     assertThat(item0.getTimestamp(), equalTo(window(0, 10).maxTimestamp()));
 
     TimestampedValue<KV<String, Iterable<String>>> item1 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 20)));
+        getOnlyElementInWindow(result, window(10, 20));
     assertThat(item1.getValue().getValue(), contains("v3"));
     assertThat(item1.getTimestamp(), equalTo(window(10, 20).maxTimestamp()));
   }
@@ -146,14 +150,17 @@ public class GroupAlsoByWindowsProperties {
    */
   public static void groupsElementsIntoSlidingWindowsWithMinTimestamp(
       GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
-          throws Exception {
+      throws Exception {
 
-    WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of(
-        SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10)))
-        .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp());
+    WindowingStrategy<?, IntervalWindow> windowingStrategy =
+        WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10)))
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp());
 
-    DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
-        runGABW(gabwFactory, windowingStrategy, "key",
+    List<WindowedValue<KV<String, Iterable<String>>>> result =
+        runGABW(
+            gabwFactory,
+            windowingStrategy,
+            "key",
             WindowedValue.of(
                 "v1",
                 new Instant(5),
@@ -165,21 +172,21 @@ public class GroupAlsoByWindowsProperties {
                 Arrays.asList(window(0, 20), window(10, 30)),
                 PaneInfo.NO_FIRING));
 
-    assertThat(result.peekOutputElements(), hasSize(3));
+    assertThat(result, hasSize(3));
 
     TimestampedValue<KV<String, Iterable<String>>> item0 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(-10, 10)));
+        getOnlyElementInWindow(result, window(-10, 10));
     assertThat(item0.getValue().getValue(), contains("v1"));
     assertThat(item0.getTimestamp(), equalTo(new Instant(5)));
 
     TimestampedValue<KV<String, Iterable<String>>> item1 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 20)));
+        getOnlyElementInWindow(result, window(0, 20));
     assertThat(item1.getValue().getValue(), containsInAnyOrder("v1", "v2"));
     // Timestamp adjusted by WindowFn to exceed the end of the prior sliding window
     assertThat(item1.getTimestamp(), equalTo(new Instant(10)));
 
     TimestampedValue<KV<String, Iterable<String>>> item2 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 30)));
+        getOnlyElementInWindow(result, window(10, 30));
     assertThat(item2.getValue().getValue(), contains("v2"));
     // Timestamp adjusted by WindowFn to exceed the end of the prior sliding window
     assertThat(item2.getTimestamp(), equalTo(new Instant(20)));
@@ -194,14 +201,17 @@ public class GroupAlsoByWindowsProperties {
   public static void combinesElementsInSlidingWindows(
       GroupAlsoByWindowsDoFnFactory<String, Long, Long> gabwFactory,
       CombineFn<Long, ?, Long> combineFn)
-          throws Exception {
+      throws Exception {
 
     WindowingStrategy<?, IntervalWindow> windowingStrategy =
         WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10)))
             .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp());
 
-    DoFnTester<KV<String, Iterable<WindowedValue<Long>>>, KV<String, Long>> result =
-        runGABW(gabwFactory, windowingStrategy, "k",
+    List<WindowedValue<KV<String, Long>>> result =
+        runGABW(
+            gabwFactory,
+            windowingStrategy,
+            "k",
             WindowedValue.of(
                 1L,
                 new Instant(5),
@@ -218,23 +228,20 @@ public class GroupAlsoByWindowsProperties {
                 Arrays.asList(window(0, 20), window(10, 30)),
                 PaneInfo.NO_FIRING));
 
-    assertThat(result.peekOutputElements(), hasSize(3));
+    assertThat(result, hasSize(3));
 
-    TimestampedValue<KV<String, Long>> item0 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(-10, 10)));
+    TimestampedValue<KV<String, Long>> item0 = getOnlyElementInWindow(result, window(-10, 10));
     assertThat(item0.getValue().getKey(), equalTo("k"));
     assertThat(item0.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L))));
     assertThat(item0.getTimestamp(), equalTo(new Instant(5L)));
 
-    TimestampedValue<KV<String, Long>> item1 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 20)));
+    TimestampedValue<KV<String, Long>> item1 = getOnlyElementInWindow(result, window(0, 20));
     assertThat(item1.getValue().getKey(), equalTo("k"));
     assertThat(item1.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L, 2L, 4L))));
     // Timestamp adjusted by WindowFn to exceed the end of the prior sliding window
     assertThat(item1.getTimestamp(), equalTo(new Instant(10L)));
 
-    TimestampedValue<KV<String, Long>> item2 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 30)));
+    TimestampedValue<KV<String, Long>> item2 = getOnlyElementInWindow(result, window(10, 30));
     assertThat(item2.getValue().getKey(), equalTo("k"));
     assertThat(item2.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(2L, 4L))));
     // Timestamp adjusted by WindowFn to exceed the end of the prior sliding window
@@ -247,79 +254,63 @@ public class GroupAlsoByWindowsProperties {
    */
   public static void groupsIntoOverlappingNonmergingWindows(
       GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
-          throws Exception {
+      throws Exception {
 
     WindowingStrategy<?, IntervalWindow> windowingStrategy =
         WindowingStrategy.of(FixedWindows.of(Duration.millis(10)));
 
-    DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
-        runGABW(gabwFactory, windowingStrategy, "key",
-            WindowedValue.of(
-                "v1",
-                new Instant(1),
-                Arrays.asList(window(0, 5)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v2",
-                new Instant(4),
-                Arrays.asList(window(1, 5)),
-                PaneInfo.NO_FIRING),
+    List<WindowedValue<KV<String, Iterable<String>>>> result =
+        runGABW(
+            gabwFactory,
+            windowingStrategy,
+            "key",
+            WindowedValue.of("v1", new Instant(1), Arrays.asList(window(0, 5)), PaneInfo.NO_FIRING),
+            WindowedValue.of("v2", new Instant(4), Arrays.asList(window(1, 5)), PaneInfo.NO_FIRING),
             WindowedValue.of(
-                "v3",
-                new Instant(4),
-                Arrays.asList(window(0, 5)),
-                PaneInfo.NO_FIRING));
+                "v3", new Instant(4), Arrays.asList(window(0, 5)), PaneInfo.NO_FIRING));
 
-    assertThat(result.peekOutputElements(), hasSize(2));
+    assertThat(result, hasSize(2));
 
     TimestampedValue<KV<String, Iterable<String>>> item0 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 5)));
+        getOnlyElementInWindow(result, window(0, 5));
     assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v3"));
     assertThat(item0.getTimestamp(), equalTo(window(1, 5).maxTimestamp()));
 
     TimestampedValue<KV<String, Iterable<String>>> item1 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(1, 5)));
+        getOnlyElementInWindow(result, window(1, 5));
     assertThat(item1.getValue().getValue(), contains("v2"));
     assertThat(item1.getTimestamp(), equalTo(window(0, 5).maxTimestamp()));
   }
 
-  /**
-   * Tests that the given GABW implementation correctly groups elements into merged sessions.
-   */
+  /** Tests that the given GABW implementation correctly groups elements into merged sessions. */
   public static void groupsElementsInMergedSessions(
       GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
-          throws Exception {
+      throws Exception {
 
     WindowingStrategy<?, IntervalWindow> windowingStrategy =
         WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)));
 
-    DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
-        runGABW(gabwFactory, windowingStrategy, "key",
+    List<WindowedValue<KV<String, Iterable<String>>>> result =
+        runGABW(
+            gabwFactory,
+            windowingStrategy,
+            "key",
             WindowedValue.of(
-                "v1",
-                new Instant(0),
-                Arrays.asList(window(0, 10)),
-                PaneInfo.NO_FIRING),
+                "v1", new Instant(0), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
             WindowedValue.of(
-                "v2",
-                new Instant(5),
-                Arrays.asList(window(5, 15)),
-                PaneInfo.NO_FIRING),
+                "v2", new Instant(5), Arrays.asList(window(5, 15)), PaneInfo.NO_FIRING),
             WindowedValue.of(
-                "v3",
-                new Instant(15),
-                Arrays.asList(window(15, 25)),
-                PaneInfo.NO_FIRING));
+                "v3", new Instant(15), Arrays.asList(window(15, 25)), PaneInfo.NO_FIRING));
 
-    assertThat(result.peekOutputElements(), hasSize(2));
+    assertThat(result, hasSize(2));
 
     TimestampedValue<KV<String, Iterable<String>>> item0 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 15)));
+        getOnlyElementInWindow(result, window(0, 15));
     assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
     assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp()));
 
     TimestampedValue<KV<String, Iterable<String>>> item1 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(15, 25)));
+        getOnlyElementInWindow(result, window(15, 25));
     assertThat(item1.getValue().getValue(), contains("v3"));
     assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp()));
   }
@@ -331,39 +322,29 @@ public class GroupAlsoByWindowsProperties {
   public static void combinesElementsPerSession(
       GroupAlsoByWindowsDoFnFactory<String, Long, Long> gabwFactory,
       CombineFn<Long, ?, Long> combineFn)
-          throws Exception {
+      throws Exception {
 
     WindowingStrategy<?, IntervalWindow> windowingStrategy =
         WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)));
 
-    DoFnTester<KV<String, Iterable<WindowedValue<Long>>>, KV<String, Long>> result =
-        runGABW(gabwFactory, windowingStrategy, "k",
-            WindowedValue.of(
-                1L,
-                new Instant(0),
-                Arrays.asList(window(0, 10)),
-                PaneInfo.NO_FIRING),
+    List<WindowedValue<KV<String, Long>>> result =
+        runGABW(
+            gabwFactory,
+            windowingStrategy,
+            "k",
+            WindowedValue.of(1L, new Instant(0), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
+            WindowedValue.of(2L, new Instant(5), Arrays.asList(window(5, 15)), PaneInfo.NO_FIRING),
             WindowedValue.of(
-                2L,
-                new Instant(5),
-                Arrays.asList(window(5, 15)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                4L,
-                new Instant(15),
-                Arrays.asList(window(15, 25)),
-                PaneInfo.NO_FIRING));
+                4L, new Instant(15), Arrays.asList(window(15, 25)), PaneInfo.NO_FIRING));
 
-    assertThat(result.peekOutputElements(), hasSize(2));
+    assertThat(result, hasSize(2));
 
-    TimestampedValue<KV<String, Long>> item0 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 15)));
+    TimestampedValue<KV<String, Long>> item0 = getOnlyElementInWindow(result, window(0, 15));
     assertThat(item0.getValue().getKey(), equalTo("k"));
     assertThat(item0.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L, 2L))));
     assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp()));
 
-    TimestampedValue<KV<String, Long>> item1 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(15, 25)));
+    TimestampedValue<KV<String, Long>> item1 = getOnlyElementInWindow(result, window(15, 25));
     assertThat(item1.getValue().getKey(), equalTo("k"));
     assertThat(item1.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(4L))));
     assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp()));
@@ -371,176 +352,152 @@ public class GroupAlsoByWindowsProperties {
 
   /**
    * Tests that for a simple sequence of elements on the same key, the given GABW implementation
-   * correctly groups them according to fixed windows and also sets the output timestamp
-   * according to the policy {@link OutputTimeFns#outputAtEndOfWindow()}.
+   * correctly groups them according to fixed windows and also sets the output timestamp according
+   * to the policy {@link OutputTimeFns#outputAtEndOfWindow()}.
    */
   public static void groupsElementsIntoFixedWindowsWithEndOfWindowTimestamp(
       GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
-          throws Exception {
+      throws Exception {
 
     WindowingStrategy<?, IntervalWindow> windowingStrategy =
         WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
-        .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow());
+            .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow());
 
-    DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
-        runGABW(gabwFactory, windowingStrategy, "key",
+    List<WindowedValue<KV<String, Iterable<String>>>> result =
+        runGABW(
+            gabwFactory,
+            windowingStrategy,
+            "key",
             WindowedValue.of(
-                "v1",
-                new Instant(1),
-                Arrays.asList(window(0, 10)),
-                PaneInfo.NO_FIRING),
+                "v1", new Instant(1), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
             WindowedValue.of(
-                "v2",
-                new Instant(2),
-                Arrays.asList(window(0, 10)),
-                PaneInfo.NO_FIRING),
+                "v2", new Instant(2), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
             WindowedValue.of(
-                "v3",
-                new Instant(13),
-                Arrays.asList(window(10, 20)),
-                PaneInfo.NO_FIRING));
+                "v3", new Instant(13), Arrays.asList(window(10, 20)), PaneInfo.NO_FIRING));
 
-    assertThat(result.peekOutputElements(), hasSize(2));
+    assertThat(result, hasSize(2));
 
     TimestampedValue<KV<String, Iterable<String>>> item0 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 10)));
+        getOnlyElementInWindow(result, window(0, 10));
     assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
     assertThat(item0.getTimestamp(), equalTo(window(0, 10).maxTimestamp()));
 
     TimestampedValue<KV<String, Iterable<String>>> item1 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 20)));
+        getOnlyElementInWindow(result, window(10, 20));
     assertThat(item1.getValue().getValue(), contains("v3"));
     assertThat(item1.getTimestamp(), equalTo(window(10, 20).maxTimestamp()));
   }
 
   /**
    * Tests that for a simple sequence of elements on the same key, the given GABW implementation
-   * correctly groups them according to fixed windows and also sets the output timestamp
-   * according to the policy {@link OutputTimeFns#outputAtLatestInputTimestamp()}.
+   * correctly groups them according to fixed windows and also sets the output timestamp according
+   * to the policy {@link OutputTimeFns#outputAtLatestInputTimestamp()}.
    */
   public static void groupsElementsIntoFixedWindowsWithLatestTimestamp(
       GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
-          throws Exception {
+      throws Exception {
 
     WindowingStrategy<?, IntervalWindow> windowingStrategy =
         WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
-        .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp());
+            .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp());
 
-    DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
-        runGABW(gabwFactory, windowingStrategy, "k",
+    List<WindowedValue<KV<String, Iterable<String>>>> result =
+        runGABW(
+            gabwFactory,
+            windowingStrategy,
+            "k",
             WindowedValue.of(
-                "v1",
-                new Instant(1),
-                Arrays.asList(window(0, 10)),
-                PaneInfo.NO_FIRING),
+                "v1", new Instant(1), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
             WindowedValue.of(
-                "v2",
-                new Instant(2),
-                Arrays.asList(window(0, 10)),
-                PaneInfo.NO_FIRING),
+                "v2", new Instant(2), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
             WindowedValue.of(
-                "v3",
-                new Instant(13),
-                Arrays.asList(window(10, 20)),
-                PaneInfo.NO_FIRING));
+                "v3", new Instant(13), Arrays.asList(window(10, 20)), PaneInfo.NO_FIRING));
 
-    assertThat(result.peekOutputElements(), hasSize(2));
+    assertThat(result, hasSize(2));
 
     TimestampedValue<KV<String, Iterable<String>>> item0 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 10)));
+        getOnlyElementInWindow(result, window(0, 10));
     assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
     assertThat(item0.getTimestamp(), equalTo(new Instant(2)));
 
     TimestampedValue<KV<String, Iterable<String>>> item1 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 20)));
+        getOnlyElementInWindow(result, window(10, 20));
     assertThat(item1.getValue().getValue(), contains("v3"));
     assertThat(item1.getTimestamp(), equalTo(new Instant(13)));
   }
 
   /**
-   * Tests that the given GABW implementation correctly groups elements into merged sessions
-   * with output timestamps at the end of the merged window.
+   * Tests that the given GABW implementation correctly groups elements into merged sessions with
+   * output timestamps at the end of the merged window.
    */
   public static void groupsElementsInMergedSessionsWithEndOfWindowTimestamp(
       GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
-          throws Exception {
+      throws Exception {
 
     WindowingStrategy<?, IntervalWindow> windowingStrategy =
         WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)))
             .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow());
 
-    DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
-        runGABW(gabwFactory, windowingStrategy, "k",
+    List<WindowedValue<KV<String, Iterable<String>>>> result =
+        runGABW(
+            gabwFactory,
+            windowingStrategy,
+            "k",
             WindowedValue.of(
-                "v1",
-                new Instant(0),
-                Arrays.asList(window(0, 10)),
-                PaneInfo.NO_FIRING),
+                "v1", new Instant(0), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
             WindowedValue.of(
-                "v2",
-                new Instant(5),
-                Arrays.asList(window(5, 15)),
-                PaneInfo.NO_FIRING),
+                "v2", new Instant(5), Arrays.asList(window(5, 15)), PaneInfo.NO_FIRING),
             WindowedValue.of(
-                "v3",
-                new Instant(15),
-                Arrays.asList(window(15, 25)),
-                PaneInfo.NO_FIRING));
+                "v3", new Instant(15), Arrays.asList(window(15, 25)), PaneInfo.NO_FIRING));
 
-    assertThat(result.peekOutputElements(), hasSize(2));
+    assertThat(result, hasSize(2));
 
     TimestampedValue<KV<String, Iterable<String>>> item0 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 15)));
+        getOnlyElementInWindow(result, window(0, 15));
     assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
     assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp()));
 
     TimestampedValue<KV<String, Iterable<String>>> item1 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(15, 25)));
+        getOnlyElementInWindow(result, window(15, 25));
     assertThat(item1.getValue().getValue(), contains("v3"));
     assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp()));
   }
 
   /**
-   * Tests that the given GABW implementation correctly groups elements into merged sessions
-   * with output timestamps at the end of the merged window.
+   * Tests that the given GABW implementation correctly groups elements into merged sessions with
+   * output timestamps at the latest input timestamp of each merged window.
    */
   public static void groupsElementsInMergedSessionsWithLatestTimestamp(
       GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
-          throws Exception {
+      throws Exception {
 
     WindowingStrategy<?, IntervalWindow> windowingStrategy =
         WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)))
             .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp());
 
     BoundedWindow unmergedWindow = window(15, 25);
-    DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
-        runGABW(gabwFactory, windowingStrategy, "k",
+    List<WindowedValue<KV<String, Iterable<String>>>> result =
+        runGABW(
+            gabwFactory,
+            windowingStrategy,
+            "k",
             WindowedValue.of(
-                "v1",
-                new Instant(0),
-                Arrays.asList(window(0, 10)),
-                PaneInfo.NO_FIRING),
+                "v1", new Instant(0), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
             WindowedValue.of(
-                "v2",
-                new Instant(5),
-                Arrays.asList(window(5, 15)),
-                PaneInfo.NO_FIRING),
+                "v2", new Instant(5), Arrays.asList(window(5, 15)), PaneInfo.NO_FIRING),
             WindowedValue.of(
-                "v3",
-                new Instant(15),
-                Arrays.asList(unmergedWindow),
-                PaneInfo.NO_FIRING));
+                "v3", new Instant(15), Arrays.asList(unmergedWindow), PaneInfo.NO_FIRING));
 
-    assertThat(result.peekOutputElements(), hasSize(2));
+    assertThat(result, hasSize(2));
 
     BoundedWindow mergedWindow = window(0, 15);
     TimestampedValue<KV<String, Iterable<String>>> item0 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(mergedWindow));
+        getOnlyElementInWindow(result, mergedWindow);
     assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
     assertThat(item0.getTimestamp(), equalTo(new Instant(5)));
 
     TimestampedValue<KV<String, Iterable<String>>> item1 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(unmergedWindow));
+        getOnlyElementInWindow(result, unmergedWindow);
     assertThat(item1.getValue().getValue(), contains("v3"));
     assertThat(item1.getTimestamp(), equalTo(new Instant(15)));
   }
@@ -552,81 +509,66 @@ public class GroupAlsoByWindowsProperties {
   public static void combinesElementsPerSessionWithEndOfWindowTimestamp(
       GroupAlsoByWindowsDoFnFactory<String, Long, Long> gabwFactory,
       CombineFn<Long, ?, Long> combineFn)
-          throws Exception {
+      throws Exception {
 
     WindowingStrategy<?, IntervalWindow> windowingStrategy =
         WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)))
-        .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow());
+            .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow());
 
     BoundedWindow secondWindow = window(15, 25);
-    DoFnTester<?, KV<String, Long>> result =
-        runGABW(gabwFactory, windowingStrategy, "k",
-            WindowedValue.of(
-                1L,
-                new Instant(0),
-                Arrays.asList(window(0, 10)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                2L,
-                new Instant(5),
-                Arrays.asList(window(5, 15)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                4L,
-                new Instant(15),
-                Arrays.asList(secondWindow),
-                PaneInfo.NO_FIRING));
+    List<WindowedValue<KV<String, Long>>> result =
+        runGABW(
+            gabwFactory,
+            windowingStrategy,
+            "k",
+            WindowedValue.of(1L, new Instant(0), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING),
+            WindowedValue.of(2L, new Instant(5), Arrays.asList(window(5, 15)), PaneInfo.NO_FIRING),
+            WindowedValue.of(4L, new Instant(15), Arrays.asList(secondWindow), PaneInfo.NO_FIRING));
 
-    assertThat(result.peekOutputElements(), hasSize(2));
+    assertThat(result, hasSize(2));
 
     BoundedWindow firstResultWindow = window(0, 15);
-    TimestampedValue<KV<String, Long>> item0 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(firstResultWindow));
+    TimestampedValue<KV<String, Long>> item0 = getOnlyElementInWindow(result, firstResultWindow);
     assertThat(item0.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L, 2L))));
     assertThat(item0.getTimestamp(), equalTo(firstResultWindow.maxTimestamp()));
 
-    TimestampedValue<KV<String, Long>> item1 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(secondWindow));
+    TimestampedValue<KV<String, Long>> item1 = getOnlyElementInWindow(result, secondWindow);
     assertThat(item1.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(4L))));
-    assertThat(item1.getTimestamp(),
-        equalTo(secondWindow.maxTimestamp()));
+    assertThat(item1.getTimestamp(), equalTo(secondWindow.maxTimestamp()));
   }
 
   @SafeVarargs
   private static <K, InputT, OutputT, W extends BoundedWindow>
-  DoFnTester<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> runGABW(
-      GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> gabwFactory,
-      WindowingStrategy<?, W> windowingStrategy,
-      K key,
-      WindowedValue<InputT>... values) throws Exception {
+      List<WindowedValue<KV<K, OutputT>>> runGABW(
+          GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> gabwFactory,
+          WindowingStrategy<?, W> windowingStrategy,
+          K key,
+          WindowedValue<InputT>... values)
+          throws Exception {
     return runGABW(gabwFactory, windowingStrategy, key, Arrays.asList(values));
   }
 
   private static <K, InputT, OutputT, W extends BoundedWindow>
-  DoFnTester<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> runGABW(
-      GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> gabwFactory,
-      WindowingStrategy<?, W> windowingStrategy,
-      K key,
-      Collection<WindowedValue<InputT>> values) throws Exception {
+      List<WindowedValue<KV<K, OutputT>>> runGABW(
+          GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> gabwFactory,
+          WindowingStrategy<?, W> windowingStrategy,
+          K key,
+          Collection<WindowedValue<InputT>> values)
+          throws Exception {
 
     final StateInternalsFactory<K> stateInternalsCache = new CachingStateInternalsFactory<K>();
 
-    DoFnTester<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> tester =
-        DoFnTester.of(gabwFactory.forStrategy(windowingStrategy, stateInternalsCache));
-
-    // Though we use a DoFnTester, the function itself is instantiated directly by the
-    // runner and should not be serialized; it may not even be serializable.
-    tester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
-    tester.startBundle();
-    tester.processElement(KV.<K, Iterable<WindowedValue<InputT>>>of(key, values));
-    tester.finishBundle();
+    List<WindowedValue<KV<K, OutputT>>> output =
+        processElement(
+            gabwFactory.forStrategy(windowingStrategy, stateInternalsCache),
+            KV.<K, Iterable<WindowedValue<InputT>>>of(key, values));
 
     // Sanity check for corruption
-    for (KV<K, OutputT> elem : tester.peekOutputElements()) {
-      assertThat(elem.getKey(), equalTo(key));
+    for (WindowedValue<KV<K, OutputT>> value : output) {
+      assertThat(value.getValue().getKey(), equalTo(key));
     }
 
-    return tester;
+    return output;
   }
 
   private static BoundedWindow window(long start, long end) {
@@ -657,4 +599,158 @@ public class GroupAlsoByWindowsProperties {
       return InMemoryStateInternals.forKey(key);
     }
   }
+
+  private static <K, InputT, OutputT, W extends BoundedWindow>
+      List<WindowedValue<KV<K, OutputT>>> processElement(
+          GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> fn,
+          KV<K, Iterable<WindowedValue<InputT>>> element)
+          throws Exception {
+    TestProcessContext<K, InputT, OutputT, W> c = new TestProcessContext<>(fn, element);
+    fn.processElement(c);
+    return c.getOutput();
+  }
+
+  private static <K, OutputT> TimestampedValue<KV<K, OutputT>> getOnlyElementInWindow(
+      List<WindowedValue<KV<K, OutputT>>> output, final BoundedWindow window) {
+    WindowedValue<KV<K, OutputT>> res =
+        Iterables.getOnlyElement(
+            Iterables.filter(
+                output,
+                new Predicate<WindowedValue<KV<K, OutputT>>>() {
+                  @Override
+                  public boolean apply(@Nullable WindowedValue<KV<K, OutputT>> input) {
+                    return input.getWindows().contains(window);
+                  }
+                }));
+    return TimestampedValue.of(res.getValue(), res.getTimestamp());
+  }
+
+  /**
+   * A {@link GroupAlsoByWindowsDoFn.ProcessContext} providing just enough context for a {@link
+   * GroupAlsoByWindowsDoFn} - namely, information about the element and output via {@link
+   * WindowingInternals}, but no side inputs/outputs and no normal output.
+   */
+  private static class TestProcessContext<K, InputT, OutputT, W extends BoundedWindow>
+      extends GroupAlsoByWindowsDoFn<K, InputT, OutputT, W>.ProcessContext {
+    private final PipelineOptions options = PipelineOptionsFactory.create();
+    private final KV<K, Iterable<WindowedValue<InputT>>> element;
+    private final List<WindowedValue<KV<K, OutputT>>> output = new ArrayList<>();
+
+    private TestProcessContext(
+        GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> fn,
+        KV<K, Iterable<WindowedValue<InputT>>> element) {
+      fn.super();
+      this.element = element;
+    }
+
+    @Override
+    public KV<K, Iterable<WindowedValue<InputT>>> element() {
+      return element;
+    }
+
+    @Override
+    public Instant timestamp() {
+      return BoundedWindow.TIMESTAMP_MIN_VALUE;
+    }
+
+    @Override
+    public BoundedWindow window() {
+      return GlobalWindow.INSTANCE;
+    }
+
+    @Override
+    public PaneInfo pane() {
+      return PaneInfo.NO_FIRING;
+    }
+
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public WindowingInternals<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>>
+        windowingInternals() {
+      return new WindowingInternals<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>>() {
+        @Override
+        public void outputWindowedValue(
+            KV<K, OutputT> output,
+            Instant timestamp,
+            Collection<? extends BoundedWindow> windows,
+            PaneInfo pane) {
+          TestProcessContext.this.output.add(WindowedValue.of(output, timestamp, windows, pane));
+        }
+
+        @Override
+        public <SideOutputT> void sideOutputWindowedValue(
+            TupleTag<SideOutputT> tag,
+            SideOutputT output,
+            Instant timestamp,
+            Collection<? extends BoundedWindow> windows,
+            PaneInfo pane) {
+          throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public StateInternals<?> stateInternals() {
+          throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public TimerInternals timerInternals() {
+          throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Collection<? extends BoundedWindow> windows() {
+          return ImmutableList.of(GlobalWindow.INSTANCE);
+        }
+
+        @Override
+        public PaneInfo pane() {
+          return PaneInfo.NO_FIRING;
+        }
+
+        @Override
+        public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) {
+          throw new UnsupportedOperationException();
+        }
+      };
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return options;
+    }
+
+    @Override
+    public void output(KV<K, OutputT> output) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void outputWithTimestamp(KV<K, OutputT> output, Instant timestamp) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+      throw new UnsupportedOperationException();
+    }
+
+    public List<WindowedValue<KV<K, OutputT>>> getOutput() {
+      return output;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96455768/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index 1a74ae7..6ee42e7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.transforms;
 
 import java.io.IOException;
 import java.util.Collection;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn.Context;
@@ -185,6 +186,7 @@ public class DoFnAdapters {
    * If the fn was created using {@link #toOldDoFn}, returns the original {@link DoFn}. Otherwise,
    * returns {@code null}.
    */
+  @Nullable
   public static <InputT, OutputT> DoFn<InputT, OutputT> getDoFn(OldDoFn<InputT, OutputT> fn) {
     if (fn instanceof SimpleDoFnAdapter) {
       return ((SimpleDoFnAdapter<InputT, OutputT>) fn).fn;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96455768/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 17fa612..a9f93dd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -23,10 +23,10 @@ import static com.google.common.base.Preconditions.checkState;
 import com.google.common.base.Function;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -36,6 +36,8 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.ValueInSingleWindow;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -43,7 +45,6 @@ import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.state.InMemoryStateInternals;
 import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
 import org.apache.beam.sdk.util.state.StateInternals;
@@ -86,7 +87,8 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
    */
   @SuppressWarnings("unchecked")
   public static <InputT, OutputT> DoFnTester<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
-    return new DoFnTester<>(DoFnAdapters.toOldDoFn(fn));
+    checkNotNull(fn, "fn can't be null");
+    return new DoFnTester<>(fn);
   }
 
   /**
@@ -96,9 +98,11 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
    * @see #of(DoFn)
    */
   @SuppressWarnings("unchecked")
-   public static <InputT, OutputT> DoFnTester<InputT, OutputT>
+  @Deprecated
+  public static <InputT, OutputT> DoFnTester<InputT, OutputT>
       of(OldDoFn<InputT, OutputT> fn) {
-    return new DoFnTester<>(fn);
+    checkNotNull(fn, "fn can't be null");
+    return new DoFnTester<>(fn.toDoFn());
   }
 
   /**
@@ -238,7 +242,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     stateInternals = InMemoryStateInternals.forKey(new Object());
     timerInternals = new InMemoryTimerInternals();
     try {
-      fn.startBundle(context);
+      fnInvoker.invokeStartBundle(context);
     } catch (UserCodeException e) {
       unwrapUserCodeException(e);
     }
@@ -271,8 +275,8 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
   }
 
   /**
-   * Calls {@link OldDoFn#processElement} on the {@code OldDoFn} under test, in a
-   * context where {@link OldDoFn.ProcessContext#element} returns the
+   * Calls the {@link DoFn.ProcessElement} method on the {@code DoFn} under test, in a
+   * context where {@link DoFn.ProcessContext#element} returns the
    * given element and timestamp.
    *
    * <p>Will call {@link #startBundle} automatically, if it hasn't
@@ -286,7 +290,13 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
       startBundle();
     }
     try {
-      fn.processElement(createProcessContext(element));
+      final TestProcessContext processContext = createProcessContext(element);
+      fnInvoker.invokeProcessElement(new DoFnInvoker.FakeArgumentProvider<InputT, OutputT>() {
+        @Override
+        public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
+          return processContext;
+        }
+      });
     } catch (UserCodeException e) {
       unwrapUserCodeException(e);
     }
@@ -308,13 +318,14 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
         "Must be inside bundle to call finishBundle, but was: %s",
         state);
     try {
-      fn.finishBundle(createContext(fn));
+      fnInvoker.invokeFinishBundle(createContext(fn));
     } catch (UserCodeException e) {
       unwrapUserCodeException(e);
     }
     if (cloningBehavior == CloningBehavior.CLONE_PER_BUNDLE) {
-      fn.teardown();
+      fnInvoker.invokeTeardown();
       fn = null;
+      fnInvoker = null;
       state = State.UNINITIALIZED;
     } else {
       state = State.BUNDLE_FINISHED;
@@ -532,11 +543,11 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     return mainOutputTag;
   }
 
-  private TestContext createContext(OldDoFn<InputT, OutputT> fn) {
+  private TestContext createContext(DoFn<InputT, OutputT> fn) {
     return new TestContext();
   }
 
-  private class TestContext extends OldDoFn<InputT, OutputT>.Context {
+  private class TestContext extends DoFn<InputT, OutputT>.Context {
     TestContext() {
       fn.super();
     }
@@ -557,7 +568,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     }
 
     @Override
-    protected <AggInT, AggOutT> Aggregator<AggInT, AggOutT> createAggregatorInternal(
+    protected <AggInT, AggOutT> Aggregator<AggInT, AggOutT> createAggregator(
         final String name, final CombineFn<AggInT, ?, AggOutT> combiner) {
       return aggregator(name, combiner);
     }
@@ -624,7 +635,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
             elem.getValue(), elem.getTimestamp(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
   }
 
-  private class TestProcessContext extends OldDoFn<InputT, OutputT>.ProcessContext {
+  private class TestProcessContext extends DoFn<InputT, OutputT>.ProcessContext {
     private final TestContext context;
     private final ValueInSingleWindow<InputT> element;
 
@@ -644,7 +655,9 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
       Map<BoundedWindow, ?> viewValues = sideInputs.get(view);
       if (viewValues != null) {
         BoundedWindow sideInputWindow =
-            view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(window());
+            view.getWindowingStrategyInternal()
+                .getWindowFn()
+                .getSideInputWindow(element.getWindow());
         @SuppressWarnings("unchecked")
         T windowValue = (T) viewValues.get(sideInputWindow);
         if (windowValue != null) {
@@ -660,73 +673,11 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     }
 
     @Override
-    public BoundedWindow window() {
-      return element.getWindow();
-    }
-
-    @Override
     public PaneInfo pane() {
       return element.getPane();
     }
 
     @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      return new WindowingInternals<InputT, OutputT>() {
-        @Override
-        public StateInternals<?> stateInternals() {
-          return stateInternals;
-        }
-
-        @Override
-        public void outputWindowedValue(
-            OutputT output,
-            Instant timestamp,
-            Collection<? extends BoundedWindow> windows,
-            PaneInfo pane) {
-          for (BoundedWindow window : windows) {
-            context.noteOutput(
-                mainOutputTag, ValueInSingleWindow.of(output, timestamp, window, pane));
-          }
-        }
-
-        @Override
-        public <SideOutputT> void sideOutputWindowedValue(
-            TupleTag<SideOutputT> tag,
-            SideOutputT output,
-            Instant timestamp,
-            Collection<? extends BoundedWindow> windows,
-            PaneInfo pane) {
-          for (BoundedWindow window : windows) {
-            context.noteOutput(
-                tag, ValueInSingleWindow.of(output, timestamp, window, pane));
-          }
-        }
-
-        @Override
-        public TimerInternals timerInternals() {
-          return timerInternals;
-        }
-
-        @Override
-        public Collection<? extends BoundedWindow> windows() {
-          return Collections.singleton(element.getWindow());
-        }
-
-        @Override
-        public PaneInfo pane() {
-          return element.getPane();
-        }
-
-        @Override
-        public <T> T sideInput(
-            PCollectionView<T> view, BoundedWindow sideInputWindow) {
-          throw new UnsupportedOperationException(
-              "SideInput from WindowingInternals is not supported in in the context of DoFnTester");
-        }
-      };
-    }
-
-    @Override
     public PipelineOptions getPipelineOptions() {
       return context.getPipelineOptions();
     }
@@ -753,10 +704,10 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     }
 
     @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
         String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
       throw new IllegalStateException("Aggregators should not be created within ProcessContext. "
-          + "Instead, create an aggregator at OldDoFn construction time with"
+          + "Instead, create an aggregator at DoFn construction time with"
           + " createAggregator, and ensure they are set up by the time startBundle is"
           + " called with setupDelegateAggregators.");
     }
@@ -768,8 +719,9 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
       finishBundle();
     }
     if (state == State.BUNDLE_FINISHED) {
-      fn.teardown();
+      fnInvoker.invokeTeardown();
       fn = null;
+      fnInvoker = null;
     }
     state = State.TORN_DOWN;
   }
@@ -786,8 +738,8 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
 
   private final PipelineOptions options = PipelineOptionsFactory.create();
 
-  /** The original {@link OldDoFn} under test. */
-  private final OldDoFn<InputT, OutputT> origFn;
+  /** The original {@link DoFn} under test. */
+  private final DoFn<InputT, OutputT> origFn;
 
   /**
    * Whether to clone the original {@link DoFn} or just use it as-is.
@@ -805,8 +757,9 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
   /** The output tags used by the {@link DoFn} under test. */
   private TupleTag<OutputT> mainOutputTag = new TupleTag<>();
 
-  /** The original OldDoFn under test, if started. */
-  OldDoFn<InputT, OutputT> fn;
+  /** The original DoFn under test, if started. */
+  private DoFn<InputT, OutputT> fn;
+  private DoFnInvoker<InputT, OutputT> fnInvoker;
 
   /** The outputs from the {@link DoFn} under test. */
   private Map<TupleTag<?>, List<ValueInSingleWindow<?>>> outputs;
@@ -817,7 +770,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
   /** The state of processing of the {@link DoFn} under test. */
   private State state = State.UNINITIALIZED;
 
-  private DoFnTester(OldDoFn<InputT, OutputT> origFn) {
+  private DoFnTester(DoFn<InputT, OutputT> origFn) {
     this.origFn = origFn;
   }
 
@@ -828,12 +781,13 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     if (cloningBehavior.equals(CloningBehavior.DO_NOT_CLONE)) {
       fn = origFn;
     } else {
-      fn = (OldDoFn<InputT, OutputT>)
+      fn = (DoFn<InputT, OutputT>)
           SerializableUtils.deserializeFromByteArray(
               SerializableUtils.serializeToByteArray(origFn),
               origFn.toString());
     }
-    fn.setup();
+    fnInvoker = DoFnInvokers.invokerFor(fn);
+    fnInvoker.invokeSetup();
     outputs = new HashMap<>();
     accumulators = new HashMap<>();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96455768/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
index 4ad7dad..50a7082 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
@@ -18,9 +18,6 @@
 package org.apache.beam.sdk.transforms.reflect;
 
 import java.io.Serializable;
-import java.lang.reflect.Constructor;
-import java.util.LinkedHashMap;
-import java.util.Map;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -45,14 +42,6 @@ public class DoFnInvokers {
     return ByteBuddyDoFnInvokerFactory.only().newByteBuddyInvoker(fn);
   }
 
-  /**
-   * A cache of constructors of generated {@link DoFnInvoker} classes, keyed by {@link DoFn} class.
-   * Needed because generating an invoker class is expensive, and to avoid generating an excessive
-   * number of classes consuming PermGen memory.
-   */
-  private final Map<Class<?>, Constructor<?>> byteBuddyInvokerConstructorCache =
-      new LinkedHashMap<>();
-
   private DoFnInvokers() {}
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96455768/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
index ac76b2e..ff8a9bc 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
@@ -350,14 +350,14 @@ public class DoFnTesterTest {
     }
   }
 
-  private static class SideInputDoFn extends OldDoFn<Integer, Integer> {
+  private static class SideInputDoFn extends DoFn<Integer, Integer> {
     private final PCollectionView<Integer> value;
 
     private SideInputDoFn(PCollectionView<Integer> value) {
       this.value = value;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       c.output(c.sideInput(value));
     }