You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2021/02/23 10:46:53 UTC

[flink] branch master updated: [FLINK-21203][table-planner-blink] Prevent emission of identical update records in LastRowFunction

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new eb8f071  [FLINK-21203][table-planner-blink] Prevent emission of identical update records in LastRowFunction
eb8f071 is described below

commit eb8f07160f1ca4841b5e343a57e3de2e928bf993
Author: wangpeibin <wa...@gmail.com>
AuthorDate: Tue Feb 23 18:46:31 2021 +0800

    [FLINK-21203][table-planner-blink] Prevent emission of identical update records in LastRowFunction
    
    This closes #14863
---
 .../exec/stream/StreamExecChangelogNormalize.java  | 13 +++++-
 .../nodes/exec/stream/StreamExecDeduplicate.java   | 15 ++++++-
 .../planner/codegen/EqualiserCodeGenerator.scala   |  6 ++-
 .../deduplicate/DeduplicateFunctionHelper.java     | 47 ++++++++++++++++------
 .../ProcTimeDeduplicateKeepLastRowFunction.java    | 32 +++++++++++++--
 ...imeMiniBatchDeduplicateKeepLastRowFunction.java | 32 +++++++++++++--
 .../ProcTimeDeduplicateFunctionTestBase.java       | 14 +++++++
 ...ProcTimeDeduplicateKeepLastRowFunctionTest.java | 33 +++++++++++++--
 ...iniBatchDeduplicateKeepLastRowFunctionTest.java |  4 +-
 9 files changed, 169 insertions(+), 27 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecChangelogNormalize.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecChangelogNormalize.java
index b0163ed..5a1a3a9 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecChangelogNormalize.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecChangelogNormalize.java
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
 import org.apache.flink.table.planner.delegation.PlannerBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
@@ -33,6 +34,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.utils.AggregateUtil;
 import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
 import org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger;
@@ -81,6 +83,11 @@ public class StreamExecChangelogNormalize extends ExecNodeBase<RowData>
                 tableConfig
                         .getConfiguration()
                         .getBoolean(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED);
+
+        GeneratedRecordEqualiser generatedEqualiser =
+                new EqualiserCodeGenerator(rowTypeInfo.toRowType())
+                        .generateRecordEqualiser("DeduplicateRowEqualiser");
+
         if (isMiniBatchEnabled) {
             TypeSerializer<RowData> rowSerializer =
                     rowTypeInfo.createSerializer(planner.getExecEnv().getConfig());
@@ -91,7 +98,8 @@ public class StreamExecChangelogNormalize extends ExecNodeBase<RowData>
                             stateIdleTime,
                             generateUpdateBefore,
                             true, // generateInsert
-                            false); // inputInsertOnly
+                            false, // inputInsertOnly
+                            generatedEqualiser);
             CountBundleTrigger<RowData> trigger = AggregateUtil.createMiniBatchTrigger(tableConfig);
             operator = new KeyedMapBundleOperator<>(processFunction, trigger);
         } else {
@@ -101,7 +109,8 @@ public class StreamExecChangelogNormalize extends ExecNodeBase<RowData>
                             stateIdleTime,
                             generateUpdateBefore,
                             true, // generateInsert
-                            false); // inputInsertOnly
+                            false, // inputInsertOnly
+                            generatedEqualiser);
             operator = new KeyedProcessOperator<>(processFunction);
         }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java
index 633bfe1..55152fc 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java
@@ -29,12 +29,14 @@ import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
 import org.apache.flink.table.planner.delegation.PlannerBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
 import org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger;
@@ -106,6 +108,7 @@ public class StreamExecDeduplicate extends ExecNodeBase<RowData>
         final TypeSerializer<RowData> rowSerializer =
                 rowTypeInfo.createSerializer(planner.getExecEnv().getConfig());
         final OneInputStreamOperator<RowData, RowData> operator;
