You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/13 00:41:00 UTC

[29/50] [abbrv] incubator-beam git commit: Put classes in runners-core package into runners.core namespace

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
new file mode 100644
index 0000000..59a7c92
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.IdentitySideInputWindowFn;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link PushbackSideInputDoFnRunner}.
+ */
+@RunWith(JUnit4.class)
+public class PushbackSideInputDoFnRunnerTest {
+  @Mock private ReadyCheckingSideInputReader reader;
+  private TestDoFnRunner<Integer, Integer> underlying;
+  private PCollectionView<Integer> singletonView;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    TestPipeline p = TestPipeline.create();
+    PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
+    singletonView =
+        created
+            .apply(Window.into(new IdentitySideInputWindowFn()))
+            .apply(Sum.integersGlobally().asSingletonView());
+
+    underlying = new TestDoFnRunner<>();
+  }
+
+  private PushbackSideInputDoFnRunner<Integer, Integer> createRunner(
+      ImmutableList<PCollectionView<?>> views) {
+    PushbackSideInputDoFnRunner<Integer, Integer> runner =
+        PushbackSideInputDoFnRunner.create(underlying, views, reader);
+    runner.startBundle();
+    return runner;
+  }
+
+  @Test
+  public void startFinishBundleDelegates() {
+    PushbackSideInputDoFnRunner runner =
+        createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
+
+    assertThat(underlying.started, is(true));
+    assertThat(underlying.finished, is(false));
+    runner.finishBundle();
+    assertThat(underlying.finished, is(true));
+  }
+
+  @Test
+  public void processElementSideInputNotReady() {
+    when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
+        .thenReturn(false);
+
+    PushbackSideInputDoFnRunner<Integer, Integer> runner =
+        createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
+
+    WindowedValue<Integer> oneWindow =
+        WindowedValue.of(
+            2,
+            new Instant(-2),
+            new IntervalWindow(new Instant(-500L), new Instant(0L)),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    Iterable<WindowedValue<Integer>> oneWindowPushback =
+        runner.processElementInReadyWindows(oneWindow);
+    assertThat(oneWindowPushback, containsInAnyOrder(oneWindow));
+    assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable());
+  }
+
+  @Test
+  public void processElementSideInputNotReadyMultipleWindows() {
+    when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
+        .thenReturn(false);
+
+    PushbackSideInputDoFnRunner<Integer, Integer> runner =
+        createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
+
+    WindowedValue<Integer> multiWindow =
+        WindowedValue.of(
+            2,
+            new Instant(-2),
+            ImmutableList.of(
+                new IntervalWindow(new Instant(-500L), new Instant(0L)),
+                new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
+                GlobalWindow.INSTANCE),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    Iterable<WindowedValue<Integer>> multiWindowPushback =
+        runner.processElementInReadyWindows(multiWindow);
+    assertThat(multiWindowPushback, equalTo(multiWindow.explodeWindows()));
+    assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable());
+  }
+
+  @Test
+  public void processElementSideInputNotReadySomeWindows() {
+    when(reader.isReady(Mockito.eq(singletonView), Mockito.eq(GlobalWindow.INSTANCE)))
+        .thenReturn(false);
+    when(
+            reader.isReady(
+                Mockito.eq(singletonView),
+                org.mockito.AdditionalMatchers.not(Mockito.eq(GlobalWindow.INSTANCE))))
+        .thenReturn(true);
+
+    PushbackSideInputDoFnRunner<Integer, Integer> runner =
+        createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
+
+    IntervalWindow littleWindow = new IntervalWindow(new Instant(-500L), new Instant(0L));
+    IntervalWindow bigWindow =
+        new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L));
+    WindowedValue<Integer> multiWindow =
+        WindowedValue.of(
+            2,
+            new Instant(-2),
+            ImmutableList.of(littleWindow, bigWindow, GlobalWindow.INSTANCE),
+            PaneInfo.NO_FIRING);
+    Iterable<WindowedValue<Integer>> multiWindowPushback =
+        runner.processElementInReadyWindows(multiWindow);
+    assertThat(
+        multiWindowPushback,
+        containsInAnyOrder(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-2L))));
+    assertThat(underlying.inputElems,
+        containsInAnyOrder(WindowedValue.of(2, new Instant(-2), littleWindow, PaneInfo.NO_FIRING),
+            WindowedValue.of(2, new Instant(-2), bigWindow, PaneInfo.NO_FIRING)));
+  }
+
+  @Test
+  public void processElementSideInputReadyAllWindows() {
+    when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
+        .thenReturn(true);
+
+    ImmutableList<PCollectionView<?>> views = ImmutableList.<PCollectionView<?>>of(singletonView);
+    PushbackSideInputDoFnRunner<Integer, Integer> runner = createRunner(views);
+
+    WindowedValue<Integer> multiWindow =
+        WindowedValue.of(
+            2,
+            new Instant(-2),
+            ImmutableList.of(
+                new IntervalWindow(new Instant(-500L), new Instant(0L)),
+                new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
+                GlobalWindow.INSTANCE),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    Iterable<WindowedValue<Integer>> multiWindowPushback =
+        runner.processElementInReadyWindows(multiWindow);
+    assertThat(multiWindowPushback, emptyIterable());
+    assertThat(underlying.inputElems,
+        containsInAnyOrder(ImmutableList.copyOf(multiWindow.explodeWindows()).toArray()));
+  }
+
+  @Test
+  public void processElementNoSideInputs() {
+    PushbackSideInputDoFnRunner<Integer, Integer> runner =
+        createRunner(ImmutableList.<PCollectionView<?>>of());
+
+    WindowedValue<Integer> multiWindow =
+        WindowedValue.of(
+            2,
+            new Instant(-2),
+            ImmutableList.of(
+                new IntervalWindow(new Instant(-500L), new Instant(0L)),
+                new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
+                GlobalWindow.INSTANCE),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    Iterable<WindowedValue<Integer>> multiWindowPushback =
+        runner.processElementInReadyWindows(multiWindow);
+    assertThat(multiWindowPushback, emptyIterable());
+    assertThat(underlying.inputElems, containsInAnyOrder(multiWindow));
+  }
+
+  private static class TestDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
+    List<WindowedValue<InputT>> inputElems;
+    private boolean started = false;
+    private boolean finished = false;
+
+    @Override
+    public void startBundle() {
+      started = true;
+      inputElems = new ArrayList<>();
+    }
+
+    @Override
+    public void processElement(WindowedValue<InputT> elem) {
+      inputElems.add(elem);
+    }
+
+    @Override
+    public void finishBundle() {
+      finished = true;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
new file mode 100644
index 0000000..4d5680c
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -0,0 +1,1446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.sdk.WindowMatchers.isSingleWindowedValue;
+import static org.apache.beam.sdk.WindowMatchers.isWindowedValue;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
+
+import com.google.common.collect.Iterables;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.beam.sdk.WindowMatchers;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
+import org.apache.beam.sdk.transforms.CombineWithContext.Context;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.AfterEach;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Never;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Tests for {@link ReduceFnRunner}. These tests instantiate a full "stack" of
+ * {@link ReduceFnRunner} with enclosed {@link ReduceFn}, down to the installed {@link Trigger}
+ * (sometimes mocked). They proceed by injecting elements and advancing watermark and
+ * processing time, then verifying produced panes and counters.
+ */
+@RunWith(JUnit4.class)
+public class ReduceFnRunnerTest {
+  @Mock private SideInputReader mockSideInputReader;
+  private Trigger mockTrigger;
+  private PCollectionView<Integer> mockView;
+
+  private IntervalWindow firstWindow;
+
+  private static Trigger.TriggerContext anyTriggerContext() {
+    return Mockito.<Trigger.TriggerContext>any();
+  }
+  private static Trigger.OnElementContext anyElementContext() {
+    return Mockito.<Trigger.OnElementContext>any();
+  }
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+
+    mockTrigger = mock(Trigger.class, withSettings().serializable());
+
+    @SuppressWarnings("unchecked")
+    PCollectionView<Integer> mockViewUnchecked =
+        mock(PCollectionView.class, withSettings().serializable());
+    mockView = mockViewUnchecked;
+    firstWindow = new IntervalWindow(new Instant(0), new Instant(10));
+  }
+
+  private void injectElement(ReduceFnTester<Integer, ?, IntervalWindow> tester, int element)
+      throws Exception {
+    doNothing().when(mockTrigger).onElement(anyElementContext());
+    tester.injectElements(TimestampedValue.of(element, new Instant(element)));
+  }
+
+  private void triggerShouldFinish(Trigger mockTrigger) throws Exception {
+    doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Exception {
+        @SuppressWarnings("unchecked")
+        Trigger.TriggerContext context =
+            (Trigger.TriggerContext) invocation.getArguments()[0];
+        context.trigger().setFinished(true);
+        return null;
+      }
+    })
+    .when(mockTrigger).onFire(anyTriggerContext());
+ }
+
+  @Test
+  public void testOnElementBufferingDiscarding() throws Exception {
+    // Test basic execution of a trigger using a non-combining window set and discarding mode.
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
+            AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100),
+            ClosingBehavior.FIRE_IF_NON_EMPTY);
+
+    // Pane of {1, 2}
+    injectElement(tester, 1);
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    injectElement(tester, 2);
+    assertThat(tester.extractOutput(),
+        contains(isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10)));
+
+    // Pane of just 3, and finish
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTrigger);
+    injectElement(tester, 3);
+    assertThat(tester.extractOutput(),
+            contains(isSingleWindowedValue(containsInAnyOrder(3), 3, 0, 10)));
+    assertTrue(tester.isMarkedFinished(firstWindow));
+    tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
+
+    // This element shouldn't be seen, because the trigger has finished
+    injectElement(tester, 4);
+
+    assertEquals(1, tester.getElementsDroppedDueToClosedWindow());
+  }
+
+  @Test
+  public void testOnElementBufferingAccumulating() throws Exception {
+    // Test basic execution of a trigger using a non-combining window set and accumulating mode.
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
+            AccumulationMode.ACCUMULATING_FIRED_PANES, Duration.millis(100),
+            ClosingBehavior.FIRE_IF_NON_EMPTY);
+
+    injectElement(tester, 1);
+
+    // Fires {1, 2}
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    injectElement(tester, 2);
+
+    // Fires {1, 2, 3} because we are in accumulating mode
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTrigger);
+    injectElement(tester, 3);
+
+    // This element shouldn't be seen, because the trigger has finished
+    injectElement(tester, 4);
+
+    assertThat(
+        tester.extractOutput(),
+        contains(
+            isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10),
+            isSingleWindowedValue(containsInAnyOrder(1, 2, 3), 3, 0, 10)));
+    assertTrue(tester.isMarkedFinished(firstWindow));
+    tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
+  }
+
+  @Test
+  public void testOnElementCombiningDiscarding() throws Exception {
+    // Test basic execution of a trigger using a non-combining window set and discarding mode.
+    ReduceFnTester<Integer, Integer, IntervalWindow> tester = ReduceFnTester.combining(
+        FixedWindows.of(Duration.millis(10)), mockTrigger, AccumulationMode.DISCARDING_FIRED_PANES,
+        new Sum.SumIntegerFn().<String>asKeyedFn(), VarIntCoder.of(), Duration.millis(100));
+
+    injectElement(tester, 2);
+
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    injectElement(tester, 3);
+
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTrigger);
+    injectElement(tester, 4);
+
+    // This element shouldn't be seen, because the trigger has finished
+    injectElement(tester, 6);
+
+    assertThat(
+        tester.extractOutput(),
+        contains(
+            isSingleWindowedValue(equalTo(5), 2, 0, 10),
+            isSingleWindowedValue(equalTo(4), 4, 0, 10)));
+    assertTrue(tester.isMarkedFinished(firstWindow));
+    tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
+  }
+
+  /**
+   * Tests that the garbage collection time for a fixed window does not overflow the end of time.
+   */
+  @Test
+  public void testFixedWindowEndOfTimeGarbageCollection() throws Exception {
+
+    Duration allowedLateness = Duration.standardDays(365);
+    Duration windowSize = Duration.millis(10);
+    WindowFn<Object, IntervalWindow> windowFn = FixedWindows.of(windowSize);
+
+    // This timestamp falls into a window where the end of the window is before the end of the
+    // global window - the "end of time" - yet its expiration time is after.
+    final Instant elementTimestamp =
+        GlobalWindow.INSTANCE.maxTimestamp().minus(allowedLateness).plus(1);
+
+    IntervalWindow window = Iterables.getOnlyElement(
+        windowFn.assignWindows(
+            windowFn.new AssignContext() {
+              @Override
+              public Object element() {
+                throw new UnsupportedOperationException();
+              }
+              @Override
+              public Instant timestamp() {
+                return elementTimestamp;
+              }
+
+              @Override
+              public BoundedWindow window() {
+                throw new UnsupportedOperationException();
+              }
+            }));
+
+    assertTrue(
+        window.maxTimestamp().isBefore(GlobalWindow.INSTANCE.maxTimestamp()));
+    assertTrue(
+        window.maxTimestamp().plus(allowedLateness).isAfter(GlobalWindow.INSTANCE.maxTimestamp()));
+
+    // Test basic execution of a trigger using a non-combining window set and accumulating mode.
+    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+        ReduceFnTester.combining(
+            windowFn,
+            AfterWatermark.pastEndOfWindow().withLateFirings(Never.ever()),
+            AccumulationMode.DISCARDING_FIRED_PANES,
+            new Sum.SumIntegerFn().<String>asKeyedFn(),
+            VarIntCoder.of(),
+            allowedLateness);
+
+    tester.injectElements(TimestampedValue.of(13, elementTimestamp));
+
+    // Should fire ON_TIME pane and there will be a checkState that the cleanup time
+    // is prior to timestamp max value
+    tester.advanceInputWatermark(window.maxTimestamp());
+
+    // Nothing in the ON_TIME pane (not governed by triggers, but by ReduceFnRunner)
+    assertThat(tester.extractOutput(), emptyIterable());
+
+    tester.injectElements(TimestampedValue.of(42, elementTimestamp));
+
+    // Now the final pane should fire, demonstrating that the GC time was truncated
+    tester.advanceInputWatermark(GlobalWindow.INSTANCE.maxTimestamp());
+    assertThat(tester.extractOutput(), contains(isWindowedValue(equalTo(55))));
+  }
+
+  @Test
+  public void testOnElementCombiningAccumulating() throws Exception {
+    // Test basic execution of a trigger using a non-combining window set and accumulating mode.
+    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+        ReduceFnTester.combining(FixedWindows.of(Duration.millis(10)), mockTrigger,
+            AccumulationMode.ACCUMULATING_FIRED_PANES, new Sum.SumIntegerFn().<String>asKeyedFn(),
+            VarIntCoder.of(), Duration.millis(100));
+
+    injectElement(tester, 1);
+
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    injectElement(tester, 2);
+
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTrigger);
+    injectElement(tester, 3);
+
+    // This element shouldn't be seen, because the trigger has finished
+    injectElement(tester, 4);
+
+    assertThat(
+        tester.extractOutput(),
+        contains(
+            isSingleWindowedValue(equalTo(3), 1, 0, 10),
+            isSingleWindowedValue(equalTo(6), 3, 0, 10)));
+    assertTrue(tester.isMarkedFinished(firstWindow));
+    tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
+  }
+
+  @Test
+  public void testOnElementCombiningWithContext() throws Exception {
+    Integer expectedValue = 5;
+    WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy
+        .of(FixedWindows.of(Duration.millis(10)))
+        .withTrigger(mockTrigger)
+        .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
+        .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+        .withAllowedLateness(Duration.millis(100));
+
+    TestOptions options = PipelineOptionsFactory.as(TestOptions.class);
+    options.setValue(5);
+
+    when(mockSideInputReader.contains(Matchers.<PCollectionView<Integer>>any())).thenReturn(true);
+    when(mockSideInputReader.get(
+        Matchers.<PCollectionView<Integer>>any(), any(BoundedWindow.class))).thenReturn(5);
+
+    @SuppressWarnings({"rawtypes", "unchecked", "unused"})
+    Object suppressWarningsVar = when(mockView.getWindowingStrategyInternal())
+        .thenReturn((WindowingStrategy) windowingStrategy);
+
+    SumAndVerifyContextFn combineFn = new SumAndVerifyContextFn(mockView, expectedValue);
+    // Test basic execution of a trigger using a non-combining window set and discarding mode.
+    ReduceFnTester<Integer, Integer, IntervalWindow> tester = ReduceFnTester.combining(
+        windowingStrategy, combineFn.<String>asKeyedFn(),
+        VarIntCoder.of(), options, mockSideInputReader);
+
+    injectElement(tester, 2);
+
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    injectElement(tester, 3);
+
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTrigger);
+    injectElement(tester, 4);
+
+    // This element shouldn't be seen, because the trigger has finished
+    injectElement(tester, 6);
+
+    assertThat(
+        tester.extractOutput(),
+        contains(
+            isSingleWindowedValue(equalTo(5), 2, 0, 10),
+            isSingleWindowedValue(equalTo(4), 4, 0, 10)));
+    assertTrue(tester.isMarkedFinished(firstWindow));
+    tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
+  }
+
+  @Test
+  public void testWatermarkHoldAndLateData() throws Exception {
+    // Test handling of late data. Specifically, ensure the watermark hold is correct.
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
+            AccumulationMode.ACCUMULATING_FIRED_PANES, Duration.millis(10),
+            ClosingBehavior.FIRE_IF_NON_EMPTY);
+
+    // Input watermark -> null
+    assertEquals(null, tester.getWatermarkHold());
+    assertEquals(null, tester.getOutputWatermark());
+
+    // All on time data, verify watermark hold.
+    injectElement(tester, 1);
+    injectElement(tester, 3);
+    assertEquals(new Instant(1), tester.getWatermarkHold());
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    injectElement(tester, 2);
+    List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
+    assertThat(output, contains(
+        isSingleWindowedValue(containsInAnyOrder(1, 2, 3),
+            1, // timestamp
+            0, // window start
+            10))); // window end
+    assertThat(output.get(0).getPane(),
+        equalTo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1)));
+
+    // Holding for the end-of-window transition.
+    assertEquals(new Instant(9), tester.getWatermarkHold());
+    // Nothing dropped.
+    assertEquals(0, tester.getElementsDroppedDueToClosedWindow());
+
+    // Input watermark -> 4, output watermark should advance that far as well
+    tester.advanceInputWatermark(new Instant(4));
+    assertEquals(new Instant(4), tester.getOutputWatermark());
+
+    // Some late, some on time. Verify that we only hold to the minimum of on-time.
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
+    tester.advanceInputWatermark(new Instant(4));
+    injectElement(tester, 2);
+    injectElement(tester, 3);
+    assertEquals(new Instant(9), tester.getWatermarkHold());
+    injectElement(tester, 5);
+    assertEquals(new Instant(5), tester.getWatermarkHold());
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    injectElement(tester, 4);
+    output = tester.extractOutput();
+    assertThat(output,
+        contains(
+            isSingleWindowedValue(containsInAnyOrder(
+                1, 2, 3, // earlier firing
+                2, 3, 4, 5), // new elements
+            4, // timestamp
+            0, // window start
+            10))); // window end
+    assertThat(output.get(0).getPane(),
+        equalTo(PaneInfo.createPane(false, false, Timing.EARLY, 1, -1)));
+
+    // All late -- output at end of window timestamp.
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
+    tester.advanceInputWatermark(new Instant(8));
+    injectElement(tester, 6);
+    injectElement(tester, 5);
+    assertEquals(new Instant(9), tester.getWatermarkHold());
+    injectElement(tester, 4);
+
+    // Fire the ON_TIME pane
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    tester.advanceInputWatermark(new Instant(10));
+
+    // Output time is end of the window, because all the new data was late, but the pane
+    // is the ON_TIME pane.
+    output = tester.extractOutput();
+    assertThat(output,
+        contains(isSingleWindowedValue(
+            containsInAnyOrder(1, 2, 3, // earlier firing
+                2, 3, 4, 5, // earlier firing
+                4, 5, 6), // new elements
+            9, // timestamp
+            0, // window start
+            10))); // window end
+    assertThat(output.get(0).getPane(),
+        equalTo(PaneInfo.createPane(false, false, Timing.ON_TIME, 2, 0)));
+
+    // This is "pending" at the time the watermark makes it way-late.
+    // Because we're about to expire the window, we output it.
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
+    injectElement(tester, 8);
+    assertEquals(0, tester.getElementsDroppedDueToClosedWindow());
+
+    // Exceed the GC limit, triggering the last pane to be fired
+    tester.advanceInputWatermark(new Instant(50));
+    output = tester.extractOutput();
+    // Output time is still end of the window, because the new data (8) was behind
+    // the output watermark.
+    assertThat(output,
+        contains(isSingleWindowedValue(
+            containsInAnyOrder(1, 2, 3, // earlier firing
+                2, 3, 4, 5, // earlier firing
+                4, 5, 6, // earlier firing
+                8), // new element prior to window becoming expired
+            9, // timestamp
+            0, // window start
+            10))); // window end
+    assertThat(
+        output.get(0).getPane(),
+        equalTo(PaneInfo.createPane(false, true, Timing.LATE, 3, 1)));
+    assertEquals(new Instant(50), tester.getOutputWatermark());
+    assertEquals(null, tester.getWatermarkHold());
+
+    // Late timers are ignored
+    tester.fireTimer(new IntervalWindow(new Instant(0), new Instant(10)), new Instant(12),
+        TimeDomain.EVENT_TIME);
+
+    // And because we're past the end of window + allowed lateness, everything should be cleaned up.
+    assertFalse(tester.isMarkedFinished(firstWindow));
+    tester.assertHasOnlyGlobalAndFinishedSetsFor();
+  }
+
+  @Test
+  public void dontSetHoldIfTooLateForEndOfWindowTimer() throws Exception {
+    // Make sure holds are only set if they are accompanied by an end-of-window timer.
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
+            AccumulationMode.ACCUMULATING_FIRED_PANES, Duration.millis(10),
+            ClosingBehavior.FIRE_ALWAYS);
+    tester.setAutoAdvanceOutputWatermark(false);
+
+    // Case: Unobservably late
+    tester.advanceInputWatermark(new Instant(15));
+    tester.advanceOutputWatermark(new Instant(11));
+    injectElement(tester, 14);
+    // Hold was applied, waiting for end-of-window timer.
+    assertEquals(new Instant(14), tester.getWatermarkHold());
+    assertEquals(new Instant(19), tester.getNextTimer(TimeDomain.EVENT_TIME));
+
+    // Trigger the end-of-window timer.
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    tester.advanceInputWatermark(new Instant(20));
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
+    // Hold has been replaced with garbage collection hold. Waiting for garbage collection.
+    assertEquals(new Instant(29), tester.getWatermarkHold());
+    assertEquals(new Instant(29), tester.getNextTimer(TimeDomain.EVENT_TIME));
+
+    // Case: Maybe late 1
+    injectElement(tester, 13);
+    // No change to hold or timers.
+    assertEquals(new Instant(29), tester.getWatermarkHold());
+    assertEquals(new Instant(29), tester.getNextTimer(TimeDomain.EVENT_TIME));
+
+    // Trigger the garbage collection timer.
+    tester.advanceInputWatermark(new Instant(30));
+
+    // Everything should be cleaned up.
+    assertFalse(tester.isMarkedFinished(new IntervalWindow(new Instant(10), new Instant(20))));
+    tester.assertHasOnlyGlobalAndFinishedSetsFor();
+  }
+
+  @Test
+  public void testPaneInfoAllStates() throws Exception {
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
+            AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100),
+            ClosingBehavior.FIRE_IF_NON_EMPTY);
+
+    tester.advanceInputWatermark(new Instant(0));
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    injectElement(tester, 1);
+    assertThat(tester.extractOutput(), contains(
+        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.EARLY))));
+
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    injectElement(tester, 2);
+    assertThat(tester.extractOutput(), contains(
+        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.EARLY, 1, -1))));
+
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
+    tester.advanceInputWatermark(new Instant(15));
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    injectElement(tester, 3);
+    assertThat(tester.extractOutput(), contains(
+        WindowMatchers.valueWithPaneInfo(
+            PaneInfo.createPane(false, false, Timing.ON_TIME, 2, 0))));
+
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    injectElement(tester, 4);
+    assertThat(tester.extractOutput(), contains(
+        WindowMatchers.valueWithPaneInfo(
+            PaneInfo.createPane(false, false, Timing.LATE, 3, 1))));
+
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTrigger);
+    injectElement(tester, 5);
+    assertThat(tester.extractOutput(), contains(
+        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.LATE, 4, 2))));
+  }
+
+  @Test
+  public void testPaneInfoAllStatesAfterWatermark() throws Exception {
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = ReduceFnTester.nonCombining(
+        WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
+            .withTrigger(Repeatedly.forever(AfterFirst.of(
+                AfterPane.elementCountAtLeast(2),
+                AfterWatermark.pastEndOfWindow())))
+            .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
+            .withAllowedLateness(Duration.millis(100))
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+            .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
+
+    tester.advanceInputWatermark(new Instant(0));
+    tester.injectElements(
+        TimestampedValue.of(1, new Instant(1)), TimestampedValue.of(2, new Instant(2)));
+
+    List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
+    assertThat(
+        output,
+        contains(WindowMatchers.valueWithPaneInfo(
+            PaneInfo.createPane(true, false, Timing.EARLY, 0, -1))));
+    assertThat(
+        output,
+        contains(
+            WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10)));
+
+    tester.advanceInputWatermark(new Instant(50));
+
+    // We should get the ON_TIME pane even though it is empty,
+    // because we have an AfterWatermark.pastEndOfWindow() trigger.
+    output = tester.extractOutput();
+    assertThat(
+        output,
+        contains(WindowMatchers.valueWithPaneInfo(
+            PaneInfo.createPane(false, false, Timing.ON_TIME, 1, 0))));
+    assertThat(
+        output,
+        contains(
+            WindowMatchers.isSingleWindowedValue(emptyIterable(), 9, 0, 10)));
+
+    // We should get the final pane even though it is empty.
+    tester.advanceInputWatermark(new Instant(150));
+    output = tester.extractOutput();
+    assertThat(
+        output,
+        contains(
+            WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.LATE, 2, 1))));
+    assertThat(
+        output,
+        contains(
+            WindowMatchers.isSingleWindowedValue(emptyIterable(), 9, 0, 10)));
+  }
+
+  @Test
+  public void noEmptyPanesFinalIfNonEmpty() throws Exception {
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = ReduceFnTester.nonCombining(
+        WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
+            .withTrigger(Repeatedly.<IntervalWindow>forever(AfterFirst.<IntervalWindow>of(
+                AfterPane.elementCountAtLeast(2),
+                AfterWatermark.pastEndOfWindow())))
+            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+            .withAllowedLateness(Duration.millis(100))
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+            .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY));
+
+    tester.advanceInputWatermark(new Instant(0));
+    tester.injectElements(
+        TimestampedValue.of(1, new Instant(1)),
+        TimestampedValue.of(2, new Instant(2)));
+    tester.advanceInputWatermark(new Instant(20));
+    tester.advanceInputWatermark(new Instant(250));
+
+    List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
+    assertThat(output, contains(
+        // Trigger with 2 elements
+        WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10),
+        // Trigger for the empty on time pane
+        WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10)));
+  }
+
+  @Test
+  public void noEmptyPanesFinalAlways() throws Exception {
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = ReduceFnTester.nonCombining(
+        WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
+            .withTrigger(Repeatedly.<IntervalWindow>forever(AfterFirst.<IntervalWindow>of(
+                AfterPane.elementCountAtLeast(2),
+                AfterWatermark.pastEndOfWindow())))
+            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+            .withAllowedLateness(Duration.millis(100))
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+            .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
+
+    tester.advanceInputWatermark(new Instant(0));
+    tester.injectElements(
+        TimestampedValue.of(1, new Instant(1)),
+        TimestampedValue.of(2, new Instant(2)));
+    tester.advanceInputWatermark(new Instant(20));
+    tester.advanceInputWatermark(new Instant(250));
+
+    List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
+    assertThat(output, contains(
+        // Trigger with 2 elements
+        WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10),
+        // Trigger for the empty on time pane
+        WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10),
+        // Trigger for the final pane
+        WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10)));
+  }
+
+  @Test
+  public void testPaneInfoAllStatesAfterWatermarkAccumulating() throws Exception {
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = ReduceFnTester.nonCombining(
+        WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
+            .withTrigger(Repeatedly.forever(AfterFirst.of(
+                AfterPane.elementCountAtLeast(2),
+                AfterWatermark.pastEndOfWindow())))
+            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+            .withAllowedLateness(Duration.millis(100))
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
+            .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
+
+    tester.advanceInputWatermark(new Instant(0));
+    tester.injectElements(
+        TimestampedValue.of(1, new Instant(1)), TimestampedValue.of(2, new Instant(2)));
+
+    List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
+    assertThat(
+        output,
+        contains(WindowMatchers.valueWithPaneInfo(
+            PaneInfo.createPane(true, false, Timing.EARLY, 0, -1))));
+    assertThat(
+        output,
+        contains(
+            WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10)));
+
+    tester.advanceInputWatermark(new Instant(50));
+
+    // We should get the ON_TIME pane even though it is empty,
+    // because we have an AfterWatermark.pastEndOfWindow() trigger.
+    output = tester.extractOutput();
+    assertThat(
+        output,
+        contains(WindowMatchers.valueWithPaneInfo(
+            PaneInfo.createPane(false, false, Timing.ON_TIME, 1, 0))));
+    assertThat(
+        output,
+        contains(
+            WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10)));
+
+    // We should get the final pane even though it is empty.
+    tester.advanceInputWatermark(new Instant(150));
+    output = tester.extractOutput();
+    assertThat(
+        output,
+        contains(
+            WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.LATE, 2, 1))));
+    assertThat(
+        output,
+        contains(
+            WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10)));
+  }
+
+  @Test
+  public void testPaneInfoFinalAndOnTime() throws Exception {
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = ReduceFnTester.nonCombining(
+        WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
+            .withTrigger(
+                Repeatedly.forever(AfterPane.elementCountAtLeast(2))
+                    .orFinally(AfterWatermark.pastEndOfWindow()))
+            .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
+            .withAllowedLateness(Duration.millis(100))
+            .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
+
+    tester.advanceInputWatermark(new Instant(0));
+
+    // Should trigger due to element count
+    tester.injectElements(
+        TimestampedValue.of(1, new Instant(1)), TimestampedValue.of(2, new Instant(2)));
+
+    assertThat(
+        tester.extractOutput(),
+        contains(WindowMatchers.valueWithPaneInfo(
+            PaneInfo.createPane(true, false, Timing.EARLY, 0, -1))));
+
+    tester.advanceInputWatermark(new Instant(150));
+    assertThat(tester.extractOutput(), contains(
+        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.ON_TIME, 1, 0))));
+  }
+
+  @Test
+  public void testPaneInfoSkipToFinish() throws Exception {
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
+            AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100),
+            ClosingBehavior.FIRE_IF_NON_EMPTY);
+
+    tester.advanceInputWatermark(new Instant(0));
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTrigger);
+    injectElement(tester, 1);
+    assertThat(tester.extractOutput(), contains(
+        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, true, Timing.EARLY))));
+  }
+
+  @Test
+  public void testPaneInfoSkipToNonSpeculativeAndFinish() throws Exception {
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
+            AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100),
+            ClosingBehavior.FIRE_IF_NON_EMPTY);
+
+    tester.advanceInputWatermark(new Instant(15));
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTrigger);
+    injectElement(tester, 1);
+    assertThat(tester.extractOutput(), contains(
+        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, true, Timing.LATE))));
+  }
+
+  @Test
+  public void testMergeBeforeFinalizing() throws Exception {
+    // Verify that we merge windows before producing output so users don't see undesired
+    // unmerged windows.
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
+            AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(0),
+            ClosingBehavior.FIRE_IF_NON_EMPTY);
+
+    // All on time data, verify watermark hold.
+    // These two windows should pre-merge immediately to [1, 20)
+    tester.injectElements(
+        TimestampedValue.of(1, new Instant(1)), // in [1, 11)
+        TimestampedValue.of(10, new Instant(10))); // in [10, 20)
+
+    // And this should fire the end-of-window timer
+    tester.advanceInputWatermark(new Instant(100));
+
+    List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
+    assertThat(output.size(), equalTo(1));
+    assertThat(output.get(0),
+        isSingleWindowedValue(containsInAnyOrder(1, 10),
+            1, // timestamp
+            1, // window start
+            20)); // window end
+    assertThat(
+        output.get(0).getPane(),
+        equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
+  }
+
+  /**
+   * It is possible for a session window's trigger to be closed at the point at which
+   * the (merged) session window is garbage collected. Make sure we don't accidentally
+   * assume the window is still active.
+   */
+  @Test
+  public void testMergingWithCloseBeforeGC() throws Exception {
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
+            AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
+            ClosingBehavior.FIRE_IF_NON_EMPTY);
+
+    // Two elements in two overlapping session windows.
+    tester.injectElements(
+        TimestampedValue.of(1, new Instant(1)), // in [1, 11)
+        TimestampedValue.of(10, new Instant(10))); // in [10, 20)
+
+    // Close the trigger, but the gargbage collection timer is still pending.
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTrigger);
+    tester.advanceInputWatermark(new Instant(30));
+
+    // Now the garbage collection timer will fire, finding the trigger already closed.
+    tester.advanceInputWatermark(new Instant(100));
+
+    List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
+    assertThat(output.size(), equalTo(1));
+    assertThat(output.get(0),
+        isSingleWindowedValue(containsInAnyOrder(1, 10),
+            1, // timestamp
+            1, // window start
+            20)); // window end
+    assertThat(
+        output.get(0).getPane(),
+        equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
+  }
+
+  /**
+   * Ensure a closed trigger has its state recorded in the merge result window.
+   */
+  @Test
+  public void testMergingWithCloseTrigger() throws Exception {
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
+                                    AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
+                                    ClosingBehavior.FIRE_IF_NON_EMPTY);
+
+    // Create a new merged session window.
+    tester.injectElements(TimestampedValue.of(1, new Instant(1)),
+                          TimestampedValue.of(2, new Instant(2)));
+
+    // Force the trigger to be closed for the merged window.
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTrigger);
+    tester.advanceInputWatermark(new Instant(13));
+
+    // Trigger is now closed.
+    assertTrue(tester.isMarkedFinished(new IntervalWindow(new Instant(1), new Instant(12))));
+
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
+
+    // Revisit the same session window.
+    tester.injectElements(TimestampedValue.of(1, new Instant(1)),
+                          TimestampedValue.of(2, new Instant(2)));
+
+    // Trigger is still closed.
+    assertTrue(tester.isMarkedFinished(new IntervalWindow(new Instant(1), new Instant(12))));
+  }
+
+  /**
+   * If a later event tries to reuse an earlier session window which has been closed, we
+   * should reject that element and not fail due to the window no longer being active.
+   */
+  @Test
+  public void testMergingWithReusedWindow() throws Exception {
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
+                                    AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
+                                    ClosingBehavior.FIRE_IF_NON_EMPTY);
+
+    // One elements in one session window.
+    tester.injectElements(TimestampedValue.of(1, new Instant(1))); // in [1, 11), gc at 21.
+
+    // Close the trigger, but the gargbage collection timer is still pending.
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTrigger);
+    tester.advanceInputWatermark(new Instant(15));
+
+    // Another element in the same session window.
+    // Should be discarded with 'window closed'.
+    tester.injectElements(TimestampedValue.of(1, new Instant(1))); // in [1, 11), gc at 21.
+
+    // And nothing should be left in the active window state.
+    assertTrue(tester.hasNoActiveWindows());
+
+    // Now the garbage collection timer will fire, finding the trigger already closed.
+    tester.advanceInputWatermark(new Instant(100));
+
+    List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
+    assertThat(output.size(), equalTo(1));
+    assertThat(output.get(0),
+               isSingleWindowedValue(containsInAnyOrder(1),
+                                     1, // timestamp
+                                     1, // window start
+                                     11)); // window end
+    assertThat(
+        output.get(0).getPane(),
+        equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
+  }
+
+  /**
+   * When a merged window's trigger is closed we record that state using the merged window rather
+   * than the original windows.
+   */
+  @Test
+  public void testMergingWithClosedRepresentative() throws Exception {
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
+                                    AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
+                                    ClosingBehavior.FIRE_IF_NON_EMPTY);
+
+    // 2 elements into merged session window.
+    // Close the trigger, but the garbage collection timer is still pending.
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTrigger);
+    tester.injectElements(TimestampedValue.of(1, new Instant(1)),       // in [1, 11), gc at 21.
+                          TimestampedValue.of(8, new Instant(8)));      // in [8, 18), gc at 28.
+
+    // More elements into the same merged session window.
+    // It has not yet been gced.
+    // Should be discarded with 'window closed'.
+    tester.injectElements(TimestampedValue.of(1, new Instant(1)),      // in [1, 11), gc at 21.
+                          TimestampedValue.of(2, new Instant(2)),      // in [2, 12), gc at 22.
+                          TimestampedValue.of(8, new Instant(8)));     // in [8, 18), gc at 28.
+
+    // Now the garbage collection timer will fire, finding the trigger already closed.
+    tester.advanceInputWatermark(new Instant(100));
+
+    List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
+
+    assertThat(output.size(), equalTo(1));
+    assertThat(output.get(0),
+               isSingleWindowedValue(containsInAnyOrder(1, 8),
+                                     1, // timestamp
+                                     1, // window start
+                                     18)); // window end
+    assertThat(
+        output.get(0).getPane(),
+        equalTo(PaneInfo.createPane(true, true, Timing.EARLY, 0, 0)));
+  }
+
+  /**
+   * If an element for a closed session window ends up being merged into other still-open
+   * session windows, the resulting session window is not 'poisoned'.
+   */
+  @Test
+  public void testMergingWithClosedDoesNotPoison() throws Exception {
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
+            AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
+            ClosingBehavior.FIRE_IF_NON_EMPTY);
+
+    // 1 element, force its trigger to close.
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTrigger);
+    tester.injectElements(TimestampedValue.of(2, new Instant(2)));
+
+    // 3 elements, one already closed.
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false);
+    tester.injectElements(TimestampedValue.of(1, new Instant(1)),
+        TimestampedValue.of(2, new Instant(2)),
+        TimestampedValue.of(3, new Instant(3)));
+
+    tester.advanceInputWatermark(new Instant(100));
+
+    List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
+    assertThat(output.size(), equalTo(2));
+    assertThat(output.get(0),
+        isSingleWindowedValue(containsInAnyOrder(2),
+            2, // timestamp
+            2, // window start
+            12)); // window end
+    assertThat(
+        output.get(0).getPane(),
+        equalTo(PaneInfo.createPane(true, true, Timing.EARLY, 0, 0)));
+    assertThat(output.get(1),
+        isSingleWindowedValue(containsInAnyOrder(1, 2, 3),
+            1, // timestamp
+            1, // window start
+            13)); // window end
+    assertThat(
+        output.get(1).getPane(),
+        equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
+  }
+
+  /**
+   * Tests that when data is assigned to multiple windows but some of those windows have
+   * had their triggers finish, then the data is dropped and counted accurately.
+   */
+  @Test
+  public void testDropDataMultipleWindowsFinishedTrigger() throws Exception {
+    ReduceFnTester<Integer, Integer, IntervalWindow> tester = ReduceFnTester.combining(
+        WindowingStrategy.of(
+            SlidingWindows.of(Duration.millis(100)).every(Duration.millis(30)))
+        .withTrigger(AfterWatermark.pastEndOfWindow())
+        .withAllowedLateness(Duration.millis(1000)),
+        new Sum.SumIntegerFn().<String>asKeyedFn(), VarIntCoder.of());
+
+    tester.injectElements(
+        // assigned to [-60, 40), [-30, 70), [0, 100)
+        TimestampedValue.of(10, new Instant(23)),
+        // assigned to [-30, 70), [0, 100), [30, 130)
+        TimestampedValue.of(12, new Instant(40)));
+
+    assertEquals(0, tester.getElementsDroppedDueToClosedWindow());
+
+    tester.advanceInputWatermark(new Instant(70));
+    tester.injectElements(
+        // assigned to [-30, 70), [0, 100), [30, 130)
+        // but [-30, 70) is closed by the trigger
+        TimestampedValue.of(14, new Instant(60)));
+
+    assertEquals(1, tester.getElementsDroppedDueToClosedWindow());
+
+    tester.advanceInputWatermark(new Instant(130));
+    // assigned to [-30, 70), [0, 100), [30, 130)
+    // but they are all closed
+    tester.injectElements(TimestampedValue.of(16, new Instant(40)));
+
+    assertEquals(4, tester.getElementsDroppedDueToClosedWindow());
+  }
+
+  @Test
+  public void testIdempotentEmptyPanesDiscarding() throws Exception {
+    // Test uninteresting (empty) panes don't increment the index or otherwise
+    // modify PaneInfo.
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
+            AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100),
+            ClosingBehavior.FIRE_IF_NON_EMPTY);
+
+    // Inject a couple of on-time elements and fire at the window end.
+    injectElement(tester, 1);
+    injectElement(tester, 2);
+    tester.advanceInputWatermark(new Instant(12));
+
+    // Fire the on-time pane
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    tester.fireTimer(firstWindow, new Instant(9), TimeDomain.EVENT_TIME);
+
+    // Fire another timer (with no data, so it's an uninteresting pane that should not be output).
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    tester.fireTimer(firstWindow, new Instant(9), TimeDomain.EVENT_TIME);
+
+    // Finish it off with another datum.
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTrigger);
+    injectElement(tester, 3);
+
+    // The intermediate trigger firing shouldn't result in any output.
+    List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
+    assertThat(output.size(), equalTo(2));
+
+    // The on-time pane is as expected.
+    assertThat(output.get(0), isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10));
+
+    // The late pane has the correct indices.
+    assertThat(output.get(1).getValue(), contains(3));
+    assertThat(
+        output.get(1).getPane(), equalTo(PaneInfo.createPane(false, true, Timing.LATE, 1, 1)));
+
+    assertTrue(tester.isMarkedFinished(firstWindow));
+    tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
+
+    assertEquals(0, tester.getElementsDroppedDueToClosedWindow());
+  }
+
+  @Test
+  public void testIdempotentEmptyPanesAccumulating() throws Exception {
+    // Test uninteresting (empty) panes don't increment the index or otherwise
+    // modify PaneInfo.
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger,
+            AccumulationMode.ACCUMULATING_FIRED_PANES, Duration.millis(100),
+            ClosingBehavior.FIRE_IF_NON_EMPTY);
+
+    // Inject a couple of on-time elements and fire at the window end.
+    injectElement(tester, 1);
+    injectElement(tester, 2);
+    tester.advanceInputWatermark(new Instant(12));
+
+    // Trigger the on-time pane
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    tester.fireTimer(firstWindow, new Instant(9), TimeDomain.EVENT_TIME);
+    List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
+    assertThat(output.size(), equalTo(1));
+    assertThat(output.get(0), isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10));
+    assertThat(output.get(0).getPane(),
+        equalTo(PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0)));
+
+    // Fire another timer with no data; the empty pane should not be output even though the
+    // trigger is ready to fire
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    tester.fireTimer(firstWindow, new Instant(9), TimeDomain.EVENT_TIME);
+    assertThat(tester.extractOutput().size(), equalTo(0));
+
+    // Finish it off with another datum, which is late
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTrigger);
+    injectElement(tester, 3);
+    output = tester.extractOutput();
+    assertThat(output.size(), equalTo(1));
+
+    // The late pane has the correct indices.
+    assertThat(output.get(0).getValue(), containsInAnyOrder(1, 2, 3));
+    assertThat(output.get(0).getPane(),
+        equalTo(PaneInfo.createPane(false, true, Timing.LATE, 1, 1)));
+
+    assertTrue(tester.isMarkedFinished(firstWindow));
+    tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
+
+    assertEquals(0, tester.getElementsDroppedDueToClosedWindow());
+  }
+
+  /**
+   * Test that we receive an empty on-time pane when an or-finally waiting for the watermark fires.
+   * Specifically, verify the proper triggerings and pane-info of a typical speculative/on-time/late
+   * when the on-time pane is empty.
+   */
+  @Test
+  public void testEmptyOnTimeFromOrFinally() throws Exception {
+    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+        ReduceFnTester.combining(FixedWindows.of(Duration.millis(10)),
+            AfterEach.<IntervalWindow>inOrder(
+                Repeatedly
+                    .forever(
+                        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
+                            new Duration(5)))
+                    .orFinally(AfterWatermark.pastEndOfWindow()),
+                Repeatedly.forever(
+                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
+                        new Duration(25)))),
+            AccumulationMode.ACCUMULATING_FIRED_PANES, new Sum.SumIntegerFn().<String>asKeyedFn(),
+            VarIntCoder.of(), Duration.millis(100));
+
+    tester.advanceInputWatermark(new Instant(0));
+    tester.advanceProcessingTime(new Instant(0));
+
+    // Processing time timer for 5
+    tester.injectElements(
+        TimestampedValue.of(1, new Instant(1)),
+        TimestampedValue.of(1, new Instant(3)),
+        TimestampedValue.of(1, new Instant(7)),
+        TimestampedValue.of(1, new Instant(5)));
+
+    // Should fire early pane
+    tester.advanceProcessingTime(new Instant(6));
+
+    // Should fire empty on time pane
+    tester.advanceInputWatermark(new Instant(11));
+    List<WindowedValue<Integer>> output = tester.extractOutput();
+    assertEquals(2, output.size());
+
+    assertThat(output.get(0), WindowMatchers.isSingleWindowedValue(4, 1, 0, 10));
+    assertThat(output.get(1), WindowMatchers.isSingleWindowedValue(4, 9, 0, 10));
+
+    assertThat(
+        output.get(0),
+        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1)));
+    assertThat(
+        output.get(1),
+        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.ON_TIME, 1, 0)));
+  }
+
+  /**
+   * Tests for processing time firings after the watermark passes the end of the window.
+   * Specifically, verify the proper triggerings and pane-info of a typical speculative/on-time/late
+   * when the on-time pane is non-empty.
+   */
+  @Test
+  public void testProcessingTime() throws Exception {
+    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+        ReduceFnTester.combining(FixedWindows.of(Duration.millis(10)),
+            AfterEach.<IntervalWindow>inOrder(
+                Repeatedly
+                    .forever(
+                        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
+                            new Duration(5)))
+                    .orFinally(AfterWatermark.pastEndOfWindow()),
+                Repeatedly.forever(
+                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
+                        new Duration(25)))),
+            AccumulationMode.ACCUMULATING_FIRED_PANES, new Sum.SumIntegerFn().<String>asKeyedFn(),
+            VarIntCoder.of(), Duration.millis(100));
+
+    tester.advanceInputWatermark(new Instant(0));
+    tester.advanceProcessingTime(new Instant(0));
+
+    tester.injectElements(TimestampedValue.of(1, new Instant(1)),
+        TimestampedValue.of(1, new Instant(3)), TimestampedValue.of(1, new Instant(7)),
+        TimestampedValue.of(1, new Instant(5)));
+    // 4 elements all at processing time 0
+
+    tester.advanceProcessingTime(new Instant(6)); // fire [1,3,7,5] since 6 > 0 + 5
+    tester.injectElements(
+        TimestampedValue.of(1, new Instant(8)),
+        TimestampedValue.of(1, new Instant(4)));
+    // 6 elements
+
+    tester.advanceInputWatermark(new Instant(11)); // fire [1,3,7,5,8,4] since 11 > 9
+    tester.injectElements(
+        TimestampedValue.of(1, new Instant(8)),
+        TimestampedValue.of(1, new Instant(4)),
+        TimestampedValue.of(1, new Instant(5)));
+    // 9 elements
+
+    tester.advanceInputWatermark(new Instant(12));
+    tester.injectElements(
+        TimestampedValue.of(1, new Instant(3)));
+    // 10 elements
+
+    tester.advanceProcessingTime(new Instant(15));
+    tester.injectElements(
+        TimestampedValue.of(1, new Instant(5)));
+    // 11 elements
+    tester.advanceProcessingTime(new Instant(32)); // fire since 32 > 6 + 25
+
+    tester.injectElements(
+        TimestampedValue.of(1, new Instant(3)));
+    // 12 elements
+    // fire [1,3,7,5,8,4,8,4,5,3,5,3] since 125 > 6 + 25
+    tester.advanceInputWatermark(new Instant(125));
+
+    List<WindowedValue<Integer>> output = tester.extractOutput();
+    assertEquals(4, output.size());
+
+    assertThat(output.get(0), WindowMatchers.isSingleWindowedValue(4, 1, 0, 10));
+    assertThat(output.get(1), WindowMatchers.isSingleWindowedValue(6, 4, 0, 10));
+    assertThat(output.get(2), WindowMatchers.isSingleWindowedValue(11, 9, 0, 10));
+    assertThat(output.get(3), WindowMatchers.isSingleWindowedValue(12, 9, 0, 10));
+
+    assertThat(
+        output.get(0),
+        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1)));
+    assertThat(
+        output.get(1),
+        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.ON_TIME, 1, 0)));
+    assertThat(
+        output.get(2),
+        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.LATE, 2, 1)));
+    assertThat(
+        output.get(3),
+        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.LATE, 3, 2)));
+  }
+
+  /**
+   * We should fire a non-empty ON_TIME pane in the GlobalWindow when the watermark moves to
+   * end-of-time.
+   */
+  @Test
+  public void fireNonEmptyOnDrainInGlobalWindow() throws Exception {
+    ReduceFnTester<Integer, Iterable<Integer>, GlobalWindow> tester =
+        ReduceFnTester.nonCombining(
+            WindowingStrategy.of(new GlobalWindows())
+                             .withTrigger(Repeatedly.<GlobalWindow>forever(
+                                 AfterPane.elementCountAtLeast(3)))
+                             .withMode(AccumulationMode.DISCARDING_FIRED_PANES));
+
+    tester.advanceInputWatermark(new Instant(0));
+
+    final int n = 20;
+    for (int i = 0; i < n; i++) {
+      tester.injectElements(TimestampedValue.of(i, new Instant(i)));
+    }
+
+    List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
+    assertEquals(n / 3, output.size());
+    for (int i = 0; i < output.size(); i++) {
+      assertEquals(Timing.EARLY, output.get(i).getPane().getTiming());
+      assertEquals(i, output.get(i).getPane().getIndex());
+      assertEquals(3, Iterables.size(output.get(i).getValue()));
+    }
+
+    tester.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    output = tester.extractOutput();
+    assertEquals(1, output.size());
+    assertEquals(Timing.ON_TIME, output.get(0).getPane().getTiming());
+    assertEquals(n / 3, output.get(0).getPane().getIndex());
+    assertEquals(n - ((n / 3) * 3), Iterables.size(output.get(0).getValue()));
+  }
+
+  /**
+   * We should fire an empty ON_TIME pane in the GlobalWindow when the watermark moves to
+   * end-of-time.
+   */
+  @Test
+  public void fireEmptyOnDrainInGlobalWindowIfRequested() throws Exception {
+    ReduceFnTester<Integer, Iterable<Integer>, GlobalWindow> tester =
+        ReduceFnTester.nonCombining(
+            WindowingStrategy.of(new GlobalWindows())
+                             .withTrigger(Repeatedly.<GlobalWindow>forever(
+                                 AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
+                                     new Duration(3))))
+                             .withMode(AccumulationMode.DISCARDING_FIRED_PANES));
+
+    final int n = 20;
+    for (int i = 0; i < n; i++) {
+      tester.advanceProcessingTime(new Instant(i));
+      tester.injectElements(TimestampedValue.of(i, new Instant(i)));
+    }
+    tester.advanceProcessingTime(new Instant(n + 4));
+    List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
+    assertEquals((n + 3) / 4, output.size());
+    for (int i = 0; i < output.size(); i++) {
+      assertEquals(Timing.EARLY, output.get(i).getPane().getTiming());
+      assertEquals(i, output.get(i).getPane().getIndex());
+      assertEquals(4, Iterables.size(output.get(i).getValue()));
+    }
+
+    tester.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    output = tester.extractOutput();
+    assertEquals(1, output.size());
+    assertEquals(Timing.ON_TIME, output.get(0).getPane().getTiming());
+    assertEquals((n + 3) / 4, output.get(0).getPane().getIndex());
+    assertEquals(0, Iterables.size(output.get(0).getValue()));
+  }
+
+  /**
+   * Late elements should still have a garbage collection hold set so that they
+   * can make a late pane rather than be dropped due to lateness.
+   */
+  @Test
+  public void setGarbageCollectionHoldOnLateElements() throws Exception {
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(
+            FixedWindows.of(Duration.millis(10)),
+            AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(2)),
+            AccumulationMode.DISCARDING_FIRED_PANES,
+            Duration.millis(100),
+            ClosingBehavior.FIRE_IF_NON_EMPTY);
+
+    tester.advanceInputWatermark(new Instant(0));
+    tester.advanceOutputWatermark(new Instant(0));
+    tester.injectElements(TimestampedValue.of(1,  new Instant(1)));
+
+    // Fire ON_TIME pane @ 9 with 1
+
+    tester.advanceInputWatermark(new Instant(109));
+    tester.advanceOutputWatermark(new Instant(109));
+    tester.injectElements(TimestampedValue.of(2,  new Instant(2)));
+    // We should have set a garbage collection hold for the final pane.
+    Instant hold = tester.getWatermarkHold();
+    assertEquals(new Instant(109), hold);
+
+    tester.advanceInputWatermark(new Instant(110));
+    tester.advanceOutputWatermark(new Instant(110));
+
+    // Fire final LATE pane @ 9 with 2
+
+    List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
+    assertEquals(2, output.size());
+  }
+
+  private static class SumAndVerifyContextFn extends CombineFnWithContext<Integer, int[], Integer> {
+
+    private final PCollectionView<Integer> view;
+    private final int expectedValue;
+
+    private SumAndVerifyContextFn(PCollectionView<Integer> view, int expectedValue) {
+      this.view = view;
+      this.expectedValue = expectedValue;
+    }
+    @Override
+    public int[] createAccumulator(Context c) {
+      checkArgument(
+          c.getPipelineOptions().as(TestOptions.class).getValue() == expectedValue);
+      checkArgument(c.sideInput(view) == expectedValue);
+      return wrap(0);
+    }
+
+    @Override
+    public int[] addInput(int[] accumulator, Integer input, Context c) {
+      checkArgument(
+          c.getPipelineOptions().as(TestOptions.class).getValue() == expectedValue);
+      checkArgument(c.sideInput(view) == expectedValue);
+      accumulator[0] += input.intValue();
+      return accumulator;
+    }
+
+    @Override
+    public int[] mergeAccumulators(Iterable<int[]> accumulators, Context c) {
+      checkArgument(
+          c.getPipelineOptions().as(TestOptions.class).getValue() == expectedValue);
+      checkArgument(c.sideInput(view) == expectedValue);
+      Iterator<int[]> iter = accumulators.iterator();
+      if (!iter.hasNext()) {
+        return createAccumulator(c);
+      } else {
+        int[] running = iter.next();
+        while (iter.hasNext()) {
+          running[0] += iter.next()[0];
+        }
+        return running;
+      }
+    }
+
+    @Override
+    public Integer extractOutput(int[] accumulator, Context c) {
+      checkArgument(
+          c.getPipelineOptions().as(TestOptions.class).getValue() == expectedValue);
+      checkArgument(c.sideInput(view) == expectedValue);
+      return accumulator[0];
+    }
+
+    private int[] wrap(int value) {
+      return new int[] { value };
+    }
+  }
+
+  /**
+   * A {@link PipelineOptions} to test combining with context.
+   */
+  public interface TestOptions extends PipelineOptions {
+    Integer getValue();
+    void setValue(Integer value);
+  }
+}