You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/18 06:33:49 UTC

[GitHub] [flink] dianfu commented on a change in pull request #19126: [FLINK-26609][python] Support sum operation in KeyedStream

dianfu commented on a change in pull request #19126:
URL: https://github.com/apache/flink/pull/19126#discussion_r829715015



##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -1168,6 +1168,96 @@ def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
         return self.process(FilterKeyedProcessFunctionAdapter(func), self._original_data_type_info)\
             .name("Filter")
 
+    def sum(self, position_to_sum: Union[int, str]) -> 'DataStream':
+        """
+        Applies an aggregation that gives a rolling sum of the data stream at the
+        given position grouped by the given key. An independent aggregate is kept
+        per key.
+
+        Example(Tuple data to sum):
+        ::
+
+            >>> ds = env.from_collection([('a', 1), ('a', 2), ('b', 1), ('b', 5)])
+            >>> ds.key_by(lambda x: x[0]).sum(1)
+
+        Example(Row data to sum):
+        ::
+
+            >>> ds = self.env.from_collection([('a', 1), ('a', 2), ('a', 3), ('b', 1), ('b', 2)],

Review comment:
       ```suggestion
               >>> ds = env.from_collection([('a', 1), ('a', 2), ('a', 3), ('b', 1), ('b', 2)],
   ```

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -1168,6 +1168,96 @@ def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
         return self.process(FilterKeyedProcessFunctionAdapter(func), self._original_data_type_info)\
             .name("Filter")
 
+    def sum(self, position_to_sum: Union[int, str]) -> 'DataStream':
+        """
+        Applies an aggregation that gives a rolling sum of the data stream at the
+        given position grouped by the given key. An independent aggregate is kept
+        per key.
+
+        Example(Tuple data to sum):
+        ::
+
+            >>> ds = env.from_collection([('a', 1), ('a', 2), ('b', 1), ('b', 5)])
+            >>> ds.key_by(lambda x: x[0]).sum(1)
+
+        Example(Row data to sum):
+        ::
+
+            >>> ds = self.env.from_collection([('a', 1), ('a', 2), ('a', 3), ('b', 1), ('b', 2)],
+            ...                                type_info=Types.ROW([Types.STRING(), Types.INT()]))
+            >>> ds.key_by(lambda x: x[0]).sum(1)
+
+        Example(Row data with fields name to sum):
+        ::
+
+            >>> ds = self.env.from_collection(
+            ...     [('a', 1), ('a', 2), ('a', 3), ('b', 1), ('b', 2)],
+            ...     type_info=Types.ROW_NAMED(["key", "value"], [Types.STRING(), Types.INT()])
+            ... )
+            >>> ds.key_by(lambda x: x[0]).sum("value")
+
+        :param position_to_sum:
+            The field position in the data points to sum, type can be int or str.
+            This is applicable to Tuple types, and {pyflink.common.types.Row} types.

Review comment:
       ```suggestion
               This is applicable to Tuple types, and :class:`pyflink.common.Row` types.
   ```

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -1168,6 +1168,96 @@ def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
         return self.process(FilterKeyedProcessFunctionAdapter(func), self._original_data_type_info)\
             .name("Filter")
 
+    def sum(self, position_to_sum: Union[int, str]) -> 'DataStream':
+        """
+        Applies an aggregation that gives a rolling sum of the data stream at the
+        given position grouped by the given key. An independent aggregate is kept
+        per key.
+
+        Example(Tuple data to sum):
+        ::
+
+            >>> ds = env.from_collection([('a', 1), ('a', 2), ('b', 1), ('b', 5)])
+            >>> ds.key_by(lambda x: x[0]).sum(1)
+
+        Example(Row data to sum):
+        ::
+
+            >>> ds = self.env.from_collection([('a', 1), ('a', 2), ('a', 3), ('b', 1), ('b', 2)],
+            ...                                type_info=Types.ROW([Types.STRING(), Types.INT()]))
+            >>> ds.key_by(lambda x: x[0]).sum(1)
+
+        Example(Row data with fields name to sum):
+        ::
+
+            >>> ds = self.env.from_collection(
+            ...     [('a', 1), ('a', 2), ('a', 3), ('b', 1), ('b', 2)],
+            ...     type_info=Types.ROW_NAMED(["key", "value"], [Types.STRING(), Types.INT()])
+            ... )
+            >>> ds.key_by(lambda x: x[0]).sum("value")
+
+        :param position_to_sum:
+            The field position in the data points to sum, type can be int or str.
+            This is applicable to Tuple types, and {pyflink.common.types.Row} types.
+        :return: The transformed DataStream.
+        """
+        if not isinstance(position_to_sum, int) and not isinstance(position_to_sum, str):
+            raise TypeError("The input must be a int or str type for locate the value to sum")
+
+        output_type = _from_java_type(self._original_data_type_info.get_java_type_info())
+
+        class SumKeyedProcessFunctionAdapter(KeyedProcessFunction):

