Posted to commits@spark.apache.org by ma...@apache.org on 2023/06/22 13:38:56 UTC

[spark] branch master updated: [SPARK-44140][SQL][PYTHON] Support positional parameters in Python `sql()`

This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 532a8325f5a [SPARK-44140][SQL][PYTHON] Support positional parameters in Python `sql()`
532a8325f5a is described below

commit 532a8325f5a3f11974c383cd1e344bb2ed56e9d8
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Thu Jun 22 16:38:36 2023 +0300

    [SPARK-44140][SQL][PYTHON] Support positional parameters in Python `sql()`
    
    ### What changes were proposed in this pull request?
    In the PR, I propose to extend the PySpark API's `sql` method:
    ```python
    def sql(
      self, sqlQuery: str, args: Optional[Union[Dict[str, Any], List]] = None, **kwargs: Any
    ) -> DataFrame:
    ```
    which accepts a list of Python objects that can be converted to SQL literal expressions.
    
    For example:
    ```python
    spark.sql("SELECT * FROM {df} WHERE {df[B]} > ? and ? < {df[A]}", args=[5, 2], df=mydf).show()
    ```
    The `sql()` method parses the input SQL statement and replaces the positional parameters with the literal values.
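
    For comparison, a minimal sketch of the two binding styles (the `range(10)` relation and the bound values are illustrative only); named and positional parameters cannot be mixed in the same query:
    ```python
    # Named parameters: bind `:name` markers from a dict.
    spark.sql("SELECT * FROM range(10) WHERE id > :threshold", args={"threshold": 7}).show()

    # Positional parameters: bind `?` markers from a list, in order.
    spark.sql("SELECT * FROM range(10) WHERE id > ? AND id < ?", args=[2, 8]).show()
    ```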
    
    ### Why are the changes needed?
    1. To conform to the SQL standard and the JDBC/ODBC protocol.
    2. To improve user experience with PySpark via
        - Using Spark as a remote service (microservice).
        - Writing SQL code that powers reports, dashboards, charts, and other data presentation solutions that need to account for criteria modifiable by users through an interface.
        - Building a generic integration layer based on the PySpark API. The goal is to expose managed data to a wide application ecosystem with a microservice architecture. In such a setup it is only natural to ask for modular and reusable SQL code that can be executed repeatedly with different parameter values (see the sketch after this list).
    
    3. To achieve feature parity with other systems that support positional parameters.
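
    As an illustration of the integration-layer point above, a hedged sketch of a reusable parameterized query helper (the `run_report` function and the `employees` table are hypothetical, not part of this change):
    ```python
    from datetime import date

    from pyspark.sql import DataFrame, SparkSession


    def run_report(spark: SparkSession, min_birthdate: date) -> DataFrame:
        # The SQL text stays fixed; only the values bound to the `?` markers change per call.
        return spark.sql(
            "SELECT name, birthdate FROM employees WHERE birthdate > ?",
            args=[min_birthdate],
        )


    # Assuming an `employees` table is registered, the same query can be re-run
    # with different user-supplied criteria:
    # run_report(spark, date(1990, 1, 1)).show()
    # run_report(spark, date(2000, 1, 1)).show()
    ```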
    
    ### Does this PR introduce _any_ user-facing change?
    No, the changes extend the existing API.
    
    ### How was this patch tested?
    By running new checks:
    ```
    $ python/run-tests --parallelism=1 --testnames 'pyspark.sql.session SparkSession.sql'
    $ python/run-tests --parallelism=1 --testnames 'pyspark.pandas.sql_formatter'
    ```
    
    Closes #41695 from MaxGekk/parametrized-query-pos-param-python.
    
    Authored-by: Max Gekk <ma...@gmail.com>
    Signed-off-by: Max Gekk <ma...@gmail.com>
---
 python/pyspark/pandas/sql_formatter.py | 20 ++++++++++++-----
 python/pyspark/sql/session.py          | 40 +++++++++++++++++++++++++++-------
 2 files changed, 47 insertions(+), 13 deletions(-)

diff --git a/python/pyspark/pandas/sql_formatter.py b/python/pyspark/pandas/sql_formatter.py
index 4387a1e0909..350152a2cdb 100644
--- a/python/pyspark/pandas/sql_formatter.py
+++ b/python/pyspark/pandas/sql_formatter.py
@@ -43,7 +43,7 @@ _CAPTURE_SCOPES = 3
 def sql(
     query: str,
     index_col: Optional[Union[str, List[str]]] = None,
-    args: Optional[Dict[str, Any]] = None,
+    args: Optional[Union[Dict[str, Any], List]] = None,
     **kwargs: Any,
 ) -> DataFrame:
     """
@@ -102,18 +102,21 @@ def sql(
             e      f       3  6
 
             Also note that the index name(s) should be matched to the existing name.
-    args : dict
-        A dictionary of parameter names to Python objects that can be converted to
-        SQL literal expressions. See
+    args : dict or list
+        A dictionary of parameter names to Python objects or a list of Python objects
+        that can be converted to SQL literal expressions. See
         <a href="https://spark.apache.org/docs/latest/sql-ref-datatypes.html">
         Supported Data Types</a> for supported value types in Python.
         For example, dictionary keys: "rank", "name", "birthdate";
         dictionary values: 1, "Steven", datetime.date(2023, 4, 2).
-        Dict value can be also a `Column` of literal expression, in that case it is taken as is.
+        A value can also be a `Column` of a literal expression, in which case it is taken as is.
 
 
         .. versionadded:: 3.4.0
 
+        .. versionchanged:: 3.5.0
+            Added positional parameters.
+
     kwargs
         other variables that the user want to set that can be referenced in the query
 
@@ -174,6 +177,13 @@ def sql(
        id
     0   8
     1   9
+
+    Or bind positional parameters marked by `?` in the SQL query to SQL literals.
+
+    >>> ps.sql("SELECT * FROM range(10) WHERE id > ?", args=[7])
+       id
+    0   8
+    1   9
     """
     if os.environ.get("PYSPARK_PANDAS_SQL_LEGACY") == "1":
         from pyspark.pandas import sql_processor
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 823164475ea..47b73700f0c 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -1410,28 +1410,35 @@ class SparkSession(SparkConversionMixin):
         df._schema = struct
         return df
 
-    def sql(self, sqlQuery: str, args: Optional[Dict[str, Any]] = None, **kwargs: Any) -> DataFrame:
+    def sql(
+        self, sqlQuery: str, args: Optional[Union[Dict[str, Any], List]] = None, **kwargs: Any
+    ) -> DataFrame:
         """Returns a :class:`DataFrame` representing the result of the given query.
         When ``kwargs`` is specified, this method formats the given string by using the Python
-        standard formatter. The method binds named parameters to SQL literals from `args`.
+        standard formatter. The method binds named or positional parameters from `args`
+        to SQL literals. It doesn't support mixing named and positional parameters
+        in the same SQL query.
 
         .. versionadded:: 2.0.0
 
         .. versionchanged:: 3.4.0
             Supports Spark Connect and parameterized SQL.
 
+        .. versionchanged:: 3.5.0
+            Added positional parameters.
+
         Parameters
         ----------
         sqlQuery : str
             SQL query string.
-        args : dict
-            A dictionary of parameter names to Python objects that can be converted to
-            SQL literal expressions. See
+        args : dict or list
+            A dictionary of parameter names to Python objects or a list of Python objects
+            that can be converted to SQL literal expressions. See
             <a href="https://spark.apache.org/docs/latest/sql-ref-datatypes.html">
             Supported Data Types</a> for supported value types in Python.
             For example, dictionary keys: "rank", "name", "birthdate";
-            dictionary values: 1, "Steven", datetime.date(2023, 4, 2).
-            Map value can be also a `Column` of literal expression, in that case it is taken as is.
+            dictionary or list values: 1, "Steven", datetime.date(2023, 4, 2).
+            A value can also be a `Column` of a literal expression, in which case it is taken as is.
 
             .. versionadded:: 3.4.0
 
@@ -1517,13 +1524,30 @@ class SparkSession(SparkConversionMixin):
         +---+---+
         |  3|  6|
         +---+---+
+
+        Or bind positional parameters marked by `?` in the SQL query to SQL literals.
+
+        >>> spark.sql(
+        ...   "SELECT * FROM {df} WHERE {df[B]} > ? and ? < {df[A]}",
+        ...   args=[5, 2], df=mydf).show()
+        +---+---+
+        |  A|  B|
+        +---+---+
+        |  3|  6|
+        +---+---+
         """
 
         formatter = SQLStringFormatter(self)
         if len(kwargs) > 0:
             sqlQuery = formatter.format(sqlQuery, **kwargs)
         try:
-            litArgs = {k: _to_java_column(lit(v)) for k, v in (args or {}).items()}
+            if isinstance(args, Dict):
+                litArgs = {k: _to_java_column(lit(v)) for k, v in (args or {}).items()}
+            else:
+                assert self._jvm is not None
+                litArgs = self._jvm.PythonUtils.toArray(
+                    [_to_java_column(lit(v)) for v in (args or [])]
+                )
             return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
         finally:
             if len(kwargs) > 0:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org