Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/12/15 05:50:13 UTC

[GitHub] [spark] zhengruifeng commented on a diff in pull request #39068: [SPARK-41434][CONNECT][PYTHON] Initial `LambdaFunction` implementation

zhengruifeng commented on code in PR #39068:
URL: https://github.com/apache/spark/pull/39068#discussion_r1049238906


##########
python/pyspark/sql/connect/functions.py:
##########
@@ -79,6 +85,84 @@ def _invoke_binary_math_function(name: str, col1: Any, col2: Any) -> Column:
     return _invoke_function(name, *_cols)
 
 
+def _get_lambda_parameters(f: Callable) -> ValuesView[inspect.Parameter]:
+    signature = inspect.signature(f)
+    parameters = signature.parameters.values()
+
+    # Exclude functions that use *args or **kwargs,
+    # as well as keyword-only arguments
+    supported_parameter_types = {
+        inspect.Parameter.POSITIONAL_OR_KEYWORD,
+        inspect.Parameter.POSITIONAL_ONLY,
+    }
+
+    # Validate that the function arity is between 1 and 3
+    if not (1 <= len(parameters) <= 3):
+        raise ValueError(
+            "f should take between 1 and 3 arguments, but provided function takes {}".format(
+                len(parameters)
+            )
+        )
+
+    # and that all arguments can be used as positional arguments
+    if not all(p.kind in supported_parameter_types for p in parameters):
+        raise ValueError("f should use only POSITIONAL or POSITIONAL OR KEYWORD arguments")
+
+    return parameters
+
+
+def _create_lambda(f: Callable) -> LambdaFunction:
+    """
+    Create an `o.a.s.sql.expressions.LambdaFunction` corresponding
+    to the transformation described by f.
+
+    :param f: A Python function of one of the following forms:
+            - (Column) -> Column: ...
+            - (Column, Column) -> Column: ...
+            - (Column, Column, Column) -> Column: ...
+    """
+    parameters = _get_lambda_parameters(f)
+
+    arg_names = ["x", "y", "z"]
+
+    arg_cols: List[Column] = []
+    for arg in arg_names[: len(parameters)]:
+        # TODO: How to make sure lambda variable names are unique? RPC for increasing ID?

Review Comment:
   PySpark invokes `UnresolvedNamedLambdaVariable.freshVarName` in the JVM to get a unique variable name
   
    https://github.com/apache/spark/blob/master/python/pyspark/sql/functions.py#L8157
   
   ```scala
   object UnresolvedNamedLambdaVariable {
   
     // Counter to ensure lambda variable names are unique
     private val nextVarNameId = new AtomicInteger(0)
   
     def freshVarName(name: String): String = {
       s"${name}_${nextVarNameId.getAndIncrement()}"
     }
   }
   ```
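   
   Spark Connect cannot call into the JVM for `freshVarName`, so the client would need its own way to mint unique names. A minimal client-side sketch, assuming a process-local counter is acceptable (the names `_lambda_var_ids` and `_fresh_var_name` are illustrative only, not the actual Connect API):
   
   ```python
   import itertools
   
   # Hypothetical client-side stand-in for
   # UnresolvedNamedLambdaVariable.freshVarName: a process-local counter
   # plays the role of the JVM's AtomicInteger.
   _lambda_var_ids = itertools.count()
   
   def _fresh_var_name(name: str) -> str:
       # "x" -> "x_0", then "x_1", ... so nested lambdas never collide.
       # CPython's GIL makes next() on itertools.count safe enough here.
       return f"{name}_{next(_lambda_var_ids)}"
   ```
   
   With something like this, the loop in `_create_lambda` could build `arg_cols` from `_fresh_var_name(arg)` instead of the bare `"x"`, `"y"`, `"z"`, avoiding the RPC suggested in the TODO.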



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org