+
         if (isRowtime) {
             operator =
                     new RowtimeDeduplicateOperatorTranslator(
@@ -122,6 +125,7 @@ public class StreamExecDeduplicate extends ExecNodeBase<RowData>
                                     planner.getTableConfig(),
                                     rowTypeInfo,
                                     rowSerializer,
+                                    inputRowType,
                                     keepLastRow,
                                     generateUpdateBefore)
                             .createDeduplicateOperator();
@@ -260,14 +264,19 @@ public class StreamExecDeduplicate extends ExecNodeBase<RowData>
     /** Translator to create process time deduplicate operator. */
     private static class ProcTimeDeduplicateOperatorTranslator
             extends DeduplicateOperatorTranslator {
+        private final GeneratedRecordEqualiser generatedEqualiser;
 
         protected ProcTimeDeduplicateOperatorTranslator(
                 TableConfig tableConfig,
                 InternalTypeInfo<RowData> rowTypeInfo,
                 TypeSerializer<RowData> typeSerializer,
+                RowType inputRowType,
                 boolean keepLastRow,
                 boolean generateUpdateBefore) {
             super(tableConfig, rowTypeInfo, typeSerializer, keepLastRow, generateUpdateBefore);
+            generatedEqualiser =
+                    new EqualiserCodeGenerator(inputRowType)
+                            .generateRecordEqualiser("DeduplicateRowEqualiser");
         }
 
         @Override
@@ -282,7 +291,8 @@ public class StreamExecDeduplicate extends ExecNodeBase<RowData>
                                     getMinRetentionTime(),
                                     generateUpdateBefore,
                                     generateInsert(),
-                                    true);
+                                    true,
+                                    generatedEqualiser);
                     return new KeyedMapBundleOperator<>(processFunction, trigger);
                 } else {
                     ProcTimeMiniBatchDeduplicateKeepFirstRowFunction processFunction =
@@ -298,7 +308,8 @@ public class StreamExecDeduplicate extends ExecNodeBase<RowData>
                                     getMinRetentionTime(),
                                     generateUpdateBefore,
                                     generateInsert(),
-                                    true);
+                                    true,
+                                    generatedEqualiser);
                     return new KeyedProcessOperator<>(processFunction);
                 } else {
                     ProcTimeDeduplicateKeepFirstRowFunction processFunction =
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/EqualiserCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/EqualiserCodeGenerator.scala
index 850d55f..be8d480 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/EqualiserCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/EqualiserCodeGenerator.scala
@@ -25,7 +25,7 @@ import org.apache.flink.table.runtime.generated.{GeneratedRecordEqualiser, Recor
 import org.apache.flink.table.runtime.types.PlannerTypeUtils
 import org.apache.flink.table.types.logical.LogicalTypeRoot._
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.{getFieldTypes, isCompositeType}
-import org.apache.flink.table.types.logical.{DistinctType, LogicalType}
+import org.apache.flink.table.types.logical.{DistinctType, LogicalType, RowType}
 
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
@@ -36,6 +36,10 @@ class EqualiserCodeGenerator(fieldTypes: Array[LogicalType]) {
   private val LEFT_INPUT = "left"
   private val RIGHT_INPUT = "right"
 
+  def this(rowType: RowType) = {
+    this(rowType.getChildren.asScala.toArray)
+  }
+
   def generateRecordEqualiser(name: String): GeneratedRecordEqualiser = {
     // ignore time zone
     val ctx = CodeGeneratorContext(new TableConfig)
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
index f491e3b..a068ac5 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.operators.deduplicate;
 
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
@@ -35,13 +36,17 @@ class DeduplicateFunctionHelper {
      * @param generateUpdateBefore whether need to send UPDATE_BEFORE message for updates
      * @param state state of function, null if generateUpdateBefore is false
      * @param out underlying collector
+     * @param isStateTtlEnabled whether state TTL is enabled
+     * @param equaliser the record equaliser used to compare RowData for equality.
      */
     static void processLastRowOnProcTime(
             RowData currentRow,
             boolean generateUpdateBefore,
             boolean generateInsert,
             ValueState<RowData> state,
-            Collector<RowData> out)
+            Collector<RowData> out,
+            boolean isStateTtlEnabled,
+            RecordEqualiser equaliser)
             throws Exception {
 
         checkInsertOnly(currentRow);
@@ -55,12 +60,20 @@ class DeduplicateFunctionHelper {
                 currentRow.setRowKind(RowKind.INSERT);
                 out.collect(currentRow);
             } else {
-                if (generateUpdateBefore) {
-                    preRow.setRowKind(RowKind.UPDATE_BEFORE);
-                    out.collect(preRow);
+                if (!isStateTtlEnabled && equaliser.equals(preRow, currentRow)) {
+                    // currentRow is the same as preRow and state cleaning is not enabled.
+                    // We do not emit retraction and update message.
+                    // If state cleaning is enabled, we have to emit messages to prevent too early
+                    // state eviction of downstream operators.
+                    return;
+                } else {
+                    if (generateUpdateBefore) {
+                        preRow.setRowKind(RowKind.UPDATE_BEFORE);
+                        out.collect(preRow);
+                    }
+                    currentRow.setRowKind(RowKind.UPDATE_AFTER);
+                    out.collect(currentRow);
                 }
-                currentRow.setRowKind(RowKind.UPDATE_AFTER);
-                out.collect(currentRow);
             }
         } else {
             // always send UPDATE_AFTER if INSERT is not needed
@@ -86,7 +99,9 @@ class DeduplicateFunctionHelper {
             RowData currentRow,
             boolean generateUpdateBefore,
             ValueState<RowData> state,
-            Collector<RowData> out)
+            Collector<RowData> out,
+            boolean isStateTtlEnabled,
+            RecordEqualiser equaliser)
             throws Exception {
         RowData preRow = state.value();
         RowKind currentKind = currentRow.getRowKind();
@@ -96,12 +111,20 @@ class DeduplicateFunctionHelper {
                 currentRow.setRowKind(RowKind.INSERT);
                 out.collect(currentRow);
             } else {
-                if (generateUpdateBefore) {
-                    preRow.setRowKind(RowKind.UPDATE_BEFORE);
-                    out.collect(preRow);
+                if (!isStateTtlEnabled && equaliser.equals(preRow, currentRow)) {
+                    // currentRow is the same as preRow and state cleaning is not enabled.
+                    // We do not emit retraction and update message.
+                    // If state cleaning is enabled, we have to emit messages to prevent too early
+                    // state eviction of downstream operators.
+                    return;
+                } else {
+                    if (generateUpdateBefore) {
+                        preRow.setRowKind(RowKind.UPDATE_BEFORE);
+                        out.collect(preRow);
+                    }
+                    currentRow.setRowKind(RowKind.UPDATE_AFTER);
+                    out.collect(currentRow);
                 }
-                currentRow.setRowKind(RowKind.UPDATE_AFTER);
-                out.collect(currentRow);
             }
             // normalize row kind
             currentRow.setRowKind(RowKind.INSERT);
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunction.java
index 4ac2f1e..979b726 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunction.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunction.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.table.runtime.operators.deduplicate;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.util.Collector;
 
@@ -33,26 +36,49 @@ public class ProcTimeDeduplicateKeepLastRowFunction
     private final boolean generateUpdateBefore;
     private final boolean generateInsert;
     private final boolean inputIsInsertOnly;
+    private final boolean isStateTtlEnabled;
+    /** The code-generated equaliser used to compare RowData for equality. */
+    private final GeneratedRecordEqualiser genRecordEqualiser;
+
+    /** The record equaliser used to compare RowData for equality. */
+    private transient RecordEqualiser equaliser;
 
     public ProcTimeDeduplicateKeepLastRowFunction(
             InternalTypeInfo<RowData> typeInfo,
             long stateRetentionTime,
             boolean generateUpdateBefore,
             boolean generateInsert,
-            boolean inputInsertOnly) {
+            boolean inputInsertOnly,
+            GeneratedRecordEqualiser genRecordEqualiser) {
         super(typeInfo, null, stateRetentionTime);
         this.generateUpdateBefore = generateUpdateBefore;
         this.generateInsert = generateInsert;
         this.inputIsInsertOnly = inputInsertOnly;
+        this.genRecordEqualiser = genRecordEqualiser;
+        this.isStateTtlEnabled = stateRetentionTime > 0;
+    }
+
+    @Override
+    public void open(Configuration configure) throws Exception {
+        super.open(configure);
+        equaliser = genRecordEqualiser.newInstance(getRuntimeContext().getUserCodeClassLoader());
     }
 
     @Override
     public void processElement(RowData input, Context ctx, Collector<RowData> out)
             throws Exception {
         if (inputIsInsertOnly) {
-            processLastRowOnProcTime(input, generateUpdateBefore, generateInsert, state, out);
+            processLastRowOnProcTime(
+                    input,
+                    generateUpdateBefore,
+                    generateInsert,
+                    state,
+                    out,
+                    isStateTtlEnabled,
+                    equaliser);
         } else {
-            processLastRowOnChangelog(input, generateUpdateBefore, state, out);
+            processLastRowOnChangelog(
+                    input, generateUpdateBefore, state, out, isStateTtlEnabled, equaliser);
         }
     }
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepLastRowFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepLastRowFunction.java
index 43af1d2..eb51008 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepLastRowFunction.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepLastRowFunction.java
@@ -20,6 +20,9 @@ package org.apache.flink.table.runtime.operators.deduplicate;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.context.ExecutionContext;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.util.Collector;
 
@@ -39,6 +42,12 @@ public class ProcTimeMiniBatchDeduplicateKeepLastRowFunction
     private final boolean generateUpdateBefore;
     private final boolean generateInsert;
     private final boolean inputInsertOnly;
+    private final boolean isStateTtlEnabled;
+    // The code-generated equaliser used to compare RowData for equality.
+    private final GeneratedRecordEqualiser genRecordEqualiser;
+
+    // The record equaliser used to compare RowData for equality.
+    private transient RecordEqualiser equaliser;
 
     public ProcTimeMiniBatchDeduplicateKeepLastRowFunction(
             InternalTypeInfo<RowData> typeInfo,
@@ -46,12 +55,22 @@ public class ProcTimeMiniBatchDeduplicateKeepLastRowFunction
             long stateRetentionTime,
             boolean generateUpdateBefore,
             boolean generateInsert,
-            boolean inputInsertOnly) {
+            boolean inputInsertOnly,
+            GeneratedRecordEqualiser genRecordEqualiser) {
         super(typeInfo, stateRetentionTime);
         this.serializer = serializer;
         this.generateUpdateBefore = generateUpdateBefore;
         this.generateInsert = generateInsert;
         this.inputInsertOnly = inputInsertOnly;
+        this.genRecordEqualiser = genRecordEqualiser;
+        this.isStateTtlEnabled = stateRetentionTime > 0;
+    }
+
+    @Override
+    public void open(ExecutionContext ctx) throws Exception {
+        super.open(ctx);
+        equaliser =
+                genRecordEqualiser.newInstance(ctx.getRuntimeContext().getUserCodeClassLoader());
     }
 
     @Override
@@ -69,9 +88,16 @@ public class ProcTimeMiniBatchDeduplicateKeepLastRowFunction
             ctx.setCurrentKey(currentKey);
             if (inputInsertOnly) {
                 processLastRowOnProcTime(
-                        currentRow, generateUpdateBefore, generateInsert, state, out);
+                        currentRow,
+                        generateUpdateBefore,
+                        generateInsert,
+                        state,
+                        out,
+                        isStateTtlEnabled,
+                        equaliser);
             } else {
-                processLastRowOnChangelog(currentRow, generateUpdateBefore, state, out);
+                processLastRowOnChangelog(
+                        currentRow, generateUpdateBefore, state, out, isStateTtlEnabled, equaliser);
             }
         }
     }
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateFunctionTestBase.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateFunctionTestBase.java
index d7721a3..9473f45 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateFunctionTestBase.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateFunctionTestBase.java
@@ -20,10 +20,13 @@ package org.apache.flink.table.runtime.operators.deduplicate;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.runtime.util.BinaryRowDataKeySelector;
 import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
 import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
+import org.apache.flink.table.runtime.util.RowDataRecordEqualiser;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.VarCharType;
@@ -45,4 +48,15 @@ abstract class ProcTimeDeduplicateFunctionTestBase {
                     inputRowType.toRowFieldTypes(),
                     new GenericRowRecordSortComparator(
                             rowKeyIdx, inputRowType.toRowFieldTypes()[rowKeyIdx]));
+
+    static GeneratedRecordEqualiser generatedEqualiser =
+            new GeneratedRecordEqualiser("", "", new Object[0]) {
+
+                private static final long serialVersionUID = 1L;
+
+                @Override
+                public RecordEqualiser newInstance(ClassLoader classLoader) {
+                    return new RowDataRecordEqualiser();
+                }
+            };
 }
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunctionTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunctionTest.java
index 5611642..e1e7fcc 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunctionTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunctionTest.java
@@ -35,11 +35,21 @@ import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateBefore
 /** Tests for {@link ProcTimeDeduplicateKeepLastRowFunction}. */
 public class ProcTimeDeduplicateKeepLastRowFunctionTest
         extends ProcTimeDeduplicateFunctionTestBase {
+    private ProcTimeDeduplicateKeepLastRowFunction createFunctionWithoutStateTtl(
+            boolean generateUpdateBefore, boolean generateInsert) {
+        return new ProcTimeDeduplicateKeepLastRowFunction(
+                inputRowType, 0, generateUpdateBefore, generateInsert, true, generatedEqualiser);
+    }
 
     private ProcTimeDeduplicateKeepLastRowFunction createFunction(
             boolean generateUpdateBefore, boolean generateInsert) {
         return new ProcTimeDeduplicateKeepLastRowFunction(
-                inputRowType, minTime.toMilliseconds(), generateUpdateBefore, generateInsert, true);
+                inputRowType,
+                minTime.toMilliseconds(),
+                generateUpdateBefore,
+                generateInsert,
+                true,
+                generatedEqualiser);
     }
 
     private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
@@ -92,14 +102,31 @@ public class ProcTimeDeduplicateKeepLastRowFunctionTest
         testHarness.open();
         testHarness.processElement(insertRecord("book", 1L, 12));
         testHarness.processElement(insertRecord("book", 2L, 11));
-        testHarness.processElement(insertRecord("book", 1L, 13));
+        testHarness.processElement(insertRecord("book", 1L, 12));
         testHarness.close();
 
         // Keep LastRow in deduplicate may send UPDATE_BEFORE
         List<Object> expectedOutput = new ArrayList<>();
         expectedOutput.add(insertRecord("book", 1L, 12));
         expectedOutput.add(updateBeforeRecord("book", 1L, 12));
-        expectedOutput.add(updateAfterRecord("book", 1L, 13));
+        expectedOutput.add(updateAfterRecord("book", 1L, 12));
+        expectedOutput.add(insertRecord("book", 2L, 11));
+        assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
+    }
+
+    @Test
+    public void testWithStateTtlDisabled() throws Exception {
+        ProcTimeDeduplicateKeepLastRowFunction func = createFunctionWithoutStateTtl(true, true);
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(func);
+        testHarness.open();
+        testHarness.processElement(insertRecord("book", 1L, 12));
+        testHarness.processElement(insertRecord("book", 2L, 11));
+        testHarness.processElement(insertRecord("book", 1L, 12));
+        testHarness.close();
+
+        // With state TTL disabled, re-inserting an identical row emits no UPDATE_BEFORE/UPDATE_AFTER
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(insertRecord("book", 1L, 12));
         expectedOutput.add(insertRecord("book", 2L, 11));
         assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
     }
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepLastRowFunctionTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepLastRowFunctionTest.java
index fa1d4b9..6af34f7 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepLastRowFunctionTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepLastRowFunctionTest.java
@@ -46,13 +46,15 @@ public class ProcTimeMiniBatchDeduplicateKeepLastRowFunctionTest
 
     private ProcTimeMiniBatchDeduplicateKeepLastRowFunction createFunction(
             boolean generateUpdateBefore, boolean generateInsert, long minRetentionTime) {
+
         return new ProcTimeMiniBatchDeduplicateKeepLastRowFunction(
                 inputRowType,
                 typeSerializer,
                 minRetentionTime,
                 generateUpdateBefore,
                 generateInsert,
-                true);
+                true,
+                generatedEqualiser);
     }
 
     private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(