You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ru...@apache.org on 2022/11/09 09:36:03 UTC
[spark] branch master updated: [SPARK-41026][CONNECT][PYTHON][FOLLOW-UP] Add Coalesce and Repartition API to Python client
This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 736be3116c7 [SPARK-41026][CONNECT][PYTHON][FOLLOW-UP] Add Coalesce and Repartition API to Python client
736be3116c7 is described below
commit 736be3116c7c13c82eac91f426ee6e96753c9cf5
Author: Rui Wang <ru...@databricks.com>
AuthorDate: Wed Nov 9 17:35:43 2022 +0800
[SPARK-41026][CONNECT][PYTHON][FOLLOW-UP] Add Coalesce and Repartition API to Python client
### What changes were proposed in this pull request?
Following up on https://github.com/apache/spark/pull/38529, with the `Repartition` proto we can support the `Coalesce` and `Repartition` APIs in the Python client.
### Why are the changes needed?
Improve API coverage
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
Unit tests (UT).
Closes #38549 from amaliujia/support_coalesce_in_python.
Authored-by: Rui Wang <ru...@databricks.com>
Signed-off-by: Ruifeng Zheng <ru...@apache.org>
---
python/pyspark/sql/connect/dataframe.py | 49 +++++++++++++++++++++-
python/pyspark/sql/connect/plan.py | 34 +++++++++++++++
.../sql/tests/connect/test_connect_plan_only.py | 17 ++++++++
3 files changed, 98 insertions(+), 2 deletions(-)
diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index 64b2e54f0ef..c6877707ad2 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -156,8 +156,53 @@ class DataFrame(object):
def crossJoin(self, other: "DataFrame") -> "DataFrame":
...
- def coalesce(self, num_partitions: int) -> "DataFrame":
- ...
+ def coalesce(self, numPartitions: int) -> "DataFrame":
+ """
+ Returns a new :class:`DataFrame` that has exactly `numPartitions` partitions.
+
+ Coalesce does not trigger a shuffle.
+
+ .. versionadded:: 3.4.0
+
+ Parameters
+ ----------
+ numPartitions : int
+ specify the target number of partitions
+
+ Returns
+ -------
+ :class:`DataFrame`
+ """
+ if not numPartitions > 0:
+ raise ValueError("numPartitions must be positive.")
+ return DataFrame.withPlan(
+ plan.Repartition(self._plan, num_partitions=numPartitions, shuffle=False),
+ self._session,
+ )
+
+ def repartition(self, numPartitions: int) -> "DataFrame":
+ """
+ Returns a new :class:`DataFrame` that has exactly `numPartitions` partitions.
+
+ Repartition will shuffle source partition into partitions specified by numPartitions.
+
+ .. versionadded:: 3.4.0
+
+ Parameters
+ ----------
+ numPartitions : int
+ specify the target number of partitions
+
+ Returns
+ -------
+ :class:`DataFrame`
+ """
+ if not numPartitions > 0:
+ raise ValueError("numPartitions must be positive.")
+ return DataFrame.withPlan(
+ plan.Repartition(self._plan, num_partitions=numPartitions, shuffle=True),
+ self._session,
+ )
def describe(self, cols: List[ColumnRef]) -> Any:
...
diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py
index 1d5c80f510e..3bb5558d04b 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -652,6 +652,40 @@ class UnionAll(LogicalPlan):
"""
+class Repartition(LogicalPlan):
+ """Repartition Relation into a different number of partitions."""
+
+ def __init__(self, child: Optional["LogicalPlan"], num_partitions: int, shuffle: bool) -> None:
+ super().__init__(child)
+ self._num_partitions = num_partitions
+ self._shuffle = shuffle
+
+ def plan(self, session: Optional["RemoteSparkSession"]) -> proto.Relation:
+ rel = proto.Relation()
+ if self._child is not None:
+ rel.repartition.input.CopyFrom(self._child.plan(session))
+ rel.repartition.shuffle = self._shuffle
+ rel.repartition.num_partitions = self._num_partitions
+ return rel
+
+ def print(self, indent: int = 0) -> str:
+ plan_name = "repartition" if self._shuffle else "coalesce"
+ c_buf = self._child.print(indent + LogicalPlan.INDENT) if self._child else ""
+ return f"{' ' * indent}<{plan_name} num_partitions={self._num_partitions}>\n{c_buf}"
+
+ def _repr_html_(self) -> str:
+ plan_name = "repartition" if self._shuffle else "coalesce"
+ return f"""
+ <ul>
+ <li>
+ <b>{plan_name}</b><br />
+ Child: {self._child_repr_()}
+ num_partitions: {self._num_partitions}
+ </li>
+ </ul>
+ """
+
+
class SubqueryAlias(LogicalPlan):
"""Alias for a relation."""
diff --git a/python/pyspark/sql/tests/connect/test_connect_plan_only.py b/python/pyspark/sql/tests/connect/test_connect_plan_only.py
index 468099cb5c9..6807a13a8c9 100644
--- a/python/pyspark/sql/tests/connect/test_connect_plan_only.py
+++ b/python/pyspark/sql/tests/connect/test_connect_plan_only.py
@@ -215,6 +215,23 @@ class SparkConnectTestsPlanOnly(PlanOnlyTestFixture):
plan3 = df1.unionByName(df2, True)._plan.to_proto(self.connect)
self.assertTrue(plan3.root.set_op.by_name)
+ def test_coalesce_and_repartition(self):
+ # SPARK-41026: test Coalesce and Repartition API in Python client.
+ df = self.connect.readTable(table_name=self.tbl_name)
+ plan1 = df.coalesce(10)._plan.to_proto(self.connect)
+ self.assertEqual(10, plan1.root.repartition.num_partitions)
+ self.assertFalse(plan1.root.repartition.shuffle)
+ plan2 = df.repartition(20)._plan.to_proto(self.connect)
+ self.assertTrue(plan2.root.repartition.shuffle)
+
+ with self.assertRaises(ValueError) as context:
+ df.coalesce(-1)._plan.to_proto(self.connect)
+ self.assertTrue("numPartitions must be positive" in str(context.exception))
+
+ with self.assertRaises(ValueError) as context:
+ df.repartition(-1)._plan.to_proto(self.connect)
+ self.assertTrue("numPartitions must be positive" in str(context.exception))
+
if __name__ == "__main__":
from pyspark.sql.tests.connect.test_connect_plan_only import * # noqa: F401
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org