Posted to issues@flink.apache.org by "Jingsong Lee (Jira)" <ji...@apache.org> on 2022/06/16 06:54:00 UTC
[jira] [Closed] (FLINK-28019) Error occurred when retract a staled record when enable state ttl in RetractableTopNFunction
[ https://issues.apache.org/jira/browse/FLINK-28019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jingsong Lee closed FLINK-28019.
--------------------------------
Resolution: Fixed
master: c4d4bb5c28d5319fe567b31464683e3f5f22ba67
> Error occurred when retract a staled record when enable state ttl in RetractableTopNFunction
> --------------------------------------------------------------------------------------------
>
> Key: FLINK-28019
> URL: https://issues.apache.org/jira/browse/FLINK-28019
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.15.0, 1.14.4
> Reporter: lincoln lee
> Assignee: lincoln lee
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.16.0
>
>
> An error occurs in RetractableTopNFunction when a stale record is retracted while state TTL is enabled. A test case that reproduces it:
> {code}
> @Test
> public void testRetractAnStaledRecordWithRowNumber() throws Exception {
>     StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(1_000);
>     AbstractTopNFunction func =
>             new RetractableTopNFunction(
>                     ttlConfig,
>                     InternalTypeInfo.ofFields(
>                             VarCharType.STRING_TYPE, new BigIntType(), new IntType()),
>                     comparableRecordComparator,
>                     sortKeySelector,
>                     RankType.ROW_NUMBER,
>                     new ConstantRankRange(1, 2),
>                     generatedEqualiser,
>                     true,
>                     true);
>     OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(func);
>     testHarness.open();
>     testHarness.setStateTtlProcessingTime(0);
>     testHarness.processElement(insertRecord("a", 1L, 10));
>     testHarness.setStateTtlProcessingTime(1001);
>     testHarness.processElement(insertRecord("a", 2L, 11));
>     testHarness.processElement(deleteRecord("a", 1L, 10));
>     testHarness.close();
>     List<Object> expectedOutput = new ArrayList<>();
>     expectedOutput.add(insertRecord("a", 1L, 10, 1L));
>     expectedOutput.add(insertRecord("a", 2L, 11, 1L));
>     // the following delete record should not be sent because the left row is null which is
>     // illegal.
>     // -D{row1=null, row2=+I(1)};
>     assertorWithRowNumber.assertOutputEquals(
>             "output wrong.", expectedOutput, testHarness.getOutput());
> }
> {code}
> The cause is an incomplete code path for handling stale records.
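
For illustration only, the scenario in the test can be modelled without Flink. The sketch below is not the committed fix and does not use Flink APIs: the class `StaleRetractSketch`, its `TreeMap`-backed "state", and the string output format are all hypothetical stand-ins. It assumes that an entry expires once its age reaches the TTL, and that a retraction whose entry has already expired should be skipped rather than turned into a delete for a row that no longer exists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Toy model of a Top-N retract path with expiring state (hypothetical, not Flink code).
class StaleRetractSketch {
    // Stand-in for TTL-backed state: sort key -> time at which the entry was written.
    private final TreeMap<Long, Long> sortKeyToWriteTime = new TreeMap<>();
    private final long ttlMillis;
    private long now;
    final List<String> output = new ArrayList<>();

    StaleRetractSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Advance the clock and drop entries whose age has reached the TTL.
    void setTime(long newTime) {
        this.now = newTime;
        sortKeyToWriteTime.values().removeIf(written -> now - written >= ttlMillis);
    }

    // Store the record and emit it with its rank (count of keys <= sortKey).
    // Rank updates for other rows are omitted to keep the sketch short.
    void insert(long sortKey) {
        sortKeyToWriteTime.put(sortKey, now);
        int rank = sortKeyToWriteTime.headMap(sortKey, true).size();
        output.add("+I(" + sortKey + ", rank=" + rank + ")");
    }

    // Retract a record; a stale record (state already expired) produces no output.
    void retract(long sortKey) {
        if (!sortKeyToWriteTime.containsKey(sortKey)) {
            // Assumption: silently skipping is acceptable here. A real operator
            // might log a warning or fail instead, depending on configuration.
            return;
        }
        int rank = sortKeyToWriteTime.headMap(sortKey, true).size();
        sortKeyToWriteTime.remove(sortKey);
        output.add("-D(" + sortKey + ", rank=" + rank + ")");
    }

    public static void main(String[] args) {
        StaleRetractSketch sketch = new StaleRetractSketch(1_000);
        sketch.setTime(0);
        sketch.insert(1L);
        sketch.setTime(1001); // the first entry is now expired
        sketch.insert(2L);
        sketch.retract(1L);   // stale retraction: nothing is emitted
        sketch.output.forEach(System.out::println);
    }
}
```

The sequence in `main` follows the timestamps used in the test case: the retraction arrives after the first record's state has expired, so only the two inserts appear in the output.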
--
This message was sent by Atlassian Jira
(v8.20.7#820007)