You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/07/18 01:57:59 UTC
spark git commit: [SPARK-21410][CORE] Create fewer partitions for
RangePartitioner if RDD.count() is less than `partitions`
Repository: spark
Updated Branches:
refs/heads/master a8c6d0f64 -> 7aac755ba
[SPARK-21410][CORE] Create fewer partitions for RangePartitioner if RDD.count() is less than `partitions`
## What changes were proposed in this pull request?
Fix a bug in RangePartitioner:
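In `RangePartitioner(partitions: Int, rdd: RDD[_ <: Product2[K, V]])`, `RangePartitioner.numPartitions` is wrong if the number of elements in the RDD (`rdd.count()`) is less than the requested number of partitions (the `partitions` constructor argument). The fix caps the number of partitions passed to `RangePartitioner.determineBounds` at the number of sampled candidates. For example, a 3-element RDD with 22 requested partitions now yields a partitioner with 3 partitions.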
## How was this patch tested?
Tested as described in [SPARK-21410](https://issues.apache.org/jira/browse/SPARK-21410); a regression test was also added to `PartitioningSuite`.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Zhang A Peng <zh...@cn.ibm.com>
Closes #18631 from apapi/fixRangePartitioner.numPartitions.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7aac755b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7aac755b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7aac755b
Branch: refs/heads/master
Commit: 7aac755ba05be3689ae25f4b9183b32fa3408b89
Parents: a8c6d0f
Author: Zhang A Peng <zh...@cn.ibm.com>
Authored: Tue Jul 18 09:57:53 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Jul 18 09:57:53 2017 +0800
----------------------------------------------------------------------
core/src/main/scala/org/apache/spark/Partitioner.scala | 2 +-
core/src/test/scala/org/apache/spark/PartitioningSuite.scala | 6 ++++++
2 files changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/7aac755b/core/src/main/scala/org/apache/spark/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index f83f527..1484f29 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -153,7 +153,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
val weight = (1.0 / fraction).toFloat
candidates ++= reSampled.map(x => (x, weight))
}
- RangePartitioner.determineBounds(candidates, partitions)
+ RangePartitioner.determineBounds(candidates, math.min(partitions, candidates.size))
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/7aac755b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index 34c0178..dfe4c25 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -253,6 +253,12 @@ class PartitioningSuite extends SparkFunSuite with SharedSparkContext with Priva
// Add other tests here for classes that should be able to handle empty partitions correctly
}
+
+ test("Number of elements in RDD is less than number of partitions") {
+ val rdd = sc.parallelize(1 to 3).map(x => (x, x))
+ val partitioner = new RangePartitioner(22, rdd)
+ assert(partitioner.numPartitions === 3)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org