You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/02 17:43:51 UTC

[04/11] incubator-beam git commit: Put classes in runners-core package into runners.core namespace

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
new file mode 100644
index 0000000..45062fb
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -0,0 +1,796 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.base.Function;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.PriorityQueue;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
+import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.AppliedCombineFn;
+import org.apache.beam.sdk.util.ExecutableTrigger;
+import org.apache.beam.sdk.util.NullSideInputReader;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.WindowTracing;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
+import org.apache.beam.sdk.util.state.InMemoryStateInternals;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Test utility that runs a {@link ReduceFn}, {@link WindowFn}, {@link Trigger} using in-memory stub
+ * implementations to provide the {@link TimerInternals} and {@link WindowingInternals} needed to
+ * run {@code Trigger}s and {@code ReduceFn}s.
+ *
+ * @param <InputT> The element types.
+ * @param <OutputT> The final type for elements in the window (for instance,
+ *     {@code Iterable<InputT>})
+ * @param <W> The type of windows being used.
+ */
+public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
+  /** The key passed to the state internals and to every runner this tester creates. */
+  private static final String KEY = "TEST_KEY";
+
+  // In-memory stubs for state and timers; shared by all runners built by createRunner().
+  private final TestInMemoryStateInternals<String> stateInternals =
+      new TestInMemoryStateInternals<>(KEY);
+  private final TestTimerInternals timerInternals = new TestTimerInternals();
+
+  // The fields below are assigned exactly once, in the private constructor.
+  private final WindowFn<Object, W> windowFn;
+  private final TestWindowingInternals windowingInternals;
+  private final Coder<OutputT> outputCoder;
+  private final WindowingStrategy<Object, W> objectStrategy;
+  private final ReduceFn<String, InputT, OutputT, W> reduceFn;
+  private final PipelineOptions options;
+
+  /**
+   * If true, the output watermark is automatically advanced to the latest possible
+   * point when the input watermark is advanced. This is the default for most tests.
+   * If false, the output watermark must be explicitly advanced by the test, which can
+   * be used to exercise some of the more subtle behavior of WatermarkHold.
+   */
+  private boolean autoAdvanceOutputWatermark;
+
+  /** The trigger read from the windowing strategy in the constructor; see getTrigger(). */
+  private ExecutableTrigger executableTrigger;
+
+  /**
+   * Aggregator handed to every runner; its running sum is exposed through
+   * getElementsDroppedDueToClosedWindow().
+   */
+  private final InMemoryLongSumAggregator droppedDueToClosedWindow =
+      new InMemoryLongSumAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER);
+
+  /**
+   * Builds a tester around a buffering {@link SystemReduceFn} over {@code Integer} inputs, using
+   * freshly created default {@link PipelineOptions} and an empty {@link NullSideInputReader}.
+   *
+   * @param windowingStrategy the windowing strategy the tester runs under
+   */
+  public static <W extends BoundedWindow> ReduceFnTester<Integer, Iterable<Integer>, W>
+      nonCombining(WindowingStrategy<?, W> windowingStrategy) throws Exception {
+    ReduceFn<String, Integer, Iterable<Integer>, W> bufferingFn =
+        SystemReduceFn.<String, Integer, W>buffering(VarIntCoder.of());
+    Coder<Iterable<Integer>> iterableCoder = IterableCoder.of(VarIntCoder.of());
+    PipelineOptions defaultOptions = PipelineOptionsFactory.create();
+    SideInputReader noSideInputs = NullSideInputReader.empty();
+
+    return new ReduceFnTester<Integer, Iterable<Integer>, W>(
+        windowingStrategy, bufferingFn, iterableCoder, defaultOptions, noSideInputs);
+  }
+
+  /**
+   * Convenience overload: assembles a {@link WindowingStrategy} from its parts (always with
+   * {@code OutputTimeFns.outputAtEarliestInputTimestamp()}) and delegates to
+   * {@link #nonCombining(WindowingStrategy)}.
+   */
+  public static <W extends BoundedWindow> ReduceFnTester<Integer, Iterable<Integer>, W>
+      nonCombining(WindowFn<?, W> windowFn, Trigger trigger, AccumulationMode mode,
+          Duration allowedDataLateness, ClosingBehavior closingBehavior) throws Exception {
+    WindowingStrategy<?, W> assembled =
+        WindowingStrategy.of(windowFn)
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+            .withTrigger(trigger)
+            .withMode(mode)
+            .withAllowedLateness(allowedDataLateness)
+            .withClosingBehavior(closingBehavior);
+
+    return nonCombining(assembled);
+  }
+
+  /**
+   * Builds a tester around a combining {@link SystemReduceFn} for the given
+   * {@link KeyedCombineFn}, using default {@link PipelineOptions} and no side inputs.
+   *
+   * @param strategy the windowing strategy the tester runs under
+   * @param combineFn the combine function applied to the {@code Integer} inputs
+   * @param outputCoder coder for the combined output
+   */
+  public static <W extends BoundedWindow, AccumT, OutputT> ReduceFnTester<Integer, OutputT, W>
+      combining(WindowingStrategy<?, W> strategy,
+          KeyedCombineFn<String, Integer, AccumT, OutputT> combineFn,
+          Coder<OutputT> outputCoder) throws Exception {
+    CoderRegistry coders = new CoderRegistry();
+    coders.registerStandardCoders();
+
+    KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
+    AppliedCombineFn<String, Integer, AccumT, OutputT> applied =
+        AppliedCombineFn.<String, Integer, AccumT, OutputT>withInputCoder(
+            combineFn, coders, inputCoder);
+
+    ReduceFn<String, Integer, OutputT, W> combiningFn =
+        SystemReduceFn.<String, Integer, AccumT, OutputT, W>combining(
+            StringUtf8Coder.of(), applied);
+    PipelineOptions defaultOptions = PipelineOptionsFactory.create();
+    SideInputReader noSideInputs = NullSideInputReader.empty();
+
+    return new ReduceFnTester<Integer, OutputT, W>(
+        strategy, combiningFn, outputCoder, defaultOptions, noSideInputs);
+  }
+
+  /**
+   * Builds a tester around a combining {@link SystemReduceFn} for a context-aware combine
+   * function, with caller-supplied pipeline options and side input reader.
+   *
+   * @param strategy the windowing strategy the tester runs under
+   * @param combineFn the combine function applied to the {@code Integer} inputs
+   * @param outputCoder coder for the combined output
+   * @param options pipeline options handed to the runner
+   * @param sideInputReader source of side inputs for the windowing internals
+   */
+  public static <W extends BoundedWindow, AccumT, OutputT> ReduceFnTester<Integer, OutputT, W>
+      combining(WindowingStrategy<?, W> strategy,
+          KeyedCombineFnWithContext<String, Integer, AccumT, OutputT> combineFn,
+          Coder<OutputT> outputCoder,
+          PipelineOptions options,
+          SideInputReader sideInputReader) throws Exception {
+    CoderRegistry coders = new CoderRegistry();
+    coders.registerStandardCoders();
+
+    KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
+    AppliedCombineFn<String, Integer, AccumT, OutputT> applied =
+        AppliedCombineFn.<String, Integer, AccumT, OutputT>withInputCoder(
+            combineFn, coders, inputCoder);
+
+    ReduceFn<String, Integer, OutputT, W> combiningFn =
+        SystemReduceFn.<String, Integer, AccumT, OutputT, W>combining(
+            StringUtf8Coder.of(), applied);
+
+    return new ReduceFnTester<Integer, OutputT, W>(
+        strategy, combiningFn, outputCoder, options, sideInputReader);
+  }
+
+  /**
+   * Convenience overload: assembles a {@link WindowingStrategy} from its parts (always with
+   * {@code OutputTimeFns.outputAtEarliestInputTimestamp()}) and delegates to the
+   * {@link KeyedCombineFn} variant of {@code combining}.
+   */
+  public static <W extends BoundedWindow, AccumT, OutputT> ReduceFnTester<Integer, OutputT, W>
+      combining(WindowFn<?, W> windowFn, Trigger trigger, AccumulationMode mode,
+          KeyedCombineFn<String, Integer, AccumT, OutputT> combineFn, Coder<OutputT> outputCoder,
+          Duration allowedDataLateness) throws Exception {
+    WindowingStrategy<?, W> assembled =
+        WindowingStrategy.of(windowFn)
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+            .withTrigger(trigger)
+            .withMode(mode)
+            .withAllowedLateness(allowedDataLateness);
+    return combining(assembled, combineFn, outputCoder);
+  }
+
+  /**
+   * Wires the tester together. Private: instances are obtained through the static factory
+   * methods of this class. Output watermark auto-advance starts out enabled.
+   */
+  private ReduceFnTester(WindowingStrategy<?, W> wildcardStrategy,
+      ReduceFn<String, InputT, OutputT, W> reduceFn, Coder<OutputT> outputCoder,
+      PipelineOptions options, SideInputReader sideInputReader) throws Exception {
+    // View the strategy as operating on Object elements so any input can be window-assigned.
+    @SuppressWarnings("unchecked")
+    WindowingStrategy<Object, W> typedStrategy = (WindowingStrategy<Object, W>) wildcardStrategy;
+
+    this.options = options;
+    this.reduceFn = reduceFn;
+    this.outputCoder = outputCoder;
+    this.objectStrategy = typedStrategy;
+    this.windowFn = typedStrategy.getWindowFn();
+    this.executableTrigger = wildcardStrategy.getTrigger();
+    this.windowingInternals = new TestWindowingInternals(sideInputReader);
+    this.autoAdvanceOutputWatermark = true;
+  }
+
+  /**
+   * Sets whether advancing the input watermark also advances the output watermark.
+   * The constructor initialises this to {@code true}.
+   */
+  public void setAutoAdvanceOutputWatermark(boolean autoAdvanceOutputWatermark) {
+    this.autoAdvanceOutputWatermark = autoAdvanceOutputWatermark;
+  }
+
+  /**
+   * Returns whatever the timer stub reports as the next pending timer timestamp for
+   * {@code domain}; this is a plain delegation to {@code TestTimerInternals.getNextTimer}.
+   */
+  @Nullable
+  public Instant getNextTimer(TimeDomain domain) {
+    return timerInternals.getNextTimer(domain);
+  }
+
+  /**
+   * Creates a new {@link ReduceFnRunner} for {@code KEY} that shares this tester's windowing
+   * strategy, state internals, timer internals, windowing internals, dropped-element
+   * aggregator, reduce function and pipeline options. A new runner is built on every call.
+   */
+  ReduceFnRunner<String, InputT, OutputT, W> createRunner() {
+    return new ReduceFnRunner<>(
+        KEY,
+        objectStrategy,
+        stateInternals,
+        timerInternals,
+        windowingInternals,
+        droppedDueToClosedWindow,
+        reduceFn,
+        options);
+  }
+
+  /** Returns the trigger that was read from the windowing strategy at construction time. */
+  public ExecutableTrigger getTrigger() {
+    return executableTrigger;
+  }
+
+  /** Asks a freshly created runner whether it considers {@code window} finished. */
+  public boolean isMarkedFinished(W window) {
+    ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
+    return runner.isFinished(window);
+  }
+
+  /** Asks a freshly created runner whether it has no active windows. */
+  public boolean hasNoActiveWindows() {
+    ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
+    return runner.hasNoActiveWindows();
+  }
+
+  /**
+   * Asserts that exactly {@code expectedWindows} hold window state, and that the only tag in
+   * use for them is {@code TriggerRunner.FINISHED_BITS_TAG}.
+   */
+  @SafeVarargs
+  public final void assertHasOnlyGlobalAndFinishedSetsFor(W... expectedWindows) {
+    Set<W> windows = ImmutableSet.copyOf(expectedWindows);
+    Set<StateTag<? super String, ?>> allowed =
+        ImmutableSet.<StateTag<? super String, ?>>of(TriggerRunner.FINISHED_BITS_TAG);
+    assertHasOnlyGlobalAndAllowedTags(windows, allowed);
+  }
+
+  /**
+   * Asserts that exactly {@code expectedWindows} hold window state, and that the tags in use
+   * for them are limited to the finished bits, the pane info, and the two watermark hold tags.
+   */
+  @SafeVarargs
+  public final void assertHasOnlyGlobalAndFinishedSetsAndPaneInfoFor(W... expectedWindows) {
+    Set<W> windows = ImmutableSet.copyOf(expectedWindows);
+    Set<StateTag<? super String, ?>> allowed =
+        ImmutableSet.<StateTag<? super String, ?>>of(
+            TriggerRunner.FINISHED_BITS_TAG,
+            PaneInfoTracker.PANE_INFO_TAG,
+            WatermarkHold.watermarkHoldTagForOutputTimeFn(objectStrategy.getOutputTimeFn()),
+            WatermarkHold.EXTRA_HOLD_TAG);
+    assertHasOnlyGlobalAndAllowedTags(windows, allowed);
+  }
+
+  /** Asserts that no window holds any state, i.e. both the expected and allowed sets are empty. */
+  public final void assertHasOnlyGlobalState() {
+    Set<W> noWindows = Collections.<W>emptySet();
+    Set<StateTag<? super String, ?>> noTags =
+        Collections.<StateTag<? super String, ?>>emptySet();
+    assertHasOnlyGlobalAndAllowedTags(noWindows, noTags);
+  }
+
+  /**
+   * Asserts that exactly {@code expectedWindows} hold window state, and that the tags in use
+   * for them are limited to the pane info and the two watermark hold tags.
+   */
+  @SafeVarargs
+  public final void assertHasOnlyGlobalAndPaneInfoFor(W... expectedWindows) {
+    Set<W> windows = ImmutableSet.copyOf(expectedWindows);
+    Set<StateTag<? super String, ?>> allowed =
+        ImmutableSet.<StateTag<? super String, ?>>of(
+            PaneInfoTracker.PANE_INFO_TAG,
+            WatermarkHold.watermarkHoldTagForOutputTimeFn(objectStrategy.getOutputTimeFn()),
+            WatermarkHold.EXTRA_HOLD_TAG);
+    assertHasOnlyGlobalAndAllowedTags(windows, allowed);
+  }
+
+  /**
+   * Verifies that the set of windows that have any non-empty state stored is exactly
+   * {@code expectedWindows}, and that each of these windows only uses tags from
+   * {@code allowedTags}. Global namespaces are ignored; window-and-trigger namespaces must
+   * hold no tags at all; any other kind of namespace fails the assertion.
+   */
+  private void assertHasOnlyGlobalAndAllowedTags(
+      Set<W> expectedWindows, Set<StateTag<? super String, ?>> allowedTags) {
+    Set<StateNamespace> expectedNamespaces = new HashSet<>();
+    for (W window : expectedWindows) {
+      expectedNamespaces.add(windowNamespace(window));
+    }
+
+    Map<StateNamespace, Set<StateTag<? super String, ?>>> windowsWithState = new HashMap<>();
+    for (StateNamespace namespace : stateInternals.getNamespacesInUse()) {
+      if (namespace instanceof StateNamespaces.GlobalNamespace) {
+        continue;
+      }
+
+      if (namespace instanceof StateNamespaces.WindowNamespace) {
+        Set<StateTag<? super String, ?>> tags = stateInternals.getTagsInUse(namespace);
+        if (!tags.isEmpty()) {
+          windowsWithState.put(namespace, tags);
+          if (!Sets.difference(tags, allowedTags).isEmpty()) {
+            fail(namespace + " has unexpected states: " + tags);
+          }
+        }
+        continue;
+      }
+
+      if (namespace instanceof StateNamespaces.WindowAndTriggerNamespace) {
+        Set<StateTag<? super String, ?>> tags = stateInternals.getTagsInUse(namespace);
+        assertTrue(namespace + " contains " + tags, tags.isEmpty());
+        continue;
+      }
+
+      fail("Unrecognized namespace " + namespace);
+    }
+
+    assertEquals("Still in use: " + windowsWithState.toString(), expectedNamespaces,
+        windowsWithState.keySet());
+  }
+
+  /** Returns the window state namespace for {@code window}, built with the window fn's coder. */
+  private StateNamespace windowNamespace(W window) {
+    return StateNamespaces.window(windowFn.windowCoder(), window);
+  }
+
+  /**
+   * Returns the earliest watermark hold currently stored in state, or {@code null} when no
+   * hold is set.
+   */
+  @Nullable
+  public Instant getWatermarkHold() {
+    return stateInternals.earliestWatermarkHold();
+  }
+
+  /**
+   * Returns the current output watermark of the timer stub, or {@code null} if the output
+   * watermark has not been advanced yet.
+   */
+  @Nullable
+  public Instant getOutputWatermark() {
+    return timerInternals.currentOutputWatermarkTime();
+  }
+
+  /** Returns the running sum of the dropped-element aggregator that is passed to each runner. */
+  public long getElementsDroppedDueToClosedWindow() {
+    return droppedDueToClosedWindow.getSum();
+  }
+
+  /**
+   * Returns the number of output values captured so far and not yet removed by
+   * {@link #extractOutput()}.
+   */
+  public int getOutputSize() {
+    return windowingInternals.outputs.size();
+  }
+
+  /**
+   * Returns a snapshot of every value output so far, with the key stripped from each, and then
+   * empties the captured-output list so the next call only sees newer outputs.
+   */
+  public List<WindowedValue<OutputT>> extractOutput() {
+    Function<WindowedValue<KV<String, OutputT>>, WindowedValue<OutputT>> dropKey =
+        new Function<WindowedValue<KV<String, OutputT>>, WindowedValue<OutputT>>() {
+          @Override
+          public WindowedValue<OutputT> apply(WindowedValue<KV<String, OutputT>> input) {
+            return input.withValue(input.getValue().getValue());
+          }
+        };
+
+    // toList() copies eagerly, so clearing the source list afterwards is safe.
+    ImmutableList<WindowedValue<OutputT>> snapshot =
+        FluentIterable.from(windowingInternals.outputs).transform(dropKey).toList();
+    windowingInternals.outputs.clear();
+    return snapshot;
+  }
+
+  /**
+   * Advance the input watermark to the specified time using a fresh runner, firing any
+   * event-time timers that should fire, and then persist the runner.
+   *
+   * <p>The timer stub additionally advances the output watermark, but only while
+   * {@code autoAdvanceOutputWatermark} is true.
+   */
+  public void advanceInputWatermark(Instant newInputWatermark) throws Exception {
+    ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
+    timerInternals.advanceInputWatermark(runner, newInputWatermark);
+    runner.persist();
+  }
+
+  /**
+   * Advance the output watermark to the given value by delegating to the timer stub. Meant for
+   * tests that have switched {@link #autoAdvanceOutputWatermark} off.
+   *
+   * <p>NOTE(review): this method does not itself check {@code autoAdvanceOutputWatermark} and
+   * does not throw when it is true, contrary to what an earlier version of this comment said.
+   */
+  public void advanceOutputWatermark(Instant newOutputWatermark) throws Exception {
+    timerInternals.advanceOutputWatermark(newOutputWatermark);
+  }
+
+  /**
+   * Advance the processing time to the specified time using a fresh runner, firing any timers
+   * that should fire, and then persist the runner.
+   */
+  public void advanceProcessingTime(Instant newProcessingTime) throws Exception {
+    ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
+    timerInternals.advanceProcessingTime(runner, newProcessingTime);
+    runner.persist();
+  }
+
+  /**
+   * Advance the synchronized processing time to the specified time using a fresh runner,
+   * firing any timers that should fire, and then persist the runner.
+   */
+  public void advanceSynchronizedProcessingTime(Instant newProcessingTime) throws Exception {
+    ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
+    timerInternals.advanceSynchronizedProcessingTime(runner, newProcessingTime);
+    runner.persist();
+  }
+
+  /**
+   * Feeds all the timestamped values to a fresh runner as one bundle (or work-unit), after
+   * assigning each of them to windows with the window function, and persists the runner
+   * once the bundle has been processed.
+   */
+  @SafeVarargs
+  public final void injectElements(TimestampedValue<InputT>... values) throws Exception {
+    for (TimestampedValue<InputT> traced : values) {
+      WindowTracing.trace("TriggerTester.injectElements: {}", traced);
+    }
+
+    ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
+
+    // Assigns windows to a single element; checked exceptions are rethrown unchecked because
+    // Function.apply cannot declare them.
+    Function<TimestampedValue<InputT>, WindowedValue<InputT>> assignWindows =
+        new Function<TimestampedValue<InputT>, WindowedValue<InputT>>() {
+          @Override
+          public WindowedValue<InputT> apply(TimestampedValue<InputT> input) {
+            try {
+              InputT element = input.getValue();
+              Instant elementTime = input.getTimestamp();
+              TestAssignContext<W> context =
+                  new TestAssignContext<W>(
+                      windowFn, element, elementTime, GlobalWindow.INSTANCE);
+              Collection<W> assigned = windowFn.assignWindows(context);
+              return WindowedValue.of(element, elementTime, assigned, PaneInfo.NO_FIRING);
+            } catch (Exception e) {
+              throw new RuntimeException(e);
+            }
+          }
+        };
+    runner.processElements(Iterables.transform(Arrays.asList(values), assignWindows));
+
+    // Persist after each bundle.
+    runner.persist();
+  }
+
+  /**
+   * Delivers a single timer for {@code window} at {@code timestamp} in {@code domain} directly
+   * to a fresh runner, bypassing the timer queues, and then persists the runner.
+   */
+  public void fireTimer(W window, Instant timestamp, TimeDomain domain) throws Exception {
+    ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
+    TimerData timer = TimerData.of(windowNamespace(window), timestamp, domain);
+    runner.onTimer(timer);
+    runner.persist();
+  }
+
+  /**
+   * In-memory state that additionally exposes which namespaces and tags are in use, and the
+   * earliest watermark hold, so that tests can make assertions about them.
+   */
+  private static class TestInMemoryStateInternals<K> extends InMemoryStateInternals<K> {
+
+    public TestInMemoryStateInternals(K key) {
+      super(key);
+    }
+
+    /** Returns the tags of {@code namespace} whose state is not empty. */
+    public Set<StateTag<? super K, ?>> getTagsInUse(StateNamespace namespace) {
+      Set<StateTag<? super K, ?>> nonEmptyTags = new HashSet<>();
+      for (Entry<StateTag<? super K, ?>, State> tagAndState :
+          inMemoryState.getTagsInUse(namespace).entrySet()) {
+        if (isEmptyForTesting(tagAndState.getValue())) {
+          continue;
+        }
+        nonEmptyTags.add(tagAndState.getKey());
+      }
+      return nonEmptyTags;
+    }
+
+    /** Returns the namespaces the underlying state table reports as in use. */
+    public Set<StateNamespace> getNamespacesInUse() {
+      return inMemoryState.getNamespacesInUse();
+    }
+
+    /** Return the earliest output watermark hold in state, or null if none. */
+    public Instant earliestWatermarkHold() {
+      Instant earliest = null;
+      for (State candidate : inMemoryState.values()) {
+        if (!(candidate instanceof WatermarkHoldState)) {
+          continue;
+        }
+        Instant hold = ((WatermarkHoldState<?>) candidate).read();
+        boolean replacesEarliest =
+            earliest == null || (hold != null && hold.isBefore(earliest));
+        if (replacesEarliest) {
+          earliest = hold;
+        }
+      }
+      return earliest;
+    }
+  }
+
+  /**
+   * {@link WindowingInternals} stub: captures every value passed to
+   * {@link #outputWindowedValue}, exposes the tester's simulated state, and serves side inputs
+   * from the supplied {@link SideInputReader}. All other operations are unsupported.
+   */
+  private class TestWindowingInternals implements WindowingInternals<InputT, KV<String, OutputT>> {
+    private List<WindowedValue<KV<String, OutputT>>> outputs = new ArrayList<>();
+    private SideInputReader sideInputReader;
+
+    private TestWindowingInternals(SideInputReader sideInputReader) {
+      this.sideInputReader = sideInputReader;
+    }
+
+    @Override
+    public void outputWindowedValue(KV<String, OutputT> output, Instant timestamp,
+        Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+      // Copy the output value (using coders) before capturing it.
+      Coder<KV<String, OutputT>> kvCoder = KvCoder.of(StringUtf8Coder.of(), outputCoder);
+      KV<String, OutputT> copied =
+          SerializableUtils.<KV<String, OutputT>>ensureSerializableByCoder(
+              kvCoder, output, "outputForWindow");
+      outputs.add(WindowedValue.of(copied, timestamp, windows, pane));
+    }
+
+    @Override
+    public TimerInternals timerInternals() {
+      throw new UnsupportedOperationException(
+          "Testing triggers should not use timers from WindowingInternals.");
+    }
+
+    @Override
+    public Collection<? extends BoundedWindow> windows() {
+      throw new UnsupportedOperationException(
+          "Testing triggers should not use windows from WindowingInternals.");
+    }
+
+    @Override
+    public PaneInfo pane() {
+      throw new UnsupportedOperationException(
+          "Testing triggers should not use pane from WindowingInternals.");
+    }
+
+    @Override
+    public <T> void writePCollectionViewData(
+        TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
+      throw new UnsupportedOperationException(
+          "Testing triggers should not use writePCollectionViewData from WindowingInternals.");
+    }
+
+    @Override
+    public StateInternals<Object> stateInternals() {
+      // The tester's state is keyed by String; widening the key type is safe for testing only.
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      TestInMemoryStateInternals<Object> widened =
+          (TestInMemoryStateInternals) stateInternals;
+      return widened;
+    }
+
+    @Override
+    public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+      if (sideInputReader.contains(view)) {
+        // Map the main input window onto the window the side input is read from.
+        BoundedWindow sideInputWindow =
+            view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
+        return sideInputReader.get(view, sideInputWindow);
+      }
+      throw new IllegalArgumentException("calling sideInput() with unknown view");
+    }
+  }
+
+  /**
+   * Fixed-value {@code AssignContext}: reports the element, timestamp and window it was
+   * constructed with.
+   */
+  private static class TestAssignContext<W extends BoundedWindow>
+      extends WindowFn<Object, W>.AssignContext {
+    private final Object element;
+    private final Instant timestamp;
+    private final BoundedWindow window;
+
+    public TestAssignContext(
+        WindowFn<Object, W> windowFn, Object element, Instant timestamp, BoundedWindow window) {
+      // AssignContext is an inner class of WindowFn, so it needs an enclosing instance.
+      windowFn.super();
+      this.window = window;
+      this.timestamp = timestamp;
+      this.element = element;
+    }
+
+    @Override
+    public Object element() {
+      return this.element;
+    }
+
+    @Override
+    public Instant timestamp() {
+      return this.timestamp;
+    }
+
+    @Override
+    public BoundedWindow window() {
+      return this.window;
+    }
+  }
+
+  /** Aggregator that keeps a running sum of the added values in a plain field. */
+  private static class InMemoryLongSumAggregator implements Aggregator<Long, Long> {
+    private final String name;
+    private long sum;
+
+    public InMemoryLongSumAggregator(String name) {
+      this.name = name;
+    }
+
+    @Override
+    public void addValue(Long value) {
+      sum = sum + value;
+    }
+
+    @Override
+    public String getName() {
+      return this.name;
+    }
+
+    @Override
+    public CombineFn<Long, ?, Long> getCombineFn() {
+      return new Sum.SumLongFn();
+    }
+
+    /** Returns the total of all values added so far. */
+    public long getSum() {
+      return this.sum;
+    }
+  }
+
+  /**
+   * Simulate the firing of timers and progression of input and output watermarks for a
+   * single computation and key in a Windmill-like streaming environment. Similar to
+   * {@link BatchTimerInternals}, but also tracks the output watermark.
+   */
+  private class TestTimerInternals implements TimerInternals {
+    /**
+     * Timers added through setTimer and not since deleted through deleteTimer; setTimer uses it
+     * to avoid queueing an equal timer twice. Note that advanceAndFire removes a fired timer
+     * from its queue but not from this set.
+     */
+    private Set<TimerData> existingTimers = new HashSet<>();
+
+    /** Pending input watermark timers, in timestamp order. */
+    private PriorityQueue<TimerData> watermarkTimers = new PriorityQueue<>(11);
+
+    /**
+     * Pending processing time timers, in timestamp order. Also holds the synchronized
+     * processing time timers (see queue()).
+     */
+    private PriorityQueue<TimerData> processingTimers = new PriorityQueue<>(11);
+
+    /** Current input watermark. */
+    @Nullable
+    private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+    /** Current output watermark; null until it is advanced for the first time. */
+    @Nullable
+    private Instant outputWatermarkTime = null;
+
+    /** Current processing time. */
+    private Instant processingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+    /** Current synchronized processing time; null until it is advanced for the first time. */
+    @Nullable
+    private Instant synchronizedProcessingTime = null;
+
+    /**
+     * Returns the timestamp of the timer at the head of the queue for {@code domain}, or
+     * {@code null} when no timer is pending in that domain.
+     *
+     * <p>Processing time and synchronized processing time share one queue.
+     */
+    @Nullable
+    public Instant getNextTimer(TimeDomain domain) {
+      // peek() yields null for an empty queue; that is a legitimate "no timer" answer and
+      // must not be rejected with a null check.
+      TimerData next = queue(domain).peek();
+      return next == null ? null : next.getTimestamp();
+    }
+
+    /**
+     * Selects the timer queue for {@code domain}: event time has its own queue, while
+     * processing time and synchronized processing time share one.
+     */
+    private PriorityQueue<TimerData> queue(TimeDomain domain) {
+      switch (domain) {
+        case EVENT_TIME:
+          return watermarkTimers;
+        case PROCESSING_TIME:
+        case SYNCHRONIZED_PROCESSING_TIME:
+          return processingTimers;
+      }
+      // Only reached if a domain is not covered by the switch above.
+      throw new RuntimeException(); // cases exhaustive
+    }
+
+    /** Queues {@code timer} in its domain's queue unless an equal timer is already registered. */
+    @Override
+    public void setTimer(TimerData timer) {
+      WindowTracing.trace("TestTimerInternals.setTimer: {}", timer);
+      boolean newlyRegistered = existingTimers.add(timer);
+      if (!newlyRegistered) {
+        return;
+      }
+      queue(timer.getDomain()).add(timer);
+    }
+
+    /** Removes {@code timer} from both the registered set and its domain's queue. */
+    @Override
+    public void deleteTimer(TimerData timer) {
+      WindowTracing.trace("TestTimerInternals.deleteTimer: {}", timer);
+      existingTimers.remove(timer);
+      PriorityQueue<TimerData> pending = queue(timer.getDomain());
+      pending.remove(timer);
+    }
+
+    /** Returns the processing time last set by advanceProcessingTime (initially the minimum). */
+    @Override
+    public Instant currentProcessingTime() {
+      return processingTime;
+    }
+
+    /** Returns the synchronized processing time, or null if it has never been advanced. */
+    @Override
+    @Nullable
+    public Instant currentSynchronizedProcessingTime() {
+      return synchronizedProcessingTime;
+    }
+
+    /** Returns the current input watermark; fails with a NullPointerException if it is null. */
+    @Override
+    public Instant currentInputWatermarkTime() {
+      return checkNotNull(inputWatermarkTime);
+    }
+
+    /** Returns the current output watermark, or null if it has never been advanced. */
+    @Override
+    @Nullable
+    public Instant currentOutputWatermarkTime() {
+      return outputWatermarkTime;
+    }
+
+    /**
+     * Renders both timer queues, both watermarks and the processing time. The synchronized
+     * processing time and the set of registered timers are not included.
+     */
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(getClass())
+          .add("watermarkTimers", watermarkTimers)
+          .add("processingTimers", processingTimers)
+          .add("inputWatermarkTime", inputWatermarkTime)
+          .add("outputWatermarkTime", outputWatermarkTime)
+          .add("processingTime", processingTime)
+          .toString();
+    }
+
+    /**
+     * Moves the input watermark forward to {@code newInputWatermark} and fires the event-time
+     * timers that have become eligible. If auto-advance is enabled, the output watermark is
+     * then advanced to the earliest watermark hold, or to the input watermark if there is no
+     * hold.
+     *
+     * @throws IllegalStateException if the new watermark is before the current one
+     */
+    public void advanceInputWatermark(
+        ReduceFnRunner<?, ?, ?, ?> runner, Instant newInputWatermark) throws Exception {
+      checkNotNull(newInputWatermark);
+      checkState(
+          !newInputWatermark.isBefore(inputWatermarkTime),
+          "Cannot move input watermark time backwards from %s to %s", inputWatermarkTime,
+          newInputWatermark);
+      WindowTracing.trace("TestTimerInternals.advanceInputWatermark: from {} to {}",
+          inputWatermarkTime, newInputWatermark);
+      inputWatermarkTime = newInputWatermark;
+      advanceAndFire(runner, newInputWatermark, TimeDomain.EVENT_TIME);
+
+      // Holds are read only after the timers have fired, since firing may change them.
+      Instant earliestHold = stateInternals.earliestWatermarkHold();
+      Instant outputCandidate;
+      if (earliestHold != null) {
+        outputCandidate = earliestHold;
+      } else {
+        WindowTracing.trace("TestTimerInternals.advanceInputWatermark: no holds, "
+            + "so output watermark = input watermark");
+        outputCandidate = inputWatermarkTime;
+      }
+
+      if (!autoAdvanceOutputWatermark) {
+        return;
+      }
+      advanceOutputWatermark(outputCandidate);
+    }
+
+    /**
+     * Moves the output watermark forward to {@code newOutputWatermark}, clipped so that it
+     * never exceeds the current input watermark.
+     *
+     * @throws IllegalStateException if the (clipped) value is before the current output watermark
+     */
+    public void advanceOutputWatermark(Instant newOutputWatermark) {
+      checkNotNull(newOutputWatermark);
+
+      Instant effective = newOutputWatermark;
+      if (effective.isAfter(inputWatermarkTime)) {
+        WindowTracing.trace(
+            "TestTimerInternals.advanceOutputWatermark: clipping output watermark from {} to {}",
+            newOutputWatermark, inputWatermarkTime);
+        effective = inputWatermarkTime;
+      }
+
+      // The very first advance (no output watermark yet) is always allowed.
+      boolean movesForward =
+          outputWatermarkTime == null || !effective.isBefore(outputWatermarkTime);
+      checkState(
+          movesForward,
+          "Cannot move output watermark time backwards from %s to %s", outputWatermarkTime,
+          effective);
+      WindowTracing.trace("TestTimerInternals.advanceOutputWatermark: from {} to {}",
+          outputWatermarkTime, effective);
+      outputWatermarkTime = effective;
+    }
+
+    /**
+     * Moves the processing time forward to {@code newProcessingTime} and fires the timers from
+     * the processing-time queue that have become eligible.
+     *
+     * @throws IllegalStateException if the new time is before the current processing time
+     */
+    public void advanceProcessingTime(
+        ReduceFnRunner<?, ?, ?, ?> runner, Instant newProcessingTime) throws Exception {
+      checkState(!newProcessingTime.isBefore(processingTime),
+          "Cannot move processing time backwards from %s to %s", processingTime, newProcessingTime);
+      WindowTracing.trace("TestTimerInternals.advanceProcessingTime: from {} to {}", processingTime,
+          newProcessingTime);
+      // Update the clock before firing so that timers observe the new time.
+      processingTime = newProcessingTime;
+      advanceAndFire(runner, newProcessingTime, TimeDomain.PROCESSING_TIME);
+    }
+
+    /**
+     * Moves the synchronized processing time forward to {@code newSynchronizedProcessingTime}
+     * and fires the timers from the (shared) processing-time queue that have become eligible.
+     * The first call is always accepted, because no synchronized processing time is set yet.
+     *
+     * @throws IllegalStateException if the new time is before the current synchronized
+     *     processing time
+     */
+    public void advanceSynchronizedProcessingTime(
+        ReduceFnRunner<?, ?, ?, ?> runner, Instant newSynchronizedProcessingTime) throws Exception {
+      checkNotNull(newSynchronizedProcessingTime);
+      // synchronizedProcessingTime starts out null, and Joda-Time's isBefore(null) compares
+      // against the current wall-clock time, so the unset case has to be handled explicitly.
+      checkState(
+          synchronizedProcessingTime == null
+              || !newSynchronizedProcessingTime.isBefore(synchronizedProcessingTime),
+          "Cannot move synchronized processing time backwards from %s to %s",
+          synchronizedProcessingTime, newSynchronizedProcessingTime);
+      WindowTracing.trace(
+          "TestTimerInternals.advanceSynchronizedProcessingTime: from {} to {}",
+          synchronizedProcessingTime, newSynchronizedProcessingTime);
+      synchronizedProcessingTime = newSynchronizedProcessingTime;
+      advanceAndFire(
+          runner, newSynchronizedProcessingTime, TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+    }
+
+    /**
+     * Repeatedly takes the head timer of the queue for {@code domain} and delivers it to
+     * {@code runner}, for as long as {@code currentTime} is strictly after the head timer's
+     * timestamp.
+     *
+     * <p>NOTE(review): a fired timer is removed from its queue only, not from
+     * {@code existingTimers}, so setTimer will ignore a later request for an equal timer.
+     */
+    private void advanceAndFire(
+        ReduceFnRunner<?, ?, ?, ?> runner, Instant currentTime, TimeDomain domain)
+            throws Exception {
+      PriorityQueue<TimerData> pending = queue(domain);
+
+      // Timers fire when the current time progresses past the timer time.
+      TimerData next = pending.peek();
+      while (next != null && currentTime.isAfter(next.getTimestamp())) {
+        WindowTracing.trace(
+            "TestTimerInternals.advanceAndFire: firing {} at {}", next, currentTime);
+        // Dequeue before delivering, so that a timer queued by the callback is never the
+        // one taken off the queue here.
+        pending.remove();
+
+        runner.onTimer(next);
+        next = pending.peek();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
new file mode 100644
index 0000000..adb0aac
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.hamcrest.Matchers.is;
+import static org.mockito.Mockito.mock;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.util.BaseExecutionContext.StepContext;
+import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for how {@link SimpleDoFnRunner} surfaces exceptions thrown while processing an
+ * element.
+ */
+@RunWith(JUnit4.class)
+public class SimpleDoFnRunnerTest {
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  /**
+   * Expects an exception thrown by an ordinary {@link OldDoFn} to arrive as the cause of a
+   * {@link UserCodeException}.
+   */
+  @Test
+  public void testExceptionsWrappedAsUserCodeException() {
+    ThrowingDoFn throwingFn = new ThrowingDoFn();
+    DoFnRunner<String, String> underTest = createRunner(throwingFn);
+
+    // Expectations have to be registered before the call that is expected to throw.
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(is(throwingFn.exceptionToThrow));
+
+    underTest.processElement(WindowedValue.valueInGlobalWindow("anyValue"));
+  }
+
+  /**
+   * Expects an exception thrown by a {@link SystemDoFnInternal}-annotated {@link OldDoFn} to
+   * arrive as the very same exception instance, without a wrapper.
+   */
+  @Test
+  public void testSystemDoFnInternalExceptionsNotWrapped() {
+    ThrowingSystemDoFn throwingFn = new ThrowingSystemDoFn();
+    DoFnRunner<String, String> underTest = createRunner(throwingFn);
+
+    thrown.expect(is(throwingFn.exceptionToThrow));
+
+    underTest.processElement(WindowedValue.valueInGlobalWindow("anyValue"));
+  }
+
+  /**
+   * Builds a {@link SimpleDoFnRunner} around {@code fn}, supplying only the function, an
+   * empty side-output tag list and a mocked {@link StepContext}; every other constructor
+   * argument is {@code null}.
+   */
+  private DoFnRunner<String, String> createRunner(OldDoFn<String, String> fn) {
+    StepContext stepContext = mock(StepContext.class);
+    List<TupleTag<?>> noSideOutputs = Arrays.asList();
+    return new SimpleDoFnRunner<>(
+        null, fn, null, null, null, noSideOutputs, stepContext, null, null);
+  }
+
+  /** An {@link OldDoFn} whose {@code processElement} always throws {@link #exceptionToThrow}. */
+  static class ThrowingDoFn extends OldDoFn<String, String> {
+    final Exception exceptionToThrow =
+        new UnsupportedOperationException("Expected exception");
+
+    @Override
+    public void processElement(ProcessContext context) throws Exception {
+      throw exceptionToThrow;
+    }
+  }
+
+  /** Same behavior as {@link ThrowingDoFn}, but annotated with {@link SystemDoFnInternal}. */
+  @SystemDoFnInternal
+  static class ThrowingSystemDoFn extends ThrowingDoFn {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/core-java/src/test/java/org/apache/beam/sdk/util/BatchTimerInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/BatchTimerInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/BatchTimerInternalsTest.java
deleted file mode 100644
index 20a9852..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/sdk/util/BatchTimerInternalsTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.state.StateNamespace;
-import org.apache.beam.sdk.util.state.StateNamespaceForTest;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link BatchTimerInternals}.
- */
-@RunWith(JUnit4.class)
-public class BatchTimerInternalsTest {
-
-  private static final StateNamespace NS1 = new StateNamespaceForTest("NS1");
-
-  @Mock
-  private ReduceFnRunner<?, ?, ?, ?> mockRunner;
-
-  @Before
-  public void setUp() {
-    MockitoAnnotations.initMocks(this);
-  }
-
-  @Test
-  public void testFiringTimers() throws Exception {
-    BatchTimerInternals underTest = new BatchTimerInternals(new Instant(0));
-    TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
-    TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
-
-    underTest.setTimer(processingTime1);
-    underTest.setTimer(processingTime2);
-
-    underTest.advanceProcessingTime(mockRunner, new Instant(20));
-    Mockito.verify(mockRunner).onTimer(processingTime1);
-    Mockito.verifyNoMoreInteractions(mockRunner);
-
-    // Advancing just a little shouldn't refire
-    underTest.advanceProcessingTime(mockRunner, new Instant(21));
-    Mockito.verifyNoMoreInteractions(mockRunner);
-
-    // Adding the timer and advancing a little should refire
-    underTest.setTimer(processingTime1);
-    Mockito.verify(mockRunner).onTimer(processingTime1);
-    underTest.advanceProcessingTime(mockRunner, new Instant(21));
-    Mockito.verifyNoMoreInteractions(mockRunner);
-
-    // And advancing the rest of the way should still have the other timer
-    underTest.advanceProcessingTime(mockRunner, new Instant(30));
-    Mockito.verify(mockRunner).onTimer(processingTime2);
-    Mockito.verifyNoMoreInteractions(mockRunner);
-  }
-
-  @Test
-  public void testTimerOrdering() throws Exception {
-    BatchTimerInternals underTest = new BatchTimerInternals(new Instant(0));
-    TimerData watermarkTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME);
-    TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
-    TimerData watermarkTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.EVENT_TIME);
-    TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
-
-    underTest.setTimer(processingTime1);
-    underTest.setTimer(watermarkTime1);
-    underTest.setTimer(processingTime2);
-    underTest.setTimer(watermarkTime2);
-
-    underTest.advanceInputWatermark(mockRunner, new Instant(30));
-    Mockito.verify(mockRunner).onTimer(watermarkTime1);
-    Mockito.verify(mockRunner).onTimer(watermarkTime2);
-    Mockito.verifyNoMoreInteractions(mockRunner);
-
-    underTest.advanceProcessingTime(mockRunner, new Instant(30));
-    Mockito.verify(mockRunner).onTimer(processingTime1);
-    Mockito.verify(mockRunner).onTimer(processingTime2);
-    Mockito.verifyNoMoreInteractions(mockRunner);
-  }
-
-  @Test
-  public void testDeduplicate() throws Exception {
-    BatchTimerInternals underTest = new BatchTimerInternals(new Instant(0));
-    TimerData watermarkTime = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME);
-    TimerData processingTime = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
-    underTest.setTimer(watermarkTime);
-    underTest.setTimer(watermarkTime);
-    underTest.setTimer(processingTime);
-    underTest.setTimer(processingTime);
-    underTest.advanceProcessingTime(mockRunner, new Instant(20));
-    underTest.advanceInputWatermark(mockRunner, new Instant(20));
-
-    Mockito.verify(mockRunner).onTimer(processingTime);
-    Mockito.verify(mockRunner).onTimer(watermarkTime);
-    Mockito.verifyNoMoreInteractions(mockRunner);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
deleted file mode 100644
index 215cd4c..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
+++ /dev/null
@@ -1,658 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasSize;
-import static org.junit.Assert.assertThat;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DoFnTester;
-import org.apache.beam.sdk.transforms.DoFnTester.CloningBehavior;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.Sessions;
-import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
-import org.apache.beam.sdk.util.state.InMemoryStateInternals;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.util.state.StateInternalsFactory;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * Properties of {@link GroupAlsoByWindowsDoFn}.
- *
- * <p>Some properties may not hold of some implementations, due to restrictions on the context
- * in which the implementation is applicable. For example, some {@code GroupAlsoByWindows} may not
- * support merging windows.
- */
-public class GroupAlsoByWindowsProperties {
-
-  /**
-   * A factory of {@link GroupAlsoByWindowsDoFn} so that the various properties can provide
-   * the appropriate windowing strategy under test.
-   */
-  public interface GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> {
-    <W extends BoundedWindow> GroupAlsoByWindowsDoFn<K, InputT, OutputT, W>
-    forStrategy(WindowingStrategy<?, W> strategy, StateInternalsFactory<K> stateInternalsFactory);
-  }
-
-  /**
-   * Tests that for empty input and the given {@link WindowingStrategy}, the provided GABW
-   * implementation produces no output.
-   *
-   * <p>The input type is deliberately left as a wildcard, since it is not relevant.
-   */
-  public static <K, InputT, OutputT> void emptyInputEmptyOutput(
-      GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> gabwFactory)
-          throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(FixedWindows.of(Duration.millis(10)));
-
-    // This key should never actually be used, though it is eagerly passed to the
-    // StateInternalsFactory so must be non-null
-    @SuppressWarnings("unchecked")
-    K fakeKey = (K) "this key should never be used";
-
-    DoFnTester<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> result = runGABW(
-        gabwFactory,
-        windowingStrategy,
-        fakeKey,
-        Collections.<WindowedValue<InputT>>emptyList());
-
-    assertThat(result.peekOutputElements(), hasSize(0));
-  }
-
-  /**
-   * Tests that for a simple sequence of elements on the same key, the given GABW implementation
-   * correctly groups them according to fixed windows.
-   */
-  public static void groupsElementsIntoFixedWindows(
-      GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
-          throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(FixedWindows.of(Duration.millis(10)));
-
-    DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
-        runGABW(gabwFactory, windowingStrategy, "key",
-            WindowedValue.of(
-                "v1",
-                new Instant(1),
-                Arrays.asList(window(0, 10)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v2",
-                new Instant(2),
-                Arrays.asList(window(0, 10)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v3",
-                new Instant(13),
-                Arrays.asList(window(10, 20)),
-                PaneInfo.NO_FIRING));
-
-    assertThat(result.peekOutputElements(), hasSize(2));
-
-    TimestampedValue<KV<String, Iterable<String>>> item0 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 10)));
-    assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
-    assertThat(item0.getTimestamp(), equalTo(window(0, 10).maxTimestamp()));
-
-    TimestampedValue<KV<String, Iterable<String>>> item1 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 20)));
-    assertThat(item1.getValue().getValue(), contains("v3"));
-    assertThat(item1.getTimestamp(), equalTo(window(10, 20).maxTimestamp()));
-  }
-
-  /**
-   * Tests that for a simple sequence of elements on the same key, the given GABW implementation
-   * correctly groups them into sliding windows.
-   *
-   * <p>In the input here, each element occurs in multiple windows.
-   */
-  public static void groupsElementsIntoSlidingWindowsWithMinTimestamp(
-      GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
-          throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of(
-        SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10)))
-        .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp());
-
-    DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
-        runGABW(gabwFactory, windowingStrategy, "key",
-            WindowedValue.of(
-                "v1",
-                new Instant(5),
-                Arrays.asList(window(-10, 10), window(0, 20)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v2",
-                new Instant(15),
-                Arrays.asList(window(0, 20), window(10, 30)),
-                PaneInfo.NO_FIRING));
-
-    assertThat(result.peekOutputElements(), hasSize(3));
-
-    TimestampedValue<KV<String, Iterable<String>>> item0 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(-10, 10)));
-    assertThat(item0.getValue().getValue(), contains("v1"));
-    assertThat(item0.getTimestamp(), equalTo(new Instant(5)));
-
-    TimestampedValue<KV<String, Iterable<String>>> item1 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 20)));
-    assertThat(item1.getValue().getValue(), containsInAnyOrder("v1", "v2"));
-    // Timestamp adjusted by WindowFn to exceed the end of the prior sliding window
-    assertThat(item1.getTimestamp(), equalTo(new Instant(10)));
-
-    TimestampedValue<KV<String, Iterable<String>>> item2 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 30)));
-    assertThat(item2.getValue().getValue(), contains("v2"));
-    // Timestamp adjusted by WindowFn to exceed the end of the prior sliding window
-    assertThat(item2.getTimestamp(), equalTo(new Instant(20)));
-  }
-
-  /**
-   * Tests that for a simple sequence of elements on the same key, the given GABW implementation
-   * correctly groups and combines them according to sliding windows.
-   *
-   * <p>In the input here, each element occurs in multiple windows.
-   */
-  public static void combinesElementsInSlidingWindows(
-      GroupAlsoByWindowsDoFnFactory<String, Long, Long> gabwFactory,
-      CombineFn<Long, ?, Long> combineFn)
-          throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10)))
-            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp());
-
-    DoFnTester<KV<String, Iterable<WindowedValue<Long>>>, KV<String, Long>> result =
-        runGABW(gabwFactory, windowingStrategy, "k",
-            WindowedValue.of(
-                1L,
-                new Instant(5),
-                Arrays.asList(window(-10, 10), window(0, 20)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                2L,
-                new Instant(15),
-                Arrays.asList(window(0, 20), window(10, 30)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                4L,
-                new Instant(18),
-                Arrays.asList(window(0, 20), window(10, 30)),
-                PaneInfo.NO_FIRING));
-
-    assertThat(result.peekOutputElements(), hasSize(3));
-
-    TimestampedValue<KV<String, Long>> item0 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(-10, 10)));
-    assertThat(item0.getValue().getKey(), equalTo("k"));
-    assertThat(item0.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L))));
-    assertThat(item0.getTimestamp(), equalTo(new Instant(5L)));
-
-    TimestampedValue<KV<String, Long>> item1 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 20)));
-    assertThat(item1.getValue().getKey(), equalTo("k"));
-    assertThat(item1.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L, 2L, 4L))));
-    // Timestamp adjusted by WindowFn to exceed the end of the prior sliding window
-    assertThat(item1.getTimestamp(), equalTo(new Instant(10L)));
-
-    TimestampedValue<KV<String, Long>> item2 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 30)));
-    assertThat(item2.getValue().getKey(), equalTo("k"));
-    assertThat(item2.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(2L, 4L))));
-    // Timestamp adjusted by WindowFn to exceed the end of the prior sliding window
-    assertThat(item2.getTimestamp(), equalTo(new Instant(20L)));
-  }
-
-  /**
-   * Tests that the given GABW implementation correctly groups elements that fall into overlapping
-   * windows that are not merged.
-   */
-  public static void groupsIntoOverlappingNonmergingWindows(
-      GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
-          throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(FixedWindows.of(Duration.millis(10)));
-
-    DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
-        runGABW(gabwFactory, windowingStrategy, "key",
-            WindowedValue.of(
-                "v1",
-                new Instant(1),
-                Arrays.asList(window(0, 5)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v2",
-                new Instant(4),
-                Arrays.asList(window(1, 5)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v3",
-                new Instant(4),
-                Arrays.asList(window(0, 5)),
-                PaneInfo.NO_FIRING));
-
-    assertThat(result.peekOutputElements(), hasSize(2));
-
-    TimestampedValue<KV<String, Iterable<String>>> item0 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 5)));
-    assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v3"));
-    assertThat(item0.getTimestamp(), equalTo(window(1, 5).maxTimestamp()));
-
-    TimestampedValue<KV<String, Iterable<String>>> item1 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(1, 5)));
-    assertThat(item1.getValue().getValue(), contains("v2"));
-    assertThat(item1.getTimestamp(), equalTo(window(0, 5).maxTimestamp()));
-  }
-
-  /**
-   * Tests that the given GABW implementation correctly groups elements into merged sessions.
-   */
-  public static void groupsElementsInMergedSessions(
-      GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
-          throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)));
-
-    DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
-        runGABW(gabwFactory, windowingStrategy, "key",
-            WindowedValue.of(
-                "v1",
-                new Instant(0),
-                Arrays.asList(window(0, 10)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v2",
-                new Instant(5),
-                Arrays.asList(window(5, 15)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v3",
-                new Instant(15),
-                Arrays.asList(window(15, 25)),
-                PaneInfo.NO_FIRING));
-
-    assertThat(result.peekOutputElements(), hasSize(2));
-
-    TimestampedValue<KV<String, Iterable<String>>> item0 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 15)));
-    assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
-    assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp()));
-
-    TimestampedValue<KV<String, Iterable<String>>> item1 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(15, 25)));
-    assertThat(item1.getValue().getValue(), contains("v3"));
-    assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp()));
-  }
-
-  /**
-   * Tests that the given {@link GroupAlsoByWindowsDoFn} implementation combines elements per
-   * session window correctly according to the provided {@link CombineFn}.
-   */
-  public static void combinesElementsPerSession(
-      GroupAlsoByWindowsDoFnFactory<String, Long, Long> gabwFactory,
-      CombineFn<Long, ?, Long> combineFn)
-          throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)));
-
-    DoFnTester<KV<String, Iterable<WindowedValue<Long>>>, KV<String, Long>> result =
-        runGABW(gabwFactory, windowingStrategy, "k",
-            WindowedValue.of(
-                1L,
-                new Instant(0),
-                Arrays.asList(window(0, 10)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                2L,
-                new Instant(5),
-                Arrays.asList(window(5, 15)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                4L,
-                new Instant(15),
-                Arrays.asList(window(15, 25)),
-                PaneInfo.NO_FIRING));
-
-    assertThat(result.peekOutputElements(), hasSize(2));
-
-    TimestampedValue<KV<String, Long>> item0 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 15)));
-    assertThat(item0.getValue().getKey(), equalTo("k"));
-    assertThat(item0.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L, 2L))));
-    assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp()));
-
-    TimestampedValue<KV<String, Long>> item1 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(15, 25)));
-    assertThat(item1.getValue().getKey(), equalTo("k"));
-    assertThat(item1.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(4L))));
-    assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp()));
-  }
-
-  /**
-   * Tests that for a simple sequence of elements on the same key, the given GABW implementation
-   * correctly groups them according to fixed windows and also sets the output timestamp
-   * according to the policy {@link OutputTimeFns#outputAtEndOfWindow()}.
-   */
-  public static void groupsElementsIntoFixedWindowsWithEndOfWindowTimestamp(
-      GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
-          throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
-        .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow());
-
-    DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
-        runGABW(gabwFactory, windowingStrategy, "key",
-            WindowedValue.of(
-                "v1",
-                new Instant(1),
-                Arrays.asList(window(0, 10)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v2",
-                new Instant(2),
-                Arrays.asList(window(0, 10)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v3",
-                new Instant(13),
-                Arrays.asList(window(10, 20)),
-                PaneInfo.NO_FIRING));
-
-    assertThat(result.peekOutputElements(), hasSize(2));
-
-    TimestampedValue<KV<String, Iterable<String>>> item0 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 10)));
-    assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
-    assertThat(item0.getTimestamp(), equalTo(window(0, 10).maxTimestamp()));
-
-    TimestampedValue<KV<String, Iterable<String>>> item1 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 20)));
-    assertThat(item1.getValue().getValue(), contains("v3"));
-    assertThat(item1.getTimestamp(), equalTo(window(10, 20).maxTimestamp()));
-  }
-
-  /**
-   * Tests that for a simple sequence of elements on the same key, the given GABW implementation
-   * correctly groups them according to fixed windows and also sets the output timestamp
-   * according to the policy {@link OutputTimeFns#outputAtLatestInputTimestamp()}.
-   */
-  public static void groupsElementsIntoFixedWindowsWithLatestTimestamp(
-      GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
-          throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
-        .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp());
-
-    DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
-        runGABW(gabwFactory, windowingStrategy, "k",
-            WindowedValue.of(
-                "v1",
-                new Instant(1),
-                Arrays.asList(window(0, 10)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v2",
-                new Instant(2),
-                Arrays.asList(window(0, 10)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v3",
-                new Instant(13),
-                Arrays.asList(window(10, 20)),
-                PaneInfo.NO_FIRING));
-
-    assertThat(result.peekOutputElements(), hasSize(2));
-
-    TimestampedValue<KV<String, Iterable<String>>> item0 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 10)));
-    assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
-    assertThat(item0.getTimestamp(), equalTo(new Instant(2)));
-
-    TimestampedValue<KV<String, Iterable<String>>> item1 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 20)));
-    assertThat(item1.getValue().getValue(), contains("v3"));
-    assertThat(item1.getTimestamp(), equalTo(new Instant(13)));
-  }
-
-  /**
-   * Tests that the given GABW implementation correctly groups elements into merged sessions
-   * with output timestamps at the end of the merged window.
-   */
-  public static void groupsElementsInMergedSessionsWithEndOfWindowTimestamp(
-      GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
-          throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)))
-            .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow());
-
-    DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
-        runGABW(gabwFactory, windowingStrategy, "k",
-            WindowedValue.of(
-                "v1",
-                new Instant(0),
-                Arrays.asList(window(0, 10)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v2",
-                new Instant(5),
-                Arrays.asList(window(5, 15)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v3",
-                new Instant(15),
-                Arrays.asList(window(15, 25)),
-                PaneInfo.NO_FIRING));
-
-    assertThat(result.peekOutputElements(), hasSize(2));
-
-    TimestampedValue<KV<String, Iterable<String>>> item0 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 15)));
-    assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
-    assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp()));
-
-    TimestampedValue<KV<String, Iterable<String>>> item1 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(15, 25)));
-    assertThat(item1.getValue().getValue(), contains("v3"));
-    assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp()));
-  }
-
-  /**
-   * Tests that the given GABW implementation correctly groups elements into merged sessions
-   * with output timestamps at the latest input timestamp of each session, as configured via
-   * {@code OutputTimeFns.outputAtLatestInputTimestamp()}.
-   *
-   * <p>The inputs "v1" (timestamp 0) and "v2" (timestamp 5) are expected together in
-   * {@code window(0, 15)} with output timestamp 5, while "v3" (timestamp 15) is expected alone
-   * in {@code window(15, 25)} with output timestamp 15.
-   */
-  public static void groupsElementsInMergedSessionsWithLatestTimestamp(
-      GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory)
-          throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> windowingStrategy =
-        WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)))
-            .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp());
-
-    BoundedWindow unmergedWindow = window(15, 25);
-    DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result =
-        runGABW(gabwFactory, windowingStrategy, "k",
-            WindowedValue.of(
-                "v1",
-                new Instant(0),
-                Arrays.asList(window(0, 10)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v2",
-                new Instant(5),
-                Arrays.asList(window(5, 15)),
-                PaneInfo.NO_FIRING),
-            WindowedValue.of(
-                "v3",
-                new Instant(15),
-                Arrays.asList(unmergedWindow),
-                PaneInfo.NO_FIRING));
-
-    assertThat(result.peekOutputElements(), hasSize(2));
-
-    BoundedWindow mergedWindow = window(0, 15);
-    TimestampedValue<KV<String, Iterable<String>>> item0 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(mergedWindow));
-    assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2"));
-    assertThat(item0.getTimestamp(), equalTo(new Instant(5)));
-
-    TimestampedValue<KV<String, Iterable<String>>> item1 =
-        Iterables.getOnlyElement(result.peekOutputElementsInWindow(unmergedWindow));
-    assertThat(item1.getValue().getValue(), contains("v3"));
-    assertThat(item1.getTimestamp(), equalTo(new Instant(15)));
-  }
-
-  /**
-   * Verifies that the supplied {@link GroupAlsoByWindowsDoFn} implementation combines the values
-   * of each session window with the given {@link CombineFn} and emits every combined result at
-   * the end of its window.
-   *
-   * <p>The values 1 and 2 are expected to be combined in {@code window(0, 15)}; the value 4 is
-   * expected alone in {@code window(15, 25)}.
-   */
-  public static void combinesElementsPerSessionWithEndOfWindowTimestamp(
-      GroupAlsoByWindowsDoFnFactory<String, Long, Long> gabwFactory,
-      CombineFn<Long, ?, Long> combineFn)
-          throws Exception {
-
-    WindowingStrategy<?, IntervalWindow> sessionStrategy =
-        WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10)))
-            .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow());
-
-    BoundedWindow loneSession = window(15, 25);
-
-    // Three inputs: the first two carry overlapping session windows, the third stands apart.
-    WindowedValue<Long> firstInput =
-        WindowedValue.of(
-            1L, new Instant(0), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING);
-    WindowedValue<Long> secondInput =
-        WindowedValue.of(
-            2L, new Instant(5), Arrays.asList(window(5, 15)), PaneInfo.NO_FIRING);
-    WindowedValue<Long> thirdInput =
-        WindowedValue.of(
-            4L, new Instant(15), Arrays.asList(loneSession), PaneInfo.NO_FIRING);
-
-    DoFnTester<?, KV<String, Long>> tester =
-        runGABW(gabwFactory, sessionStrategy, "k", firstInput, secondInput, thirdInput);
-
-    assertThat(tester.peekOutputElements(), hasSize(2));
-
-    BoundedWindow mergedSession = window(0, 15);
-    TimestampedValue<KV<String, Long>> mergedOutput =
-        Iterables.getOnlyElement(tester.peekOutputElementsInWindow(mergedSession));
-    Long expectedMergedValue = combineFn.apply(ImmutableList.of(1L, 2L));
-    assertThat(mergedOutput.getValue().getValue(), equalTo(expectedMergedValue));
-    assertThat(mergedOutput.getTimestamp(), equalTo(mergedSession.maxTimestamp()));
-
-    TimestampedValue<KV<String, Long>> loneOutput =
-        Iterables.getOnlyElement(tester.peekOutputElementsInWindow(loneSession));
-    Long expectedLoneValue = combineFn.apply(ImmutableList.of(4L));
-    assertThat(loneOutput.getValue().getValue(), equalTo(expectedLoneValue));
-    assertThat(loneOutput.getTimestamp(), equalTo(loneSession.maxTimestamp()));
-  }
-
-  /**
-   * Varargs convenience overload: wraps {@code values} in a list and delegates to the
-   * {@link Collection}-based {@code runGABW}.
-   */
-  @SafeVarargs
-  private static <K, InputT, OutputT, W extends BoundedWindow>
-  DoFnTester<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> runGABW(
-      GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> gabwFactory,
-      WindowingStrategy<?, W> windowingStrategy,
-      K key,
-      WindowedValue<InputT>... values) throws Exception {
-    Collection<WindowedValue<InputT>> valueList = Arrays.asList(values);
-    return runGABW(gabwFactory, windowingStrategy, key, valueList);
-  }
-
-  /**
-   * Builds the DoFn for {@code windowingStrategy} from {@code gabwFactory}, runs it over a single
-   * bundle containing one element ({@code key} paired with {@code values}), and returns the
-   * tester so callers can inspect the output.
-   */
-  private static <K, InputT, OutputT, W extends BoundedWindow>
-  DoFnTester<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> runGABW(
-      GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> gabwFactory,
-      WindowingStrategy<?, W> windowingStrategy,
-      K key,
-      Collection<WindowedValue<InputT>> values) throws Exception {
-
-    final StateInternalsFactory<K> stateFactory = new CachingStateInternalsFactory<K>();
-
-    DoFnTester<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> doFnTester =
-        DoFnTester.of(gabwFactory.forStrategy(windowingStrategy, stateFactory));
-
-    // Though we use a DoFnTester, the function itself is instantiated directly by the
-    // runner and should not be serialized; it may not even be serializable.
-    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
-
-    KV<K, Iterable<WindowedValue<InputT>>> keyedInput =
-        KV.<K, Iterable<WindowedValue<InputT>>>of(key, values);
-    doFnTester.startBundle();
-    doFnTester.processElement(keyedInput);
-    doFnTester.finishBundle();
-
-    // Sanity check for corruption: every output must carry the key that was fed in.
-    for (KV<K, OutputT> output : doFnTester.peekOutputElements()) {
-      assertThat(output.getKey(), equalTo(key));
-    }
-
-    return doFnTester;
-  }
-
-  /** Creates an {@link IntervalWindow} from the given start and end values. */
-  private static BoundedWindow window(long start, long end) {
-    Instant windowStart = new Instant(start);
-    Instant windowEnd = new Instant(end);
-    return new IntervalWindow(windowStart, windowEnd);
-  }
-
-  /**
-   * A {@link StateInternalsFactory} backed by a {@link LoadingCache}: the {@link StateInternals}
-   * for a key is loaded on the first request and the cached instance is returned afterwards.
-   */
-  private static final class CachingStateInternalsFactory<K> implements StateInternalsFactory<K> {
-    private final LoadingCache<K, StateInternals<K>> stateInternalsCache;
-
-    private CachingStateInternalsFactory() {
-      this.stateInternalsCache = CacheBuilder.newBuilder().build(new StateInternalsLoader<K>());
-    }
-
-    /**
-     * Returns the {@link StateInternals} associated with {@code key}.
-     *
-     * @throws RuntimeException wrapping any exception raised while loading from the cache
-     */
-    @Override
-    public StateInternals<K> stateInternalsForKey(K key) {
-      // No unchecked conversion happens here: the cache is already typed to StateInternals<K>.
-      try {
-        return stateInternalsCache.get(key);
-      } catch (Exception exc) {
-        throw new RuntimeException(exc);
-      }
-    }
-  }
-
-  /** {@link CacheLoader} that supplies in-memory {@link StateInternals} for a requested key. */
-  private static class StateInternalsLoader<K> extends CacheLoader<K, StateInternals<K>> {
-    @Override
-    public StateInternals<K> load(K key) throws Exception {
-      StateInternals<K> loaded = InMemoryStateInternals.forKey(key);
-      return loaded;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java
deleted file mode 100644
index a1586c8..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.GroupAlsoByWindowsProperties.GroupAlsoByWindowsDoFnFactory;
-import org.apache.beam.sdk.util.state.StateInternalsFactory;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Unit tests for {@link GroupAlsoByWindowsViaOutputBufferDoFn}.
- */
-@RunWith(JUnit4.class)
-public class GroupAlsoByWindowsViaOutputBufferDoFnTest {
-
-  private class BufferingGABWViaOutputBufferDoFnFactory<K, InputT>
-  implements GroupAlsoByWindowsDoFnFactory<K, InputT, Iterable<InputT>> {
-
-    private final Coder<InputT> inputCoder;
-
-    public BufferingGABWViaOutputBufferDoFnFactory(Coder<InputT> inputCoder) {
-      this.inputCoder = inputCoder;
-    }
-
-    @Override
-    public <W extends BoundedWindow>
-        GroupAlsoByWindowsDoFn<K, InputT, Iterable<InputT>, W> forStrategy(
-            WindowingStrategy<?, W> windowingStrategy,
-            StateInternalsFactory<K> stateInternalsFactory) {
-      return new GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, Iterable<InputT>, W>(
-          windowingStrategy,
-          stateInternalsFactory,
-          SystemReduceFn.<K, InputT, W>buffering(inputCoder));
-    }
-  }
-
-  @Test
-  public void testEmptyInputEmptyOutput() throws Exception {
-    GroupAlsoByWindowsProperties.emptyInputEmptyOutput(
-        new BufferingGABWViaOutputBufferDoFnFactory<>(StringUtf8Coder.of()));
-  }
-
-  @Test
-  public void testGroupsElementsIntoFixedWindows() throws Exception {
-    GroupAlsoByWindowsProperties.groupsElementsIntoFixedWindows(
-        new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
-  }
-
-  @Test
-  public void testGroupsElementsIntoSlidingWindows() throws Exception {
-    GroupAlsoByWindowsProperties.groupsElementsIntoSlidingWindowsWithMinTimestamp(
-        new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
-  }
-
-  @Test
-  public void testGroupsIntoOverlappingNonmergingWindows() throws Exception {
-    GroupAlsoByWindowsProperties.groupsIntoOverlappingNonmergingWindows(
-        new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
-  }
-
-  @Test
-  public void testGroupsIntoSessions() throws Exception {
-    GroupAlsoByWindowsProperties.groupsElementsInMergedSessions(
-        new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
-  }
-
-  @Test
-  public void testGroupsElementsIntoFixedWindowsWithEndOfWindowTimestamp() throws Exception {
-    GroupAlsoByWindowsProperties.groupsElementsIntoFixedWindowsWithEndOfWindowTimestamp(
-        new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
-  }
-
-  @Test
-  public void testGroupsElementsIntoFixedWindowsWithLatestTimestamp() throws Exception {
-    GroupAlsoByWindowsProperties.groupsElementsIntoFixedWindowsWithLatestTimestamp(
-        new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
-  }
-
-  @Test
-  public void testGroupsElementsIntoSessionsWithEndOfWindowTimestamp() throws Exception {
-    GroupAlsoByWindowsProperties.groupsElementsInMergedSessionsWithEndOfWindowTimestamp(
-        new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
-  }
-
-  @Test
-  public void testGroupsElementsIntoSessionsWithLatestTimestamp() throws Exception {
-    GroupAlsoByWindowsProperties.groupsElementsInMergedSessionsWithLatestTimestamp(
-        new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a62e5018/runners/core-java/src/test/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunnerTest.java
deleted file mode 100644
index c63e43e..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunnerTest.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.when;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import java.util.Arrays;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.LateDataDroppingDoFnRunner.LateDataFilter;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Unit tests for {@link LateDataDroppingDoFnRunner}.
- */
-@RunWith(JUnit4.class)
-public class LateDataDroppingDoFnRunnerTest {
-  private static final FixedWindows WINDOW_FN = FixedWindows.of(Duration.millis(10));
-
-  @Mock private TimerInternals mockTimerInternals;
-
-  @Before
-  public void setUp() {
-    MockitoAnnotations.initMocks(this);
-  }
-
-  /**
-   * Feeds four elements through a {@link LateDataFilter} while the mocked input watermark is 15
-   * and checks that exactly one of them (the element with timestamp 5) is filtered out and
-   * counted by the "droppedDueToLateness" aggregator.
-   */
-  @Test
-  public void testLateDataFilter() throws Exception {
-    when(mockTimerInternals.currentInputWatermarkTime()).thenReturn(new Instant(15L));
-
-    InMemoryLongSumAggregator droppedDueToLateness =
-        new InMemoryLongSumAggregator("droppedDueToLateness");
-    LateDataFilter lateDataFilter = new LateDataFilter(
-        WindowingStrategy.of(WINDOW_FN), mockTimerInternals, droppedDueToLateness);
-
-    Iterable<WindowedValue<Integer>> actual = lateDataFilter.filter(
-        "a",
-        ImmutableList.of(
-            createDatum(13, 13L),
-            createDatum(5, 5L), // late element: expected to be dropped at watermark 15.
-            createDatum(16, 16L),
-            createDatum(18, 18L)));
-
-    Iterable<WindowedValue<Integer>> expected = ImmutableList.of(
-        createDatum(13, 13L),
-        createDatum(16, 16L),
-        createDatum(18, 18L));
-    // The filtered output is the value under test, so it goes first and the expected elements
-    // form the matcher; a failure message then labels the two sides correctly.
-    assertThat(actual, containsInAnyOrder(Iterables.toArray(expected, WindowedValue.class)));
-    assertEquals(1, droppedDueToLateness.sum);
-  }
-
-  /**
-   * Builds a {@link WindowedValue} for {@code element} at {@code timestampMillis}, placed in the
-   * single window that {@code WINDOW_FN} assigns to that timestamp.
-   */
-  private <T> WindowedValue<T> createDatum(T element, long timestampMillis) {
-    Instant timestamp = new Instant(timestampMillis);
-    return WindowedValue.of(
-        element,
-        timestamp,
-        Arrays.asList(WINDOW_FN.assignWindow(timestamp)),
-        PaneInfo.NO_FIRING);
-  }
-
-  /** Minimal {@link Aggregator} that keeps a running sum of the added values in memory. */
-  private static class InMemoryLongSumAggregator implements Aggregator<Long, Long> {
-    private final String name;
-    private long sum = 0;
-
-    public InMemoryLongSumAggregator(String name) {
-      this.name = name;
-    }
-
-    @Override
-    public void addValue(Long value) {
-      sum += value;
-    }
-
-    @Override
-    public String getName() {
-      return name;
-    }
-
-    @Override
-    public CombineFn<Long, ?, Long> getCombineFn() {
-      return new Sum.SumLongFn();
-    }
-  }
-}