You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/03/04 19:11:35 UTC

[40/50] [abbrv] incubator-beam git commit: [flink] convert tabs to 2 spaces

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
index 01f9c32..fb240f4 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
@@ -43,464 +43,464 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 
 public class GroupAlsoByWindowTest {
 
-	private final Combine.CombineFn combiner = new Sum.SumIntegerFn();
-
-	private final WindowingStrategy slidingWindowWithAfterWatermarkTriggerStrategy =
-			WindowingStrategy.of(SlidingWindows.of(Duration.standardSeconds(10)).every(Duration.standardSeconds(5)))
-					.withTrigger(AfterWatermark.pastEndOfWindow()).withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES);
-
-	private final WindowingStrategy sessionWindowingStrategy =
-			WindowingStrategy.of(Sessions.withGapDuration(Duration.standardSeconds(2)))
-					.withTrigger(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
-					.withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES)
-					.withAllowedLateness(Duration.standardSeconds(100));
-
-	private final WindowingStrategy fixedWindowingStrategy =
-			WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(10)));
-
-	private final WindowingStrategy fixedWindowWithCountTriggerStrategy =
-			fixedWindowingStrategy.withTrigger(AfterPane.elementCountAtLeast(5));
-
-	private final WindowingStrategy fixedWindowWithAfterWatermarkTriggerStrategy =
-			fixedWindowingStrategy.withTrigger(AfterWatermark.pastEndOfWindow());
-
-	private final WindowingStrategy fixedWindowWithCompoundTriggerStrategy =
-		fixedWindowingStrategy.withTrigger(
-			AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(5))
-				.withLateFirings(AfterPane.elementCountAtLeast(5)).buildTrigger());
-
-	/**
-	 * The default accumulation mode is
-	 * {@link com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode#DISCARDING_FIRED_PANES}.
-	 * This strategy changes it to
-	 * {@link com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode#ACCUMULATING_FIRED_PANES}
-	 */
-	private final WindowingStrategy fixedWindowWithCompoundTriggerStrategyAcc =
-			fixedWindowWithCompoundTriggerStrategy
-					.withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES);
-
-	@Test
-	public void testWithLateness() throws Exception {
-		WindowingStrategy strategy = WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(2)))
-				.withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES)
-				.withAllowedLateness(Duration.millis(1000));
-		long initialTime = 0L;
-		Pipeline pipeline = FlinkTestPipeline.createForStreaming();
-
-		KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
-
-		FlinkGroupAlsoByWindowWrapper gbwOperaror =
-				FlinkGroupAlsoByWindowWrapper.createForTesting(
-						pipeline.getOptions(),
-						pipeline.getCoderRegistry(),
-						strategy,
-						inputCoder,
-						combiner.<String>asKeyedFn());
-
-		OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
-				new OneInputStreamOperatorTestHarness<>(gbwOperaror);
-		testHarness.open();
-
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
-		testHarness.processWatermark(new Watermark(initialTime + 2000));
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
-		testHarness.processWatermark(new Watermark(initialTime + 4000));
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-		expectedOutput.add(new StreamRecord<>(
-				WindowedValue.of(KV.of("key1", 4),
-						new Instant(initialTime + 1),
-						new IntervalWindow(new Instant(0), new Instant(2000)),
-						PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0))
-				, initialTime + 1));
-		expectedOutput.add(new Watermark(initialTime + 2000));
-
-		expectedOutput.add(new StreamRecord<>(
-				WindowedValue.of(KV.of("key1", 5),
-						new Instant(initialTime + 1999),
-						new IntervalWindow(new Instant(0), new Instant(2000)),
-						PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 1, 1))
-				, initialTime + 1999));
-
-
-		expectedOutput.add(new StreamRecord<>(
-				WindowedValue.of(KV.of("key1", 6),
-						new Instant(initialTime + 1999),
-						new IntervalWindow(new Instant(0), new Instant(2000)),
-						PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 2, 2))
-				, initialTime + 1999));
-		expectedOutput.add(new Watermark(initialTime + 4000));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-		testHarness.close();
-	}
-
-	@Test
-	public void testSessionWindows() throws Exception {
-		WindowingStrategy strategy = sessionWindowingStrategy;
-
-		long initialTime = 0L;
-		Pipeline pipeline = FlinkTestPipeline.createForStreaming();
-
-		KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
-
-		FlinkGroupAlsoByWindowWrapper gbwOperaror =
-				FlinkGroupAlsoByWindowWrapper.createForTesting(
-						pipeline.getOptions(),
-						pipeline.getCoderRegistry(),
-						strategy,
-						inputCoder,
-						combiner.<String>asKeyedFn());
-
-		OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
-				new OneInputStreamOperatorTestHarness<>(gbwOperaror);
-		testHarness.open();
-
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 3500), null, PaneInfo.NO_FIRING), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 3700), null, PaneInfo.NO_FIRING), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 2700), null, PaneInfo.NO_FIRING), initialTime + 20));
-		testHarness.processWatermark(new Watermark(initialTime + 6000));
-
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 6700), null, PaneInfo.NO_FIRING), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 6800), null, PaneInfo.NO_FIRING), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 8900), null, PaneInfo.NO_FIRING), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 7600), null, PaneInfo.NO_FIRING), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 5600), null, PaneInfo.NO_FIRING), initialTime + 20));
-
-		testHarness.processWatermark(new Watermark(initialTime + 12000));
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-		expectedOutput.add(new StreamRecord<>(
-				WindowedValue.of(KV.of("key1", 6),
-						new Instant(initialTime + 1),
-						new IntervalWindow(new Instant(1), new Instant(5700)),
-						PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0))
-				, initialTime + 1));
-		expectedOutput.add(new Watermark(initialTime + 6000));
-
-		expectedOutput.add(new StreamRecord<>(
-				WindowedValue.of(KV.of("key1", 11),
-						new Instant(initialTime + 6700),
-						new IntervalWindow(new Instant(1), new Instant(10900)),
-						PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0))
-				, initialTime + 6700));
-		expectedOutput.add(new Watermark(initialTime + 12000));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-		testHarness.close();
-	}
-
-	@Test
-	public void testSlidingWindows() throws Exception {
-		WindowingStrategy strategy = slidingWindowWithAfterWatermarkTriggerStrategy;
-		long initialTime = 0L;
-		OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
-				createTestingOperatorAndState(strategy, initialTime);
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-		testHarness.processWatermark(new Watermark(initialTime + 25000));
-
-		expectedOutput.add(new StreamRecord<>(
-				WindowedValue.of(KV.of("key1", 6),
-						new Instant(initialTime + 5000),
-						new IntervalWindow(new Instant(0), new Instant(10000)),
-						PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
-				, initialTime + 5000));
-		expectedOutput.add(new StreamRecord<>(
-				WindowedValue.of(KV.of("key1", 6),
-						new Instant(initialTime + 1),
-						new IntervalWindow(new Instant(-5000), new Instant(5000)),
-						PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
-				, initialTime + 1));
-		expectedOutput.add(new Watermark(initialTime + 10000));
-
-		expectedOutput.add(new StreamRecord<>(
-				WindowedValue.of(KV.of("key1", 11),
-						new Instant(initialTime + 15000),
-						new IntervalWindow(new Instant(10000), new Instant(20000)),
-						PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
-				, initialTime + 15000));
-		expectedOutput.add(new StreamRecord<>(
-				WindowedValue.of(KV.of("key1", 3),
-						new Instant(initialTime + 10000),
-						new IntervalWindow(new Instant(5000), new Instant(15000)),
-						PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
-				, initialTime + 10000));
-		expectedOutput.add(new StreamRecord<>(
-				WindowedValue.of(KV.of("key2", 1),
-						new Instant(initialTime + 19500),
-						new IntervalWindow(new Instant(10000), new Instant(20000)),
-						PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
-				, initialTime + 19500));
-		expectedOutput.add(new Watermark(initialTime + 20000));
-
-		expectedOutput.add(new StreamRecord<>(
-				WindowedValue.of(KV.of("key2", 1),
-						new Instant(initialTime + 20000),
-						/**
-						 * this is 20000 and not 19500 because of a convention in dataflow where
-						 * timestamps of windowed values in a window cannot be smaller than the
-						 * end of a previous window. Checkout the documentation of the
-						 * {@link WindowFn#getOutputTime(Instant, BoundedWindow)}
-						 */
-						new IntervalWindow(new Instant(15000), new Instant(25000)),
-						PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
-				, initialTime + 20000));
-		expectedOutput.add(new StreamRecord<>(
-				WindowedValue.of(KV.of("key1", 8),
-						new Instant(initialTime + 20000),
-						new IntervalWindow(new Instant(15000), new Instant(25000)),
-						PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
-				, initialTime + 20000));
-		expectedOutput.add(new Watermark(initialTime + 25000));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-		testHarness.close();
-	}
-
-	@Test
-	public void testAfterWatermarkProgram() throws Exception {
-		WindowingStrategy strategy = fixedWindowWithAfterWatermarkTriggerStrategy;
-		long initialTime = 0L;
-		OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
-				createTestingOperatorAndState(strategy, initialTime);
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 6),
-				new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 1));
-		expectedOutput.add(new Watermark(initialTime + 10000));
-
-		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 11),
-				new Instant(initialTime + 10000), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 10000));
-		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
-				new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500));
-		expectedOutput.add(new Watermark(initialTime + 20000));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-		testHarness.close();
-	}
-
-	@Test
-	public void testAfterCountProgram() throws Exception {
-		WindowingStrategy strategy = fixedWindowWithCountTriggerStrategy;
-
-		long initialTime = 0L;
-		OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
-				createTestingOperatorAndState(strategy, initialTime);
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-				new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime + 1));
-		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-				new Instant(initialTime + 10000), null, PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime + 10000));
-		expectedOutput.add(new Watermark(initialTime + 10000));
-
-		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
-				new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME, 0, 0)), initialTime + 19500));
-		expectedOutput.add(new Watermark(initialTime + 20000));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.close();
-	}
-
-	@Test
-	public void testCompoundProgram() throws Exception {
-		WindowingStrategy strategy = fixedWindowWithCompoundTriggerStrategy;
-
-		long initialTime = 0L;
-		OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
-				createTestingOperatorAndState(strategy, initialTime);
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		/**
-		 * PaneInfo are:
-		 * 		isFirst (pane in window),
-		 * 		isLast, Timing (of triggering),
-		 * 		index (of pane in the window),
-		 * 		onTimeIndex (if it the 1st,2nd, ... pane that was fired on time)
-		 * */
-
-		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-				new Instant(initialTime + 1), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 1));
-		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-				new Instant(initialTime + 10000), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 10000));
-		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-				new Instant(initialTime + 19500), null, PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime + 19500));
-
-		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1),
-				new Instant(initialTime + 1200), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime + 1200));
-
-		expectedOutput.add(new Watermark(initialTime + 10000));
-
-		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1),
-				new Instant(initialTime + 19500), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime + 19500));
-		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
-				new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500));
-
-		expectedOutput.add(new Watermark(initialTime + 20000));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.close();
-	}
-
-	@Test
-	public void testCompoundAccumulatingPanesProgram() throws Exception {
-		WindowingStrategy strategy = fixedWindowWithCompoundTriggerStrategyAcc;
-		long initialTime = 0L;
-		OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
-				createTestingOperatorAndState(strategy, initialTime);
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-				new Instant(initialTime + 1), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 1));
-		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-				new Instant(initialTime + 10000), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 10000));
-		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 10),
-				new Instant(initialTime + 19500), null, PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime + 19500));
-
-		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 6),
-				new Instant(initialTime + 1200), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime + 1200));
-
-		expectedOutput.add(new Watermark(initialTime + 10000));
-
-		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 11),
-				new Instant(initialTime + 19500), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime + 19500));
-		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
-				new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500));
-
-		expectedOutput.add(new Watermark(initialTime + 20000));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-		testHarness.close();
-	}
-
-	private OneInputStreamOperatorTestHarness createTestingOperatorAndState(WindowingStrategy strategy, long initialTime) throws Exception {
-		Pipeline pipeline = FlinkTestPipeline.createForStreaming();
-
-		KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
-
-		FlinkGroupAlsoByWindowWrapper gbwOperaror =
-				FlinkGroupAlsoByWindowWrapper.createForTesting(
-						pipeline.getOptions(),
-						pipeline.getCoderRegistry(),
-						strategy,
-						inputCoder,
-						combiner.<String>asKeyedFn());
-
-		OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
-				new OneInputStreamOperatorTestHarness<>(gbwOperaror);
-		testHarness.open();
-
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
-
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 10000), null, PaneInfo.NO_FIRING), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 12100), null, PaneInfo.NO_FIRING), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 14200), null, PaneInfo.NO_FIRING), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 15300), null, PaneInfo.NO_FIRING), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 16500), null, PaneInfo.NO_FIRING), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
-
-		testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
-
-		testHarness.processWatermark(new Watermark(initialTime + 10000));
-		testHarness.processWatermark(new Watermark(initialTime + 20000));
-
-		return testHarness;
-	}
-
-	private static class ResultSortComparator implements Comparator<Object> {
-		@Override
-		public int compare(Object o1, Object o2) {
-			if (o1 instanceof Watermark && o2 instanceof Watermark) {
-				Watermark w1 = (Watermark) o1;
-				Watermark w2 = (Watermark) o2;
-				return (int) (w1.getTimestamp() - w2.getTimestamp());
-			} else {
-				StreamRecord<WindowedValue<KV<String, Integer>>> sr0 = (StreamRecord<WindowedValue<KV<String, Integer>>>) o1;
-				StreamRecord<WindowedValue<KV<String, Integer>>> sr1 = (StreamRecord<WindowedValue<KV<String, Integer>>>) o2;
-
-				int comparison = (int) (sr0.getValue().getTimestamp().getMillis() - sr1.getValue().getTimestamp().getMillis());
-				if (comparison != 0) {
-					return comparison;
-				}
-
-				comparison = sr0.getValue().getValue().getKey().compareTo(sr1.getValue().getValue().getKey());
-				if(comparison == 0) {
-					comparison = Integer.compare(
-							sr0.getValue().getValue().getValue(),
-							sr1.getValue().getValue().getValue());
-				}
-				if(comparison == 0) {
-					Collection windowsA = sr0.getValue().getWindows();
-					Collection windowsB = sr1.getValue().getWindows();
-
-					if(windowsA.size() != 1 || windowsB.size() != 1) {
-						throw new IllegalStateException("A value cannot belong to more than one windows after grouping.");
-					}
-
-					BoundedWindow windowA = (BoundedWindow) windowsA.iterator().next();
-					BoundedWindow windowB = (BoundedWindow) windowsB.iterator().next();
-					comparison = Long.compare(windowA.maxTimestamp().getMillis(), windowB.maxTimestamp().getMillis());
-				}
-				return comparison;
-			}
-		}
-	}
-
-	private <T> WindowedValue<T> makeWindowedValue(WindowingStrategy strategy,
-												   T output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
-		final Instant inputTimestamp = timestamp;
-		final WindowFn windowFn = strategy.getWindowFn();
-
-		if (timestamp == null) {
-			timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
-		}
-
-		if (windows == null) {
-			try {
-				windows = windowFn.assignWindows(windowFn.new AssignContext() {
-					@Override
-					public Object element() {
-						throw new UnsupportedOperationException(
-								"WindowFn attempted to access input element when none was available");
-					}
-
-					@Override
-					public Instant timestamp() {
-						if (inputTimestamp == null) {
-							throw new UnsupportedOperationException(
-									"WindowFn attempted to access input timestamp when none was available");
-						}
-						return inputTimestamp;
-					}
-
-					@Override
-					public Collection<? extends BoundedWindow> windows() {
-						throw new UnsupportedOperationException(
-								"WindowFn attempted to access input windows when none were available");
-					}
-				});
-			} catch (Exception e) {
-				throw UserCodeException.wrap(e);
-			}
-		}
-
-		return WindowedValue.of(output, timestamp, windows, pane);
-	}
+  private final Combine.CombineFn combiner = new Sum.SumIntegerFn();
+
+  private final WindowingStrategy slidingWindowWithAfterWatermarkTriggerStrategy =
+      WindowingStrategy.of(SlidingWindows.of(Duration.standardSeconds(10)).every(Duration.standardSeconds(5)))
+          .withTrigger(AfterWatermark.pastEndOfWindow()).withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES);
+
+  private final WindowingStrategy sessionWindowingStrategy =
+      WindowingStrategy.of(Sessions.withGapDuration(Duration.standardSeconds(2)))
+          .withTrigger(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
+          .withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES)
+          .withAllowedLateness(Duration.standardSeconds(100));
+
+  private final WindowingStrategy fixedWindowingStrategy =
+      WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(10)));
+
+  private final WindowingStrategy fixedWindowWithCountTriggerStrategy =
+      fixedWindowingStrategy.withTrigger(AfterPane.elementCountAtLeast(5));
+
+  private final WindowingStrategy fixedWindowWithAfterWatermarkTriggerStrategy =
+      fixedWindowingStrategy.withTrigger(AfterWatermark.pastEndOfWindow());
+
+  private final WindowingStrategy fixedWindowWithCompoundTriggerStrategy =
+    fixedWindowingStrategy.withTrigger(
+      AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(5))
+        .withLateFirings(AfterPane.elementCountAtLeast(5)).buildTrigger());
+
+  /**
+   * The default accumulation mode is
+   * {@link com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode#DISCARDING_FIRED_PANES}.
+   * This strategy changes it to
+   * {@link com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode#ACCUMULATING_FIRED_PANES}
+   */
+  private final WindowingStrategy fixedWindowWithCompoundTriggerStrategyAcc =
+      fixedWindowWithCompoundTriggerStrategy
+          .withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES);
+
+  @Test
+  public void testWithLateness() throws Exception {
+    WindowingStrategy strategy = WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(2)))
+        .withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES)
+        .withAllowedLateness(Duration.millis(1000));
+    long initialTime = 0L;
+    Pipeline pipeline = FlinkTestPipeline.createForStreaming();
+
+    KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
+
+    FlinkGroupAlsoByWindowWrapper gbwOperaror =
+        FlinkGroupAlsoByWindowWrapper.createForTesting(
+            pipeline.getOptions(),
+            pipeline.getCoderRegistry(),
+            strategy,
+            inputCoder,
+            combiner.<String>asKeyedFn());
+
+    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
+        new OneInputStreamOperatorTestHarness<>(gbwOperaror);
+    testHarness.open();
+
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
+    testHarness.processWatermark(new Watermark(initialTime + 2000));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
+    testHarness.processWatermark(new Watermark(initialTime + 4000));
+
+    ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key1", 4),
+            new Instant(initialTime + 1),
+            new IntervalWindow(new Instant(0), new Instant(2000)),
+            PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0))
+        , initialTime + 1));
+    expectedOutput.add(new Watermark(initialTime + 2000));
+
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key1", 5),
+            new Instant(initialTime + 1999),
+            new IntervalWindow(new Instant(0), new Instant(2000)),
+            PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 1, 1))
+        , initialTime + 1999));
+
+
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key1", 6),
+            new Instant(initialTime + 1999),
+            new IntervalWindow(new Instant(0), new Instant(2000)),
+            PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 2, 2))
+        , initialTime + 1999));
+    expectedOutput.add(new Watermark(initialTime + 4000));
+
+    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+    testHarness.close();
+  }
+
+  @Test
+  public void testSessionWindows() throws Exception {
+    WindowingStrategy strategy = sessionWindowingStrategy;
+
+    long initialTime = 0L;
+    Pipeline pipeline = FlinkTestPipeline.createForStreaming();
+
+    KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
+
+    FlinkGroupAlsoByWindowWrapper gbwOperaror =
+        FlinkGroupAlsoByWindowWrapper.createForTesting(
+            pipeline.getOptions(),
+            pipeline.getCoderRegistry(),
+            strategy,
+            inputCoder,
+            combiner.<String>asKeyedFn());
+
+    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
+        new OneInputStreamOperatorTestHarness<>(gbwOperaror);
+    testHarness.open();
+
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 3500), null, PaneInfo.NO_FIRING), initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 3700), null, PaneInfo.NO_FIRING), initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 2700), null, PaneInfo.NO_FIRING), initialTime + 20));
+    testHarness.processWatermark(new Watermark(initialTime + 6000));
+
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 6700), null, PaneInfo.NO_FIRING), initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 6800), null, PaneInfo.NO_FIRING), initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 8900), null, PaneInfo.NO_FIRING), initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 7600), null, PaneInfo.NO_FIRING), initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 5600), null, PaneInfo.NO_FIRING), initialTime + 20));
+
+    testHarness.processWatermark(new Watermark(initialTime + 12000));
+
+    ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key1", 6),
+            new Instant(initialTime + 1),
+            new IntervalWindow(new Instant(1), new Instant(5700)),
+            PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0))
+        , initialTime + 1));
+    expectedOutput.add(new Watermark(initialTime + 6000));
+
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key1", 11),
+            new Instant(initialTime + 6700),
+            new IntervalWindow(new Instant(1), new Instant(10900)),
+            PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0))
+        , initialTime + 6700));
+    expectedOutput.add(new Watermark(initialTime + 12000));
+
+    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+    testHarness.close();
+  }
+
+  @Test
+  public void testSlidingWindows() throws Exception {
+    WindowingStrategy strategy = slidingWindowWithAfterWatermarkTriggerStrategy;
+    long initialTime = 0L;
+    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
+        createTestingOperatorAndState(strategy, initialTime);
+    ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+    testHarness.processWatermark(new Watermark(initialTime + 25000));
+
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key1", 6),
+            new Instant(initialTime + 5000),
+            new IntervalWindow(new Instant(0), new Instant(10000)),
+            PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
+        , initialTime + 5000));
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key1", 6),
+            new Instant(initialTime + 1),
+            new IntervalWindow(new Instant(-5000), new Instant(5000)),
+            PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
+        , initialTime + 1));
+    expectedOutput.add(new Watermark(initialTime + 10000));
+
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key1", 11),
+            new Instant(initialTime + 15000),
+            new IntervalWindow(new Instant(10000), new Instant(20000)),
+            PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
+        , initialTime + 15000));
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key1", 3),
+            new Instant(initialTime + 10000),
+            new IntervalWindow(new Instant(5000), new Instant(15000)),
+            PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
+        , initialTime + 10000));
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key2", 1),
+            new Instant(initialTime + 19500),
+            new IntervalWindow(new Instant(10000), new Instant(20000)),
+            PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
+        , initialTime + 19500));
+    expectedOutput.add(new Watermark(initialTime + 20000));
+
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key2", 1),
+            new Instant(initialTime + 20000),
+            /**
+             * this is 20000 and not 19500 because of a convention in dataflow where
+             * timestamps of windowed values in a window cannot be smaller than the
+             * end of a previous window. Checkout the documentation of the
+             * {@link WindowFn#getOutputTime(Instant, BoundedWindow)}
+             */
+            new IntervalWindow(new Instant(15000), new Instant(25000)),
+            PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
+        , initialTime + 20000));
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key1", 8),
+            new Instant(initialTime + 20000),
+            new IntervalWindow(new Instant(15000), new Instant(25000)),
+            PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
+        , initialTime + 20000));
+    expectedOutput.add(new Watermark(initialTime + 25000));
+
+    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+    testHarness.close();
+  }
+
+  @Test
+  public void testAfterWatermarkProgram() throws Exception {
+    WindowingStrategy strategy = fixedWindowWithAfterWatermarkTriggerStrategy;
+    long initialTime = 0L;
+    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
+        createTestingOperatorAndState(strategy, initialTime);
+    ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 6),
+        new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 1));
+    expectedOutput.add(new Watermark(initialTime + 10000));
+
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 11),
+        new Instant(initialTime + 10000), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 10000));
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
+        new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500));
+    expectedOutput.add(new Watermark(initialTime + 20000));
+
+    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+    testHarness.close();
+  }
+
+  @Test
+  public void testAfterCountProgram() throws Exception {
+    WindowingStrategy strategy = fixedWindowWithCountTriggerStrategy;
+
+    long initialTime = 0L;
+    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
+        createTestingOperatorAndState(strategy, initialTime);
+    ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
+        new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime + 1));
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
+        new Instant(initialTime + 10000), null, PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime + 10000));
+    expectedOutput.add(new Watermark(initialTime + 10000));
+
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
+        new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME, 0, 0)), initialTime + 19500));
+    expectedOutput.add(new Watermark(initialTime + 20000));
+    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+    testHarness.close();
+  }
+
+  @Test
+  public void testCompoundProgram() throws Exception {
+    WindowingStrategy strategy = fixedWindowWithCompoundTriggerStrategy;
+
+    long initialTime = 0L;
+    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
+        createTestingOperatorAndState(strategy, initialTime);
+    ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+    /**
+     * PaneInfo are:
+     *     isFirst (pane in window),
+     *     isLast, Timing (of triggering),
+     *     index (of pane in the window),
+     *     onTimeIndex (if it the 1st,2nd, ... pane that was fired on time)
+     * */
+
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
+        new Instant(initialTime + 1), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 1));
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
+        new Instant(initialTime + 10000), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 10000));
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
+        new Instant(initialTime + 19500), null, PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime + 19500));
+
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1),
+        new Instant(initialTime + 1200), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime + 1200));
+
+    expectedOutput.add(new Watermark(initialTime + 10000));
+
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1),
+        new Instant(initialTime + 19500), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime + 19500));
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
+        new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500));
+
+    expectedOutput.add(new Watermark(initialTime + 20000));
+    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+    testHarness.close();
+  }
+
+  @Test
+  public void testCompoundAccumulatingPanesProgram() throws Exception {
+    WindowingStrategy strategy = fixedWindowWithCompoundTriggerStrategyAcc;
+    long initialTime = 0L;
+    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
+        createTestingOperatorAndState(strategy, initialTime);
+    ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
+        new Instant(initialTime + 1), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 1));
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
+        new Instant(initialTime + 10000), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 10000));
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 10),
+        new Instant(initialTime + 19500), null, PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime + 19500));
+
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 6),
+        new Instant(initialTime + 1200), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime + 1200));
+
+    expectedOutput.add(new Watermark(initialTime + 10000));
+
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 11),
+        new Instant(initialTime + 19500), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime + 19500));
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
+        new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500));
+
+    expectedOutput.add(new Watermark(initialTime + 20000));
+    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+    testHarness.close();
+  }
+
+  private OneInputStreamOperatorTestHarness createTestingOperatorAndState(WindowingStrategy strategy, long initialTime) throws Exception {
+    Pipeline pipeline = FlinkTestPipeline.createForStreaming();
+
+    KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
+
+    FlinkGroupAlsoByWindowWrapper gbwOperaror =
+        FlinkGroupAlsoByWindowWrapper.createForTesting(
+            pipeline.getOptions(),
+            pipeline.getCoderRegistry(),
+            strategy,
+            inputCoder,
+            combiner.<String>asKeyedFn());
+
+    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
+        new OneInputStreamOperatorTestHarness<>(gbwOperaror);
+    testHarness.open();
+
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
+
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 10000), null, PaneInfo.NO_FIRING), initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 12100), null, PaneInfo.NO_FIRING), initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 14200), null, PaneInfo.NO_FIRING), initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 15300), null, PaneInfo.NO_FIRING), initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 16500), null, PaneInfo.NO_FIRING), initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
+
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
+
+    testHarness.processWatermark(new Watermark(initialTime + 10000));
+    testHarness.processWatermark(new Watermark(initialTime + 20000));
+
+    return testHarness;
+  }
+
+  private static class ResultSortComparator implements Comparator<Object> {
+    @Override
+    public int compare(Object o1, Object o2) {
+      if (o1 instanceof Watermark && o2 instanceof Watermark) {
+        Watermark w1 = (Watermark) o1;
+        Watermark w2 = (Watermark) o2;
+        return (int) (w1.getTimestamp() - w2.getTimestamp());
+      } else {
+        StreamRecord<WindowedValue<KV<String, Integer>>> sr0 = (StreamRecord<WindowedValue<KV<String, Integer>>>) o1;
+        StreamRecord<WindowedValue<KV<String, Integer>>> sr1 = (StreamRecord<WindowedValue<KV<String, Integer>>>) o2;
+
+        int comparison = (int) (sr0.getValue().getTimestamp().getMillis() - sr1.getValue().getTimestamp().getMillis());
+        if (comparison != 0) {
+          return comparison;
+        }
+
+        comparison = sr0.getValue().getValue().getKey().compareTo(sr1.getValue().getValue().getKey());
+        if(comparison == 0) {
+          comparison = Integer.compare(
+              sr0.getValue().getValue().getValue(),
+              sr1.getValue().getValue().getValue());
+        }
+        if(comparison == 0) {
+          Collection windowsA = sr0.getValue().getWindows();
+          Collection windowsB = sr1.getValue().getWindows();
+
+          if(windowsA.size() != 1 || windowsB.size() != 1) {
+            throw new IllegalStateException("A value cannot belong to more than one windows after grouping.");
+          }
+
+          BoundedWindow windowA = (BoundedWindow) windowsA.iterator().next();
+          BoundedWindow windowB = (BoundedWindow) windowsB.iterator().next();
+          comparison = Long.compare(windowA.maxTimestamp().getMillis(), windowB.maxTimestamp().getMillis());
+        }
+        return comparison;
+      }
+    }
+  }
+
+  private <T> WindowedValue<T> makeWindowedValue(WindowingStrategy strategy,
+                           T output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+    final Instant inputTimestamp = timestamp;
+    final WindowFn windowFn = strategy.getWindowFn();
+
+    if (timestamp == null) {
+      timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    }
+
+    if (windows == null) {
+      try {
+        windows = windowFn.assignWindows(windowFn.new AssignContext() {
+          @Override
+          public Object element() {
+            throw new UnsupportedOperationException(
+                "WindowFn attempted to access input element when none was available");
+          }
+
+          @Override
+          public Instant timestamp() {
+            if (inputTimestamp == null) {
+              throw new UnsupportedOperationException(
+                  "WindowFn attempted to access input timestamp when none was available");
+            }
+            return inputTimestamp;
+          }
+
+          @Override
+          public Collection<? extends BoundedWindow> windows() {
+            throw new UnsupportedOperationException(
+                "WindowFn attempted to access input windows when none were available");
+          }
+        });
+      } catch (Exception e) {
+        throw UserCodeException.wrap(e);
+      }
+    }
+
+    return WindowedValue.of(output, timestamp, windows, pane);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java
index 5a412aa..52e9e25 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java
@@ -39,83 +39,83 @@ import java.util.Arrays;
 public class GroupByNullKeyTest extends StreamingProgramTestBase implements Serializable {
 
 
-	protected String resultPath;
+  protected String resultPath;
 
-	static final String[] EXPECTED_RESULT = new String[] {
-			"k: null v: user1 user1 user1 user2 user2 user2 user2 user3"
-	};
+  static final String[] EXPECTED_RESULT = new String[] {
+      "k: null v: user1 user1 user1 user2 user2 user2 user2 user3"
+  };
 
-	public GroupByNullKeyTest(){
-	}
+  public GroupByNullKeyTest(){
+  }
 
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
 
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
-	}
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+  }
 
