You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2021/05/08 02:30:02 UTC
[flink] branch master updated: [FLINK-19606][table-runtime-blink]
Introduce WindowJoinOperator and WindowJoinOperatorBuilder
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 89e81c2 [FLINK-19606][table-runtime-blink] Introduce WindowJoinOperator and WindowJoinOperatorBuilder
89e81c2 is described below
commit 89e81c256990912ab15ac9a0939abba2606d4aca
Author: Jing Zhang <be...@126.com>
AuthorDate: Sat May 8 10:29:43 2021 +0800
[FLINK-19606][table-runtime-blink] Introduce WindowJoinOperator and WindowJoinOperatorBuilder
This closes #15760
---
.../operators/join/window/WindowJoinOperator.java | 594 +++++++++++++++++++++
.../join/window/WindowJoinOperatorBuilder.java | 187 +++++++
.../operators/window/state/WindowListState.java | 61 +++
.../join/window/WindowJoinOperatorTest.java | 534 ++++++++++++++++++
.../rank/window/WindowRankOperatorTest.java | 225 +++++---
5 files changed, 1520 insertions(+), 81 deletions(-)
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java
new file mode 100644
index 0000000..d51064f
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java
@@ -0,0 +1,594 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.window;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.RowDataUtil;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.generated.JoinCondition;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.operators.join.JoinConditionWithNullFilters;
+import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
+import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerServiceImpl;
+import org.apache.flink.table.runtime.operators.window.state.WindowListState;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.types.RowKind;
+
+import java.time.ZoneId;
+import java.util.IdentityHashMap;
+import java.util.List;
+
+import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired;
+
+/**
+ * Streaming window join operator.
+ *
+ * <p>Note: currently, {@link WindowJoinOperator} supports neither early firing nor late arrival.
+ * Thus late elements (elements belonging to already-fired windows) are simply dropped.
+ *
+ * <p>Note: currently, {@link WindowJoinOperator} doesn't support DELETE or UPDATE_BEFORE input rows.
+ */
+public abstract class WindowJoinOperator extends TableStreamOperator<RowData>
+ implements TwoInputStreamOperator<RowData, RowData, RowData>,
+ Triggerable<RowData, Long>,
+ KeyContext {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final String LEFT_LATE_ELEMENTS_DROPPED_METRIC_NAME =
+ "leftNumLateRecordsDropped";
+ private static final String LEFT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME =
+ "leftLateRecordsDroppedRate";
+ private static final String RIGHT_LATE_ELEMENTS_DROPPED_METRIC_NAME =
+ "rightNumLateRecordsDropped";
+ private static final String RIGHT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME =
+ "rightLateRecordsDroppedRate";
+ private static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency";
+ private static final String LEFT_RECORDS_STATE_NAME = "left-records";
+ private static final String RIGHT_RECORDS_STATE_NAME = "right-records";
+
+ protected final RowDataSerializer leftSerializer;
+ protected final RowDataSerializer rightSerializer;
+ private final GeneratedJoinCondition generatedJoinCondition;
+
+ private final int leftWindowEndIndex;
+ private final int rightWindowEndIndex;
+
+ private final boolean[] filterNullKeys;
+ private final ZoneId shiftTimeZone;
+
+ /** Flag to prevent duplicate function.close() calls in close() and dispose(). */
+ private transient boolean functionsClosed = false;
+
+ private transient WindowTimerService<Long> windowTimerService;
+
+ // ------------------------------------------------------------------------
+ protected transient JoinConditionWithNullFilters joinCondition;
+
+ /** This is used for emitting elements with a given timestamp. */
+ protected transient TimestampedCollector<RowData> collector;
+
+ private transient WindowListState<Long> leftWindowState;
+ private transient WindowListState<Long> rightWindowState;
+
+ // ------------------------------------------------------------------------
+ // Metrics
+ // ------------------------------------------------------------------------
+
+ private transient Counter leftNumLateRecordsDropped;
+ private transient Meter leftLateRecordsDroppedRate;
+ private transient Counter rightNumLateRecordsDropped;
+ private transient Meter rightLateRecordsDroppedRate;
+ private transient Gauge<Long> watermarkLatency;
+
+    /**
+     * Stores the configuration shared by all window join variants. Package-private: the concrete
+     * operators are the nested subclasses of this class.
+     *
+     * @param leftSerializer serializer for left input rows; cast to {@link RowDataSerializer}
+     * @param rightSerializer serializer for right input rows; cast to {@link RowDataSerializer}
+     * @param generatedJoinCondition generated join condition, instantiated in {@link #open()}
+     * @param leftWindowEndIndex field index from which the window end is read in left rows
+     * @param rightWindowEndIndex field index from which the window end is read in right rows
+     * @param filterNullKeys flags handed to {@link JoinConditionWithNullFilters}
+     * @param shiftTimeZone time zone handed to the window timer service and the fired-window check
+     */
+    WindowJoinOperator(
+            TypeSerializer<RowData> leftSerializer,
+            TypeSerializer<RowData> rightSerializer,
+            GeneratedJoinCondition generatedJoinCondition,
+            int leftWindowEndIndex,
+            int rightWindowEndIndex,
+            boolean[] filterNullKeys,
+            ZoneId shiftTimeZone) {
+        // Plain value fields; these assignments are independent of each other.
+        this.shiftTimeZone = shiftTimeZone;
+        this.filterNullKeys = filterNullKeys;
+        this.rightWindowEndIndex = rightWindowEndIndex;
+        this.leftWindowEndIndex = leftWindowEndIndex;
+        this.generatedJoinCondition = generatedJoinCondition;
+        // Narrowing casts: any other serializer implementation fails with a ClassCastException.
+        this.leftSerializer = (RowDataSerializer) leftSerializer;
+        this.rightSerializer = (RowDataSerializer) rightSerializer;
+    }
+
+    /**
+     * Initializes the runtime parts of the operator: the output collector, the window timer
+     * service, the join condition, the left/right window state and the metrics.
+     *
+     * @throws Exception if the parent operator, the join condition or the state setup fails
+     */
+    @Override
+    public void open() throws Exception {
+        super.open();
+        functionsClosed = false;
+
+        collector = new TimestampedCollector<>(output);
+        collector.eraseTimestamp();
+
+        // timers use the window (a Long) as their namespace
+        InternalTimerService<Long> timers =
+                getInternalTimerService("window-timers", LongSerializer.INSTANCE, this);
+        windowTimerService = new WindowTimerServiceImpl(timers, shiftTimeZone);
+
+        // instantiate the generated condition and wrap it with the null-key filter
+        JoinCondition generated =
+                generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader());
+        joinCondition = new JoinConditionWithNullFilters(generated, filterNullKeys, this);
+        joinCondition.setRuntimeContext(getRuntimeContext());
+        joinCondition.open(new Configuration());
+
+        // per-window record buffers, left side first
+        leftWindowState = createWindowListState(LEFT_RECORDS_STATE_NAME, leftSerializer);
+        rightWindowState = createWindowListState(RIGHT_RECORDS_STATE_NAME, rightSerializer);
+
+        // late-record counters and the meters derived from them
+        leftNumLateRecordsDropped = metrics.counter(LEFT_LATE_ELEMENTS_DROPPED_METRIC_NAME);
+        leftLateRecordsDroppedRate =
+                metrics.meter(
+                        LEFT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME,
+                        new MeterView(leftNumLateRecordsDropped));
+        rightNumLateRecordsDropped = metrics.counter(RIGHT_LATE_ELEMENTS_DROPPED_METRIC_NAME);
+        rightLateRecordsDroppedRate =
+                metrics.meter(
+                        RIGHT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME,
+                        new MeterView(rightNumLateRecordsDropped));
+
+        // processing time minus watermark; reported as 0 while the watermark is negative
+        watermarkLatency =
+                metrics.gauge(
+                        WATERMARK_LATENCY_METRIC_NAME,
+                        () -> {
+                            final long currentWatermark = windowTimerService.currentWatermark();
+                            return currentWatermark < 0
+                                    ? 0L
+                                    : windowTimerService.currentProcessingTime()
+                                            - currentWatermark;
+                        });
+    }
+
+    /**
+     * Creates the keyed list state registered under {@code stateName} with a {@code Long}
+     * namespace and wraps it into a {@link WindowListState}.
+     */
+    private WindowListState<Long> createWindowListState(
+            String stateName, RowDataSerializer recordSerializer) throws Exception {
+        ListStateDescriptor<RowData> descriptor =
+                new ListStateDescriptor<>(stateName, recordSerializer);
+        ListState<RowData> listState =
+                getOrCreateKeyedState(LongSerializer.INSTANCE, descriptor);
+        return new WindowListState<>((InternalListState<RowData, Long, RowData>) listState);
+    }
+
+    /**
+     * Closes the parent operator, drops the collector and closes the join condition if one was
+     * created.
+     */
+    @Override
+    public void close() throws Exception {
+        super.close();
+        // set the flag before closing so that dispose() does not close the condition again
+        functionsClosed = true;
+        collector = null;
+        if (joinCondition != null) {
+            joinCondition.close();
+        }
+    }
+
+    /**
+     * Disposes the parent operator and drops the collector. The join condition is closed here
+     * only if {@link #close()} has not already done so.
+     */
+    @Override
+    public void dispose() throws Exception {
+        super.dispose();
+        collector = null;
+        if (functionsClosed) {
+            // close() already ran; nothing left to release
+            return;
+        }
+        functionsClosed = true;
+        if (joinCondition != null) {
+            joinCondition.close();
+        }
+    }
+
+    /** Handles a row of the first (left) input using the left index, meter and state. */
+    @Override
+    public void processElement1(StreamRecord<RowData> element) throws Exception {
+        processElement(
+                element,
+                leftWindowEndIndex,
+                leftLateRecordsDroppedRate,
+                leftWindowState);
+    }
+
+    /** Handles a row of the second (right) input using the right index, meter and state. */
+    @Override
+    public void processElement2(StreamRecord<RowData> element) throws Exception {
+        processElement(
+                element,
+                rightWindowEndIndex,
+                rightLateRecordsDroppedRate,
+                rightWindowState);
+    }
+
+ private void processElement(
+ StreamRecord<RowData> element,
+ int windowEndIndex,
+ Meter lateRecordsDroppedRate,
+ WindowListState<Long> recordState)
+ throws Exception {
+ RowData inputRow = element.getValue();
+ long windowEnd = inputRow.getLong(windowEndIndex);
+ if (isWindowFired(windowEnd, windowTimerService.currentWatermark(), shiftTimeZone)) {
+ // element is late and should be dropped
+ lateRecordsDroppedRate.markEvent();
+ return;
+ }
+ if (RowDataUtil.isAccumulateMsg(inputRow)) {
+ recordState.add(windowEnd, inputRow);
+ } else {
+ // Window join could not handle retraction input stream
+ throw new UnsupportedOperationException(
+ "This is a bug and should not happen. Please file an issue.");
+ }
+ // always register time for every element
+ windowTimerService.registerEventTimeWindowTimer(windowEnd);
+ }
+
+    /**
+     * Always fails: this operator only registers event-time timers, so a processing-time timer
+     * is unexpected.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void onProcessingTime(InternalTimer<RowData, Long> timer) throws Exception {
+        // Window join only support event-time now
+        final String message = "This is a bug and should not happen. Please file an issue.";
+        throw new UnsupportedOperationException(message);
+    }
+
+    /**
+     * Joins and then clears the buffered rows of the window identified by the timer.
+     *
+     * <p>The current key is set from the timer, and the timer namespace is used as the window
+     * when reading both sides. A side's state is cleared only if its lookup returned a non-null
+     * list.
+     */
+    @Override
+    public void onEventTime(InternalTimer<RowData, Long> timer) throws Exception {
+        setCurrentKey(timer.getKey());
+        final Long firedWindow = timer.getNamespace();
+
+        // join left records and right records
+        final List<RowData> leftRows = leftWindowState.get(firedWindow);
+        final List<RowData> rightRows = rightWindowState.get(firedWindow);
+        join(leftRows, rightRows);
+
+        // clear state
+        if (leftRows != null) {
+            leftWindowState.clear(firedWindow);
+        }
+        if (rightRows != null) {
+            rightWindowState.clear(firedWindow);
+        }
+    }
+
+    /**
+     * Joins the buffered rows of one window and emits the result rows.
+     *
+     * <p>Called from {@link #onEventTime(InternalTimer)} with the lists read from the left and
+     * right window state. Either argument is {@code null} when the state lookup for that side
+     * returned {@code null}; the implementations in this file treat that as "no rows on that
+     * side".
+     *
+     * @param leftRecords rows buffered for the left input, or {@code null}
+     * @param rightRecords rows buffered for the right input, or {@code null}
+     */
+    public abstract void join(Iterable<RowData> leftRecords, Iterable<RowData> rightRecords);
+
+    /**
+     * Window join variant for SEMI and ANTI joins. Only left rows are emitted: for a semi join
+     * those with at least one matching right row, for an anti join those with none.
+     */
+    static class SemiAntiJoinOperator extends WindowJoinOperator {
+
+        private static final long serialVersionUID = 1L;
+
+        /** {@code true} for ANTI join semantics, {@code false} for SEMI join semantics. */
+        private final boolean isAntiJoin;
+
+        SemiAntiJoinOperator(
+                TypeSerializer<RowData> leftSerializer,
+                TypeSerializer<RowData> rightSerializer,
+                GeneratedJoinCondition generatedJoinCondition,
+                int leftWindowEndIndex,
+                int rightWindowEndIndex,
+                boolean[] filterNullKeys,
+                boolean isAntiJoin,
+                ZoneId shiftTimeZone) {
+            super(
+                    leftSerializer,
+                    rightSerializer,
+                    generatedJoinCondition,
+                    leftWindowEndIndex,
+                    rightWindowEndIndex,
+                    filterNullKeys,
+                    shiftTimeZone);
+            this.isAntiJoin = isAntiJoin;
+        }
+
+        /**
+         * Emits the left rows selected by the semi/anti semantics.
+         *
+         * @param leftRecords left rows of the window, or {@code null} (nothing is emitted)
+         * @param rightRecords right rows of the window, or {@code null} (no left row can match)
+         */
+        @Override
+        public void join(Iterable<RowData> leftRecords, Iterable<RowData> rightRecords) {
+            if (leftRecords == null) {
+                return;
+            }
+            if (rightRecords == null) {
+                // without right rows nothing matches: ANTI emits every left row, SEMI none
+                if (isAntiJoin) {
+                    for (RowData leftRecord : leftRecords) {
+                        collector.collect(leftRecord);
+                    }
+                }
+                return;
+            }
+            for (RowData leftRecord : leftRecords) {
+                boolean matches = false;
+                for (RowData rightRecord : rightRecords) {
+                    // evaluate the condition once per pair; the first match is sufficient
+                    if (joinCondition.apply(leftRecord, rightRecord)) {
+                        matches = true;
+                        break;
+                    }
+                }
+                // SEMI emits matched left rows, ANTI emits unmatched left rows
+                if (matches != isAntiJoin) {
+                    collector.collect(leftRecord);
+                }
+            }
+        }
+    }
+
+    /** Window join variant for INNER joins: emits one joined row per matching pair. */
+    static class InnerJoinOperator extends WindowJoinOperator {
+
+        // declared explicitly, consistent with the other operator classes in this file
+        private static final long serialVersionUID = 1L;
+
+        /** Reused output row; re-pointed at the current left/right pair before each emit. */
+        private transient JoinedRowData outRow;
+
+        InnerJoinOperator(
+                TypeSerializer<RowData> leftSerializer,
+                TypeSerializer<RowData> rightSerializer,
+                GeneratedJoinCondition generatedJoinCondition,
+                int leftWindowEndIndex,
+                int rightWindowEndIndex,
+                boolean[] filterNullKeys,
+                ZoneId shiftTimeZone) {
+            super(
+                    leftSerializer,
+                    rightSerializer,
+                    generatedJoinCondition,
+                    leftWindowEndIndex,
+                    rightWindowEndIndex,
+                    filterNullKeys,
+                    shiftTimeZone);
+        }
+
+        @Override
+        public void open() throws Exception {
+            super.open();
+            outRow = new JoinedRowData();
+        }
+
+        /**
+         * Emits an INSERT row for every left/right pair accepted by the join condition.
+         *
+         * @param leftRecords left rows of the window, or {@code null} (nothing is emitted)
+         * @param rightRecords right rows of the window, or {@code null} (nothing is emitted)
+         */
+        @Override
+        public void join(Iterable<RowData> leftRecords, Iterable<RowData> rightRecords) {
+            if (leftRecords == null || rightRecords == null) {
+                return;
+            }
+            for (RowData leftRecord : leftRecords) {
+                for (RowData rightRecord : rightRecords) {
+                    if (joinCondition.apply(leftRecord, rightRecord)) {
+                        outRow.setRowKind(RowKind.INSERT);
+                        outRow.replace(leftRecord, rightRecord);
+                        collector.collect(outRow);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Shared base of the outer join variants. Provides the reused output row, the null rows used
+     * for padding and the emit helpers.
+     */
+    private abstract static class AbstractOuterJoinOperator extends WindowJoinOperator {
+
+        private static final long serialVersionUID = 1L;
+
+        /** Row with the arity of the left serializer, used when no left row is available. */
+        private transient RowData leftNullRow;
+        /** Row with the arity of the right serializer, used when no right row is available. */
+        private transient RowData rightNullRow;
+        /** Reused output row. */
+        private transient JoinedRowData outRow;
+
+        AbstractOuterJoinOperator(
+                TypeSerializer<RowData> leftSerializer,
+                TypeSerializer<RowData> rightSerializer,
+                GeneratedJoinCondition generatedJoinCondition,
+                int leftWindowEndIndex,
+                int rightWindowEndIndex,
+                boolean[] filterNullKeys,
+                ZoneId shiftTimeZone) {
+            super(
+                    leftSerializer,
+                    rightSerializer,
+                    generatedJoinCondition,
+                    leftWindowEndIndex,
+                    rightWindowEndIndex,
+                    filterNullKeys,
+                    shiftTimeZone);
+        }
+
+        @Override
+        public void open() throws Exception {
+            super.open();
+            leftNullRow = new GenericRowData(leftSerializer.getArity());
+            rightNullRow = new GenericRowData(rightSerializer.getArity());
+            outRow = new JoinedRowData();
+        }
+
+        /**
+         * Emits {@code row} joined with the null row of the opposite side.
+         *
+         * @param row the row that found no partner
+         * @param isLeft whether {@code row} comes from the left input
+         */
+        protected void outputNullPadding(RowData row, boolean isLeft) {
+            final RowData leftPart = isLeft ? row : leftNullRow;
+            final RowData rightPart = isLeft ? rightNullRow : row;
+            emit(leftPart, rightPart);
+        }
+
+        /** Emits every row of {@code rows} padded with the null row of the opposite side. */
+        protected void outputNullPadding(Iterable<RowData> rows, boolean isLeft) {
+            for (RowData unmatched : rows) {
+                outputNullPadding(unmatched, isLeft);
+            }
+        }
+
+        /**
+         * Emits the joined pair, placing {@code inputRow} on the left when {@code inputIsLeft} is
+         * set and on the right otherwise.
+         */
+        protected void output(RowData inputRow, RowData otherRow, boolean inputIsLeft) {
+            final RowData leftPart = inputIsLeft ? inputRow : otherRow;
+            final RowData rightPart = inputIsLeft ? otherRow : inputRow;
+            emit(leftPart, rightPart);
+        }
+
+        /** Points the reused output row at the given pair, marks it INSERT and collects it. */
+        private void emit(RowData leftPart, RowData rightPart) {
+            outRow.replace(leftPart, rightPart);
+            outRow.setRowKind(RowKind.INSERT);
+            collector.collect(outRow);
+        }
+    }
+
+    /** Window join variant for LEFT OUTER joins: every left row is emitted at least once. */
+    static class LeftOuterJoinOperator extends AbstractOuterJoinOperator {
+
+        private static final long serialVersionUID = 1L;
+
+        LeftOuterJoinOperator(
+                TypeSerializer<RowData> leftSerializer,
+                TypeSerializer<RowData> rightSerializer,
+                GeneratedJoinCondition generatedJoinCondition,
+                int leftWindowEndIndex,
+                int rightWindowEndIndex,
+                boolean[] filterNullKeys,
+                ZoneId shiftTimeZone) {
+            super(
+                    leftSerializer,
+                    rightSerializer,
+                    generatedJoinCondition,
+                    leftWindowEndIndex,
+                    rightWindowEndIndex,
+                    filterNullKeys,
+                    shiftTimeZone);
+        }
+
+        /**
+         * Emits all matching pairs; left rows without any match are emitted padded with nulls.
+         *
+         * @param leftRecords left rows of the window, or {@code null} (nothing is emitted)
+         * @param rightRecords right rows of the window, or {@code null} (all left rows are padded)
+         */
+        @Override
+        public void join(Iterable<RowData> leftRecords, Iterable<RowData> rightRecords) {
+            if (leftRecords == null) {
+                return;
+            }
+            if (rightRecords == null) {
+                outputNullPadding(leftRecords, true);
+                return;
+            }
+            for (RowData left : leftRecords) {
+                boolean matched = false;
+                for (RowData right : rightRecords) {
+                    if (joinCondition.apply(left, right)) {
+                        output(left, right, true);
+                        matched = true;
+                    }
+                }
+                if (!matched) {
+                    // padding null for left side
+                    outputNullPadding(left, true);
+                }
+            }
+        }
+    }
+
+    /** Window join variant for RIGHT OUTER joins: every right row is emitted at least once. */
+    static class RightOuterJoinOperator extends AbstractOuterJoinOperator {
+
+        private static final long serialVersionUID = 1L;
+
+        RightOuterJoinOperator(
+                TypeSerializer<RowData> leftSerializer,
+                TypeSerializer<RowData> rightSerializer,
+                GeneratedJoinCondition generatedJoinCondition,
+                int leftWindowEndIndex,
+                int rightWindowEndIndex,
+                boolean[] filterNullKeys,
+                ZoneId shiftTimeZone) {
+            super(
+                    leftSerializer,
+                    rightSerializer,
+                    generatedJoinCondition,
+                    leftWindowEndIndex,
+                    rightWindowEndIndex,
+                    filterNullKeys,
+                    shiftTimeZone);
+        }
+
+        /**
+         * Emits all matching pairs; right rows without any match are emitted padded with nulls.
+         * The right side drives the outer loop.
+         *
+         * @param leftRecords left rows of the window, or {@code null} (all right rows are padded)
+         * @param rightRecords right rows of the window, or {@code null} (nothing is emitted)
+         */
+        @Override
+        public void join(Iterable<RowData> leftRecords, Iterable<RowData> rightRecords) {
+            if (rightRecords == null) {
+                return;
+            }
+            if (leftRecords == null) {
+                outputNullPadding(rightRecords, false);
+                return;
+            }
+            for (RowData right : rightRecords) {
+                boolean matched = false;
+                for (RowData left : leftRecords) {
+                    if (joinCondition.apply(left, right)) {
+                        // the left row is passed as the "input" row, so it stays on the left
+                        output(left, right, true);
+                        matched = true;
+                    }
+                }
+                if (!matched) {
+                    outputNullPadding(right, false);
+                }
+            }
+        }
+    }
+
+    /** Window join variant for FULL OUTER joins: every row of both sides is emitted. */
+    static class FullOuterJoinOperator extends AbstractOuterJoinOperator {
+
+        private static final long serialVersionUID = 1L;
+
+        FullOuterJoinOperator(
+                TypeSerializer<RowData> leftSerializer,
+                TypeSerializer<RowData> rightSerializer,
+                GeneratedJoinCondition generatedJoinCondition,
+                int leftWindowEndIndex,
+                int rightWindowEndIndex,
+                boolean[] filterNullKeys,
+                ZoneId shiftTimeZone) {
+            super(
+                    leftSerializer,
+                    rightSerializer,
+                    generatedJoinCondition,
+                    leftWindowEndIndex,
+                    rightWindowEndIndex,
+                    filterNullKeys,
+                    shiftTimeZone);
+        }
+
+        /**
+         * Emits all matching pairs, then pads the unmatched rows of both sides with nulls.
+         *
+         * <p>Matched right rows are tracked by object identity, so the second pass over {@code
+         * rightRecords} must yield the same instances as the first one.
+         *
+         * @param leftRecords left rows of the window, or {@code null}
+         * @param rightRecords right rows of the window, or {@code null}
+         */
+        @Override
+        public void join(Iterable<RowData> leftRecords, Iterable<RowData> rightRecords) {
+            if (leftRecords == null) {
+                // only right rows (or nothing at all) in this window
+                if (rightRecords != null) {
+                    outputNullPadding(rightRecords, false);
+                }
+                return;
+            }
+            if (rightRecords == null) {
+                outputNullPadding(leftRecords, true);
+                return;
+            }
+
+            final IdentityHashMap<RowData, Boolean> matchedRight = new IdentityHashMap<>();
+            for (RowData left : leftRecords) {
+                boolean matched = false;
+                for (RowData right : rightRecords) {
+                    if (joinCondition.apply(left, right)) {
+                        output(left, right, true);
+                        matched = true;
+                        matchedRight.put(right, Boolean.TRUE);
+                    }
+                }
+                // padding null for left side
+                if (!matched) {
+                    outputNullPadding(left, true);
+                }
+            }
+            // padding null for never emitted right side
+            for (RowData right : rightRecords) {
+                if (!matchedRight.containsKey(right)) {
+                    outputNullPadding(right, false);
+                }
+            }
+        }
+    }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorBuilder.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorBuilder.java
new file mode 100644
index 0000000..c0e5665
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorBuilder.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.window;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+
+import java.time.ZoneId;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The {@link WindowJoinOperatorBuilder} is used to build a {@link WindowJoinOperator} for window
+ * join.
+ *
+ * <pre>
+ * WindowJoinOperatorBuilder.builder()
+ *   .leftSerializer(leftSerializer)
+ *   .rightSerializer(rightSerializer)
+ *   .generatedJoinCondition(generatedJoinCondition)
+ *   .leftWindowEndIndex(leftWindowEndIndex)
+ *   .rightWindowEndIndex(rightWindowEndIndex)
+ *   .filterNullKeys(filterNullKeys)
+ *   .joinType(joinType)
+ *   .withShiftTimezone(shiftTimeZone)
+ *   .build();
+ * </pre>
+ *
+ * <p>All settings except the shift time zone (default: UTC) are mandatory; {@link #build()}
+ * rejects missing values.
+ */
+public class WindowJoinOperatorBuilder {
+
+    /** Creates an empty builder. */
+    public static WindowJoinOperatorBuilder builder() {
+        return new WindowJoinOperatorBuilder();
+    }
+
+    private TypeSerializer<RowData> leftSerializer;
+    private TypeSerializer<RowData> rightSerializer;
+    private GeneratedJoinCondition generatedJoinCondition;
+    // -1 marks "not set"; build() requires both indices to be non-negative
+    private int leftWindowEndIndex = -1;
+    private int rightWindowEndIndex = -1;
+    private boolean[] filterNullKeys;
+    private FlinkJoinType joinType;
+    private ZoneId shiftTimeZone = ZoneId.of("UTC");
+
+    /** Sets the serializer of the left input rows. */
+    public WindowJoinOperatorBuilder leftSerializer(TypeSerializer<RowData> leftSerializer) {
+        this.leftSerializer = leftSerializer;
+        return this;
+    }
+
+    /** Sets the serializer of the right input rows. */
+    public WindowJoinOperatorBuilder rightSerializer(TypeSerializer<RowData> rightSerializer) {
+        this.rightSerializer = rightSerializer;
+        return this;
+    }
+
+    /** Sets the generated join condition evaluated for each left/right pair. */
+    public WindowJoinOperatorBuilder generatedJoinCondition(
+            GeneratedJoinCondition generatedJoinCondition) {
+        this.generatedJoinCondition = generatedJoinCondition;
+        return this;
+    }
+
+    /** Sets the null-key filter flags passed on to the operator. */
+    public WindowJoinOperatorBuilder filterNullKeys(boolean[] filterNullKeys) {
+        this.filterNullKeys = filterNullKeys;
+        return this;
+    }
+
+    /** Sets the join type, which selects the operator implementation in {@link #build()}. */
+    public WindowJoinOperatorBuilder joinType(FlinkJoinType joinType) {
+        this.joinType = joinType;
+        return this;
+    }
+
+    /** Sets the field index of the window end in left input rows. */
+    public WindowJoinOperatorBuilder leftWindowEndIndex(int leftWindowEndIndex) {
+        this.leftWindowEndIndex = leftWindowEndIndex;
+        return this;
+    }
+
+    /** Sets the field index of the window end in right input rows. */
+    public WindowJoinOperatorBuilder rightWindowEndIndex(int rightWindowEndIndex) {
+        this.rightWindowEndIndex = rightWindowEndIndex;
+        return this;
+    }
+
+    /**
+     * The shift timezone of the window, if the proctime or rowtime type is TIMESTAMP_LTZ, the shift
+     * timezone is the timezone user configured in TableConfig, other cases the timezone is UTC
+     * which means never shift when assigning windows.
+     */
+    public WindowJoinOperatorBuilder withShiftTimezone(ZoneId shiftTimeZone) {
+        this.shiftTimeZone = shiftTimeZone;
+        return this;
+    }
+
+    /**
+     * Validates the configuration and creates the operator matching the join type.
+     *
+     * @return the operator implementation for the configured join type
+     * @throws NullPointerException if a mandatory setting or the shift time zone is {@code null}
+     * @throws IllegalArgumentException if a window end index is negative or the join type is not
+     *     one of INNER, SEMI, ANTI, LEFT, RIGHT, FULL
+     */
+    public WindowJoinOperator build() {
+        checkNotNull(leftSerializer, "leftSerializer must be set");
+        checkNotNull(rightSerializer, "rightSerializer must be set");
+        checkNotNull(generatedJoinCondition, "generatedJoinCondition must be set");
+        checkNotNull(filterNullKeys, "filterNullKeys must be set");
+        checkNotNull(joinType, "joinType must be set");
+        // fail at build time instead of inside the running operator
+        checkNotNull(shiftTimeZone, "shiftTimeZone must not be null");
+
+        checkArgument(
+                leftWindowEndIndex >= 0,
+                String.format(
+                        "Illegal window end index %s, it should not be negative!",
+                        leftWindowEndIndex));
+        checkArgument(
+                rightWindowEndIndex >= 0,
+                String.format(
+                        "Illegal window end index %s, it should not be negative!",
+                        rightWindowEndIndex));
+
+        switch (joinType) {
+            case INNER:
+                return new WindowJoinOperator.InnerJoinOperator(
+                        leftSerializer,
+                        rightSerializer,
+                        generatedJoinCondition,
+                        leftWindowEndIndex,
+                        rightWindowEndIndex,
+                        filterNullKeys,
+                        shiftTimeZone);
+            case SEMI:
+                return new WindowJoinOperator.SemiAntiJoinOperator(
+                        leftSerializer,
+                        rightSerializer,
+                        generatedJoinCondition,
+                        leftWindowEndIndex,
+                        rightWindowEndIndex,
+                        filterNullKeys,
+                        false,
+                        shiftTimeZone);
+            case ANTI:
+                return new WindowJoinOperator.SemiAntiJoinOperator(
+                        leftSerializer,
+                        rightSerializer,
+                        generatedJoinCondition,
+                        leftWindowEndIndex,
+                        rightWindowEndIndex,
+                        filterNullKeys,
+                        true,
+                        shiftTimeZone);
+            case LEFT:
+                return new WindowJoinOperator.LeftOuterJoinOperator(
+                        leftSerializer,
+                        rightSerializer,
+                        generatedJoinCondition,
+                        leftWindowEndIndex,
+                        rightWindowEndIndex,
+                        filterNullKeys,
+                        shiftTimeZone);
+            case RIGHT:
+                return new WindowJoinOperator.RightOuterJoinOperator(
+                        leftSerializer,
+                        rightSerializer,
+                        generatedJoinCondition,
+                        leftWindowEndIndex,
+                        rightWindowEndIndex,
+                        filterNullKeys,
+                        shiftTimeZone);
+            case FULL:
+                return new WindowJoinOperator.FullOuterJoinOperator(
+                        leftSerializer,
+                        rightSerializer,
+                        generatedJoinCondition,
+                        leftWindowEndIndex,
+                        rightWindowEndIndex,
+                        filterNullKeys,
+                        shiftTimeZone);
+            default:
+                throw new IllegalArgumentException("Invalid join type: " + joinType);
+        }
+    }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/state/WindowListState.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/state/WindowListState.java
new file mode 100644
index 0000000..1de0ad0
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/state/WindowListState.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.state;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.table.data.RowData;
+
+import java.util.List;
+
+/**
+ * A wrapper of {@link ListState} which is easier to update based on window namespace: every
+ * operation first switches the wrapped state to the given window and then delegates to it.
+ */
+public final class WindowListState<W> implements WindowState<W> {
+
+    private final InternalListState<RowData, W, RowData> windowState;
+
+    /** @param windowState the namespaced list state to delegate to */
+    public WindowListState(InternalListState<RowData, W, RowData> windowState) {
+        this.windowState = windowState;
+    }
+
+    /**
+     * Appends the given value to the list stored for {@code window}. The next {@code get(window)}
+     * call (for the same state partition) returns the updated list.
+     *
+     * <p>If null is passed in, the state value will remain unchanged.
+     *
+     * @param window The namespace for the state.
+     * @param value The new value for the state.
+     * @throws Exception Thrown if the system cannot access the state.
+     */
+    public void add(W window, RowData value) throws Exception {
+        this.windowState.setCurrentNamespace(window);
+        this.windowState.add(value);
+    }
+
+    /**
+     * Returns the list stored for {@code window}, exactly as the wrapped state's {@code
+     * getInternal()} reports it.
+     *
+     * @param window The namespace for the state.
+     * @throws Exception Thrown if the system cannot access the state.
+     */
+    public List<RowData> get(W window) throws Exception {
+        this.windowState.setCurrentNamespace(window);
+        final List<RowData> stored = this.windowState.getInternal();
+        return stored;
+    }
+
+    /**
+     * Removes the list stored for {@code window}.
+     *
+     * @param window The namespace for the state.
+     */
+    public void clear(W window) {
+        this.windowState.setCurrentNamespace(window);
+        this.windowState.clear();
+    }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java
new file mode 100644
index 0000000..3b7d093
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.window;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills;
+import static org.junit.Assert.assertEquals;
+
+/** Tests for window join operators created by {@link WindowJoinOperatorBuilder}. */
+@RunWith(Parameterized.class)
+public class WindowJoinOperatorTest {
+
+ private static final InternalTypeInfo<RowData> INPUT_ROW_TYPE =
+ InternalTypeInfo.ofFields(new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH));
+
+ private static final InternalTypeInfo<RowData> OUTPUT_ROW_TYPE =
+ InternalTypeInfo.ofFields(
+ new BigIntType(),
+ new VarCharType(VarCharType.MAX_LENGTH),
+ new BigIntType(),
+ new VarCharType(VarCharType.MAX_LENGTH));
+
+ private static final RowDataHarnessAssertor ASSERTER =
+ new RowDataHarnessAssertor(OUTPUT_ROW_TYPE.toRowFieldTypes());
+
+ private static final RowDataHarnessAssertor SEMI_ANTI_JOIN_ASSERTER =
+ new RowDataHarnessAssertor(INPUT_ROW_TYPE.toRowFieldTypes());
+
+ private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
+
+ private static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai");
+
+ private final ZoneId shiftTimeZone;
+
+ public WindowJoinOperatorTest(ZoneId shiftTimeZone) {
+ this.shiftTimeZone = shiftTimeZone;
+ }
+
+ @Parameterized.Parameters(name = "TimeZone = {0}")
+ public static Collection<Object[]> runMode() {
+ return Arrays.asList(new Object[][] {{UTC_ZONE_ID}, {SHANGHAI_ZONE_ID}});
+ }
+
+ @Test
+ public void testSemiJoin() throws Exception {
+ KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> testHarness =
+ createTestHarness(FlinkJoinType.SEMI);
+
+ testHarness.open();
+ testHarness.processWatermark1(new Watermark(1));
+ testHarness.processWatermark2(new Watermark(1));
+
+ // Late data is dropped: a left row with window end 1 at watermark 1 registers no timer.
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(1L, shiftTimeZone), "k1"));
+ assertEquals(0, testHarness.numEventTimeTimers());
+ // Windows: end 3 has two left rows and one right row, end 6 is left-only, end 9 right-only.
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+ testHarness.processElement2(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1"));
+ testHarness.processElement2(insertRecord(toUtcTimestampMills(9L, shiftTimeZone), "k1"));
+ assertEquals(3, testHarness.numEventTimeTimers());
+ assertEquals(4, testHarness.numKeyedStateEntries());
+ // Watermark 10 fires windows 3, 6 and 9; only the left rows of window 3 have a match.
+ testHarness.processWatermark1(new Watermark(10));
+ testHarness.processWatermark2(new Watermark(10));
+
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(new Watermark(1));
+ expectedOutput.add(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+ expectedOutput.add(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+ expectedOutput.add(new Watermark(10));
+ SEMI_ANTI_JOIN_ASSERTER.assertOutputEqualsSorted(
+ "output wrong.", expectedOutput, testHarness.getOutput());
+ assertEquals(0, testHarness.numEventTimeTimers());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+ // Second round: window 12 is left-only; window 15 has one left row and two right rows.
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1"));
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+ testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+ testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+ assertEquals(3, testHarness.numKeyedStateEntries());
+ // Watermark 13 only reaches window 12, which has no right row, so nothing is emitted.
+ testHarness.processWatermark1(new Watermark(13));
+ testHarness.processWatermark2(new Watermark(13));
+ // Semi join emits 2-field left rows, hence SEMI_ANTI_JOIN_ASSERTER for the checks below.
+ expectedOutput.add(new Watermark(13));
+ assertEquals(2, testHarness.numKeyedStateEntries());
+ SEMI_ANTI_JOIN_ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+ // Watermark 18 reaches window 15: the left row is emitted once despite two right matches.
+ testHarness.processWatermark1(new Watermark(18));
+ testHarness.processWatermark2(new Watermark(18));
+ expectedOutput.add(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+ expectedOutput.add(new Watermark(18));
+ SEMI_ANTI_JOIN_ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+ testHarness.close();
+ }
+
+ @Test
+ public void testAntiJoin() throws Exception {
+ KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> testHarness =
+ createTestHarness(FlinkJoinType.ANTI);
+ testHarness.open();
+ testHarness.processWatermark1(new Watermark(1));
+ testHarness.processWatermark2(new Watermark(1));
+
+ // Late data is dropped: a left row with window end 1 at watermark 1 registers no timer.
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(1L, shiftTimeZone), "k1"));
+ assertEquals(0, testHarness.numEventTimeTimers());
+ // Windows: end 3 has two left rows and one right row, end 6 is left-only, end 9 right-only.
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+ testHarness.processElement2(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1"));
+ testHarness.processElement2(insertRecord(toUtcTimestampMills(9L, shiftTimeZone), "k1"));
+ assertEquals(3, testHarness.numEventTimeTimers());
+ assertEquals(4, testHarness.numKeyedStateEntries());
+ // Watermark 10 fires windows 3, 6 and 9; only the left row of window 6 has no match.
+ testHarness.processWatermark1(new Watermark(10));
+ testHarness.processWatermark2(new Watermark(10));
+
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(new Watermark(1));
+ expectedOutput.add(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1"));
+ expectedOutput.add(new Watermark(10));
+ SEMI_ANTI_JOIN_ASSERTER.assertOutputEqualsSorted(
+ "output wrong.", expectedOutput, testHarness.getOutput());
+ assertEquals(0, testHarness.numEventTimeTimers());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+ // Second round: window 12 is left-only; window 15 has one left row and two right rows.
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1"));
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+ testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+ testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+ assertEquals(3, testHarness.numKeyedStateEntries());
+ // Watermark 13 only reaches window 12; its unmatched left row is emitted.
+ testHarness.processWatermark1(new Watermark(13));
+ testHarness.processWatermark2(new Watermark(13));
+ // Anti join emits 2-field left rows, hence SEMI_ANTI_JOIN_ASSERTER for the checks below.
+ expectedOutput.add(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1"));
+ expectedOutput.add(new Watermark(13));
+ assertEquals(2, testHarness.numKeyedStateEntries());
+ SEMI_ANTI_JOIN_ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+ // Watermark 18 reaches window 15, whose left row has right matches: nothing is emitted.
+ testHarness.processWatermark1(new Watermark(18));
+ testHarness.processWatermark2(new Watermark(18));
+ expectedOutput.add(new Watermark(18));
+ SEMI_ANTI_JOIN_ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+ testHarness.close();
+ }
+
+ @Test
+ public void testInnerJoin() throws Exception {
+ KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> testHarness =
+ createTestHarness(FlinkJoinType.INNER);
+
+ testHarness.open();
+ testHarness.processWatermark1(new Watermark(1));
+ testHarness.processWatermark2(new Watermark(1));
+
+ // Late data is dropped: a left row with window end 1 at watermark 1 registers no timer.
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(1L, shiftTimeZone), "k1"));
+ assertEquals(0, testHarness.numEventTimeTimers());
+ // Windows: end 3 has two left rows and one right row, end 6 is left-only, end 9 right-only.
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+ testHarness.processElement2(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1"));
+ testHarness.processElement2(insertRecord(toUtcTimestampMills(9L, shiftTimeZone), "k1"));
+ assertEquals(3, testHarness.numEventTimeTimers());
+ assertEquals(4, testHarness.numKeyedStateEntries());
+ // Watermark 10 fires windows 3, 6 and 9; inner join only emits the two pairs of window 3.
+ testHarness.processWatermark1(new Watermark(10));
+ testHarness.processWatermark2(new Watermark(10));
+
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(new Watermark(1));
+ expectedOutput.add(
+ insertRecord(
+ toUtcTimestampMills(3L, shiftTimeZone),
+ "k1",
+ toUtcTimestampMills(3L, shiftTimeZone),
+ "k1"));
+ expectedOutput.add(
+ insertRecord(
+ toUtcTimestampMills(3L, shiftTimeZone),
+ "k1",
+ toUtcTimestampMills(3L, shiftTimeZone),
+ "k1"));
+ expectedOutput.add(new Watermark(10));
+ ASSERTER.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
+ assertEquals(0, testHarness.numEventTimeTimers());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+ // Second round: window 12 is left-only; window 15 has one left row and two right rows.
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1"));
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+ testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+ testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+ assertEquals(3, testHarness.numKeyedStateEntries());
+ // Watermark 13 only reaches window 12, which has no right row, so nothing is emitted.
+ testHarness.processWatermark1(new Watermark(13));
+ testHarness.processWatermark2(new Watermark(13));
+
+ expectedOutput.add(new Watermark(13));
+ assertEquals(2, testHarness.numKeyedStateEntries());
+ ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+ // Watermark 18 reaches window 15: the left row is paired with each of the two right rows.
+ testHarness.processWatermark1(new Watermark(18));
+ testHarness.processWatermark2(new Watermark(18));
+ expectedOutput.add(
+ insertRecord(
+ toUtcTimestampMills(15L, shiftTimeZone),
+ "k1",
+ toUtcTimestampMills(15L, shiftTimeZone),
+ "k1"));
+ expectedOutput.add(
+ insertRecord(
+ toUtcTimestampMills(15L, shiftTimeZone),
+ "k1",
+ toUtcTimestampMills(15L, shiftTimeZone),
+ "k1"));
+ expectedOutput.add(new Watermark(18));
+ ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+ testHarness.close();
+ }
+
+ @Test
+ public void testLeftOuterJoin() throws Exception {
+ KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> testHarness =
+ createTestHarness(FlinkJoinType.LEFT);
+
+ testHarness.open();
+ testHarness.processWatermark1(new Watermark(1));
+ testHarness.processWatermark2(new Watermark(1));
+
+ // Late data is dropped: a left row with window end 1 at watermark 1 registers no timer.
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(1L, shiftTimeZone), "k1"));
+ assertEquals(0, testHarness.numEventTimeTimers());
+ // Windows: end 3 has two left rows and one right row, end 6 is left-only, end 9 right-only.
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+ testHarness.processElement2(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1"));
+ testHarness.processElement2(insertRecord(toUtcTimestampMills(9L, shiftTimeZone), "k1"));
+ assertEquals(3, testHarness.numEventTimeTimers());
+ assertEquals(4, testHarness.numKeyedStateEntries());
+ // Watermark 10 fires all three windows; window 6's left row is padded with nulls.
+ testHarness.processWatermark1(new Watermark(10));
+ testHarness.processWatermark2(new Watermark(10));
+
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(new Watermark(1));
+ expectedOutput.add(
+ insertRecord(
+ toUtcTimestampMills(3L, shiftTimeZone),
+ "k1",
+ toUtcTimestampMills(3L, shiftTimeZone),
+ "k1"));
+ expectedOutput.add(
+ insertRecord(
+ toUtcTimestampMills(3L, shiftTimeZone),
+ "k1",
+ toUtcTimestampMills(3L, shiftTimeZone),
+ "k1"));
+ expectedOutput.add(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1", null, null));
+ expectedOutput.add(new Watermark(10));
+ ASSERTER.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
+ assertEquals(0, testHarness.numEventTimeTimers());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+ // Second round: window 12 is left-only; window 15 has one left row and two right rows.
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1"));
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+ testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+ testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+ assertEquals(3, testHarness.numKeyedStateEntries());
+ // Watermark 13 only reaches window 12; its left row is emitted padded with nulls.
+ testHarness.processWatermark1(new Watermark(13));
+ testHarness.processWatermark2(new Watermark(13));
+
+ expectedOutput.add(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1", null, null));
+ expectedOutput.add(new Watermark(13));
+ assertEquals(2, testHarness.numKeyedStateEntries());
+ ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+ // Watermark 18 reaches window 15: the left row is paired with each of the two right rows.
+ testHarness.processWatermark1(new Watermark(18));
+ testHarness.processWatermark2(new Watermark(18));
+ expectedOutput.add(
+ insertRecord(
+ toUtcTimestampMills(15L, shiftTimeZone),
+ "k1",
+ toUtcTimestampMills(15L, shiftTimeZone),
+ "k1"));
+ expectedOutput.add(
+ insertRecord(
+ toUtcTimestampMills(15L, shiftTimeZone),
+ "k1",
+ toUtcTimestampMills(15L, shiftTimeZone),
+ "k1"));
+ expectedOutput.add(new Watermark(18));
+ ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+ testHarness.close();
+ }
+
+ @Test
+ public void testRightOuterJoin() throws Exception {
+ KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> testHarness =
+ createTestHarness(FlinkJoinType.RIGHT);
+
+ testHarness.open();
+ testHarness.processWatermark1(new Watermark(1));
+ testHarness.processWatermark2(new Watermark(1));
+
+ // Late data is dropped: a left row with window end 1 at watermark 1 registers no timer.
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(1L, shiftTimeZone), "k1"));
+ assertEquals(0, testHarness.numEventTimeTimers());
+ // Windows: end 3 has two left rows and one right row, end 6 is left-only, end 9 right-only.
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+ testHarness.processElement2(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1"));
+ testHarness.processElement2(insertRecord(toUtcTimestampMills(9L, shiftTimeZone), "k1"));
+ assertEquals(3, testHarness.numEventTimeTimers());
+ assertEquals(4, testHarness.numKeyedStateEntries());
+ // Watermark 10 fires all three windows; window 9's right row is padded with nulls.
+ testHarness.processWatermark1(new Watermark(10));
+ testHarness.processWatermark2(new Watermark(10));
+
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(new Watermark(1));
+ expectedOutput.add(
+ insertRecord(
+ toUtcTimestampMills(3L, shiftTimeZone),
+ "k1",
+ toUtcTimestampMills(3L, shiftTimeZone),
+ "k1"));
+ expectedOutput.add(
+ insertRecord(
+ toUtcTimestampMills(3L, shiftTimeZone),
+ "k1",
+ toUtcTimestampMills(3L, shiftTimeZone),
+ "k1"));
+ expectedOutput.add(insertRecord(null, null, toUtcTimestampMills(9L, shiftTimeZone), "k1"));
+ expectedOutput.add(new Watermark(10));
+ ASSERTER.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
+ assertEquals(0, testHarness.numEventTimeTimers());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+ // Second round: window 12 is left-only; window 15 has one left row and two right rows.
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1"));
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+ testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+ testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+ assertEquals(3, testHarness.numKeyedStateEntries());
+ // Watermark 13 only reaches window 12, which has no right row, so nothing is emitted.
+ testHarness.processWatermark1(new Watermark(13));
+ testHarness.processWatermark2(new Watermark(13));
+
+ expectedOutput.add(new Watermark(13));
+ assertEquals(2, testHarness.numKeyedStateEntries());
+ ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+ // Watermark 18 reaches window 15: the left row is paired with each of the two right rows.
+ testHarness.processWatermark1(new Watermark(18));
+ testHarness.processWatermark2(new Watermark(18));
+ expectedOutput.add(
+ insertRecord(
+ toUtcTimestampMills(15L, shiftTimeZone),
+ "k1",
+ toUtcTimestampMills(15L, shiftTimeZone),
+ "k1"));
+ expectedOutput.add(
+ insertRecord(
+ toUtcTimestampMills(15L, shiftTimeZone),
+ "k1",
+ toUtcTimestampMills(15L, shiftTimeZone),
+ "k1"));
+ expectedOutput.add(new Watermark(18));
+ ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+ testHarness.close();
+ }
+
+ @Test
+ public void testOuterJoin() throws Exception {
+ KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> testHarness =
+ createTestHarness(FlinkJoinType.FULL);
+
+ testHarness.open();
+ testHarness.processWatermark1(new Watermark(1));
+ testHarness.processWatermark2(new Watermark(1));
+
+ // Late data is dropped: a left row with window end 1 at watermark 1 registers no timer.
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(1L, shiftTimeZone), "k1"));
+ assertEquals(0, testHarness.numEventTimeTimers());
+ // Windows: end 3 has two left rows and one right row, end 6 is left-only, end 9 right-only.
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+ testHarness.processElement2(insertRecord(toUtcTimestampMills(3L, shiftTimeZone), "k1"));
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1"));
+ testHarness.processElement2(insertRecord(toUtcTimestampMills(9L, shiftTimeZone), "k1"));
+ assertEquals(3, testHarness.numEventTimeTimers());
+ assertEquals(4, testHarness.numKeyedStateEntries());
+ // Watermark 10 fires all three windows; rows of windows 6 and 9 are padded with nulls.
+ testHarness.processWatermark1(new Watermark(10));
+ testHarness.processWatermark2(new Watermark(10));
+
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(new Watermark(1));
+ expectedOutput.add(
+ insertRecord(
+ toUtcTimestampMills(3L, shiftTimeZone),
+ "k1",
+ toUtcTimestampMills(3L, shiftTimeZone),
+ "k1"));
+ expectedOutput.add(
+ insertRecord(
+ toUtcTimestampMills(3L, shiftTimeZone),
+ "k1",
+ toUtcTimestampMills(3L, shiftTimeZone),
+ "k1"));
+ expectedOutput.add(insertRecord(toUtcTimestampMills(6L, shiftTimeZone), "k1", null, null));
+ expectedOutput.add(insertRecord(null, null, toUtcTimestampMills(9L, shiftTimeZone), "k1"));
+ expectedOutput.add(new Watermark(10));
+ ASSERTER.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
+ assertEquals(0, testHarness.numEventTimeTimers());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+ // Second round: window 12 is left-only; window 15 has one left row and two right rows.
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1"));
+ testHarness.processElement1(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+ testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+ testHarness.processElement2(insertRecord(toUtcTimestampMills(15L, shiftTimeZone), "k1"));
+ assertEquals(3, testHarness.numKeyedStateEntries());
+ // Watermark 13 only reaches window 12; its left row is emitted padded with nulls.
+ testHarness.processWatermark1(new Watermark(13));
+ testHarness.processWatermark2(new Watermark(13));
+
+ expectedOutput.add(insertRecord(toUtcTimestampMills(12L, shiftTimeZone), "k1", null, null));
+ expectedOutput.add(new Watermark(13));
+ assertEquals(2, testHarness.numKeyedStateEntries());
+ ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+ // Watermark 18 reaches window 15: the left row is paired with each of the two right rows.
+ testHarness.processWatermark1(new Watermark(18));
+ testHarness.processWatermark2(new Watermark(18));
+ expectedOutput.add(
+ insertRecord(
+ toUtcTimestampMills(15L, shiftTimeZone),
+ "k1",
+ toUtcTimestampMills(15L, shiftTimeZone),
+ "k1"));
+ expectedOutput.add(
+ insertRecord(
+ toUtcTimestampMills(15L, shiftTimeZone),
+ "k1",
+ toUtcTimestampMills(15L, shiftTimeZone),
+ "k1"));
+ expectedOutput.add(new Watermark(18));
+ ASSERTER.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
+ testHarness.close();
+ }
+
+ /** Builds a keyed test harness for a window join whose join condition is always true. */
+ private KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData>
+ createTestHarness(FlinkJoinType joinType) throws Exception {
+ String funcCode =
+ "public class TestWindowJoinCondition extends org.apache.flink.api.common.functions.AbstractRichFunction "
+ + "implements org.apache.flink.table.runtime.generated.JoinCondition {\n"
+ + "\n"
+ + " public TestWindowJoinCondition(Object[] reference) {\n"
+ + " }\n"
+ + "\n"
+ + " @Override\n"
+ + " public boolean apply(org.apache.flink.table.data.RowData in1, org.apache.flink.table.data.RowData in2) {\n"
+ + " return true;\n"
+ + " }\n"
+ + "}\n";
+ GeneratedJoinCondition joinFunction =
+ new GeneratedJoinCondition("TestWindowJoinCondition", funcCode, new Object[0]);
+ // Both inputs are keyed on field 1; field 0 is configured below as the window end.
+ RowDataKeySelector keySelector =
+ HandwrittenSelectorUtil.getRowDataSelector(
+ new int[] {1}, INPUT_ROW_TYPE.toRowFieldTypes());
+ // Take the key type from the selector so it describes the one-field key it produces.
+ TypeInformation<RowData> keyType = keySelector.getProducedType();
+ WindowJoinOperator operator =
+ WindowJoinOperatorBuilder.builder()
+ .leftSerializer(INPUT_ROW_TYPE.toRowSerializer())
+ .rightSerializer(INPUT_ROW_TYPE.toRowSerializer())
+ .generatedJoinCondition(joinFunction)
+ .leftWindowEndIndex(0)
+ .rightWindowEndIndex(0)
+ .filterNullKeys(new boolean[] {true})
+ .joinType(joinType)
+ .withShiftTimezone(shiftTimeZone)
+ .build();
+ return new KeyedTwoInputStreamOperatorTestHarness<>(
+ operator, keySelector, keySelector, keyType);
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java
index c4d9df0..087cc4c 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java
@@ -41,15 +41,20 @@ import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.time.ZoneId;
import java.util.Arrays;
+import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills;
import static org.junit.Assert.assertEquals;
/** Tests for window rank operators created by {@link WindowRankOperatorBuilder}. */
+@RunWith(Parameterized.class)
public class WindowRankOperatorTest {
private static final RowType INPUT_ROW_TYPE =
@@ -60,7 +65,6 @@ public class WindowRankOperatorTest {
new RowType.RowField("f2", new BigIntType())));
private static final RowDataSerializer INPUT_ROW_SER = new RowDataSerializer(INPUT_ROW_TYPE);
- private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
private static final RowDataKeySelector KEY_SELECTOR =
HandwrittenSelectorUtil.getRowDataSelector(
@@ -116,12 +120,31 @@ public class WindowRankOperatorTest {
OUTPUT_TYPES_WITHOUT_RANK_NUMBER,
new GenericRowRecordSortComparator(0, new VarCharType(VarCharType.MAX_LENGTH)));
+ private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
+ private static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai");
+ private final ZoneId shiftTimeZone;
+
+ public WindowRankOperatorTest(ZoneId shiftTimeZone) {
+ this.shiftTimeZone = shiftTimeZone;
+ }
+
+ @Parameterized.Parameters(name = "TimeZone = {0}")
+ public static Collection<Object[]> runMode() {
+ return Arrays.asList(new Object[] {UTC_ZONE_ID}, new Object[] {SHANGHAI_ZONE_ID});
+ }
+
+ private static OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
+ SlicingWindowOperator<RowData, ?> operator) throws Exception {
+ return new KeyedOneInputStreamOperatorTestHarness<>(
+ operator, KEY_SELECTOR, KEY_SELECTOR.getProducedType());
+ }
+
@Test
public void testTop2Windows() throws Exception {
SlicingWindowOperator<RowData, ?> operator =
WindowRankOperatorBuilder.builder()
.inputSerializer(INPUT_ROW_SER)
- .shiftTimeZone(UTC_ZONE_ID)
+ .shiftTimeZone(shiftTimeZone)
.keySerializer(KEY_SER)
.sortKeyComparator(GENERATED_SORT_KEY_COMPARATOR)
.sortKeySelector(SORT_KEY_SELECTOR)
@@ -141,36 +164,51 @@ public class WindowRankOperatorTest {
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
// add elements out-of-order
- testHarness.processElement(insertRecord("key2", 1, 999L));
- testHarness.processElement(insertRecord("key2", 4, 999L));
- testHarness.processElement(insertRecord("key2", 5, 999L));
- testHarness.processElement(insertRecord("key2", 3, 999L));
- testHarness.processElement(insertRecord("key2", 2, 1999L));
- testHarness.processElement(insertRecord("key2", 7, 3999L));
- testHarness.processElement(insertRecord("key2", 8, 3999L));
- testHarness.processElement(insertRecord("key2", 1, 3999L));
-
- testHarness.processElement(insertRecord("key1", 2, 999L));
- testHarness.processElement(insertRecord("key1", 1, 999L));
- testHarness.processElement(insertRecord("key1", 3, 999L));
- testHarness.processElement(insertRecord("key1", 3, 999L));
- testHarness.processElement(insertRecord("key1", 4, 1999L));
- testHarness.processElement(insertRecord("key1", 6, 1999L));
- testHarness.processElement(insertRecord("key1", 7, 1999L));
+ testHarness.processElement(
+ insertRecord("key2", 1, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 4, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 5, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 3, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 2, toUtcTimestampMills(1999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 7, toUtcTimestampMills(3999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 8, toUtcTimestampMills(3999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 1, toUtcTimestampMills(3999L, shiftTimeZone)));
+
+ testHarness.processElement(
+ insertRecord("key1", 2, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key1", 1, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key1", 3, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key1", 3, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key1", 4, toUtcTimestampMills(1999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key1", 6, toUtcTimestampMills(1999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key1", 7, toUtcTimestampMills(1999L, shiftTimeZone)));
testHarness.processWatermark(new Watermark(999));
- expectedOutput.add(insertRecord("key1", 1, 999L, 1L));
- expectedOutput.add(insertRecord("key1", 2, 999L, 2L));
- expectedOutput.add(insertRecord("key2", 1, 999L, 1L));
- expectedOutput.add(insertRecord("key2", 3, 999L, 2L));
+ expectedOutput.add(insertRecord("key1", 1, toUtcTimestampMills(999L, shiftTimeZone), 1L));
+ expectedOutput.add(insertRecord("key1", 2, toUtcTimestampMills(999L, shiftTimeZone), 2L));
+ expectedOutput.add(insertRecord("key2", 1, toUtcTimestampMills(999L, shiftTimeZone), 1L));
+ expectedOutput.add(insertRecord("key2", 3, toUtcTimestampMills(999L, shiftTimeZone), 2L));
expectedOutput.add(new Watermark(999));
ASSERTER.assertOutputEqualsSorted(
"Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.processWatermark(new Watermark(1999));
- expectedOutput.add(insertRecord("key1", 4, 1999L, 1L));
- expectedOutput.add(insertRecord("key1", 6, 1999L, 2L));
- expectedOutput.add(insertRecord("key2", 2, 1999L, 1L));
+ expectedOutput.add(insertRecord("key1", 4, toUtcTimestampMills(1999L, shiftTimeZone), 1L));
+ expectedOutput.add(insertRecord("key1", 6, toUtcTimestampMills(1999L, shiftTimeZone), 2L));
+ expectedOutput.add(insertRecord("key2", 2, toUtcTimestampMills(1999L, shiftTimeZone), 1L));
expectedOutput.add(new Watermark(1999));
ASSERTER.assertOutputEqualsSorted(
"Output was not correct.", expectedOutput, testHarness.getOutput());
@@ -187,14 +225,15 @@ public class WindowRankOperatorTest {
testHarness.open();
testHarness.processWatermark(new Watermark(3999));
- expectedOutput.add(insertRecord("key2", 1, 3999L, 1L));
- expectedOutput.add(insertRecord("key2", 7, 3999L, 2L));
+ expectedOutput.add(insertRecord("key2", 1, toUtcTimestampMills(3999L, shiftTimeZone), 1L));
+ expectedOutput.add(insertRecord("key2", 7, toUtcTimestampMills(3999L, shiftTimeZone), 2L));
expectedOutput.add(new Watermark(3999));
ASSERTER.assertOutputEqualsSorted(
"Output was not correct.", expectedOutput, testHarness.getOutput());
// late element, should be dropped
- testHarness.processElement(insertRecord("key2", 1, 3500L));
+ testHarness.processElement(
+ insertRecord("key2", 1, toUtcTimestampMills(3500L, shiftTimeZone)));
testHarness.processWatermark(new Watermark(4999));
expectedOutput.add(new Watermark(4999));
@@ -211,7 +250,7 @@ public class WindowRankOperatorTest {
SlicingWindowOperator<RowData, ?> operator =
WindowRankOperatorBuilder.builder()
.inputSerializer(INPUT_ROW_SER)
- .shiftTimeZone(UTC_ZONE_ID)
+ .shiftTimeZone(shiftTimeZone)
.keySerializer(KEY_SER)
.sortKeyComparator(GENERATED_SORT_KEY_COMPARATOR)
.sortKeySelector(SORT_KEY_SELECTOR)
@@ -231,32 +270,47 @@ public class WindowRankOperatorTest {
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
// add elements out-of-order
- testHarness.processElement(insertRecord("key2", 1, 999L));
- testHarness.processElement(insertRecord("key2", 4, 999L));
- testHarness.processElement(insertRecord("key2", 5, 999L));
- testHarness.processElement(insertRecord("key2", 3, 999L));
- testHarness.processElement(insertRecord("key2", 2, 1999L));
- testHarness.processElement(insertRecord("key2", 7, 3999L));
- testHarness.processElement(insertRecord("key2", 8, 3999L));
- testHarness.processElement(insertRecord("key2", 1, 3999L));
-
- testHarness.processElement(insertRecord("key1", 2, 999L));
- testHarness.processElement(insertRecord("key1", 1, 999L));
- testHarness.processElement(insertRecord("key1", 3, 999L));
- testHarness.processElement(insertRecord("key1", 3, 999L));
- testHarness.processElement(insertRecord("key1", 4, 1999L));
- testHarness.processElement(insertRecord("key1", 6, 1999L));
- testHarness.processElement(insertRecord("key1", 7, 1999L));
+ testHarness.processElement(
+ insertRecord("key2", 1, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 4, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 5, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 3, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 2, toUtcTimestampMills(1999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 7, toUtcTimestampMills(3999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 8, toUtcTimestampMills(3999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 1, toUtcTimestampMills(3999L, shiftTimeZone)));
+
+ testHarness.processElement(
+ insertRecord("key1", 2, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key1", 1, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key1", 3, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key1", 3, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key1", 4, toUtcTimestampMills(1999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key1", 6, toUtcTimestampMills(1999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key1", 7, toUtcTimestampMills(1999L, shiftTimeZone)));
testHarness.processWatermark(new Watermark(999));
- expectedOutput.add(insertRecord("key1", 2, 999L, 2L));
- expectedOutput.add(insertRecord("key2", 3, 999L, 2L));
+ expectedOutput.add(insertRecord("key1", 2, toUtcTimestampMills(999L, shiftTimeZone), 2L));
+ expectedOutput.add(insertRecord("key2", 3, toUtcTimestampMills(999L, shiftTimeZone), 2L));
expectedOutput.add(new Watermark(999));
ASSERTER.assertOutputEqualsSorted(
"Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.processWatermark(new Watermark(1999));
- expectedOutput.add(insertRecord("key1", 6, 1999L, 2L));
+ expectedOutput.add(insertRecord("key1", 6, toUtcTimestampMills(1999L, shiftTimeZone), 2L));
expectedOutput.add(new Watermark(1999));
ASSERTER.assertOutputEqualsSorted(
"Output was not correct.", expectedOutput, testHarness.getOutput());
@@ -273,7 +327,7 @@ public class WindowRankOperatorTest {
testHarness.open();
testHarness.processWatermark(new Watermark(3999));
- expectedOutput.add(insertRecord("key2", 7, 3999L, 2L));
+ expectedOutput.add(insertRecord("key2", 7, toUtcTimestampMills(3999L, shiftTimeZone), 2L));
expectedOutput.add(new Watermark(3999));
ASSERTER.assertOutputEqualsSorted(
"Output was not correct.", expectedOutput, testHarness.getOutput());
@@ -285,7 +339,7 @@ public class WindowRankOperatorTest {
SlicingWindowOperator<RowData, ?> operator =
WindowRankOperatorBuilder.builder()
.inputSerializer(INPUT_ROW_SER)
- .shiftTimeZone(UTC_ZONE_ID)
+ .shiftTimeZone(shiftTimeZone)
.keySerializer(KEY_SER)
.sortKeyComparator(GENERATED_SORT_KEY_COMPARATOR)
.sortKeySelector(SORT_KEY_SELECTOR)
@@ -305,36 +359,51 @@ public class WindowRankOperatorTest {
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
// add elements out-of-order
- testHarness.processElement(insertRecord("key2", 1, 999L));
- testHarness.processElement(insertRecord("key2", 4, 999L));
- testHarness.processElement(insertRecord("key2", 5, 999L));
- testHarness.processElement(insertRecord("key2", 3, 999L));
- testHarness.processElement(insertRecord("key2", 2, 1999L));
- testHarness.processElement(insertRecord("key2", 7, 3999L));
- testHarness.processElement(insertRecord("key2", 8, 3999L));
- testHarness.processElement(insertRecord("key2", 1, 3999L));
-
- testHarness.processElement(insertRecord("key1", 2, 999L));
- testHarness.processElement(insertRecord("key1", 1, 999L));
- testHarness.processElement(insertRecord("key1", 3, 999L));
- testHarness.processElement(insertRecord("key1", 3, 999L));
- testHarness.processElement(insertRecord("key1", 4, 1999L));
- testHarness.processElement(insertRecord("key1", 6, 1999L));
- testHarness.processElement(insertRecord("key1", 7, 1999L));
+ testHarness.processElement(
+ insertRecord("key2", 1, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 4, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 5, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 3, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 2, toUtcTimestampMills(1999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 7, toUtcTimestampMills(3999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 8, toUtcTimestampMills(3999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key2", 1, toUtcTimestampMills(3999L, shiftTimeZone)));
+
+ testHarness.processElement(
+ insertRecord("key1", 2, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key1", 1, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key1", 3, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key1", 3, toUtcTimestampMills(999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key1", 4, toUtcTimestampMills(1999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key1", 6, toUtcTimestampMills(1999L, shiftTimeZone)));
+ testHarness.processElement(
+ insertRecord("key1", 7, toUtcTimestampMills(1999L, shiftTimeZone)));
testHarness.processWatermark(new Watermark(999));
- expectedOutput.add(insertRecord("key1", 1, 999L));
- expectedOutput.add(insertRecord("key1", 2, 999L));
- expectedOutput.add(insertRecord("key2", 1, 999L));
- expectedOutput.add(insertRecord("key2", 3, 999L));
+ expectedOutput.add(insertRecord("key1", 1, toUtcTimestampMills(999L, shiftTimeZone)));
+ expectedOutput.add(insertRecord("key1", 2, toUtcTimestampMills(999L, shiftTimeZone)));
+ expectedOutput.add(insertRecord("key2", 1, toUtcTimestampMills(999L, shiftTimeZone)));
+ expectedOutput.add(insertRecord("key2", 3, toUtcTimestampMills(999L, shiftTimeZone)));
expectedOutput.add(new Watermark(999));
ASSERTER_WITHOUT_RANK_NUMBER.assertOutputEqualsSorted(
"Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.processWatermark(new Watermark(1999));
- expectedOutput.add(insertRecord("key1", 4, 1999L));
- expectedOutput.add(insertRecord("key1", 6, 1999L));
- expectedOutput.add(insertRecord("key2", 2, 1999L));
+ expectedOutput.add(insertRecord("key1", 4, toUtcTimestampMills(1999L, shiftTimeZone)));
+ expectedOutput.add(insertRecord("key1", 6, toUtcTimestampMills(1999L, shiftTimeZone)));
+ expectedOutput.add(insertRecord("key2", 2, toUtcTimestampMills(1999L, shiftTimeZone)));
expectedOutput.add(new Watermark(1999));
ASSERTER_WITHOUT_RANK_NUMBER.assertOutputEqualsSorted(
"Output was not correct.", expectedOutput, testHarness.getOutput());
@@ -351,18 +420,12 @@ public class WindowRankOperatorTest {
testHarness.open();
testHarness.processWatermark(new Watermark(3999));
- expectedOutput.add(insertRecord("key2", 1, 3999L));
- expectedOutput.add(insertRecord("key2", 7, 3999L));
+ expectedOutput.add(insertRecord("key2", 1, toUtcTimestampMills(3999L, shiftTimeZone)));
+ expectedOutput.add(insertRecord("key2", 7, toUtcTimestampMills(3999L, shiftTimeZone)));
expectedOutput.add(new Watermark(3999));
ASSERTER_WITHOUT_RANK_NUMBER.assertOutputEqualsSorted(
"Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.close();
}
-
- private static OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
- SlicingWindowOperator<RowData, ?> operator) throws Exception {
- return new KeyedOneInputStreamOperatorTestHarness<>(
- operator, KEY_SELECTOR, KEY_SELECTOR.getProducedType());
- }
}