You are viewing a plain text version of this content; the canonical (HTML) version is available at the original mailing-list archive link, which was not preserved in this rendering.
Posted to commits@flink.apache.org by hx...@apache.org on 2022/03/07 12:33:59 UTC
[flink] branch release-1.14 updated: [FLINK-26504][python] Fix the incorrect type error in unbounded Python UDAF
This is an automated email from the ASF dual-hosted git repository.
hxb pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 6d3aa18 [FLINK-26504][python] Fix the incorrect type error in unbounded Python UDAF
6d3aa18 is described below
commit 6d3aa1839b5e4a9a099d4c59384430965e80bc00
Author: huangxingbo <hx...@gmail.com>
AuthorDate: Mon Mar 7 15:03:50 2022 +0800
[FLINK-26504][python] Fix the incorrect type error in unbounded Python UDAF
This closes #18990.
---
.../pyflink/fn_execution/table/operations.py | 6 +++++-
flink-python/pyflink/table/tests/test_udaf.py | 22 ++++++++++++++++++++++
2 files changed, 27 insertions(+), 1 deletion(-)
diff --git a/flink-python/pyflink/fn_execution/table/operations.py b/flink-python/pyflink/fn_execution/table/operations.py
index ab0f72a..f799d3d 100644
--- a/flink-python/pyflink/fn_execution/table/operations.py
+++ b/flink-python/pyflink/fn_execution/table/operations.py
@@ -352,7 +352,11 @@ class AbstractStreamGroupAggregateOperation(BaseStatefulOperation):
row = input_data[1]
self.group_agg_function.process_element(row)
else:
- self.group_agg_function.on_timer(input_data[3])
+ if has_cython:
+ timer = InternalRow.from_row(input_data[3])
+ else:
+ timer = input_data[3]
+ self.group_agg_function.on_timer(timer)
@abc.abstractmethod
def create_process_function(self, user_defined_aggs, input_extractors, filter_args,
diff --git a/flink-python/pyflink/table/tests/test_udaf.py b/flink-python/pyflink/table/tests/test_udaf.py
index 612f433..d85f4cd 100644
--- a/flink-python/pyflink/table/tests/test_udaf.py
+++ b/flink-python/pyflink/table/tests/test_udaf.py
@@ -505,6 +505,28 @@ class StreamTableAggregateTests(PyFlinkStreamTableTestCase):
["hello.hello2", "", "hello,hello2", "hello"]],
columns=['a', 'b', 'c', 'd']))
+ def test_clean_state(self):
+ self.t_env.register_function("my_count", CountAggregateFunction())
+ self.t_env.get_config().get_configuration().set_string("parallelism.default", "1")
+ self.t_env.get_config().get_configuration().set_string(
+ "python.fn-execution.bundle.size", "1")
+ self.t_env.get_config().get_configuration().set_string(
+ "python.state.cache-size", "0")
+ self.t_env.get_config().get_configuration().set_string(
+ "table.exec.state.ttl", "2ms")
+
+ self.t_env.execute_sql("""
+ CREATE TABLE test_source (
+ a BIGINT
+ ) WITH (
+ 'connector' = 'datagen',
+ 'number-of-rows' = '5',
+ 'rows-per-second' = '1'
+ )
+ """)
+ t = self.t_env.from_path('test_source')
+ t.select("my_count(a) as a").to_pandas()
+
def test_tumbling_group_window_over_time(self):
# create source file path
tmp_dir = self.tempdir