You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by hx...@apache.org on 2022/12/12 11:44:58 UTC

[flink] branch master updated: [FLINK-30366][python] Fix Python Group Agg failed in cleaning the idle state

This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 72a70313b59 [FLINK-30366][python] Fix Python Group Agg failed in cleaning the idle state
72a70313b59 is described below

commit 72a70313b59352736514b4927a1dfadc2e8e4232
Author: huangxingbo <hx...@apache.org>
AuthorDate: Mon Dec 12 11:49:15 2022 +0800

    [FLINK-30366][python] Fix Python Group Agg failed in cleaning the idle state
    
    This closes #21488.
---
 flink-python/pyflink/fn_execution/coder_impl_fast.pyx      | 2 +-
 flink-python/pyflink/fn_execution/coder_impl_slow.py       | 2 ++
 flink-python/pyflink/fn_execution/table/aggregate_fast.pyx | 2 +-
 flink-python/pyflink/fn_execution/table/aggregate_slow.py  | 4 ++--
 flink-python/pyflink/fn_execution/tests/test_coders.py     | 6 ++++++
 5 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/flink-python/pyflink/fn_execution/coder_impl_fast.pyx b/flink-python/pyflink/fn_execution/coder_impl_fast.pyx
index 41ed4ab38c4..92dff893fe9 100644
--- a/flink-python/pyflink/fn_execution/coder_impl_fast.pyx
+++ b/flink-python/pyflink/fn_execution/coder_impl_fast.pyx
@@ -327,7 +327,7 @@ cdef class FlattenRowCoderImpl(FieldCoderImpl):
         cdef size_t i
         cdef FieldCoderImpl field_coder
 
-        list_value = <list> value
+        list_value = <list?> value
 
         # encode mask value
         self._mask_utils.write_mask(list_value, 0, out_stream)
diff --git a/flink-python/pyflink/fn_execution/coder_impl_slow.py b/flink-python/pyflink/fn_execution/coder_impl_slow.py
index 23af3483e0a..769720dc277 100644
--- a/flink-python/pyflink/fn_execution/coder_impl_slow.py
+++ b/flink-python/pyflink/fn_execution/coder_impl_slow.py
@@ -209,6 +209,8 @@ class FlattenRowCoderImpl(FieldCoderImpl):
         self._mask_utils = MaskUtils(self._field_count)
 
     def encode_to_stream(self, value, out_stream: OutputStream):
+        if not isinstance(value, List):
+            raise TypeError('Expected list, got {0}'.format(type(value)))
         # encode mask value
         self._mask_utils.write_mask(value, 0, out_stream)
 
diff --git a/flink-python/pyflink/fn_execution/table/aggregate_fast.pyx b/flink-python/pyflink/fn_execution/table/aggregate_fast.pyx
index 2db06c735df..ca68f8546f1 100644
--- a/flink-python/pyflink/fn_execution/table/aggregate_fast.pyx
+++ b/flink-python/pyflink/fn_execution/table/aggregate_fast.pyx
@@ -456,7 +456,7 @@ cdef class GroupAggFunctionBase:
 
     cpdef void on_timer(self, InternalRow key):
         if self.state_cleaning_enabled:
-            self.state_backend.set_current_key(key)
+            self.state_backend.set_current_key(list(key.values))
             accumulator_state = self.state_backend.get_value_state(
                 "accumulators", self.state_value_coder)
             accumulator_state.clear()
diff --git a/flink-python/pyflink/fn_execution/table/aggregate_slow.py b/flink-python/pyflink/fn_execution/table/aggregate_slow.py
index e10f6b06946..52c61e0f855 100644
--- a/flink-python/pyflink/fn_execution/table/aggregate_slow.py
+++ b/flink-python/pyflink/fn_execution/table/aggregate_slow.py
@@ -436,9 +436,9 @@ class GroupAggFunctionBase(object):
         except KeyError:
             self.buffer[tuple(key)] = [input_data]
 
-    def on_timer(self, key):
+    def on_timer(self, key: Row):
         if self.state_cleaning_enabled:
-            self.state_backend.set_current_key(key)
+            self.state_backend.set_current_key(list(key._values))
             accumulator_state = self.state_backend.get_value_state(
                 "accumulators", self.state_value_coder)
             accumulator_state.clear()
diff --git a/flink-python/pyflink/fn_execution/tests/test_coders.py b/flink-python/pyflink/fn_execution/tests/test_coders.py
index 4d11ca18e5f..8d4a3b24cc2 100644
--- a/flink-python/pyflink/fn_execution/tests/test_coders.py
+++ b/flink-python/pyflink/fn_execution/tests/test_coders.py
@@ -189,6 +189,12 @@ class CodersTest(PyFlinkTestCase):
         coder = CountWindowCoder()
         self.check_coder(coder, CountWindow(100))
 
+    def test_coder_with_unmatched_type(self):
+        from pyflink.common import Row
+        coder = FlattenRowCoder([BigIntCoder()])
+        with self.assertRaises(TypeError, msg='Expected list, got Row'):
+            self.check_coder(coder, Row(1))
+
 
 if __name__ == '__main__':
     logging.getLogger().setLevel(logging.INFO)