Posted to commits@spark.apache.org by ma...@apache.org on 2023/06/22 13:38:56 UTC
[spark] branch master updated: [SPARK-44140][SQL][PYTHON] Support positional parameters in Python `sql()`
This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 532a8325f5a [SPARK-44140][SQL][PYTHON] Support positional parameters in Python `sql()`
532a8325f5a is described below
commit 532a8325f5a3f11974c383cd1e344bb2ed56e9d8
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Thu Jun 22 16:38:36 2023 +0300
[SPARK-44140][SQL][PYTHON] Support positional parameters in Python `sql()`
### What changes were proposed in this pull request?
In this PR, I propose to extend the PySpark API by changing the signature of the `sql` method to:
```python
def sql(
    self, sqlQuery: str, args: Optional[Union[Dict[str, Any], List]] = None, **kwargs: Any
) -> DataFrame:
```
which accepts a list of Python objects that can be converted to SQL literal expressions.
For example:
```python
spark.sql("SELECT * FROM {df} WHERE {df[B]} > ? and ? < {df[A]}", args=[5, 2], df=mydf).show()
```
The `sql()` method parses the input SQL statement and replaces the positional parameters with the corresponding literal values.
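For instance, a positional-only call with no `kwargs` formatting (a minimal sketch adapted from the `ps.sql` doctest added below; it assumes a live `SparkSession` named `spark`):
```python
# Each `?` is replaced, in order, by the literal form of the matching value in `args`.
spark.sql("SELECT * FROM range(10) WHERE id > ?", args=[7]).show()
# +---+
# | id|
# +---+
# |  8|
# |  9|
# +---+
```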
### Why are the changes needed?
1. To conform to the SQL standard and the JDBC/ODBC protocol.
2. To improve the user experience with PySpark when:
- Using Spark as a remote service (microservice).
- Writing SQL code that powers reports, dashboards, charts, and other data presentation solutions that need to account for criteria modifiable by users through an interface.
- Building a generic integration layer based on the PySpark API. The goal is to expose managed data to a wide application ecosystem with a microservice architecture. It is only natural in such a setup to ask for modular and reusable SQL code that can be executed repeatedly with different parameter values (see the sketch after this list).
3. To achieve feature parity with other systems that support positional parameters.
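As a sketch of that reuse pattern (illustrative only; `query` and the ranges are made up, and a live `SparkSession` named `spark` is assumed):
```python
# One SQL template, executed repeatedly with different positional values.
query = "SELECT id FROM range(100) WHERE id BETWEEN ? AND ?"
for low, high in [(0, 9), (10, 19), (20, 29)]:
    spark.sql(query, args=[low, high]).show()
```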
### Does this PR introduce _any_ user-facing change?
No, the changes only extend the existing API; existing call sites keep their behavior.
### How was this patch tested?
By running new checks:
```
$ python/run-tests --parallelism=1 --testnames 'pyspark.sql.session SparkSession.sql'
$ python/run-tests --parallelism=1 --testnames 'pyspark.pandas.sql_formatter'
```
Closes #41695 from MaxGekk/parametrized-query-pos-param-python.
Authored-by: Max Gekk <ma...@gmail.com>
Signed-off-by: Max Gekk <ma...@gmail.com>
---
python/pyspark/pandas/sql_formatter.py | 20 ++++++++++++-----
python/pyspark/sql/session.py | 40 +++++++++++++++++++++++++++-------
2 files changed, 47 insertions(+), 13 deletions(-)
diff --git a/python/pyspark/pandas/sql_formatter.py b/python/pyspark/pandas/sql_formatter.py
index 4387a1e0909..350152a2cdb 100644
--- a/python/pyspark/pandas/sql_formatter.py
+++ b/python/pyspark/pandas/sql_formatter.py
@@ -43,7 +43,7 @@ _CAPTURE_SCOPES = 3
 def sql(
     query: str,
     index_col: Optional[Union[str, List[str]]] = None,
-    args: Optional[Dict[str, Any]] = None,
+    args: Optional[Union[Dict[str, Any], List]] = None,
     **kwargs: Any,
 ) -> DataFrame:
     """
@@ -102,18 +102,21 @@ def sql(
e f 3 6
Also note that the index name(s) should be matched to the existing name.
- args : dict
- A dictionary of parameter names to Python objects that can be converted to
- SQL literal expressions. See
+ args : dict or list
+ A dictionary of parameter names to Python objects or a list of Python objects
+ that can be converted to SQL literal expressions. See
<a href="https://spark.apache.org/docs/latest/sql-ref-datatypes.html">
Supported Data Types</a> for supported value types in Python.
For example, dictionary keys: "rank", "name", "birthdate";
dictionary values: 1, "Steven", datetime.date(2023, 4, 2).
- Dict value can be also a `Column` of literal expression, in that case it is taken as is.
+ A value can also be a `Column` of a literal expression, in which case it is taken as is.
.. versionadded:: 3.4.0
+ .. versionchanged:: 3.5.0
+ Added positional parameters.
+
kwargs
other variables that the user want to set that can be referenced in the query
@@ -174,6 +177,13 @@ def sql(
        id
     0   8
     1   9
+
+    Or bind positional parameters marked by `?` in the SQL query to SQL literals.
+
+    >>> ps.sql("SELECT * FROM range(10) WHERE id > ?", args=[7])
+       id
+    0   8
+    1   9
     """
     if os.environ.get("PYSPARK_PANDAS_SQL_LEGACY") == "1":
         from pyspark.pandas import sql_processor
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 823164475ea..47b73700f0c 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -1410,28 +1410,35 @@ class SparkSession(SparkConversionMixin):
df._schema = struct
return df
-    def sql(self, sqlQuery: str, args: Optional[Dict[str, Any]] = None, **kwargs: Any) -> DataFrame:
+    def sql(
+        self, sqlQuery: str, args: Optional[Union[Dict[str, Any], List]] = None, **kwargs: Any
+    ) -> DataFrame:
"""Returns a :class:`DataFrame` representing the result of the given query.
When ``kwargs`` is specified, this method formats the given string by using the Python
- standard formatter. The method binds named parameters to SQL literals from `args`.
+ standard formatter. The method binds named or positional parameters from
+ `args` to SQL literals. Named and positional parameters cannot be mixed in
+ the same SQL query.
.. versionadded:: 2.0.0
.. versionchanged:: 3.4.0
Supports Spark Connect and parameterized SQL.
+ .. versionchanged:: 3.5.0
+ Added positional parameters.
+
Parameters
----------
sqlQuery : str
SQL query string.
- args : dict
- A dictionary of parameter names to Python objects that can be converted to
- SQL literal expressions. See
+ args : dict or list
+ A dictionary of parameter names to Python objects or a list of Python objects
+ that can be converted to SQL literal expressions. See
<a href="https://spark.apache.org/docs/latest/sql-ref-datatypes.html">
Supported Data Types</a> for supported value types in Python.
For example, dictionary keys: "rank", "name", "birthdate";
- dictionary values: 1, "Steven", datetime.date(2023, 4, 2).
- Map value can be also a `Column` of literal expression, in that case it is taken as is.
+ dictionary or list values: 1, "Steven", datetime.date(2023, 4, 2).
+ A value can also be a `Column` of a literal expression, in which case it is taken as is.
.. versionadded:: 3.4.0
@@ -1517,13 +1524,30 @@ class SparkSession(SparkConversionMixin):
         +---+---+
         |  3|  6|
         +---+---+
+
+        Or bind positional parameters marked by `?` in the SQL query to SQL literals.
+
+        >>> spark.sql(
+        ...     "SELECT * FROM {df} WHERE {df[B]} > ? and ? < {df[A]}",
+        ...     args=[5, 2], df=mydf).show()
+        +---+---+
+        |  A|  B|
+        +---+---+
+        |  3|  6|
+        +---+---+
         """
         formatter = SQLStringFormatter(self)
         if len(kwargs) > 0:
             sqlQuery = formatter.format(sqlQuery, **kwargs)
         try:
-            litArgs = {k: _to_java_column(lit(v)) for k, v in (args or {}).items()}
+            if isinstance(args, Dict):
+                litArgs = {k: _to_java_column(lit(v)) for k, v in (args or {}).items()}
+            else:
+                assert self._jvm is not None
+                litArgs = self._jvm.PythonUtils.toArray(
+                    [_to_java_column(lit(v)) for v in (args or [])]
+                )
             return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
         finally:
             if len(kwargs) > 0:
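For context, the new branch dispatches on the runtime type of `args`: a dict keeps the existing named-parameter route (a map of Java column literals), while a list is packed into a Java array for the positional overload on the JVM side. Below is a minimal standalone sketch of the same dict-vs-list dispatch; the names and the string-based `to_sql_literal` stand-in are illustrative only, not Spark internals:
```python
from typing import Any, Dict, List, Optional, Union

def to_sql_literal(v: Any) -> str:
    # Stand-in for `lit(v)`: render a Python value as a SQL literal string.
    return repr(v)

def bind_args(args: Optional[Union[Dict[str, Any], List]]) -> Union[Dict[str, str], List[str]]:
    """Mirror the patch's structure: dict -> named map, list (or None) -> ordered list."""
    if isinstance(args, dict):
        # Named parameters: keep a name -> literal mapping.
        return {k: to_sql_literal(v) for k, v in args.items()}
    # Positional parameters: keep an ordered sequence of literals,
    # bound left-to-right to the `?` markers.
    return [to_sql_literal(v) for v in (args or [])]

assert bind_args({"low": 5}) == {"low": "5"}
assert bind_args([5, 2]) == ["5", "2"]
assert bind_args(None) == []
```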