Posted to issues@flink.apache.org by "CaoYu (Jira)" <ji...@apache.org> on 2022/03/24 04:29:00 UTC
[jira] [Comment Edited] (FLINK-26728) Support min operation in KeyedStream
[ https://issues.apache.org/jira/browse/FLINK-26728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511587#comment-17511587 ]
CaoYu edited comment on FLINK-26728 at 3/24/22, 4:28 AM:
---------------------------------------------------------
Hi [~dianfu]
I now have a preliminary implementation of part of the min operator.
While working on it, I found that the code is very similar to that of the sum operator.
I expect the not-yet-implemented max, min_by, and max_by operators to look much the same as min and sum.
Here is the preliminary code for the min operator:
{code:java}
class MinReduceFunction(ReduceFunction):

    def __init__(self, position_to_min):
        self._pos = position_to_min
        self._reduce_func = None

    def reduce(self, value1, value2):

        def init_reduce_func(value_to_check):
            if isinstance(value_to_check, tuple):
                def reduce_func(v1, v2):
                    v1_list = list(v1)
                    v1_list[self._pos] = \
                        v2[self._pos] if v2[self._pos] < v1[self._pos] else v1[self._pos]
                    return tuple(v1_list)
                self._reduce_func = reduce_func
            elif isinstance(value_to_check, (list, Row)):
                pass
            else:
                if self._pos != 0:
                    raise TypeError(
                        "The %s field selected on a basic type. A field expression on a "
                        "basic type can only select the 0th field (which means selecting "
                        "the entire basic type)." % self._pos)

                def reduce_func(v1, v2):
                    return v2 if v2 < v1 else v1
                self._reduce_func = reduce_func

            try:
                value2 < value1
            except TypeError as err:
                raise TypeError("To get a minimum, a given field data must be comparable "
                                "to each other. \n%s" % err)
{code}
As you can see, the core logic in the reduce method is very similar to that of sum.
More importantly, the only difference between the min and max operators is that "<" is replaced with ">".
So I wonder whether we should extract a common top-level method as the base implementation,
with sum, min, max, min_by, and max_by defined as enumeration values
that select the implementation logic.
It would look like this:
{code:java}
def _basic_min_max(self, pos, type, is_by):
    pass

def min(self, position_to_min):
    self._basic_min_max(pos=position_to_min, type=min, is_by=False)

def max(self, position_to_max):
    self._basic_min_max(pos=position_to_max, type=max, is_by=False)

def min_by(self, position_to_min_by):
    self._basic_min_max(pos=position_to_min_by, type=min, is_by=True)

def max_by(self, position_to_max_by):
    self._basic_min_max(pos=position_to_max_by, type=max, is_by=True)
{code}
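To make the idea more concrete, here is a minimal, self-contained sketch of how such an enumeration-driven helper could behave for tuple inputs. It is only an illustration under assumptions: `AggregationType`, `_COMPARATORS`, and the free function `basic_min_max` are hypothetical names, not existing PyFlink API, and list/Row inputs and basic types are left out. It assumes that min/max replace only the selected field, while min_by/max_by return the whole element that holds the extreme value. sum is omitted here because it combines values rather than comparing them.

```python
import operator
from enum import Enum
from functools import reduce


class AggregationType(Enum):
    # Hypothetical enumeration selecting the comparison to apply.
    MIN = "min"
    MAX = "max"


# MIN and MAX differ only in the comparison operator.
_COMPARATORS = {
    AggregationType.MIN: operator.lt,
    AggregationType.MAX: operator.gt,
}


def basic_min_max(pos, agg_type, is_by):
    """Return a reduce function over tuples (stand-in for _basic_min_max)."""
    is_better = _COMPARATORS[agg_type]

    def reduce_func(v1, v2):
        if not is_better(v2[pos], v1[pos]):
            return v1
        if is_by:
            # min_by / max_by: return the whole element holding the extreme value.
            return v2
        # min / max: keep v1's other fields, replace only the selected field.
        fields = list(v1)
        fields[pos] = v2[pos]
        return tuple(fields)

    return reduce_func


records = [("a", 3), ("b", 1), ("c", 2)]
min_result = reduce(basic_min_max(1, AggregationType.MIN, is_by=False), records)
min_by_result = reduce(basic_min_max(1, AggregationType.MIN, is_by=True), records)
max_result = reduce(basic_min_max(1, AggregationType.MAX, is_by=False), records)
```

With this shape, the four public methods would only differ in the arguments they pass to the shared helper, which is the point of the proposal.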
What do you think? I look forward to your suggestions.
Thanks.
> Support min operation in KeyedStream
> ------------------------------------
>
> Key: FLINK-26728
> URL: https://issues.apache.org/jira/browse/FLINK-26728
> Project: Flink
> Issue Type: Improvement
> Components: API / Python
> Affects Versions: 1.14.3
> Reporter: CaoYu
> Assignee: CaoYu
> Priority: Major
>
> Support min operation in python-flink KeyedStream
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)