You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by hx...@apache.org on 2022/03/07 12:33:59 UTC

[flink] branch release-1.14 updated: [FLINK-26504][python] Fix the incorrect type error in unbounded Python UDAF

This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 6d3aa18  [FLINK-26504][python] Fix the incorrect type error in unbounded Python UDAF
6d3aa18 is described below

commit 6d3aa1839b5e4a9a099d4c59384430965e80bc00
Author: huangxingbo <hx...@gmail.com>
AuthorDate: Mon Mar 7 15:03:50 2022 +0800

    [FLINK-26504][python] Fix the incorrect type error in unbounded Python UDAF
    
    This closes #18990.
---
 .../pyflink/fn_execution/table/operations.py       |  6 +++++-
 flink-python/pyflink/table/tests/test_udaf.py      | 22 ++++++++++++++++++++++
 2 files changed, 27 insertions(+), 1 deletion(-)

diff --git a/flink-python/pyflink/fn_execution/table/operations.py b/flink-python/pyflink/fn_execution/table/operations.py
index ab0f72a..f799d3d 100644
--- a/flink-python/pyflink/fn_execution/table/operations.py
+++ b/flink-python/pyflink/fn_execution/table/operations.py
@@ -352,7 +352,11 @@ class AbstractStreamGroupAggregateOperation(BaseStatefulOperation):
                 row = input_data[1]
             self.group_agg_function.process_element(row)
         else:
-            self.group_agg_function.on_timer(input_data[3])
+            if has_cython:
+                timer = InternalRow.from_row(input_data[3])
+            else:
+                timer = input_data[3]
+            self.group_agg_function.on_timer(timer)
 
     @abc.abstractmethod
     def create_process_function(self, user_defined_aggs, input_extractors, filter_args,
diff --git a/flink-python/pyflink/table/tests/test_udaf.py b/flink-python/pyflink/table/tests/test_udaf.py
index 612f433..d85f4cd 100644
--- a/flink-python/pyflink/table/tests/test_udaf.py
+++ b/flink-python/pyflink/table/tests/test_udaf.py
@@ -505,6 +505,28 @@ class StreamTableAggregateTests(PyFlinkStreamTableTestCase):
                                          ["hello.hello2", "", "hello,hello2", "hello"]],
                                         columns=['a', 'b', 'c', 'd']))
 
+    def test_clean_state(self):
+        self.t_env.register_function("my_count", CountAggregateFunction())
+        self.t_env.get_config().get_configuration().set_string("parallelism.default", "1")
+        self.t_env.get_config().get_configuration().set_string(
+            "python.fn-execution.bundle.size", "1")
+        self.t_env.get_config().get_configuration().set_string(
+            "python.state.cache-size", "0")
+        self.t_env.get_config().get_configuration().set_string(
+            "table.exec.state.ttl", "2ms")
+
+        self.t_env.execute_sql("""
+            CREATE TABLE test_source (
+                a BIGINT
+            ) WITH (
+              'connector' = 'datagen',
+              'number-of-rows' = '5',
+              'rows-per-second' = '1'
+            )
+        """)
+        t = self.t_env.from_path('test_source')
+        t.select("my_count(a) as a").to_pandas()
+
     def test_tumbling_group_window_over_time(self):
         # create source file path
         tmp_dir = self.tempdir