Posted to issues@flink.apache.org by "David Anderson (Jira)" <ji...@apache.org> on 2022/06/22 09:58:00 UTC

[jira] [Updated] (FLINK-28019) Error in RetractableTopNFunction when retracting a stale record with state ttl enabled

     [ https://issues.apache.org/jira/browse/FLINK-28019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

David Anderson updated FLINK-28019:
-----------------------------------
    Summary: Error in RetractableTopNFunction when retracting a stale record with state ttl enabled  (was: Error occurred when retract a staled record when enable state ttl in RetractableTopNFunction)

> Error in RetractableTopNFunction when retracting a stale record with state ttl enabled
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-28019
>                 URL: https://issues.apache.org/jira/browse/FLINK-28019
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.15.0, 1.14.4
>            Reporter: lincoln lee
>            Assignee: lincoln lee
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.16.0, 1.15.1, 1.14.6
>
>
> We found that an error occurs in RetractableTopNFunction when retracting a stale record with state TTL enabled. A test case that reproduces it:
> {code}
>     @Test
>     public void testRetractAnStaledRecordWithRowNumber() throws Exception {
>         StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(1_000);
>         AbstractTopNFunction func =
>                 new RetractableTopNFunction(
>                         ttlConfig,
>                         InternalTypeInfo.ofFields(
>                                 VarCharType.STRING_TYPE, new BigIntType(), new IntType()),
>                         comparableRecordComparator,
>                         sortKeySelector,
>                         RankType.ROW_NUMBER,
>                         new ConstantRankRange(1, 2),
>                         generatedEqualiser,
>                         true,
>                         true);
>         OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(func);
>         testHarness.open();
>         testHarness.setStateTtlProcessingTime(0);
>         testHarness.processElement(insertRecord("a", 1L, 10));
>         testHarness.setStateTtlProcessingTime(1001);
>         testHarness.processElement(insertRecord("a", 2L, 11));
>         testHarness.processElement(deleteRecord("a", 1L, 10));
>         testHarness.close();
>         List<Object> expectedOutput = new ArrayList<>();
>         expectedOutput.add(insertRecord("a", 1L, 10, 1L));
>         expectedOutput.add(insertRecord("a", 2L, 11, 1L));
>         // The following delete record must not be emitted: its left row is null,
>         // which is illegal.
>         // -D{row1=null, row2=+I(1)};
>         assertorWithRowNumber.assertOutputEquals(
>                 "output wrong.", expectedOutput, testHarness.getOutput());
>     }
> {code}
> The cause is an incomplete code path for handling stale records.
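
To illustrate the scenario in the test above, here is a minimal sketch of a retraction that arrives after the matching state has expired. It is a toy model and not Flink's RetractableTopNFunction: the class StaleRetractSketch, its methods, the output strings, and the expiry rule (an entry is treated as expired once writtenAt + ttl <= now) are all assumptions made for illustration. The only point it shows is that a delete for a record no longer present in state is skipped instead of producing a retraction with a null row.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Toy model of sorted state with TTL (hypothetical; not Flink's implementation). */
public class StaleRetractSketch {
    private final long ttlMillis;
    private long now = 0L;
    // sort key -> processing time at which the entry was written
    private final TreeMap<Long, Long> liveSortKeys = new TreeMap<>();
    private final List<String> output = new ArrayList<>();

    public StaleRetractSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    public void setProcessingTime(long time) {
        this.now = time;
    }

    /** Drops every entry whose TTL has elapsed (assumed rule: writtenAt + ttl <= now). */
    private void dropExpired() {
        liveSortKeys.values().removeIf(writtenAt -> writtenAt + ttlMillis <= now);
    }

    /** Inserts a record and emits it with its rank among the live entries. */
    public void insert(long sortKey) {
        dropExpired();
        liveSortKeys.put(sortKey, now);
        int rank = liveSortKeys.headMap(sortKey, true).size();
        output.add("+I(" + sortKey + ", rank=" + rank + ")");
    }

    /** Retracts a record; a record that is no longer in state is stale and is skipped. */
    public void delete(long sortKey) {
        dropExpired();
        if (!liveSortKeys.containsKey(sortKey)) {
            // Stale retraction: the state expired, so there is no row to retract.
            return;
        }
        int rank = liveSortKeys.headMap(sortKey, true).size();
        liveSortKeys.remove(sortKey);
        output.add("-D(" + sortKey + ", rank=" + rank + ")");
    }

    public List<String> getOutput() {
        return output;
    }

    public static void main(String[] args) {
        StaleRetractSketch sketch = new StaleRetractSketch(1_000);
        sketch.insert(1L);                // written at time 0
        sketch.setProcessingTime(1001);   // the first entry is now past its TTL
        sketch.insert(2L);
        sketch.delete(1L);                // stale: skipped, nothing is emitted
        System.out.println(sketch.getOutput());
    }
}
```

As in the expected output of the test, only the two insert records appear; the delete for the expired record produces nothing.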



--
This message was sent by Atlassian Jira
(v8.20.7#820007)