You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2022/06/09 08:38:06 UTC
[flink] branch release-1.15 updated: [FLINK-27418][table-planner] Fix topN retraction for previously deleted record
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new ce23d6331c4 [FLINK-27418][table-planner] Fix topN retraction for previously deleted record
ce23d6331c4 is described below
commit ce23d6331c448b401066006a794c7cf91aeb5fda
Author: Jark Wu <ja...@apache.org>
AuthorDate: Thu Jun 9 16:37:52 2022 +0800
[FLINK-27418][table-planner] Fix topN retraction for previously deleted record
This closes #19912
---
.../operators/rank/RetractableTopNFunction.java | 7 +++-
.../rank/RetractableTopNFunctionTest.java | 46 ++++++++++++++++++++++
2 files changed, 52 insertions(+), 1 deletion(-)
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java
index 6694a12771c..da4d94d994b 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java
@@ -18,12 +18,14 @@
package org.apache.flink.table.runtime.operators.rank;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
@@ -80,6 +82,8 @@ public class RetractableTopNFunction extends AbstractTopNFunction {
private final ComparableRecordComparator serializableComparator;
+ private final TypeSerializer<RowData> inputRowSer;
+
public RetractableTopNFunction(
StateTtlConfig ttlConfig,
InternalTypeInfo<RowData> inputRowType,
@@ -102,6 +106,7 @@ public class RetractableTopNFunction extends AbstractTopNFunction {
this.sortKeyType = sortKeySelector.getProducedType();
this.serializableComparator = comparableRecordComparator;
this.generatedEqualiser = generatedEqualiser;
+ this.inputRowSer = inputRowType.createSerializer(new ExecutionConfig());
}
@Override
@@ -320,7 +325,7 @@ public class RetractableTopNFunction extends AbstractTopNFunction {
}
}
if (toDelete != null) {
- collectDelete(out, toDelete);
+ collectDelete(out, inputRowSer.copy(toDelete));
}
if (toCollect != null) {
collectInsert(out, inputRow);
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java
index 9ac538068f9..20efa889c18 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java
@@ -21,6 +21,10 @@ package org.apache.flink.table.runtime.operators.rank;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.VarCharType;
import org.junit.Test;
@@ -31,6 +35,7 @@ import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord
import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateBeforeRecord;
+import static org.apache.flink.table.types.logical.VarCharType.MAX_LENGTH;
/** Tests for {@link RetractableTopNFunction}. */
public class RetractableTopNFunctionTest extends TopNFunctionTestBase {
@@ -509,4 +514,45 @@ public class RetractableTopNFunctionTest extends TopNFunctionTestBase {
assertorWithRowNumber.assertOutputEquals(
"output wrong.", expectedOutput, testHarness.getOutput());
}
+
+ @Test
+ public void testRetractAndThenDeleteRecordWithoutRowNumber() throws Exception {
+ AbstractTopNFunction func =
+ new RetractableTopNFunction(
+ ttlConfig,
+ InternalTypeInfo.ofFields(
+ new VarCharType(MAX_LENGTH),
+ new BigIntType(),
+ new IntType(),
+ new IntType()),
+ comparableRecordComparator,
+ sortKeySelector,
+ RankType.ROW_NUMBER,
+ new ConstantRankRange(1, 1),
+ generatedEqualiser,
+ true,
+ false);
+
+ OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(func);
+ testHarness.open();
+ testHarness.processElement(insertRecord("a", 1L, 10, 0));
+ testHarness.processElement(insertRecord("a", 1L, 9, 0));
+ testHarness.processElement(deleteRecord("a", 1L, 10, 0));
+ testHarness.processElement(deleteRecord("a", 1L, 9, 0));
+ testHarness.processElement(insertRecord("a", 1L, 10, 1));
+ testHarness.processElement(insertRecord("a", 1L, 9, 1));
+ testHarness.close();
+
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(insertRecord("a", 1L, 10, 0));
+ expectedOutput.add(deleteRecord("a", 1L, 10, 0));
+ expectedOutput.add(insertRecord("a", 1L, 9, 0));
+ expectedOutput.add(deleteRecord("a", 1L, 9, 0));
+ expectedOutput.add(insertRecord("a", 1L, 10, 1));
+ expectedOutput.add(deleteRecord("a", 1L, 10, 1));
+ expectedOutput.add(insertRecord("a", 1L, 9, 1));
+
+ assertorWithRowNumber.assertOutputEquals(
+ "output wrong.", expectedOutput, testHarness.getOutput());
+ }
}