Posted to issues@flink.apache.org by "xuyang (Jira)" <ji...@apache.org> on 2024/01/22 03:19:00 UTC
[jira] [Updated] (FLINK-34175) When meeting WindowedSliceAssigner, slice window agg registers a wrong timestamp timer
[ https://issues.apache.org/jira/browse/FLINK-34175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
xuyang updated FLINK-34175:
---------------------------
Description:
The following test, when added to SlicingWindowAggOperatorTest, reproduces this problem.
{code:java}
private static final RowType INPUT_ROW_TYPE_FROM_WINDOW_TVF =
        new RowType(
                Arrays.asList(
                        new RowType.RowField("f0", new VarCharType(Integer.MAX_VALUE)),
                        new RowType.RowField("f1", new IntType()),
                        new RowType.RowField("f2", new TimestampType()),
                        new RowType.RowField("f3", new TimestampType()),
                        new RowType.RowField(
                                "f4", new TimestampType(false, TimestampKind.ROWTIME, 3))));

protected static final RowDataSerializer INPUT_ROW_SER_FROM_WINDOW_TVF =
        new RowDataSerializer(INPUT_ROW_TYPE_FROM_WINDOW_TVF);

@Test
public void test() throws Exception {
    final SliceAssigner innerAssigner =
            SliceAssigners.tumbling(2, shiftTimeZone, Duration.ofSeconds(3));
    final SliceAssigner assigner = SliceAssigners.windowed(3, innerAssigner);
    final SlicingSumAndCountAggsFunction aggsFunction =
            new SlicingSumAndCountAggsFunction(assigner);
    SlicingWindowOperator<RowData, ?> operator =
            (SlicingWindowOperator<RowData, ?>)
                    WindowAggOperatorBuilder.builder()
                            .inputSerializer(INPUT_ROW_SER_FROM_WINDOW_TVF)
                            .shiftTimeZone(shiftTimeZone)
                            .keySerializer(KEY_SER)
                            .assigner(assigner)
                            .aggregate(wrapGenerated(aggsFunction), ACC_SER)
                            .build();

    OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
            createTestHarness(operator);

    testHarness.setup(OUT_SERIALIZER);
    testHarness.open();

    // process elements
    ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

    // add elements out-of-order
    testHarness.processElement(
            insertRecord(
                    "key2",
                    1,
                    fromEpochMillis(999L),
                    fromEpochMillis(3999L),
                    fromEpochMillis(3998L)));

    testHarness.processWatermark(new Watermark(999));
    expectedOutput.add(new Watermark(999));
    ASSERTER.assertOutputEqualsSorted(
            "Output was not correct.", expectedOutput, testHarness.getOutput());

    testHarness.close();
}
{code}
> When meeting WindowedSliceAssigner, slice window agg registers a wrong timestamp timer
> ----------------------------------------------------------------------------------------
>
> Key: FLINK-34175
> URL: https://issues.apache.org/jira/browse/FLINK-34175
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Reporter: xuyang
> Priority: Major
>