You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/03/04 19:11:35 UTC
[40/50] [abbrv] incubator-beam git commit: [flink] convert tabs to 2 spaces
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
index 01f9c32..fb240f4 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
@@ -43,464 +43,464 @@ import java.util.concurrent.ConcurrentLinkedQueue;
public class GroupAlsoByWindowTest {
- private final Combine.CombineFn combiner = new Sum.SumIntegerFn();
-
- private final WindowingStrategy slidingWindowWithAfterWatermarkTriggerStrategy =
- WindowingStrategy.of(SlidingWindows.of(Duration.standardSeconds(10)).every(Duration.standardSeconds(5)))
- .withTrigger(AfterWatermark.pastEndOfWindow()).withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES);
-
- private final WindowingStrategy sessionWindowingStrategy =
- WindowingStrategy.of(Sessions.withGapDuration(Duration.standardSeconds(2)))
- .withTrigger(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
- .withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES)
- .withAllowedLateness(Duration.standardSeconds(100));
-
- private final WindowingStrategy fixedWindowingStrategy =
- WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(10)));
-
- private final WindowingStrategy fixedWindowWithCountTriggerStrategy =
- fixedWindowingStrategy.withTrigger(AfterPane.elementCountAtLeast(5));
-
- private final WindowingStrategy fixedWindowWithAfterWatermarkTriggerStrategy =
- fixedWindowingStrategy.withTrigger(AfterWatermark.pastEndOfWindow());
-
- private final WindowingStrategy fixedWindowWithCompoundTriggerStrategy =
- fixedWindowingStrategy.withTrigger(
- AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(5))
- .withLateFirings(AfterPane.elementCountAtLeast(5)).buildTrigger());
-
- /**
- * The default accumulation mode is
- * {@link com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode#DISCARDING_FIRED_PANES}.
- * This strategy changes it to
- * {@link com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode#ACCUMULATING_FIRED_PANES}
- */
- private final WindowingStrategy fixedWindowWithCompoundTriggerStrategyAcc =
- fixedWindowWithCompoundTriggerStrategy
- .withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES);
-
- @Test
- public void testWithLateness() throws Exception {
- WindowingStrategy strategy = WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(2)))
- .withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES)
- .withAllowedLateness(Duration.millis(1000));
- long initialTime = 0L;
- Pipeline pipeline = FlinkTestPipeline.createForStreaming();
-
- KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
-
- FlinkGroupAlsoByWindowWrapper gbwOperaror =
- FlinkGroupAlsoByWindowWrapper.createForTesting(
- pipeline.getOptions(),
- pipeline.getCoderRegistry(),
- strategy,
- inputCoder,
- combiner.<String>asKeyedFn());
-
- OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
- new OneInputStreamOperatorTestHarness<>(gbwOperaror);
- testHarness.open();
-
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
- testHarness.processWatermark(new Watermark(initialTime + 2000));
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
- testHarness.processWatermark(new Watermark(initialTime + 4000));
-
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
- expectedOutput.add(new StreamRecord<>(
- WindowedValue.of(KV.of("key1", 4),
- new Instant(initialTime + 1),
- new IntervalWindow(new Instant(0), new Instant(2000)),
- PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0))
- , initialTime + 1));
- expectedOutput.add(new Watermark(initialTime + 2000));
-
- expectedOutput.add(new StreamRecord<>(
- WindowedValue.of(KV.of("key1", 5),
- new Instant(initialTime + 1999),
- new IntervalWindow(new Instant(0), new Instant(2000)),
- PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 1, 1))
- , initialTime + 1999));
-
-
- expectedOutput.add(new StreamRecord<>(
- WindowedValue.of(KV.of("key1", 6),
- new Instant(initialTime + 1999),
- new IntervalWindow(new Instant(0), new Instant(2000)),
- PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 2, 2))
- , initialTime + 1999));
- expectedOutput.add(new Watermark(initialTime + 4000));
-
- TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
- testHarness.close();
- }
-
- @Test
- public void testSessionWindows() throws Exception {
- WindowingStrategy strategy = sessionWindowingStrategy;
-
- long initialTime = 0L;
- Pipeline pipeline = FlinkTestPipeline.createForStreaming();
-
- KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
-
- FlinkGroupAlsoByWindowWrapper gbwOperaror =
- FlinkGroupAlsoByWindowWrapper.createForTesting(
- pipeline.getOptions(),
- pipeline.getCoderRegistry(),
- strategy,
- inputCoder,
- combiner.<String>asKeyedFn());
-
- OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
- new OneInputStreamOperatorTestHarness<>(gbwOperaror);
- testHarness.open();
-
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 3500), null, PaneInfo.NO_FIRING), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 3700), null, PaneInfo.NO_FIRING), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 2700), null, PaneInfo.NO_FIRING), initialTime + 20));
- testHarness.processWatermark(new Watermark(initialTime + 6000));
-
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 6700), null, PaneInfo.NO_FIRING), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 6800), null, PaneInfo.NO_FIRING), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 8900), null, PaneInfo.NO_FIRING), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 7600), null, PaneInfo.NO_FIRING), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 5600), null, PaneInfo.NO_FIRING), initialTime + 20));
-
- testHarness.processWatermark(new Watermark(initialTime + 12000));
-
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
- expectedOutput.add(new StreamRecord<>(
- WindowedValue.of(KV.of("key1", 6),
- new Instant(initialTime + 1),
- new IntervalWindow(new Instant(1), new Instant(5700)),
- PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0))
- , initialTime + 1));
- expectedOutput.add(new Watermark(initialTime + 6000));
-
- expectedOutput.add(new StreamRecord<>(
- WindowedValue.of(KV.of("key1", 11),
- new Instant(initialTime + 6700),
- new IntervalWindow(new Instant(1), new Instant(10900)),
- PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0))
- , initialTime + 6700));
- expectedOutput.add(new Watermark(initialTime + 12000));
-
- TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
- testHarness.close();
- }
-
- @Test
- public void testSlidingWindows() throws Exception {
- WindowingStrategy strategy = slidingWindowWithAfterWatermarkTriggerStrategy;
- long initialTime = 0L;
- OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
- createTestingOperatorAndState(strategy, initialTime);
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
- testHarness.processWatermark(new Watermark(initialTime + 25000));
-
- expectedOutput.add(new StreamRecord<>(
- WindowedValue.of(KV.of("key1", 6),
- new Instant(initialTime + 5000),
- new IntervalWindow(new Instant(0), new Instant(10000)),
- PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
- , initialTime + 5000));
- expectedOutput.add(new StreamRecord<>(
- WindowedValue.of(KV.of("key1", 6),
- new Instant(initialTime + 1),
- new IntervalWindow(new Instant(-5000), new Instant(5000)),
- PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
- , initialTime + 1));
- expectedOutput.add(new Watermark(initialTime + 10000));
-
- expectedOutput.add(new StreamRecord<>(
- WindowedValue.of(KV.of("key1", 11),
- new Instant(initialTime + 15000),
- new IntervalWindow(new Instant(10000), new Instant(20000)),
- PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
- , initialTime + 15000));
- expectedOutput.add(new StreamRecord<>(
- WindowedValue.of(KV.of("key1", 3),
- new Instant(initialTime + 10000),
- new IntervalWindow(new Instant(5000), new Instant(15000)),
- PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
- , initialTime + 10000));
- expectedOutput.add(new StreamRecord<>(
- WindowedValue.of(KV.of("key2", 1),
- new Instant(initialTime + 19500),
- new IntervalWindow(new Instant(10000), new Instant(20000)),
- PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
- , initialTime + 19500));
- expectedOutput.add(new Watermark(initialTime + 20000));
-
- expectedOutput.add(new StreamRecord<>(
- WindowedValue.of(KV.of("key2", 1),
- new Instant(initialTime + 20000),
- /**
- * this is 20000 and not 19500 because of a convention in dataflow where
- * timestamps of windowed values in a window cannot be smaller than the
- * end of a previous window. Checkout the documentation of the
- * {@link WindowFn#getOutputTime(Instant, BoundedWindow)}
- */
- new IntervalWindow(new Instant(15000), new Instant(25000)),
- PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
- , initialTime + 20000));
- expectedOutput.add(new StreamRecord<>(
- WindowedValue.of(KV.of("key1", 8),
- new Instant(initialTime + 20000),
- new IntervalWindow(new Instant(15000), new Instant(25000)),
- PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
- , initialTime + 20000));
- expectedOutput.add(new Watermark(initialTime + 25000));
-
- TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
- testHarness.close();
- }
-
- @Test
- public void testAfterWatermarkProgram() throws Exception {
- WindowingStrategy strategy = fixedWindowWithAfterWatermarkTriggerStrategy;
- long initialTime = 0L;
- OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
- createTestingOperatorAndState(strategy, initialTime);
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 6),
- new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 1));
- expectedOutput.add(new Watermark(initialTime + 10000));
-
- expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 11),
- new Instant(initialTime + 10000), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 10000));
- expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
- new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500));
- expectedOutput.add(new Watermark(initialTime + 20000));
-
- TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
- testHarness.close();
- }
-
- @Test
- public void testAfterCountProgram() throws Exception {
- WindowingStrategy strategy = fixedWindowWithCountTriggerStrategy;
-
- long initialTime = 0L;
- OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
- createTestingOperatorAndState(strategy, initialTime);
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
- new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime + 1));
- expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
- new Instant(initialTime + 10000), null, PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime + 10000));
- expectedOutput.add(new Watermark(initialTime + 10000));
-
- expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
- new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME, 0, 0)), initialTime + 19500));
- expectedOutput.add(new Watermark(initialTime + 20000));
- TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
- testHarness.close();
- }
-
- @Test
- public void testCompoundProgram() throws Exception {
- WindowingStrategy strategy = fixedWindowWithCompoundTriggerStrategy;
-
- long initialTime = 0L;
- OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
- createTestingOperatorAndState(strategy, initialTime);
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- /**
- * PaneInfo are:
- * isFirst (pane in window),
- * isLast, Timing (of triggering),
- * index (of pane in the window),
- * onTimeIndex (if it the 1st,2nd, ... pane that was fired on time)
- * */
-
- expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
- new Instant(initialTime + 1), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 1));
- expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
- new Instant(initialTime + 10000), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 10000));
- expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
- new Instant(initialTime + 19500), null, PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime + 19500));
-
- expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1),
- new Instant(initialTime + 1200), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime + 1200));
-
- expectedOutput.add(new Watermark(initialTime + 10000));
-
- expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1),
- new Instant(initialTime + 19500), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime + 19500));
- expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
- new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500));
-
- expectedOutput.add(new Watermark(initialTime + 20000));
- TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
- testHarness.close();
- }
-
- @Test
- public void testCompoundAccumulatingPanesProgram() throws Exception {
- WindowingStrategy strategy = fixedWindowWithCompoundTriggerStrategyAcc;
- long initialTime = 0L;
- OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
- createTestingOperatorAndState(strategy, initialTime);
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
- new Instant(initialTime + 1), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 1));
- expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
- new Instant(initialTime + 10000), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 10000));
- expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 10),
- new Instant(initialTime + 19500), null, PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime + 19500));
-
- expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 6),
- new Instant(initialTime + 1200), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime + 1200));
-
- expectedOutput.add(new Watermark(initialTime + 10000));
-
- expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 11),
- new Instant(initialTime + 19500), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime + 19500));
- expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
- new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500));
-
- expectedOutput.add(new Watermark(initialTime + 20000));
- TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
- testHarness.close();
- }
-
- private OneInputStreamOperatorTestHarness createTestingOperatorAndState(WindowingStrategy strategy, long initialTime) throws Exception {
- Pipeline pipeline = FlinkTestPipeline.createForStreaming();
-
- KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
-
- FlinkGroupAlsoByWindowWrapper gbwOperaror =
- FlinkGroupAlsoByWindowWrapper.createForTesting(
- pipeline.getOptions(),
- pipeline.getCoderRegistry(),
- strategy,
- inputCoder,
- combiner.<String>asKeyedFn());
-
- OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
- new OneInputStreamOperatorTestHarness<>(gbwOperaror);
- testHarness.open();
-
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
-
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 10000), null, PaneInfo.NO_FIRING), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 12100), null, PaneInfo.NO_FIRING), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 14200), null, PaneInfo.NO_FIRING), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 15300), null, PaneInfo.NO_FIRING), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 16500), null, PaneInfo.NO_FIRING), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
-
- testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
-
- testHarness.processWatermark(new Watermark(initialTime + 10000));
- testHarness.processWatermark(new Watermark(initialTime + 20000));
-
- return testHarness;
- }
-
- private static class ResultSortComparator implements Comparator<Object> {
- @Override
- public int compare(Object o1, Object o2) {
- if (o1 instanceof Watermark && o2 instanceof Watermark) {
- Watermark w1 = (Watermark) o1;
- Watermark w2 = (Watermark) o2;
- return (int) (w1.getTimestamp() - w2.getTimestamp());
- } else {
- StreamRecord<WindowedValue<KV<String, Integer>>> sr0 = (StreamRecord<WindowedValue<KV<String, Integer>>>) o1;
- StreamRecord<WindowedValue<KV<String, Integer>>> sr1 = (StreamRecord<WindowedValue<KV<String, Integer>>>) o2;
-
- int comparison = (int) (sr0.getValue().getTimestamp().getMillis() - sr1.getValue().getTimestamp().getMillis());
- if (comparison != 0) {
- return comparison;
- }
-
- comparison = sr0.getValue().getValue().getKey().compareTo(sr1.getValue().getValue().getKey());
- if(comparison == 0) {
- comparison = Integer.compare(
- sr0.getValue().getValue().getValue(),
- sr1.getValue().getValue().getValue());
- }
- if(comparison == 0) {
- Collection windowsA = sr0.getValue().getWindows();
- Collection windowsB = sr1.getValue().getWindows();
-
- if(windowsA.size() != 1 || windowsB.size() != 1) {
- throw new IllegalStateException("A value cannot belong to more than one windows after grouping.");
- }
-
- BoundedWindow windowA = (BoundedWindow) windowsA.iterator().next();
- BoundedWindow windowB = (BoundedWindow) windowsB.iterator().next();
- comparison = Long.compare(windowA.maxTimestamp().getMillis(), windowB.maxTimestamp().getMillis());
- }
- return comparison;
- }
- }
- }
-
- private <T> WindowedValue<T> makeWindowedValue(WindowingStrategy strategy,
- T output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
- final Instant inputTimestamp = timestamp;
- final WindowFn windowFn = strategy.getWindowFn();
-
- if (timestamp == null) {
- timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
- }
-
- if (windows == null) {
- try {
- windows = windowFn.assignWindows(windowFn.new AssignContext() {
- @Override
- public Object element() {
- throw new UnsupportedOperationException(
- "WindowFn attempted to access input element when none was available");
- }
-
- @Override
- public Instant timestamp() {
- if (inputTimestamp == null) {
- throw new UnsupportedOperationException(
- "WindowFn attempted to access input timestamp when none was available");
- }
- return inputTimestamp;
- }
-
- @Override
- public Collection<? extends BoundedWindow> windows() {
- throw new UnsupportedOperationException(
- "WindowFn attempted to access input windows when none were available");
- }
- });
- } catch (Exception e) {
- throw UserCodeException.wrap(e);
- }
- }
-
- return WindowedValue.of(output, timestamp, windows, pane);
- }
+ private final Combine.CombineFn combiner = new Sum.SumIntegerFn();
+
+ private final WindowingStrategy slidingWindowWithAfterWatermarkTriggerStrategy =
+ WindowingStrategy.of(SlidingWindows.of(Duration.standardSeconds(10)).every(Duration.standardSeconds(5)))
+ .withTrigger(AfterWatermark.pastEndOfWindow()).withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES);
+
+ private final WindowingStrategy sessionWindowingStrategy =
+ WindowingStrategy.of(Sessions.withGapDuration(Duration.standardSeconds(2)))
+ .withTrigger(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
+ .withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES)
+ .withAllowedLateness(Duration.standardSeconds(100));
+
+ private final WindowingStrategy fixedWindowingStrategy =
+ WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(10)));
+
+ private final WindowingStrategy fixedWindowWithCountTriggerStrategy =
+ fixedWindowingStrategy.withTrigger(AfterPane.elementCountAtLeast(5));
+
+ private final WindowingStrategy fixedWindowWithAfterWatermarkTriggerStrategy =
+ fixedWindowingStrategy.withTrigger(AfterWatermark.pastEndOfWindow());
+
+ private final WindowingStrategy fixedWindowWithCompoundTriggerStrategy =
+ fixedWindowingStrategy.withTrigger(
+ AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(5))
+ .withLateFirings(AfterPane.elementCountAtLeast(5)).buildTrigger());
+
+ /**
+ * The default accumulation mode is
+ * {@link com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode#DISCARDING_FIRED_PANES}.
+ * This strategy changes it to
+ * {@link com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode#ACCUMULATING_FIRED_PANES}
+ */
+ private final WindowingStrategy fixedWindowWithCompoundTriggerStrategyAcc =
+ fixedWindowWithCompoundTriggerStrategy
+ .withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES);
+
+ @Test
+ public void testWithLateness() throws Exception {
+ WindowingStrategy strategy = WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(2)))
+ .withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES)
+ .withAllowedLateness(Duration.millis(1000));
+ long initialTime = 0L;
+ Pipeline pipeline = FlinkTestPipeline.createForStreaming();
+
+ KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
+
+ FlinkGroupAlsoByWindowWrapper gbwOperaror =
+ FlinkGroupAlsoByWindowWrapper.createForTesting(
+ pipeline.getOptions(),
+ pipeline.getCoderRegistry(),
+ strategy,
+ inputCoder,
+ combiner.<String>asKeyedFn());
+
+ OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
+ new OneInputStreamOperatorTestHarness<>(gbwOperaror);
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
+ testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
+ testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), initialTime + 20));
+ testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
+ testHarness.processWatermark(new Watermark(initialTime + 2000));
+ testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
+ testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
+ testHarness.processWatermark(new Watermark(initialTime + 4000));
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+ expectedOutput.add(new StreamRecord<>(
+ WindowedValue.of(KV.of("key1", 4),
+ new Instant(initialTime + 1),
+ new IntervalWindow(new Instant(0), new Instant(2000)),
+ PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0))
+ , initialTime + 1));
+ expectedOutput.add(new Watermark(initialTime + 2000));
+
+ expectedOutput.add(new StreamRecord<>(
+ WindowedValue.of(KV.of("key1", 5),
+ new Instant(initialTime + 1999),
+ new IntervalWindow(new Instant(0), new Instant(2000)),
+ PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 1, 1))
+ , initialTime + 1999));
+
+
+ expectedOutput.add(new StreamRecord<>(
+ WindowedValue.of(KV.of("key1", 6),
+ new Instant(initialTime + 1999),
+ new IntervalWindow(new Instant(0), new Instant(2000)),
+ PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 2, 2))
+ , initialTime + 1999));
+ expectedOutput.add(new Watermark(initialTime + 4000));
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+ testHarness.close();
+ }
+
+ /**
+  * Runs the group-also-by-window operator with {@code sessionWindowingStrategy} and checks the
+  * panes it emits.
+  *
+  * <p>Six "key1" elements (event timestamps 1 to 3700) are fed before a watermark of 6000 and five
+  * more (event timestamps 5600 to 8900) before a watermark of 12000. The expected output is an
+  * ON_TIME pane with sum 6 for window [1, 5700) after the first watermark and an ON_TIME pane with
+  * sum 11 for window [1, 10900) after the second one, i.e. the second pane also contains the six
+  * elements of the first.
+  */
+ @Test
+ public void testSessionWindows() throws Exception {
+ WindowingStrategy strategy = sessionWindowingStrategy;
+
+ long initialTime = 0L;
+ Pipeline pipeline = FlinkTestPipeline.createForStreaming();
+
+ KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
+
+ // The operator is built here rather than via createTestingOperatorAndState because that helper
+ // also feeds its own fixed set of elements and watermarks.
+ FlinkGroupAlsoByWindowWrapper gbwOperaror =
+ FlinkGroupAlsoByWindowWrapper.createForTesting(
+ pipeline.getOptions(),
+ pipeline.getCoderRegistry(),
+ strategy,
+ inputCoder,
+ combiner.<String>asKeyedFn());
+
+ OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
+ new OneInputStreamOperatorTestHarness<>(gbwOperaror);
+ testHarness.open();
+
+ // First batch: six elements for "key1", deliberately not in event-time order.
+ testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
+ testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
+ testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), initialTime + 20));
+ testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 3500), null, PaneInfo.NO_FIRING), initialTime + 20));
+ testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 3700), null, PaneInfo.NO_FIRING), initialTime + 20));
+ testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 2700), null, PaneInfo.NO_FIRING), initialTime + 20));
+ testHarness.processWatermark(new Watermark(initialTime + 6000));
+
+ // Second batch: five more elements; the one at 5600 has a timestamp below the previous watermark.
+ testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 6700), null, PaneInfo.NO_FIRING), initialTime + 20));
+ testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 6800), null, PaneInfo.NO_FIRING), initialTime + 20));
+ testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 8900), null, PaneInfo.NO_FIRING), initialTime + 20));
+ testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 7600), null, PaneInfo.NO_FIRING), initialTime + 20));
+ testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 5600), null, PaneInfo.NO_FIRING), initialTime + 20));
+
+ testHarness.processWatermark(new Watermark(initialTime + 12000));
+
+ // Expected: one pane per watermark, each followed by the forwarded watermark itself.
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+ expectedOutput.add(new StreamRecord<>(
+ WindowedValue.of(KV.of("key1", 6),
+ new Instant(initialTime + 1),
+ new IntervalWindow(new Instant(1), new Instant(5700)),
+ PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0))
+ , initialTime + 1));
+ expectedOutput.add(new Watermark(initialTime + 6000));
+
+ expectedOutput.add(new StreamRecord<>(
+ WindowedValue.of(KV.of("key1", 11),
+ new Instant(initialTime + 6700),
+ new IntervalWindow(new Instant(1), new Instant(10900)),
+ PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0))
+ , initialTime + 6700));
+ expectedOutput.add(new Watermark(initialTime + 12000));
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+ testHarness.close();
+ }
+
+ /**
+  * Runs the operator with {@code slidingWindowWithAfterWatermarkTriggerStrategy} on the shared
+  * input produced by {@link #createTestingOperatorAndState} and checks the emitted panes.
+  *
+  * <p>The helper has already advanced the watermark to 10000 and 20000; this test advances it
+  * once more to 25000, so panes are expected for the windows [-5000, 5000), [0, 10000),
+  * [5000, 15000), [10000, 20000) and [15000, 25000).
+  */
+ @Test
+ public void testSlidingWindows() throws Exception {
+ WindowingStrategy strategy = slidingWindowWithAfterWatermarkTriggerStrategy;
+ long initialTime = 0L;
+ OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
+ createTestingOperatorAndState(strategy, initialTime);
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+ // Third watermark, on top of the two the helper already processed.
+ testHarness.processWatermark(new Watermark(initialTime + 25000));
+
+ // Panes expected before the watermark at 10000.
+ expectedOutput.add(new StreamRecord<>(
+ WindowedValue.of(KV.of("key1", 6),
+ new Instant(initialTime + 5000),
+ new IntervalWindow(new Instant(0), new Instant(10000)),
+ PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
+ , initialTime + 5000));
+ expectedOutput.add(new StreamRecord<>(
+ WindowedValue.of(KV.of("key1", 6),
+ new Instant(initialTime + 1),
+ new IntervalWindow(new Instant(-5000), new Instant(5000)),
+ PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
+ , initialTime + 1));
+ expectedOutput.add(new Watermark(initialTime + 10000));
+
+ // Panes expected before the watermark at 20000.
+ expectedOutput.add(new StreamRecord<>(
+ WindowedValue.of(KV.of("key1", 11),
+ new Instant(initialTime + 15000),
+ new IntervalWindow(new Instant(10000), new Instant(20000)),
+ PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
+ , initialTime + 15000));
+ expectedOutput.add(new StreamRecord<>(
+ WindowedValue.of(KV.of("key1", 3),
+ new Instant(initialTime + 10000),
+ new IntervalWindow(new Instant(5000), new Instant(15000)),
+ PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
+ , initialTime + 10000));
+ expectedOutput.add(new StreamRecord<>(
+ WindowedValue.of(KV.of("key2", 1),
+ new Instant(initialTime + 19500),
+ new IntervalWindow(new Instant(10000), new Instant(20000)),
+ PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
+ , initialTime + 19500));
+ expectedOutput.add(new Watermark(initialTime + 20000));
+
+ // Panes expected before the watermark at 25000.
+ expectedOutput.add(new StreamRecord<>(
+ WindowedValue.of(KV.of("key2", 1),
+ new Instant(initialTime + 20000),
+ /**
+ * This is 20000 and not 19500 because of a convention in dataflow where
+ * timestamps of windowed values in a window cannot be smaller than the
+ * end of a previous window. Check out the documentation of the
+ * {@link WindowFn#getOutputTime(Instant, BoundedWindow)}
+ */
+ new IntervalWindow(new Instant(15000), new Instant(25000)),
+ PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
+ , initialTime + 20000));
+ expectedOutput.add(new StreamRecord<>(
+ WindowedValue.of(KV.of("key1", 8),
+ new Instant(initialTime + 20000),
+ new IntervalWindow(new Instant(15000), new Instant(25000)),
+ PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
+ , initialTime + 20000));
+ expectedOutput.add(new Watermark(initialTime + 25000));
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+ testHarness.close();
+ }
+
+ /**
+  * Runs the operator with {@code fixedWindowWithAfterWatermarkTriggerStrategy} on the shared input
+  * produced by {@link #createTestingOperatorAndState} (watermarks at 10000 and 20000).
+  *
+  * <p>Expects a single ON_TIME pane per key and window: "key1" with sum 6 before the first
+  * watermark, then "key1" with sum 11 and "key2" with sum 1 before the second one. The expected
+  * windows are not spelled out; {@code makeWindowedValue} derives them from the strategy's
+  * WindowFn because {@code null} is passed for the windows argument.
+  */
+ @Test
+ public void testAfterWatermarkProgram() throws Exception {
+ WindowingStrategy strategy = fixedWindowWithAfterWatermarkTriggerStrategy;
+ long initialTime = 0L;
+ OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
+ createTestingOperatorAndState(strategy, initialTime);
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 6),
+ new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 1));
+ expectedOutput.add(new Watermark(initialTime + 10000));
+
+ expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 11),
+ new Instant(initialTime + 10000), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 10000));
+ expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
+ new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500));
+ expectedOutput.add(new Watermark(initialTime + 20000));
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+ testHarness.close();
+ }
+
+ /**
+  * Runs the operator with {@code fixedWindowWithCountTriggerStrategy} on the shared input produced
+  * by {@link #createTestingOperatorAndState} (watermarks at 10000 and 20000).
+  *
+  * <p>Expects two EARLY panes for "key1", each with sum 5 (timestamps 1 and 10000), and one
+  * ON_TIME pane for "key2" with sum 1 at timestamp 19500. Expected windows are derived by
+  * {@code makeWindowedValue} from the strategy's WindowFn.
+  */
+ @Test
+ public void testAfterCountProgram() throws Exception {
+ WindowingStrategy strategy = fixedWindowWithCountTriggerStrategy;
+
+ long initialTime = 0L;
+ OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
+ createTestingOperatorAndState(strategy, initialTime);
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ // Both "key1" panes are listed ahead of the first watermark; the assertion compares sorted
+ // output, so only the set of elements matters, not this insertion order.
+ expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
+ new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime + 1));
+ expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
+ new Instant(initialTime + 10000), null, PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime + 10000));
+ expectedOutput.add(new Watermark(initialTime + 10000));
+
+ expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
+ new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME, 0, 0)), initialTime + 19500));
+ expectedOutput.add(new Watermark(initialTime + 20000));
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.close();
+ }
+
+ /**
+  * Runs the operator with {@code fixedWindowWithCompoundTriggerStrategy} on the shared input
+  * produced by {@link #createTestingOperatorAndState} (watermarks at 10000 and 20000).
+  *
+  * <p>Expects EARLY panes with sum 5 for "key1" followed by ON_TIME panes that carry only the
+  * remaining single element (sum 1), plus one ON_TIME pane for "key2".
+  * NOTE(review): the per-pane sums suggest a discarding accumulation mode, but the strategy
+  * definition is not visible from this method - confirm against the field declaration.
+  */
+ @Test
+ public void testCompoundProgram() throws Exception {
+ WindowingStrategy strategy = fixedWindowWithCompoundTriggerStrategy;
+
+ long initialTime = 0L;
+ OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
+ createTestingOperatorAndState(strategy, initialTime);
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ /**
+ * The PaneInfo fields used below are:
+ * isFirst (pane in window),
+ * isLast, Timing (of triggering),
+ * index (of pane in the window),
+ * onTimeIndex (whether it is the 1st, 2nd, ... pane that was fired on time)
+ * */
+
+ expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
+ new Instant(initialTime + 1), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 1));
+ expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
+ new Instant(initialTime + 10000), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 10000));
+ expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
+ new Instant(initialTime + 19500), null, PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime + 19500));
+
+ expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1),
+ new Instant(initialTime + 1200), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime + 1200));
+
+ expectedOutput.add(new Watermark(initialTime + 10000));
+
+ expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1),
+ new Instant(initialTime + 19500), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime + 19500));
+ expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
+ new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500));
+
+ expectedOutput.add(new Watermark(initialTime + 20000));
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.close();
+ }
+
+ /**
+  * Runs the operator with {@code fixedWindowWithCompoundTriggerStrategyAcc} on the shared input
+  * produced by {@link #createTestingOperatorAndState} (watermarks at 10000 and 20000).
+  *
+  * <p>The expected panes have the same timestamps and PaneInfo as in
+  * {@code testCompoundProgram}, but later panes of a window carry running totals (e.g. 5, then 6;
+  * 5, then 10, then 11) instead of only the newly arrived elements.
+  */
+ @Test
+ public void testCompoundAccumulatingPanesProgram() throws Exception {
+ WindowingStrategy strategy = fixedWindowWithCompoundTriggerStrategyAcc;
+ long initialTime = 0L;
+ OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
+ createTestingOperatorAndState(strategy, initialTime);
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ // EARLY panes for "key1".
+ expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
+ new Instant(initialTime + 1), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 1));
+ expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
+ new Instant(initialTime + 10000), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 10000));
+ expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 10),
+ new Instant(initialTime + 19500), null, PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime + 19500));
+
+ // Final ON_TIME pane of the first window: all six of its elements.
+ expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 6),
+ new Instant(initialTime + 1200), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime + 1200));
+
+ expectedOutput.add(new Watermark(initialTime + 10000));
+
+ // Final ON_TIME panes of the second window for both keys.
+ expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 11),
+ new Instant(initialTime + 19500), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime + 19500));
+ expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
+ new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500));
+
+ expectedOutput.add(new Watermark(initialTime + 20000));
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.close();
+ }
+
+ /**
+  * Builds a group-also-by-window operator for {@code strategy}, wraps it in an opened test
+  * harness and feeds it the input shared by most tests of this class.
+  *
+  * <p>The input consists of seventeen "key1" elements and one "key2" element, all with value 1,
+  * followed by watermarks at {@code initialTime + 10000} and {@code initialTime + 20000}.
+  *
+  * @param strategy the windowing strategy used both for the operator and for assigning windows to
+  *     the input elements
+  * @param initialTime offset added to every event timestamp and watermark
+  * @return the opened harness, holding whatever the operator emitted for the input above
+  */
+ private OneInputStreamOperatorTestHarness createTestingOperatorAndState(WindowingStrategy strategy, long initialTime) throws Exception {
+   Pipeline testPipeline = FlinkTestPipeline.createForStreaming();
+   KvCoder<String, Integer> kvCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
+
+   FlinkGroupAlsoByWindowWrapper gbwOperator = FlinkGroupAlsoByWindowWrapper.createForTesting(
+       testPipeline.getOptions(), testPipeline.getCoderRegistry(), strategy, kvCoder, combiner.<String>asKeyedFn());
+
+   OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> harness =
+       new OneInputStreamOperatorTestHarness<>(gbwOperator);
+   harness.open();
+
+   // Event-time offsets of the "key1" elements, in the order they are fed to the operator.
+   final long[] key1Offsets = {
+       1, 1, 1000, 1200, 1200, 1200,
+       10000, 12100, 14200, 15300, 16500,
+       19500, 19500, 19500, 19500, 19500, 19500
+   };
+   for (long offset : key1Offsets) {
+     harness.processElement(new StreamRecord<>(
+         makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + offset), null, PaneInfo.NO_FIRING),
+         initialTime + 20));
+   }
+
+   // A single element for a second key.
+   harness.processElement(new StreamRecord<>(
+       makeWindowedValue(strategy, KV.of("key2", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING),
+       initialTime + 20));
+
+   for (long watermarkOffset : new long[] {10000, 20000}) {
+     harness.processWatermark(new Watermark(initialTime + watermarkOffset));
+   }
+
+   return harness;
+ }
+
+ /**
+  * Comparator handed to {@code TestHarnessUtil.assertOutputEqualsSorted} to order harness output.
+  *
+  * <p>Two watermarks are ordered by their timestamp. Anything else is treated as a
+  * {@code StreamRecord<WindowedValue<KV<String, Integer>>>} and ordered by element timestamp, then
+  * key, then value, then the maximum timestamp of the element's single window.
+  */
+ private static class ResultSortComparator implements Comparator<Object> {
+   /**
+    * @throws IllegalStateException if two records tie on timestamp, key and value and either of
+    *     them does not belong to exactly one window
+    */
+   @Override
+   public int compare(Object o1, Object o2) {
+     if (o1 instanceof Watermark && o2 instanceof Watermark) {
+       Watermark w1 = (Watermark) o1;
+       Watermark w2 = (Watermark) o2;
+       // Long.compare rather than casting the difference to int: the narrowing cast can flip the
+       // sign or yield 0 for timestamps that are more than Integer.MAX_VALUE apart.
+       return Long.compare(w1.getTimestamp(), w2.getTimestamp());
+     } else {
+       // NOTE(review): a Watermark paired with a record also reaches this branch and would fail on
+       // the casts below; presumably the caller only compares like with like - confirm.
+       StreamRecord<WindowedValue<KV<String, Integer>>> sr0 = (StreamRecord<WindowedValue<KV<String, Integer>>>) o1;
+       StreamRecord<WindowedValue<KV<String, Integer>>> sr1 = (StreamRecord<WindowedValue<KV<String, Integer>>>) o2;
+
+       int comparison = Long.compare(
+           sr0.getValue().getTimestamp().getMillis(),
+           sr1.getValue().getTimestamp().getMillis());
+       if (comparison != 0) {
+         return comparison;
+       }
+
+       comparison = sr0.getValue().getValue().getKey().compareTo(sr1.getValue().getValue().getKey());
+       if (comparison == 0) {
+         comparison = Integer.compare(
+             sr0.getValue().getValue().getValue(),
+             sr1.getValue().getValue().getValue());
+       }
+       if (comparison == 0) {
+         // Last tie-breaker: the windows, e.g. for sliding windows that emit equal sums.
+         Collection<?> windowsA = sr0.getValue().getWindows();
+         Collection<?> windowsB = sr1.getValue().getWindows();
+
+         if (windowsA.size() != 1 || windowsB.size() != 1) {
+           throw new IllegalStateException("A value cannot belong to more than one windows after grouping.");
+         }
+
+         BoundedWindow windowA = (BoundedWindow) windowsA.iterator().next();
+         BoundedWindow windowB = (BoundedWindow) windowsB.iterator().next();
+         comparison = Long.compare(windowA.maxTimestamp().getMillis(), windowB.maxTimestamp().getMillis());
+       }
+       return comparison;
+     }
+   }
+ }
+
+ /**
+  * Wraps {@code output} in a {@link WindowedValue}, filling in defaults for missing arguments.
+  *
+  * @param strategy supplies the WindowFn used to assign windows when {@code windows} is null
+  * @param output the value to wrap
+  * @param timestamp the element timestamp; {@code BoundedWindow.TIMESTAMP_MIN_VALUE} is used when
+  *     null
+  * @param windows the windows of the value, or null to have the WindowFn assign them based on the
+  *     timestamp that was passed in
+  * @param pane the pane info to attach
+  * @return the windowed value
+  */
+ private <T> WindowedValue<T> makeWindowedValue(WindowingStrategy strategy,
+     T output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+   final Instant requestedTimestamp = timestamp;
+   final WindowFn fn = strategy.getWindowFn();
+   final Instant effectiveTimestamp =
+       (requestedTimestamp != null) ? requestedTimestamp : BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+   // Caller supplied the windows explicitly: nothing to assign.
+   if (windows != null) {
+     return WindowedValue.of(output, effectiveTimestamp, windows, pane);
+   }
+
+   final Collection<? extends BoundedWindow> assignedWindows;
+   try {
+     // Minimal context: only the timestamp is available to the WindowFn, and it is the one the
+     // caller passed in, not the defaulted one.
+     assignedWindows = fn.assignWindows(fn.new AssignContext() {
+       @Override
+       public Object element() {
+         throw new UnsupportedOperationException(
+             "WindowFn attempted to access input element when none was available");
+       }
+
+       @Override
+       public Instant timestamp() {
+         if (requestedTimestamp != null) {
+           return requestedTimestamp;
+         }
+         throw new UnsupportedOperationException(
+             "WindowFn attempted to access input timestamp when none was available");
+       }
+
+       @Override
+       public Collection<? extends BoundedWindow> windows() {
+         throw new UnsupportedOperationException(
+             "WindowFn attempted to access input windows when none were available");
+       }
+     });
+   } catch (Exception e) {
+     throw UserCodeException.wrap(e);
+   }
+
+   return WindowedValue.of(output, effectiveTimestamp, assignedWindows, pane);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java
index 5a412aa..52e9e25 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java
@@ -39,83 +39,83 @@ import java.util.Arrays;
public class GroupByNullKeyTest extends StreamingProgramTestBase implements Serializable {
- protected String resultPath;
+ protected String resultPath;
- static final String[] EXPECTED_RESULT = new String[] {
- "k: null v: user1 user1 user1 user2 user2 user2 user2 user3"
- };
+ static final String[] EXPECTED_RESULT = new String[] {
+ "k: null v: user1 user1 user1 user2 user2 user2 user2 user3"
+ };
- public GroupByNullKeyTest(){
- }
+ public GroupByNullKeyTest(){
+ }
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ }
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
- }
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+ }
- public static class ExtractUserAndTimestamp extends DoFn<KV<Integer, String>, String> {
- private static final long serialVersionUID = 0;
+ public static class ExtractUserAndTimestamp extends DoFn<KV<Integer, String>, String> {
+ private static final long serialVersionUID = 0;
- @Override
- public void processElement(ProcessContext c) {
- KV<Integer, String> record = c.element();
- long now = System.currentTimeMillis();
- int timestamp = record.getKey();
- String userName = record.getValue();
- if (userName != null) {
- // Sets the implicit timestamp field to be used in windowing.
- c.outputWithTimestamp(userName, new Instant(timestamp + now));
- }
- }
- }
+ @Override
+ public void processElement(ProcessContext c) {
+ KV<Integer, String> record = c.element();
+ long now = System.currentTimeMillis();
+ int timestamp = record.getKey();
+ String userName = record.getValue();
+ if (userName != null) {
+ // Sets the implicit timestamp field to be used in windowing.
+ c.outputWithTimestamp(userName, new Instant(timestamp + now));
+ }
+ }
+ }
- @Override
- protected void testProgram() throws Exception {
+ @Override
+ protected void testProgram() throws Exception {
- Pipeline p = FlinkTestPipeline.createForStreaming();
+ Pipeline p = FlinkTestPipeline.createForStreaming();
- PCollection<String> output =
- p.apply(Create.of(Arrays.asList(
- KV.<Integer, String>of(0, "user1"),
- KV.<Integer, String>of(1, "user1"),
- KV.<Integer, String>of(2, "user1"),
- KV.<Integer, String>of(10, "user2"),
- KV.<Integer, String>of(1, "user2"),
- KV.<Integer, String>of(15000, "user2"),
- KV.<Integer, String>of(12000, "user2"),
- KV.<Integer, String>of(25000, "user3"))))
- .apply(ParDo.of(new ExtractUserAndTimestamp()))
- .apply(Window.<String>into(FixedWindows.of(Duration.standardHours(1)))
- .triggering(AfterWatermark.pastEndOfWindow())
- .withAllowedLateness(Duration.ZERO)
- .discardingFiredPanes())
+ PCollection<String> output =
+ p.apply(Create.of(Arrays.asList(
+ KV.<Integer, String>of(0, "user1"),
+ KV.<Integer, String>of(1, "user1"),
+ KV.<Integer, String>of(2, "user1"),
+ KV.<Integer, String>of(10, "user2"),
+ KV.<Integer, String>of(1, "user2"),
+ KV.<Integer, String>of(15000, "user2"),
+ KV.<Integer, String>of(12000, "user2"),
+ KV.<Integer, String>of(25000, "user3"))))
+ .apply(ParDo.of(new ExtractUserAndTimestamp()))
+ .apply(Window.<String>into(FixedWindows.of(Duration.standardHours(1)))
+ .triggering(AfterWatermark.pastEndOfWindow())
+ .withAllowedLateness(Duration.ZERO)
+ .discardingFiredPanes())
- .apply(ParDo.of(new DoFn<String, KV<Void, String>>() {
- @Override
- public void processElement(ProcessContext c) throws Exception {
- String elem = c.element();
- c.output(KV.<Void, String>of((Void) null, elem));
- }
- }))
- .apply(GroupByKey.<Void, String>create())
- .apply(ParDo.of(new DoFn<KV<Void, Iterable<String>>, String>() {
- @Override
- public void processElement(ProcessContext c) throws Exception {
- KV<Void, Iterable<String>> elem = c.element();
- StringBuilder str = new StringBuilder();
- str.append("k: " + elem.getKey() + " v:");
- for (String v : elem.getValue()) {
- str.append(" " + v);
- }
- c.output(str.toString());
- }
- }));
- output.apply(TextIO.Write.to(resultPath));
- p.run();
- }
+ .apply(ParDo.of(new DoFn<String, KV<Void, String>>() {
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ String elem = c.element();
+ c.output(KV.<Void, String>of((Void) null, elem));
+ }
+ }))
+ .apply(GroupByKey.<Void, String>create())
+ .apply(ParDo.of(new DoFn<KV<Void, Iterable<String>>, String>() {
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ KV<Void, Iterable<String>> elem = c.element();
+ StringBuilder str = new StringBuilder();
+ str.append("k: " + elem.getKey() + " v:");
+ for (String v : elem.getValue()) {
+ str.append(" " + v);
+ }
+ c.output(str.toString());
+ }
+ }));
+ output.apply(TextIO.Write.to(resultPath));
+ p.run();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/StateSerializationTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/StateSerializationTest.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/StateSerializationTest.java
index 7489fcc..d5b1043 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/StateSerializationTest.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/StateSerializationTest.java
@@ -43,261 +43,261 @@ import static org.junit.Assert.assertEquals;
public class StateSerializationTest {
- private static final StateNamespace NAMESPACE_1 = StateNamespaces.global();
- private static final String KEY_PREFIX = "TEST_";
-
- // TODO: This can be replaced with the standard Sum.SumIntererFn once the state no longer needs
- // to create a StateTag at the point of restoring state. Currently StateTags are compared strictly
- // by type and combiners always use KeyedCombineFnWithContext rather than KeyedCombineFn or CombineFn.
- private static CombineWithContext.KeyedCombineFnWithContext<Object, Integer, int[], Integer> SUM_COMBINER =
- new CombineWithContext.KeyedCombineFnWithContext<Object, Integer, int[], Integer>() {
- @Override
- public int[] createAccumulator(Object key, CombineWithContext.Context c) {
- return new int[1];
- }
-
- @Override
- public int[] addInput(Object key, int[] accumulator, Integer value, CombineWithContext.Context c) {
- accumulator[0] += value;
- return accumulator;
- }
-
- @Override
- public int[] mergeAccumulators(Object key, Iterable<int[]> accumulators, CombineWithContext.Context c) {
- int[] r = new int[1];
- for (int[] a : accumulators) {
- r[0] += a[0];
- }
- return r;
- }
-
- @Override
- public Integer extractOutput(Object key, int[] accumulator, CombineWithContext.Context c) {
- return accumulator[0];
- }
- };
-
- private static Coder<int[]> INT_ACCUM_CODER = DelegateCoder.of(
- VarIntCoder.of(),
- new DelegateCoder.CodingFunction<int[], Integer>() {
- @Override
- public Integer apply(int[] accumulator) {
- return accumulator[0];
- }
- },
- new DelegateCoder.CodingFunction<Integer, int[]>() {
- @Override
- public int[] apply(Integer value) {
- int[] a = new int[1];
- a[0] = value;
- return a;
- }
- });
-
- private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
- StateTags.value("stringValue", StringUtf8Coder.of());
- private static final StateTag<Object, ValueState<Integer>> INT_VALUE_ADDR =
- StateTags.value("stringValue", VarIntCoder.of());
- private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>> SUM_INTEGER_ADDR =
- StateTags.keyedCombiningValueWithContext("sumInteger", INT_ACCUM_CODER, SUM_COMBINER);
- private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
- StateTags.bag("stringBag", StringUtf8Coder.of());
- private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_BAG_ADDR =
- StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
-
- private Map<String, FlinkStateInternals<String>> statePerKey = new HashMap<>();
-
- private Map<String, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
-
- private void initializeStateAndTimers() throws CannotProvideCoderException {
- for (int i = 0; i < 10; i++) {
- String key = KEY_PREFIX + i;
-
- FlinkStateInternals state = initializeStateForKey(key);
- Set<TimerInternals.TimerData> timers = new HashSet<>();
- for (int j = 0; j < 5; j++) {
- TimerInternals.TimerData timer = TimerInternals
- .TimerData.of(NAMESPACE_1,
- new Instant(1000 + i + j), TimeDomain.values()[j % 3]);
- timers.add(timer);
- }
-
- statePerKey.put(key, state);
- activeTimers.put(key, timers);
- }
- }
-
- private FlinkStateInternals<String> initializeStateForKey(String key) throws CannotProvideCoderException {
- FlinkStateInternals<String> state = createState(key);
-
- ValueState<String> value = state.state(NAMESPACE_1, STRING_VALUE_ADDR);
- value.write("test");
-
- ValueState<Integer> value2 = state.state(NAMESPACE_1, INT_VALUE_ADDR);
- value2.write(4);
- value2.write(5);
-
- AccumulatorCombiningState<Integer, int[], Integer> combiningValue = state.state(NAMESPACE_1, SUM_INTEGER_ADDR);
- combiningValue.add(1);
- combiningValue.add(2);
-
- WatermarkHoldState<BoundedWindow> watermark = state.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
- watermark.add(new Instant(1000));
-
- BagState<String> bag = state.state(NAMESPACE_1, STRING_BAG_ADDR);
- bag.add("v1");
- bag.add("v2");
- bag.add("v3");
- bag.add("v4");
- return state;
- }
-
- private boolean restoreAndTestState(DataInputView in) throws Exception {
- StateCheckpointReader reader = new StateCheckpointReader(in);
- final ClassLoader userClassloader = this.getClass().getClassLoader();
- Coder<? extends BoundedWindow> windowCoder = IntervalWindow.getCoder();
- Coder<String> keyCoder = StringUtf8Coder.of();
-
- boolean comparisonRes = true;
-
- for (String key : statePerKey.keySet()) {
- comparisonRes &= checkStateForKey(key);
- }
-
- // restore the timers
- Map<String, Set<TimerInternals.TimerData>> restoredTimersPerKey = StateCheckpointUtils.decodeTimers(reader, windowCoder, keyCoder);
- if (activeTimers.size() != restoredTimersPerKey.size()) {
- return false;
- }
-
- for (String key : statePerKey.keySet()) {
- Set<TimerInternals.TimerData> originalTimers = activeTimers.get(key);
- Set<TimerInternals.TimerData> restoredTimers = restoredTimersPerKey.get(key);
- comparisonRes &= checkTimersForKey(originalTimers, restoredTimers);
- }
-
- // restore the state
- Map<String, FlinkStateInternals<String>> restoredPerKeyState =
- StateCheckpointUtils.decodeState(reader, OutputTimeFns.outputAtEarliestInputTimestamp(), keyCoder, windowCoder, userClassloader);
- if (restoredPerKeyState.size() != statePerKey.size()) {
- return false;
- }
-
- for (String key : statePerKey.keySet()) {
- FlinkStateInternals<String> originalState = statePerKey.get(key);
- FlinkStateInternals<String> restoredState = restoredPerKeyState.get(key);
- comparisonRes &= checkStateForKey(originalState, restoredState);
- }
- return comparisonRes;
- }
-
- private boolean checkStateForKey(String key) throws CannotProvideCoderException {
- FlinkStateInternals<String> state = statePerKey.get(key);
-
- ValueState<String> value = state.state(NAMESPACE_1, STRING_VALUE_ADDR);
- boolean comp = value.read().equals("test");
-
- ValueState<Integer> value2 = state.state(NAMESPACE_1, INT_VALUE_ADDR);
- comp &= value2.read().equals(5);
-
- AccumulatorCombiningState<Integer, int[], Integer> combiningValue = state.state(NAMESPACE_1, SUM_INTEGER_ADDR);
- comp &= combiningValue.read().equals(3);
-
- WatermarkHoldState<BoundedWindow> watermark = state.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
- comp &= watermark.read().equals(new Instant(1000));
-
- BagState<String> bag = state.state(NAMESPACE_1, STRING_BAG_ADDR);
- Iterator<String> it = bag.read().iterator();
- int i = 0;
- while (it.hasNext()) {
- comp &= it.next().equals("v" + (++i));
- }
- return comp;
- }
-
- private void storeState(AbstractStateBackend.CheckpointStateOutputView out) throws Exception {
- StateCheckpointWriter checkpointBuilder = StateCheckpointWriter.create(out);
- Coder<String> keyCoder = StringUtf8Coder.of();
-
- // checkpoint the timers
- StateCheckpointUtils.encodeTimers(activeTimers, checkpointBuilder, keyCoder);
-
- // checkpoint the state
- StateCheckpointUtils.encodeState(statePerKey, checkpointBuilder, keyCoder);
- }
-
- private boolean checkTimersForKey(Set<TimerInternals.TimerData> originalTimers, Set<TimerInternals.TimerData> restoredTimers) {
- boolean comp = true;
- if (restoredTimers == null) {
- return false;
- }
-
- if (originalTimers.size() != restoredTimers.size()) {
- return false;
- }
-
- for (TimerInternals.TimerData timer : originalTimers) {
- comp &= restoredTimers.contains(timer);
- }
- return comp;
- }
-
- private boolean checkStateForKey(FlinkStateInternals<String> originalState, FlinkStateInternals<String> restoredState) throws CannotProvideCoderException {
- if (restoredState == null) {
- return false;
- }
-
- ValueState<String> orValue = originalState.state(NAMESPACE_1, STRING_VALUE_ADDR);
- ValueState<String> resValue = restoredState.state(NAMESPACE_1, STRING_VALUE_ADDR);
- boolean comp = orValue.read().equals(resValue.read());
-
- ValueState<Integer> orIntValue = originalState.state(NAMESPACE_1, INT_VALUE_ADDR);
- ValueState<Integer> resIntValue = restoredState.state(NAMESPACE_1, INT_VALUE_ADDR);
- comp &= orIntValue.read().equals(resIntValue.read());
-
- AccumulatorCombiningState<Integer, int[], Integer> combOrValue = originalState.state(NAMESPACE_1, SUM_INTEGER_ADDR);
- AccumulatorCombiningState<Integer, int[], Integer> combResValue = restoredState.state(NAMESPACE_1, SUM_INTEGER_ADDR);
- comp &= combOrValue.read().equals(combResValue.read());
-
- WatermarkHoldState orWatermark = originalState.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
- WatermarkHoldState resWatermark = restoredState.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
- comp &= orWatermark.read().equals(resWatermark.read());
-
- BagState<String> orBag = originalState.state(NAMESPACE_1, STRING_BAG_ADDR);
- BagState<String> resBag = restoredState.state(NAMESPACE_1, STRING_BAG_ADDR);
-
- Iterator<String> orIt = orBag.read().iterator();
- Iterator<String> resIt = resBag.read().iterator();
-
- while (orIt.hasNext() && resIt.hasNext()) {
- comp &= orIt.next().equals(resIt.next());
- }
-
- return !((orIt.hasNext() && !resIt.hasNext()) || (!orIt.hasNext() && resIt.hasNext())) && comp;
- }
-
- private FlinkStateInternals<String> createState(String key) throws CannotProvideCoderException {
- return new FlinkStateInternals<>(
- key,
- StringUtf8Coder.of(),
- IntervalWindow.getCoder(),
- OutputTimeFns.outputAtEarliestInputTimestamp());
- }
-
- @Test
- public void test() throws Exception {
- StateSerializationTest test = new StateSerializationTest();
- test.initializeStateAndTimers();
+ private static final StateNamespace NAMESPACE_1 = StateNamespaces.global();
+ private static final String KEY_PREFIX = "TEST_";
+
+ // TODO: This can be replaced with the standard Sum.SumIntegerFn once the state no longer needs
+ // to create a StateTag at the point of restoring state. Currently StateTags are compared strictly
+ // by type and combiners always use KeyedCombineFnWithContext rather than KeyedCombineFn or CombineFn.
+ private static CombineWithContext.KeyedCombineFnWithContext<Object, Integer, int[], Integer> SUM_COMBINER =
+ new CombineWithContext.KeyedCombineFnWithContext<Object, Integer, int[], Integer>() {
+ @Override
+ public int[] createAccumulator(Object key, CombineWithContext.Context c) {
+ return new int[1];
+ }
+
+ @Override
+ public int[] addInput(Object key, int[] accumulator, Integer value, CombineWithContext.Context c) {
+ accumulator[0] += value;
+ return accumulator;
+ }
+
+ @Override
+ public int[] mergeAccumulators(Object key, Iterable<int[]> accumulators, CombineWithContext.Context c) {
+ int[] r = new int[1];
+ for (int[] a : accumulators) {
+ r[0] += a[0];
+ }
+ return r;
+ }
+
+ @Override
+ public Integer extractOutput(Object key, int[] accumulator, CombineWithContext.Context c) {
+ return accumulator[0];
+ }
+ };
+
+ private static Coder<int[]> INT_ACCUM_CODER = DelegateCoder.of(
+ VarIntCoder.of(),
+ new DelegateCoder.CodingFunction<int[], Integer>() {
+ @Override
+ public Integer apply(int[] accumulator) {
+ return accumulator[0];
+ }
+ },
+ new DelegateCoder.CodingFunction<Integer, int[]>() {
+ @Override
+ public int[] apply(Integer value) {
+ int[] a = new int[1];
+ a[0] = value;
+ return a;
+ }
+ });
+
+ private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
+ StateTags.value("stringValue", StringUtf8Coder.of());
+ private static final StateTag<Object, ValueState<Integer>> INT_VALUE_ADDR =
+ StateTags.value("stringValue", VarIntCoder.of());
+ private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>> SUM_INTEGER_ADDR =
+ StateTags.keyedCombiningValueWithContext("sumInteger", INT_ACCUM_CODER, SUM_COMBINER);
+ private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+ StateTags.bag("stringBag", StringUtf8Coder.of());
+ private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_BAG_ADDR =
+ StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
+
+ private Map<String, FlinkStateInternals<String>> statePerKey = new HashMap<>();
+
+ private Map<String, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
+
+ private void initializeStateAndTimers() throws CannotProvideCoderException {
+ for (int i = 0; i < 10; i++) {
+ String key = KEY_PREFIX + i;
+
+ FlinkStateInternals state = initializeStateForKey(key);
+ Set<TimerInternals.TimerData> timers = new HashSet<>();
+ for (int j = 0; j < 5; j++) {
+ TimerInternals.TimerData timer = TimerInternals
+ .TimerData.of(NAMESPACE_1,
+ new Instant(1000 + i + j), TimeDomain.values()[j % 3]);
+ timers.add(timer);
+ }
+
+ statePerKey.put(key, state);
+ activeTimers.put(key, timers);
+ }
+ }
+
+ private FlinkStateInternals<String> initializeStateForKey(String key) throws CannotProvideCoderException {
+ FlinkStateInternals<String> state = createState(key);
+
+ ValueState<String> value = state.state(NAMESPACE_1, STRING_VALUE_ADDR);
+ value.write("test");
+
+ ValueState<Integer> value2 = state.state(NAMESPACE_1, INT_VALUE_ADDR);
+ value2.write(4);
+ value2.write(5);
+
+ AccumulatorCombiningState<Integer, int[], Integer> combiningValue = state.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+ combiningValue.add(1);
+ combiningValue.add(2);
+
+ WatermarkHoldState<BoundedWindow> watermark = state.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
+ watermark.add(new Instant(1000));
+
+ BagState<String> bag = state.state(NAMESPACE_1, STRING_BAG_ADDR);
+ bag.add("v1");
+ bag.add("v2");
+ bag.add("v3");
+ bag.add("v4");
+ return state;
+ }
+
+ private boolean restoreAndTestState(DataInputView in) throws Exception {
+ StateCheckpointReader reader = new StateCheckpointReader(in);
+ final ClassLoader userClassloader = this.getClass().getClassLoader();
+ Coder<? extends BoundedWindow> windowCoder = IntervalWindow.getCoder();
+ Coder<String> keyCoder = StringUtf8Coder.of();
+
+ boolean comparisonRes = true;
+
+ for (String key : statePerKey.keySet()) {
+ comparisonRes &= checkStateForKey(key);
+ }
+
+ // restore the timers
+ Map<String, Set<TimerInternals.TimerData>> restoredTimersPerKey = StateCheckpointUtils.decodeTimers(reader, windowCoder, keyCoder);
+ if (activeTimers.size() != restoredTimersPerKey.size()) {
+ return false;
+ }
+
+ for (String key : statePerKey.keySet()) {
+ Set<TimerInternals.TimerData> originalTimers = activeTimers.get(key);
+ Set<TimerInternals.TimerData> restoredTimers = restoredTimersPerKey.get(key);
+ comparisonRes &= checkTimersForKey(originalTimers, restoredTimers);
+ }
+
+ // restore the state
+ Map<String, FlinkStateInternals<String>> restoredPerKeyState =
+ StateCheckpointUtils.decodeState(reader, OutputTimeFns.outputAtEarliestInputTimestamp(), keyCoder, windowCoder, userClassloader);
+ if (restoredPerKeyState.size() != statePerKey.size()) {
+ return false;
+ }
+
+ for (String key : statePerKey.keySet()) {
+ FlinkStateInternals<String> originalState = statePerKey.get(key);
+ FlinkStateInternals<String> restoredState = restoredPerKeyState.get(key);
+ comparisonRes &= checkStateForKey(originalState, restoredState);
+ }
+ return comparisonRes;
+ }
+
+ private boolean checkStateForKey(String key) throws CannotProvideCoderException {
+ FlinkStateInternals<String> state = statePerKey.get(key);
+
+ ValueState<String> value = state.state(NAMESPACE_1, STRING_VALUE_ADDR);
+ boolean comp = value.read().equals("test");
+
+ ValueState<Integer> value2 = state.state(NAMESPACE_1, INT_VALUE_ADDR);
+ comp &= value2.read().equals(5);
+
+ AccumulatorCombiningState<Integer, int[], Integer> combiningValue = state.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+ comp &= combiningValue.read().equals(3);
+
+ WatermarkHoldState<BoundedWindow> watermark = state.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
+ comp &= watermark.read().equals(new Instant(1000));
+
+ BagState<String> bag = state.state(NAMESPACE_1, STRING_BAG_ADDR);
+ Iterator<String> it = bag.read().iterator();
+ int i = 0;
+ while (it.hasNext()) {
+ comp &= it.next().equals("v" + (++i));
+ }
+ return comp;
+ }
+
+ private void storeState(AbstractStateBackend.CheckpointStateOutputView out) throws Exception {
+ StateCheckpointWriter checkpointBuilder = StateCheckpointWriter.create(out);
+ Coder<String> keyCoder = StringUtf8Coder.of();
+
+ // checkpoint the timers
+ StateCheckpointUtils.encodeTimers(activeTimers, checkpointBuilder, keyCoder);
+
+ // checkpoint the state
+ StateCheckpointUtils.encodeState(statePerKey, checkpointBuilder, keyCoder);
+ }
+
+ private boolean checkTimersForKey(Set<TimerInternals.TimerData> originalTimers, Set<TimerInternals.TimerData> restoredTimers) {
+ boolean comp = true;
+ if (restoredTimers == null) {
+ return false;
+ }
+
+ if (originalTimers.size() != restoredTimers.size()) {
+ return false;
+ }
+
+ for (TimerInternals.TimerData timer : originalTimers) {
+ comp &= restoredTimers.contains(timer);
+ }
+ return comp;
+ }
+
+ private boolean checkStateForKey(FlinkStateInternals<String> originalState, FlinkStateInternals<String> restoredState) throws CannotProvideCoderException {
+ if (restoredState == null) {
+ return false;
+ }
+
+ ValueState<String> orValue = originalState.state(NAMESPACE_1, STRING_VALUE_ADDR);
+ ValueState<String> resValue = restoredState.state(NAMESPACE_1, STRING_VALUE_ADDR);
+ boolean comp = orValue.read().equals(resValue.read());
+
+ ValueState<Integer> orIntValue = originalState.state(NAMESPACE_1, INT_VALUE_ADDR);
+ ValueState<Integer> resIntValue = restoredState.state(NAMESPACE_1, INT_VALUE_ADDR);
+ comp &= orIntValue.read().equals(resIntValue.read());
+
+ AccumulatorCombiningState<Integer, int[], Integer> combOrValue = originalState.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+ AccumulatorCombiningState<Integer, int[], Integer> combResValue = restoredState.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+ comp &= combOrValue.read().equals(combResValue.read());
+
+ WatermarkHoldState orWatermark = originalState.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
+ WatermarkHoldState resWatermark = restoredState.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
+ comp &= orWatermark.read().equals(resWatermark.read());
+
+ BagState<String> orBag = originalState.state(NAMESPACE_1, STRING_BAG_ADDR);
+ BagState<String> resBag = restoredState.state(NAMESPACE_1, STRING_BAG_ADDR);
+
+ Iterator<String> orIt = orBag.read().iterator();
+ Iterator<String> resIt = resBag.read().iterator();
+
+ while (orIt.hasNext() && resIt.hasNext()) {
+ comp &= orIt.next().equals(resIt.next());
+ }
+
+ return !((orIt.hasNext() && !resIt.hasNext()) || (!orIt.hasNext() && resIt.hasNext())) && comp;
+ }
+
+ private FlinkStateInternals<String> createState(String key) throws CannotProvideCoderException {
+ return new FlinkStateInternals<>(
+ key,
+ StringUtf8Coder.of(),
+ IntervalWindow.getCoder(),
+ OutputTimeFns.outputAtEarliestInputTimestamp());
+ }
+
+ @Test
+ public void test() throws Exception {
+ StateSerializationTest test = new StateSerializationTest();
+ test.initializeStateAndTimers();
- MemoryStateBackend.MemoryCheckpointOutputStream memBackend = new MemoryStateBackend.MemoryCheckpointOutputStream(32048);
- AbstractStateBackend.CheckpointStateOutputView out = new AbstractStateBackend.CheckpointStateOutputView(memBackend);
-
- test.storeState(out);
+ MemoryStateBackend.MemoryCheckpointOutputStream memBackend = new MemoryStateBackend.MemoryCheckpointOutputStream(32048);
+ AbstractStateBackend.CheckpointStateOutputView out = new AbstractStateBackend.CheckpointStateOutputView(memBackend);
+
+ test.storeState(out);
- byte[] contents = memBackend.closeAndGetBytes();
- DataInputView in = new DataInputDeserializer(contents, 0, contents.length);
+ byte[] contents = memBackend.closeAndGetBytes();
+ DataInputView in = new DataInputDeserializer(contents, 0, contents.length);
- assertEquals(test.restoreAndTestState(in), true);
- }
+ assertEquals(test.restoreAndTestState(in), true);
+ }
}