You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/12/18 11:49:46 UTC

[GitHub] [flink] dianfu commented on a change in pull request #14410: [FLINK-20616][python] Support row-based operation to accept user-defined function directly

dianfu commented on a change in pull request #14410:
URL: https://github.com/apache/flink/pull/14410#discussion_r545734030



##########
File path: flink-python/pyflink/table/table.py
##########
@@ -825,10 +832,19 @@ def aggregate(self, func: Union[str, Expression]) -> 'AggregatedTable':
         """
         if isinstance(func, str):
             return AggregatedTable(self._j_table.aggregate(func), self._t_env)
-        else:
+        elif isinstance(func, Expression):
             return AggregatedTable(self._j_table.aggregate(func._j_expr), self._t_env)
+        else:
+            if hasattr(func, "_alias_names"):
+                alias_names = getattr(func, "_alias_names")
+                func = func(with_columns(col("*"))).alias(*alias_names)
+            else:
+                func = func(with_columns(col("*")))
+            return AggregatedTable(self._j_table.aggregate(func._j_expr),

Review comment:
       Could this fit on one line?

##########
File path: flink-python/pyflink/table/table.py
##########
@@ -759,7 +761,7 @@ def drop_columns(self, *fields: Union[str, Expression]) -> 'Table':
             assert isinstance(fields[0], str)
             return Table(self._j_table.dropColumns(fields[0]), self._t_env)
 
-    def map(self, func: Union[str, Expression]) -> 'Table':
+    def map(self, func: Union[str, Expression, UserDefinedScalarFunctionWrapper]) -> 'Table':

Review comment:
       Could you add some examples to the PythonDoc?

##########
File path: flink-python/pyflink/table/table.py
##########
@@ -1165,9 +1211,30 @@ def aggregate(self, func: Union[str, Expression]) -> 'AggregatedTable':
         """
         if isinstance(func, str):
             return AggregatedTable(self._j_table.aggregate(func), self._t_env)
+        elif isinstance(func, Expression):
+            return AggregatedTable(self._j_table.aggregate(func._j_expr), self._t_env)
         else:
+            func = self._without_columns(func)
             return AggregatedTable(self._j_table.aggregate(func._j_expr), self._t_env)
 
+    def _without_columns(self, func: UserDefinedAggregateFunctionWrapper) -> Expression:
+        get_gateway()

Review comment:
       Could this `get_gateway()` call be removed?

##########
File path: flink-python/pyflink/table/tests/test_row_based_operation.py
##########
@@ -125,9 +123,11 @@ def test_aggregate_with_pandas_udaf(self):
                                [DataTypes.FIELD("a", DataTypes.FLOAT()),
                                 DataTypes.FIELD("b", DataTypes.INT())]),
                            func_type="pandas")
-        t.group_by(t.a) \
-            .aggregate(pandas_udaf(t.b).alias("c", "d")) \
-            .select("a, c, d").execute_insert("Results") \
+        t.select(t.a, t.b) \
+            .group_by(t.a) \
+            .aggregate(pandas_udaf) \
+            .select("*") \

Review comment:
       I guess `.select("*")` could be removed?

##########
File path: flink-python/pyflink/table/table.py
##########
@@ -800,10 +804,13 @@ def flat_map(self, func: Union[str, Expression]) -> 'Table':
         """
         if isinstance(func, str):
             return Table(self._j_table.flatMap(func), self._t_env)
-        else:
+        elif isinstance(func, Expression):

Review comment:
       ditto

##########
File path: flink-python/pyflink/table/table.py
##########
@@ -1059,9 +1087,26 @@ def flat_aggregate(self, func: Union[str, Expression]) -> 'FlatAggregateTable':
         """
         if isinstance(func, str):
             return FlatAggregateTable(self._j_table.flatAggregate(func), self._t_env)
+        elif isinstance(func, Expression):
+            return FlatAggregateTable(self._j_table.flatAggregate(func._j_expr), self._t_env)
         else:
+            func = self._without_columns(func)
             return FlatAggregateTable(self._j_table.flatAggregate(func._j_expr), self._t_env)
 
+    def _without_columns(self, func: UserDefinedAggregateFunctionWrapper) -> Expression:
+        get_gateway()

Review comment:
       Could this `get_gateway()` call be removed?

##########
File path: flink-python/pyflink/table/table.py
##########
@@ -1165,9 +1211,30 @@ def aggregate(self, func: Union[str, Expression]) -> 'AggregatedTable':
         """
         if isinstance(func, str):
             return AggregatedTable(self._j_table.aggregate(func), self._t_env)
+        elif isinstance(func, Expression):
+            return AggregatedTable(self._j_table.aggregate(func._j_expr), self._t_env)
         else:
+            func = self._without_columns(func)
             return AggregatedTable(self._j_table.aggregate(func._j_expr), self._t_env)
 
+    def _without_columns(self, func: UserDefinedAggregateFunctionWrapper) -> Expression:

Review comment:
       ```suggestion
       def _to_expr(self, func: UserDefinedAggregateFunctionWrapper) -> Expression:
   ```

##########
File path: flink-python/pyflink/table/table.py
##########
@@ -1059,9 +1087,26 @@ def flat_aggregate(self, func: Union[str, Expression]) -> 'FlatAggregateTable':
         """
         if isinstance(func, str):
             return FlatAggregateTable(self._j_table.flatAggregate(func), self._t_env)
+        elif isinstance(func, Expression):
+            return FlatAggregateTable(self._j_table.flatAggregate(func._j_expr), self._t_env)
         else:
+            func = self._without_columns(func)
             return FlatAggregateTable(self._j_table.flatAggregate(func._j_expr), self._t_env)
 
+    def _without_columns(self, func: UserDefinedAggregateFunctionWrapper) -> Expression:
+        get_gateway()
+        group_keys_field = self._j_table.getClass().getDeclaredField("groupKeys")
+        group_keys_field.setAccessible(True)
+        j_group_keys = group_keys_field.get(self._j_table)
+        without_keys = without_columns(

Review comment:
       ```suggestion
           fields_without_keys = without_columns(
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org