You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2015/07/16 09:16:46 UTC

spark git commit: [SPARK-8893] Add runtime checks against non-positive number of partitions

Repository: spark
Updated Branches:
  refs/heads/master 0a795336d -> 011551620


[SPARK-8893] Add runtime checks against non-positive number of partitions

https://issues.apache.org/jira/browse/SPARK-8893

> What does `sc.parallelize(1 to 3).repartition(p).collect` return? I would expect `Array(1, 2, 3)` regardless of `p`. But if `p` < 1, it returns `Array()`. I think instead it should throw an `IllegalArgumentException`.

> I think the case is pretty clear for `p` < 0. But the behavior for `p` = 0 is also error-prone. In fact, that's how I found this strange behavior. I used `rdd.repartition(a/b)` with positive `a` and `b`, but `a/b` was rounded down to zero and the results surprised me. I'd prefer an exception instead of unexpected (corrupt) results.

Author: Daniel Darabos <da...@gmail.com>

Closes #7285 from darabos/patch-1 and squashes the following commits:

decba82 [Daniel Darabos] Allow repartitioning empty RDDs to zero partitions.
97de852 [Daniel Darabos] Allow zero partition count in HashPartitioner
f6ba5fb [Daniel Darabos] Use require() for simpler syntax.
d5e3df8 [Daniel Darabos] Require positive number of partitions in HashPartitioner
897c628 [Daniel Darabos] Require positive maxPartitions in CoalescedRDD


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/01155162
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/01155162
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/01155162

Branch: refs/heads/master
Commit: 011551620faa87107a787530f074af3d9be7e695
Parents: 0a79533
Author: Daniel Darabos <da...@gmail.com>
Authored: Thu Jul 16 08:16:54 2015 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Jul 16 08:16:54 2015 +0100

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/Partitioner.scala      | 2 ++
 core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala | 5 ++++-
 2 files changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/01155162/core/src/main/scala/org/apache/spark/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 82889bc..ad68512 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -76,6 +76,8 @@ object Partitioner {
  * produce an unexpected or incorrect result.
  */
 class HashPartitioner(partitions: Int) extends Partitioner {
+  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
+
   def numPartitions: Int = partitions
 
   def getPartition(key: Any): Int = key match {

http://git-wip-us.apache.org/repos/asf/spark/blob/01155162/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index 663eebb..90d9735 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -69,7 +69,7 @@ private[spark] case class CoalescedRDDPartition(
  * the preferred location of each new partition overlaps with as many preferred locations of its
  * parent partitions
  * @param prev RDD to be coalesced
- * @param maxPartitions number of desired partitions in the coalesced RDD
+ * @param maxPartitions number of desired partitions in the coalesced RDD (must be positive)
  * @param balanceSlack used to trade-off balance and locality. 1.0 is all locality, 0 is all balance
  */
 private[spark] class CoalescedRDD[T: ClassTag](
@@ -78,6 +78,9 @@ private[spark] class CoalescedRDD[T: ClassTag](
     balanceSlack: Double = 0.10)
   extends RDD[T](prev.context, Nil) {  // Nil since we implement getDependencies
 
+  require(maxPartitions > 0 || maxPartitions == prev.partitions.length,
+    s"Number of partitions ($maxPartitions) must be positive.")
+
   override def getPartitions: Array[Partition] = {
     val pc = new PartitionCoalescer(maxPartitions, prev, balanceSlack)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org