Posted to issues@flink.apache.org by "CaoYu (Jira)" <ji...@apache.org> on 2022/03/24 04:29:00 UTC

[jira] [Comment Edited] (FLINK-26728) Support min operation in KeyedStream

    [ https://issues.apache.org/jira/browse/FLINK-26728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511587#comment-17511587 ] 

CaoYu edited comment on FLINK-26728 at 3/24/22, 4:28 AM:
---------------------------------------------------------

Hi [~dianfu],

I have a preliminary implementation of part of the min operator.

While writing it, I found that the code is very similar to that of the sum operator.

I expect the not-yet-implemented max, min_by and max_by operators to follow the same pattern as min and sum.

Here is the preliminary implementation of the min operator:
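{code:python}
from pyflink.common import Row
from pyflink.datastream.functions import ReduceFunction


class MinReduceFunction(ReduceFunction):

    def __init__(self, position_to_min):
        self._pos = position_to_min
        self._reduce_func = None

    def reduce(self, value1, value2):

        def min_of(a, b):
            # Keep a on ties; fail with a clear message if the values are not comparable.
            try:
                return b if b < a else a
            except TypeError as err:
                raise TypeError("To get a minimum, the values of the selected field must be "
                                "comparable to each other.\n%s" % err)

        def init_reduce_func(value_to_check):
            if isinstance(value_to_check, tuple):
                # Tuples are immutable: copy, update the selected field, rebuild.
                def reduce_func(v1, v2):
                    v1_list = list(v1)
                    v1_list[self._pos] = min_of(v1[self._pos], v2[self._pos])
                    return tuple(v1_list)
                self._reduce_func = reduce_func
            elif isinstance(value_to_check, (list, Row)):
                # Lists and Rows are mutable: update the selected field in place.
                def reduce_func(v1, v2):
                    v1[self._pos] = min_of(v1[self._pos], v2[self._pos])
                    return v1
                self._reduce_func = reduce_func
            else:
                if self._pos != 0:
                    raise TypeError(
                        "The %s field selected on a basic type. A field expression on a "
                        "basic type can only select the 0th field (which means selecting "
                        "the entire basic type)." % self._pos)

                def reduce_func(v1, v2):
                    return min_of(v1, v2)
                self._reduce_func = reduce_func

        # Choose the implementation once, based on the type of the first element seen.
        if self._reduce_func is None:
            init_reduce_func(value2)
        return self._reduce_func(value1, value2)
{code}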
As you can see, the core logic of the reduce method is very similar to that of sum.

More importantly, the only difference between the min and max operators is that "<" is replaced with ">".
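To illustrate the similarity, here is a minimal plain-Python sketch (no PyFlink dependency) of a single field-wise reducer parameterized by its combine step; `make_field_reducer` and `pick` are hypothetical helper names, not existing PyFlink APIs. sum passes `operator.add`, and min and max differ only in passing `operator.lt` or `operator.gt`.

{code:python}
import operator


def make_field_reducer(pos, combine):
    """Hypothetical helper: reduce(v1, v2) combining field `pos`, keeping v1's other fields."""
    def reduce_func(v1, v2):
        if isinstance(v1, tuple):
            # Tuples are immutable: copy, update the selected field, rebuild.
            fields = list(v1)
            fields[pos] = combine(v1[pos], v2[pos])
            return tuple(fields)
        if isinstance(v1, list):
            # Lists are mutable: update the selected field in place.
            v1[pos] = combine(v1[pos], v2[pos])
            return v1
        # Basic types: only position 0 (the whole value) is valid.
        if pos != 0:
            raise TypeError("A field expression on a basic type can only select field 0.")
        return combine(v1, v2)
    return reduce_func


def pick(compare):
    # Keep b only if compare(b, a) holds: operator.lt gives min, operator.gt gives max.
    return lambda a, b: b if compare(b, a) else a


sum_reducer = make_field_reducer(1, operator.add)
min_reducer = make_field_reducer(1, pick(operator.lt))
max_reducer = make_field_reducer(1, pick(operator.gt))
{code}

For example, `min_reducer(("a", 3), ("a", 1))` returns `("a", 1)` and `max_reducer(("a", 3), ("a", 1))` returns `("a", 3)`; only the comparison passed to `pick` changes.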

So I wonder whether we should abstract a common top-level base method.

sum, min, max, min_by and max_by would then be values of an enumeration, and the base method would use that value to choose the implementation logic.
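A minimal plain-Python sketch of the enumeration idea, assuming tuple elements; `AggregationType` and `build_reduce_func` are hypothetical names, not existing PyFlink APIs. Following the min/minBy distinction in Flink's Java DataStream API, the `*_BY` variants return the whole element holding the extreme value (keeping the earlier element on ties), while SUM, MIN and MAX only update the selected field.

{code:python}
import operator
from enum import Enum


class AggregationType(Enum):
    SUM = "sum"
    MIN = "min"
    MAX = "max"
    MIN_BY = "min_by"
    MAX_BY = "max_by"


def build_reduce_func(agg_type, pos):
    """Hypothetical helper: return a reduce(v1, v2) for tuple elements, chosen by agg_type."""
    if agg_type is AggregationType.SUM:
        def combine(a, b):
            return a + b
    else:
        # MIN / MIN_BY keep the smaller value, MAX / MAX_BY the larger one.
        is_min = agg_type in (AggregationType.MIN, AggregationType.MIN_BY)
        compare = operator.lt if is_min else operator.gt

        if agg_type in (AggregationType.MIN_BY, AggregationType.MAX_BY):
            def reduce_by(v1, v2):
                # Return the whole element holding the extreme value; v1 wins ties.
                return v2 if compare(v2[pos], v1[pos]) else v1
            return reduce_by

        def combine(a, b):
            return b if compare(b, a) else a

    def reduce_field(v1, v2):
        # SUM / MIN / MAX: update only field `pos`, keep v1's other fields.
        fields = list(v1)
        fields[pos] = combine(v1[pos], v2[pos])
        return tuple(fields)
    return reduce_field
{code}

With this design, each public method would reduce to a single dispatch on the enum value, and adding another aggregation would mean adding one enum member and one branch.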

In KeyedStream, the proposed API could look like this:

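{code:python}
# Sketch of methods on KeyedStream. Class attributes are not visible inside
# method bodies, so min and max passed below are the Python builtins.
def _basic_min_max(self, pos, agg_func, is_by):
    # Shared implementation for min, max, min_by and max_by (to be implemented).
    # agg_func: the builtin min or max, selecting the comparison direction.
    # is_by=False: update only the selected field (min/max).
    # is_by=True: keep the whole element holding the extreme value (min_by/max_by).
    raise NotImplementedError


def min(self, position_to_min):
    return self._basic_min_max(pos=position_to_min, agg_func=min, is_by=False)


def max(self, position_to_max):
    return self._basic_min_max(pos=position_to_max, agg_func=max, is_by=False)


def min_by(self, position_to_min_by):
    return self._basic_min_max(pos=position_to_min_by, agg_func=min, is_by=True)


def max_by(self, position_to_max_by):
    return self._basic_min_max(pos=position_to_max_by, agg_func=max, is_by=True)
{code}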
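To make the proposal concrete, here is a minimal sketch of how the shared base method could be filled in, exercised on a hypothetical in-memory `KeyedStreamSketch` class (not PyFlink's `KeyedStream`; its `reduce` only imitates a rolling keyed reduce that emits one running result per input, and it returns a plain list). It assumes tuple elements and passes the builtin min or max to the base method, as in the skeleton.

{code:python}
class KeyedStreamSketch:
    """Hypothetical in-memory stand-in for a keyed stream (not PyFlink's KeyedStream)."""

    def __init__(self, records, key_selector):
        self._records = list(records)
        self._key_selector = key_selector

    def reduce(self, reduce_func):
        # Rolling reduce: keep one running result per key and emit it for every
        # input, imitating a keyed reduce. Returns a plain list, not a stream.
        state, output = {}, []
        for record in self._records:
            key = self._key_selector(record)
            state[key] = reduce_func(state[key], record) if key in state else record
            output.append(state[key])
        return output

    def _basic_min_max(self, pos, agg_func, is_by):
        # Assumes tuple elements; agg_func is the builtin min or max.
        def reduce_func(v1, v2):
            # Builtin min/max return their first argument on ties, so v1 wins ties.
            chosen = agg_func(v1[pos], v2[pos])
            if is_by:
                # min_by / max_by: keep the whole element holding the extreme value.
                return v1 if chosen == v1[pos] else v2
            # min / max: update only the selected field, keep v1's other fields.
            fields = list(v1)
            fields[pos] = chosen
            return tuple(fields)
        return self.reduce(reduce_func)

    # Class attributes are not visible inside method bodies: min/max below are builtins.
    def min(self, position_to_min):
        return self._basic_min_max(position_to_min, agg_func=min, is_by=False)

    def max(self, position_to_max):
        return self._basic_min_max(position_to_max, agg_func=max, is_by=False)

    def min_by(self, position_to_min_by):
        return self._basic_min_max(position_to_min_by, agg_func=min, is_by=True)

    def max_by(self, position_to_max_by):
        return self._basic_min_max(position_to_max_by, agg_func=max, is_by=True)
{code}

For example, with records ("a", 3, "x"), ("a", 1, "y"), ("a", 2, "z") keyed by field 0, min(1) emits ("a", 3, "x"), ("a", 1, "x"), ("a", 1, "x"), while min_by(1) emits ("a", 3, "x"), ("a", 1, "y"), ("a", 1, "y").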

What do you think? I look forward to your suggestions.

Thanks.


> Support min operation in KeyedStream
> ------------------------------------
>
>                 Key: FLINK-26728
>                 URL: https://issues.apache.org/jira/browse/FLINK-26728
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / Python
>    Affects Versions: 1.14.3
>            Reporter: CaoYu
>            Assignee: CaoYu
>            Priority: Major
>
> Support min operation in python-flink KeyedStream
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)