You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ru...@apache.org on 2022/11/09 09:36:03 UTC

[spark] branch master updated: [SPARK-41026][CONNECT][PYTHON][FOLLOW-UP] Add Coalesce and Repartition API to Python client

This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 736be3116c7 [SPARK-41026][CONNECT][PYTHON][FOLLOW-UP] Add Coalesce and Repartition API to Python client
736be3116c7 is described below

commit 736be3116c7c13c82eac91f426ee6e96753c9cf5
Author: Rui Wang <ru...@databricks.com>
AuthorDate: Wed Nov 9 17:35:43 2022 +0800

    [SPARK-41026][CONNECT][PYTHON][FOLLOW-UP] Add Coalesce and Repartition API to Python client
    
    ### What changes were proposed in this pull request?
    
    Following up on https://github.com/apache/spark/pull/38529: with the `Repartition` proto, we can support the `Coalesce` and `Repartition` APIs in the Python client.
    
    ### Why are the changes needed?
    
    Improve API coverage
    
    ### Does this PR introduce _any_ user-facing change?
    
    NO
    
    ### How was this patch tested?
    
    Unit test (`test_coalesce_and_repartition` in `test_connect_plan_only.py`).
    
    Closes #38549 from amaliujia/support_coalesce_in_python.
    
    Authored-by: Rui Wang <ru...@databricks.com>
    Signed-off-by: Ruifeng Zheng <ru...@apache.org>
---
 python/pyspark/sql/connect/dataframe.py            | 49 +++++++++++++++++++++-
 python/pyspark/sql/connect/plan.py                 | 34 +++++++++++++++
 .../sql/tests/connect/test_connect_plan_only.py    | 17 ++++++++
 3 files changed, 98 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index 64b2e54f0ef..c6877707ad2 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -156,8 +156,53 @@ class DataFrame(object):
     def crossJoin(self, other: "DataFrame") -> "DataFrame":
         ...
 
-    def coalesce(self, num_partitions: int) -> "DataFrame":
-        ...
+    def coalesce(self, numPartitions: int) -> "DataFrame":
+        """
+        Returns a new :class:`DataFrame` that has exactly `numPartitions` partitions.
+
+        Coalesce does not trigger a shuffle.
+
+        .. versionadded:: 3.4.0
+
+        Parameters
+        ----------
+        numPartitions : int
+            specify the target number of partitions
+
+        Returns
+        -------
+        :class:`DataFrame`
+        """
+        if not numPartitions > 0:
+            raise ValueError("numPartitions must be positive.")
+        return DataFrame.withPlan(
+            plan.Repartition(self._plan, num_partitions=numPartitions, shuffle=False),
+            self._session,
+        )
+
+    def repartition(self, numPartitions: int) -> "DataFrame":
+        """
+        Returns a new :class:`DataFrame` that has exactly `numPartitions` partitions.
+
+        Repartition will shuffle source partition into partitions specified by numPartitions.
+
+        .. versionadded:: 3.4.0
+
+        Parameters
+        ----------
+        numPartitions : int
+            specify the target number of partitions
+
+        Returns
+        -------
+        :class:`DataFrame`
+        """
+        if not numPartitions > 0:
+            raise ValueError("numPartitions must be positive.")
+        return DataFrame.withPlan(
+            plan.Repartition(self._plan, num_partitions=numPartitions, shuffle=True),
+            self._session,
+        )
 
     def describe(self, cols: List[ColumnRef]) -> Any:
         ...
diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py
index 1d5c80f510e..3bb5558d04b 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -652,6 +652,40 @@ class UnionAll(LogicalPlan):
         """
 
 
+class Repartition(LogicalPlan):
+    """Repartition Relation into a different number of partitions."""
+
+    def __init__(self, child: Optional["LogicalPlan"], num_partitions: int, shuffle: bool) -> None:
+        super().__init__(child)
+        self._num_partitions = num_partitions
+        self._shuffle = shuffle
+
+    def plan(self, session: Optional["RemoteSparkSession"]) -> proto.Relation:
+        rel = proto.Relation()
+        if self._child is not None:
+            rel.repartition.input.CopyFrom(self._child.plan(session))
+        rel.repartition.shuffle = self._shuffle
+        rel.repartition.num_partitions = self._num_partitions
+        return rel
+
+    def print(self, indent: int = 0) -> str:
+        plan_name = "repartition" if self._shuffle else "coalesce"
+        c_buf = self._child.print(indent + LogicalPlan.INDENT) if self._child else ""
+        return f"{' ' * indent}<{plan_name} num_partitions={self._num_partitions}>\n{c_buf}"
+
+    def _repr_html_(self) -> str:
+        plan_name = "repartition" if self._shuffle else "coalesce"
+        return f"""
+        <ul>
+           <li>
+              <b>{plan_name}</b><br />
+              Child: {self._child_repr_()}
+              num_partitions: {self._num_partitions}
+           </li>
+        </ul>
+        """
+
+
 class SubqueryAlias(LogicalPlan):
     """Alias for a relation."""
 
diff --git a/python/pyspark/sql/tests/connect/test_connect_plan_only.py b/python/pyspark/sql/tests/connect/test_connect_plan_only.py
index 468099cb5c9..6807a13a8c9 100644
--- a/python/pyspark/sql/tests/connect/test_connect_plan_only.py
+++ b/python/pyspark/sql/tests/connect/test_connect_plan_only.py
@@ -215,6 +215,23 @@ class SparkConnectTestsPlanOnly(PlanOnlyTestFixture):
         plan3 = df1.unionByName(df2, True)._plan.to_proto(self.connect)
         self.assertTrue(plan3.root.set_op.by_name)
 
+    def test_coalesce_and_repartition(self):
+        # SPARK-41026: test Coalesce and Repartition API in Python client.
+        df = self.connect.readTable(table_name=self.tbl_name)
+        plan1 = df.coalesce(10)._plan.to_proto(self.connect)
+        self.assertEqual(10, plan1.root.repartition.num_partitions)
+        self.assertFalse(plan1.root.repartition.shuffle)
+        plan2 = df.repartition(20)._plan.to_proto(self.connect)
+        self.assertTrue(plan2.root.repartition.shuffle)
+
+        with self.assertRaises(ValueError) as context:
+            df.coalesce(-1)._plan.to_proto(self.connect)
+        self.assertTrue("numPartitions must be positive" in str(context.exception))
+
+        with self.assertRaises(ValueError) as context:
+            df.repartition(-1)._plan.to_proto(self.connect)
+        self.assertTrue("numPartitions must be positive" in str(context.exception))
+
 
 if __name__ == "__main__":
     from pyspark.sql.tests.connect.test_connect_plan_only import *  # noqa: F401


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org