You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2021/02/23 10:46:53 UTC
[flink] branch master updated: [FLINK-21203][table-planner-blink]
Prevent emission of identical update records in LastRowFunction
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new eb8f071 [FLINK-21203][table-planner-blink] Prevent emission of identical update records in LastRowFunction
eb8f071 is described below
commit eb8f07160f1ca4841b5e343a57e3de2e928bf993
Author: wangpeibin <wa...@gmail.com>
AuthorDate: Tue Feb 23 18:46:31 2021 +0800
[FLINK-21203][table-planner-blink] Prevent emission of identical update records in LastRowFunction
This closes #14863
---
.../exec/stream/StreamExecChangelogNormalize.java | 13 +++++-
.../nodes/exec/stream/StreamExecDeduplicate.java | 15 ++++++-
.../planner/codegen/EqualiserCodeGenerator.scala | 6 ++-
.../deduplicate/DeduplicateFunctionHelper.java | 47 ++++++++++++++++------
.../ProcTimeDeduplicateKeepLastRowFunction.java | 32 +++++++++++++--
...imeMiniBatchDeduplicateKeepLastRowFunction.java | 32 +++++++++++++--
.../ProcTimeDeduplicateFunctionTestBase.java | 14 +++++++
...ProcTimeDeduplicateKeepLastRowFunctionTest.java | 33 +++++++++++++--
...iniBatchDeduplicateKeepLastRowFunctionTest.java | 4 +-
9 files changed, 169 insertions(+), 27 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecChangelogNormalize.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecChangelogNormalize.java
index b0163ed..5a1a3a9 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecChangelogNormalize.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecChangelogNormalize.java
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
@@ -33,6 +34,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.utils.AggregateUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
import org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger;
@@ -81,6 +83,11 @@ public class StreamExecChangelogNormalize extends ExecNodeBase<RowData>
tableConfig
.getConfiguration()
.getBoolean(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED);
+
+ GeneratedRecordEqualiser generatedEqualiser =
+ new EqualiserCodeGenerator(rowTypeInfo.toRowType())
+ .generateRecordEqualiser("DeduplicateRowEqualiser");
+
if (isMiniBatchEnabled) {
TypeSerializer<RowData> rowSerializer =
rowTypeInfo.createSerializer(planner.getExecEnv().getConfig());
@@ -91,7 +98,8 @@ public class StreamExecChangelogNormalize extends ExecNodeBase<RowData>
stateIdleTime,
generateUpdateBefore,
true, // generateInsert
- false); // inputInsertOnly
+ false, // inputInsertOnly
+ generatedEqualiser);
CountBundleTrigger<RowData> trigger = AggregateUtil.createMiniBatchTrigger(tableConfig);
operator = new KeyedMapBundleOperator<>(processFunction, trigger);
} else {
@@ -101,7 +109,8 @@ public class StreamExecChangelogNormalize extends ExecNodeBase<RowData>
stateIdleTime,
generateUpdateBefore,
true, // generateInsert
- false); // inputInsertOnly
+ false, // inputInsertOnly
+ generatedEqualiser);
operator = new KeyedProcessOperator<>(processFunction);
}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java
index 633bfe1..55152fc 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java
@@ -29,12 +29,14 @@ import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
import org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger;
@@ -106,6 +108,7 @@ public class StreamExecDeduplicate extends ExecNodeBase<RowData>
final TypeSerializer<RowData> rowSerializer =
rowTypeInfo.createSerializer(planner.getExecEnv().getConfig());
final OneInputStreamOperator<RowData, RowData> operator;
+
if (isRowtime) {
operator =
new RowtimeDeduplicateOperatorTranslator(
@@ -122,6 +125,7 @@ public class StreamExecDeduplicate extends ExecNodeBase<RowData>
planner.getTableConfig(),
rowTypeInfo,
rowSerializer,
+ inputRowType,
keepLastRow,
generateUpdateBefore)
.createDeduplicateOperator();
@@ -260,14 +264,19 @@ public class StreamExecDeduplicate extends ExecNodeBase<RowData>
/** Translator to create process time deduplicate operator. */
private static class ProcTimeDeduplicateOperatorTranslator
extends DeduplicateOperatorTranslator {
+ private final GeneratedRecordEqualiser generatedEqualiser;
protected ProcTimeDeduplicateOperatorTranslator(
TableConfig tableConfig,
InternalTypeInfo<RowData> rowTypeInfo,
TypeSerializer<RowData> typeSerializer,
+ RowType inputRowType,
boolean keepLastRow,
boolean generateUpdateBefore) {
super(tableConfig, rowTypeInfo, typeSerializer, keepLastRow, generateUpdateBefore);
+ generatedEqualiser =
+ new EqualiserCodeGenerator(inputRowType)
+ .generateRecordEqualiser("DeduplicateRowEqualiser");
}
@Override
@@ -282,7 +291,8 @@ public class StreamExecDeduplicate extends ExecNodeBase<RowData>
getMinRetentionTime(),
generateUpdateBefore,
generateInsert(),
- true);
+ true,
+ generatedEqualiser);
return new KeyedMapBundleOperator<>(processFunction, trigger);
} else {
ProcTimeMiniBatchDeduplicateKeepFirstRowFunction processFunction =
@@ -298,7 +308,8 @@ public class StreamExecDeduplicate extends ExecNodeBase<RowData>
getMinRetentionTime(),
generateUpdateBefore,
generateInsert(),
- true);
+ true,
+ generatedEqualiser);
return new KeyedProcessOperator<>(processFunction);
} else {
ProcTimeDeduplicateKeepFirstRowFunction processFunction =
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/EqualiserCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/EqualiserCodeGenerator.scala
index 850d55f..be8d480 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/EqualiserCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/EqualiserCodeGenerator.scala
@@ -25,7 +25,7 @@ import org.apache.flink.table.runtime.generated.{GeneratedRecordEqualiser, Recor
import org.apache.flink.table.runtime.types.PlannerTypeUtils
import org.apache.flink.table.types.logical.LogicalTypeRoot._
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.{getFieldTypes, isCompositeType}
-import org.apache.flink.table.types.logical.{DistinctType, LogicalType}
+import org.apache.flink.table.types.logical.{DistinctType, LogicalType, RowType}
import scala.annotation.tailrec
import scala.collection.JavaConverters._
@@ -36,6 +36,10 @@ class EqualiserCodeGenerator(fieldTypes: Array[LogicalType]) {
private val LEFT_INPUT = "left"
private val RIGHT_INPUT = "right"
+ def this(rowType: RowType) = {
+ this(rowType.getChildren.asScala.toArray)
+ }
+
def generateRecordEqualiser(name: String): GeneratedRecordEqualiser = {
// ignore time zone
val ctx = CodeGeneratorContext(new TableConfig)
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
index f491e3b..a068ac5 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.operators.deduplicate;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
@@ -35,13 +36,17 @@ class DeduplicateFunctionHelper {
* @param generateUpdateBefore whether need to send UPDATE_BEFORE message for updates
* @param state state of function, null if generateUpdateBefore is false
* @param out underlying collector
+ * @param isStateTtlEnabled whether state TTL is enabled; if so, identical updates are still emitted
+ * @param equaliser the record equaliser used to check whether two RowData are equal.
*/
static void processLastRowOnProcTime(
RowData currentRow,
boolean generateUpdateBefore,
boolean generateInsert,
ValueState<RowData> state,
- Collector<RowData> out)
+ Collector<RowData> out,
+ boolean isStateTtlEnabled,
+ RecordEqualiser equaliser)
throws Exception {
checkInsertOnly(currentRow);
@@ -55,12 +60,20 @@ class DeduplicateFunctionHelper {
currentRow.setRowKind(RowKind.INSERT);
out.collect(currentRow);
} else {
- if (generateUpdateBefore) {
- preRow.setRowKind(RowKind.UPDATE_BEFORE);
- out.collect(preRow);
+ if (!isStateTtlEnabled && equaliser.equals(preRow, currentRow)) {
+ // currentRow is identical to preRow and state cleaning is not enabled,
+ // so we emit neither the retraction nor the update message.
+ // If state cleaning is enabled, we must still emit the messages to prevent premature
+ // state eviction in downstream operators.
+ return;
+ } else {
+ if (generateUpdateBefore) {
+ preRow.setRowKind(RowKind.UPDATE_BEFORE);
+ out.collect(preRow);
+ }
+ currentRow.setRowKind(RowKind.UPDATE_AFTER);
+ out.collect(currentRow);
}
- currentRow.setRowKind(RowKind.UPDATE_AFTER);
- out.collect(currentRow);
}
} else {
// always send UPDATE_AFTER if INSERT is not needed
@@ -86,7 +99,9 @@ class DeduplicateFunctionHelper {
RowData currentRow,
boolean generateUpdateBefore,
ValueState<RowData> state,
- Collector<RowData> out)
+ Collector<RowData> out,
+ boolean isStateTtlEnabled,
+ RecordEqualiser equaliser)
throws Exception {
RowData preRow = state.value();
RowKind currentKind = currentRow.getRowKind();
@@ -96,12 +111,20 @@ class DeduplicateFunctionHelper {
currentRow.setRowKind(RowKind.INSERT);
out.collect(currentRow);
} else {
- if (generateUpdateBefore) {
- preRow.setRowKind(RowKind.UPDATE_BEFORE);
- out.collect(preRow);
+ if (!isStateTtlEnabled && equaliser.equals(preRow, currentRow)) {
+ // currentRow is identical to preRow and state cleaning is not enabled,
+ // so we emit neither the retraction nor the update message.
+ // If state cleaning is enabled, we must still emit the messages to prevent premature
+ // state eviction in downstream operators.
+ return;
+ } else {
+ if (generateUpdateBefore) {
+ preRow.setRowKind(RowKind.UPDATE_BEFORE);
+ out.collect(preRow);
+ }
+ currentRow.setRowKind(RowKind.UPDATE_AFTER);
+ out.collect(currentRow);
}
- currentRow.setRowKind(RowKind.UPDATE_AFTER);
- out.collect(currentRow);
}
// normalize row kind
currentRow.setRowKind(RowKind.INSERT);
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunction.java
index 4ac2f1e..979b726 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunction.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunction.java
@@ -18,7 +18,10 @@
package org.apache.flink.table.runtime.operators.deduplicate;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.util.Collector;
@@ -33,26 +36,49 @@ public class ProcTimeDeduplicateKeepLastRowFunction
private final boolean generateUpdateBefore;
private final boolean generateInsert;
private final boolean inputIsInsertOnly;
+ private final boolean isStateTtlEnabled;
+ /** The code-generated equaliser used to compare RowData for equality. */
+ private final GeneratedRecordEqualiser genRecordEqualiser;
+
+ /** The record equaliser, instantiated in open(), used to compare RowData for equality. */
+ private transient RecordEqualiser equaliser;
public ProcTimeDeduplicateKeepLastRowFunction(
InternalTypeInfo<RowData> typeInfo,
long stateRetentionTime,
boolean generateUpdateBefore,
boolean generateInsert,
- boolean inputInsertOnly) {
+ boolean inputInsertOnly,
+ GeneratedRecordEqualiser genRecordEqualiser) {
super(typeInfo, null, stateRetentionTime);
this.generateUpdateBefore = generateUpdateBefore;
this.generateInsert = generateInsert;
this.inputIsInsertOnly = inputInsertOnly;
+ this.genRecordEqualiser = genRecordEqualiser;
+ this.isStateTtlEnabled = stateRetentionTime > 0;
+ }
+
+ @Override
+ public void open(Configuration configure) throws Exception {
+ super.open(configure);
+ equaliser = genRecordEqualiser.newInstance(getRuntimeContext().getUserCodeClassLoader());
}
@Override
public void processElement(RowData input, Context ctx, Collector<RowData> out)
throws Exception {
if (inputIsInsertOnly) {
- processLastRowOnProcTime(input, generateUpdateBefore, generateInsert, state, out);
+ processLastRowOnProcTime(
+ input,
+ generateUpdateBefore,
+ generateInsert,
+ state,
+ out,
+ isStateTtlEnabled,
+ equaliser);
} else {
- processLastRowOnChangelog(input, generateUpdateBefore, state, out);
+ processLastRowOnChangelog(
+ input, generateUpdateBefore, state, out, isStateTtlEnabled, equaliser);
}
}
}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepLastRowFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepLastRowFunction.java
index 43af1d2..eb51008 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepLastRowFunction.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepLastRowFunction.java
@@ -20,6 +20,9 @@ package org.apache.flink.table.runtime.operators.deduplicate;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.context.ExecutionContext;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.util.Collector;
@@ -39,6 +42,12 @@ public class ProcTimeMiniBatchDeduplicateKeepLastRowFunction
private final boolean generateUpdateBefore;
private final boolean generateInsert;
private final boolean inputInsertOnly;
+ private final boolean isStateTtlEnabled;
+ // The code-generated equaliser used to compare RowData for equality.
+ private final GeneratedRecordEqualiser genRecordEqualiser;
+
+ // The record equaliser, instantiated in open(), used to compare RowData for equality.
+ private transient RecordEqualiser equaliser;
public ProcTimeMiniBatchDeduplicateKeepLastRowFunction(
InternalTypeInfo<RowData> typeInfo,
@@ -46,12 +55,22 @@ public class ProcTimeMiniBatchDeduplicateKeepLastRowFunction
long stateRetentionTime,
boolean generateUpdateBefore,
boolean generateInsert,
- boolean inputInsertOnly) {
+ boolean inputInsertOnly,
+ GeneratedRecordEqualiser genRecordEqualiser) {
super(typeInfo, stateRetentionTime);
this.serializer = serializer;
this.generateUpdateBefore = generateUpdateBefore;
this.generateInsert = generateInsert;
this.inputInsertOnly = inputInsertOnly;
+ this.genRecordEqualiser = genRecordEqualiser;
+ this.isStateTtlEnabled = stateRetentionTime > 0;
+ }
+
+ @Override
+ public void open(ExecutionContext ctx) throws Exception {
+ super.open(ctx);
+ equaliser =
+ genRecordEqualiser.newInstance(ctx.getRuntimeContext().getUserCodeClassLoader());
}
@Override
@@ -69,9 +88,16 @@ public class ProcTimeMiniBatchDeduplicateKeepLastRowFunction
ctx.setCurrentKey(currentKey);
if (inputInsertOnly) {
processLastRowOnProcTime(
- currentRow, generateUpdateBefore, generateInsert, state, out);
+ currentRow,
+ generateUpdateBefore,
+ generateInsert,
+ state,
+ out,
+ isStateTtlEnabled,
+ equaliser);
} else {
- processLastRowOnChangelog(currentRow, generateUpdateBefore, state, out);
+ processLastRowOnChangelog(
+ currentRow, generateUpdateBefore, state, out, isStateTtlEnabled, equaliser);
}
}
}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateFunctionTestBase.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateFunctionTestBase.java
index d7721a3..9473f45 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateFunctionTestBase.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateFunctionTestBase.java
@@ -20,10 +20,13 @@ package org.apache.flink.table.runtime.operators.deduplicate;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.BinaryRowDataKeySelector;
import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
+import org.apache.flink.table.runtime.util.RowDataRecordEqualiser;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.VarCharType;
@@ -45,4 +48,15 @@ abstract class ProcTimeDeduplicateFunctionTestBase {
inputRowType.toRowFieldTypes(),
new GenericRowRecordSortComparator(
rowKeyIdx, inputRowType.toRowFieldTypes()[rowKeyIdx]));
+
+ static GeneratedRecordEqualiser generatedEqualiser =
+ new GeneratedRecordEqualiser("", "", new Object[0]) {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public RecordEqualiser newInstance(ClassLoader classLoader) {
+ return new RowDataRecordEqualiser();
+ }
+ };
}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunctionTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunctionTest.java
index 5611642..e1e7fcc 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunctionTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunctionTest.java
@@ -35,11 +35,21 @@ import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateBefore
/** Tests for {@link ProcTimeDeduplicateKeepLastRowFunction}. */
public class ProcTimeDeduplicateKeepLastRowFunctionTest
extends ProcTimeDeduplicateFunctionTestBase {
+ private ProcTimeDeduplicateKeepLastRowFunction createFunctionWithoutStateTtl(
+ boolean generateUpdateBefore, boolean generateInsert) {
+ return new ProcTimeDeduplicateKeepLastRowFunction(
+ inputRowType, 0, generateUpdateBefore, generateInsert, true, generatedEqualiser);
+ }
private ProcTimeDeduplicateKeepLastRowFunction createFunction(
boolean generateUpdateBefore, boolean generateInsert) {
return new ProcTimeDeduplicateKeepLastRowFunction(
- inputRowType, minTime.toMilliseconds(), generateUpdateBefore, generateInsert, true);
+ inputRowType,
+ minTime.toMilliseconds(),
+ generateUpdateBefore,
+ generateInsert,
+ true,
+ generatedEqualiser);
}
private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
@@ -92,14 +102,31 @@ public class ProcTimeDeduplicateKeepLastRowFunctionTest
testHarness.open();
testHarness.processElement(insertRecord("book", 1L, 12));
testHarness.processElement(insertRecord("book", 2L, 11));
- testHarness.processElement(insertRecord("book", 1L, 13));
+ testHarness.processElement(insertRecord("book", 1L, 12));
testHarness.close();
// Keep LastRow in deduplicate may send UPDATE_BEFORE
List<Object> expectedOutput = new ArrayList<>();
expectedOutput.add(insertRecord("book", 1L, 12));
expectedOutput.add(updateBeforeRecord("book", 1L, 12));
- expectedOutput.add(updateAfterRecord("book", 1L, 13));
+ expectedOutput.add(updateAfterRecord("book", 1L, 12));
+ expectedOutput.add(insertRecord("book", 2L, 11));
+ assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
+ }
+
+ @Test
+ public void testWithStateTtlDisabled() throws Exception {
+ ProcTimeDeduplicateKeepLastRowFunction func = createFunctionWithoutStateTtl(true, true);
+ OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(func);
+ testHarness.open();
+ testHarness.processElement(insertRecord("book", 1L, 12));
+ testHarness.processElement(insertRecord("book", 2L, 11));
+ testHarness.processElement(insertRecord("book", 1L, 12));
+ testHarness.close();
+
+ // With state TTL disabled, the identical row for ("book", 1L) is not emitted again
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(insertRecord("book", 1L, 12));
expectedOutput.add(insertRecord("book", 2L, 11));
assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepLastRowFunctionTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepLastRowFunctionTest.java
index fa1d4b9..6af34f7 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepLastRowFunctionTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepLastRowFunctionTest.java
@@ -46,13 +46,15 @@ public class ProcTimeMiniBatchDeduplicateKeepLastRowFunctionTest
private ProcTimeMiniBatchDeduplicateKeepLastRowFunction createFunction(
boolean generateUpdateBefore, boolean generateInsert, long minRetentionTime) {
+
return new ProcTimeMiniBatchDeduplicateKeepLastRowFunction(
inputRowType,
typeSerializer,
minRetentionTime,
generateUpdateBefore,
generateInsert,
- true);
+ true,
+ generatedEqualiser);
}
private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(