Posted to commits@spark.apache.org by gu...@apache.org on 2018/02/11 10:23:20 UTC

spark git commit: [SPARK-22624][PYSPARK] Expose range partitioning shuffle introduced by SPARK-22614

Repository: spark
Updated Branches:
  refs/heads/master 8acb51f08 -> eacb62fbb


[SPARK-22624][PYSPARK] Expose range partitioning shuffle introduced by SPARK-22614

## What changes were proposed in this pull request?

Expose the range partitioning shuffle introduced by SPARK-22614 to PySpark via a new `DataFrame.repartitionByRange` method.
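
A minimal usage sketch of the new API (assuming a running SparkSession named `spark`; the data mirrors the doctest in the patch and is illustrative only):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[2]").getOrCreate()
    df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], ["age", "name"])

    # Range-partition the rows by "age" into 2 partitions; rows with
    # nearby ages land in the same partition.
    parted = df.repartitionByRange(2, "age")
    print(parted.rdd.getNumPartitions())  # 2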

## How was this patch tested?

Doctests in dataframe.py and a new unit test, test_repartitionByRange_dataframe, in tests.py.


Author: xubo245 <60...@qq.com>

Closes #20456 from xubo245/SPARK22624_PysparkRangePartition.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eacb62fb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eacb62fb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eacb62fb

Branch: refs/heads/master
Commit: eacb62fbbed317fd0e972102838af231385d54d8
Parents: 8acb51f
Author: xubo245 <60...@qq.com>
Authored: Sun Feb 11 19:23:15 2018 +0900
Committer: hyukjinkwon <gu...@gmail.com>
Committed: Sun Feb 11 19:23:15 2018 +0900

----------------------------------------------------------------------
 python/pyspark/sql/dataframe.py | 45 ++++++++++++++++++++++++++++++++++++
 python/pyspark/sql/tests.py     | 28 ++++++++++++++++++++++
 2 files changed, 73 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/eacb62fb/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index faee870..5cc8b63 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -667,6 +667,51 @@ class DataFrame(object):
         else:
             raise TypeError("numPartitions should be an int or Column")
 
+    @since("2.4.0")
+    def repartitionByRange(self, numPartitions, *cols):
+        """
+        Returns a new :class:`DataFrame` partitioned by the given partitioning expressions. The
+        resulting DataFrame is range partitioned.
+
+        ``numPartitions`` can be an int to specify the target number of partitions or a Column.
+        If it is a Column, it will be used as the first partitioning column. If not specified,
+        the default number of partitions is used.
+
+        At least one partition-by expression must be specified.
+        When no explicit sort order is specified, "ascending nulls first" is assumed.
+
+        >>> df.repartitionByRange(2, "age").rdd.getNumPartitions()
+        2
+        >>> df.show()
+        +---+-----+
+        |age| name|
+        +---+-----+
+        |  2|Alice|
+        |  5|  Bob|
+        +---+-----+
+        >>> df.repartitionByRange(1, "age").rdd.getNumPartitions()
+        1
+        >>> data = df.repartitionByRange("age")
+        >>> data.show()
+        +---+-----+
+        |age| name|
+        +---+-----+
+        |  2|Alice|
+        |  5|  Bob|
+        +---+-----+
+        """
+        if isinstance(numPartitions, int):
+            if len(cols) == 0:
+                raise ValueError("At least one partition-by expression must be specified.")
+            else:
+                return DataFrame(
+                    self._jdf.repartitionByRange(numPartitions, self._jcols(*cols)), self.sql_ctx)
+        elif isinstance(numPartitions, (basestring, Column)):
+            cols = (numPartitions,) + cols
+            return DataFrame(self._jdf.repartitionByRange(self._jcols(*cols)), self.sql_ctx)
+        else:
+            raise TypeError("numPartitions should be an int, string or Column")
+
     @since(1.3)
     def distinct(self):
         """Returns a new :class:`DataFrame` containing the distinct rows in this :class:`DataFrame`.

http://git-wip-us.apache.org/repos/asf/spark/blob/eacb62fb/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 4bc59fd..fe89bd0 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -2148,6 +2148,34 @@ class SQLTests(ReusedSQLTestCase):
         result = df.select(functions.expr("length(a)")).collect()[0].asDict()
         self.assertEqual(13, result["length(a)"])
 
+    def test_repartitionByRange_dataframe(self):
+        schema = StructType([
+            StructField("name", StringType(), True),
+            StructField("age", IntegerType(), True),
+            StructField("height", DoubleType(), True)])
+
+        df1 = self.spark.createDataFrame(
+            [(u'Bob', 27, 66.0), (u'Alice', 10, 10.0), (u'Bob', 10, 66.0)], schema)
+        df2 = self.spark.createDataFrame(
+            [(u'Alice', 10, 10.0), (u'Bob', 10, 66.0), (u'Bob', 27, 66.0)], schema)
+
+        # test repartitionByRange(numPartitions, *cols)
+        df3 = df1.repartitionByRange(2, "name", "age")
+        self.assertEqual(df3.rdd.getNumPartitions(), 2)
+        self.assertEqual(df3.rdd.first(), df2.rdd.first())
+        self.assertEqual(df3.rdd.take(3), df2.rdd.take(3))
+
+        # test repartitionByRange(numPartitions, *cols) with a different numPartitions
+        df4 = df1.repartitionByRange(3, "name", "age")
+        self.assertEqual(df4.rdd.getNumPartitions(), 3)
+        self.assertEqual(df4.rdd.first(), df2.rdd.first())
+        self.assertEqual(df4.rdd.take(3), df2.rdd.take(3))
+
+        # test repartitionByRange(*cols)
+        df5 = df1.repartitionByRange("name", "age")
+        self.assertEqual(df5.rdd.first(), df2.rdd.first())
+        self.assertEqual(df5.rdd.take(3), df2.rdd.take(3))
+
     def test_replace(self):
         schema = StructType([
             StructField("name", StringType(), True),