-	public static class ExtractUserAndTimestamp extends DoFn<KV<Integer, String>, String> {
-		private static final long serialVersionUID = 0;
+  public static class ExtractUserAndTimestamp extends DoFn<KV<Integer, String>, String> {
+    private static final long serialVersionUID = 0;
 
-		@Override
-		public void processElement(ProcessContext c) {
-			KV<Integer, String> record = c.element();
-			long now = System.currentTimeMillis();
-			int timestamp = record.getKey();
-			String userName = record.getValue();
-			if (userName != null) {
-				// Sets the implicit timestamp field to be used in windowing.
-				c.outputWithTimestamp(userName, new Instant(timestamp + now));
-			}
-		}
-	}
+    @Override
+    public void processElement(ProcessContext c) {
+      KV<Integer, String> record = c.element();
+      long now = System.currentTimeMillis();
+      int timestamp = record.getKey();
+      String userName = record.getValue();
+      if (userName != null) {
+        // Sets the implicit timestamp field to be used in windowing.
+        c.outputWithTimestamp(userName, new Instant(timestamp + now));
+      }
+    }
+  }
 
-	@Override
-	protected void testProgram() throws Exception {
+  @Override
+  protected void testProgram() throws Exception {
 
-		Pipeline p = FlinkTestPipeline.createForStreaming();
+    Pipeline p = FlinkTestPipeline.createForStreaming();
 
-		PCollection<String> output =
-			p.apply(Create.of(Arrays.asList(
-					KV.<Integer, String>of(0, "user1"),
-					KV.<Integer, String>of(1, "user1"),
-					KV.<Integer, String>of(2, "user1"),
-					KV.<Integer, String>of(10, "user2"),
-					KV.<Integer, String>of(1, "user2"),
-					KV.<Integer, String>of(15000, "user2"),
-					KV.<Integer, String>of(12000, "user2"),
-					KV.<Integer, String>of(25000, "user3"))))
-					.apply(ParDo.of(new ExtractUserAndTimestamp()))
-					.apply(Window.<String>into(FixedWindows.of(Duration.standardHours(1)))
-							.triggering(AfterWatermark.pastEndOfWindow())
-							.withAllowedLateness(Duration.ZERO)
-							.discardingFiredPanes())
+    PCollection<String> output =
+      p.apply(Create.of(Arrays.asList(
+          KV.<Integer, String>of(0, "user1"),
+          KV.<Integer, String>of(1, "user1"),
+          KV.<Integer, String>of(2, "user1"),
+          KV.<Integer, String>of(10, "user2"),
+          KV.<Integer, String>of(1, "user2"),
+          KV.<Integer, String>of(15000, "user2"),
+          KV.<Integer, String>of(12000, "user2"),
+          KV.<Integer, String>of(25000, "user3"))))
+          .apply(ParDo.of(new ExtractUserAndTimestamp()))
+          .apply(Window.<String>into(FixedWindows.of(Duration.standardHours(1)))
+              .triggering(AfterWatermark.pastEndOfWindow())
+              .withAllowedLateness(Duration.ZERO)
+              .discardingFiredPanes())
 
-					.apply(ParDo.of(new DoFn<String, KV<Void, String>>() {
-						@Override
-						public void processElement(ProcessContext c) throws Exception {
-							String elem = c.element();
-							c.output(KV.<Void, String>of((Void) null, elem));
-						}
-					}))
-					.apply(GroupByKey.<Void, String>create())
-					.apply(ParDo.of(new DoFn<KV<Void, Iterable<String>>, String>() {
-						@Override
-						public void processElement(ProcessContext c) throws Exception {
-							KV<Void, Iterable<String>> elem = c.element();
-							StringBuilder str = new StringBuilder();
-							str.append("k: " + elem.getKey() + " v:");
-							for (String v : elem.getValue()) {
-								str.append(" " + v);
-							}
-							c.output(str.toString());
-						}
-					}));
-		output.apply(TextIO.Write.to(resultPath));
-		p.run();
-	}
+          .apply(ParDo.of(new DoFn<String, KV<Void, String>>() {
+            @Override
+            public void processElement(ProcessContext c) throws Exception {
+              String elem = c.element();
+              c.output(KV.<Void, String>of((Void) null, elem));
+            }
+          }))
+          .apply(GroupByKey.<Void, String>create())
+          .apply(ParDo.of(new DoFn<KV<Void, Iterable<String>>, String>() {
+            @Override
+            public void processElement(ProcessContext c) throws Exception {
+              KV<Void, Iterable<String>> elem = c.element();
+              StringBuilder str = new StringBuilder();
+              str.append("k: " + elem.getKey() + " v:");
+              for (String v : elem.getValue()) {
+                str.append(" " + v);
+              }
+              c.output(str.toString());
+            }
+          }));
+    output.apply(TextIO.Write.to(resultPath));
+    p.run();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/StateSerializationTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/StateSerializationTest.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/StateSerializationTest.java
index 7489fcc..d5b1043 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/StateSerializationTest.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/StateSerializationTest.java
@@ -43,261 +43,261 @@ import static org.junit.Assert.assertEquals;
 
 public class StateSerializationTest {
 
-	private static final StateNamespace NAMESPACE_1 = StateNamespaces.global();
-	private static final String KEY_PREFIX = "TEST_";
-
-	// TODO: This can be replaced with the standard Sum.SumIntererFn once the state no longer needs
-	// to create a StateTag at the point of restoring state. Currently StateTags are compared strictly
-	// by type and combiners always use KeyedCombineFnWithContext rather than KeyedCombineFn or CombineFn.
-	private static CombineWithContext.KeyedCombineFnWithContext<Object, Integer, int[], Integer> SUM_COMBINER =
-		new CombineWithContext.KeyedCombineFnWithContext<Object, Integer, int[], Integer>() {
-			@Override
-			public int[] createAccumulator(Object key, CombineWithContext.Context c) {
-				return new int[1];
-			}
-
-			@Override
-			public int[] addInput(Object key, int[] accumulator, Integer value, CombineWithContext.Context c) {
-				accumulator[0] += value;
-				return accumulator;
-			}
-
-			@Override
-			public int[] mergeAccumulators(Object key, Iterable<int[]> accumulators, CombineWithContext.Context c) {
-				int[] r = new int[1];
-				for (int[] a : accumulators) {
-					r[0] += a[0];
-				}
-				return r;
-			}
-
-			@Override
-			public Integer extractOutput(Object key, int[] accumulator, CombineWithContext.Context c) {
-				return accumulator[0];
-			}
-		};
-
-	private static Coder<int[]> INT_ACCUM_CODER = DelegateCoder.of(
-		VarIntCoder.of(),
-		new DelegateCoder.CodingFunction<int[], Integer>() {
-			@Override
-			public Integer apply(int[] accumulator) {
-				return accumulator[0];
-			}
-		},
-		new DelegateCoder.CodingFunction<Integer, int[]>() {
-			@Override
-			public int[] apply(Integer value) {
-				int[] a = new int[1];
-				a[0] = value;
-				return a;
-			}
-		});
-
-	private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
-		StateTags.value("stringValue", StringUtf8Coder.of());
-	private static final StateTag<Object, ValueState<Integer>> INT_VALUE_ADDR =
-		StateTags.value("stringValue", VarIntCoder.of());
-	private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>> SUM_INTEGER_ADDR =
-		StateTags.keyedCombiningValueWithContext("sumInteger", INT_ACCUM_CODER, SUM_COMBINER);
-	private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
-		StateTags.bag("stringBag", StringUtf8Coder.of());
-	private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_BAG_ADDR =
-		StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
-
-	private Map<String, FlinkStateInternals<String>> statePerKey = new HashMap<>();
-
-	private Map<String, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
-
-	private void initializeStateAndTimers() throws CannotProvideCoderException {
-		for (int i = 0; i < 10; i++) {
-			String key = KEY_PREFIX + i;
-
-			FlinkStateInternals state = initializeStateForKey(key);
-			Set<TimerInternals.TimerData> timers = new HashSet<>();
-			for (int j = 0; j < 5; j++) {
-				TimerInternals.TimerData timer = TimerInternals
-					.TimerData.of(NAMESPACE_1,
-						new Instant(1000 + i + j), TimeDomain.values()[j % 3]);
-				timers.add(timer);
-			}
-
-			statePerKey.put(key, state);
-			activeTimers.put(key, timers);
-		}
-	}
-
-	private FlinkStateInternals<String> initializeStateForKey(String key) throws CannotProvideCoderException {
-		FlinkStateInternals<String> state = createState(key);
-
-		ValueState<String> value = state.state(NAMESPACE_1, STRING_VALUE_ADDR);
-		value.write("test");
-
-		ValueState<Integer> value2 = state.state(NAMESPACE_1, INT_VALUE_ADDR);
-		value2.write(4);
-		value2.write(5);
-
-		AccumulatorCombiningState<Integer, int[], Integer> combiningValue = state.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-		combiningValue.add(1);
-		combiningValue.add(2);
-
-		WatermarkHoldState<BoundedWindow> watermark = state.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
-		watermark.add(new Instant(1000));
-
-		BagState<String> bag = state.state(NAMESPACE_1, STRING_BAG_ADDR);
-		bag.add("v1");
-		bag.add("v2");
-		bag.add("v3");
-		bag.add("v4");
-		return state;
-	}
-
-	private boolean restoreAndTestState(DataInputView in) throws Exception {
-		StateCheckpointReader reader = new StateCheckpointReader(in);
-		final ClassLoader userClassloader = this.getClass().getClassLoader();
-		Coder<? extends BoundedWindow> windowCoder = IntervalWindow.getCoder();
-		Coder<String> keyCoder = StringUtf8Coder.of();
-
-		boolean comparisonRes = true;
-
-		for (String key : statePerKey.keySet()) {
-			comparisonRes &= checkStateForKey(key);
-		}
-
-		// restore the timers
-		Map<String, Set<TimerInternals.TimerData>> restoredTimersPerKey = StateCheckpointUtils.decodeTimers(reader, windowCoder, keyCoder);
-		if (activeTimers.size() != restoredTimersPerKey.size()) {
-			return false;
-		}
-
-		for (String key : statePerKey.keySet()) {
-			Set<TimerInternals.TimerData> originalTimers = activeTimers.get(key);
-			Set<TimerInternals.TimerData> restoredTimers = restoredTimersPerKey.get(key);
-			comparisonRes &= checkTimersForKey(originalTimers, restoredTimers);
-		}
-
-		// restore the state
-		Map<String, FlinkStateInternals<String>> restoredPerKeyState =
-			StateCheckpointUtils.decodeState(reader, OutputTimeFns.outputAtEarliestInputTimestamp(), keyCoder, windowCoder, userClassloader);
-		if (restoredPerKeyState.size() != statePerKey.size()) {
-			return false;
-		}
-
-		for (String key : statePerKey.keySet()) {
-			FlinkStateInternals<String> originalState = statePerKey.get(key);
-			FlinkStateInternals<String> restoredState = restoredPerKeyState.get(key);
-			comparisonRes &= checkStateForKey(originalState, restoredState);
-		}
-		return comparisonRes;
-	}
-
-	private boolean checkStateForKey(String key) throws CannotProvideCoderException {
-		FlinkStateInternals<String> state = statePerKey.get(key);
-
-		ValueState<String> value = state.state(NAMESPACE_1, STRING_VALUE_ADDR);
-		boolean comp = value.read().equals("test");
-
-		ValueState<Integer> value2 = state.state(NAMESPACE_1, INT_VALUE_ADDR);
-		comp &= value2.read().equals(5);
-
-		AccumulatorCombiningState<Integer, int[], Integer> combiningValue = state.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-		comp &= combiningValue.read().equals(3);
-
-		WatermarkHoldState<BoundedWindow> watermark = state.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
-		comp &= watermark.read().equals(new Instant(1000));
-
-		BagState<String> bag = state.state(NAMESPACE_1, STRING_BAG_ADDR);
-		Iterator<String> it = bag.read().iterator();
-		int i = 0;
-		while (it.hasNext()) {
-			comp &= it.next().equals("v" + (++i));
-		}
-		return comp;
-	}
-
-	private void storeState(AbstractStateBackend.CheckpointStateOutputView out) throws Exception {
-		StateCheckpointWriter checkpointBuilder = StateCheckpointWriter.create(out);
-		Coder<String> keyCoder = StringUtf8Coder.of();
-
-		// checkpoint the timers
-		StateCheckpointUtils.encodeTimers(activeTimers, checkpointBuilder, keyCoder);
-
-		// checkpoint the state
-		StateCheckpointUtils.encodeState(statePerKey, checkpointBuilder, keyCoder);
-	}
-
-	private boolean checkTimersForKey(Set<TimerInternals.TimerData> originalTimers, Set<TimerInternals.TimerData> restoredTimers) {
-		boolean comp = true;
-		if (restoredTimers == null) {
-			return false;
-		}
-
-		if (originalTimers.size() != restoredTimers.size()) {
-			return false;
-		}
-
-		for (TimerInternals.TimerData timer : originalTimers) {
-			comp &= restoredTimers.contains(timer);
-		}
-		return comp;
-	}
-
-	private boolean checkStateForKey(FlinkStateInternals<String> originalState, FlinkStateInternals<String> restoredState) throws CannotProvideCoderException {
-		if (restoredState == null) {
-			return false;
-		}
-
-		ValueState<String> orValue = originalState.state(NAMESPACE_1, STRING_VALUE_ADDR);
-		ValueState<String> resValue = restoredState.state(NAMESPACE_1, STRING_VALUE_ADDR);
-		boolean comp = orValue.read().equals(resValue.read());
-
-		ValueState<Integer> orIntValue = originalState.state(NAMESPACE_1, INT_VALUE_ADDR);
-		ValueState<Integer> resIntValue = restoredState.state(NAMESPACE_1, INT_VALUE_ADDR);
-		comp &= orIntValue.read().equals(resIntValue.read());
-
-		AccumulatorCombiningState<Integer, int[], Integer> combOrValue = originalState.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-		AccumulatorCombiningState<Integer, int[], Integer> combResValue = restoredState.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-		comp &= combOrValue.read().equals(combResValue.read());
-
-		WatermarkHoldState orWatermark = originalState.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
-		WatermarkHoldState resWatermark = restoredState.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
-		comp &= orWatermark.read().equals(resWatermark.read());
-
-		BagState<String> orBag = originalState.state(NAMESPACE_1, STRING_BAG_ADDR);
-		BagState<String> resBag = restoredState.state(NAMESPACE_1, STRING_BAG_ADDR);
-
-		Iterator<String> orIt = orBag.read().iterator();
-		Iterator<String> resIt = resBag.read().iterator();
-
-		while (orIt.hasNext() && resIt.hasNext()) {
-			comp &= orIt.next().equals(resIt.next());
-		}
-
-		return !((orIt.hasNext() && !resIt.hasNext()) || (!orIt.hasNext() && resIt.hasNext())) && comp;
-	}
-
-	private FlinkStateInternals<String> createState(String key) throws CannotProvideCoderException {
-		return new FlinkStateInternals<>(
-			key,
-			StringUtf8Coder.of(),
-			IntervalWindow.getCoder(),
-			OutputTimeFns.outputAtEarliestInputTimestamp());
-	}
-
-	@Test
-	public void test() throws Exception {
-		StateSerializationTest test = new StateSerializationTest();
-		test.initializeStateAndTimers();
+  private static final StateNamespace NAMESPACE_1 = StateNamespaces.global();
+  private static final String KEY_PREFIX = "TEST_";
+
+  // TODO: This can be replaced with the standard Sum.SumIntegerFn once the state no longer needs
+  // to create a StateTag at the point of restoring state. Currently StateTags are compared strictly
+  // by type and combiners always use KeyedCombineFnWithContext rather than KeyedCombineFn or CombineFn.
+  private static CombineWithContext.KeyedCombineFnWithContext<Object, Integer, int[], Integer> SUM_COMBINER =
+    new CombineWithContext.KeyedCombineFnWithContext<Object, Integer, int[], Integer>() {
+      @Override
+      public int[] createAccumulator(Object key, CombineWithContext.Context c) {
+        return new int[1];
+      }
+
+      @Override
+      public int[] addInput(Object key, int[] accumulator, Integer value, CombineWithContext.Context c) {
+        accumulator[0] += value;
+        return accumulator;
+      }
+
+      @Override
+      public int[] mergeAccumulators(Object key, Iterable<int[]> accumulators, CombineWithContext.Context c) {
+        int[] r = new int[1];
+        for (int[] a : accumulators) {
+          r[0] += a[0];
+        }
+        return r;
+      }
+
+      @Override
+      public Integer extractOutput(Object key, int[] accumulator, CombineWithContext.Context c) {
+        return accumulator[0];
+      }
+    };
+
+  private static Coder<int[]> INT_ACCUM_CODER = DelegateCoder.of(
+    VarIntCoder.of(),
+    new DelegateCoder.CodingFunction<int[], Integer>() {
+      @Override
+      public Integer apply(int[] accumulator) {
+        return accumulator[0];
+      }
+    },
+    new DelegateCoder.CodingFunction<Integer, int[]>() {
+      @Override
+      public int[] apply(Integer value) {
+        int[] a = new int[1];
+        a[0] = value;
+        return a;
+      }
+    });
+
+  private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
+    StateTags.value("stringValue", StringUtf8Coder.of());
+  private static final StateTag<Object, ValueState<Integer>> INT_VALUE_ADDR =
+    StateTags.value("intValue", VarIntCoder.of()); // own id; previously reused "stringValue" from STRING_VALUE_ADDR
+  private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>> SUM_INTEGER_ADDR =
+    StateTags.keyedCombiningValueWithContext("sumInteger", INT_ACCUM_CODER, SUM_COMBINER);
+  private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+    StateTags.bag("stringBag", StringUtf8Coder.of());
+  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_BAG_ADDR =
+    StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
+
+  private Map<String, FlinkStateInternals<String>> statePerKey = new HashMap<>();
+
+  private Map<String, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
+
+  private void initializeStateAndTimers() throws CannotProvideCoderException { // fills statePerKey and activeTimers
+    for (int i = 0; i < 10; i++) { // ten keys: KEY_PREFIX + 0 .. KEY_PREFIX + 9
+      String key = KEY_PREFIX + i;
+
+      FlinkStateInternals<String> state = initializeStateForKey(key); // parameterized (was a raw type) to match statePerKey's value type
+      Set<TimerInternals.TimerData> timers = new HashSet<>();
+      for (int j = 0; j < 5; j++) { // five timers per key
+        TimerInternals.TimerData timer = TimerInternals
+          .TimerData.of(NAMESPACE_1,
+            new Instant(1000 + i + j), TimeDomain.values()[j % 3]); // domain cycles over TimeDomain.values()[0..2]
+        timers.add(timer);
+      }
+
+      statePerKey.put(key, state);
+      activeTimers.put(key, timers);
+    }
+  }
+
+  private FlinkStateInternals<String> initializeStateForKey(String key) throws CannotProvideCoderException {
+    FlinkStateInternals<String> state = createState(key);
+
+    ValueState<String> value = state.state(NAMESPACE_1, STRING_VALUE_ADDR);
+    value.write("test");
+
+    ValueState<Integer> value2 = state.state(NAMESPACE_1, INT_VALUE_ADDR);
+    value2.write(4);
+    value2.write(5);
+
+    AccumulatorCombiningState<Integer, int[], Integer> combiningValue = state.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    combiningValue.add(1);
+    combiningValue.add(2);
+
+    WatermarkHoldState<BoundedWindow> watermark = state.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
+    watermark.add(new Instant(1000));
+
+    BagState<String> bag = state.state(NAMESPACE_1, STRING_BAG_ADDR);
+    bag.add("v1");
+    bag.add("v2");
+    bag.add("v3");
+    bag.add("v4");
+    return state;
+  }
+
+  private boolean restoreAndTestState(DataInputView in) throws Exception {
+    StateCheckpointReader reader = new StateCheckpointReader(in);
+    final ClassLoader userClassloader = this.getClass().getClassLoader();
+    Coder<? extends BoundedWindow> windowCoder = IntervalWindow.getCoder();
+    Coder<String> keyCoder = StringUtf8Coder.of();
+
+    boolean comparisonRes = true;
+
+    for (String key : statePerKey.keySet()) {
+      comparisonRes &= checkStateForKey(key);
+    }
+
+    // restore the timers
+    Map<String, Set<TimerInternals.TimerData>> restoredTimersPerKey = StateCheckpointUtils.decodeTimers(reader, windowCoder, keyCoder);
+    if (activeTimers.size() != restoredTimersPerKey.size()) {
+      return false;
+    }
+
+    for (String key : statePerKey.keySet()) {
+      Set<TimerInternals.TimerData> originalTimers = activeTimers.get(key);
+      Set<TimerInternals.TimerData> restoredTimers = restoredTimersPerKey.get(key);
+      comparisonRes &= checkTimersForKey(originalTimers, restoredTimers);
+    }
+
+    // restore the state
+    Map<String, FlinkStateInternals<String>> restoredPerKeyState =
+      StateCheckpointUtils.decodeState(reader, OutputTimeFns.outputAtEarliestInputTimestamp(), keyCoder, windowCoder, userClassloader);
+    if (restoredPerKeyState.size() != statePerKey.size()) {
+      return false;
+    }
+
+    for (String key : statePerKey.keySet()) {
+      FlinkStateInternals<String> originalState = statePerKey.get(key);
+      FlinkStateInternals<String> restoredState = restoredPerKeyState.get(key);
+      comparisonRes &= checkStateForKey(originalState, restoredState);
+    }
+    return comparisonRes;
+  }
+
+  private boolean checkStateForKey(String key) throws CannotProvideCoderException { // checks statePerKey's entry against the literals written in initializeStateForKey
+    FlinkStateInternals<String> state = statePerKey.get(key);
+
+    ValueState<String> value = state.state(NAMESPACE_1, STRING_VALUE_ADDR);
+    boolean comp = value.read().equals("test");
+
+    ValueState<Integer> value2 = state.state(NAMESPACE_1, INT_VALUE_ADDR);
+    comp &= value2.read().equals(5); // 5 is the second (last) value written
+
+    AccumulatorCombiningState<Integer, int[], Integer> combiningValue = state.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    comp &= combiningValue.read().equals(3); // 1 + 2 were added
+
+    WatermarkHoldState<BoundedWindow> watermark = state.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
+    comp &= watermark.read().equals(new Instant(1000));
+
+    BagState<String> bag = state.state(NAMESPACE_1, STRING_BAG_ADDR);
+    Iterator<String> it = bag.read().iterator();
+    int i = 0;
+    while (it.hasNext()) {
+      comp &= it.next().equals("v" + (++i)); // expects "v1", "v2", ... in iteration order
+    }
+    return comp && i == 4; // exactly "v1".."v4" were added; an empty or short bag must not pass vacuously
+  }
+
+  private void storeState(AbstractStateBackend.CheckpointStateOutputView out) throws Exception {
+    StateCheckpointWriter checkpointBuilder = StateCheckpointWriter.create(out);
+    Coder<String> keyCoder = StringUtf8Coder.of();
+
+    // checkpoint the timers
+    StateCheckpointUtils.encodeTimers(activeTimers, checkpointBuilder, keyCoder);
+
+    // checkpoint the state
+    StateCheckpointUtils.encodeState(statePerKey, checkpointBuilder, keyCoder);
+  }
+
+  private boolean checkTimersForKey(Set<TimerInternals.TimerData> originalTimers, Set<TimerInternals.TimerData> restoredTimers) {
+    boolean comp = true;
+    if (restoredTimers == null) {
+      return false;
+    }
+
+    if (originalTimers.size() != restoredTimers.size()) {
+      return false;
+    }
+
+    for (TimerInternals.TimerData timer : originalTimers) {
+      comp &= restoredTimers.contains(timer);
+    }
+    return comp;
+  }
+
+  private boolean checkStateForKey(FlinkStateInternals<String> originalState, FlinkStateInternals<String> restoredState) throws CannotProvideCoderException { // compares each state cell of original vs restored
+    if (restoredState == null) {
+      return false;
+    }
+
+    ValueState<String> orValue = originalState.state(NAMESPACE_1, STRING_VALUE_ADDR);
+    ValueState<String> resValue = restoredState.state(NAMESPACE_1, STRING_VALUE_ADDR);
+    boolean comp = orValue.read().equals(resValue.read());
+
+    ValueState<Integer> orIntValue = originalState.state(NAMESPACE_1, INT_VALUE_ADDR);
+    ValueState<Integer> resIntValue = restoredState.state(NAMESPACE_1, INT_VALUE_ADDR);
+    comp &= orIntValue.read().equals(resIntValue.read());
+
+    AccumulatorCombiningState<Integer, int[], Integer> combOrValue = originalState.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    AccumulatorCombiningState<Integer, int[], Integer> combResValue = restoredState.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    comp &= combOrValue.read().equals(combResValue.read());
+
+    WatermarkHoldState<BoundedWindow> orWatermark = originalState.state(NAMESPACE_1, WATERMARK_BAG_ADDR); // parameterized (was raw) to match the tag's declared type
+    WatermarkHoldState<BoundedWindow> resWatermark = restoredState.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
+    comp &= orWatermark.read().equals(resWatermark.read());
+
+    BagState<String> orBag = originalState.state(NAMESPACE_1, STRING_BAG_ADDR);
+    BagState<String> resBag = restoredState.state(NAMESPACE_1, STRING_BAG_ADDR);
+
+    Iterator<String> orIt = orBag.read().iterator();
+    Iterator<String> resIt = resBag.read().iterator();
+
+    while (orIt.hasNext() && resIt.hasNext()) { // pairwise comparison in iteration order
+      comp &= orIt.next().equals(resIt.next());
+    }
+
+    return orIt.hasNext() == resIt.hasNext() && comp; // after the loop at least one is exhausted; equal means both are, i.e. same length
+  }
+
+  private FlinkStateInternals<String> createState(String key) throws CannotProvideCoderException {
+    return new FlinkStateInternals<>(
+      key,
+      StringUtf8Coder.of(),
+      IntervalWindow.getCoder(),
+      OutputTimeFns.outputAtEarliestInputTimestamp());
+  }
+
+  @Test
+  public void test() throws Exception {
+    StateSerializationTest test = new StateSerializationTest();
+    test.initializeStateAndTimers();
 
-		MemoryStateBackend.MemoryCheckpointOutputStream memBackend = new MemoryStateBackend.MemoryCheckpointOutputStream(32048);
-		AbstractStateBackend.CheckpointStateOutputView out = new AbstractStateBackend.CheckpointStateOutputView(memBackend);
-
-		test.storeState(out);
+    MemoryStateBackend.MemoryCheckpointOutputStream memBackend = new MemoryStateBackend.MemoryCheckpointOutputStream(32048);
+    AbstractStateBackend.CheckpointStateOutputView out = new AbstractStateBackend.CheckpointStateOutputView(memBackend);
+
+    test.storeState(out);
 
-		byte[] contents = memBackend.closeAndGetBytes();
-		DataInputView in = new DataInputDeserializer(contents, 0, contents.length);
+    byte[] contents = memBackend.closeAndGetBytes();
+    DataInputView in = new DataInputDeserializer(contents, 0, contents.length);
 
-		assertEquals(test.restoreAndTestState(in), true);
-	}
+    assertEquals(test.restoreAndTestState(in), true);
+  }
 
 }