You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ru...@apache.org on 2022/11/29 07:42:27 UTC

[spark] branch master updated: [SPARK-41312][CONNECT][PYTHON] Implement DataFrame.withColumnRenamed

This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f7420bc903 [SPARK-41312][CONNECT][PYTHON] Implement DataFrame.withColumnRenamed
9f7420bc903 is described below

commit 9f7420bc9038d921ad7652ad66b13bfbf9faef9a
Author: Rui Wang <ru...@databricks.com>
AuthorDate: Tue Nov 29 15:42:11 2022 +0800

    [SPARK-41312][CONNECT][PYTHON] Implement DataFrame.withColumnRenamed
    
    ### What changes were proposed in this pull request?
    
    Implement DataFrame.withColumnRenamed by reusing existing Connect proto `RenameColumnsNameByName`.
    
    ### Why are the changes needed?
    
    API coverage
    
    ### Does this PR introduce _any_ user-facing change?
    
    NO
    
    ### How was this patch tested?
    
    UT
    
    Closes #38831 from amaliujia/withColumnRenamed.
    
    Authored-by: Rui Wang <ru...@databricks.com>
    Signed-off-by: Ruifeng Zheng <ru...@apache.org>
---
 python/pyspark/sql/connect/dataframe.py            | 48 ++++++++++++++++++++++
 python/pyspark/sql/connect/plan.py                 | 30 ++++++++++++++
 .../sql/tests/connect/test_connect_basic.py        | 15 +++++++
 3 files changed, 93 insertions(+)

diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index 725c7fc90da..2d6e7352df9 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -570,6 +570,54 @@ class DataFrame(object):
             session=self._session,
         )
 
def withColumnRenamed(self, existing: str, new: str) -> "DataFrame":
    """Return a new :class:`DataFrame` with a single column renamed.

    If the schema does not contain a column named ``existing``, the
    operation is a no-op and the result has the same schema.

    .. versionadded:: 3.4.0

    Parameters
    ----------
    existing : str
        string, name of the existing column to rename.
    new : str
        string, new name of the column.

    Returns
    -------
    :class:`DataFrame`
        DataFrame with renamed column.
    """
    # Single rename is just the multi-column variant with a one-entry map.
    return self.withColumnsRenamed({existing: new})
+
def withColumnsRenamed(self, colsMap: Dict[str, str]) -> "DataFrame":
    """
    Return a new :class:`DataFrame` with several columns renamed at once.

    Column names that are absent from the schema are ignored (no-op).

    .. versionadded:: 3.4.0
       Added support for multiple columns renaming

    Parameters
    ----------
    colsMap : dict
        a dict of existing column names and corresponding desired column names.
        Currently, only single map is supported.

    Returns
    -------
    :class:`DataFrame`
        DataFrame with renamed columns.

    See Also
    --------
    :meth:`withColumnRenamed`
    """
    # Validate eagerly so the caller gets a clear error here rather than a
    # confusing failure deep inside plan construction.
    if not isinstance(colsMap, dict):
        raise TypeError("colsMap must be dict of existing column name and new column name.")

    rename_plan = plan.RenameColumnsNameByName(self._plan, colsMap)
    return DataFrame.withPlan(rename_plan, self._session)
+
     def _show_string(
         self, n: int = 20, truncate: Union[bool, int] = True, vertical: bool = False
     ) -> str:
diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py
index 19889cb9eb8..48eb69ffdc2 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -938,6 +938,36 @@ class Range(LogicalPlan):
         """
 
 
class RenameColumnsNameByName(LogicalPlan):
    """Logical plan node that renames columns via an old-name -> new-name map.

    Wraps the Connect proto ``rename_columns_by_name_to_name_map`` relation.
    Per the DataFrame API contract, names missing from the child's schema
    are a no-op.
    """

    def __init__(self, child: Optional["LogicalPlan"], colsMap: Mapping[str, str]) -> None:
        super().__init__(child)
        # Mapping of existing column name -> desired column name.
        self._colsMap = colsMap

    def plan(self, session: "SparkConnectClient") -> proto.Relation:
        """Build the proto relation for this rename over the child plan."""
        assert self._child is not None

        plan = proto.Relation()
        plan.rename_columns_by_name_to_name_map.input.CopyFrom(self._child.plan(session))
        # Protobuf map fields cannot be assigned wholesale; copy entry by entry.
        for k, v in self._colsMap.items():
            plan.rename_columns_by_name_to_name_map.rename_columns_map[k] = v
        return plan

    def print(self, indent: int = 0) -> str:
        i = " " * indent
        return f"""{i}<RenameColumnsNameByName ColsMap='{self._colsMap}'>"""

    def _repr_html_(self) -> str:
        # Label kept consistent with print(): "RenameColumnsNameByName"
        # (was "RenameColumns", which mislabeled the node in HTML output).
        return f"""
        <ul>
           <li>
              <b>RenameColumnsNameByName</b><br />
              ColsMap: {self._colsMap} <br />
              {self._child_repr_()}
           </li>
        </ul>
        """
+
+
 class NAFill(LogicalPlan):
     def __init__(
         self, child: Optional["LogicalPlan"], cols: Optional[List[str]], values: List[Any]
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py
index eb025fd5d04..3f673edb75f 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -123,6 +123,21 @@ class SparkConnectTests(SparkConnectSQLTestCase):
         self.assertTrue("name" in data[0])
         self.assertTrue("id" in data[0])
 
def test_with_columns_renamed(self):
    # SPARK-41312: test DataFrame.withColumnsRenamed()
    # Apply the same transform to the Connect and classic DataFrames and
    # check the resulting schemas agree.
    def check_schema(transform):
        self.assertEqual(
            transform(self.connect.read.table(self.tbl_name)).schema,
            transform(self.spark.read.table(self.tbl_name)).schema,
        )

    # Single-column rename.
    check_schema(lambda df: df.withColumnRenamed("id", "id_new"))
    # Multi-column rename via a single map.
    check_schema(lambda df: df.withColumnsRenamed({"id": "id_new", "name": "name_new"}))
+
     def test_simple_udf(self):
         def conv_udf(x) -> str:
             return "Martin"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org