You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ru...@apache.org on 2022/11/30 06:34:22 UTC
[spark] branch master updated: [SPARK-41331][CONNECT][PYTHON] Add `orderBy` and `drop_duplicates`
This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 92847d98810 [SPARK-41331][CONNECT][PYTHON] Add `orderBy` and `drop_duplicates`
92847d98810 is described below
commit 92847d98810280c9ddebea2e12a5e4945601f809
Author: Ruifeng Zheng <ru...@apache.org>
AuthorDate: Wed Nov 30 14:34:02 2022 +0800
[SPARK-41331][CONNECT][PYTHON] Add `orderBy` and `drop_duplicates`
### What changes were proposed in this pull request?
Add `orderBy` and `drop_duplicates`
### Why are the changes needed?
For API coverage
### Does this PR introduce _any_ user-facing change?
Yes, this adds new user-facing APIs: `orderBy` and `drop_duplicates`.
### How was this patch tested?
Added test cases; since these methods are only aliases (of `sort` and `dropDuplicates`), I test them in plan-only tests.
Closes #38846 from zhengruifeng/connect_df_orderby_dropduplicate.
Authored-by: Ruifeng Zheng <ru...@apache.org>
Signed-off-by: Ruifeng Zheng <ru...@apache.org>
---
python/pyspark/sql/connect/dataframe.py | 4 ++++
python/pyspark/sql/tests/connect/test_connect_plan_only.py | 14 ++++++++++++++
2 files changed, 18 insertions(+)
diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index ef70336e31d..034f410b1ad 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -299,6 +299,8 @@ class DataFrame(object):
plan.Deduplicate(child=self._plan, column_names=subset), session=self._session
)
+ drop_duplicates = dropDuplicates
+
def distinct(self) -> "DataFrame":
"""Returns a new :class:`DataFrame` containing the distinct rows in this :class:`DataFrame`.
@@ -513,6 +515,8 @@ class DataFrame(object):
plan.Sort(self._plan, columns=list(cols), is_global=True), session=self._session
)
+ orderBy = sort
+
def sortWithinPartitions(self, *cols: "ColumnOrName") -> "DataFrame":
"""Sort within each partition by a specific column"""
return DataFrame.withPlan(
diff --git a/python/pyspark/sql/tests/connect/test_connect_plan_only.py b/python/pyspark/sql/tests/connect/test_connect_plan_only.py
index ed4b688cef2..337917335e3 100644
--- a/python/pyspark/sql/tests/connect/test_connect_plan_only.py
+++ b/python/pyspark/sql/tests/connect/test_connect_plan_only.py
@@ -193,6 +193,16 @@ class SparkConnectTestsPlanOnly(PlanOnlyTestFixture):
)
self.assertEqual(plan.root.sort.is_global, True)
+ plan = df.filter(df.col_name > 3).orderBy("col_a", "col_b")._plan.to_proto(self.connect)
+ self.assertEqual(
+ [
+ f.expression.unresolved_attribute.unparsed_identifier
+ for f in plan.root.sort.sort_fields
+ ],
+ ["col_a", "col_b"],
+ )
+ self.assertEqual(plan.root.sort.is_global, True)
+
plan = (
df.filter(df.col_name > 3)
.sortWithinPartitions("col_a", "col_b")
@@ -236,6 +246,10 @@ class SparkConnectTestsPlanOnly(PlanOnlyTestFixture):
self.assertEqual(deduplicate_on_all_columns_plan.root.deduplicate.all_columns_as_keys, True)
self.assertEqual(len(deduplicate_on_all_columns_plan.root.deduplicate.column_names), 0)
+ deduplicate_on_all_columns_plan = df.drop_duplicates()._plan.to_proto(self.connect)
+ self.assertEqual(deduplicate_on_all_columns_plan.root.deduplicate.all_columns_as_keys, True)
+ self.assertEqual(len(deduplicate_on_all_columns_plan.root.deduplicate.column_names), 0)
+
deduplicate_on_subset_columns_plan = df.dropDuplicates(["name", "height"])._plan.to_proto(
self.connect
)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org