You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/30 02:21:06 UTC

[GitHub] [flink] dianfu commented on a change in pull request #13843: [FLINK-19235][python] Support mixed use with built-in aggs like count1, sum0 and first_value for Python UDAF.

dianfu commented on a change in pull request #13843:
URL: https://github.com/apache/flink/pull/13843#discussion_r514680464



##########
File path: flink-python/pyflink/fn_execution/aggregate.py
##########
@@ -37,12 +38,37 @@ def join_row(left: Row, right: Row):
     return Row(*fields)
 
 
+def extract_data_view_specs_from_accumulator(current_index, accumulator):
+    # for built in functions we extract the data view specs from their accumulator
+    i = -1
+    extracted_specs = []
+    for field in accumulator:
+        i += 1
+        # TODO: infer the coder from the input types and output type of the built-in functions
+        if isinstance(field, MapView):
+            extracted_specs.append(MapViewSpec(
+                "builtInAgg%df%d" % (current_index, i), i, PickleCoder(), PickleCoder()))
+        elif isinstance(field, ListView):
+            extracted_specs.append(ListViewSpec(
+                "builtInAgg%df%d" % (current_index, i), i, PickleCoder()))
+    return extracted_specs
+
+
 def extract_data_view_specs(udfs):
     extracted_udf_data_view_specs = []
+    current_index = -1
     for udf in udfs:
+        current_index += 1
         udf_data_view_specs_proto = udf.specs
-        if udf_data_view_specs_proto is None:
-            extracted_udf_data_view_specs.append([])
+        if not udf_data_view_specs_proto:
+            if is_built_in_function(udf.payload):
+                bulit_in_function = load_aggregate_function(udf.payload)

Review comment:
       ```suggestion
                   built_in_function = load_aggregate_function(udf.payload)
   ```

##########
File path: flink-python/pyflink/fn_execution/aggregate.py
##########
@@ -37,12 +38,37 @@ def join_row(left: Row, right: Row):
     return Row(*fields)
 
 
+def extract_data_view_specs_from_accumulator(current_index, accumulator):
+    # for built in functions we extract the data view specs from their accumulator
+    i = -1
+    extracted_specs = []
+    for field in accumulator:
+        i += 1
+        # TODO: infer the coder from the input types and output type of the built-in functions
+        if isinstance(field, MapView):
+            extracted_specs.append(MapViewSpec(
+                "builtInAgg%df%d" % (current_index, i), i, PickleCoder(), PickleCoder()))
+        elif isinstance(field, ListView):
+            extracted_specs.append(ListViewSpec(
+                "builtInAgg%df%d" % (current_index, i), i, PickleCoder()))
+    return extracted_specs
+
+
 def extract_data_view_specs(udfs):
     extracted_udf_data_view_specs = []
+    current_index = -1
     for udf in udfs:
+        current_index += 1
         udf_data_view_specs_proto = udf.specs
-        if udf_data_view_specs_proto is None:
-            extracted_udf_data_view_specs.append([])
+        if not udf_data_view_specs_proto:
+            if is_built_in_function(udf.payload):
+                bulit_in_function = load_aggregate_function(udf.payload)
+                accumulator = bulit_in_function.create_accumulator()
+                extracted_udf_data_view_specs.append(
+                    extract_data_view_specs_from_accumulator(current_index, accumulator))
+            else:
+                extracted_udf_data_view_specs.append([])
+            continue

Review comment:
       What about enclosing the following code in an *else* branch? Then we can remove the *continue*, which will make the code more readable.

##########
File path: flink-python/pyflink/table/tests/test_aggregate.py
##########
@@ -256,6 +258,59 @@ def test_double_aggregate(self):
             .select("my_count(a) as a, my_sum(b) as b")
         assert_frame_equal(result.to_pandas(), pd.DataFrame([[3, 12]], columns=['a', 'b']))
 
+    def test_mixed_with_built_in_functions_with_retract(self):

Review comment:
       Could we merge the following two test cases into one test case or even merge them into an existing test case?

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecPythonGroupAggregateRule.java
##########
@@ -67,13 +67,14 @@ public boolean matches(RelOptRuleCall call) {
 			aggCalls.stream().anyMatch(x -> PythonUtil.isPythonAggregate(x, PythonFunctionKind.GENERAL));
 		boolean existPandasFunction =
 			aggCalls.stream().anyMatch(x -> PythonUtil.isPythonAggregate(x, PythonFunctionKind.PANDAS));
-		boolean existJavaFunction =
-			aggCalls.stream().anyMatch(x -> !PythonUtil.isPythonAggregate(x, null));
+		boolean existUserDefinedJavaFunction =

Review comment:
       ```suggestion
   		boolean existJavaUserDefinedFunction =
   ```

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonAggregate.scala
##########
@@ -158,4 +172,24 @@ trait CommonPythonAggregate extends CommonPythonBase {
       Array()
     }
   }
+
+  private def getBuiltInPythonAggregateFunction(
+      javaBuiltInAggregateFunction: UserDefinedFunction): BuiltInPythonAggregateFunction = {
+    javaBuiltInAggregateFunction match {
+      case _: Count1AggFunction =>
+        BuiltInPythonAggregateFunction.COUNT1
+      case _: FirstValueWithRetractAggFunction[_] =>
+        BuiltInPythonAggregateFunction.FIRST_VALUE_RETRACT
+      case _: IntSum0AggFunction | _: ByteSum0AggFunction | _: ShortSum0AggFunction |
+           _: LongSum0AggFunction =>
+        BuiltInPythonAggregateFunction.INT_SUM0
+      case _: FloatSum0AggFunction | _: DoubleSum0AggFunction =>
+        BuiltInPythonAggregateFunction.FLOAT_SUM0
+      case _: DecimalSum0AggFunction =>
+        BuiltInPythonAggregateFunction.DECIMAL_SUM0
+      case _ =>
+        throw new TableException("This aggregate function can not be mixed with Python UDAF: " +

Review comment:
       ```suggestion
           throw new TableException("Aggregate function %s is still not supported to be mixed with Python UDAF: " +
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org