You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2021/11/03 03:09:06 UTC

[flink] branch master updated: [FLINK-23015][table-runtime] Implement streaming window Deduplicate operator

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 59d6a18  [FLINK-23015][table-runtime] Implement streaming window Deduplicate operator
59d6a18 is described below

commit 59d6a189b0aa4d8ccbbc39abb73fe317f0abfcf1
Author: Jing Zhang <be...@gmail.com>
AuthorDate: Wed Nov 3 11:08:43 2021 +0800

    [FLINK-23015][table-runtime] Implement streaming window Deduplicate operator
    
    This closes #17571
---
 .../deduplicate/DeduplicateFunctionHelper.java     |   4 +-
 .../RowTimeWindowDeduplicateOperatorBuilder.java   | 113 ++++++++
 .../RowTimeDeduplicateRecordsCombiner.java         | 145 ++++++++++
 .../RowTimeWindowDeduplicateProcessor.java         | 144 ++++++++++
 .../RowTimeWindowDeduplicateOperatorTest.java      | 306 +++++++++++++++++++++
 5 files changed, 710 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
index a068ac5..2186ee4 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
@@ -26,7 +26,7 @@ import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
 
 /** Utility for deduplicate function. */
-class DeduplicateFunctionHelper {
+public class DeduplicateFunctionHelper {
 
     /**
      * Processes element to deduplicate on keys with process time semantic, sends current element as
@@ -207,7 +207,7 @@ class DeduplicateFunctionHelper {
     }
 
     /** Returns current row is duplicate row or not compared to previous row. */
-    static boolean isDuplicate(
+    public static boolean isDuplicate(
             RowData preRow, RowData currentRow, int rowtimeIndex, boolean keepLastRow) {
         if (keepLastRow) {
             return preRow == null
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorBuilder.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorBuilder.java
new file mode 100644
index 0000000..fe22b31
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorBuilder.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate.window;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer;
+import org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
+import org.apache.flink.table.runtime.operators.deduplicate.window.combines.RowTimeDeduplicateRecordsCombiner;
+import org.apache.flink.table.runtime.operators.deduplicate.window.processors.RowTimeWindowDeduplicateProcessor;
+import org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner;
+import org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowOperator;
+import org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowProcessor;
+import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
+import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
+
+import java.time.ZoneId;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Fluent builder that assembles a {@link SlicingWindowOperator} performing rowtime window
+ * deduplication; {@link #build()} requires both serializers and a non-negative window end index.
+ *
+ * <pre>
+ * RowTimeWindowDeduplicateOperatorBuilder.builder()
+ *   .inputSerializer(inputSerializer)
+ *   .keySerializer(keySerializer)
+ *   .keepLastRow(true)
+ *   .rowtimeIndex(0)
+ *   .windowEndIndex(windowEndIndex)
+ *   .build();
+ * </pre>
+ */
+public class RowTimeWindowDeduplicateOperatorBuilder {
+
+    public static RowTimeWindowDeduplicateOperatorBuilder builder() {
+        return new RowTimeWindowDeduplicateOperatorBuilder();
+    }
+
+    private AbstractRowDataSerializer<RowData> inputSerializer; // required, null-checked in build()
+    private PagedTypeSerializer<RowData> keySerializer; // required, null-checked in build()
+    private int rowtimeIndex; // stays 0 unless rowtimeIndex(int) is called; not validated
+    private int windowEndIndex = -1; // -1 marks "not set"; build() rejects negative values
+    private ZoneId shiftTimeZone; // not validated in build(); handed to the processor as-is
+    private boolean keepLastRow; // stays false unless keepLastRow(boolean) is called
+
+    public RowTimeWindowDeduplicateOperatorBuilder inputSerializer(
+            AbstractRowDataSerializer<RowData> inputSerializer) {
+        this.inputSerializer = inputSerializer;
+        return this;
+    }
+
+    public RowTimeWindowDeduplicateOperatorBuilder shiftTimeZone(ZoneId shiftTimeZone) {
+        this.shiftTimeZone = shiftTimeZone;
+        return this;
+    }
+
+    public RowTimeWindowDeduplicateOperatorBuilder keySerializer(
+            PagedTypeSerializer<RowData> keySerializer) {
+        this.keySerializer = keySerializer;
+        return this;
+    }
+
+    public RowTimeWindowDeduplicateOperatorBuilder keepLastRow(boolean keepLastRow) {
+        this.keepLastRow = keepLastRow;
+        return this;
+    }
+
+    public RowTimeWindowDeduplicateOperatorBuilder rowtimeIndex(int rowtimeIndex) {
+        this.rowtimeIndex = rowtimeIndex;
+        return this;
+    }
+
+    public RowTimeWindowDeduplicateOperatorBuilder windowEndIndex(int windowEndIndex) {
+        this.windowEndIndex = windowEndIndex;
+        return this;
+    }
+
+    public SlicingWindowOperator<RowData, ?> build() {
+        checkNotNull(inputSerializer); // fail fast: both serializers are mandatory
+        checkNotNull(keySerializer);
+        checkArgument(
+                windowEndIndex >= 0,
+                String.format(
+                        "Illegal window end index %s, it should not be negative!", windowEndIndex));
+        final RecordsCombiner.Factory combinerFactory =
+                new RowTimeDeduplicateRecordsCombiner.Factory(
+                        inputSerializer, rowtimeIndex, keepLastRow); // selects the row to keep
+        final WindowBuffer.Factory bufferFactory =
+                new RecordsWindowBuffer.Factory(keySerializer, inputSerializer, combinerFactory);
+        final SlicingWindowProcessor<Long> windowProcessor =
+                new RowTimeWindowDeduplicateProcessor(
+                        inputSerializer, bufferFactory, windowEndIndex, shiftTimeZone);
+        return new SlicingWindowOperator<>(windowProcessor); // the processor holds all the logic
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/combines/RowTimeDeduplicateRecordsCombiner.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/combines/RowTimeDeduplicateRecordsCombiner.java
new file mode 100644
index 0000000..bde7e73
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/combines/RowTimeDeduplicateRecordsCombiner.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate.window.combines;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner;
+import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
+import org.apache.flink.table.runtime.operators.window.state.StateKeyContext;
+import org.apache.flink.table.runtime.operators.window.state.WindowState;
+import org.apache.flink.table.runtime.operators.window.state.WindowValueState;
+import org.apache.flink.table.runtime.util.WindowKey;
+
+import java.util.Iterator;
+
+import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg;
+import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.isDuplicate;
+
+/**
+ * An implementation of {@link RecordsCombiner} that reduces each incoming batch of records to a
+ * single first/last candidate row and merges that row into the window value state.
+ */
+public final class RowTimeDeduplicateRecordsCombiner implements RecordsCombiner {
+
+    /** Timer service; this combiner only registers event-time window timers on it. */
+    private final WindowTimerService<Long> timerService;
+
+    /** Context to switch current key for states. */
+    private final StateKeyContext keyContext;
+
+    /** State holding the single kept (first or last) record of each window. */
+    private final WindowValueState<Long> dataState;
+
+    private final int rowtimeIndex; // forwarded to isDuplicate(...) as the rowtime field position
+
+    private final boolean keepLastRow; // forwarded to isDuplicate(...) to choose last vs. first row
+
+    /** Serializer to copy record if required. */
+    private final TypeSerializer<RowData> recordSerializer;
+
+    public RowTimeDeduplicateRecordsCombiner(
+            WindowTimerService<Long> timerService,
+            StateKeyContext keyContext,
+            WindowValueState<Long> dataState,
+            int rowtimeIndex,
+            boolean keepLastRow,
+            TypeSerializer<RowData> recordSerializer) {
+        this.timerService = timerService;
+        this.keyContext = keyContext;
+        this.dataState = dataState;
+        this.rowtimeIndex = rowtimeIndex;
+        this.keepLastRow = keepLastRow;
+        this.recordSerializer = recordSerializer;
+    }
+
+    @Override
+    public void combine(WindowKey windowKey, Iterator<RowData> records) throws Exception {
+        // step 1: reduce the incoming batch to a single first/last candidate row
+        RowData bufferedResult = null;
+        while (records.hasNext()) {
+            RowData record = records.next();
+            if (!isAccumulateMsg(record)) { // any non-accumulate message is rejected outright
+                throw new UnsupportedOperationException(
+                        "Window deduplicate does not support input RowKind: "
+                                + record.getRowKind().shortString());
+            }
+            if (isDuplicate(bufferedResult, record, rowtimeIndex, keepLastRow)) { // true => replace
+                // the incoming record is reused, we should copy it before holding on to it
+                bufferedResult = recordSerializer.copy(record);
+            }
+        }
+        if (bufferedResult == null) { // no candidate was selected from this batch
+            return; // nothing is written to state and no timer is registered
+        }
+        // step 2: merge the candidate into state under the key and window of windowKey
+        keyContext.setCurrentKey(windowKey.getKey());
+        Long window = windowKey.getWindow();
+        RowData preRow = dataState.value(window);
+        if (isDuplicate(preRow, bufferedResult, rowtimeIndex, keepLastRow)) { // same test vs. state
+            dataState.update(window, bufferedResult);
+        }
+        // step 3: register the event-time timer for this window, whether or not state was updated
+        timerService.registerEventTimeWindowTimer(window);
+    }
+
+    @Override
+    public void close() throws Exception {} // nothing to release here
+
+    // ----------------------------------------------------------------------------------------
+    // Factory
+    // ----------------------------------------------------------------------------------------
+
+    /** Factory to create {@link RowTimeDeduplicateRecordsCombiner}. */
+    public static final class Factory implements RecordsCombiner.Factory {
+
+        private static final long serialVersionUID = 1L;
+
+        private final TypeSerializer<RowData> recordSerializer;
+        private final int rowtimeIndex;
+        private final boolean keepLastRow;
+
+        public Factory(
+                TypeSerializer<RowData> recordSerializer, int rowtimeIndex, boolean keepLastRow) {
+            this.recordSerializer = recordSerializer;
+            this.rowtimeIndex = rowtimeIndex;
+            this.keepLastRow = keepLastRow;
+        }
+
+        @Override
+        public RecordsCombiner createRecordsCombiner(
+                RuntimeContext runtimeContext,
+                WindowTimerService<Long> timerService,
+                KeyedStateBackend<RowData> stateBackend,
+                WindowState<Long> windowState,
+                boolean isEventTime) // runtimeContext and isEventTime are not used by this factory
+                throws Exception {
+            WindowValueState<Long> windowMapState = (WindowValueState<Long>) windowState; // CCE if not
+            return new RowTimeDeduplicateRecordsCombiner(
+                    timerService,
+                    stateBackend::setCurrentKey, // method reference serves as the StateKeyContext
+                    windowMapState,
+                    rowtimeIndex,
+                    keepLastRow,
+                    recordSerializer);
+        }
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/processors/RowTimeWindowDeduplicateProcessor.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/processors/RowTimeWindowDeduplicateProcessor.java
new file mode 100644
index 0000000..e3abae2
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/processors/RowTimeWindowDeduplicateProcessor.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate.window.processors;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
+import org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowProcessor;
+import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
+import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerServiceImpl;
+import org.apache.flink.table.runtime.operators.window.state.WindowValueState;
+
+import java.time.ZoneId;
+
+import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired;
+
+/** A rowtime window deduplicate processor: buffers records per window end, emits the kept row. */
+public final class RowTimeWindowDeduplicateProcessor implements SlicingWindowProcessor<Long> {
+    private static final long serialVersionUID = 1L;
+
+    private final WindowBuffer.Factory bufferFactory;
+    private final TypeSerializer<RowData> inputSerializer; // also the serializer of the state value
+    private final int windowEndIndex; // field of the input row read as the window end (long)
+    private final ZoneId shiftTimeZone;
+
+    // ----------------------------------------------------------------------------------------
+
+    private transient long currentProgress; // largest progress seen; Long.MIN_VALUE after open()
+
+    private transient Context<Long> ctx;
+
+    private transient WindowTimerService<Long> windowTimerService;
+
+    private transient WindowBuffer windowBuffer;
+
+    /** state schema: [key, window_end, first/last record]. */
+    private transient WindowValueState<Long> windowState;
+
+    public RowTimeWindowDeduplicateProcessor(
+            TypeSerializer<RowData> inputSerializer,
+            WindowBuffer.Factory bufferFactory,
+            int windowEndIndex,
+            ZoneId shiftTimeZone) {
+        this.inputSerializer = inputSerializer;
+        this.bufferFactory = bufferFactory;
+        this.windowEndIndex = windowEndIndex;
+        this.shiftTimeZone = shiftTimeZone;
+    }
+
+    @Override
+    public void open(Context<Long> context) throws Exception {
+        this.ctx = context;
+        final LongSerializer namespaceSerializer = LongSerializer.INSTANCE;
+        ValueStateDescriptor<RowData> valueStateDescriptor =
+                new ValueStateDescriptor<>("window_deduplicate", inputSerializer);
+        ValueState<RowData> state =
+                ctx.getKeyedStateBackend()
+                        .getOrCreateKeyedState(namespaceSerializer, valueStateDescriptor); // Long ns
+        this.windowTimerService = new WindowTimerServiceImpl(ctx.getTimerService(), shiftTimeZone);
+        this.windowState =
+                new WindowValueState<>((InternalValueState<RowData, Long, RowData>) state);
+        this.windowBuffer =
+                bufferFactory.create(
+                        ctx.getOperatorOwner(),
+                        ctx.getMemoryManager(),
+                        ctx.getMemorySize(),
+                        ctx.getRuntimeContext(),
+                        windowTimerService,
+                        ctx.getKeyedStateBackend(),
+                        windowState,
+                        true, // NOTE(review): presumably the isEventTime flag - confirm in factory
+                        shiftTimeZone);
+        this.currentProgress = Long.MIN_VALUE; // no progress has been observed yet
+    }
+
+    @Override
+    public boolean processElement(RowData key, RowData element) throws Exception {
+        long sliceEnd = element.getLong(windowEndIndex); // window end is carried by the element
+        if (isWindowFired(sliceEnd, currentProgress, shiftTimeZone)) {
+            // element is late and should be dropped
+            return true; // true reports the element as dropped
+        }
+        windowBuffer.addElement(key, sliceEnd, element);
+        return false; // false: the element was handed to the window buffer
+    }
+
+    @Override
+    public void advanceProgress(long progress) throws Exception {
+        if (progress > currentProgress) { // progress only moves forward; other values are ignored
+            currentProgress = progress;
+            windowBuffer.advanceProgress(currentProgress);
+        }
+    }
+
+    @Override
+    public void prepareCheckpoint() throws Exception {
+        windowBuffer.flush(); // flush the window buffer before the checkpoint is taken
+    }
+
+    @Override
+    public void clearWindow(Long windowEnd) throws Exception {
+        windowState.clear(windowEnd); // clears the value stored under this window namespace
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (windowBuffer != null) { // windowBuffer is only assigned in open()
+            windowBuffer.close();
+        }
+    }
+
+    @Override
+    public TypeSerializer<Long> createWindowSerializer() {
+        return LongSerializer.INSTANCE; // windows are identified by their end timestamp (Long)
+    }
+
+    @Override
+    public void fireWindow(Long windowEnd) throws Exception {
+        RowData data = windowState.value(windowEnd);
+        if (data != null) { // emit only when a row is stored for this window
+            ctx.output(data);
+        }
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorTest.java
new file mode 100644
index 0000000..9c91345
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorTest.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate.window;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowOperator;
+import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
+import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for window deduplicate operators created by {@link
+ * RowTimeWindowDeduplicateOperatorBuilder}.
+ */
+@RunWith(Parameterized.class)
+public class RowTimeWindowDeduplicateOperatorTest {
+
+    private static final RowType INPUT_ROW_TYPE =
+            new RowType(
+                    Arrays.asList(
+                            new RowType.RowField("f0", new VarCharType(Integer.MAX_VALUE)),
+                            new RowType.RowField("f1", new BigIntType()),
+                            new RowType.RowField("f2", new BigIntType())));
+
+    private static final RowDataSerializer INPUT_ROW_SER = new RowDataSerializer(INPUT_ROW_TYPE);
+
+    private static final RowDataKeySelector KEY_SELECTOR =
+            HandwrittenSelectorUtil.getRowDataSelector(
+                    new int[] {0}, INPUT_ROW_TYPE.getChildren().toArray(new LogicalType[0]));
+
+    private static final PagedTypeSerializer<RowData> KEY_SER =
+            (PagedTypeSerializer<RowData>) KEY_SELECTOR.getProducedType().toSerializer();
+
+    private static final int WINDOW_END_INDEX = 2;
+
+    private static final LogicalType[] OUTPUT_TYPES =
+            new LogicalType[] {
+                new VarCharType(Integer.MAX_VALUE), new BigIntType(), new BigIntType()
+            };
+
+    private static final TypeSerializer<RowData> OUT_SERIALIZER =
+            new RowDataSerializer(OUTPUT_TYPES);
+
+    private static final RowDataHarnessAssertor ASSERTER =
+            new RowDataHarnessAssertor(
+                    OUTPUT_TYPES,
+                    new GenericRowRecordSortComparator(0, new VarCharType(VarCharType.MAX_LENGTH)));
+
+    private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
+    private static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai");
+    private final ZoneId shiftTimeZone;
+
+    public RowTimeWindowDeduplicateOperatorTest(ZoneId shiftTimeZone) {
+        this.shiftTimeZone = shiftTimeZone; // shift time zone applied to every operator under test
+    }
+
+    @Parameterized.Parameters(name = "TimeZone = {0}")
+    public static Collection<Object[]> runMode() {
+        return Arrays.asList(new Object[] {UTC_ZONE_ID}, new Object[] {SHANGHAI_ZONE_ID}); // 2 runs
+    }
+
+    private static OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
+            SlicingWindowOperator<RowData, ?> operator) throws Exception {
+        return new KeyedOneInputStreamOperatorTestHarness<>( // keyed by KEY_SELECTOR (field 0)
+                operator, KEY_SELECTOR, KEY_SELECTOR.getProducedType());
+    }
+
+    @Test
+    public void testRowTimeWindowDeduplicateKeepFirstRow() throws Exception {
+        SlicingWindowOperator<RowData, ?> operator =
+                RowTimeWindowDeduplicateOperatorBuilder.builder()
+                        .inputSerializer(INPUT_ROW_SER)
+                        .shiftTimeZone(shiftTimeZone)
+                        .keySerializer(KEY_SER)
+                        .keepLastRow(false) // expect the row with the smallest rowtime per window
+                        .rowtimeIndex(1) // f1 is the rowtime field
+                        .windowEndIndex(WINDOW_END_INDEX) // f2 carries the window end
+                        .build();
+
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                createTestHarness(operator);
+
+        testHarness.setup(OUT_SERIALIZER);
+        testHarness.open();
+
+        // process elements
+        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+        // add elements out-of-order (the rowtime in f1 is not ascending within a window)
+        testHarness.processElement(
+                insertRecord("key2", 1L, toUtcTimestampMills(999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key2", 4L, toUtcTimestampMills(999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key2", 5L, toUtcTimestampMills(999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key2", 3L, toUtcTimestampMills(999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key2", 1002L, toUtcTimestampMills(1999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key2", 3007L, toUtcTimestampMills(3999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key2", 3008L, toUtcTimestampMills(3999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key2", 3001L, toUtcTimestampMills(3999L, shiftTimeZone)));
+
+        testHarness.processElement(
+                insertRecord("key1", 2L, toUtcTimestampMills(999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key1", 1L, toUtcTimestampMills(999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key1", 3L, toUtcTimestampMills(999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key1", 3L, toUtcTimestampMills(999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key1", 1004L, toUtcTimestampMills(1999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key1", 1006L, toUtcTimestampMills(1999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key1", 1007L, toUtcTimestampMills(1999L, shiftTimeZone)));
+
+        testHarness.processWatermark(new Watermark(999)); // fires the windows ending at 999
+        expectedOutput.add(insertRecord("key1", 1L, toUtcTimestampMills(999L, shiftTimeZone)));
+        expectedOutput.add(insertRecord("key2", 1L, toUtcTimestampMills(999L, shiftTimeZone)));
+        expectedOutput.add(new Watermark(999));
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, testHarness.getOutput());
+
+        testHarness.processWatermark(new Watermark(1999)); // fires the windows ending at 1999
+        expectedOutput.add(insertRecord("key1", 1004L, toUtcTimestampMills(1999L, shiftTimeZone)));
+        expectedOutput.add(insertRecord("key2", 1002L, toUtcTimestampMills(1999L, shiftTimeZone)));
+        expectedOutput.add(new Watermark(1999));
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, testHarness.getOutput());
+
+        // do a snapshot, close and restore again; the 3999 window must survive the restore
+        testHarness.prepareSnapshotPreBarrier(0L);
+        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0);
+        testHarness.close();
+        expectedOutput.clear(); // from here on only output produced after the restore is compared
+
+        testHarness = createTestHarness(operator);
+        testHarness.setup(OUT_SERIALIZER);
+        testHarness.initializeState(snapshot);
+        testHarness.open();
+
+        testHarness.processWatermark(new Watermark(3999)); // only key2 has records in this window
+        expectedOutput.add(insertRecord("key2", 3001L, toUtcTimestampMills(3999L, shiftTimeZone)));
+        expectedOutput.add(new Watermark(3999));
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, testHarness.getOutput());
+
+        // late element (window end 3500 is behind watermark 3999), should be dropped
+        testHarness.processElement(
+                insertRecord("key2", 3001L, toUtcTimestampMills(3500L, shiftTimeZone)));
+
+        testHarness.processWatermark(new Watermark(4999));
+        expectedOutput.add(new Watermark(4999));
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, testHarness.getOutput());
+
+        assertEquals(1, operator.getNumLateRecordsDropped().getCount()); // the one late record
+
+        testHarness.close();
+    }
+
+    /**
+     * Exercises the window deduplicate operator built with {@code keepLastRow(true)} and {@code
+     * rowtimeIndex(1)}.
+     *
+     * <p>Records for two keys are fed out of order. After each watermark the expected output holds,
+     * per key and per timestamp group, only the record with the largest value in field 1. The
+     * harness is snapshotted, closed and restored between the second and third watermark, so the
+     * last group is emitted from restored state. A record arriving behind the watermark must
+     * produce no output and must be counted exactly once as a dropped late record.
+     */
+    @Test
+    public void testRowTimeWindowDeduplicateKeepLastRow() throws Exception {
+        SlicingWindowOperator<RowData, ?> dedupOperator =
+                RowTimeWindowDeduplicateOperatorBuilder.builder()
+                        .inputSerializer(INPUT_ROW_SER)
+                        .shiftTimeZone(shiftTimeZone)
+                        .keySerializer(KEY_SER)
+                        .keepLastRow(true)
+                        .rowtimeIndex(1)
+                        .windowEndIndex(WINDOW_END_INDEX)
+                        .build();
+
+        OneInputStreamOperatorTestHarness<RowData, RowData> harness =
+                createTestHarness(dedupOperator);
+        harness.setup(OUT_SERIALIZER);
+        harness.open();
+
+        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+
+        // Each entry is {field 1 (rowtime), timestamp handed to toUtcTimestampMills}; the third
+        // record field is presumably the window end -- confirm against WINDOW_END_INDEX.
+        // Entries are deliberately out of order within each timestamp group.
+        long[][] key2Rows = {
+            {1L, 999L},
+            {4L, 999L},
+            {5L, 999L},
+            {3L, 999L},
+            {1002L, 1999L},
+            {3007L, 3999L},
+            {3008L, 3999L},
+            {3001L, 3999L}
+        };
+        long[][] key1Rows = {
+            {2L, 999L},
+            {1L, 999L},
+            {3L, 999L},
+            {3L, 999L},
+            {1004L, 1999L},
+            {1006L, 1999L},
+            {1007L, 1999L}
+        };
+
+        // feed every key2 record first, then every key1 record
+        for (long[] row : key2Rows) {
+            harness.processElement(
+                    insertRecord("key2", row[0], toUtcTimestampMills(row[1], shiftTimeZone)));
+        }
+        for (long[] row : key1Rows) {
+            harness.processElement(
+                    insertRecord("key1", row[0], toUtcTimestampMills(row[1], shiftTimeZone)));
+        }
+
+        // watermark 999: one surviving row per key for the 999 group
+        harness.processWatermark(new Watermark(999));
+        expected.add(insertRecord("key1", 3L, toUtcTimestampMills(999L, shiftTimeZone)));
+        expected.add(insertRecord("key2", 5L, toUtcTimestampMills(999L, shiftTimeZone)));
+        expected.add(new Watermark(999));
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expected, harness.getOutput());
+
+        // watermark 1999: one surviving row per key for the 1999 group
+        harness.processWatermark(new Watermark(1999));
+        expected.add(insertRecord("key1", 1007L, toUtcTimestampMills(1999L, shiftTimeZone)));
+        expected.add(insertRecord("key2", 1002L, toUtcTimestampMills(1999L, shiftTimeZone)));
+        expected.add(new Watermark(1999));
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expected, harness.getOutput());
+
+        // take a snapshot, shut the harness down and bring up a fresh one from that snapshot;
+        // expectations restart from empty because the new harness has a new output queue
+        harness.prepareSnapshotPreBarrier(0L);
+        OperatorSubtaskState restoredState = harness.snapshot(0L, 0);
+        harness.close();
+        expected.clear();
+
+        harness = createTestHarness(dedupOperator);
+        harness.setup(OUT_SERIALIZER);
+        harness.initializeState(restoredState);
+        harness.open();
+
+        // watermark 3999: only key2 had records in this group before the snapshot
+        harness.processWatermark(new Watermark(3999));
+        expected.add(insertRecord("key2", 3008L, toUtcTimestampMills(3999L, shiftTimeZone)));
+        expected.add(new Watermark(3999));
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expected, harness.getOutput());
+
+        // this record is behind the current watermark, so it must be dropped
+        harness.processElement(
+                insertRecord("key2", 3001L, toUtcTimestampMills(3500L, shiftTimeZone)));
+
+        harness.processWatermark(new Watermark(4999));
+        expected.add(new Watermark(4999));
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expected, harness.getOutput());
+
+        assertEquals(1, dedupOperator.getNumLateRecordsDropped().getCount());
+
+        harness.close();
+    }
+}