You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ru...@apache.org on 2022/11/29 07:42:27 UTC
[spark] branch master updated: [SPARK-41312][CONNECT][PYTHON] Implement DataFrame.withColumnRenamed
This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 9f7420bc903 [SPARK-41312][CONNECT][PYTHON] Implement DataFrame.withColumnRenamed
9f7420bc903 is described below
commit 9f7420bc9038d921ad7652ad66b13bfbf9faef9a
Author: Rui Wang <ru...@databricks.com>
AuthorDate: Tue Nov 29 15:42:11 2022 +0800
[SPARK-41312][CONNECT][PYTHON] Implement DataFrame.withColumnRenamed
### What changes were proposed in this pull request?
Implement DataFrame.withColumnRenamed by reusing the existing Connect proto `RenameColumnsNameByName`.
### Why are the changes needed?
API coverage
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
UT
Closes #38831 from amaliujia/withColumnRenamed.
Authored-by: Rui Wang <ru...@databricks.com>
Signed-off-by: Ruifeng Zheng <ru...@apache.org>
---
python/pyspark/sql/connect/dataframe.py | 48 ++++++++++++++++++++++
python/pyspark/sql/connect/plan.py | 30 ++++++++++++++
.../sql/tests/connect/test_connect_basic.py | 15 +++++++
3 files changed, 93 insertions(+)
diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index 725c7fc90da..2d6e7352df9 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -570,6 +570,54 @@ class DataFrame(object):
session=self._session,
)
+ def withColumnRenamed(self, existing: str, new: str) -> "DataFrame":
+ """Returns a new :class:`DataFrame` by renaming an existing column.
+ This is a no-op if schema doesn't contain the given column name.
+
+ .. versionadded:: 3.4.0
+
+ Parameters
+ ----------
+ existing : str
+ string, name of the existing column to rename.
+ new : str
+ string, new name of the column.
+
+ Returns
+ -------
+ :class:`DataFrame`
+ DataFrame with renamed column.
+ """
+ return self.withColumnsRenamed({existing: new})
+
+ def withColumnsRenamed(self, colsMap: Dict[str, str]) -> "DataFrame":
+ """
+ Returns a new :class:`DataFrame` by renaming multiple columns.
+ This is a no-op if schema doesn't contain the given column names.
+
+ .. versionadded:: 3.4.0
+ Added support for multiple columns renaming
+
+ Parameters
+ ----------
+ colsMap : dict
+ a dict of existing column names and corresponding desired column names.
+ Currently, only single map is supported.
+
+ Returns
+ -------
+ :class:`DataFrame`
+ DataFrame with renamed columns.
+
+ See Also
+ --------
+ :meth:`withColumnRenamed`
+ """
+ if not isinstance(colsMap, dict):
+ raise TypeError("colsMap must be dict of existing column name and new column name.")
+
+ return DataFrame.withPlan(plan.RenameColumnsNameByName(self._plan, colsMap), self._session)
+
def _show_string(
self, n: int = 20, truncate: Union[bool, int] = True, vertical: bool = False
) -> str:
diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py
index 19889cb9eb8..48eb69ffdc2 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -938,6 +938,36 @@ class Range(LogicalPlan):
"""
+class RenameColumnsNameByName(LogicalPlan):
+ def __init__(self, child: Optional["LogicalPlan"], colsMap: Mapping[str, str]) -> None:
+ super().__init__(child)
+ self._colsMap = colsMap
+
+ def plan(self, session: "SparkConnectClient") -> proto.Relation:
+ assert self._child is not None
+
+ plan = proto.Relation()
+ plan.rename_columns_by_name_to_name_map.input.CopyFrom(self._child.plan(session))
+ for k, v in self._colsMap.items():
+ plan.rename_columns_by_name_to_name_map.rename_columns_map[k] = v
+ return plan
+
+ def print(self, indent: int = 0) -> str:
+ i = " " * indent
+ return f"""{i}<RenameColumnsNameByName ColsMap='{self._colsMap}'>"""
+
+ def _repr_html_(self) -> str:
+ return f"""
+ <ul>
+ <li>
+ <b>RenameColumns</b><br />
+ ColsMap: {self._colsMap} <br />
+ {self._child_repr_()}
+ </li>
+ </ul>
+ """
+
+
class NAFill(LogicalPlan):
def __init__(
self, child: Optional["LogicalPlan"], cols: Optional[List[str]], values: List[Any]
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py
index eb025fd5d04..3f673edb75f 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -123,6 +123,21 @@ class SparkConnectTests(SparkConnectSQLTestCase):
self.assertTrue("name" in data[0])
self.assertTrue("id" in data[0])
+ def test_with_columns_renamed(self):
+ # SPARK-41312: test DataFrame.withColumnsRenamed()
+ self.assertEqual(
+ self.connect.read.table(self.tbl_name).withColumnRenamed("id", "id_new").schema,
+ self.spark.read.table(self.tbl_name).withColumnRenamed("id", "id_new").schema,
+ )
+ self.assertEqual(
+ self.connect.read.table(self.tbl_name)
+ .withColumnsRenamed({"id": "id_new", "name": "name_new"})
+ .schema,
+ self.spark.read.table(self.tbl_name)
+ .withColumnsRenamed({"id": "id_new", "name": "name_new"})
+ .schema,
+ )
+
def test_simple_udf(self):
def conv_udf(x) -> str:
return "Martin"
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org