You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/11/26 01:37:40 UTC

[spark] branch master updated: [SPARK-37436][PYTHON] Uses Python's standard string formatter for SQL API in pandas API on Spark

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 69e1151  [SPARK-37436][PYTHON] Uses Python's standard string formatter for SQL API in pandas API on Spark
69e1151 is described below

commit 69e115183c1c424bd61a9a4a4724c1aff9970ef1
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Fri Nov 26 10:36:52 2021 +0900

    [SPARK-37436][PYTHON] Uses Python's standard string formatter for SQL API in pandas API on Spark
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to use [Python's standard string formatter](https://docs.python.org/3/library/string.html#custom-string-formatting) instead of the hacky custom SQL parser for the SQL API in pandas API on Spark.
    
    ### Why are the changes needed?
    
    Current implementation of parsing is very hacky, and does not work. It is [dependent on Python's internal module](https://github.com/apache/spark/blob/master/python/pyspark/pandas/sql_processor.py#L291), and [Series is being treated as a table](https://github.com/apache/spark/blob/master/python/pyspark/pandas/sql_processor.py#L339-L340), etc.
    
    We should have the Python standard string formatter with the standard interface and the standard support.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes.
    
    **Disallowed:**
    
    1. `Series` as a table
    
        ```python
        myser = ps.Series({'a': [1.0, 2.0, 3.0], 'b': [15.0, 30.0, 45.0]})
        ps.sql("SELECT * from {myser}", myser=myser)
        ```
    
    2. `list` and `range`
    
        ```python
        strs = ['a', 'b']
        ps.sql("SELECT 'a' IN {strs}", strs=strs)
        ```
    
    3. Automatic local/global variable detection:
    
        ```python
        strs = ['a', 'b']
        ps.sql("SELECT 'a' IN {strs}")
        ```
    
    **Allowed:**
    
    1. `Series` as a column
    
        ```python
        mydf = ps.range(10)
        ps.sql("SELECT {ser} FROM {mydf}", ser=mydf.id, mydf=mydf)
        ```
    
    2. Reference checking (between `Series` and `DataFrame`)
    
        ```python
        mydf = ps.range(10)
        ps.sql("SELECT {ser} FROM tblA", ser=mydf.id)
        ```
    
        ```
        ValueError: The series in {ser} does not refer any dataframe specified.
        ```
    
    3. Attribute access on a frame (standard Python formatter support):
    
        ```python
        mydf = ps.range(10)
        ps.sql("SELECT {tbl.id} FROM {tbl}", tbl=mydf)
        ```
    
    ### How was this patch tested?
    
    Doctests were added.
    
    Closes #34677 from HyukjinKwon/custom-formatter.
    
    Authored-by: Hyukjin Kwon <gu...@apache.org>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 dev/sparktestsupport/modules.py                    |   1 +
 .../source/migration_guide/pyspark_3.2_to_3.3.rst  |   1 +
 python/pyspark/pandas/__init__.py                  |   2 +-
 python/pyspark/pandas/sql_formatter.py             | 273 +++++++++++++++++++++
 python/pyspark/pandas/sql_processor.py             |  32 ++-
 python/pyspark/pandas/tests/test_sql.py            |  49 +++-
 python/pyspark/pandas/usage_logging/__init__.py    |   6 +-
 7 files changed, 341 insertions(+), 23 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index d13be2e..e53ee5a 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -606,6 +606,7 @@ pyspark_pandas = Module(
         "pyspark.pandas.namespace",
         "pyspark.pandas.numpy_compat",
         "pyspark.pandas.sql_processor",
+        "pyspark.pandas.sql_formatter",
         "pyspark.pandas.strings",
         "pyspark.pandas.utils",
         "pyspark.pandas.window",
diff --git a/python/docs/source/migration_guide/pyspark_3.2_to_3.3.rst b/python/docs/source/migration_guide/pyspark_3.2_to_3.3.rst
index 060f24c..90651a9 100644
--- a/python/docs/source/migration_guide/pyspark_3.2_to_3.3.rst
+++ b/python/docs/source/migration_guide/pyspark_3.2_to_3.3.rst
@@ -20,4 +20,5 @@
 Upgrading from PySpark 3.2 to 3.3
 =================================
 
+* In Spark 3.3, the ``pyspark.pandas.sql`` method follows `the standard Python string formatter <https://docs.python.org/3/library/string.html#format-string-syntax>`_. To restore the previous behavior, set the ``PYSPARK_PANDAS_SQL_LEGACY`` environment variable to ``1``.
 * In Spark 3.3, the ``drop`` method of pandas API on Spark DataFrame supports dropping rows by ``index``, and sets dropping by index instead of column by default.
diff --git a/python/pyspark/pandas/__init__.py b/python/pyspark/pandas/__init__.py
index ea8a9ea..04128ed 100644
--- a/python/pyspark/pandas/__init__.py
+++ b/python/pyspark/pandas/__init__.py
@@ -144,4 +144,4 @@ _auto_patch_pandas()
 # Import after the usage logger is attached.
 from pyspark.pandas.config import get_option, options, option_context, reset_option, set_option
 from pyspark.pandas.namespace import *  # F405
-from pyspark.pandas.sql_processor import sql
+from pyspark.pandas.sql_formatter import sql
diff --git a/python/pyspark/pandas/sql_formatter.py b/python/pyspark/pandas/sql_formatter.py
new file mode 100644
index 0000000..685ee25
--- /dev/null
+++ b/python/pyspark/pandas/sql_formatter.py
@@ -0,0 +1,273 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import string
+from typing import Any, Optional, Union, List, Sequence, Mapping, Tuple
+import uuid
+import warnings
+
+import pandas as pd
+
+from pyspark.pandas.internal import InternalFrame
+from pyspark.pandas.namespace import _get_index_map
+from pyspark.sql.functions import lit
+from pyspark import pandas as ps
+from pyspark.sql import SparkSession
+from pyspark.pandas.utils import default_session
+from pyspark.pandas.frame import DataFrame
+from pyspark.pandas.series import Series
+
+
+__all__ = ["sql"]
+
+
+# This is not used in this file. It's for legacy sql_processor.
+_CAPTURE_SCOPES = 3
+
+
+def sql(
+    query: str,
+    index_col: Optional[Union[str, List[str]]] = None,
+    **kwargs: Any,
+) -> DataFrame:
+    """
+    Execute a SQL query and return the result as a pandas-on-Spark DataFrame.
+
+    This function acts as a standard Python string formatter that understands
+    the following variable types:
+
+        * pandas-on-Spark DataFrame
+        * pandas-on-Spark Series
+        * pandas DataFrame
+        * pandas Series
+        * string
+
+    Parameters
+    ----------
+    query : str
+        the SQL query
+    index_col : str or list of str, optional
+        Column names to be used in Spark to represent pandas-on-Spark's index. The index name
+        in pandas-on-Spark is ignored. By default, the index is always lost.
+
+        .. note:: If you want to preserve the index, explicitly use :func:`DataFrame.reset_index`,
+            and pass it to the sql statement with `index_col` parameter.
+
+            For example,
+
+            >>> psdf = ps.DataFrame({"A": [1, 2, 3], "B":[4, 5, 6]}, index=['a', 'b', 'c'])
+            >>> new_psdf = psdf.reset_index()
+            >>> ps.sql("SELECT * FROM {new_psdf}", index_col="index", new_psdf=new_psdf)
+            ... # doctest: +NORMALIZE_WHITESPACE
+                   A  B
+            index
+            a      1  4
+            b      2  5
+            c      3  6
+
+            For MultiIndex,
+
+            >>> psdf = ps.DataFrame(
+            ...     {"A": [1, 2, 3], "B": [4, 5, 6]},
+            ...     index=pd.MultiIndex.from_tuples(
+            ...         [("a", "b"), ("c", "d"), ("e", "f")], names=["index1", "index2"]
+            ...     ),
+            ... )
+            >>> new_psdf = psdf.reset_index()
+            >>> ps.sql("SELECT * FROM {new_psdf}", index_col=["index1", "index2"], new_psdf=new_psdf)
+            ... # doctest: +NORMALIZE_WHITESPACE
+                           A  B
+            index1 index2
+            a      b       1  4
+            c      d       2  5
+            e      f       3  6
+
+            Also note that the index name(s) should be matched to the existing name.
+    kwargs
+        other variables that the user wants to set that can be referenced in the query
+
+    Returns
+    -------
+    pandas-on-Spark DataFrame
+
+    Examples
+    --------
+
+    Calling a built-in SQL function.
+
+    >>> ps.sql("SELECT * FROM range(10) where id > 7")
+       id
+    0   8
+    1   9
+
+    >>> ps.sql("SELECT * FROM range(10) WHERE id > {bound1} AND id < {bound2}", bound1=7, bound2=9)
+       id
+    0   8
+
+    >>> mydf = ps.range(10)
+    >>> x = tuple(range(4))
+    >>> ps.sql("SELECT {ser} FROM {mydf} WHERE id IN {x}", ser=mydf.id, mydf=mydf, x=x)
+       id
+    0   0
+    1   1
+    2   2
+    3   3
+
+    Mixing pandas-on-Spark and pandas DataFrames in a join operation. Note that the index is
+    dropped.
+
+    >>> ps.sql('''
+    ...   SELECT m1.a, m2.b
+    ...   FROM {table1} m1 INNER JOIN {table2} m2
+    ...   ON m1.key = m2.key
+    ...   ORDER BY m1.a, m2.b''',
+    ...   table1=ps.DataFrame({"a": [1,2], "key": ["a", "b"]}),
+    ...   table2=pd.DataFrame({"b": [3,4,5], "key": ["a", "b", "b"]}))
+       a  b
+    0  1  3
+    1  2  4
+    2  2  5
+
+    Also, it is possible to query using Series.
+
+    >>> psdf = ps.DataFrame({"A": [1, 2, 3], "B":[4, 5, 6]}, index=['a', 'b', 'c'])
+    >>> ps.sql("SELECT {mydf.A} FROM {mydf}", mydf=psdf)
+       A
+    0  1
+    1  2
+    2  3
+    """
+    if os.environ.get("PYSPARK_PANDAS_SQL_LEGACY") == "1":
+        from pyspark.pandas import sql_processor
+
+        warnings.warn(
+            "Deprecated in 3.3.0, and the legacy behavior "
+            "will be removed in the future releases.",
+            FutureWarning,
+        )
+        return sql_processor.sql(query, index_col=index_col, **kwargs)
+
+    session = default_session()
+    formatter = SQLStringFormatter(session)
+    try:
+        sdf = session.sql(formatter.format(query, **kwargs))
+    finally:
+        formatter.clear()
+
+    index_spark_columns, index_names = _get_index_map(sdf, index_col)
+
+    return DataFrame(
+        InternalFrame(
+            spark_frame=sdf, index_spark_columns=index_spark_columns, index_names=index_names
+        )
+    )
+
+
+class SQLStringFormatter(string.Formatter):
+    """
+    A standard ``string.Formatter`` in Python that can understand pandas-on-Spark instances
+    with basic Python objects. This object has to be cleared after being used for a single SQL
+    query; it cannot be reused across multiple SQL queries without calling ``clear``.
+    """
+
+    def __init__(self, session: SparkSession) -> None:
+        self._session: SparkSession = session
+        self._temp_views: List[Tuple[DataFrame, str]] = []
+        self._ref_sers: List[Tuple[Series, str]] = []
+
+    def vformat(self, format_string: str, args: Sequence[Any], kwargs: Mapping[str, Any]) -> str:
+        ret = super(SQLStringFormatter, self).vformat(format_string, args, kwargs)
+
+        for ref, n in self._ref_sers:
+            if not any(ref is v for df, _ in self._temp_views for v in df._pssers.values()):
+                # No DataFrame passed to the query holds the given Series, so raise an error.
+                raise ValueError("The series in {%s} does not refer any dataframe specified." % n)
+        return ret
+
+    def get_field(self, field_name: str, args: Sequence[Any], kwargs: Mapping[str, Any]) -> Any:
+        obj, first = super(SQLStringFormatter, self).get_field(field_name, args, kwargs)
+        return self._convert_value(obj, field_name), first
+
+    def _convert_value(self, val: Any, name: str) -> Optional[str]:
+        """
+        Converts the given value into its SQL form; unhandled types are returned as they are.
+        """
+        if isinstance(val, pd.Series):
+            # Return the column name from pandas Series directly.
+            return ps.from_pandas(val).to_frame()._to_spark().columns[0]
+        elif isinstance(val, Series):
+            # Return the column name of pandas-on-Spark Series iff its DataFrame was
+            # referred. The check will be done in `vformat` after we parse all.
+            self._ref_sers.append((val, name))
+            return val.to_frame()._to_spark().columns[0]
+        elif isinstance(val, (DataFrame, pd.DataFrame)):
+            df_name = "_pandas_api_%s" % str(uuid.uuid4()).replace("-", "")
+
+            if isinstance(val, pd.DataFrame):
+                # A plain pandas DataFrame is converted to a new pandas-on-Spark
+                # DataFrame. Its temp view is registered below as well so that
+                # `clear` drops it instead of leaving it in the session.
+                val = ps.from_pandas(val)
+            else:
+                for df, n in self._temp_views:
+                    if df is val:
+                        return n
+
+            val._to_spark().createOrReplaceTempView(df_name)
+            self._temp_views.append((val, df_name))
+            return df_name
+        elif isinstance(val, str):
+            return lit(val)._jc.expr().sql()  # for escaped characters.
+        else:
+            return val
+
+    def clear(self) -> None:
+        for _, n in self._temp_views:
+            self._session.catalog.dropTempView(n)
+        self._temp_views = []
+        self._ref_sers = []
+
+
+def _test() -> None:
+    """Run this module's doctests on a local Spark session; exit non-zero on failure."""
+    import doctest
+    import sys
+    from pyspark.sql import SparkSession
+    import pyspark.pandas.sql_formatter
+
+    os.chdir(os.environ["SPARK_HOME"])
+
+    globs = pyspark.pandas.sql_formatter.__dict__.copy()
+    globs["ps"] = pyspark.pandas
+    spark = (
+        SparkSession.builder.master("local[4]")
+        .appName("pyspark.pandas.sql_formatter tests")
+        .getOrCreate()
+    )
+    (failure_count, test_count) = doctest.testmod(
+        pyspark.pandas.sql_formatter,
+        globs=globs,
+        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
+    )
+    spark.stop()
+    if failure_count:
+        sys.exit(-1)
+
+
+if __name__ == "__main__":
+    _test()
diff --git a/python/pyspark/pandas/sql_processor.py b/python/pyspark/pandas/sql_processor.py
index afdaa10..8126d1e 100644
--- a/python/pyspark/pandas/sql_processor.py
+++ b/python/pyspark/pandas/sql_processor.py
@@ -77,9 +77,13 @@ def sql(
 
             For example,
 
+            >>> from pyspark.pandas import sql_processor
+            >>> # we will call 'sql_processor' directly in doctests so decrease one level.
+            >>> sql_processor._CAPTURE_SCOPES = 2
+            >>> sql = sql_processor.sql
             >>> psdf = ps.DataFrame({"A": [1, 2, 3], "B":[4, 5, 6]}, index=['a', 'b', 'c'])
             >>> psdf_reset_index = psdf.reset_index()
-            >>> ps.sql("SELECT * FROM {psdf_reset_index}", index_col="index")
+            >>> sql("SELECT * FROM {psdf_reset_index}", index_col="index")
             ... # doctest: +NORMALIZE_WHITESPACE
                    A  B
             index
@@ -96,7 +100,7 @@ def sql(
             ...     ),
             ... )
             >>> psdf_reset_index = psdf.reset_index()
-            >>> ps.sql("SELECT * FROM {psdf_reset_index}", index_col=["index1", "index2"])
+            >>> sql("SELECT * FROM {psdf_reset_index}", index_col=["index1", "index2"])
             ... # doctest: +NORMALIZE_WHITESPACE
                            A  B
             index1 index2
@@ -122,7 +126,7 @@ def sql(
 
     Calling a built-in SQL function.
 
-    >>> ps.sql("select * from range(10) where id > 7")
+    >>> sql("select * from range(10) where id > 7")
        id
     0   8
     1   9
@@ -130,7 +134,7 @@ def sql(
     A query can also reference a local variable or parameter by wrapping them in curly braces:
 
     >>> bound1 = 7
-    >>> ps.sql("select * from range(10) where id > {bound1} and id < {bound2}", bound2=9)
+    >>> sql("select * from range(10) where id > {bound1} and id < {bound2}", bound2=9)
        id
     0   8
 
@@ -139,7 +143,7 @@ def sql(
 
     >>> mydf = ps.range(10)
     >>> x = range(4)
-    >>> ps.sql("SELECT * from {mydf} WHERE id IN {x}")
+    >>> sql("SELECT * from {mydf} WHERE id IN {x}")
        id
     0   0
     1   1
@@ -150,7 +154,7 @@ def sql(
 
     >>> def statement():
     ...     mydf2 = ps.DataFrame({"x": range(2)})
-    ...     return ps.sql("SELECT * from {mydf2}")
+    ...     return sql("SELECT * from {mydf2}")
     >>> statement()
        x
     0  0
@@ -159,7 +163,7 @@ def sql(
     Mixing pandas-on-Spark and pandas DataFrames in a join operation. Note that the index is
     dropped.
 
-    >>> ps.sql('''
+    >>> sql('''
     ...   SELECT m1.a, m2.b
     ...   FROM {table1} m1 INNER JOIN {table2} m2
     ...   ON m1.key = m2.key
@@ -174,7 +178,7 @@ def sql(
     Also, it is possible to query using Series.
 
     >>> myser = ps.Series({'a': [1.0, 2.0, 3.0], 'b': [15.0, 30.0, 45.0]})
-    >>> ps.sql("SELECT * from {myser}")
+    >>> sql("SELECT * from {myser}")
                         0
     0     [1.0, 2.0, 3.0]
     1  [15.0, 30.0, 45.0]
@@ -195,7 +199,7 @@ def sql(
     return SQLProcessor(_dict, query, default_session()).execute(index_col)
 
 
-_CAPTURE_SCOPES = 2
+_CAPTURE_SCOPES = 3
 
 
 def _get_local_scope() -> Dict[str, Any]:
@@ -272,19 +276,23 @@ class SQLProcessor(object):
         Returns a DataFrame for which the SQL statement has been executed by
         the underlying SQL engine.
 
+        >>> from pyspark.pandas import sql_processor
+        >>> # we will call 'sql_processor' directly in doctests so decrease one level.
+        >>> sql_processor._CAPTURE_SCOPES = 2
+        >>> sql = sql_processor.sql
         >>> str0 = 'abc'
-        >>> ps.sql("select {str0}")
+        >>> sql("select {str0}")
            abc
         0  abc
 
         >>> str1 = 'abc"abc'
         >>> str2 = "abc'abc"
-        >>> ps.sql("select {str0}, {str1}, {str2}")
+        >>> sql("select {str0}, {str1}, {str2}")
            abc  abc"abc  abc'abc
         0  abc  abc"abc  abc'abc
 
         >>> strs = ['a', 'b']
-        >>> ps.sql("select 'a' in {strs} as cond1, 'c' in {strs} as cond2")
+        >>> sql("select 'a' in {strs} as cond1, 'c' in {strs} as cond2")
            cond1  cond2
         0   True  False
         """
diff --git a/python/pyspark/pandas/tests/test_sql.py b/python/pyspark/pandas/tests/test_sql.py
index 306ea16..ca0dd99 100644
--- a/python/pyspark/pandas/tests/test_sql.py
+++ b/python/pyspark/pandas/tests/test_sql.py
@@ -23,20 +23,22 @@ from pyspark.testing.sqlutils import SQLTestUtils
 
 class SQLTest(PandasOnSparkTestCase, SQLTestUtils):
     def test_error_variable_not_exist(self):
-        msg = "The key variable_foo in the SQL statement was not found.*"
-        with self.assertRaisesRegex(ValueError, msg):
+        with self.assertRaisesRegex(KeyError, "variable_foo"):
             ps.sql("select * from {variable_foo}")
 
     def test_error_unsupported_type(self):
-        msg = "Unsupported variable type dict: {'a': 1}"
-        with self.assertRaisesRegex(ValueError, msg):
-            some_dict = {"a": 1}
+        with self.assertRaisesRegex(KeyError, "some_dict"):
             ps.sql("select * from {some_dict}")
 
     def test_error_bad_sql(self):
         with self.assertRaises(ParseException):
             ps.sql("this is not valid sql")
 
+    def test_series_not_referred(self):
+        psdf = ps.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
+        with self.assertRaisesRegex(ValueError, "The series in {ser}"):
+            ps.sql("SELECT {ser} FROM range(10)", ser=psdf.A)
+
     def test_sql_with_index_col(self):
         import pandas as pd
 
@@ -45,7 +47,11 @@ class SQLTest(PandasOnSparkTestCase, SQLTestUtils):
             {"A": [1, 2, 3], "B": [4, 5, 6]}, index=pd.Index(["a", "b", "c"], name="index")
         )
         psdf_reset_index = psdf.reset_index()
-        actual = ps.sql("select * from {psdf_reset_index} where A > 1", index_col="index")
+        actual = ps.sql(
+            "select * from {psdf_reset_index} where A > 1",
+            index_col="index",
+            psdf_reset_index=psdf_reset_index,
+        )
         expected = psdf.iloc[[1, 2]]
         self.assert_eq(actual, expected)
 
@@ -58,11 +64,40 @@ class SQLTest(PandasOnSparkTestCase, SQLTestUtils):
         )
         psdf_reset_index = psdf.reset_index()
         actual = ps.sql(
-            "select * from {psdf_reset_index} where A > 1", index_col=["index1", "index2"]
+            "select * from {psdf_reset_index} where A > 1",
+            index_col=["index1", "index2"],
+            psdf_reset_index=psdf_reset_index,
         )
         expected = psdf.iloc[[1, 2]]
         self.assert_eq(actual, expected)
 
+    def test_sql_with_pandas_objects(self):
+        import pandas as pd
+
+        pdf = pd.DataFrame({"a": [1, 2, 3, 4]})
+        self.assert_eq(ps.sql("SELECT {col} + 1 as a FROM {tbl}", col=pdf.a, tbl=pdf), pdf + 1)
+
+    def test_sql_with_python_objects(self):
+        self.assert_eq(
+            ps.sql("SELECT {col} as a FROM range(1)", col="lit"), ps.DataFrame({"a": ["lit"]})
+        )
+        self.assert_eq(
+            ps.sql("SELECT id FROM range(10) WHERE id IN {pred}", col="lit", pred=(1, 2, 3)),
+            ps.DataFrame({"id": [1, 2, 3]}),
+        )
+
+    def test_sql_with_pandas_on_spark_objects(self):
+        psdf = ps.DataFrame({"a": [1, 2, 3, 4]})
+
+        self.assert_eq(ps.sql("SELECT {col} FROM {tbl}", col=psdf.a, tbl=psdf), psdf)
+        self.assert_eq(ps.sql("SELECT {tbl.a} FROM {tbl}", tbl=psdf), psdf)
+
+        psdf = ps.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
+        self.assert_eq(
+            ps.sql("SELECT {col}, {col2} FROM {tbl}", col=psdf.A, col2=psdf.B, tbl=psdf), psdf
+        )
+        self.assert_eq(ps.sql("SELECT {tbl.A}, {tbl.B} FROM {tbl}", tbl=psdf), psdf)
+
 
 if __name__ == "__main__":
     import unittest
diff --git a/python/pyspark/pandas/usage_logging/__init__.py b/python/pyspark/pandas/usage_logging/__init__.py
index ebd23ac..b350faf 100644
--- a/python/pyspark/pandas/usage_logging/__init__.py
+++ b/python/pyspark/pandas/usage_logging/__init__.py
@@ -25,7 +25,7 @@ from typing import Union
 
 import pandas as pd
 
-from pyspark.pandas import config, namespace, sql_processor
+from pyspark.pandas import config, namespace, sql_formatter
 from pyspark.pandas.accessors import PandasOnSparkFrameMethods
 from pyspark.pandas.frame import DataFrame
 from pyspark.pandas.datetimes import DatetimeMethods
@@ -113,8 +113,8 @@ def attach(logger_module: Union[str, ModuleType]) -> None:
     except ImportError:
         pass
 
-    sql_processor._CAPTURE_SCOPES = 3
-    modules.append(sql_processor)
+    sql_formatter._CAPTURE_SCOPES = 4
+    modules.append(sql_formatter)
 
     # Modules
     for target_module in modules:

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org