You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2021/05/08 02:30:02 UTC

[flink] branch master updated: [FLINK-19606][table-runtime-blink] Introduce WindowJoinOperator and WindowJoinOperatorBuilder

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 89e81c2  [FLINK-19606][table-runtime-blink] Introduce WindowJoinOperator and WindowJoinOperatorBuilder
89e81c2 is described below

commit 89e81c256990912ab15ac9a0939abba2606d4aca
Author: Jing Zhang <be...@126.com>
AuthorDate: Sat May 8 10:29:43 2021 +0800

    [FLINK-19606][table-runtime-blink] Introduce WindowJoinOperator and WindowJoinOperatorBuilder
    
    This closes #15760
---
 .../operators/join/window/WindowJoinOperator.java  | 594 +++++++++++++++++++++
 .../join/window/WindowJoinOperatorBuilder.java     | 187 +++++++
 .../operators/window/state/WindowListState.java    |  61 +++
 .../join/window/WindowJoinOperatorTest.java        | 534 ++++++++++++++++++
 .../rank/window/WindowRankOperatorTest.java        | 225 +++++---
 5 files changed, 1520 insertions(+), 81 deletions(-)

diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java
new file mode 100644
index 0000000..d51064f
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java
@@ -0,0 +1,594 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.window;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.RowDataUtil;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.generated.JoinCondition;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.operators.join.JoinConditionWithNullFilters;
+import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
+import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerServiceImpl;
+import org.apache.flink.table.runtime.operators.window.state.WindowListState;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.types.RowKind;
+
+import java.time.ZoneId;
+import java.util.IdentityHashMap;
+import java.util.List;
+
+import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired;
+
+/**
+ * Streaming window join operator.
+ *
+ * <p>Note: currently, {@link WindowJoinOperator} doesn't support early-fire and late-arrival. Thus
+ * late elements (elements belonging to already emitted windows) are simply dropped.
+ *
+ * <p>Note: currently, {@link WindowJoinOperator} doesn't support DELETE or UPDATE_BEFORE input row.
+ */
+public abstract class WindowJoinOperator extends TableStreamOperator<RowData>
+        implements TwoInputStreamOperator<RowData, RowData, RowData>,
+                Triggerable<RowData, Long>,
+                KeyContext {
+
+    private static final long serialVersionUID = 1L;
+
+    // Names under which the late-record counters/meters of each input and the watermark latency
+    // gauge are registered with the metric group in open().
+    private static final String LEFT_LATE_ELEMENTS_DROPPED_METRIC_NAME =
+            "leftNumLateRecordsDropped";
+    private static final String LEFT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME =
+            "leftLateRecordsDroppedRate";
+    private static final String RIGHT_LATE_ELEMENTS_DROPPED_METRIC_NAME =
+            "rightNumLateRecordsDropped";
+    private static final String RIGHT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME =
+            "rightLateRecordsDroppedRate";
+    private static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency";
+    // Names of the two keyed list states that buffer the rows of each input per window.
+    private static final String LEFT_RECORDS_STATE_NAME = "left-records";
+    private static final String RIGHT_RECORDS_STATE_NAME = "right-records";
+
+    // Row serializers of the two inputs; used as element serializers of the list states.
+    protected final RowDataSerializer leftSerializer;
+    protected final RowDataSerializer rightSerializer;
+    // Code-generated join condition, instantiated in open().
+    private final GeneratedJoinCondition generatedJoinCondition;
+
+    // Field positions from which the window end is read (as a long) in left / right input rows.
+    private final int leftWindowEndIndex;
+    private final int rightWindowEndIndex;
+
+    // Handed to JoinConditionWithNullFilters when the join condition is created in open().
+    private final boolean[] filterNullKeys;
+    // Time zone passed to the window timer service and to the "is window fired" check.
+    private final ZoneId shiftTimeZone;
+
+    /** Flag to prevent duplicate function.close() calls in close() and dispose(). */
+    private transient boolean functionsClosed = false;
+
+    // Wraps the internal timer service; provides the current watermark and registers the
+    // event-time timer of each window.
+    private transient WindowTimerService<Long> windowTimerService;
+
+    // ------------------------------------------------------------------------
+    // Join condition wrapped with the null-key filters; created and opened in open().
+    protected transient JoinConditionWithNullFilters joinCondition;
+
+    /** This is used for emitting elements with a given timestamp. */
+    protected transient TimestampedCollector<RowData> collector;
+
+    // Buffered rows of the left / right input, namespaced by window (the window end as a Long).
+    private transient WindowListState<Long> leftWindowState;
+    private transient WindowListState<Long> rightWindowState;
+
+    // ------------------------------------------------------------------------
+    // Metrics
+    // ------------------------------------------------------------------------
+
+    // Each meter is a MeterView created over the counter of the same side (see open()).
+    private transient Counter leftNumLateRecordsDropped;
+    private transient Meter leftLateRecordsDroppedRate;
+    private transient Counter rightNumLateRecordsDropped;
+    private transient Meter rightLateRecordsDroppedRate;
+    // Reports current processing time minus current watermark, or 0 while the watermark is
+    // negative.
+    private transient Gauge<Long> watermarkLatency;
+
+    /**
+     * Stores the static configuration of the operator.
+     *
+     * @param leftSerializer serializer of the left input rows; must be a {@link RowDataSerializer}
+     * @param rightSerializer serializer of the right input rows; must be a {@link
+     *     RowDataSerializer}
+     * @param generatedJoinCondition code-generated join condition
+     * @param leftWindowEndIndex field position of the window end in left input rows
+     * @param rightWindowEndIndex field position of the window end in right input rows
+     * @param filterNullKeys null-key filter flags handed to the join condition wrapper
+     * @param shiftTimeZone time zone used by the timer service and the window-fired check
+     */
+    WindowJoinOperator(
+            TypeSerializer<RowData> leftSerializer,
+            TypeSerializer<RowData> rightSerializer,
+            GeneratedJoinCondition generatedJoinCondition,
+            int leftWindowEndIndex,
+            int rightWindowEndIndex,
+            boolean[] filterNullKeys,
+            ZoneId shiftTimeZone) {
+        // the casts fail fast when a serializer of another kind is passed in
+        this.leftSerializer = (RowDataSerializer) leftSerializer;
+        this.rightSerializer = (RowDataSerializer) rightSerializer;
+
+        this.leftWindowEndIndex = leftWindowEndIndex;
+        this.rightWindowEndIndex = rightWindowEndIndex;
+
+        this.generatedJoinCondition = generatedJoinCondition;
+        this.filterNullKeys = filterNullKeys;
+        this.shiftTimeZone = shiftTimeZone;
+    }
+
+    /**
+     * Sets up the runtime parts of the operator: the output collector, the window timer service,
+     * the join condition, the per-window list state of both inputs and the metrics.
+     */
+    @Override
+    public void open() throws Exception {
+        super.open();
+        functionsClosed = false;
+
+        collector = new TimestampedCollector<>(output);
+        collector.eraseTimestamp();
+
+        final LongSerializer namespaceSerializer = LongSerializer.INSTANCE;
+
+        // timers are namespaced by window
+        InternalTimerService<Long> timers =
+                getInternalTimerService("window-timers", namespaceSerializer, this);
+        windowTimerService = new WindowTimerServiceImpl(timers, shiftTimeZone);
+
+        // instantiate and open the generated join condition
+        JoinCondition generatedCondition =
+                generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader());
+        joinCondition = new JoinConditionWithNullFilters(generatedCondition, filterNullKeys, this);
+        joinCondition.setRuntimeContext(getRuntimeContext());
+        joinCondition.open(new Configuration());
+
+        // one list state per input, left first
+        leftWindowState =
+                createWindowState(LEFT_RECORDS_STATE_NAME, leftSerializer, namespaceSerializer);
+        rightWindowState =
+                createWindowState(RIGHT_RECORDS_STATE_NAME, rightSerializer, namespaceSerializer);
+
+        // late-record counter plus a meter view over it, for each input
+        leftNumLateRecordsDropped = metrics.counter(LEFT_LATE_ELEMENTS_DROPPED_METRIC_NAME);
+        leftLateRecordsDroppedRate =
+                metrics.meter(
+                        LEFT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME,
+                        new MeterView(leftNumLateRecordsDropped));
+        rightNumLateRecordsDropped = metrics.counter(RIGHT_LATE_ELEMENTS_DROPPED_METRIC_NAME);
+        rightLateRecordsDroppedRate =
+                metrics.meter(
+                        RIGHT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME,
+                        new MeterView(rightNumLateRecordsDropped));
+
+        // processing time minus watermark; reported as 0 while the watermark is negative
+        watermarkLatency =
+                metrics.gauge(
+                        WATERMARK_LATENCY_METRIC_NAME,
+                        () -> {
+                            final long latestWatermark = windowTimerService.currentWatermark();
+                            if (latestWatermark >= 0) {
+                                return windowTimerService.currentProcessingTime()
+                                        - latestWatermark;
+                            }
+                            return 0L;
+                        });
+    }
+
+    /** Creates the window-namespaced list state registered under the given name. */
+    private WindowListState<Long> createWindowState(
+            String stateName, RowDataSerializer rowSerializer, LongSerializer namespaceSerializer)
+            throws Exception {
+        ListStateDescriptor<RowData> descriptor =
+                new ListStateDescriptor<>(stateName, rowSerializer);
+        ListState<RowData> listState = getOrCreateKeyedState(namespaceSerializer, descriptor);
+        return new WindowListState<>((InternalListState<RowData, Long, RowData>) listState);
+    }
+
+    /** Closes the operator and the join condition. */
+    @Override
+    public void close() throws Exception {
+        super.close();
+        collector = null;
+        closeJoinCondition();
+    }
+
+    /** Disposes the operator; the join condition is only closed here if close() did not run. */
+    @Override
+    public void dispose() throws Exception {
+        super.dispose();
+        collector = null;
+        if (!functionsClosed) {
+            closeJoinCondition();
+        }
+    }
+
+    /** Marks the functions as closed and closes the join condition if it was created. */
+    private void closeJoinCondition() throws Exception {
+        functionsClosed = true;
+        if (joinCondition != null) {
+            joinCondition.close();
+        }
+    }
+
+    /** Buffers a row of the left input. */
+    @Override
+    public void processElement1(StreamRecord<RowData> element) throws Exception {
+        processElement(element, leftWindowEndIndex, leftLateRecordsDroppedRate, leftWindowState);
+    }
+
+    /** Buffers a row of the right input. */
+    @Override
+    public void processElement2(StreamRecord<RowData> element) throws Exception {
+        processElement(element, rightWindowEndIndex, rightLateRecordsDroppedRate, rightWindowState);
+    }
+
+    /**
+     * Shared handling of both inputs: drops the row if its window has already fired, otherwise
+     * appends it to the state of its window and registers the event-time timer of that window.
+     *
+     * @param element the incoming record
+     * @param windowEndIndex field position of the window end in the row
+     * @param lateRecordsDroppedRate meter on which an event is marked when the row is late
+     * @param recordState list state that buffers the rows of this input
+     * @throws UnsupportedOperationException if the row is not an accumulate message
+     */
+    private void processElement(
+            StreamRecord<RowData> element,
+            int windowEndIndex,
+            Meter lateRecordsDroppedRate,
+            WindowListState<Long> recordState)
+            throws Exception {
+        final RowData row = element.getValue();
+        final long windowEnd = row.getLong(windowEndIndex);
+
+        // rows of an already fired window are late and get dropped
+        final long watermark = windowTimerService.currentWatermark();
+        if (isWindowFired(windowEnd, watermark, shiftTimeZone)) {
+            lateRecordsDroppedRate.markEvent();
+            return;
+        }
+
+        // Window join could not handle retraction input stream
+        if (!RowDataUtil.isAccumulateMsg(row)) {
+            throw new UnsupportedOperationException(
+                    "This is a bug and should not happen. Please file an issue.");
+        }
+
+        recordState.add(windowEnd, row);
+        // a timer is registered for every buffered element
+        windowTimerService.registerEventTimeWindowTimer(windowEnd);
+    }
+
+    /**
+     * Always fails: only event-time windows are supported, so this callback is never expected to
+     * be invoked.
+     */
+    @Override
+    public void onProcessingTime(InternalTimer<RowData, Long> timer) throws Exception {
+        final String message = "This is a bug and should not happen. Please file an issue.";
+        throw new UnsupportedOperationException(message);
+    }
+
+    /**
+     * Fires a window: joins the rows buffered for the timer's key and window on both sides and
+     * then clears the state of that window.
+     */
+    @Override
+    public void onEventTime(InternalTimer<RowData, Long> timer) throws Exception {
+        setCurrentKey(timer.getKey());
+        final Long firedWindow = timer.getNamespace();
+
+        // either side may be null here; join() has to cope with that
+        final List<RowData> leftRows = leftWindowState.get(firedWindow);
+        final List<RowData> rightRows = rightWindowState.get(firedWindow);
+        join(leftRows, rightRows);
+
+        // only clear the sides that returned something for this window
+        if (leftRows != null) {
+            leftWindowState.clear(firedWindow);
+        }
+        if (rightRows != null) {
+            rightWindowState.clear(firedWindow);
+        }
+    }
+
+    /**
+     * Joins the rows buffered for one key and window and emits the results through {@link
+     * #collector}. Called from {@link #onEventTime(InternalTimer)} with the lists read from state.
+     *
+     * @param leftRecords rows of the left input, may be {@code null}
+     * @param rightRecords rows of the right input, may be {@code null}
+     */
+    public abstract void join(Iterable<RowData> leftRecords, Iterable<RowData> rightRecords);
+
+    /**
+     * Window join operator for SEMI and ANTI joins. Only left rows are emitted: a SEMI join emits
+     * the left rows that have at least one matching right row, an ANTI join emits the left rows
+     * that have none.
+     */
+    static class SemiAntiJoinOperator extends WindowJoinOperator {
+
+        private static final long serialVersionUID = 1L;
+
+        /** {@code true} for ANTI join, {@code false} for SEMI join. */
+        private final boolean isAntiJoin;
+
+        SemiAntiJoinOperator(
+                TypeSerializer<RowData> leftSerializer,
+                TypeSerializer<RowData> rightSerializer,
+                GeneratedJoinCondition generatedJoinCondition,
+                int leftWindowEndIndex,
+                int rightWindowEndIndex,
+                boolean[] filterNullKeys,
+                boolean isAntiJoin,
+                ZoneId shiftTimeZone) {
+            super(
+                    leftSerializer,
+                    rightSerializer,
+                    generatedJoinCondition,
+                    leftWindowEndIndex,
+                    rightWindowEndIndex,
+                    filterNullKeys,
+                    shiftTimeZone);
+            this.isAntiJoin = isAntiJoin;
+        }
+
+        /**
+         * Emits the qualifying left rows of one window.
+         *
+         * @param leftRecords left rows of the window; nothing is emitted when {@code null}
+         * @param rightRecords right rows of the window; {@code null} means no left row can match
+         */
+        @Override
+        public void join(Iterable<RowData> leftRecords, Iterable<RowData> rightRecords) {
+            if (leftRecords == null) {
+                return;
+            }
+            for (RowData leftRecord : leftRecords) {
+                // SEMI emits on a match, ANTI emits on the absence of a match
+                if (hasMatch(leftRecord, rightRecords) != isAntiJoin) {
+                    collector.collect(leftRecord);
+                }
+            }
+        }
+
+        /**
+         * Returns whether any right row satisfies the join condition with the given left row. The
+         * condition is evaluated once per pair and the scan stops at the first match.
+         */
+        private boolean hasMatch(RowData leftRecord, Iterable<RowData> rightRecords) {
+            if (rightRecords == null) {
+                return false;
+            }
+            for (RowData rightRecord : rightRecords) {
+                if (joinCondition.apply(leftRecord, rightRecord)) {
+                    return true;
+                }
+            }
+            return false;
+        }
+    }
+
+    /** Window join operator for INNER joins: emits one joined row per matching left/right pair. */
+    static class InnerJoinOperator extends WindowJoinOperator {
+
+        private static final long serialVersionUID = 1L;
+
+        /** Reused output row; re-pointed at each matching pair before it is collected. */
+        private transient JoinedRowData outRow;
+
+        InnerJoinOperator(
+                TypeSerializer<RowData> leftSerializer,
+                TypeSerializer<RowData> rightSerializer,
+                GeneratedJoinCondition generatedJoinCondition,
+                int leftWindowEndIndex,
+                int rightWindowEndIndex,
+                boolean[] filterNullKeys,
+                ZoneId shiftTimeZone) {
+            super(
+                    leftSerializer,
+                    rightSerializer,
+                    generatedJoinCondition,
+                    leftWindowEndIndex,
+                    rightWindowEndIndex,
+                    filterNullKeys,
+                    shiftTimeZone);
+        }
+
+        @Override
+        public void open() throws Exception {
+            super.open();
+            outRow = new JoinedRowData();
+        }
+
+        /**
+         * Emits every left/right pair of the window that satisfies the join condition.
+         *
+         * @param leftRecords left rows of the window, may be {@code null}
+         * @param rightRecords right rows of the window, may be {@code null}
+         */
+        @Override
+        public void join(Iterable<RowData> leftRecords, Iterable<RowData> rightRecords) {
+            // an inner join produces nothing when either side is missing
+            if (leftRecords == null || rightRecords == null) {
+                return;
+            }
+            for (RowData leftRecord : leftRecords) {
+                for (RowData rightRecord : rightRecords) {
+                    if (joinCondition.apply(leftRecord, rightRecord)) {
+                        outRow.setRowKind(RowKind.INSERT);
+                        outRow.replace(leftRecord, rightRecord);
+                        collector.collect(outRow);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Common base of the outer join operators. Provides helpers that emit a joined row, either
+     * for a matching pair or for a single row padded with an all-null row on the other side.
+     */
+    private abstract static class AbstractOuterJoinOperator extends WindowJoinOperator {
+
+        private static final long serialVersionUID = 1L;
+
+        // rows created with the arity of the left / right input and no field set
+        private transient RowData leftNullRow;
+        private transient RowData rightNullRow;
+        // reused output row
+        private transient JoinedRowData outRow;
+
+        AbstractOuterJoinOperator(
+                TypeSerializer<RowData> leftSerializer,
+                TypeSerializer<RowData> rightSerializer,
+                GeneratedJoinCondition generatedJoinCondition,
+                int leftWindowEndIndex,
+                int rightWindowEndIndex,
+                boolean[] filterNullKeys,
+                ZoneId shiftTimeZone) {
+            super(
+                    leftSerializer,
+                    rightSerializer,
+                    generatedJoinCondition,
+                    leftWindowEndIndex,
+                    rightWindowEndIndex,
+                    filterNullKeys,
+                    shiftTimeZone);
+        }
+
+        @Override
+        public void open() throws Exception {
+            super.open();
+            leftNullRow = new GenericRowData(leftSerializer.getArity());
+            rightNullRow = new GenericRowData(rightSerializer.getArity());
+            outRow = new JoinedRowData();
+        }
+
+        /** Emits {@code row} joined with the null row of the opposite side. */
+        protected void outputNullPadding(RowData row, boolean isLeft) {
+            if (isLeft) {
+                emit(row, rightNullRow);
+            } else {
+                emit(leftNullRow, row);
+            }
+        }
+
+        /** Emits every given row joined with the null row of the opposite side. */
+        protected void outputNullPadding(Iterable<RowData> rows, boolean isLeft) {
+            for (RowData unmatched : rows) {
+                outputNullPadding(unmatched, isLeft);
+            }
+        }
+
+        /**
+         * Emits a matching pair. {@code inputIsLeft} tells on which side of the output {@code
+         * inputRow} belongs; {@code otherRow} takes the remaining side.
+         */
+        protected void output(RowData inputRow, RowData otherRow, boolean inputIsLeft) {
+            if (inputIsLeft) {
+                emit(inputRow, otherRow);
+            } else {
+                emit(otherRow, inputRow);
+            }
+        }
+
+        /** Points the reused output row at the given pair and collects it as an INSERT. */
+        private void emit(RowData left, RowData right) {
+            outRow.replace(left, right);
+            outRow.setRowKind(RowKind.INSERT);
+            collector.collect(outRow);
+        }
+    }
+
+    /**
+     * Window join operator for LEFT OUTER joins: every left row is emitted, joined with each
+     * matching right row or padded with nulls when it has no match.
+     */
+    static class LeftOuterJoinOperator extends AbstractOuterJoinOperator {
+
+        private static final long serialVersionUID = 1L;
+
+        LeftOuterJoinOperator(
+                TypeSerializer<RowData> leftSerializer,
+                TypeSerializer<RowData> rightSerializer,
+                GeneratedJoinCondition generatedJoinCondition,
+                int leftWindowEndIndex,
+                int rightWindowEndIndex,
+                boolean[] filterNullKeys,
+                ZoneId shiftTimeZone) {
+            super(
+                    leftSerializer,
+                    rightSerializer,
+                    generatedJoinCondition,
+                    leftWindowEndIndex,
+                    rightWindowEndIndex,
+                    filterNullKeys,
+                    shiftTimeZone);
+        }
+
+        @Override
+        public void join(Iterable<RowData> leftRecords, Iterable<RowData> rightRecords) {
+            // without left rows there is nothing to emit
+            if (leftRecords == null) {
+                return;
+            }
+            // without right rows every left row is null-padded
+            if (rightRecords == null) {
+                outputNullPadding(leftRecords, true);
+                return;
+            }
+            for (RowData left : leftRecords) {
+                boolean matched = false;
+                for (RowData right : rightRecords) {
+                    if (joinCondition.apply(left, right)) {
+                        output(left, right, true);
+                        matched = true;
+                    }
+                }
+                if (!matched) {
+                    // no right row matched this left row
+                    outputNullPadding(left, true);
+                }
+            }
+        }
+    }
+
+    /**
+     * Window join operator for RIGHT OUTER joins: every right row is emitted, joined with each
+     * matching left row or padded with nulls when it has no match.
+     */
+    static class RightOuterJoinOperator extends AbstractOuterJoinOperator {
+
+        private static final long serialVersionUID = 1L;
+
+        RightOuterJoinOperator(
+                TypeSerializer<RowData> leftSerializer,
+                TypeSerializer<RowData> rightSerializer,
+                GeneratedJoinCondition generatedJoinCondition,
+                int leftWindowEndIndex,
+                int rightWindowEndIndex,
+                boolean[] filterNullKeys,
+                ZoneId shiftTimeZone) {
+            super(
+                    leftSerializer,
+                    rightSerializer,
+                    generatedJoinCondition,
+                    leftWindowEndIndex,
+                    rightWindowEndIndex,
+                    filterNullKeys,
+                    shiftTimeZone);
+        }
+
+        @Override
+        public void join(Iterable<RowData> leftRecords, Iterable<RowData> rightRecords) {
+            // without right rows there is nothing to emit
+            if (rightRecords == null) {
+                return;
+            }
+            // without left rows every right row is null-padded
+            if (leftRecords == null) {
+                outputNullPadding(rightRecords, false);
+                return;
+            }
+            for (RowData right : rightRecords) {
+                boolean matched = false;
+                for (RowData left : leftRecords) {
+                    if (joinCondition.apply(left, right)) {
+                        // the driving row is the right one, so it goes on the right side
+                        output(right, left, false);
+                        matched = true;
+                    }
+                }
+                if (!matched) {
+                    outputNullPadding(right, false);
+                }
+            }
+        }
+    }
+
+    /**
+     * Window join operator for FULL OUTER joins: matching pairs are emitted joined, and every row
+     * of either side that took part in no match is emitted padded with nulls.
+     */
+    static class FullOuterJoinOperator extends AbstractOuterJoinOperator {
+
+        private static final long serialVersionUID = 1L;
+
+        FullOuterJoinOperator(
+                TypeSerializer<RowData> leftSerializer,
+                TypeSerializer<RowData> rightSerializer,
+                GeneratedJoinCondition generatedJoinCondition,
+                int leftWindowEndIndex,
+                int rightWindowEndIndex,
+                boolean[] filterNullKeys,
+                ZoneId shiftTimeZone) {
+            super(
+                    leftSerializer,
+                    rightSerializer,
+                    generatedJoinCondition,
+                    leftWindowEndIndex,
+                    rightWindowEndIndex,
+                    filterNullKeys,
+                    shiftTimeZone);
+        }
+
+        @Override
+        public void join(Iterable<RowData> leftRecords, Iterable<RowData> rightRecords) {
+            if (rightRecords == null) {
+                // only the left side (if any) has rows: pad all of them
+                if (leftRecords != null) {
+                    outputNullPadding(leftRecords, true);
+                }
+                return;
+            }
+            if (leftRecords == null) {
+                // only the right side has rows: pad all of them
+                outputNullPadding(rightRecords, false);
+                return;
+            }
+
+            // right rows that were part of at least one match, tracked by object identity
+            IdentityHashMap<RowData, Boolean> matchedRightRecords = new IdentityHashMap<>();
+            for (RowData left : leftRecords) {
+                boolean matched = false;
+                for (RowData right : rightRecords) {
+                    if (joinCondition.apply(left, right)) {
+                        output(left, right, true);
+                        matchedRightRecords.put(right, Boolean.TRUE);
+                        matched = true;
+                    }
+                }
+                if (!matched) {
+                    // this left row matched nothing
+                    outputNullPadding(left, true);
+                }
+            }
+            // finally pad the right rows that never matched
+            for (RowData right : rightRecords) {
+                if (!matchedRightRecords.containsKey(right)) {
+                    outputNullPadding(right, false);
+                }
+            }
+        }
+    }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorBuilder.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorBuilder.java
new file mode 100644
index 0000000..c0e5665
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorBuilder.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.window;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+
+import java.time.ZoneId;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The {@link WindowJoinOperatorBuilder} is used to build a {@link WindowJoinOperator} for window
+ * join.
+ *
+ * <pre>
+ * WindowJoinOperatorBuilder.builder()
+ *   .leftSerializer(leftSerializer)
+ *   .rightSerializer(rightSerializer)
+ *   .generatedJoinCondition(generatedJoinCondition)
+ *   .leftWindowEndIndex(leftWindowEndIndex)
+ *   .rightWindowEndIndex(rightWindowEndIndex)
+ *   .filterNullKeys(filterNullKeys)
+ *   .joinType(joinType)
+ *   .withShiftTimezone(shiftTimeZone)
+ *   .build();
+ * </pre>
+ */
+public class WindowJoinOperatorBuilder {
+
+    /** Creates an empty builder. */
+    public static WindowJoinOperatorBuilder builder() {
+        return new WindowJoinOperatorBuilder();
+    }
+
+    private TypeSerializer<RowData> leftSerializer;
+    private TypeSerializer<RowData> rightSerializer;
+    private GeneratedJoinCondition generatedJoinCondition;
+    // -1 marks "not set"; build() rejects negative indices
+    private int leftWindowEndIndex = -1;
+    private int rightWindowEndIndex = -1;
+    private boolean[] filterNullKeys;
+    private FlinkJoinType joinType;
+    private ZoneId shiftTimeZone = ZoneId.of("UTC");
+
+    /** Sets the serializer of the left input rows (required). */
+    public WindowJoinOperatorBuilder leftSerializer(TypeSerializer<RowData> leftSerializer) {
+        this.leftSerializer = leftSerializer;
+        return this;
+    }
+
+    /** Sets the serializer of the right input rows (required). */
+    public WindowJoinOperatorBuilder rightSerializer(TypeSerializer<RowData> rightSerializer) {
+        this.rightSerializer = rightSerializer;
+        return this;
+    }
+
+    /** Sets the code-generated join condition (required). */
+    public WindowJoinOperatorBuilder generatedJoinCondition(
+            GeneratedJoinCondition generatedJoinCondition) {
+        this.generatedJoinCondition = generatedJoinCondition;
+        return this;
+    }
+
+    /** Sets the null-key filter flags that are handed to the operator (required). */
+    public WindowJoinOperatorBuilder filterNullKeys(boolean[] filterNullKeys) {
+        this.filterNullKeys = filterNullKeys;
+        return this;
+    }
+
+    /** Sets the join type, which decides the concrete operator created by build (required). */
+    public WindowJoinOperatorBuilder joinType(FlinkJoinType joinType) {
+        this.joinType = joinType;
+        return this;
+    }
+
+    /** Sets the field position of the window end in left input rows (required, non-negative). */
+    public WindowJoinOperatorBuilder leftWindowEndIndex(int leftWindowEndIndex) {
+        this.leftWindowEndIndex = leftWindowEndIndex;
+        return this;
+    }
+
+    /** Sets the field position of the window end in right input rows (required, non-negative). */
+    public WindowJoinOperatorBuilder rightWindowEndIndex(int rightWindowEndIndex) {
+        this.rightWindowEndIndex = rightWindowEndIndex;
+        return this;
+    }
+
+    /**
+     * The shift timezone of the window, if the proctime or rowtime type is TIMESTAMP_LTZ, the shift
+     * timezone is the timezone user configured in TableConfig, other cases the timezone is UTC
+     * which means never shift when assigning windows.
+     */
+    public WindowJoinOperatorBuilder withShiftTimezone(ZoneId shiftTimeZone) {
+        this.shiftTimeZone = shiftTimeZone;
+        return this;
+    }
+
+    /**
+     * Validates the configuration and creates the operator that implements the configured join
+     * type.
+     *
+     * @return the window join operator for the configured join type
+     * @throws NullPointerException if a required reference setting is missing
+     * @throws IllegalArgumentException if a window end index is negative or the join type is not
+     *     supported
+     */
+    public WindowJoinOperator build() {
+        checkNotNull(leftSerializer, "leftSerializer must be set.");
+        checkNotNull(rightSerializer, "rightSerializer must be set.");
+        checkNotNull(generatedJoinCondition, "generatedJoinCondition must be set.");
+        checkNotNull(filterNullKeys, "filterNullKeys must be set.");
+        checkNotNull(joinType, "joinType must be set.");
+        checkNotNull(shiftTimeZone, "shiftTimeZone must not be null.");
+
+        // name the offending side so that a misconfiguration can be told apart
+        checkArgument(
+                leftWindowEndIndex >= 0,
+                "Illegal left window end index %s, it should not be negative!",
+                leftWindowEndIndex);
+        checkArgument(
+                rightWindowEndIndex >= 0,
+                "Illegal right window end index %s, it should not be negative!",
+                rightWindowEndIndex);
+
+        switch (joinType) {
+            case INNER:
+                return new WindowJoinOperator.InnerJoinOperator(
+                        leftSerializer,
+                        rightSerializer,
+                        generatedJoinCondition,
+                        leftWindowEndIndex,
+                        rightWindowEndIndex,
+                        filterNullKeys,
+                        shiftTimeZone);
+            case SEMI:
+            case ANTI:
+                // SEMI and ANTI share one operator that only differs in the anti-join flag
+                return new WindowJoinOperator.SemiAntiJoinOperator(
+                        leftSerializer,
+                        rightSerializer,
+                        generatedJoinCondition,
+                        leftWindowEndIndex,
+                        rightWindowEndIndex,
+                        filterNullKeys,
+                        joinType == FlinkJoinType.ANTI,
+                        shiftTimeZone);
+            case LEFT:
+                return new WindowJoinOperator.LeftOuterJoinOperator(
+                        leftSerializer,
+                        rightSerializer,
+                        generatedJoinCondition,
+                        leftWindowEndIndex,
+                        rightWindowEndIndex,
+                        filterNullKeys,
+                        shiftTimeZone);
+            case RIGHT:
+                return new WindowJoinOperator.RightOuterJoinOperator(
+                        leftSerializer,
+                        rightSerializer,
+                        generatedJoinCondition,
+                        leftWindowEndIndex,
+                        rightWindowEndIndex,
+                        filterNullKeys,
+                        shiftTimeZone);
+            case FULL:
+                return new WindowJoinOperator.FullOuterJoinOperator(
+                        leftSerializer,
+                        rightSerializer,
+                        generatedJoinCondition,
+                        leftWindowEndIndex,
+                        rightWindowEndIndex,
+                        filterNullKeys,
+                        shiftTimeZone);
+            default:
+                throw new IllegalArgumentException("Invalid join type: " + joinType);
+        }
+    }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/state/WindowListState.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/state/WindowListState.java
new file mode 100644
index 0000000..1de0ad0
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/state/WindowListState.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.state;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.table.data.RowData;
+
+import java.util.List;
+
+/** A wrapper of {@link ListState} which is easier to update based on window namespace. */
+public final class WindowListState<W> implements WindowState<W> {
+    // Wrapped state; every method below first sets its current namespace to the given window.
+    private final InternalListState<RowData, W, RowData> windowState;
+
+    public WindowListState(InternalListState<RowData, W, RowData> windowState) {
+        this.windowState = windowState;
+    }
+    /** Clears the wrapped state after switching it to the given window namespace. */
+    public void clear(W window) {
+        windowState.setCurrentNamespace(window);
+        windowState.clear();
+    }
+    /** Returns {@code getInternal()} of the wrapped state for the given window namespace. */
+    public List<RowData> get(W window) throws Exception {
+        windowState.setCurrentNamespace(window);
+        return windowState.getInternal();
+    }
+
+    /**
+     * Updates the operator state accessible by {@link #get} by adding the given value to the list
+     * of values. The next time {@link #get} is called (for the same state partition) the returned
+     * state will represent the updated list.
+     *
+     * <p>If null is passed in, the state value will remain unchanged.
+     *
+     * @param window The namespace for the state.
+     * @param value The new value for the state.
+     * @throws Exception Thrown if the system cannot access the state.
+     */
+    public void add(W window, RowData value) throws Exception {
+        windowState.setCurrentNamespace(window);
+        windowState.add(value);
+    }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java
new file mode 100644
index 0000000..3b7d093
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.window;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills;
+import static org.junit.Assert.assertEquals;
+
+/** Tests for window join operators created by {@link WindowJoinOperatorBuilder}. */
+@RunWith(Parameterized.class)
+public class WindowJoinOperatorTest {
+    // Input schema of both sides: field 0 is the window end (BIGINT), field 1 the key (VARCHAR).
+    private static final InternalTypeInfo<RowData> INPUT_ROW_TYPE =
+            InternalTypeInfo.ofFields(new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH));
+    // Schema of joined output rows: the left fields followed by the right fields.
+    private static final InternalTypeInfo<RowData> OUTPUT_ROW_TYPE =
+            InternalTypeInfo.ofFields(
+                    new BigIntType(),
+                    new VarCharType(VarCharType.MAX_LENGTH),
+                    new BigIntType(),
+                    new VarCharType(VarCharType.MAX_LENGTH));
+    // Asserter for outputs with the four-field joined schema.
+    private static final RowDataHarnessAssertor ASSERTER =
+            new RowDataHarnessAssertor(OUTPUT_ROW_TYPE.toRowFieldTypes());
+    // Asserter for SEMI/ANTI outputs, which carry only the two left-side fields.
+    private static final RowDataHarnessAssertor SEMI_ANTI_JOIN_ASSERTER =
+            new RowDataHarnessAssertor(INPUT_ROW_TYPE.toRowFieldTypes());
+
+    private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
+
+    private static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai");
+    // Time zone of the current parameterized run; see runMode().
+    private final ZoneId shiftTimeZone;
+
+    public WindowJoinOperatorTest(ZoneId shiftTimeZone) {
+        this.shiftTimeZone = shiftTimeZone;
+    }
+
+    @Parameterized.Parameters(name = "TimeZone = {0}")
+    public static Collection<Object[]> runMode() {
+        return Arrays.asList(new Object[] {UTC_ZONE_ID}, new Object[] {SHANGHAI_ZONE_ID});
+    }
+
+    @Test
+    public void testSemiJoin() throws Exception {
+        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> testHarness =
+                createTestHarness(FlinkJoinType.SEMI);
+        // SEMI join: expects each left row that has a right row with the same window end and key.
+        testHarness.open();
+        testHarness.processWatermark1(new Watermark(1));
+        testHarness.processWatermark2(new Watermark(1));
+
+        // Late data is dropped: this row arrives after watermark 1, so no timer gets registered.
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(1L, shiftTimeZone), "k1"));
+        assertEquals(0, testHarness.numEventTimeTimers());
+        // Left rows 3, 3, 6 and right rows 3, 9, all with key k1.
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+        testHarness.processElement2(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1"));
+        testHarness.processElement2(insertRecord(toUtcTimestampMills(9L, shiftTimeZone), "k1"));
+        assertEquals(3, testHarness.numEventTimeTimers());
+        assertEquals(4, testHarness.numKeyedStateEntries());
+        // Watermark 10 on both inputs: afterwards no timers or state entries should remain.
+        testHarness.processWatermark1(new Watermark(10));
+        testHarness.processWatermark2(new Watermark(10));
+
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(new Watermark(1));
+        expectedOutput.add(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+        expectedOutput.add(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+        expectedOutput.add(new Watermark(10));
+        SEMI_ANTI_JOIN_ASSERTER.assertOutputEqualsSorted(
+                "output wrong.", expectedOutput, testHarness.getOutput());
+        assertEquals(0, testHarness.numEventTimeTimers());
+        assertEquals(0, testHarness.numKeyedStateEntries());
+        // Second batch: left rows 12 and 15, right rows 15 and 15.
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1"));
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+        testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+        testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+        assertEquals(3, testHarness.numKeyedStateEntries());
+
+        testHarness.processWatermark1(new Watermark(13));
+        testHarness.processWatermark2(new Watermark(13));
+        // NOTE(review): uses ASSERTER below instead of SEMI_ANTI_JOIN_ASSERTER; confirm intended.
+        expectedOutput.add(new Watermark(13));
+        assertEquals(2, testHarness.numKeyedStateEntries());
+        ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+        // Watermark 18: the single left row 15 is expected in the output.
+        testHarness.processWatermark1(new Watermark(18));
+        testHarness.processWatermark2(new Watermark(18));
+        expectedOutput.add(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+        expectedOutput.add(new Watermark(18));
+        ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+        testHarness.close();
+    }
+
+    @Test
+    public void testAntiJoin() throws Exception {
+        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> testHarness =
+                createTestHarness(FlinkJoinType.ANTI);
+        testHarness.open();
+        testHarness.processWatermark1(new Watermark(1));
+        testHarness.processWatermark2(new Watermark(1));
+        // ANTI join: expects each left row without a right row for the same window end and key.
+        // Late data is dropped: this row arrives after watermark 1, so no timer gets registered.
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(1L, shiftTimeZone), "k1"));
+        assertEquals(0, testHarness.numEventTimeTimers());
+        // Left rows 3, 3, 6 and right rows 3, 9, all with key k1.
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+        testHarness.processElement2(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1"));
+        testHarness.processElement2(insertRecord(toUtcTimestampMills(9L, shiftTimeZone), "k1"));
+        assertEquals(3, testHarness.numEventTimeTimers());
+        assertEquals(4, testHarness.numKeyedStateEntries());
+        // Watermark 10 on both inputs: afterwards no timers or state entries should remain.
+        testHarness.processWatermark1(new Watermark(10));
+        testHarness.processWatermark2(new Watermark(10));
+
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(new Watermark(1));
+        expectedOutput.add(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1"));
+        expectedOutput.add(new Watermark(10));
+        SEMI_ANTI_JOIN_ASSERTER.assertOutputEqualsSorted(
+                "output wrong.", expectedOutput, testHarness.getOutput());
+        assertEquals(0, testHarness.numEventTimeTimers());
+        assertEquals(0, testHarness.numKeyedStateEntries());
+        // Second batch: left rows 12 and 15, right rows 15 and 15.
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1"));
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+        testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+        testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+        assertEquals(3, testHarness.numKeyedStateEntries());
+        // Watermark 13: left row 12 is expected, as no right row 12 was added.
+        testHarness.processWatermark1(new Watermark(13));
+        testHarness.processWatermark2(new Watermark(13));
+        // NOTE(review): uses ASSERTER below instead of SEMI_ANTI_JOIN_ASSERTER; confirm intended.
+        expectedOutput.add(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1"));
+        expectedOutput.add(new Watermark(13));
+        assertEquals(2, testHarness.numKeyedStateEntries());
+        ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+        // Watermark 18: no further rows are expected, only the watermark itself.
+        testHarness.processWatermark1(new Watermark(18));
+        testHarness.processWatermark2(new Watermark(18));
+        expectedOutput.add(new Watermark(18));
+        ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+        testHarness.close();
+    }
+
+    @Test
+    public void testInnerJoin() throws Exception {
+        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> testHarness =
+                createTestHarness(FlinkJoinType.INNER);
+        // INNER join: expects one joined row per left/right pair sharing window end and key.
+        testHarness.open();
+        testHarness.processWatermark1(new Watermark(1));
+        testHarness.processWatermark2(new Watermark(1));
+
+        // Late data is dropped: this row arrives after watermark 1, so no timer gets registered.
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(1L, shiftTimeZone), "k1"));
+        assertEquals(0, testHarness.numEventTimeTimers());
+        // Left rows 3, 3, 6 and right rows 3, 9, all with key k1.
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+        testHarness.processElement2(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1"));
+        testHarness.processElement2(insertRecord(toUtcTimestampMills(9L, shiftTimeZone), "k1"));
+        assertEquals(3, testHarness.numEventTimeTimers());
+        assertEquals(4, testHarness.numKeyedStateEntries());
+        // Watermark 10 on both inputs: afterwards no timers or state entries should remain.
+        testHarness.processWatermark1(new Watermark(10));
+        testHarness.processWatermark2(new Watermark(10));
+
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(new Watermark(1));
+        expectedOutput.add(
+                insertRecord(
+                        toUtcTimestampMills(3L, shiftTimeZone),
+                        "k1",
+                        toUtcTimestampMills(3L, shiftTimeZone),
+                        "k1"));
+        expectedOutput.add(
+                insertRecord(
+                        toUtcTimestampMills(3L, shiftTimeZone),
+                        "k1",
+                        toUtcTimestampMills(3L, shiftTimeZone),
+                        "k1"));
+        expectedOutput.add(new Watermark(10));
+        ASSERTER.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
+        assertEquals(0, testHarness.numEventTimeTimers());
+        assertEquals(0, testHarness.numKeyedStateEntries());
+        // Second batch: left rows 12 and 15, right rows 15 and 15.
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1"));
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+        testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+        testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+        assertEquals(3, testHarness.numKeyedStateEntries());
+
+        testHarness.processWatermark1(new Watermark(13));
+        testHarness.processWatermark2(new Watermark(13));
+        // Watermark 13: no rows are expected, only the watermark itself.
+        expectedOutput.add(new Watermark(13));
+        assertEquals(2, testHarness.numKeyedStateEntries());
+        ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+        // Watermark 18: left row 15 is expected joined with each of the two right rows 15.
+        testHarness.processWatermark1(new Watermark(18));
+        testHarness.processWatermark2(new Watermark(18));
+        expectedOutput.add(
+                insertRecord(
+                        toUtcTimestampMills(15L, shiftTimeZone),
+                        "k1",
+                        toUtcTimestampMills(15L, shiftTimeZone),
+                        "k1"));
+        expectedOutput.add(
+                insertRecord(
+                        toUtcTimestampMills(15L, shiftTimeZone),
+                        "k1",
+                        toUtcTimestampMills(15L, shiftTimeZone),
+                        "k1"));
+        expectedOutput.add(new Watermark(18));
+        ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+        testHarness.close();
+    }
+
+    @Test
+    public void testLeftOuterJoin() throws Exception {
+        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> testHarness =
+                createTestHarness(FlinkJoinType.LEFT);
+        // LEFT join: like INNER, plus unmatched left rows padded with nulls on the right.
+        testHarness.open();
+        testHarness.processWatermark1(new Watermark(1));
+        testHarness.processWatermark2(new Watermark(1));
+
+        // Late data is dropped: this row arrives after watermark 1, so no timer gets registered.
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(1L, shiftTimeZone), "k1"));
+        assertEquals(0, testHarness.numEventTimeTimers());
+        // Left rows 3, 3, 6 and right rows 3, 9, all with key k1.
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+        testHarness.processElement2(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1"));
+        testHarness.processElement2(insertRecord(toUtcTimestampMills(9L, shiftTimeZone), "k1"));
+        assertEquals(3, testHarness.numEventTimeTimers());
+        assertEquals(4, testHarness.numKeyedStateEntries());
+        // Watermark 10 on both inputs: afterwards no timers or state entries should remain.
+        testHarness.processWatermark1(new Watermark(10));
+        testHarness.processWatermark2(new Watermark(10));
+
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(new Watermark(1));
+        expectedOutput.add(
+                insertRecord(
+                        toUtcTimestampMills(3L, shiftTimeZone),
+                        "k1",
+                        toUtcTimestampMills(3L, shiftTimeZone),
+                        "k1"));
+        expectedOutput.add(
+                insertRecord(
+                        toUtcTimestampMills(3L, shiftTimeZone),
+                        "k1",
+                        toUtcTimestampMills(3L, shiftTimeZone),
+                        "k1"));
+        expectedOutput.add(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1", null, null));
+        expectedOutput.add(new Watermark(10));
+        ASSERTER.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
+        assertEquals(0, testHarness.numEventTimeTimers());
+        assertEquals(0, testHarness.numKeyedStateEntries());
+        // Second batch: left rows 12 and 15, right rows 15 and 15.
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1"));
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+        testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+        testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+        assertEquals(3, testHarness.numKeyedStateEntries());
+
+        testHarness.processWatermark1(new Watermark(13));
+        testHarness.processWatermark2(new Watermark(13));
+        // Watermark 13: left row 12 is expected, padded with nulls on the right side.
+        expectedOutput.add(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1", null, null));
+        expectedOutput.add(new Watermark(13));
+        assertEquals(2, testHarness.numKeyedStateEntries());
+        ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+        // Watermark 18: left row 15 is expected joined with each of the two right rows 15.
+        testHarness.processWatermark1(new Watermark(18));
+        testHarness.processWatermark2(new Watermark(18));
+        expectedOutput.add(
+                insertRecord(
+                        toUtcTimestampMills(15L, shiftTimeZone),
+                        "k1",
+                        toUtcTimestampMills(15L, shiftTimeZone),
+                        "k1"));
+        expectedOutput.add(
+                insertRecord(
+                        toUtcTimestampMills(15L, shiftTimeZone),
+                        "k1",
+                        toUtcTimestampMills(15L, shiftTimeZone),
+                        "k1"));
+        expectedOutput.add(new Watermark(18));
+        ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+        testHarness.close();
+    }
+
+    @Test
+    public void testRightOuterJoin() throws Exception {
+        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> testHarness =
+                createTestHarness(FlinkJoinType.RIGHT);
+        // RIGHT join: like INNER, plus unmatched right rows padded with nulls on the left.
+        testHarness.open();
+        testHarness.processWatermark1(new Watermark(1));
+        testHarness.processWatermark2(new Watermark(1));
+
+        // Late data is dropped: this row arrives after watermark 1, so no timer gets registered.
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(1L, shiftTimeZone), "k1"));
+        assertEquals(0, testHarness.numEventTimeTimers());
+        // Left rows 3, 3, 6 and right rows 3, 9, all with key k1.
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+        testHarness.processElement2(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1"));
+        testHarness.processElement2(insertRecord(toUtcTimestampMills(9L, shiftTimeZone), "k1"));
+        assertEquals(3, testHarness.numEventTimeTimers());
+        assertEquals(4, testHarness.numKeyedStateEntries());
+        // Watermark 10 on both inputs: afterwards no timers or state entries should remain.
+        testHarness.processWatermark1(new Watermark(10));
+        testHarness.processWatermark2(new Watermark(10));
+
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(new Watermark(1));
+        expectedOutput.add(
+                insertRecord(
+                        toUtcTimestampMills(3L, shiftTimeZone),
+                        "k1",
+                        toUtcTimestampMills(3L, shiftTimeZone),
+                        "k1"));
+        expectedOutput.add(
+                insertRecord(
+                        toUtcTimestampMills(3L, shiftTimeZone),
+                        "k1",
+                        toUtcTimestampMills(3L, shiftTimeZone),
+                        "k1"));
+        expectedOutput.add(insertRecord(null, null, toUtcTimestampMills(9L, shiftTimeZone), "k1"));
+        expectedOutput.add(new Watermark(10));
+        ASSERTER.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
+        assertEquals(0, testHarness.numEventTimeTimers());
+        assertEquals(0, testHarness.numKeyedStateEntries());
+        // Second batch: left rows 12 and 15, right rows 15 and 15.
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1"));
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+        testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+        testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+        assertEquals(3, testHarness.numKeyedStateEntries());
+
+        testHarness.processWatermark1(new Watermark(13));
+        testHarness.processWatermark2(new Watermark(13));
+        // Watermark 13: no rows are expected, only the watermark itself.
+        expectedOutput.add(new Watermark(13));
+        assertEquals(2, testHarness.numKeyedStateEntries());
+        ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+        // Watermark 18: left row 15 is expected joined with each of the two right rows 15.
+        testHarness.processWatermark1(new Watermark(18));
+        testHarness.processWatermark2(new Watermark(18));
+        expectedOutput.add(
+                insertRecord(
+                        toUtcTimestampMills(15L, shiftTimeZone),
+                        "k1",
+                        toUtcTimestampMills(15L, shiftTimeZone),
+                        "k1"));
+        expectedOutput.add(
+                insertRecord(
+                        toUtcTimestampMills(15L, shiftTimeZone),
+                        "k1",
+                        toUtcTimestampMills(15L, shiftTimeZone),
+                        "k1"));
+        expectedOutput.add(new Watermark(18));
+        ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+        testHarness.close();
+    }
+
+    @Test
+    public void testOuterJoin() throws Exception {
+        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> testHarness =
+                createTestHarness(FlinkJoinType.FULL);
+        // FULL join: like INNER, plus unmatched rows of either side padded with nulls.
+        testHarness.open();
+        testHarness.processWatermark1(new Watermark(1));
+        testHarness.processWatermark2(new Watermark(1));
+
+        // Late data is dropped: this row arrives after watermark 1, so no timer gets registered.
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(1L, shiftTimeZone), "k1"));
+        assertEquals(0, testHarness.numEventTimeTimers());
+        // Left rows 3, 3, 6 and right rows 3, 9, all with key k1.
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+        testHarness.processElement2(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1"));
+        testHarness.processElement2(insertRecord(toUtcTimestampMills(9L, shiftTimeZone), "k1"));
+        assertEquals(3, testHarness.numEventTimeTimers());
+        assertEquals(4, testHarness.numKeyedStateEntries());
+        // Watermark 10 on both inputs: afterwards no timers or state entries should remain.
+        testHarness.processWatermark1(new Watermark(10));
+        testHarness.processWatermark2(new Watermark(10));
+
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(new Watermark(1));
+        expectedOutput.add(
+                insertRecord(
+                        toUtcTimestampMills(3L, shiftTimeZone),
+                        "k1",
+                        toUtcTimestampMills(3L, shiftTimeZone),
+                        "k1"));
+        expectedOutput.add(
+                insertRecord(
+                        toUtcTimestampMills(3L, shiftTimeZone),
+                        "k1",
+                        toUtcTimestampMills(3L, shiftTimeZone),
+                        "k1"));
+        expectedOutput.add(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1", null, null));
+        expectedOutput.add(insertRecord(null, null, toUtcTimestampMills(9L, shiftTimeZone), "k1"));
+        expectedOutput.add(new Watermark(10));
+        ASSERTER.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
+        assertEquals(0, testHarness.numEventTimeTimers());
+        assertEquals(0, testHarness.numKeyedStateEntries());
+        // Second batch: left rows 12 and 15, right rows 15 and 15.
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1"));
+        testHarness.processElement1(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+        testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+        testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+        assertEquals(3, testHarness.numKeyedStateEntries());
+
+        testHarness.processWatermark1(new Watermark(13));
+        testHarness.processWatermark2(new Watermark(13));
+        // Watermark 13: left row 12 is expected, padded with nulls on the right side.
+        expectedOutput.add(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1", null, null));
+        expectedOutput.add(new Watermark(13));
+        assertEquals(2, testHarness.numKeyedStateEntries());
+        ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+        // Watermark 18: left row 15 is expected joined with each of the two right rows 15.
+        testHarness.processWatermark1(new Watermark(18));
+        testHarness.processWatermark2(new Watermark(18));
+        expectedOutput.add(
+                insertRecord(
+                        toUtcTimestampMills(15L, shiftTimeZone),
+                        "k1",
+                        toUtcTimestampMills(15L, shiftTimeZone),
+                        "k1"));
+        expectedOutput.add(
+                insertRecord(
+                        toUtcTimestampMills(15L, shiftTimeZone),
+                        "k1",
+                        toUtcTimestampMills(15L, shiftTimeZone),
+                        "k1"));
+        expectedOutput.add(new Watermark(18));
+        ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+        testHarness.close();
+    }
+    /** Builds the harness for the given join type; the join condition always returns true. */
+    private KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData>
+            createTestHarness(FlinkJoinType joinType) throws Exception {
+        String funcCode =
+                "public class TestWindowJoinCondition extends org.apache.flink.api.common.functions.AbstractRichFunction "
+                        + "implements org.apache.flink.table.runtime.generated.JoinCondition {\n"
+                        + "\n"
+                        + "    public TestWindowJoinCondition(Object[] reference) {\n"
+                        + "    }\n"
+                        + "\n"
+                        + "    @Override\n"
+                        + "    public boolean apply(org.apache.flink.table.data.RowData in1, org.apache.flink.table.data.RowData in2) {\n"
+                        + "        return true;\n"
+                        + "    }\n"
+                        + "}\n";
+        GeneratedJoinCondition joinFunction =
+                new GeneratedJoinCondition("TestWindowJoinCondition", funcCode, new Object[0]);
+        int keyIdx = 1;
+        RowDataKeySelector keySelector =
+                HandwrittenSelectorUtil.getRowDataSelector(
+                        new int[] {keyIdx}, INPUT_ROW_TYPE.toRowFieldTypes());
+        TypeInformation<RowData> keyType = InternalTypeInfo.ofFields();
+        WindowJoinOperator operator =
+                WindowJoinOperatorBuilder.builder()
+                        .leftSerializer(INPUT_ROW_TYPE.toRowSerializer())
+                        .rightSerializer(INPUT_ROW_TYPE.toRowSerializer())
+                        .generatedJoinCondition(joinFunction)
+                        .leftWindowEndIndex(0)
+                        .rightWindowEndIndex(0)
+                        .filterNullKeys(new boolean[] {true})
+                        .joinType(joinType)
+                        .withShiftTimezone(shiftTimeZone)
+                        .build();
+        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> testHarness =
+                new KeyedTwoInputStreamOperatorTestHarness<>(
+                        operator, keySelector, keySelector, keyType);
+        return testHarness;
+    }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java
index c4d9df0..087cc4c 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java
@@ -41,15 +41,20 @@ import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.table.utils.HandwrittenSelectorUtil;
 
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.time.ZoneId;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills;
 import static org.junit.Assert.assertEquals;
 
 /** Tests for window rank operators created by {@link WindowRankOperatorBuilder}. */
+@RunWith(Parameterized.class)
 public class WindowRankOperatorTest {
 
     private static final RowType INPUT_ROW_TYPE =
@@ -60,7 +65,6 @@ public class WindowRankOperatorTest {
                             new RowType.RowField("f2", new BigIntType())));
 
     private static final RowDataSerializer INPUT_ROW_SER = new RowDataSerializer(INPUT_ROW_TYPE);
-    private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
 
     private static final RowDataKeySelector KEY_SELECTOR =
             HandwrittenSelectorUtil.getRowDataSelector(
@@ -116,12 +120,31 @@ public class WindowRankOperatorTest {
                     OUTPUT_TYPES_WITHOUT_RANK_NUMBER,
                     new GenericRowRecordSortComparator(0, new VarCharType(VarCharType.MAX_LENGTH)));
 
+    private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
+    private static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai");
+    private final ZoneId shiftTimeZone; // zone for this run; assigned in the constructor
+
+    public WindowRankOperatorTest(ZoneId shiftTimeZone) { // called by the Parameterized runner
+        this.shiftTimeZone = shiftTimeZone; // used by the builders and toUtcTimestampMills
+    }
+
+    @Parameterized.Parameters(name = "TimeZone = {0}") // {0} = first value of each row
+    public static Collection<Object[]> runMode() { // one single-element row per time zone
+        return Arrays.asList(new Object[] {UTC_ZONE_ID}, new Object[] {SHANGHAI_ZONE_ID});
+    }
+
+    private static OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
+            SlicingWindowOperator<RowData, ?> operator) throws Exception { // operator under test
+        return new KeyedOneInputStreamOperatorTestHarness<>( // keyed by KEY_SELECTOR
+                operator, KEY_SELECTOR, KEY_SELECTOR.getProducedType()); // key type from selector
+    }
+
     @Test
     public void testTop2Windows() throws Exception {
         SlicingWindowOperator<RowData, ?> operator =
                 WindowRankOperatorBuilder.builder()
                         .inputSerializer(INPUT_ROW_SER)
-                        .shiftTimeZone(UTC_ZONE_ID)
+                        .shiftTimeZone(shiftTimeZone)
                         .keySerializer(KEY_SER)
                         .sortKeyComparator(GENERATED_SORT_KEY_COMPARATOR)
                         .sortKeySelector(SORT_KEY_SELECTOR)
@@ -141,36 +164,51 @@ public class WindowRankOperatorTest {
         ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
         // add elements out-of-order
-        testHarness.processElement(insertRecord("key2", 1, 999L));
-        testHarness.processElement(insertRecord("key2", 4, 999L));
-        testHarness.processElement(insertRecord("key2", 5, 999L));
-        testHarness.processElement(insertRecord("key2", 3, 999L));
-        testHarness.processElement(insertRecord("key2", 2, 1999L));
-        testHarness.processElement(insertRecord("key2", 7, 3999L));
-        testHarness.processElement(insertRecord("key2", 8, 3999L));
-        testHarness.processElement(insertRecord("key2", 1, 3999L));
-
-        testHarness.processElement(insertRecord("key1", 2, 999L));
-        testHarness.processElement(insertRecord("key1", 1, 999L));
-        testHarness.processElement(insertRecord("key1", 3, 999L));
-        testHarness.processElement(insertRecord("key1", 3, 999L));
-        testHarness.processElement(insertRecord("key1", 4, 1999L));
-        testHarness.processElement(insertRecord("key1", 6, 1999L));
-        testHarness.processElement(insertRecord("key1", 7, 1999L));
+        testHarness.processElement(
+                insertRecord("key2", 1, toUtcTimestampMills(999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key2", 4, toUtcTimestampMills(999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key2", 5, toUtcTimestampMills(999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key2", 3, toUtcTimestampMills(999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key2", 2, toUtcTimestampMills(1999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key2", 7, toUtcTimestampMills(3999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key2", 8, toUtcTimestampMills(3999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key2", 1, toUtcTimestampMills(3999L, shiftTimeZone)));
+
+        testHarness.processElement(
+                insertRecord("key1", 2, toUtcTimestampMills(999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key1", 1, toUtcTimestampMills(999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key1", 3, toUtcTimestampMills(999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key1", 3, toUtcTimestampMills(999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key1", 4, toUtcTimestampMills(1999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key1", 6, toUtcTimestampMills(1999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key1", 7, toUtcTimestampMills(1999L, shiftTimeZone)));
 
         testHarness.processWatermark(new Watermark(999));
-        expectedOutput.add(insertRecord("key1", 1, 999L, 1L));
-        expectedOutput.add(insertRecord("key1", 2, 999L, 2L));
-        expectedOutput.add(insertRecord("key2", 1, 999L, 1L));
-        expectedOutput.add(insertRecord("key2", 3, 999L, 2L));
+        expectedOutput.add(insertRecord("key1", 1, toUtcTimestampMills(999L, shiftTimeZone), 1L));
+        expectedOutput.add(insertRecord("key1", 2, toUtcTimestampMills(999L, shiftTimeZone), 2L));
+        expectedOutput.add(insertRecord("key2", 1, toUtcTimestampMills(999L, shiftTimeZone), 1L));
+        expectedOutput.add(insertRecord("key2", 3, toUtcTimestampMills(999L, shiftTimeZone), 2L));
         expectedOutput.add(new Watermark(999));
         ASSERTER.assertOutputEqualsSorted(
                 "Output was not correct.", expectedOutput, testHarness.getOutput());
 
         testHarness.processWatermark(new Watermark(1999));
-        expectedOutput.add(insertRecord("key1", 4, 1999L, 1L));
-        expectedOutput.add(insertRecord("key1", 6, 1999L, 2L));
-        expectedOutput.add(insertRecord("key2", 2, 1999L, 1L));
+        expectedOutput.add(insertRecord("key1", 4, toUtcTimestampMills(1999L, shiftTimeZone), 1L));
+        expectedOutput.add(insertRecord("key1", 6, toUtcTimestampMills(1999L, shiftTimeZone), 2L));
+        expectedOutput.add(insertRecord("key2", 2, toUtcTimestampMills(1999L, shiftTimeZone), 1L));
         expectedOutput.add(new Watermark(1999));
         ASSERTER.assertOutputEqualsSorted(
                 "Output was not correct.", expectedOutput, testHarness.getOutput());
@@ -187,14 +225,15 @@ public class WindowRankOperatorTest {
         testHarness.open();
 
         testHarness.processWatermark(new Watermark(3999));
-        expectedOutput.add(insertRecord("key2", 1, 3999L, 1L));
-        expectedOutput.add(insertRecord("key2", 7, 3999L, 2L));
+        expectedOutput.add(insertRecord("key2", 1, toUtcTimestampMills(3999L, shiftTimeZone), 1L));
+        expectedOutput.add(insertRecord("key2", 7, toUtcTimestampMills(3999L, shiftTimeZone), 2L));
         expectedOutput.add(new Watermark(3999));
         ASSERTER.assertOutputEqualsSorted(
                 "Output was not correct.", expectedOutput, testHarness.getOutput());
 
         // late element, should be dropped
-        testHarness.processElement(insertRecord("key2", 1, 3500L));
+        testHarness.processElement(
+                insertRecord("key2", 1, toUtcTimestampMills(3500L, shiftTimeZone)));
 
         testHarness.processWatermark(new Watermark(4999));
         expectedOutput.add(new Watermark(4999));
@@ -211,7 +250,7 @@ public class WindowRankOperatorTest {
         SlicingWindowOperator<RowData, ?> operator =
                 WindowRankOperatorBuilder.builder()
                         .inputSerializer(INPUT_ROW_SER)
-                        .shiftTimeZone(UTC_ZONE_ID)
+                        .shiftTimeZone(shiftTimeZone)
                         .keySerializer(KEY_SER)
                         .sortKeyComparator(GENERATED_SORT_KEY_COMPARATOR)
                         .sortKeySelector(SORT_KEY_SELECTOR)
@@ -231,32 +270,47 @@ public class WindowRankOperatorTest {
         ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
         // add elements out-of-order
-        testHarness.processElement(insertRecord("key2", 1, 999L));
-        testHarness.processElement(insertRecord("key2", 4, 999L));
-        testHarness.processElement(insertRecord("key2", 5, 999L));
-        testHarness.processElement(insertRecord("key2", 3, 999L));
-        testHarness.processElement(insertRecord("key2", 2, 1999L));
-        testHarness.processElement(insertRecord("key2", 7, 3999L));
-        testHarness.processElement(insertRecord("key2", 8, 3999L));
-        testHarness.processElement(insertRecord("key2", 1, 3999L));
-
-        testHarness.processElement(insertRecord("key1", 2, 999L));
-        testHarness.processElement(insertRecord("key1", 1, 999L));
-        testHarness.processElement(insertRecord("key1", 3, 999L));
-        testHarness.processElement(insertRecord("key1", 3, 999L));
-        testHarness.processElement(insertRecord("key1", 4, 1999L));
-        testHarness.processElement(insertRecord("key1", 6, 1999L));
-        testHarness.processElement(insertRecord("key1", 7, 1999L));
+        testHarness.processElement(
+                insertRecord("key2", 1, toUtcTimestampMills(999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key2", 4, toUtcTimestampMills(999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key2", 5, toUtcTimestampMills(999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key2", 3, toUtcTimestampMills(999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key2", 2, toUtcTimestampMills(1999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key2", 7, toUtcTimestampMills(3999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key2", 8, toUtcTimestampMills(3999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key2", 1, toUtcTimestampMills(3999L, shiftTimeZone)));
+
+        testHarness.processElement(
+                insertRecord("key1", 2, toUtcTimestampMills(999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key1", 1, toUtcTimestampMills(999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key1", 3, toUtcTimestampMills(999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key1", 3, toUtcTimestampMills(999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key1", 4, toUtcTimestampMills(1999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key1", 6, toUtcTimestampMills(1999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key1", 7, toUtcTimestampMills(1999L, shiftTimeZone)));
 
         testHarness.processWatermark(new Watermark(999));
-        expectedOutput.add(insertRecord("key1", 2, 999L, 2L));
-        expectedOutput.add(insertRecord("key2", 3, 999L, 2L));
+        expectedOutput.add(insertRecord("key1", 2, toUtcTimestampMills(999L, shiftTimeZone), 2L));
+        expectedOutput.add(insertRecord("key2", 3, toUtcTimestampMills(999L, shiftTimeZone), 2L));
         expectedOutput.add(new Watermark(999));
         ASSERTER.assertOutputEqualsSorted(
                 "Output was not correct.", expectedOutput, testHarness.getOutput());
 
         testHarness.processWatermark(new Watermark(1999));
-        expectedOutput.add(insertRecord("key1", 6, 1999L, 2L));
+        expectedOutput.add(insertRecord("key1", 6, toUtcTimestampMills(1999L, shiftTimeZone), 2L));
         expectedOutput.add(new Watermark(1999));
         ASSERTER.assertOutputEqualsSorted(
                 "Output was not correct.", expectedOutput, testHarness.getOutput());
@@ -273,7 +327,7 @@ public class WindowRankOperatorTest {
         testHarness.open();
 
         testHarness.processWatermark(new Watermark(3999));
-        expectedOutput.add(insertRecord("key2", 7, 3999L, 2L));
+        expectedOutput.add(insertRecord("key2", 7, toUtcTimestampMills(3999L, shiftTimeZone), 2L));
         expectedOutput.add(new Watermark(3999));
         ASSERTER.assertOutputEqualsSorted(
                 "Output was not correct.", expectedOutput, testHarness.getOutput());
@@ -285,7 +339,7 @@ public class WindowRankOperatorTest {
         SlicingWindowOperator<RowData, ?> operator =
                 WindowRankOperatorBuilder.builder()
                         .inputSerializer(INPUT_ROW_SER)
-                        .shiftTimeZone(UTC_ZONE_ID)
+                        .shiftTimeZone(shiftTimeZone)
                         .keySerializer(KEY_SER)
                         .sortKeyComparator(GENERATED_SORT_KEY_COMPARATOR)
                         .sortKeySelector(SORT_KEY_SELECTOR)
@@ -305,36 +359,51 @@ public class WindowRankOperatorTest {
         ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
         // add elements out-of-order
-        testHarness.processElement(insertRecord("key2", 1, 999L));
-        testHarness.processElement(insertRecord("key2", 4, 999L));
-        testHarness.processElement(insertRecord("key2", 5, 999L));
-        testHarness.processElement(insertRecord("key2", 3, 999L));
-        testHarness.processElement(insertRecord("key2", 2, 1999L));
-        testHarness.processElement(insertRecord("key2", 7, 3999L));
-        testHarness.processElement(insertRecord("key2", 8, 3999L));
-        testHarness.processElement(insertRecord("key2", 1, 3999L));
-
-        testHarness.processElement(insertRecord("key1", 2, 999L));
-        testHarness.processElement(insertRecord("key1", 1, 999L));
-        testHarness.processElement(insertRecord("key1", 3, 999L));
-        testHarness.processElement(insertRecord("key1", 3, 999L));
-        testHarness.processElement(insertRecord("key1", 4, 1999L));
-        testHarness.processElement(insertRecord("key1", 6, 1999L));
-        testHarness.processElement(insertRecord("key1", 7, 1999L));
+        testHarness.processElement(
+                insertRecord("key2", 1, toUtcTimestampMills(999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key2", 4, toUtcTimestampMills(999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key2", 5, toUtcTimestampMills(999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key2", 3, toUtcTimestampMills(999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key2", 2, toUtcTimestampMills(1999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key2", 7, toUtcTimestampMills(3999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key2", 8, toUtcTimestampMills(3999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key2", 1, toUtcTimestampMills(3999L, shiftTimeZone)));
+
+        testHarness.processElement(
+                insertRecord("key1", 2, toUtcTimestampMills(999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key1", 1, toUtcTimestampMills(999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key1", 3, toUtcTimestampMills(999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key1", 3, toUtcTimestampMills(999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key1", 4, toUtcTimestampMills(1999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key1", 6, toUtcTimestampMills(1999L, shiftTimeZone)));
+        testHarness.processElement(
+                insertRecord("key1", 7, toUtcTimestampMills(1999L, shiftTimeZone)));
 
         testHarness.processWatermark(new Watermark(999));
-        expectedOutput.add(insertRecord("key1", 1, 999L));
-        expectedOutput.add(insertRecord("key1", 2, 999L));
-        expectedOutput.add(insertRecord("key2", 1, 999L));
-        expectedOutput.add(insertRecord("key2", 3, 999L));
+        expectedOutput.add(insertRecord("key1", 1, toUtcTimestampMills(999L, shiftTimeZone)));
+        expectedOutput.add(insertRecord("key1", 2, toUtcTimestampMills(999L, shiftTimeZone)));
+        expectedOutput.add(insertRecord("key2", 1, toUtcTimestampMills(999L, shiftTimeZone)));
+        expectedOutput.add(insertRecord("key2", 3, toUtcTimestampMills(999L, shiftTimeZone)));
         expectedOutput.add(new Watermark(999));
         ASSERTER_WITHOUT_RANK_NUMBER.assertOutputEqualsSorted(
                 "Output was not correct.", expectedOutput, testHarness.getOutput());
 
         testHarness.processWatermark(new Watermark(1999));
-        expectedOutput.add(insertRecord("key1", 4, 1999L));
-        expectedOutput.add(insertRecord("key1", 6, 1999L));
-        expectedOutput.add(insertRecord("key2", 2, 1999L));
+        expectedOutput.add(insertRecord("key1", 4, toUtcTimestampMills(1999L, shiftTimeZone)));
+        expectedOutput.add(insertRecord("key1", 6, toUtcTimestampMills(1999L, shiftTimeZone)));
+        expectedOutput.add(insertRecord("key2", 2, toUtcTimestampMills(1999L, shiftTimeZone)));
         expectedOutput.add(new Watermark(1999));
         ASSERTER_WITHOUT_RANK_NUMBER.assertOutputEqualsSorted(
                 "Output was not correct.", expectedOutput, testHarness.getOutput());
@@ -351,18 +420,12 @@ public class WindowRankOperatorTest {
         testHarness.open();
 
         testHarness.processWatermark(new Watermark(3999));
-        expectedOutput.add(insertRecord("key2", 1, 3999L));
-        expectedOutput.add(insertRecord("key2", 7, 3999L));
+        expectedOutput.add(insertRecord("key2", 1, toUtcTimestampMills(3999L, shiftTimeZone)));
+        expectedOutput.add(insertRecord("key2", 7, toUtcTimestampMills(3999L, shiftTimeZone)));
         expectedOutput.add(new Watermark(3999));
         ASSERTER_WITHOUT_RANK_NUMBER.assertOutputEqualsSorted(
                 "Output was not correct.", expectedOutput, testHarness.getOutput());
 
         testHarness.close();
     }
-
-    private static OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
-            SlicingWindowOperator<RowData, ?> operator) throws Exception {
-        return new KeyedOneInputStreamOperatorTestHarness<>(
-                operator, KEY_SELECTOR, KEY_SELECTOR.getProducedType());
-    }
 }