Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/12/17 03:25:22 UTC

[GitHub] [flink] HuangXingBo opened a new pull request #14410: [FLINK-20616][python] Support row-based operation to accept user-defined function directly

HuangXingBo opened a new pull request #14410:
URL: https://github.com/apache/flink/pull/14410


   ## What is the purpose of the change
   
   *This pull request allows row-based operations (map, flat_map, aggregate, flat_aggregate) to accept user-defined functions directly.*
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - *IT test in `test_row_based_operation.py`*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not applicable)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #14410: [FLINK-20616][python] Support row-based operation to accept user-defined function directly

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14410:
URL: https://github.com/apache/flink/pull/14410#issuecomment-747440691


   ## CI report:
   
   * 872ad09e6fdbaca0abb420db2d86321a87f95b01 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14410: [FLINK-20616][python] Support row-based operation to accept user-defined function directly

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14410:
URL: https://github.com/apache/flink/pull/14410#issuecomment-747440691


   ## CI report:
   
   * c40710b468a239841a062702183352fbace8907a Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11074) 
   * aeb710b99fcabb7f95ad5627f9c7e636430eb30e UNKNOWN
   





[GitHub] [flink] HuangXingBo commented on a change in pull request #14410: [FLINK-20616][python] Support row-based operation to accept user-defined function directly

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on a change in pull request #14410:
URL: https://github.com/apache/flink/pull/14410#discussion_r546348910



##########
File path: flink-python/pyflink/table/tests/test_row_based_operation.py
##########
@@ -125,9 +123,11 @@ def test_aggregate_with_pandas_udaf(self):
                                [DataTypes.FIELD("a", DataTypes.FLOAT()),
                                 DataTypes.FIELD("b", DataTypes.INT())]),
                            func_type="pandas")
-        t.group_by(t.a) \
-            .aggregate(pandas_udaf(t.b).alias("c", "d")) \
-            .select("a, c, d").execute_insert("Results") \
+        t.select(t.a, t.b) \
+            .group_by(t.a) \
+            .aggregate(pandas_udaf) \
+            .select("*") \

Review comment:
       `AggregatedTable` needs to call the `select` method to be converted to a `Table`.
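The constraint can be modelled with two toy classes. `ToyTable` and `ToyAggregatedTable` are hypothetical names, not PyFlink's `Table`/`AggregatedTable`: this is only a sketch of the builder shape, in which `aggregate` returns an intermediate object whose only way back to a table is `select`.

```python
from typing import List, Sequence


class ToyTable:
    """Hypothetical stand-in for a table: just a list of column names."""

    def __init__(self, columns: Sequence[str]):
        self.columns: List[str] = list(columns)

    def aggregate(self, *result_names: str) -> "ToyAggregatedTable":
        # Aggregating yields an intermediate object, not another table.
        return ToyAggregatedTable(result_names)


class ToyAggregatedTable:
    """Hypothetical intermediate result: its only method is select()."""

    def __init__(self, result_names: Sequence[str]):
        self._result_names = list(result_names)

    def select(self, *fields: str) -> ToyTable:
        # "*" expands to every aggregate result column.
        if fields == ("*",):
            return ToyTable(self._result_names)
        return ToyTable(fields)


result = ToyTable(["a", "b"]).aggregate("c", "d").select("*")
print(result.columns)  # ['c', 'd']
```

Under this model, dropping the trailing `select` would leave the caller holding the intermediate object rather than a table.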







[GitHub] [flink] dianfu commented on a change in pull request #14410: [FLINK-20616][python] Support row-based operation to accept user-defined function directly

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #14410:
URL: https://github.com/apache/flink/pull/14410#discussion_r546478883



##########
File path: flink-python/pyflink/table/table.py
##########
@@ -1059,9 +1098,25 @@ def flat_aggregate(self, func: Union[str, Expression]) -> 'FlatAggregateTable':
         """
         if isinstance(func, str):
             return FlatAggregateTable(self._j_table.flatAggregate(func), self._t_env)
+        elif isinstance(func, Expression):
+            return FlatAggregateTable(self._j_table.flatAggregate(func._j_expr), self._t_env)
         else:
+            func = self._to_expr(func)
             return FlatAggregateTable(self._j_table.flatAggregate(func._j_expr), self._t_env)
 
+    def _to_expr(self, func: UserDefinedAggregateFunctionWrapper) -> Expression:
+        group_keys_field = self._j_table.getClass().getDeclaredField("groupKeys")
+        group_keys_field.setAccessible(True)
+        j_group_keys = group_keys_field.get(self._j_table)
+        fields_without_keys = without_columns(
+            j_group_keys[0], *([j_group_keys[i] for i in range(1, len(j_group_keys))]))
+        if hasattr(func, "alias_names"):
+            alias_names = getattr(func, "alias_names")
+            func_expression = func(fields_without_keys).alias(*alias_names)

Review comment:
       I think the keys should also be passed to the row-based function.
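In plain Python, the difference between the two choices can be sketched as follows. `udaf_input_columns` is a hypothetical helper: with `include_keys=False` it mirrors what the diff does via `without_columns` (all columns except the group keys); with `include_keys=True` it follows the review suggestion and keeps the keys.

```python
from typing import List, Sequence


def udaf_input_columns(all_columns: Sequence[str],
                       group_keys: Sequence[str],
                       include_keys: bool) -> List[str]:
    """Columns handed to a row-based aggregate function (hypothetical helper)."""
    if include_keys:
        # Review suggestion: the function also sees the grouping keys.
        return list(all_columns)
    # Behaviour in the diff: drop the keys, like without_columns(keys...).
    keys = set(group_keys)
    return [c for c in all_columns if c not in keys]


print(udaf_input_columns(["a", "b", "c"], ["a"], include_keys=False))  # ['b', 'c']
print(udaf_input_columns(["a", "b", "c"], ["a"], include_keys=True))   # ['a', 'b', 'c']
```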

##########
File path: flink-python/pyflink/table/table.py
##########
@@ -817,6 +828,8 @@ def aggregate(self, func: Union[str, Expression]) -> 'AggregatedTable':
             ...                    DataTypes.FIELD("b", DataTypes.INT())]),
             ...               func_type="pandas")
             >>> tab.aggregate(agg(tab.a).alias("a", "b")).select("a, b")
+            >>> # input all columns
+            >>> tab.aggregate(agg.alias("a, b")).select("a, b")

Review comment:
       For row-based operations, the user-defined function should receive a Row (or a pandas DataFrame) as input. Otherwise, the field name information is lost. This becomes a problem when there are hundreds of columns: the function would be hard to use, because users would have to give each column parameter a meaningful name.
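A plain-Python illustration of the point, with `collections.namedtuple` standing in for PyFlink's `Row` (an assumption for the sketch; the real `Row` type is not used). The column-wise function only knows argument positions, while the row-based one reads fields by name.

```python
from collections import namedtuple

# Stand-in for a row with named fields; PyFlink's Row is not used here.
Order = namedtuple("Order", ["order_id", "price", "quantity"])


def total_positional(c0, c1, c2):
    # Column-wise style: the meaning of each argument depends on its position.
    return c1 * c2


def total_from_row(row):
    # Row-based style: fields are accessed by name, so the function stays
    # readable even when the row has hundreds of columns.
    return row.price * row.quantity


row = Order(order_id=1, price=2.5, quantity=4)
print(total_positional(*row))  # 10.0
print(total_from_row(row))     # 10.0
```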

##########
File path: flink-python/pyflink/table/table.py
##########
@@ -792,6 +798,8 @@ def flat_map(self, func: Union[str, Expression]) -> 'Table':
             ...     for s in string.split(","):
             ...         yield x, s
             >>> tab.flat_map(split(tab.a, table.b))
+            >>> # input all columns

Review comment:
       ```suggestion
               >>> # take all the columns as inputs
   ```







[GitHub] [flink] HuangXingBo commented on pull request #14410: [FLINK-20616][python] Support row-based operation to accept user-defined function directly

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on pull request #14410:
URL: https://github.com/apache/flink/pull/14410#issuecomment-749383498


   @dianfu Thanks a lot for the review. I have addressed the comments in the latest commit.





[GitHub] [flink] flinkbot edited a comment on pull request #14410: [FLINK-20616][python] Support row-based operation to accept user-defined function directly

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14410:
URL: https://github.com/apache/flink/pull/14410#issuecomment-747440691


   ## CI report:
   
   * aeb710b99fcabb7f95ad5627f9c7e636430eb30e Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11111) 
   * e0727e955bba043f05aa6d6f5da5377a9dc086bc Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11168) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #14410: [FLINK-20616][python] Support row-based operation to accept user-defined function directly

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14410:
URL: https://github.com/apache/flink/pull/14410#issuecomment-747440691


   ## CI report:
   
   * aeb710b99fcabb7f95ad5627f9c7e636430eb30e Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11111) 
   





[GitHub] [flink] dianfu merged pull request #14410: [FLINK-20616][python] Support row-based operation to accept user-defined function directly

Posted by GitBox <gi...@apache.org>.
dianfu merged pull request #14410:
URL: https://github.com/apache/flink/pull/14410


   





[GitHub] [flink] dianfu commented on a change in pull request #14410: [FLINK-20616][python] Support row-based operation to accept user-defined function directly

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #14410:
URL: https://github.com/apache/flink/pull/14410#discussion_r545734030



##########
File path: flink-python/pyflink/table/table.py
##########
@@ -825,10 +832,19 @@ def aggregate(self, func: Union[str, Expression]) -> 'AggregatedTable':
         """
         if isinstance(func, str):
             return AggregatedTable(self._j_table.aggregate(func), self._t_env)
-        else:
+        elif isinstance(func, Expression):
             return AggregatedTable(self._j_table.aggregate(func._j_expr), self._t_env)
+        else:
+            if hasattr(func, "_alias_names"):
+                alias_names = getattr(func, "_alias_names")
+                func = func(with_columns(col("*"))).alias(*alias_names)
+            else:
+                func = func(with_columns(col("*")))
+            return AggregatedTable(self._j_table.aggregate(func._j_expr),

Review comment:
       one line?
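The three-way dispatch in the diff above (string, `Expression`, or bare UDF wrapper with optional stored aliases) can be modelled in plain Python. `ToyExpression`, `ToyUdf`, and `to_aggregate_argument` are hypothetical names, and `ALL_COLUMNS` stands in for `with_columns(col("*"))`; this is a sketch of the control flow, not PyFlink code.

```python
from typing import Tuple, Union

ALL_COLUMNS = "*"  # stand-in for with_columns(col("*"))


class ToyExpression:
    def __init__(self, text: str):
        self.text = text

    def alias(self, *names: str) -> "ToyExpression":
        return ToyExpression(f"{self.text} AS ({', '.join(names)})")


class ToyUdf:
    def __init__(self, name: str, alias_names: Tuple[str, ...] = ()):
        self.name = name
        self._alias_names = alias_names

    def __call__(self, *args: str) -> ToyExpression:
        return ToyExpression(f"{self.name}({', '.join(args)})")


def to_aggregate_argument(func: Union[str, ToyExpression, ToyUdf]) -> str:
    if isinstance(func, str):
        return func  # string expression, passed through unchanged
    if isinstance(func, ToyExpression):
        return func.text  # already an expression
    # Bare UDF: apply it to all columns, then apply any stored aliases.
    expr = func(ALL_COLUMNS)
    if func._alias_names:
        expr = expr.alias(*func._alias_names)
    return expr.text


print(to_aggregate_argument(ToyUdf("agg", ("a", "b"))))  # agg(*) AS (a, b)
```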

##########
File path: flink-python/pyflink/table/table.py
##########
@@ -759,7 +761,7 @@ def drop_columns(self, *fields: Union[str, Expression]) -> 'Table':
             assert isinstance(fields[0], str)
             return Table(self._j_table.dropColumns(fields[0]), self._t_env)
 
-    def map(self, func: Union[str, Expression]) -> 'Table':
+    def map(self, func: Union[str, Expression, UserDefinedScalarFunctionWrapper]) -> 'Table':

Review comment:
       Could you add some examples in the PythonDoc?

##########
File path: flink-python/pyflink/table/table.py
##########
@@ -1165,9 +1211,30 @@ def aggregate(self, func: Union[str, Expression]) -> 'AggregatedTable':
         """
         if isinstance(func, str):
             return AggregatedTable(self._j_table.aggregate(func), self._t_env)
+        elif isinstance(func, Expression):
+            return AggregatedTable(self._j_table.aggregate(func._j_expr), self._t_env)
         else:
+            func = self._without_columns(func)
             return AggregatedTable(self._j_table.aggregate(func._j_expr), self._t_env)
 
+    def _without_columns(self, func: UserDefinedAggregateFunctionWrapper) -> Expression:
+        get_gateway()

Review comment:
       could be removed?

##########
File path: flink-python/pyflink/table/tests/test_row_based_operation.py
##########
@@ -125,9 +123,11 @@ def test_aggregate_with_pandas_udaf(self):
                                [DataTypes.FIELD("a", DataTypes.FLOAT()),
                                 DataTypes.FIELD("b", DataTypes.INT())]),
                            func_type="pandas")
-        t.group_by(t.a) \
-            .aggregate(pandas_udaf(t.b).alias("c", "d")) \
-            .select("a, c, d").execute_insert("Results") \
+        t.select(t.a, t.b) \
+            .group_by(t.a) \
+            .aggregate(pandas_udaf) \
+            .select("*") \

Review comment:
       I guess `.select("*")` could be removed?

##########
File path: flink-python/pyflink/table/table.py
##########
@@ -800,10 +804,13 @@ def flat_map(self, func: Union[str, Expression]) -> 'Table':
         """
         if isinstance(func, str):
             return Table(self._j_table.flatMap(func), self._t_env)
-        else:
+        elif isinstance(func, Expression):

Review comment:
       ditto

##########
File path: flink-python/pyflink/table/table.py
##########
@@ -1059,9 +1087,26 @@ def flat_aggregate(self, func: Union[str, Expression]) -> 'FlatAggregateTable':
         """
         if isinstance(func, str):
             return FlatAggregateTable(self._j_table.flatAggregate(func), self._t_env)
+        elif isinstance(func, Expression):
+            return FlatAggregateTable(self._j_table.flatAggregate(func._j_expr), self._t_env)
         else:
+            func = self._without_columns(func)
             return FlatAggregateTable(self._j_table.flatAggregate(func._j_expr), self._t_env)
 
+    def _without_columns(self, func: UserDefinedAggregateFunctionWrapper) -> Expression:
+        get_gateway()

Review comment:
       could be removed?

##########
File path: flink-python/pyflink/table/table.py
##########
@@ -1165,9 +1211,30 @@ def aggregate(self, func: Union[str, Expression]) -> 'AggregatedTable':
         """
         if isinstance(func, str):
             return AggregatedTable(self._j_table.aggregate(func), self._t_env)
+        elif isinstance(func, Expression):
+            return AggregatedTable(self._j_table.aggregate(func._j_expr), self._t_env)
         else:
+            func = self._without_columns(func)
             return AggregatedTable(self._j_table.aggregate(func._j_expr), self._t_env)
 
+    def _without_columns(self, func: UserDefinedAggregateFunctionWrapper) -> Expression:

Review comment:
       ```suggestion
       def _to_expr(self, func: UserDefinedAggregateFunctionWrapper) -> Expression:
   ```

##########
File path: flink-python/pyflink/table/table.py
##########
@@ -1059,9 +1087,26 @@ def flat_aggregate(self, func: Union[str, Expression]) -> 'FlatAggregateTable':
         """
         if isinstance(func, str):
             return FlatAggregateTable(self._j_table.flatAggregate(func), self._t_env)
+        elif isinstance(func, Expression):
+            return FlatAggregateTable(self._j_table.flatAggregate(func._j_expr), self._t_env)
         else:
+            func = self._without_columns(func)
             return FlatAggregateTable(self._j_table.flatAggregate(func._j_expr), self._t_env)
 
+    def _without_columns(self, func: UserDefinedAggregateFunctionWrapper) -> Expression:
+        get_gateway()
+        group_keys_field = self._j_table.getClass().getDeclaredField("groupKeys")
+        group_keys_field.setAccessible(True)
+        j_group_keys = group_keys_field.get(self._j_table)
+        without_keys = without_columns(

Review comment:
       ```suggestion
           fields_without_keys = without_columns(
   ```







[GitHub] [flink] HuangXingBo commented on a change in pull request #14410: [FLINK-20616][python] Support row-based operation to accept user-defined function directly

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on a change in pull request #14410:
URL: https://github.com/apache/flink/pull/14410#discussion_r547100592



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java
##########
@@ -48,6 +48,7 @@
 	private final PythonFunctionKind pythonFunctionKind;
 	private final boolean deterministic;
 	private final PythonEnv pythonEnv;
+	private final boolean usedInRowBasedOperation;

Review comment:
       Yes. I have created https://issues.apache.org/jira/browse/FLINK-20712 to support this feature.







[GitHub] [flink] dianfu commented on a change in pull request #14410: [FLINK-20616][python] Support row-based operation to accept user-defined function directly

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #14410:
URL: https://github.com/apache/flink/pull/14410#discussion_r547015112



##########
File path: flink-python/pyflink/proto/flink-fn-execution.proto
##########
@@ -47,6 +47,9 @@ message UserDefinedFunction {
 
   // The index of the over window used in pandas batch over window aggregation
   int32 window_index = 3;
+
+  // Whether the UDF is used in row-based operation
+  bool used_in_row_based_operation = 4;

Review comment:
       ```suggestion
     bool takes_row_as_input = 4;
   ```

##########
File path: flink-python/pyflink/proto/flink-fn-execution.proto
##########
@@ -47,6 +47,9 @@ message UserDefinedFunction {
 
   // The index of the over window used in pandas batch over window aggregation
   int32 window_index = 3;
+
+  // Whether the UDF is used in row-based operation

Review comment:
       ```suggestion
     // Whether the UDF takes a row as input instead of each column of the row
   ```
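A small plain-Python mirror of the flag being discussed, to show why a `bool` field is a safe addition: in proto3, scalar fields default to their zero value, so a message that never sets field 4 reads back as `false`, i.e. the existing column-wise behaviour. `UserDefinedFunctionFlags` is a hypothetical dataclass covering only the two fields visible in the hunk; the runtime itself would use the generated protobuf classes.

```python
from dataclasses import dataclass


@dataclass
class UserDefinedFunctionFlags:
    """Hypothetical mirror of two fields of the UserDefinedFunction message."""

    # Field 3: index of the over window used in pandas batch over window aggregation.
    window_index: int = 0
    # Field 4 (suggested name): True if the UDF takes the whole row as its
    # single input. Like a proto3 bool, it defaults to False when unset.
    takes_row_as_input: bool = False


legacy = UserDefinedFunctionFlags(window_index=0)
row_based = UserDefinedFunctionFlags(takes_row_as_input=True)
print(legacy.takes_row_as_input, row_based.takes_row_as_input)  # False True
```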

##########
File path: flink-python/pyflink/table/udf.py
##########
@@ -348,11 +348,20 @@ def __init__(self, func, input_types, func_type, deterministic=None, name=None):
             func.is_deterministic() if isinstance(func, UserDefinedFunction) else True)
         self._func_type = func_type
         self._judf_placeholder = None
+        self._used_in_row_based_operation = False
 
     def __call__(self, *args) -> Expression:
         from pyflink.table import expressions as expr
         return expr.call(self, *args)
 
+    def alias(self, *alias_names: str):
+        self._alias_names = alias_names
+        return self
+
+    def set_used_in_row_based_operation(self):

Review comment:
       ```suggestion
       def _set_takes_row_as_input(self):
   ```

##########
File path: flink-python/pyflink/table/udf.py
##########
@@ -348,11 +348,20 @@ def __init__(self, func, input_types, func_type, deterministic=None, name=None):
             func.is_deterministic() if isinstance(func, UserDefinedFunction) else True)
         self._func_type = func_type
         self._judf_placeholder = None
+        self._used_in_row_based_operation = False
 
     def __call__(self, *args) -> Expression:
         from pyflink.table import expressions as expr
         return expr.call(self, *args)
 
+    def alias(self, *alias_names: str):

Review comment:
       ```suggestion
       def _alias(self, *alias_names: str):
   ```

##########
File path: flink-python/pyflink/proto/flink-fn-execution.proto
##########
@@ -47,6 +47,9 @@ message UserDefinedFunction {
 
   // The index of the over window used in pandas batch over window aggregation
   int32 window_index = 3;
+
+  // Whether the UDF is used in row-based operation
+  bool used_in_row_based_operation = 4;

Review comment:
       Why is there no such flag for aggregate functions?

##########
File path: flink-python/pyflink/table/table.py
##########
@@ -1165,9 +1224,26 @@ def aggregate(self, func: Union[str, Expression]) -> 'AggregatedTable':
         """
         if isinstance(func, str):
             return AggregatedTable(self._j_table.aggregate(func), self._t_env)
+        elif isinstance(func, Expression):
+            return AggregatedTable(self._j_table.aggregate(func._j_expr), self._t_env)
         else:
+            func.set_used_in_row_based_operation()
+            func = self._to_expr(func)
             return AggregatedTable(self._j_table.aggregate(func._j_expr), self._t_env)
 
+    def _to_expr(self, func: UserDefinedAggregateFunctionWrapper) -> Expression:
+        group_window_field = self._j_table.getClass().getDeclaredField("window")
+        group_window_field.setAccessible(True)
+        j_group_window = group_window_field.get(self._j_table)
+        j_time_field = j_group_window.getTimeField()
+        fields_without_window = without_columns(j_time_field)
+        if hasattr(func, "_alias_names"):
+            alias_names = getattr(func, "_alias_names")
+            func_expression = func(fields_without_window).alias(*alias_names)
+        else:
+            func_expression = func(fields_without_window)
+        return func_expression
+
 

Review comment:
       Could you also update the example in FlatAggregateTable.select/AggregatedTable.select to use row-based operations?

##########
File path: flink-python/pyflink/table/table.py
##########
@@ -769,6 +771,8 @@ def map(self, func: Union[str, Expression]) -> 'Table':
             >>> add = udf(lambda x: Row(x + 1, x * x), result_type=DataTypes.Row(
             ... [DataTypes.FIELD("a", DataTypes.INT()), DataTypes.FIELD("b", DataTypes.INT())]))
             >>> tab.map(add(tab.a)).alias("a, b")
+            >>> # take all the columns as inputs
+            >>> tab.map(add)

Review comment:
       I guess the example no longer applies, as the user-defined function should now take a Row as input. The other examples should be updated as well.

##########
File path: flink-python/pyflink/fn_execution/operation_utils.py
##########
@@ -43,6 +43,15 @@ def wrap_pandas_result(it):
     return arrays
 
 
+def wrap_inputs_to_row(*args):

Review comment:
       ```suggestion
   def wrap_inputs_as_row(*args):
   ```
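One plausible shape for such a helper, sketched under assumptions: the runtime still receives one value per column, packs them into a single row value, and hands that to the user function. A plain tuple stands in for PyFlink's `Row`, and `as_column_wise` is a hypothetical adapter name; neither reproduces the code in the PR. The lambda mirrors the `Row(x + 1, x * x)` docstring example above.

```python
from typing import Any, Callable, Tuple


def wrap_inputs_as_row(*args: Any) -> Tuple[Any, ...]:
    # Hypothetical sketch: pack the per-column values into one row value.
    # A plain tuple stands in for PyFlink's Row here.
    return tuple(args)


def as_column_wise(row_func: Callable[[Tuple[Any, ...]], Any]) -> Callable[..., Any]:
    """Adapt a row-taking UDF so it can be invoked with one argument per column."""
    def invoke(*columns: Any) -> Any:
        return row_func(wrap_inputs_as_row(*columns))
    return invoke


add = as_column_wise(lambda row: (row[0] + 1, row[0] * row[0]))
print(add(3, "ignored"))  # (4, 9)
```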

##########
File path: flink-python/pyflink/table/udf.py
##########
@@ -460,7 +470,8 @@ def _create_judf(self, serialized_func, j_input_types, j_function_kind):
             j_result_type,
             j_function_kind,
             self._deterministic,
-            _get_python_env())
+            _get_python_env(),
+            self._used_in_row_based_operation)

Review comment:
       What about putting self._used_in_row_based_operation after self._deterministic?

##########
File path: flink-python/pyflink/table/udf.py
##########
@@ -348,11 +348,20 @@ def __init__(self, func, input_types, func_type, deterministic=None, name=None):
             func.is_deterministic() if isinstance(func, UserDefinedFunction) else True)
         self._func_type = func_type
         self._judf_placeholder = None
+        self._used_in_row_based_operation = False
 
     def __call__(self, *args) -> Expression:
         from pyflink.table import expressions as expr
         return expr.call(self, *args)
 
+    def alias(self, *alias_names: str):
+        self._alias_names = alias_names
+        return self
+
+    def set_used_in_row_based_operation(self):
+        self._used_in_row_based_operation = True
+        return self
+
     def java_user_defined_function(self):

Review comment:
       ```suggestion
       def _java_user_defined_function(self):
   ```

##########
File path: flink-python/pyflink/table/tests/test_row_based_operation.py
##########
@@ -120,14 +117,16 @@ def test_aggregate_with_pandas_udaf(self):
             ['a', 'b', 'c'],
             [DataTypes.TINYINT(), DataTypes.FLOAT(), DataTypes.INT()])
         self.t_env.register_table_sink("Results", table_sink)
-        pandas_udaf = udaf(lambda a: (a.mean(), a.max()),
+        pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
                            result_type=DataTypes.ROW(
                                [DataTypes.FIELD("a", DataTypes.FLOAT()),
                                 DataTypes.FIELD("b", DataTypes.INT())]),
                            func_type="pandas")
-        t.group_by(t.a) \
-            .aggregate(pandas_udaf(t.b).alias("c", "d")) \
-            .select("a, c, d").execute_insert("Results") \
+        t.select(t.a, t.b) \
+            .group_by(t.a) \
+            .aggregate(pandas_udaf) \

Review comment:
       could you improve pandas_udaf to let it also access the group key?
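With a real pandas DataFrame, a key-aware version of the test's UDAF could read `lambda pd: (pd.a.iloc[0], pd.b.mean(), pd.b.max())`, assuming the key column `a` is included in the input. The sketch below avoids the pandas dependency: a dict of column lists is a hypothetical stand-in for the DataFrame of one group, and `mean_and_max_with_key` is an illustrative name.

```python
from statistics import mean
from typing import Dict, List, Tuple

# Hypothetical stand-in: a dict of column lists plays the role of the
# pandas.DataFrame that a row-based pandas UDAF receives for one group.
Group = Dict[str, List[float]]


def mean_and_max_with_key(group: Group) -> Tuple[float, float, float]:
    # All rows of a group share the same key, so any value of "a" is the key.
    key = group["a"][0]
    values = group["b"]
    return key, mean(values), max(values)


result = mean_and_max_with_key({"a": [1.0, 1.0, 1.0], "b": [2.0, 4.0, 9.0]})
print(result)
```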

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/python/PythonTableFunction.java
##########
@@ -48,6 +48,7 @@
 	private final PythonFunctionKind pythonFunctionKind;
 	private final boolean deterministic;
 	private final PythonEnv pythonEnv;
+	private final boolean usedInRowBasedOperation;

Review comment:
       I guess we could also add row-based support to join_lateral/left_outer_join_lateral? I'm fine with doing it in a separate PR if you want.







[GitHub] [flink] flinkbot commented on pull request #14410: [FLINK-20616][python] Support row-based operation to accept user-defined function directly

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14410:
URL: https://github.com/apache/flink/pull/14410#issuecomment-747180777


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 872ad09e6fdbaca0abb420db2d86321a87f95b01 (Thu Dec 17 03:27:16 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.
   
   The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14410: [FLINK-20616][python] Support row-based operation to accept user-defined function directly

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14410:
URL: https://github.com/apache/flink/pull/14410#issuecomment-747440691


   ## CI report:
   
   * 872ad09e6fdbaca0abb420db2d86321a87f95b01 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10974) 
   * c40710b468a239841a062702183352fbace8907a UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14410: [FLINK-20616][python] Support row-based operation to accept user-defined function directly

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14410:
URL: https://github.com/apache/flink/pull/14410#issuecomment-747440691


   ## CI report:
   
   * 872ad09e6fdbaca0abb420db2d86321a87f95b01 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10974) 
   





[GitHub] [flink] HuangXingBo commented on a change in pull request #14410: [FLINK-20616][python] Support row-based operation to accept user-defined function directly

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on a change in pull request #14410:
URL: https://github.com/apache/flink/pull/14410#discussion_r546348720



##########
File path: flink-python/pyflink/table/table.py
##########
@@ -759,7 +761,7 @@ def drop_columns(self, *fields: Union[str, Expression]) -> 'Table':
             assert isinstance(fields[0], str)
             return Table(self._j_table.dropColumns(fields[0]), self._t_env)
 
-    def map(self, func: Union[str, Expression]) -> 'Table':
+    def map(self, func: Union[str, Expression, UserDefinedScalarFunctionWrapper]) -> 'Table':

Review comment:
       Yes, makes sense.
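The signature change above lets `map` take a user-defined scalar function wrapper directly. The sketch below assumes the intent of "row-based operation accepting a user-defined function directly": the function receives the whole input row and returns a row whose fields become the output columns. It is plain Python that models only this behavior. A `namedtuple` stands in for PyFlink's `Row`, and `InRow`, `OutRow`, `double_data`, and `map_table` are hypothetical names:

```python
from collections import namedtuple
from typing import Callable, List

# Hypothetical stand-ins for an input row with columns (id, data)
# and for the result row type declared on the UDF.
InRow = namedtuple("InRow", ["id", "data"])
OutRow = namedtuple("OutRow", ["a", "b"])


def double_data(row: InRow) -> OutRow:
    # Row-based: the function sees every column of the input row at once,
    # instead of being called as func(t.id, t.data) with explicit arguments.
    return OutRow(row.id, row.data * 2)


def map_table(rows: List[InRow], func: Callable[[InRow], OutRow]) -> List[OutRow]:
    # One output row per input row; the output columns are the result row's fields.
    return [func(row) for row in rows]


table = [InRow(1, 10), InRow(2, 20)]
mapped = map_table(table, double_data)
```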







[GitHub] [flink] flinkbot edited a comment on pull request #14410: [FLINK-20616][python] Support row-based operation to accept user-defined function directly

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14410:
URL: https://github.com/apache/flink/pull/14410#issuecomment-747440691


   ## CI report:
   
   * c40710b468a239841a062702183352fbace8907a Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11074) 
   * aeb710b99fcabb7f95ad5627f9c7e636430eb30e Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11111) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #14410: [FLINK-20616][python] Support row-based operation to accept user-defined function directly

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14410:
URL: https://github.com/apache/flink/pull/14410#issuecomment-747440691


   ## CI report:
   
   * 872ad09e6fdbaca0abb420db2d86321a87f95b01 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10974) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #14410: [FLINK-20616][python] Support row-based operation to accept user-defined function directly

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14410:
URL: https://github.com/apache/flink/pull/14410#issuecomment-747440691


   ## CI report:
   
   * c40710b468a239841a062702183352fbace8907a Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11074) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #14410: [FLINK-20616][python] Support row-based operation to accept user-defined function directly

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14410:
URL: https://github.com/apache/flink/pull/14410#issuecomment-747440691


   ## CI report:
   
   * 872ad09e6fdbaca0abb420db2d86321a87f95b01 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10974) 
   * c40710b468a239841a062702183352fbace8907a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11074) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #14410: [FLINK-20616][python] Support row-based operation to accept user-defined function directly

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14410:
URL: https://github.com/apache/flink/pull/14410#issuecomment-747440691


   ## CI report:
   
   * aeb710b99fcabb7f95ad5627f9c7e636430eb30e Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11111) 
   * e0727e955bba043f05aa6d6f5da5377a9dc086bc UNKNOWN
   

