You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2021/11/03 03:09:06 UTC
[flink] branch master updated: [FLINK-23015][table-runtime]
Implement streaming window Deduplicate operator
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 59d6a18 [FLINK-23015][table-runtime] Implement streaming window Deduplicate operator
59d6a18 is described below
commit 59d6a189b0aa4d8ccbbc39abb73fe317f0abfcf1
Author: Jing Zhang <be...@gmail.com>
AuthorDate: Wed Nov 3 11:08:43 2021 +0800
[FLINK-23015][table-runtime] Implement streaming window Deduplicate operator
This closes #17571
---
.../deduplicate/DeduplicateFunctionHelper.java | 4 +-
.../RowTimeWindowDeduplicateOperatorBuilder.java | 113 ++++++++
.../RowTimeDeduplicateRecordsCombiner.java | 145 ++++++++++
.../RowTimeWindowDeduplicateProcessor.java | 144 ++++++++++
.../RowTimeWindowDeduplicateOperatorTest.java | 306 +++++++++++++++++++++
5 files changed, 710 insertions(+), 2 deletions(-)
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
index a068ac5..2186ee4 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
@@ -26,7 +26,7 @@ import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
/** Utility for deduplicate function. */
-class DeduplicateFunctionHelper {
+public class DeduplicateFunctionHelper {
/**
* Processes element to deduplicate on keys with process time semantic, sends current element as
@@ -207,7 +207,7 @@ class DeduplicateFunctionHelper {
}
/** Returns whether the current row is a duplicate row compared to the previous row. */
- static boolean isDuplicate(
+ public static boolean isDuplicate(
RowData preRow, RowData currentRow, int rowtimeIndex, boolean keepLastRow) {
if (keepLastRow) {
return preRow == null
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorBuilder.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorBuilder.java
new file mode 100644
index 0000000..fe22b31
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorBuilder.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate.window;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer;
+import org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
+import org.apache.flink.table.runtime.operators.deduplicate.window.combines.RowTimeDeduplicateRecordsCombiner;
+import org.apache.flink.table.runtime.operators.deduplicate.window.processors.RowTimeWindowDeduplicateProcessor;
+import org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner;
+import org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowOperator;
+import org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowProcessor;
+import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
+import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
+
+import java.time.ZoneId;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The {@link RowTimeWindowDeduplicateOperatorBuilder} is used to build a {@link
+ * SlicingWindowOperator} for rowtime window deduplicate.
+ *
+ * <p>{@link #build()} requires that an input serializer and a key serializer have been set and
+ * that the window end index is non-negative; it fails its precondition checks otherwise. The
+ * rowtime index and the keep-last-row flag keep their Java defaults ({@code 0} and {@code false})
+ * when they are not set.
+ *
+ * <p>NOTE(review): the shift time zone is handed to the processor without a null check, and the
+ * example below does not set it. Confirm whether callers must always invoke {@link
+ * #shiftTimeZone(ZoneId)}.
+ *
+ * <pre>
+ * RowTimeWindowDeduplicateOperatorBuilder.builder()
+ * .inputSerializer(inputSerializer)
+ * .keySerializer(keySerializer)
+ * .keepLastRow(true)
+ * .rowtimeIndex(0)
+ * .windowEndIndex(windowEndIndex)
+ * .build();
+ * </pre>
+ */
+public class RowTimeWindowDeduplicateOperatorBuilder {
+
+ public static RowTimeWindowDeduplicateOperatorBuilder builder() { // entry point of the fluent API
+ return new RowTimeWindowDeduplicateOperatorBuilder();
+ }
+
+ private AbstractRowDataSerializer<RowData> inputSerializer; // mandatory, checked in build()
+ private PagedTypeSerializer<RowData> keySerializer; // mandatory, checked in build()
+ private int rowtimeIndex; // stays 0 unless rowtimeIndex(int) is called
+ private int windowEndIndex = -1; // -1 means "not set"; build() rejects negative values
+ private ZoneId shiftTimeZone; // not validated in build(); passed to the processor as is
+ private boolean keepLastRow; // stays false unless keepLastRow(boolean) is called
+
+ public RowTimeWindowDeduplicateOperatorBuilder inputSerializer(
+ AbstractRowDataSerializer<RowData> inputSerializer) {
+ this.inputSerializer = inputSerializer;
+ return this;
+ }
+
+ public RowTimeWindowDeduplicateOperatorBuilder shiftTimeZone(ZoneId shiftTimeZone) {
+ this.shiftTimeZone = shiftTimeZone;
+ return this;
+ }
+
+ public RowTimeWindowDeduplicateOperatorBuilder keySerializer(
+ PagedTypeSerializer<RowData> keySerializer) {
+ this.keySerializer = keySerializer;
+ return this;
+ }
+
+ public RowTimeWindowDeduplicateOperatorBuilder keepLastRow(boolean keepLastRow) {
+ this.keepLastRow = keepLastRow;
+ return this;
+ }
+
+ public RowTimeWindowDeduplicateOperatorBuilder rowtimeIndex(int rowtimeIndex) {
+ this.rowtimeIndex = rowtimeIndex;
+ return this;
+ }
+
+ public RowTimeWindowDeduplicateOperatorBuilder windowEndIndex(int windowEndIndex) {
+ this.windowEndIndex = windowEndIndex;
+ return this;
+ }
+
+ public SlicingWindowOperator<RowData, ?> build() { // validates settings, then wires combiner -> buffer -> processor -> operator
+ checkNotNull(inputSerializer);
+ checkNotNull(keySerializer);
+ checkArgument( // fails when windowEndIndex(int) was never called, because the field starts at -1
+ windowEndIndex >= 0,
+ String.format(
+ "Illegal window end index %s, it should not be negative!", windowEndIndex));
+ final RecordsCombiner.Factory combinerFactory = // picks the row to keep among buffered records
+ new RowTimeDeduplicateRecordsCombiner.Factory(
+ inputSerializer, rowtimeIndex, keepLastRow);
+ final WindowBuffer.Factory bufferFactory = // buffer that hands its records to the combiner above
+ new RecordsWindowBuffer.Factory(keySerializer, inputSerializer, combinerFactory);
+ final SlicingWindowProcessor<Long> windowProcessor = // windows are identified by a Long (the window end)
+ new RowTimeWindowDeduplicateProcessor(
+ inputSerializer, bufferFactory, windowEndIndex, shiftTimeZone);
+ return new SlicingWindowOperator<>(windowProcessor);
+ }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/combines/RowTimeDeduplicateRecordsCombiner.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/combines/RowTimeDeduplicateRecordsCombiner.java
new file mode 100644
index 0000000..bde7e73
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/combines/RowTimeDeduplicateRecordsCombiner.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate.window.combines;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner;
+import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
+import org.apache.flink.table.runtime.operators.window.state.StateKeyContext;
+import org.apache.flink.table.runtime.operators.window.state.WindowState;
+import org.apache.flink.table.runtime.operators.window.state.WindowValueState;
+import org.apache.flink.table.runtime.util.WindowKey;
+
+import java.util.Iterator;
+
+import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg;
+import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.isDuplicate;
+
+/**
+ * An implementation of {@link RecordsCombiner} that stores the first/last records of incremental
+ * input records into the window state.
+ *
+ * <p>{@link #combine(WindowKey, Iterator)} first reduces the incoming records to a single
+ * candidate row, then compares that candidate with the row already stored for the same key and
+ * window, and finally registers an event-time timer for the window. Which row wins a comparison
+ * is decided by {@code DeduplicateFunctionHelper#isDuplicate} using {@code rowtimeIndex} and
+ * {@code keepLastRow}.
+ */
+public final class RowTimeDeduplicateRecordsCombiner implements RecordsCombiner {
+
+ /** The service to register event-time or processing-time timers. */
+ private final WindowTimerService<Long> timerService;
+
+ /** Context to switch current key for states. */
+ private final StateKeyContext keyContext;
+
+ /** The state stores first/last record of each window. */
+ private final WindowValueState<Long> dataState;
+
+ private final int rowtimeIndex; // forwarded to isDuplicate(...) as the rowtime field position
+
+ private final boolean keepLastRow; // forwarded to isDuplicate(...); selects last-row vs first-row mode
+
+ /** Serializer to copy record if required. */
+ private final TypeSerializer<RowData> recordSerializer;
+
+ public RowTimeDeduplicateRecordsCombiner(
+ WindowTimerService<Long> timerService,
+ StateKeyContext keyContext,
+ WindowValueState<Long> dataState,
+ int rowtimeIndex,
+ boolean keepLastRow,
+ TypeSerializer<RowData> recordSerializer) {
+ this.timerService = timerService;
+ this.keyContext = keyContext;
+ this.dataState = dataState;
+ this.rowtimeIndex = rowtimeIndex;
+ this.keepLastRow = keepLastRow;
+ this.recordSerializer = recordSerializer;
+ }
+
+ @Override
+ public void combine(WindowKey windowKey, Iterator<RowData> records) throws Exception {
+ // step 1: reduce the incremental records to one candidate row (first/last record)
+ RowData bufferedResult = null;
+ while (records.hasNext()) {
+ RowData record = records.next();
+ if (!isAccumulateMsg(record)) { // any other row kind is rejected with an exception
+ throw new UnsupportedOperationException(
+ "Window deduplicate does not support input RowKind: "
+ + record.getRowKind().shortString());
+ }
+ if (isDuplicate(bufferedResult, record, rowtimeIndex, keepLastRow)) { // true => record replaces the candidate
+ // the incoming record is reused, we should copy it
+ bufferedResult = recordSerializer.copy(record);
+ }
+ }
+ if (bufferedResult == null) { // no candidate: return without touching state or timers
+ return;
+ }
+ // step 2: flush the candidate into state if it wins against the stored row
+ keyContext.setCurrentKey(windowKey.getKey());
+ Long window = windowKey.getWindow();
+ RowData preRow = dataState.value(window);
+ if (isDuplicate(preRow, bufferedResult, rowtimeIndex, keepLastRow)) {
+ dataState.update(window, bufferedResult);
+ }
+ // step 3: register timer for current window (done even when the stored row was kept)
+ timerService.registerEventTimeWindowTimer(window);
+ }
+
+ @Override
+ public void close() throws Exception {} // nothing to release
+
+ // ----------------------------------------------------------------------------------------
+ // Factory
+ // ----------------------------------------------------------------------------------------
+
+ /**
+ * Factory to create {@link RowTimeDeduplicateRecordsCombiner}.
+ *
+ * <p>The {@code runtimeContext} and {@code isEventTime} arguments of {@link
+ * #createRecordsCombiner} are not used by this implementation.
+ */
+ public static final class Factory implements RecordsCombiner.Factory {
+
+ private static final long serialVersionUID = 1L;
+
+ private final TypeSerializer<RowData> recordSerializer;
+ private final int rowtimeIndex;
+ private final boolean keepLastRow;
+
+ public Factory(
+ TypeSerializer<RowData> recordSerializer, int rowtimeIndex, boolean keepLastRow) {
+ this.recordSerializer = recordSerializer;
+ this.rowtimeIndex = rowtimeIndex;
+ this.keepLastRow = keepLastRow;
+ }
+
+ @Override
+ public RecordsCombiner createRecordsCombiner(
+ RuntimeContext runtimeContext,
+ WindowTimerService<Long> timerService,
+ KeyedStateBackend<RowData> stateBackend,
+ WindowState<Long> windowState,
+ boolean isEventTime)
+ throws Exception {
+ WindowValueState<Long> windowMapState = (WindowValueState<Long>) windowState; // cast assumes a value state despite the "MapState" name
+ return new RowTimeDeduplicateRecordsCombiner(
+ timerService,
+ stateBackend::setCurrentKey, // serves as the StateKeyContext used to switch the current key
+ windowMapState,
+ rowtimeIndex,
+ keepLastRow,
+ recordSerializer);
+ }
+ }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/processors/RowTimeWindowDeduplicateProcessor.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/processors/RowTimeWindowDeduplicateProcessor.java
new file mode 100644
index 0000000..e3abae2
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/processors/RowTimeWindowDeduplicateProcessor.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate.window.processors;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
+import org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowProcessor;
+import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
+import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerServiceImpl;
+import org.apache.flink.table.runtime.operators.window.state.WindowValueState;
+
+import java.time.ZoneId;
+
+import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired;
+
+/**
+ * A rowtime window deduplicate processor.
+ *
+ * <p>Incoming rows are added to a {@link WindowBuffer} under the window end read from field
+ * {@code windowEndIndex}; rows whose window has already fired are reported as dropped. When a
+ * window fires, the row held in the window state for that window end is emitted, if there is
+ * one.
+ */
+public final class RowTimeWindowDeduplicateProcessor implements SlicingWindowProcessor<Long> {
+ private static final long serialVersionUID = 1L;
+
+ private final WindowBuffer.Factory bufferFactory;
+ private final TypeSerializer<RowData> inputSerializer;
+ private final int windowEndIndex; // position of the BIGINT window end field in the input row
+ private final ZoneId shiftTimeZone;
+
+ // ----------------------------------------------------------------------------------------
+
+ private transient long currentProgress; // highest progress seen so far; starts at Long.MIN_VALUE in open()
+
+ private transient Context<Long> ctx;
+
+ private transient WindowTimerService<Long> windowTimerService;
+
+ private transient WindowBuffer windowBuffer;
+
+ /** state schema: [key, window_end, first/last record]. */
+ private transient WindowValueState<Long> windowState;
+
+ public RowTimeWindowDeduplicateProcessor(
+ TypeSerializer<RowData> inputSerializer,
+ WindowBuffer.Factory bufferFactory,
+ int windowEndIndex,
+ ZoneId shiftTimeZone) {
+ this.inputSerializer = inputSerializer;
+ this.bufferFactory = bufferFactory;
+ this.windowEndIndex = windowEndIndex;
+ this.shiftTimeZone = shiftTimeZone;
+ }
+
+ @Override
+ public void open(Context<Long> context) throws Exception { // creates state, timer service and window buffer
+ this.ctx = context;
+ final LongSerializer namespaceSerializer = LongSerializer.INSTANCE; // the window end (Long) is the state namespace
+ ValueStateDescriptor<RowData> valueStateDescriptor =
+ new ValueStateDescriptor<>("window_deduplicate", inputSerializer);
+ ValueState<RowData> state =
+ ctx.getKeyedStateBackend()
+ .getOrCreateKeyedState(namespaceSerializer, valueStateDescriptor);
+ this.windowTimerService = new WindowTimerServiceImpl(ctx.getTimerService(), shiftTimeZone);
+ this.windowState =
+ new WindowValueState<>((InternalValueState<RowData, Long, RowData>) state);
+ this.windowBuffer =
+ bufferFactory.create(
+ ctx.getOperatorOwner(),
+ ctx.getMemoryManager(),
+ ctx.getMemorySize(),
+ ctx.getRuntimeContext(),
+ windowTimerService,
+ ctx.getKeyedStateBackend(),
+ windowState,
+ true, // NOTE(review): presumably the "is event time" flag; confirm against WindowBuffer.Factory#create
+ shiftTimeZone);
+ this.currentProgress = Long.MIN_VALUE;
+ }
+
+ @Override
+ public boolean processElement(RowData key, RowData element) throws Exception { // returns true iff the element was dropped as late
+ long sliceEnd = element.getLong(windowEndIndex);
+ if (isWindowFired(sliceEnd, currentProgress, shiftTimeZone)) {
+ // element is late and should be dropped
+ return true;
+ }
+ windowBuffer.addElement(key, sliceEnd, element);
+ return false;
+ }
+
+ @Override
+ public void advanceProgress(long progress) throws Exception {
+ if (progress > currentProgress) { // progress only moves forward; older or equal values are ignored
+ currentProgress = progress;
+ windowBuffer.advanceProgress(currentProgress);
+ }
+ }
+
+ @Override
+ public void prepareCheckpoint() throws Exception {
+ windowBuffer.flush(); // presumably pushes buffered records into state before the snapshot; verify in WindowBuffer
+ }
+
+ @Override
+ public void clearWindow(Long windowEnd) throws Exception {
+ windowState.clear(windowEnd);
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (windowBuffer != null) { // the buffer is only created in open()
+ windowBuffer.close();
+ }
+ }
+
+ @Override
+ public TypeSerializer<Long> createWindowSerializer() {
+ return LongSerializer.INSTANCE;
+ }
+
+ @Override
+ public void fireWindow(Long windowEnd) throws Exception {
+ RowData data = windowState.value(windowEnd);
+ if (data != null) { // nothing is emitted when no row is stored for this window
+ ctx.output(data);
+ }
+ }
+}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorTest.java
new file mode 100644
index 0000000..9c91345
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorTest.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate.window;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowOperator;
+import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
+import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for window deduplicate operators created by {@link
+ * RowTimeWindowDeduplicateOperatorBuilder}.
+ *
+ * <p>Both tests run once per shift time zone (UTC and Asia/Shanghai). Each feeds out-of-order
+ * rows for two keys into windows ending at 999, 1999 and 3999, advances the watermark, and
+ * checks that one row per key and window is emitted: the row with the smallest {@code f1} when
+ * {@code keepLastRow} is false, the row with the largest {@code f1} when it is true. They also
+ * cover a snapshot/restore cycle and the dropping of a late element.
+ */
+@RunWith(Parameterized.class)
+public class RowTimeWindowDeduplicateOperatorTest {
+
+ private static final RowType INPUT_ROW_TYPE = // f0: key, f1: rowtime (index 1), f2: window end (index 2)
+ new RowType(
+ Arrays.asList(
+ new RowType.RowField("f0", new VarCharType(Integer.MAX_VALUE)),
+ new RowType.RowField("f1", new BigIntType()),
+ new RowType.RowField("f2", new BigIntType())));
+
+ private static final RowDataSerializer INPUT_ROW_SER = new RowDataSerializer(INPUT_ROW_TYPE);
+
+ private static final RowDataKeySelector KEY_SELECTOR = // keys rows by field 0
+ HandwrittenSelectorUtil.getRowDataSelector(
+ new int[] {0}, INPUT_ROW_TYPE.getChildren().toArray(new LogicalType[0]));
+
+ private static final PagedTypeSerializer<RowData> KEY_SER =
+ (PagedTypeSerializer<RowData>) KEY_SELECTOR.getProducedType().toSerializer();
+
+ private static final int WINDOW_END_INDEX = 2;
+
+ private static final LogicalType[] OUTPUT_TYPES = // same field types as INPUT_ROW_TYPE
+ new LogicalType[] {
+ new VarCharType(Integer.MAX_VALUE), new BigIntType(), new BigIntType()
+ };
+
+ private static final TypeSerializer<RowData> OUT_SERIALIZER =
+ new RowDataSerializer(OUTPUT_TYPES);
+
+ private static final RowDataHarnessAssertor ASSERTER = // compares output after sorting on field 0
+ new RowDataHarnessAssertor(
+ OUTPUT_TYPES,
+ new GenericRowRecordSortComparator(0, new VarCharType(VarCharType.MAX_LENGTH)));
+
+ private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
+ private static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai");
+ private final ZoneId shiftTimeZone; // injected by the Parameterized runner
+
+ public RowTimeWindowDeduplicateOperatorTest(ZoneId shiftTimeZone) {
+ this.shiftTimeZone = shiftTimeZone;
+ }
+
+ @Parameterized.Parameters(name = "TimeZone = {0}")
+ public static Collection<Object[]> runMode() {
+ return Arrays.asList(new Object[] {UTC_ZONE_ID}, new Object[] {SHANGHAI_ZONE_ID});
+ }
+
+ private static OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
+ SlicingWindowOperator<RowData, ?> operator) throws Exception {
+ return new KeyedOneInputStreamOperatorTestHarness<>(
+ operator, KEY_SELECTOR, KEY_SELECTOR.getProducedType());
+ }
+
+ @Test
+ public void testRowTimeWindowDeduplicateKeepFirstRow() throws Exception {
+ SlicingWindowOperator<RowData, ?> operator =
+ RowTimeWindowDeduplicateOperatorBuilder.builder()
+ .inputSerializer(INPUT_ROW_SER)
+ .shiftTimeZone(shiftTimeZone)
+ .keySerializer(KEY_SER)
+ .keepLastRow(false)
+ .rowtimeIndex(1)
+ .windowEndIndex(WINDOW_END_INDEX)
+ .build();
+
+ OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+ createTestHarness(operator);
+
+ testHarness.setup(OUT_SERIALIZER);
+ testHarness.open();
+
+ // process elements
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ // add elements out-of-order
+ testHarness.processElement(
+ insertRecord("key2", 1L, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 4L, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 5L, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 3L, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 1002L, toUtcTimestampMills(1999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 3007L, toUtcTimestampMills(3999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 3008L, toUtcTimestampMills(3999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 3001L, toUtcTimestampMills(3999L, shiftTimeZone)));
+
+ testHarness.processElement(
+ insertRecord("key1", 2L, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key1", 1L, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key1", 3L, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key1", 3L, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key1", 1004L, toUtcTimestampMills(1999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key1", 1006L, toUtcTimestampMills(1999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key1", 1007L, toUtcTimestampMills(1999L, shiftTimeZone)));
+
+ testHarness.processWatermark(new Watermark(999)); // fires the window ending at 999
+ expectedOutput.add(insertRecord("key1", 1L, toUtcTimestampMills(999L, shiftTimeZone)));
+ expectedOutput.add(insertRecord("key2", 1L, toUtcTimestampMills(999L, shiftTimeZone)));
+ expectedOutput.add(new Watermark(999));
+ ASSERTER.assertOutputEqualsSorted(
+ "Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.processWatermark(new Watermark(1999)); // fires the window ending at 1999
+ expectedOutput.add(insertRecord("key1", 1004L, toUtcTimestampMills(1999L, shiftTimeZone)));
+ expectedOutput.add(insertRecord("key2", 1002L, toUtcTimestampMills(1999L, shiftTimeZone)));
+ expectedOutput.add(new Watermark(1999));
+ ASSERTER.assertOutputEqualsSorted(
+ "Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ // do a snapshot, close and restore again
+ testHarness.prepareSnapshotPreBarrier(0L);
+ OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0);
+ testHarness.close();
+ expectedOutput.clear();
+
+ testHarness = createTestHarness(operator);
+ testHarness.setup(OUT_SERIALIZER);
+ testHarness.initializeState(snapshot);
+ testHarness.open();
+
+ testHarness.processWatermark(new Watermark(3999)); // only key2 has rows in the window ending at 3999
+ expectedOutput.add(insertRecord("key2", 3001L, toUtcTimestampMills(3999L, shiftTimeZone)));
+ expectedOutput.add(new Watermark(3999));
+ ASSERTER.assertOutputEqualsSorted(
+ "Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ // late element (window end 3500 is behind watermark 3999), should be dropped
+ testHarness.processElement(
+ insertRecord("key2", 3001L, toUtcTimestampMills(3500L, shiftTimeZone)));
+
+ testHarness.processWatermark(new Watermark(4999));
+ expectedOutput.add(new Watermark(4999));
+ ASSERTER.assertOutputEqualsSorted(
+ "Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ assertEquals(1, operator.getNumLateRecordsDropped().getCount());
+
+ testHarness.close();
+ }
+
+ @Test
+ public void testRowTimeWindowDeduplicateKeepLastRow() throws Exception {
+ SlicingWindowOperator<RowData, ?> operator =
+ RowTimeWindowDeduplicateOperatorBuilder.builder()
+ .inputSerializer(INPUT_ROW_SER)
+ .shiftTimeZone(shiftTimeZone)
+ .keySerializer(KEY_SER)
+ .keepLastRow(true)
+ .rowtimeIndex(1)
+ .windowEndIndex(WINDOW_END_INDEX)
+ .build();
+
+ OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+ createTestHarness(operator);
+
+ testHarness.setup(OUT_SERIALIZER);
+ testHarness.open();
+
+ // process elements
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ // add elements out-of-order
+ testHarness.processElement(
+ insertRecord("key2", 1L, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 4L, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 5L, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 3L, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 1002L, toUtcTimestampMills(1999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 3007L, toUtcTimestampMills(3999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 3008L, toUtcTimestampMills(3999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 3001L, toUtcTimestampMills(3999L, shiftTimeZone)));
+
+ testHarness.processElement(
+ insertRecord("key1", 2L, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key1", 1L, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key1", 3L, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key1", 3L, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key1", 1004L, toUtcTimestampMills(1999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key1", 1006L, toUtcTimestampMills(1999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key1", 1007L, toUtcTimestampMills(1999L, shiftTimeZone)));
+
+ testHarness.processWatermark(new Watermark(999)); // fires the window ending at 999
+ expectedOutput.add(insertRecord("key1", 3L, toUtcTimestampMills(999L, shiftTimeZone)));
+ expectedOutput.add(insertRecord("key2", 5L, toUtcTimestampMills(999L, shiftTimeZone)));
+ expectedOutput.add(new Watermark(999));
+ ASSERTER.assertOutputEqualsSorted(
+ "Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.processWatermark(new Watermark(1999)); // fires the window ending at 1999
+ expectedOutput.add(insertRecord("key1", 1007L, toUtcTimestampMills(1999L, shiftTimeZone)));
+ expectedOutput.add(insertRecord("key2", 1002L, toUtcTimestampMills(1999L, shiftTimeZone)));
+ expectedOutput.add(new Watermark(1999));
+ ASSERTER.assertOutputEqualsSorted(
+ "Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ // do a snapshot, close and restore again
+ testHarness.prepareSnapshotPreBarrier(0L);
+ OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0);
+ testHarness.close();
+ expectedOutput.clear();
+
+ testHarness = createTestHarness(operator);
+ testHarness.setup(OUT_SERIALIZER);
+ testHarness.initializeState(snapshot);
+ testHarness.open();
+
+ testHarness.processWatermark(new Watermark(3999)); // only key2 has rows in the window ending at 3999
+ expectedOutput.add(insertRecord("key2", 3008L, toUtcTimestampMills(3999L, shiftTimeZone)));
+ expectedOutput.add(new Watermark(3999));
+ ASSERTER.assertOutputEqualsSorted(
+ "Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ // late element (window end 3500 is behind watermark 3999), should be dropped
+ testHarness.processElement(
+ insertRecord("key2", 3001L, toUtcTimestampMills(3500L, shiftTimeZone)));
+
+ testHarness.processWatermark(new Watermark(4999));
+ expectedOutput.add(new Watermark(4999));
+ ASSERTER.assertOutputEqualsSorted(
+ "Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ assertEquals(1, operator.getNumLateRecordsDropped().getCount());
+
+ testHarness.close();
+ }
+}