You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2023/10/07 22:28:42 UTC
[spark] branch master updated: [SPARK-45436][PYTHON][CONNECT] DataFrame methods check same session
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e43ef424feae [SPARK-45436][PYTHON][CONNECT] DataFrame methods check same session
e43ef424feae is described below
commit e43ef424feaec1cfc4b8604a27a417ea646159c5
Author: Ruifeng Zheng <ru...@apache.org>
AuthorDate: Sat Oct 7 15:28:32 2023 -0700
[SPARK-45436][PYTHON][CONNECT] DataFrame methods check same session
### What changes were proposed in this pull request?
1. Rename the helper function to hide it from auto-completion
2. Check that the same session is used for methods which deal with multiple DataFrames
### Why are the changes needed?
We need to make sure the same session is used when processing multiple DataFrames.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
ci
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #43248 from zhengruifeng/py_hide_checksamesession.
Authored-by: Ruifeng Zheng <ru...@apache.org>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
python/pyspark/sql/connect/dataframe.py | 16 +++++++++++-----
python/pyspark/sql/connect/group.py | 1 +
2 files changed, 12 insertions(+), 5 deletions(-)
diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index 5197a0db9688..2c0a75fad469 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -259,7 +259,7 @@ class DataFrame:
count.__doc__ = PySparkDataFrame.count.__doc__
def crossJoin(self, other: "DataFrame") -> "DataFrame":
- self.checkSameSparkSession(other)
+ self._check_same_session(other)
return DataFrame.withPlan(
plan.Join(left=self._plan, right=other._plan, on=None, how="cross"),
session=self._session,
@@ -267,7 +267,7 @@ class DataFrame:
crossJoin.__doc__ = PySparkDataFrame.crossJoin.__doc__
- def checkSameSparkSession(self, other: "DataFrame") -> None:
+ def _check_same_session(self, other: "DataFrame") -> None:
if self._session.session_id != other._session.session_id:
raise SessionNotSameException(
error_class="SESSION_NOT_SAME",
@@ -578,9 +578,9 @@ class DataFrame:
on: Optional[Union[str, List[str], Column, List[Column]]] = None,
how: Optional[str] = None,
) -> "DataFrame":
+ self._check_same_session(other)
if how is not None and isinstance(how, str):
how = how.lower().replace("_", "")
- self.checkSameSparkSession(other)
return DataFrame.withPlan(
plan.Join(left=self._plan, right=other._plan, on=on, how=how),
session=self._session,
@@ -600,6 +600,7 @@ class DataFrame:
allowExactMatches: bool = True,
direction: str = "backward",
) -> "DataFrame":
+ self._check_same_session(other)
if how is None:
how = "inner"
assert isinstance(how, str), "how should be a string"
@@ -1089,6 +1090,7 @@ class DataFrame:
show.__doc__ = PySparkDataFrame.show.__doc__
def union(self, other: "DataFrame") -> "DataFrame":
+ self._check_same_session(other)
return self.unionAll(other)
union.__doc__ = PySparkDataFrame.union.__doc__
@@ -1099,7 +1101,7 @@ class DataFrame:
error_class="MISSING_VALID_PLAN",
message_parameters={"operator": "Union"},
)
- self.checkSameSparkSession(other)
+ self._check_same_session(other)
return DataFrame.withPlan(
plan.SetOperation(self._plan, other._plan, "union", is_all=True), session=self._session
)
@@ -1107,7 +1109,7 @@ class DataFrame:
unionAll.__doc__ = PySparkDataFrame.unionAll.__doc__
def unionByName(self, other: "DataFrame", allowMissingColumns: bool = False) -> "DataFrame":
- self.checkSameSparkSession(other)
+ self._check_same_session(other)
return DataFrame.withPlan(
plan.SetOperation(
self._plan,
@@ -1122,6 +1124,7 @@ class DataFrame:
unionByName.__doc__ = PySparkDataFrame.unionByName.__doc__
def subtract(self, other: "DataFrame") -> "DataFrame":
+ self._check_same_session(other)
return DataFrame.withPlan(
plan.SetOperation(self._plan, other._plan, "except", is_all=False),
session=self._session,
@@ -1130,6 +1133,7 @@ class DataFrame:
subtract.__doc__ = PySparkDataFrame.subtract.__doc__
def exceptAll(self, other: "DataFrame") -> "DataFrame":
+ self._check_same_session(other)
return DataFrame.withPlan(
plan.SetOperation(self._plan, other._plan, "except", is_all=True), session=self._session
)
@@ -1137,6 +1141,7 @@ class DataFrame:
exceptAll.__doc__ = PySparkDataFrame.exceptAll.__doc__
def intersect(self, other: "DataFrame") -> "DataFrame":
+ self._check_same_session(other)
return DataFrame.withPlan(
plan.SetOperation(self._plan, other._plan, "intersect", is_all=False),
session=self._session,
@@ -1145,6 +1150,7 @@ class DataFrame:
intersect.__doc__ = PySparkDataFrame.intersect.__doc__
def intersectAll(self, other: "DataFrame") -> "DataFrame":
+ self._check_same_session(other)
return DataFrame.withPlan(
plan.SetOperation(self._plan, other._plan, "intersect", is_all=True),
session=self._session,
diff --git a/python/pyspark/sql/connect/group.py b/python/pyspark/sql/connect/group.py
index a393d2cb37e8..9b7dce360b2a 100644
--- a/python/pyspark/sql/connect/group.py
+++ b/python/pyspark/sql/connect/group.py
@@ -348,6 +348,7 @@ GroupedData.__doc__ = PySparkGroupedData.__doc__
class PandasCogroupedOps:
def __init__(self, gd1: "GroupedData", gd2: "GroupedData"):
+ gd1._df._check_same_session(gd2._df)
self._gd1 = gd1
self._gd2 = gd2
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org