You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "xuyang (Jira)" <ji...@apache.org> on 2024/01/22 03:19:00 UTC

[jira] [Updated] (FLINK-34175) When meeting WindowedSliceAssigner, slice window agg registers a wrong timestamp timer

     [ https://issues.apache.org/jira/browse/FLINK-34175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

xuyang updated FLINK-34175:
---------------------------
    Description: 
The following test, when added to SlicingWindowAggOperatorTest, reproduces this problem.
{code:java}
/**
 * Input row layout used by the reproduction test: five fields named f0..f4, typed as
 * VarCharType(Integer.MAX_VALUE), IntType, TimestampType, TimestampType, and a
 * TimestampType of kind ROWTIME with precision 3.
 *
 * <p>NOTE(review): the name suggests rows produced by a window TVF, with f2/f3 presumably
 * carrying window start/end; confirm against the operator under test.
 */
private static final RowType INPUT_ROW_TYPE_FROM_WINDOW_TVF =
        new RowType(
                Arrays.asList(
                        new RowType.RowField("f0", new VarCharType(Integer.MAX_VALUE)),
                        new RowType.RowField("f1", new IntType()),
                        new RowType.RowField("f2", new TimestampType()),
                        new RowType.RowField("f3", new TimestampType()),
                        new RowType.RowField(
                                "f4", new TimestampType(false, TimestampKind.ROWTIME, 3))));

/** Serializer for rows shaped like {@code INPUT_ROW_TYPE_FROM_WINDOW_TVF}; passed to the operator builder as its input serializer. */
protected static final RowDataSerializer INPUT_ROW_SER_FROM_WINDOW_TVF =
        new RowDataSerializer(INPUT_ROW_TYPE_FROM_WINDOW_TVF); 

/**
 * Reproduction for FLINK-34175: a slicing window aggregate operator is built with a
 * windowed slice assigner wrapping a 3-second tumbling assigner, fed one record and one
 * watermark, and the output is expected to contain only that watermark.
 *
 * <p>NOTE(review): the integer arguments 2 and 3 passed to the assigner factories are
 * presumably field indices into the input row (rowtime and window-end columns); confirm
 * against SliceAssigners.
 */
@Test
public void test() throws Exception {
    // Inner assigner: tumbling, 3-second size, constructed with index 2 and the shared time zone.
    final SliceAssigner innerAssigner =
            SliceAssigners.tumbling(2, shiftTimeZone, Duration.ofSeconds(3));
    // Outer assigner under test: the windowed assigner wrapping the tumbling one, with index 3.
    final SliceAssigner assigner = SliceAssigners.windowed(3, innerAssigner);
    final SlicingSumAndCountAggsFunction aggsFunction =
            new SlicingSumAndCountAggsFunction(assigner);
    // The builder's result is cast to SlicingWindowOperator before being handed to the harness.
    SlicingWindowOperator<RowData, ?> operator =
            (SlicingWindowOperator<RowData, ?>)
                    WindowAggOperatorBuilder.builder()
                            .inputSerializer(INPUT_ROW_SER_FROM_WINDOW_TVF)
                            .shiftTimeZone(shiftTimeZone)
                            .keySerializer(KEY_SER)
                            .assigner(assigner)
                            .aggregate(wrapGenerated(aggsFunction), ACC_SER)
                            .build();


    OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
            createTestHarness(operator);


    testHarness.setup(OUT_SERIALIZER);
    testHarness.open();


    // Collects the records/watermarks the operator is expected to emit.
    ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();


    // Feed a single record: key "key2", value 1, and timestamp fields of
    // 999 ms, 3999 ms and 3998 ms (in row order f2, f3, f4).
    testHarness.processElement(
            insertRecord(
                    "key2",
                    1,
                    fromEpochMillis(999L),
                    fromEpochMillis(3999L),
                    fromEpochMillis(3998L)));


    // Advance the watermark to 999. Only the watermark itself is expected downstream,
    // i.e. no aggregate result should be emitted at this point.
    testHarness.processWatermark(new Watermark(999));
    expectedOutput.add(new Watermark(999));
    ASSERTER.assertOutputEqualsSorted(
            "Output was not correct.", expectedOutput, testHarness.getOutput());


    testHarness.close();
}{code}

> When meeting WindowedSliceAssigner, slice window agg registers a wrong timestamp timer 
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-34175
>                 URL: https://issues.apache.org/jira/browse/FLINK-34175
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>            Reporter: xuyang
>            Priority: Major
>
> The following test, when added to SlicingWindowAggOperatorTest, reproduces this problem.
> {code:java}
> private static final RowType INPUT_ROW_TYPE_FROM_WINDOW_TVF =
>         new RowType(
>                 Arrays.asList(
>                         new RowType.RowField("f0", new VarCharType(Integer.MAX_VALUE)),
>                         new RowType.RowField("f1", new IntType()),
>                         new RowType.RowField("f2", new TimestampType()),
>                         new RowType.RowField("f3", new TimestampType()),
>                         new RowType.RowField(
>                                 "f4", new TimestampType(false, TimestampKind.ROWTIME, 3))));
> protected static final RowDataSerializer INPUT_ROW_SER_FROM_WINDOW_TVF =
>         new RowDataSerializer(INPUT_ROW_TYPE_FROM_WINDOW_TVF); 
> @Test
> public void test() throws Exception {
>     final SliceAssigner innerAssigner =
>             SliceAssigners.tumbling(2, shiftTimeZone, Duration.ofSeconds(3));
>     final SliceAssigner assigner = SliceAssigners.windowed(3, innerAssigner);
>     final SlicingSumAndCountAggsFunction aggsFunction =
>             new SlicingSumAndCountAggsFunction(assigner);
>     SlicingWindowOperator<RowData, ?> operator =
>             (SlicingWindowOperator<RowData, ?>)
>                     WindowAggOperatorBuilder.builder()
>                             .inputSerializer(INPUT_ROW_SER_FROM_WINDOW_TVF)
>                             .shiftTimeZone(shiftTimeZone)
>                             .keySerializer(KEY_SER)
>                             .assigner(assigner)
>                             .aggregate(wrapGenerated(aggsFunction), ACC_SER)
>                             .build();
>     OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
>             createTestHarness(operator);
>     testHarness.setup(OUT_SERIALIZER);
>     testHarness.open();
>     // process elements
>     ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
>     // add elements out-of-order
>     testHarness.processElement(
>             insertRecord(
>                     "key2",
>                     1,
>                     fromEpochMillis(999L),
>                     fromEpochMillis(3999L),
>                     fromEpochMillis(3998L)));
>     testHarness.processWatermark(new Watermark(999));
>     expectedOutput.add(new Watermark(999));
>     ASSERTER.assertOutputEqualsSorted(
>             "Output was not correct.", expectedOutput, testHarness.getOutput());
>     testHarness.close();
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)