Review comment:
       What about creating a ReduceFunction and then calling self.reduce? It could simplify the implementation.

##########
File path: flink-python/pyflink/datastream/tests/test_data_stream.py
##########
@@ -455,6 +455,56 @@ def filter(self, value):
         expected = ['+I[c, 1]', '+I[e, 2]']
         self.assert_equals_sorted(expected, results)
 
+    def test_keyed_sum_with_tuple_type(self):
+        ds = self.env.from_collection([('a', 1), ('a', 2), ('a', 3), ('b', 1), ('b', 2)],
+                                      type_info=Types.TUPLE([Types.STRING(), Types.INT()]))
+        keyed_stream = ds.key_by(lambda x: x[0], key_type=Types.STRING())
+
+        keyed_stream.sum(1)\
+            .add_sink(self.test_sink)
+        self.env.execute('key_by_sum_test_with_tuple_type')
+        results = self.test_sink.get_results(False)
+        if self.__class__ == StreamingModeDataStreamTests:

Review comment:
       What about splitting the test cases between StreamingModeDataStreamTests and BatchModeDataStreamTests? Then we could avoid this check.

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -1168,6 +1168,96 @@ def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
         return self.process(FilterKeyedProcessFunctionAdapter(func), self._original_data_type_info)\
             .name("Filter")
 
+    def sum(self, position_to_sum: Union[int, str]) -> 'DataStream':
+        """
+        Applies an aggregation that gives a rolling sum of the data stream at the
+        given position grouped by the given key. An independent aggregate is kept
+        per key.
+
+        Example(Tuple data to sum):
+        ::
+
+            >>> ds = env.from_collection([('a', 1), ('a', 2), ('b', 1), ('b', 5)])
+            >>> ds.key_by(lambda x: x[0]).sum(1)
+
+        Example(Row data to sum):
+        ::
+
+            >>> ds = self.env.from_collection([('a', 1), ('a', 2), ('a', 3), ('b', 1), ('b', 2)],
+            ...                                type_info=Types.ROW([Types.STRING(), Types.INT()]))
+            >>> ds.key_by(lambda x: x[0]).sum(1)
+
+        Example(Row data with fields name to sum):
+        ::
+
+            >>> ds = self.env.from_collection(
+            ...     [('a', 1), ('a', 2), ('a', 3), ('b', 1), ('b', 2)],
+            ...     type_info=Types.ROW_NAMED(["key", "value"], [Types.STRING(), Types.INT()])
+            ... )
+            >>> ds.key_by(lambda x: x[0]).sum("value")
+
+        :param position_to_sum:
+            The field position in the data points to sum, type can be int or str.
+            This is applicable to Tuple types, and {pyflink.common.types.Row} types.
+        :return: The transformed DataStream.
+        """
+        if not isinstance(position_to_sum, int) and not isinstance(position_to_sum, str):
+            raise TypeError("The input must be a int or str type for locate the value to sum")

Review comment:
       ```suggestion
               raise TypeError("The input must be of int or str type to locate the value to sum")
   ```

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -1168,6 +1168,96 @@ def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
         return self.process(FilterKeyedProcessFunctionAdapter(func), self._original_data_type_info)\
             .name("Filter")
 
+    def sum(self, position_to_sum: Union[int, str]) -> 'DataStream':
+        """
+        Applies an aggregation that gives a rolling sum of the data stream at the
+        given position grouped by the given key. An independent aggregate is kept
+        per key.
+
+        Example(Tuple data to sum):
+        ::
+
+            >>> ds = env.from_collection([('a', 1), ('a', 2), ('b', 1), ('b', 5)])
+            >>> ds.key_by(lambda x: x[0]).sum(1)
+
+        Example(Row data to sum):
+        ::
+
+            >>> ds = self.env.from_collection([('a', 1), ('a', 2), ('a', 3), ('b', 1), ('b', 2)],
+            ...                                type_info=Types.ROW([Types.STRING(), Types.INT()]))
+            >>> ds.key_by(lambda x: x[0]).sum(1)
+
+        Example(Row data with fields name to sum):
+        ::
+
+            >>> ds = self.env.from_collection(

Review comment:
       ```suggestion
               >>> ds = env.from_collection(
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org