You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/30 10:02:23 UTC

[GitHub] [flink] HuangXingBo commented on a change in pull request #13854: [FLINK-19235][python] Support more built-in aggs to be mixed use with Python UDAF

HuangXingBo commented on a change in pull request #13854:
URL: https://github.com/apache/flink/pull/13854#discussion_r514979676



##########
File path: flink-python/pyflink/table/functions.py
##########
@@ -45,6 +82,53 @@ def merge(self, accumulator, accumulators):
             accumulator[0] += acc[0]
 
 
+class CountAggFunction(AggregateFunction):
+
+    def get_value(self, accumulator):
+        return accumulator[0]
+
+    def create_accumulator(self):
+        return [0]
+
+    def accumulate(self, accumulator, *args):
+        if args[0] is not None:
+            accumulator[0] += 1
+
+    def retract(self, accumulator, *args):
+        if args[0] is not None:
+            accumulator[0] -= 1
+
+    def merge(self, accumulator, accumulators):
+        for acc in accumulators:
+            accumulator[0] += acc[0]
+
+
+class FirstValueAggFunction(AggregateFunction):
+
+    def get_value(self, accumulator):
+        return accumulator[0]
+
+    def create_accumulator(self):
+        # [first_value, first_order]
+        return [None, MAX_LONG_VALUE]
+
+    def accumulate(self, accumulator, *args):
+        if args[0] is not None:
+            if len(args) > 1:
+                if accumulator[1] > args[1]:
+                    accumulator[0] = args[0]
+                    accumulator[1] = args[1]
+            else:
+                if accumulator[0] is None:
+                    accumulator[0] = args[0]

Review comment:
       Maybe we also need the logic `accumulator[1] = int(round(time.time() * 1000))` here,
   i.e. recording the current time in milliseconds as the order of the first value.

##########
File path: flink-python/pyflink/table/functions.py
##########
@@ -20,12 +20,49 @@
 from abc import abstractmethod
 from decimal import Decimal
 
-from pyflink.table import AggregateFunction, MapView
+from pyflink.table import AggregateFunction, MapView, ListView
 
 MAX_LONG_VALUE = sys.maxsize
 MIN_LONG_VALUE = -MAX_LONG_VALUE - 1
 
 
+class AvgAggFunction(AggregateFunction):
+
+    def get_value(self, accumulator):
+        # sum / count
+        if accumulator[0] != 0:
+            return accumulator[1] / accumulator[0]
+        else:
+            return None
+
+    def create_accumulator(self):
+        # [count, sum]
+        return [0, None]

Review comment:
       I think we can set the initial value of sum to `0`.

##########
File path: flink-python/pyflink/table/functions.py
##########
@@ -20,12 +20,49 @@
 from abc import abstractmethod
 from decimal import Decimal
 
-from pyflink.table import AggregateFunction, MapView
+from pyflink.table import AggregateFunction, MapView, ListView
 
 MAX_LONG_VALUE = sys.maxsize
 MIN_LONG_VALUE = -MAX_LONG_VALUE - 1
 
 
+class AvgAggFunction(AggregateFunction):
+
+    def get_value(self, accumulator):
+        # sum / count
+        if accumulator[0] != 0:
+            return accumulator[1] / accumulator[0]
+        else:
+            return None
+
+    def create_accumulator(self):
+        # [count, sum]
+        return [0, None]
+
+    def accumulate(self, accumulator, *args):
+        if args[0] is not None:
+            accumulator[0] += 1
+            if accumulator[1] is None:

Review comment:
       If we set the initial value of sum to `0`, we can remove the check for whether `accumulator[1]` is `None`.

##########
File path: flink-python/pyflink/table/functions.py
##########
@@ -20,12 +20,49 @@
 from abc import abstractmethod
 from decimal import Decimal
 
-from pyflink.table import AggregateFunction, MapView
+from pyflink.table import AggregateFunction, MapView, ListView
 
 MAX_LONG_VALUE = sys.maxsize
 MIN_LONG_VALUE = -MAX_LONG_VALUE - 1
 
 
+class AvgAggFunction(AggregateFunction):
+
+    def get_value(self, accumulator):
+        # sum / count
+        if accumulator[0] != 0:
+            return accumulator[1] / accumulator[0]
+        else:
+            return None
+
+    def create_accumulator(self):
+        # [count, sum]
+        return [0, None]
+
+    def accumulate(self, accumulator, *args):
+        if args[0] is not None:
+            accumulator[0] += 1
+            if accumulator[1] is None:
+                accumulator[1] = args[0]
+            else:
+                accumulator[1] += args[0]
+
+    def retract(self, accumulator, *args):
+        if args[0] is not None:
+            if accumulator[1] is not None:
+                accumulator[0] -= 1
+                accumulator[1] -= args[0]
+
+    def merge(self, accumulator, accumulators):
+        for acc in accumulators:
+            if acc[1] is not None:

Review comment:
       Ditto: if the initial value of sum is `0`, this `acc[1] is not None` check can be removed as well.

##########
File path: flink-python/pyflink/table/functions.py
##########
@@ -20,12 +20,49 @@
 from abc import abstractmethod
 from decimal import Decimal
 
-from pyflink.table import AggregateFunction, MapView
+from pyflink.table import AggregateFunction, MapView, ListView
 
 MAX_LONG_VALUE = sys.maxsize
 MIN_LONG_VALUE = -MAX_LONG_VALUE - 1
 
 
+class AvgAggFunction(AggregateFunction):
+
+    def get_value(self, accumulator):
+        # sum / count
+        if accumulator[0] != 0:
+            return accumulator[1] / accumulator[0]
+        else:
+            return None
+
+    def create_accumulator(self):
+        # [count, sum]
+        return [0, None]
+
+    def accumulate(self, accumulator, *args):
+        if args[0] is not None:
+            accumulator[0] += 1
+            if accumulator[1] is None:
+                accumulator[1] = args[0]
+            else:
+                accumulator[1] += args[0]
+
+    def retract(self, accumulator, *args):
+        if args[0] is not None:
+            if accumulator[1] is not None:

Review comment:
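       Ditto: if the initial value of sum is `0`, this `accumulator[1] is not None` check can be removed as well.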




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org