You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/12 09:09:11 UTC

[GitHub] [flink] dianfu commented on a change in pull request #13504: [FLINK-19404][python] Support Pandas Stream Over Window Aggregation

dianfu commented on a change in pull request #13504:
URL: https://github.com/apache/flink/pull/13504#discussion_r503085577



##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeRowsBoundedOverWindowAggregateFunctionOperator.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The Stream Arrow Python {@link AggregateFunction} Operator for RANGE clause event-time bounded
+ * OVER window.
+ */
+@Internal
+public class StreamArrowPythonRowTimeRowsBoundedOverWindowAggregateFunctionOperator<K>
+	extends AbstractStreamArrowPythonRowsBoundedOverWindowAggregateFunctionOperator<K> {
+
+	private static final long serialVersionUID = 1L;
+
+	public StreamArrowPythonRowTimeRowsBoundedOverWindowAggregateFunctionOperator(
+		Configuration config,
+		long minRetentionTime,
+		long maxRetentionTime,
+		PythonFunctionInfo[] pandasAggFunctions,
+		RowType inputType,
+		RowType outputType,
+		int inputTimeFieldIndex,
+		long lowerBoundary,
+		int[] groupingSet,
+		int[] udafInputOffsets) {
+		super(config, minRetentionTime, maxRetentionTime, pandasAggFunctions,
+			inputType, outputType, inputTimeFieldIndex, lowerBoundary, groupingSet, udafInputOffsets);
+	}
+
+	@Override
+	public void bufferInput(RowData input) throws Exception {
+		// register state-cleanup timer
+		registerProcessingCleanupTimer(timerService.currentProcessingTime());
+
+		// triggering timestamp for trigger calculation
+		long triggeringTs = input.getLong(inputTimeFieldIndex);
+
+		Long lastTriggeringTs = lastTriggeringTsState.value();
+		if (lastTriggeringTs == null) {
+			lastTriggeringTs = 0L;
+		}
+
+		// check if the data is expired, if not, save the data and register event time timer
+		if (triggeringTs > lastTriggeringTs) {
+			List<RowData> data = inputState.get(triggeringTs);
+			if (null != data) {
+				data.add(input);
+				inputState.put(triggeringTs, data);

Review comment:
       ```suggestion
   ```

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeRangeBoundedOverWindowAggregateFunctionOperator.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The Stream Arrow Python {@link AggregateFunction} Operator for RANGE clause event-time bounded
+ * OVER window.
+ */
+@Internal
+public class StreamArrowPythonRowTimeRangeBoundedOverWindowAggregateFunctionOperator<K>

Review comment:
       ```suggestion
   public class StreamArrowPythonRowTimeBoundedRangeOperator<K>
   ```
   What about simplify the class name a bit? Same for the other classes.

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/AbstractStreamArrowPythonRowsBoundedOverWindowAggregateFunctionOperator.java
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.runtime.functions.CleanupState;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+
+/**
+ * The Abstract class of Stream Arrow Python {@link AggregateFunction} Operator for ROWS clause
+ * bounded Over Window Aggregation.
+ */
+@Internal
+public abstract class AbstractStreamArrowPythonRowsBoundedOverWindowAggregateFunctionOperator<K>
+	extends AbstractStreamArrowPythonOverWindowAggregateFunctionOperator<K> implements CleanupState {
+
+	private static final long serialVersionUID = 1L;
+
+	private final long minRetentionTime;
+
+	private final long maxRetentionTime;
+
+	private final boolean stateCleaningEnabled;
+
+	/**
+	 * list to sort timestamps to access rows in timestamp order.
+	 */
+	transient LinkedList<Long> sortedTimestamps;
+
+	public AbstractStreamArrowPythonRowsBoundedOverWindowAggregateFunctionOperator(
+		Configuration config,
+		long minRetentionTime,
+		long maxRetentionTime,
+		PythonFunctionInfo[] pandasAggFunctions,
+		RowType inputType,
+		RowType outputType,
+		int inputTimeFieldIndex,
+		long lowerBoundary,
+		int[] groupingSet,
+		int[] udafInputOffsets) {
+		super(config, pandasAggFunctions, inputType, outputType, inputTimeFieldIndex, lowerBoundary,
+			groupingSet, udafInputOffsets);
+		this.minRetentionTime = minRetentionTime;
+		this.maxRetentionTime = maxRetentionTime;
+		this.stateCleaningEnabled = minRetentionTime > 1;
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+		sortedTimestamps = new LinkedList<>();
+	}
+
+	@Override
+	public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+		if (stateCleaningEnabled) {
+
+			Iterator<Long> keysIt = inputState.keys().iterator();
+			Long lastProcessedTime = lastTriggeringTsState.value();
+			if (lastProcessedTime == null) {
+				lastProcessedTime = 0L;
+			}
+
+			// is data left which has not been processed yet?
+			boolean noRecordsToProcess = true;
+			while (keysIt.hasNext() && noRecordsToProcess) {
+				if (keysIt.next() > lastProcessedTime) {
+					noRecordsToProcess = false;
+				}
+			}
+
+			if (noRecordsToProcess) {
+				inputState.clear();
+				cleanupTsState.clear();
+			} else {
+				// There are records left to process because a watermark has not been received yet.
+				// This would only happen if the input stream has stopped. So we don't need to clean up.
+				// We leave the state as it is and schedule a new cleanup timer
+				registerProcessingCleanupTimer(timerService.currentProcessingTime());
+			}
+		}
+	}
+
+	@Override
+	public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+		long timestamp = timer.getTimestamp();
+		// gets all window data from state for the calculation
+		List<RowData> inputs = inputState.get(timestamp);
+
+		Iterable<Long> keyIter = inputState.keys();
+		for (Long dataTs : keyIter) {
+			insertToSortedList(dataTs);
+		}
+		int index = sortedTimestamps.indexOf(timestamp);
+		for (int i = 0; i < inputs.size(); i++) {
+			forwardedInputQueue.add(inputs.get(i));
+			triggerWindowProcess(inputs, i, index);
+		}
+		sortedTimestamps.clear();
+	}
+
+	@Override
+	@SuppressWarnings("ConstantConditions")
+	public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
+		byte[] udafResult = resultTuple.f0;
+		int length = resultTuple.f1;
+		bais.setBuffer(udafResult, 0, length);
+		int rowCount = arrowSerializer.load();
+		for (int i = 0; i < rowCount; i++) {
+			RowData data = arrowSerializer.read(i);
+			RowData key = forwardedInputQueue.poll();
+			reuseJoinedRow.setRowKind(key.getRowKind());
+			rowDataWrapper.collect(reuseJoinedRow.replace(key, data));
+		}
+	}
+
+	void registerProcessingCleanupTimer(long currentTime) throws Exception {
+		if (stateCleaningEnabled) {
+			registerProcessingCleanupTimer(
+				cleanupTsState,
+				currentTime,
+				minRetentionTime,
+				maxRetentionTime,
+				timerService);
+		}
+	}
+
+	void triggerWindowProcess(List<RowData> inputs, int i, int index) throws Exception {
+		int startIndex;
+		int startPos = 0;
+		if (i >= lowerBoundary) {
+			for (int j = (int) (i - lowerBoundary); j <= i; j++) {
+				arrowSerializer.write(getFunctionInput(inputs.get(j)));
+			}
+			currentBatchCount += lowerBoundary;
+		} else {
+			Long previousTimestamp;
+			List<RowData> previousData = null;
+			int length = 0;
+			startIndex = index - 1;
+			long remainingDataCount = lowerBoundary - i;
+			ListIterator<Long> iter = sortedTimestamps.listIterator(index);
+			while (remainingDataCount > 0 && iter.hasPrevious()) {
+				previousTimestamp = iter.previous();
+				previousData = inputState.get(previousTimestamp);
+				length = previousData.size();
+				if (remainingDataCount <= length) {
+					startPos = (int) (length - remainingDataCount);
+					remainingDataCount = 0;
+				} else {
+					remainingDataCount -= length;
+					startIndex--;
+				}
+			}
+			if (previousData != null) {
+				for (int j = startPos; j < length; j++) {
+					arrowSerializer.write(getFunctionInput(previousData.get(j)));
+					currentBatchCount++;
+					startIndex++;

Review comment:
       I guess ``startIndex+`` should be moved out of the for loop

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeRowsBoundedOverWindowAggregateFunctionOperator.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The Stream Arrow Python {@link AggregateFunction} Operator for ROWS clause proc-time bounded
+ * OVER window.
+ */
+@Internal
+public class StreamArrowPythonProcTimeRowsBoundedOverWindowAggregateFunctionOperator<K>
+	extends AbstractStreamArrowPythonRowsBoundedOverWindowAggregateFunctionOperator<K> {
+
+	private static final long serialVersionUID = 1L;
+
+	private transient long currentTime;
+
+	public StreamArrowPythonProcTimeRowsBoundedOverWindowAggregateFunctionOperator(
+		Configuration config,
+		long minRetentionTime,
+		long maxRetentionTime,
+		PythonFunctionInfo[] pandasAggFunctions,
+		RowType inputType,
+		RowType outputType,
+		int inputTimeFieldIndex,
+		long lowerBoundary,
+		int[] groupingSet,
+		int[] udafInputOffsets) {
+		super(config, minRetentionTime, maxRetentionTime, pandasAggFunctions,
+			inputType, outputType, inputTimeFieldIndex, lowerBoundary, groupingSet, udafInputOffsets);
+	}
+
+	@Override
+	public void bufferInput(RowData input) throws Exception {
+		currentTime = timerService.currentProcessingTime();
+		// register state-cleanup timer
+		registerProcessingCleanupTimer(currentTime);
+
+		// buffer the event incoming event
+
+		// add current element to the window list of elements with corresponding timestamp
+		List<RowData> rowList = inputState.get(currentTime);
+		// null value means that this is the first event received for this timestamp
+		if (rowList == null) {
+			rowList = new ArrayList<>();
+		}
+		rowList.add(input);

Review comment:
       Make rowList as an instance variable, then there is no need to read inputState again in *processElementInternal*

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeRowsBoundedOverWindowAggregateFunctionOperator.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The Stream Arrow Python {@link AggregateFunction} Operator for RANGE clause event-time bounded
+ * OVER window.
+ */
+@Internal
+public class StreamArrowPythonRowTimeRowsBoundedOverWindowAggregateFunctionOperator<K>
+	extends AbstractStreamArrowPythonRowsBoundedOverWindowAggregateFunctionOperator<K> {
+
+	private static final long serialVersionUID = 1L;
+
+	public StreamArrowPythonRowTimeRowsBoundedOverWindowAggregateFunctionOperator(
+		Configuration config,
+		long minRetentionTime,
+		long maxRetentionTime,
+		PythonFunctionInfo[] pandasAggFunctions,
+		RowType inputType,
+		RowType outputType,
+		int inputTimeFieldIndex,
+		long lowerBoundary,
+		int[] groupingSet,
+		int[] udafInputOffsets) {
+		super(config, minRetentionTime, maxRetentionTime, pandasAggFunctions,
+			inputType, outputType, inputTimeFieldIndex, lowerBoundary, groupingSet, udafInputOffsets);
+	}
+
+	@Override
+	public void bufferInput(RowData input) throws Exception {
+		// register state-cleanup timer
+		registerProcessingCleanupTimer(timerService.currentProcessingTime());
+
+		// triggering timestamp for trigger calculation
+		long triggeringTs = input.getLong(inputTimeFieldIndex);
+
+		Long lastTriggeringTs = lastTriggeringTsState.value();
+		if (lastTriggeringTs == null) {
+			lastTriggeringTs = 0L;
+		}
+
+		// check if the data is expired, if not, save the data and register event time timer
+		if (triggeringTs > lastTriggeringTs) {
+			List<RowData> data = inputState.get(triggeringTs);
+			if (null != data) {
+				data.add(input);
+				inputState.put(triggeringTs, data);
+			} else {
+				data = new ArrayList<>();
+				data.add(input);
+				inputState.put(triggeringTs, data);

Review comment:
       ```suggestion
   ```

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/AbstractStreamArrowPythonRowsBoundedOverWindowAggregateFunctionOperator.java
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.runtime.functions.CleanupState;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+
+/**
+ * The Abstract class of Stream Arrow Python {@link AggregateFunction} Operator for ROWS clause
+ * bounded Over Window Aggregation.
+ */
+@Internal
+public abstract class AbstractStreamArrowPythonRowsBoundedOverWindowAggregateFunctionOperator<K>
+	extends AbstractStreamArrowPythonOverWindowAggregateFunctionOperator<K> implements CleanupState {
+
+	private static final long serialVersionUID = 1L;
+
+	private final long minRetentionTime;
+
+	private final long maxRetentionTime;
+
+	private final boolean stateCleaningEnabled;
+
+	/**
+	 * list to sort timestamps to access rows in timestamp order.
+	 */
+	transient LinkedList<Long> sortedTimestamps;
+
+	public AbstractStreamArrowPythonRowsBoundedOverWindowAggregateFunctionOperator(
+		Configuration config,
+		long minRetentionTime,
+		long maxRetentionTime,
+		PythonFunctionInfo[] pandasAggFunctions,
+		RowType inputType,
+		RowType outputType,
+		int inputTimeFieldIndex,
+		long lowerBoundary,
+		int[] groupingSet,
+		int[] udafInputOffsets) {
+		super(config, pandasAggFunctions, inputType, outputType, inputTimeFieldIndex, lowerBoundary,
+			groupingSet, udafInputOffsets);
+		this.minRetentionTime = minRetentionTime;
+		this.maxRetentionTime = maxRetentionTime;
+		this.stateCleaningEnabled = minRetentionTime > 1;
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+		sortedTimestamps = new LinkedList<>();
+	}
+
+	@Override
+	public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+		if (stateCleaningEnabled) {
+
+			Iterator<Long> keysIt = inputState.keys().iterator();
+			Long lastProcessedTime = lastTriggeringTsState.value();
+			if (lastProcessedTime == null) {
+				lastProcessedTime = 0L;
+			}
+
+			// is data left which has not been processed yet?
+			boolean noRecordsToProcess = true;
+			while (keysIt.hasNext() && noRecordsToProcess) {
+				if (keysIt.next() > lastProcessedTime) {
+					noRecordsToProcess = false;
+				}
+			}
+
+			if (noRecordsToProcess) {
+				inputState.clear();
+				cleanupTsState.clear();
+			} else {
+				// There are records left to process because a watermark has not been received yet.
+				// This would only happen if the input stream has stopped. So we don't need to clean up.
+				// We leave the state as it is and schedule a new cleanup timer
+				registerProcessingCleanupTimer(timerService.currentProcessingTime());
+			}
+		}
+	}
+
+	@Override
+	public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+		long timestamp = timer.getTimestamp();
+		// gets all window data from state for the calculation
+		List<RowData> inputs = inputState.get(timestamp);
+
+		Iterable<Long> keyIter = inputState.keys();
+		for (Long dataTs : keyIter) {
+			insertToSortedList(dataTs);
+		}
+		int index = sortedTimestamps.indexOf(timestamp);
+		for (int i = 0; i < inputs.size(); i++) {
+			forwardedInputQueue.add(inputs.get(i));
+			triggerWindowProcess(inputs, i, index);
+		}
+		sortedTimestamps.clear();
+	}
+
+	@Override
+	@SuppressWarnings("ConstantConditions")
+	public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
+		byte[] udafResult = resultTuple.f0;
+		int length = resultTuple.f1;
+		bais.setBuffer(udafResult, 0, length);
+		int rowCount = arrowSerializer.load();
+		for (int i = 0; i < rowCount; i++) {
+			RowData data = arrowSerializer.read(i);
+			RowData key = forwardedInputQueue.poll();
+			reuseJoinedRow.setRowKind(key.getRowKind());
+			rowDataWrapper.collect(reuseJoinedRow.replace(key, data));
+		}
+	}
+
+	void registerProcessingCleanupTimer(long currentTime) throws Exception {
+		if (stateCleaningEnabled) {
+			registerProcessingCleanupTimer(
+				cleanupTsState,
+				currentTime,
+				minRetentionTime,
+				maxRetentionTime,
+				timerService);
+		}
+	}
+
+	void triggerWindowProcess(List<RowData> inputs, int i, int index) throws Exception {
+		int startIndex;
+		int startPos = 0;
+		if (i >= lowerBoundary) {
+			for (int j = (int) (i - lowerBoundary); j <= i; j++) {
+				arrowSerializer.write(getFunctionInput(inputs.get(j)));
+			}
+			currentBatchCount += lowerBoundary;
+		} else {
+			Long previousTimestamp;
+			List<RowData> previousData = null;
+			int length = 0;
+			startIndex = index - 1;
+			long remainingDataCount = lowerBoundary - i;
+			ListIterator<Long> iter = sortedTimestamps.listIterator(index);
+			while (remainingDataCount > 0 && iter.hasPrevious()) {
+				previousTimestamp = iter.previous();
+				previousData = inputState.get(previousTimestamp);
+				length = previousData.size();
+				if (remainingDataCount <= length) {
+					startPos = (int) (length - remainingDataCount);
+					remainingDataCount = 0;
+				} else {
+					remainingDataCount -= length;
+					startIndex--;
+				}
+			}
+			if (previousData != null) {
+				for (int j = startPos; j < length; j++) {
+					arrowSerializer.write(getFunctionInput(previousData.get(j)));
+					currentBatchCount++;
+					startIndex++;
+				}
+				while (startIndex < index) {
+					previousTimestamp = iter.next();
+					previousData = inputState.get(previousTimestamp);

Review comment:
       The data stored in the *inputState* will be read multiple times: 2 * inputs.size().
   
   What about buffering them in memory for performance optimization?

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/AbstractStreamArrowPythonRangeBoundedOverWindowAggregateFunctionOperator.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * The Abstract class of Stream Arrow Python {@link AggregateFunction} Operator for RANGE clause
+ * bounded Over Window Aggregation.
+ */
+@Internal
+public abstract class AbstractStreamArrowPythonRangeBoundedOverWindowAggregateFunctionOperator<K>
+	extends AbstractStreamArrowPythonOverWindowAggregateFunctionOperator<K> {
+
+	private static final long serialVersionUID = 1L;
+
+	private transient LinkedList<List<RowData>> inputData;
+
+	public AbstractStreamArrowPythonRangeBoundedOverWindowAggregateFunctionOperator(
+		Configuration config,
+		PythonFunctionInfo[] pandasAggFunctions,
+		RowType inputType,
+		RowType outputType,
+		int inputTimeFieldIndex,
+		long lowerBoundary,
+		int[] groupingSet,
+		int[] udafInputOffsets) {
+		super(config, pandasAggFunctions, inputType, outputType, inputTimeFieldIndex, lowerBoundary,
+			groupingSet, udafInputOffsets);
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+		inputData = new LinkedList<>();
+	}
+
+	@Override
+	public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+		long timestamp = timer.getTimestamp();
+		Long cleanupTimestamp = cleanupTsState.value();
+		// if cleanupTsState has not been updated then it is safe to cleanup states
+		if (cleanupTimestamp != null && cleanupTimestamp <= timestamp) {
+			inputState.clear();
+			lastTriggeringTsState.clear();
+			cleanupTsState.clear();
+			return;
+		}
+		// gets all window data from state for the calculation
+		List<RowData> inputs = inputState.get(timestamp);
+		triggerWindowProcess(timestamp, inputs);
+		lastTriggeringTsState.update(timestamp);
+	}
+
+	@Override
+	public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+		long timestamp = timer.getTimestamp();
+		Long cleanupTimestamp = cleanupTsState.value();
+		// if cleanupTsState has not been updated then it is safe to cleanup states
+		if (cleanupTimestamp != null && cleanupTimestamp <= timestamp) {
+			inputState.clear();
+			cleanupTsState.clear();
+			return;
+		}
+
+		// we consider the original timestamp of events
+		// that have registered this time trigger 1 ms ago
+
+		long currentTime = timestamp - 1;
+
+		// get the list of elements of current proctime
+		List<RowData> currentElements = inputState.get(currentTime);
+		triggerWindowProcess(timestamp, currentElements);
+	}
+
+	@Override
+	@SuppressWarnings("ConstantConditions")
+	public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
+		byte[] udafResult = resultTuple.f0;
+		int length = resultTuple.f1;
+		bais.setBuffer(udafResult, 0, length);
+		int rowCount = arrowSerializer.load();
+		for (int i = 0; i < rowCount; i++) {
+			RowData data = arrowSerializer.read(i);
+			List<RowData> input = inputData.poll();
+			for (RowData ele : input) {
+				reuseJoinedRow.setRowKind(ele.getRowKind());
+				rowDataWrapper.collect(reuseJoinedRow.replace(ele, data));
+			}
+		}
+	}
+
+	void registerCleanupTimer(long timestamp, TimeDomain domain) throws Exception {
+		long minCleanupTimestamp = timestamp + lowerBoundary + 1;
+		long maxCleanupTimestamp = timestamp + (long) (lowerBoundary * 1.5) + 1;
+		// update timestamp and register timer if needed
+		Long curCleanupTimestamp = cleanupTsState.value();
+		if (curCleanupTimestamp == null || curCleanupTimestamp < minCleanupTimestamp) {
+			// we don't delete existing timer since it may delete timer for data processing
+			if (domain == TimeDomain.EVENT_TIME) {
+				timerService.registerEventTimeTimer(maxCleanupTimestamp);
+			} else {
+				timerService.registerProcessingTimeTimer(maxCleanupTimestamp);
+			}
+			cleanupTsState.update(maxCleanupTimestamp);
+		}
+	}
+
+	private void triggerWindowProcess(long timestamp, List<RowData> inputs) throws Exception {
+		long limit = timestamp - lowerBoundary;

Review comment:
       ```suggestion
   		long lowerLimit = timestamp - lowerBoundary;
   ```

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/AbstractStreamArrowPythonRangeBoundedOverWindowAggregateFunctionOperator.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * The Abstract class of Stream Arrow Python {@link AggregateFunction} Operator for RANGE clause
+ * bounded Over Window Aggregation.
+ */
+@Internal
+public abstract class AbstractStreamArrowPythonRangeBoundedOverWindowAggregateFunctionOperator<K>
+	extends AbstractStreamArrowPythonOverWindowAggregateFunctionOperator<K> {
+
+	private static final long serialVersionUID = 1L;
+
+	private transient LinkedList<List<RowData>> inputData;
+
+	public AbstractStreamArrowPythonRangeBoundedOverWindowAggregateFunctionOperator(
+		Configuration config,
+		PythonFunctionInfo[] pandasAggFunctions,
+		RowType inputType,
+		RowType outputType,
+		int inputTimeFieldIndex,
+		long lowerBoundary,
+		int[] groupingSet,
+		int[] udafInputOffsets) {
+		super(config, pandasAggFunctions, inputType, outputType, inputTimeFieldIndex, lowerBoundary,
+			groupingSet, udafInputOffsets);
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+		inputData = new LinkedList<>();
+	}
+
+	@Override
+	public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+		long timestamp = timer.getTimestamp();
+		Long cleanupTimestamp = cleanupTsState.value();
+		// if cleanupTsState has not been updated then it is safe to cleanup states
+		if (cleanupTimestamp != null && cleanupTimestamp <= timestamp) {
+			inputState.clear();
+			lastTriggeringTsState.clear();
+			cleanupTsState.clear();
+			return;
+		}
+		// gets all window data from state for the calculation
+		List<RowData> inputs = inputState.get(timestamp);
+		triggerWindowProcess(timestamp, inputs);
+		lastTriggeringTsState.update(timestamp);
+	}
+
+	@Override
+	public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+		long timestamp = timer.getTimestamp();
+		Long cleanupTimestamp = cleanupTsState.value();
+		// if cleanupTsState has not been updated then it is safe to cleanup states
+		if (cleanupTimestamp != null && cleanupTimestamp <= timestamp) {
+			inputState.clear();
+			cleanupTsState.clear();
+			return;
+		}
+
+		// we consider the original timestamp of events
+		// that have registered this time trigger 1 ms ago
+
+		long currentTime = timestamp - 1;
+
+		// get the list of elements of current proctime
+		List<RowData> currentElements = inputState.get(currentTime);
+		triggerWindowProcess(timestamp, currentElements);
+	}
+
+	@Override
+	@SuppressWarnings("ConstantConditions")
+	public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
+		byte[] udafResult = resultTuple.f0;
+		int length = resultTuple.f1;
+		bais.setBuffer(udafResult, 0, length);
+		int rowCount = arrowSerializer.load();
+		for (int i = 0; i < rowCount; i++) {
+			RowData data = arrowSerializer.read(i);
+			List<RowData> input = inputData.poll();
+			for (RowData ele : input) {
+				reuseJoinedRow.setRowKind(ele.getRowKind());
+				rowDataWrapper.collect(reuseJoinedRow.replace(ele, data));
+			}
+		}
+	}
+
+	void registerCleanupTimer(long timestamp, TimeDomain domain) throws Exception {
+		long minCleanupTimestamp = timestamp + lowerBoundary + 1;
+		long maxCleanupTimestamp = timestamp + (long) (lowerBoundary * 1.5) + 1;
+		// update timestamp and register timer if needed
+		Long curCleanupTimestamp = cleanupTsState.value();
+		if (curCleanupTimestamp == null || curCleanupTimestamp < minCleanupTimestamp) {
+			// we don't delete existing timer since it may delete timer for data processing
+			if (domain == TimeDomain.EVENT_TIME) {
+				timerService.registerEventTimeTimer(maxCleanupTimestamp);
+			} else {
+				timerService.registerProcessingTimeTimer(maxCleanupTimestamp);
+			}
+			cleanupTsState.update(maxCleanupTimestamp);
+		}
+	}
+
+	private void triggerWindowProcess(long timestamp, List<RowData> inputs) throws Exception {
+		long limit = timestamp - lowerBoundary;
+		if (inputs != null) {
+			for (long dataTs : inputState.keys()) {

Review comment:
       What about using inputState.entries to avoid reading the same state twice.

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecPythonOverAggregateRule.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.stream;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.functions.python.PythonFunctionKind;
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregate;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonOverAggregate;
+import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
+import org.apache.flink.table.planner.plan.utils.PythonUtil;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.util.ImmutableBitSet;
+
+import java.util.List;
+
+/**
+ * The physical rule is responsible for converting {@link FlinkLogicalOverAggregate} to
+ * {@link StreamExecPythonOverAggregate}.
+ */
+public class StreamExecPythonOverAggregateRule extends ConverterRule {
+	public static final StreamExecPythonOverAggregateRule INSTANCE = new StreamExecPythonOverAggregateRule();
+
+	private StreamExecPythonOverAggregateRule() {
+		super(FlinkLogicalOverAggregate.class,
+			FlinkConventions.LOGICAL(),
+			FlinkConventions.STREAM_PHYSICAL(),
+			"StreamExecPythonOverAggregateRule");
+	}
+
+	@Override
+	public boolean matches(RelOptRuleCall call) {
+		FlinkLogicalOverAggregate logicWindow = call.rel(0);
+		List<AggregateCall> aggCalls = logicWindow.groups.get(0).getAggregateCalls(logicWindow);
+
+		boolean existGeneralPythonFunction =
+			aggCalls.stream().anyMatch(x -> PythonUtil.isPythonAggregate(x, PythonFunctionKind.GENERAL));
+		boolean existPandasFunction =
+			aggCalls.stream().anyMatch(x -> PythonUtil.isPythonAggregate(x, PythonFunctionKind.PANDAS));
+		boolean existJavaFunction =
+			aggCalls.stream().anyMatch(x -> !PythonUtil.isPythonAggregate(x, null));
+		if (existPandasFunction || existGeneralPythonFunction) {
+			if (existGeneralPythonFunction) {
+				throw new TableException("non-Pandas UDAFs are not supported in stream mode currently.");

Review comment:
       ```suggestion
   				throw new TableException("Non-Pandas Python UDAFs are not supported in stream mode currently.");
   ```

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/AbstractStreamArrowPythonRangeBoundedOverWindowAggregateFunctionOperator.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * The Abstract class of Stream Arrow Python {@link AggregateFunction} Operator for RANGE clause
+ * bounded Over Window Aggregation.
+ */
+@Internal
+public abstract class AbstractStreamArrowPythonRangeBoundedOverWindowAggregateFunctionOperator<K>
+	extends AbstractStreamArrowPythonOverWindowAggregateFunctionOperator<K> {
+
+	private static final long serialVersionUID = 1L;
+
+	private transient LinkedList<List<RowData>> inputData;
+
+	public AbstractStreamArrowPythonRangeBoundedOverWindowAggregateFunctionOperator(
+		Configuration config,
+		PythonFunctionInfo[] pandasAggFunctions,
+		RowType inputType,
+		RowType outputType,
+		int inputTimeFieldIndex,
+		long lowerBoundary,
+		int[] groupingSet,
+		int[] udafInputOffsets) {
+		super(config, pandasAggFunctions, inputType, outputType, inputTimeFieldIndex, lowerBoundary,
+			groupingSet, udafInputOffsets);
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+		inputData = new LinkedList<>();
+	}
+
+	@Override
+	public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+		long timestamp = timer.getTimestamp();
+		Long cleanupTimestamp = cleanupTsState.value();
+		// if cleanupTsState has not been updated then it is safe to cleanup states
+		if (cleanupTimestamp != null && cleanupTimestamp <= timestamp) {
+			inputState.clear();
+			lastTriggeringTsState.clear();
+			cleanupTsState.clear();
+			return;
+		}
+		// gets all window data from state for the calculation
+		List<RowData> inputs = inputState.get(timestamp);
+		triggerWindowProcess(timestamp, inputs);
+		lastTriggeringTsState.update(timestamp);
+	}
+
+	@Override
+	public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+		long timestamp = timer.getTimestamp();
+		Long cleanupTimestamp = cleanupTsState.value();
+		// if cleanupTsState has not been updated then it is safe to cleanup states
+		if (cleanupTimestamp != null && cleanupTimestamp <= timestamp) {
+			inputState.clear();
+			cleanupTsState.clear();
+			return;
+		}
+
+		// we consider the original timestamp of events
+		// that have registered this time trigger 1 ms ago
+
+		long currentTime = timestamp - 1;
+
+		// get the list of elements of current proctime
+		List<RowData> currentElements = inputState.get(currentTime);
+		triggerWindowProcess(timestamp, currentElements);
+	}
+
+	@Override
+	@SuppressWarnings("ConstantConditions")
+	public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
+		byte[] udafResult = resultTuple.f0;
+		int length = resultTuple.f1;
+		bais.setBuffer(udafResult, 0, length);
+		int rowCount = arrowSerializer.load();
+		for (int i = 0; i < rowCount; i++) {
+			RowData data = arrowSerializer.read(i);
+			List<RowData> input = inputData.poll();
+			for (RowData ele : input) {
+				reuseJoinedRow.setRowKind(ele.getRowKind());
+				rowDataWrapper.collect(reuseJoinedRow.replace(ele, data));
+			}
+		}
+	}
+
+	void registerCleanupTimer(long timestamp, TimeDomain domain) throws Exception {
+		long minCleanupTimestamp = timestamp + lowerBoundary + 1;
+		long maxCleanupTimestamp = timestamp + (long) (lowerBoundary * 1.5) + 1;
+		// update timestamp and register timer if needed
+		Long curCleanupTimestamp = cleanupTsState.value();
+		if (curCleanupTimestamp == null || curCleanupTimestamp < minCleanupTimestamp) {
+			// we don't delete existing timer since it may delete timer for data processing
+			if (domain == TimeDomain.EVENT_TIME) {
+				timerService.registerEventTimeTimer(maxCleanupTimestamp);
+			} else {
+				timerService.registerProcessingTimeTimer(maxCleanupTimestamp);
+			}
+			cleanupTsState.update(maxCleanupTimestamp);
+		}
+	}
+
+	private void triggerWindowProcess(long timestamp, List<RowData> inputs) throws Exception {
+		long limit = timestamp - lowerBoundary;
+		if (inputs != null) {
+			for (long dataTs : inputState.keys()) {
+				if (dataTs >= limit && dataTs <= timestamp) {
+					List<RowData> dataList = inputState.get(dataTs);

Review comment:
       The state in *inputState* are never cleared. I think we should also remove outdated state in *triggerWindowProcess*.

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonOverAggregate.scala
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.nodes.physical.stream
+
+import org.apache.flink.api.dag.Transformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.core.memory.ManagedMemoryUseCase
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import org.apache.flink.streaming.api.transformations.OneInputTransformation
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.data.RowData
+import org.apache.flink.table.functions.python.PythonFunctionInfo
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.delegation.StreamPlanner
+import org.apache.flink.table.planner.plan.nodes.common.CommonPythonAggregate
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonOverAggregate.{
+  ARROW_PYTHON_OVER_WINDOW_RANGE_ROW_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME,
+  ARROW_PYTHON_OVER_WINDOW_RANGE_PROC_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME,
+  ARROW_PYTHON_OVER_WINDOW_ROWS_ROW_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME,
+  ARROW_PYTHON_OVER_WINDOW_ROWS_PROC_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME
+}
+import org.apache.flink.table.planner.plan.utils.{KeySelectorUtil, OverAggregateUtil}
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
+import org.apache.flink.table.types.logical.RowType
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Window.Group
+import org.apache.calcite.rel.core.{AggregateCall, Window}
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+/**
+  * Stream physical RelNode for python time-based over [[Window]].
+  */
+class StreamExecPythonOverAggregate(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputRel: RelNode,
+    outputRowType: RelDataType,
+    inputRowType: RelDataType,
+    logicWindow: Window)
+  extends StreamExecOverAggregateBase(
+    cluster,
+    traitSet,
+    inputRel,
+    outputRowType,
+    inputRowType,
+    logicWindow)
+  with CommonPythonAggregate {
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
+    new StreamExecPythonOverAggregate(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      outputRowType,
+      inputRowType,
+      logicWindow
+      )
+  }
+
+  override protected def translateToPlanInternal(
+      planner: StreamPlanner): Transformation[RowData] = {
+    val tableConfig = planner.getTableConfig
+
+    val overWindow: Group = logicWindow.groups.get(0)
+
+    val orderKeys = overWindow.orderKeys.getFieldCollations
+
+    if (orderKeys.size() != 1) {
+      throw new TableException(
+        "The window can only be ordered by a single time column.")
+    }
+    val orderKey = orderKeys.get(0)
+
+    if (!orderKey.direction.equals(ASCENDING)) {
+      throw new TableException(
+        "The window can only be ordered in ASCENDING mode.")
+    }
+
+    val inputDS = getInputNodes.get(0).translateToPlan(planner)
+      .asInstanceOf[Transformation[RowData]]
+
+    if (!logicWindow.groups.get(0).keys.isEmpty && tableConfig.getMinIdleStateRetentionTime < 0) {
+      LOG.warn(
+        "No state retention interval configured for a query which accumulates state. " +
+          "Please provide a query configuration with valid retention interval to prevent " +
+          "excessive state size. You may specify a retention time of 0 to not clean up the state.")
+    }
+
+    val timeType = outputRowType.getFieldList.get(orderKey.getFieldIndex).getType
+
+    // check time field
+    if (!FlinkTypeFactory.isRowtimeIndicatorType(timeType)
+      && !FlinkTypeFactory.isProctimeIndicatorType(timeType)) {
+      throw new TableException(
+        "OVER windows' ordering in stream mode must be defined on a time attribute.")
+    }
+
+    // identify window rowtime attribute
+    val rowTimeIdx: Option[Int] = if (FlinkTypeFactory.isRowtimeIndicatorType(timeType)) {
+      Some(orderKey.getFieldIndex)
+    } else if (FlinkTypeFactory.isProctimeIndicatorType(timeType)) {
+      None
+    } else {
+      throw new TableException(
+        "OVER windows can only be applied on time attributes.")
+    }
+
+    if (overWindow.lowerBound.isPreceding
+      && overWindow.lowerBound.isUnbounded) {
+      throw new TableException(
+        "OVER PRECEDING windows are not supported yet."
+      )
+    } else if (!overWindow.upperBound.isCurrentRow) {
+      throw new TableException(
+        "OVER FOLLOWING windows are not supported yet."

Review comment:
       ```suggestion
           "UNBOUNDED FOLLOWING OVER windows are not supported yet for Python UDAF."
   ```

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonOverAggregate.scala
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.nodes.physical.stream
+
+import org.apache.flink.api.dag.Transformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.core.memory.ManagedMemoryUseCase
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import org.apache.flink.streaming.api.transformations.OneInputTransformation
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.data.RowData
+import org.apache.flink.table.functions.python.PythonFunctionInfo
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.delegation.StreamPlanner
+import org.apache.flink.table.planner.plan.nodes.common.CommonPythonAggregate
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonOverAggregate.{
+  ARROW_PYTHON_OVER_WINDOW_RANGE_ROW_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME,
+  ARROW_PYTHON_OVER_WINDOW_RANGE_PROC_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME,
+  ARROW_PYTHON_OVER_WINDOW_ROWS_ROW_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME,
+  ARROW_PYTHON_OVER_WINDOW_ROWS_PROC_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME
+}
+import org.apache.flink.table.planner.plan.utils.{KeySelectorUtil, OverAggregateUtil}
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
+import org.apache.flink.table.types.logical.RowType
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Window.Group
+import org.apache.calcite.rel.core.{AggregateCall, Window}
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+/**
+  * Stream physical RelNode for python time-based over [[Window]].
+  */
+class StreamExecPythonOverAggregate(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputRel: RelNode,
+    outputRowType: RelDataType,
+    inputRowType: RelDataType,
+    logicWindow: Window)
+  extends StreamExecOverAggregateBase(
+    cluster,
+    traitSet,
+    inputRel,
+    outputRowType,
+    inputRowType,
+    logicWindow)
+  with CommonPythonAggregate {
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
+    new StreamExecPythonOverAggregate(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      outputRowType,
+      inputRowType,
+      logicWindow
+      )
+  }
+
+  override protected def translateToPlanInternal(
+      planner: StreamPlanner): Transformation[RowData] = {
+    val tableConfig = planner.getTableConfig
+
+    val overWindow: Group = logicWindow.groups.get(0)
+
+    val orderKeys = overWindow.orderKeys.getFieldCollations
+
+    if (orderKeys.size() != 1) {
+      throw new TableException(
+        "The window can only be ordered by a single time column.")
+    }
+    val orderKey = orderKeys.get(0)
+
+    if (!orderKey.direction.equals(ASCENDING)) {
+      throw new TableException(
+        "The window can only be ordered in ASCENDING mode.")
+    }
+
+    val inputDS = getInputNodes.get(0).translateToPlan(planner)
+      .asInstanceOf[Transformation[RowData]]
+
+    if (!logicWindow.groups.get(0).keys.isEmpty && tableConfig.getMinIdleStateRetentionTime < 0) {
+      LOG.warn(
+        "No state retention interval configured for a query which accumulates state. " +
+          "Please provide a query configuration with valid retention interval to prevent " +
+          "excessive state size. You may specify a retention time of 0 to not clean up the state.")
+    }
+
+    val timeType = outputRowType.getFieldList.get(orderKey.getFieldIndex).getType
+
+    // check time field
+    if (!FlinkTypeFactory.isRowtimeIndicatorType(timeType)
+      && !FlinkTypeFactory.isProctimeIndicatorType(timeType)) {
+      throw new TableException(
+        "OVER windows' ordering in stream mode must be defined on a time attribute.")
+    }
+
+    // identify window rowtime attribute
+    val rowTimeIdx: Option[Int] = if (FlinkTypeFactory.isRowtimeIndicatorType(timeType)) {
+      Some(orderKey.getFieldIndex)
+    } else if (FlinkTypeFactory.isProctimeIndicatorType(timeType)) {
+      None
+    } else {
+      throw new TableException(
+        "OVER windows can only be applied on time attributes.")
+    }
+
+    if (overWindow.lowerBound.isPreceding
+      && overWindow.lowerBound.isUnbounded) {
+      throw new TableException(
+        "OVER PRECEDING windows are not supported yet."

Review comment:
       ```suggestion
           "UNBOUNDED PRECEDING OVER windows are not supported yet for Python UDAF."
   ```

##########
File path: flink-python/pyflink/table/tests/test_pandas_udaf.py
##########
@@ -504,6 +504,129 @@ def test_tumbling_group_window_over_count(self):
         self.assert_equals(actual, ["1,2.5", "1,6.0", "2,2.0", "3,2.5"])
         os.remove(source_path)
 
+    def test_over_range_window_aggregate_function(self):
+        # create source file path
+        import tempfile
+        import os
+        tmp_dir = tempfile.gettempdir()
+        data = [
+            '1,1,2013-01-01 03:10:00',
+            '3,2,2013-01-01 03:10:00',
+            '2,1,2013-01-01 03:10:00',
+            '1,5,2013-01-01 03:10:00',
+            '1,8,2013-01-01 04:20:00',
+            '2,3,2013-01-01 03:30:00'
+        ]
+        source_path = tmp_dir + '/test_over_range_window_aggregate_function.csv'
+        with open(source_path, 'w') as fd:
+            for ele in data:
+                fd.write(ele + '\n')
+        max_add_min_udaf = udaf(lambda a: a.max() + a.min(),
+                                result_type=DataTypes.SMALLINT(),
+                                func_type='pandas')
+        self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

Review comment:
       Add test cases for processing time?

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/AbstractStreamArrowPythonRangeBoundedOverWindowAggregateFunctionOperator.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * The Abstract class of Stream Arrow Python {@link AggregateFunction} Operator for RANGE clause
+ * bounded Over Window Aggregation.
+ */
+@Internal
+public abstract class AbstractStreamArrowPythonRangeBoundedOverWindowAggregateFunctionOperator<K>
+	extends AbstractStreamArrowPythonOverWindowAggregateFunctionOperator<K> {
+
+	private static final long serialVersionUID = 1L;
+
+	private transient LinkedList<List<RowData>> inputData;
+
+	public AbstractStreamArrowPythonRangeBoundedOverWindowAggregateFunctionOperator(
+		Configuration config,
+		PythonFunctionInfo[] pandasAggFunctions,
+		RowType inputType,
+		RowType outputType,
+		int inputTimeFieldIndex,
+		long lowerBoundary,
+		int[] groupingSet,
+		int[] udafInputOffsets) {
+		super(config, pandasAggFunctions, inputType, outputType, inputTimeFieldIndex, lowerBoundary,
+			groupingSet, udafInputOffsets);
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+		inputData = new LinkedList<>();
+	}
+
+	@Override
+	public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+		long timestamp = timer.getTimestamp();
+		Long cleanupTimestamp = cleanupTsState.value();
+		// if cleanupTsState has not been updated then it is safe to cleanup states
+		if (cleanupTimestamp != null && cleanupTimestamp <= timestamp) {
+			inputState.clear();
+			lastTriggeringTsState.clear();
+			cleanupTsState.clear();
+			return;
+		}
+		// gets all window data from state for the calculation
+		List<RowData> inputs = inputState.get(timestamp);
+		triggerWindowProcess(timestamp, inputs);
+		lastTriggeringTsState.update(timestamp);
+	}
+
+	@Override
+	public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+		long timestamp = timer.getTimestamp();
+		Long cleanupTimestamp = cleanupTsState.value();
+		// if cleanupTsState has not been updated then it is safe to cleanup states
+		if (cleanupTimestamp != null && cleanupTimestamp <= timestamp) {
+			inputState.clear();
+			cleanupTsState.clear();
+			return;
+		}
+
+		// we consider the original timestamp of events
+		// that have registered this time trigger 1 ms ago
+
+		long currentTime = timestamp - 1;
+
+		// get the list of elements of current proctime
+		List<RowData> currentElements = inputState.get(currentTime);
+		triggerWindowProcess(timestamp, currentElements);
+	}
+
+	@Override
+	@SuppressWarnings("ConstantConditions")
+	public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
+		byte[] udafResult = resultTuple.f0;
+		int length = resultTuple.f1;
+		bais.setBuffer(udafResult, 0, length);
+		int rowCount = arrowSerializer.load();
+		for (int i = 0; i < rowCount; i++) {
+			RowData data = arrowSerializer.read(i);
+			List<RowData> input = inputData.poll();
+			for (RowData ele : input) {
+				reuseJoinedRow.setRowKind(ele.getRowKind());
+				rowDataWrapper.collect(reuseJoinedRow.replace(ele, data));
+			}
+		}
+	}
+
+	void registerCleanupTimer(long timestamp, TimeDomain domain) throws Exception {
+		long minCleanupTimestamp = timestamp + lowerBoundary + 1;
+		long maxCleanupTimestamp = timestamp + (long) (lowerBoundary * 1.5) + 1;
+		// update timestamp and register timer if needed
+		Long curCleanupTimestamp = cleanupTsState.value();
+		if (curCleanupTimestamp == null || curCleanupTimestamp < minCleanupTimestamp) {
+			// we don't delete existing timer since it may delete timer for data processing
+			if (domain == TimeDomain.EVENT_TIME) {
+				timerService.registerEventTimeTimer(maxCleanupTimestamp);
+			} else {
+				timerService.registerProcessingTimeTimer(maxCleanupTimestamp);
+			}
+			cleanupTsState.update(maxCleanupTimestamp);
+		}
+	}
+
+	private void triggerWindowProcess(long timestamp, List<RowData> inputs) throws Exception {

Review comment:
       ```suggestion
   	private void triggerWindowProcess(long upperLimit, List<RowData> inputs) throws Exception {
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org