You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2022/06/09 08:38:06 UTC

[flink] branch release-1.15 updated: [FLINK-27418][table-planner] Fix topN retraction for previously deleted record

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new ce23d6331c4 [FLINK-27418][table-planner] Fix topN retraction for previously deleted record
ce23d6331c4 is described below

commit ce23d6331c448b401066006a794c7cf91aeb5fda
Author: Jark Wu <ja...@apache.org>
AuthorDate: Thu Jun 9 16:37:52 2022 +0800

    [FLINK-27418][table-planner] Fix topN retraction for previously deleted record
    
    This closes #19912
---
 .../operators/rank/RetractableTopNFunction.java    |  7 +++-
 .../rank/RetractableTopNFunctionTest.java          | 46 ++++++++++++++++++++++
 2 files changed, 52 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java
index 6694a12771c..da4d94d994b 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java
@@ -18,12 +18,14 @@
 
 package org.apache.flink.table.runtime.operators.rank;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.ListTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.RowData;
@@ -80,6 +82,8 @@ public class RetractableTopNFunction extends AbstractTopNFunction {
 
     private final ComparableRecordComparator serializableComparator;
 
+    private final TypeSerializer<RowData> inputRowSer;
+
     public RetractableTopNFunction(
             StateTtlConfig ttlConfig,
             InternalTypeInfo<RowData> inputRowType,
@@ -102,6 +106,7 @@ public class RetractableTopNFunction extends AbstractTopNFunction {
         this.sortKeyType = sortKeySelector.getProducedType();
         this.serializableComparator = comparableRecordComparator;
         this.generatedEqualiser = generatedEqualiser;
+        this.inputRowSer = inputRowType.createSerializer(new ExecutionConfig());
     }
 
     @Override
@@ -320,7 +325,7 @@ public class RetractableTopNFunction extends AbstractTopNFunction {
             }
         }
         if (toDelete != null) {
-            collectDelete(out, toDelete);
+            collectDelete(out, inputRowSer.copy(toDelete));
         }
         if (toCollect != null) {
             collectInsert(out, inputRow);
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java
index 9ac538068f9..20efa889c18 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java
@@ -21,6 +21,10 @@ package org.apache.flink.table.runtime.operators.rank;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.VarCharType;
 
 import org.junit.Test;
 
@@ -31,6 +35,7 @@ import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord
 import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
 import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
 import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateBeforeRecord;
+import static org.apache.flink.table.types.logical.VarCharType.MAX_LENGTH;
 
 /** Tests for {@link RetractableTopNFunction}. */
 public class RetractableTopNFunctionTest extends TopNFunctionTestBase {
@@ -509,4 +514,45 @@ public class RetractableTopNFunctionTest extends TopNFunctionTestBase {
         assertorWithRowNumber.assertOutputEquals(
                 "output wrong.", expectedOutput, testHarness.getOutput());
     }
+
+    @Test
+    public void testRetractAndThenDeleteRecordWithoutRowNumber() throws Exception {
+        AbstractTopNFunction func =
+                new RetractableTopNFunction(
+                        ttlConfig,
+                        InternalTypeInfo.ofFields(
+                                new VarCharType(MAX_LENGTH),
+                                new BigIntType(),
+                                new IntType(),
+                                new IntType()),
+                        comparableRecordComparator,
+                        sortKeySelector,
+                        RankType.ROW_NUMBER,
+                        new ConstantRankRange(1, 1),
+                        generatedEqualiser,
+                        true,
+                        false);
+
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(func);
+        testHarness.open();
+        testHarness.processElement(insertRecord("a", 1L, 10, 0));
+        testHarness.processElement(insertRecord("a", 1L, 9, 0));
+        testHarness.processElement(deleteRecord("a", 1L, 10, 0));
+        testHarness.processElement(deleteRecord("a", 1L, 9, 0));
+        testHarness.processElement(insertRecord("a", 1L, 10, 1));
+        testHarness.processElement(insertRecord("a", 1L, 9, 1));
+        testHarness.close();
+
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(insertRecord("a", 1L, 10, 0));
+        expectedOutput.add(deleteRecord("a", 1L, 10, 0));
+        expectedOutput.add(insertRecord("a", 1L, 9, 0));
+        expectedOutput.add(deleteRecord("a", 1L, 9, 0));
+        expectedOutput.add(insertRecord("a", 1L, 10, 1));
+        expectedOutput.add(deleteRecord("a", 1L, 10, 1));
+        expectedOutput.add(insertRecord("a", 1L, 9, 1));
+
+        assertorWithRowNumber.assertOutputEquals(
+                "output wrong.", expectedOutput, testHarness.getOutput());
+    }
 }