You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2018/02/11 10:23:20 UTC
spark git commit: [SPARK-22624][PYSPARK] Expose range partitioning shuffle introduced by SPARK-22614
Repository: spark
Updated Branches:
refs/heads/master 8acb51f08 -> eacb62fbb
[SPARK-22624][PYSPARK] Expose range partitioning shuffle introduced by spark-22614
## What changes were proposed in this pull request?
Expose range partitioning shuffle introduced by spark-22614
## How was this patch tested?
Unit test in dataframe.py
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: xubo245 <60...@qq.com>
Closes #20456 from xubo245/SPARK22624_PysparkRangePartition.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eacb62fb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eacb62fb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eacb62fb
Branch: refs/heads/master
Commit: eacb62fbbed317fd0e972102838af231385d54d8
Parents: 8acb51f
Author: xubo245 <60...@qq.com>
Authored: Sun Feb 11 19:23:15 2018 +0900
Committer: hyukjinkwon <gu...@gmail.com>
Committed: Sun Feb 11 19:23:15 2018 +0900
----------------------------------------------------------------------
python/pyspark/sql/dataframe.py | 45 ++++++++++++++++++++++++++++++++++++
python/pyspark/sql/tests.py | 28 ++++++++++++++++++++++
2 files changed, 73 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/eacb62fb/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index faee870..5cc8b63 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -667,6 +667,51 @@ class DataFrame(object):
else:
raise TypeError("numPartitions should be an int or Column")
+ @since("2.4.0")
+ def repartitionByRange(self, numPartitions, *cols):
+ """
+ Returns a new :class:`DataFrame` partitioned by the given partitioning expressions. The
+ resulting DataFrame is range partitioned.
+
+ ``numPartitions`` can be an int to specify the target number of partitions or a Column.
+ If it is a Column, it will be used as the first partitioning column. If not specified,
+ the default number of partitions is used.
+
+ At least one partition-by expression must be specified.
+ When no explicit sort order is specified, "ascending nulls first" is assumed.
+
+ >>> df.repartitionByRange(2, "age").rdd.getNumPartitions()
+ 2
+ >>> df.show()
+ +---+-----+
+ |age| name|
+ +---+-----+
+ | 2|Alice|
+ | 5| Bob|
+ +---+-----+
+ >>> df.repartitionByRange(1, "age").rdd.getNumPartitions()
+ 1
+ >>> data = df.repartitionByRange("age")
+ >>> df.show()
+ +---+-----+
+ |age| name|
+ +---+-----+
+ | 2|Alice|
+ | 5| Bob|
+ +---+-----+
+ """
+ if isinstance(numPartitions, int):
+ if len(cols) == 0:
+ return ValueError("At least one partition-by expression must be specified.")
+ else:
+ return DataFrame(
+ self._jdf.repartitionByRange(numPartitions, self._jcols(*cols)), self.sql_ctx)
+ elif isinstance(numPartitions, (basestring, Column)):
+ cols = (numPartitions,) + cols
+ return DataFrame(self._jdf.repartitionByRange(self._jcols(*cols)), self.sql_ctx)
+ else:
+ raise TypeError("numPartitions should be an int, string or Column")
+
@since(1.3)
def distinct(self):
"""Returns a new :class:`DataFrame` containing the distinct rows in this :class:`DataFrame`.
http://git-wip-us.apache.org/repos/asf/spark/blob/eacb62fb/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 4bc59fd..fe89bd0 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -2148,6 +2148,34 @@ class SQLTests(ReusedSQLTestCase):
result = df.select(functions.expr("length(a)")).collect()[0].asDict()
self.assertEqual(13, result["length(a)"])
+ def test_repartitionByRange_dataframe(self):
+ schema = StructType([
+ StructField("name", StringType(), True),
+ StructField("age", IntegerType(), True),
+ StructField("height", DoubleType(), True)])
+
+ df1 = self.spark.createDataFrame(
+ [(u'Bob', 27, 66.0), (u'Alice', 10, 10.0), (u'Bob', 10, 66.0)], schema)
+ df2 = self.spark.createDataFrame(
+ [(u'Alice', 10, 10.0), (u'Bob', 10, 66.0), (u'Bob', 27, 66.0)], schema)
+
+ # test repartitionByRange(numPartitions, *cols)
+ df3 = df1.repartitionByRange(2, "name", "age")
+ self.assertEqual(df3.rdd.getNumPartitions(), 2)
+ self.assertEqual(df3.rdd.first(), df2.rdd.first())
+ self.assertEqual(df3.rdd.take(3), df2.rdd.take(3))
+
+ # test repartitionByRange(numPartitions, *cols)
+ df4 = df1.repartitionByRange(3, "name", "age")
+ self.assertEqual(df4.rdd.getNumPartitions(), 3)
+ self.assertEqual(df4.rdd.first(), df2.rdd.first())
+ self.assertEqual(df4.rdd.take(3), df2.rdd.take(3))
+
+ # test repartitionByRange(*cols)
+ df5 = df1.repartitionByRange("name", "age")
+ self.assertEqual(df5.rdd.first(), df2.rdd.first())
+ self.assertEqual(df5.rdd.take(3), df2.rdd.take(3))
+
def test_replace(self):
schema = StructType([
StructField("name", StringType(), True),
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org