Posted to issues@flink.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2024/01/19 16:20:00 UTC
[jira] [Updated] (FLINK-34166) KeyedLookupJoinWrapper incorrectly process delete message for inner join when previous lookup result is empty
[ https://issues.apache.org/jira/browse/FLINK-34166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-34166:
-----------------------------------
Labels: pull-request-available (was: )
> KeyedLookupJoinWrapper incorrectly process delete message for inner join when previous lookup result is empty
> -------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-34166
> URL: https://issues.apache.org/jira/browse/FLINK-34166
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.17.2, 1.18.1
> Reporter: lincoln lee
> Assignee: lincoln lee
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.19.0, 1.18.2
>
>
> KeyedLookupJoinWrapper (used when 'table.optimizer.non-deterministic-update.strategy' is set to 'TRY_RESOLVE' and the lookup join has NDU (non-deterministic update) problems) incorrectly processes a delete message for an inner join when the previous lookup result is empty.
> The intermediate delete result
> {code}
> expectedOutput.add(deleteRecord(3, "c", null, null));
> {code}
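> in the current test case KeyedLookupJoinHarnessTest#testTemporalInnerJoinWithFilterLookupKeyContainsPk is incorrect, because the earlier insertRecord(3, "c") emitted no row for the inner join (its lookup result was empty), so there is nothing to retract: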
> {code}
> @Test
> public void testTemporalInnerJoinWithFilterLookupKeyContainsPk() throws Exception {
>     OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
>             createHarness(JoinType.INNER_JOIN, FilterOnTable.WITH_FILTER, true);
>     testHarness.open();
>     testHarness.processElement(insertRecord(1, "a"));
>     testHarness.processElement(insertRecord(2, "b"));
>     testHarness.processElement(insertRecord(3, "c"));
>     testHarness.processElement(insertRecord(4, "d"));
>     testHarness.processElement(insertRecord(5, "e"));
>     testHarness.processElement(updateBeforeRecord(3, "c"));
>     testHarness.processElement(updateAfterRecord(3, "c2"));
>     testHarness.processElement(deleteRecord(3, "c2"));
>     testHarness.processElement(insertRecord(3, "c3"));
>     List<Object> expectedOutput = new ArrayList<>();
>     expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
>     expectedOutput.add(insertRecord(4, "d", 4, "Fabian"));
>     expectedOutput.add(deleteRecord(3, "c", null, null));
>     expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
>     expectedOutput.add(deleteRecord(3, "c2", 6, "Jark-2"));
>     expectedOutput.add(insertRecord(3, "c3", 9, "Jark-3"));
>     assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
>     testHarness.close();
> }
> {code}
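The retraction rule at issue can be illustrated with a small standalone sketch. It is not Flink's KeyedLookupJoinWrapper and does not use any Flink API: the class InnerJoinRetractSketch, the method retract, the leftOuter flag, and the string row encoding are all hypothetical names chosen for illustration. It assumes only standard join semantics: an inner join emits nothing for a left row with no lookup match, while a left outer join emits a null-padded row.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; NOT the actual KeyedLookupJoinWrapper logic.
final class InnerJoinRetractSketch {

    /**
     * Builds the delete rows for a retraction (DELETE or UPDATE_BEFORE) of leftRow,
     * given the lookup result that was produced when leftRow was first joined.
     */
    static List<String> retract(String leftRow, List<String> previousLookupResult, boolean leftOuter) {
        List<String> out = new ArrayList<>();
        if (previousLookupResult.isEmpty()) {
            // Only a left outer join emitted a null-padded row earlier,
            // so only a left outer join has something to retract here.
            if (leftOuter) {
                out.add("-D(" + leftRow + ", null, null)");
            }
            return out;
        }
        // Retract exactly the rows that were emitted before.
        for (String right : previousLookupResult) {
            out.add("-D(" + leftRow + ", " + right + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        // Inner join, previous lookup for (3, "c") was empty: nothing to retract.
        System.out.println(retract("3, c", new ArrayList<>(), false));
        // Inner join, previous lookup for (3, "c2") returned (6, "Jark-2").
        System.out.println(retract("3, c2", Arrays.asList("6, Jark-2"), false));
    }
}
```

Under these assumptions, the first call yields an empty list, which corresponds to dropping deleteRecord(3, "c", null, null) from the expected output, while the second call yields the single delete row for (3, "c2", 6, "Jark-2").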