You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2023/10/07 22:28:42 UTC

[spark] branch master updated: [SPARK-45436][PYTHON][CONNECT] DataFrame methods check same session

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e43ef424feae [SPARK-45436][PYTHON][CONNECT] DataFrame methods check same session
e43ef424feae is described below

commit e43ef424feaec1cfc4b8604a27a417ea646159c5
Author: Ruifeng Zheng <ru...@apache.org>
AuthorDate: Sat Oct 7 15:28:32 2023 -0700

    [SPARK-45436][PYTHON][CONNECT] DataFrame methods check same session
    
    ### What changes were proposed in this pull request?
    1. Rename the helper function to hide it from auto-completion
    2. Check that the same session is used in methods which deal with multiple dataframes
    
    ### Why are the changes needed?
    we need to make sure the same session is used when processing multiple dataframes
    
    ### Does this PR introduce _any_ user-facing change?
    no
    
    ### How was this patch tested?
    ci
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #43248 from zhengruifeng/py_hide_checksamesession.
    
    Authored-by: Ruifeng Zheng <ru...@apache.org>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 python/pyspark/sql/connect/dataframe.py | 16 +++++++++++-----
 python/pyspark/sql/connect/group.py     |  1 +
 2 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index 5197a0db9688..2c0a75fad469 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -259,7 +259,7 @@ class DataFrame:
     count.__doc__ = PySparkDataFrame.count.__doc__
 
     def crossJoin(self, other: "DataFrame") -> "DataFrame":
-        self.checkSameSparkSession(other)
+        self._check_same_session(other)
         return DataFrame.withPlan(
             plan.Join(left=self._plan, right=other._plan, on=None, how="cross"),
             session=self._session,
@@ -267,7 +267,7 @@ class DataFrame:
 
     crossJoin.__doc__ = PySparkDataFrame.crossJoin.__doc__
 
-    def checkSameSparkSession(self, other: "DataFrame") -> None:
+    def _check_same_session(self, other: "DataFrame") -> None:
         if self._session.session_id != other._session.session_id:
             raise SessionNotSameException(
                 error_class="SESSION_NOT_SAME",
@@ -578,9 +578,9 @@ class DataFrame:
         on: Optional[Union[str, List[str], Column, List[Column]]] = None,
         how: Optional[str] = None,
     ) -> "DataFrame":
+        self._check_same_session(other)
         if how is not None and isinstance(how, str):
             how = how.lower().replace("_", "")
-        self.checkSameSparkSession(other)
         return DataFrame.withPlan(
             plan.Join(left=self._plan, right=other._plan, on=on, how=how),
             session=self._session,
@@ -600,6 +600,7 @@ class DataFrame:
         allowExactMatches: bool = True,
         direction: str = "backward",
     ) -> "DataFrame":
+        self._check_same_session(other)
         if how is None:
             how = "inner"
         assert isinstance(how, str), "how should be a string"
@@ -1089,6 +1090,7 @@ class DataFrame:
     show.__doc__ = PySparkDataFrame.show.__doc__
 
     def union(self, other: "DataFrame") -> "DataFrame":
+        self._check_same_session(other)
         return self.unionAll(other)
 
     union.__doc__ = PySparkDataFrame.union.__doc__
@@ -1099,7 +1101,7 @@ class DataFrame:
                 error_class="MISSING_VALID_PLAN",
                 message_parameters={"operator": "Union"},
             )
-        self.checkSameSparkSession(other)
+        self._check_same_session(other)
         return DataFrame.withPlan(
             plan.SetOperation(self._plan, other._plan, "union", is_all=True), session=self._session
         )
@@ -1107,7 +1109,7 @@ class DataFrame:
     unionAll.__doc__ = PySparkDataFrame.unionAll.__doc__
 
     def unionByName(self, other: "DataFrame", allowMissingColumns: bool = False) -> "DataFrame":
-        self.checkSameSparkSession(other)
+        self._check_same_session(other)
         return DataFrame.withPlan(
             plan.SetOperation(
                 self._plan,
@@ -1122,6 +1124,7 @@ class DataFrame:
     unionByName.__doc__ = PySparkDataFrame.unionByName.__doc__
 
     def subtract(self, other: "DataFrame") -> "DataFrame":
+        self._check_same_session(other)
         return DataFrame.withPlan(
             plan.SetOperation(self._plan, other._plan, "except", is_all=False),
             session=self._session,
@@ -1130,6 +1133,7 @@ class DataFrame:
     subtract.__doc__ = PySparkDataFrame.subtract.__doc__
 
     def exceptAll(self, other: "DataFrame") -> "DataFrame":
+        self._check_same_session(other)
         return DataFrame.withPlan(
             plan.SetOperation(self._plan, other._plan, "except", is_all=True), session=self._session
         )
@@ -1137,6 +1141,7 @@ class DataFrame:
     exceptAll.__doc__ = PySparkDataFrame.exceptAll.__doc__
 
     def intersect(self, other: "DataFrame") -> "DataFrame":
+        self._check_same_session(other)
         return DataFrame.withPlan(
             plan.SetOperation(self._plan, other._plan, "intersect", is_all=False),
             session=self._session,
@@ -1145,6 +1150,7 @@ class DataFrame:
     intersect.__doc__ = PySparkDataFrame.intersect.__doc__
 
     def intersectAll(self, other: "DataFrame") -> "DataFrame":
+        self._check_same_session(other)
         return DataFrame.withPlan(
             plan.SetOperation(self._plan, other._plan, "intersect", is_all=True),
             session=self._session,
diff --git a/python/pyspark/sql/connect/group.py b/python/pyspark/sql/connect/group.py
index a393d2cb37e8..9b7dce360b2a 100644
--- a/python/pyspark/sql/connect/group.py
+++ b/python/pyspark/sql/connect/group.py
@@ -348,6 +348,7 @@ GroupedData.__doc__ = PySparkGroupedData.__doc__
 
 class PandasCogroupedOps:
     def __init__(self, gd1: "GroupedData", gd2: "GroupedData"):
+        gd1._df._check_same_session(gd2._df)
         self._gd1 = gd1
         self._gd2 = gd2
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org