Posted to commits@spark.apache.org by pw...@apache.org on 2014/05/14 08:25:06 UTC

git commit: Revert "[SPARK-1784] Add a new partitioner to allow specifying # of keys per partition"

Repository: spark
Updated Branches:
  refs/heads/master c33b8dcbf -> 7bb9a521f


Revert "[SPARK-1784] Add a new partitioner to allow specifying # of keys per partition"

This reverts commit 92cebada09a7e5a00ab48bcb350a9462949c33eb.
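
For context, the reverted commit added a BoundaryPartitioner whose constructor took a
partition count, the source RDD, and a per-partition key bound (default 1000). A minimal
usage sketch against that now-removed API (the 4/500 values and variable names are
illustrative only):

    import org.apache.spark.{BoundaryPartitioner, SparkConf, SparkContext}
    import org.apache.spark.SparkContext._ // PairRDDFunctions implicits (Spark 1.x)

    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("demo"))
    val rdd = sc.parallelize(1 to 2000).map(x => (x, x))
    // 4 partitions; each fills to 500 keys before assignment moves to the next
    val partitioned = rdd.partitionBy(new BoundaryPartitioner(4, rdd, 500))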


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7bb9a521
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7bb9a521
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7bb9a521

Branch: refs/heads/master
Commit: 7bb9a521f35eb19576c6cc2da3fd385910270e46
Parents: c33b8dc
Author: Patrick Wendell <pw...@gmail.com>
Authored: Tue May 13 23:24:51 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Tue May 13 23:24:51 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/Partitioner.scala    | 61 --------------------
 .../org/apache/spark/PartitioningSuite.scala    | 34 -----------
 2 files changed, 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7bb9a521/core/src/main/scala/org/apache/spark/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 6274796..9155159 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -156,64 +156,3 @@ class RangePartitioner[K : Ordering : ClassTag, V](
       false
   }
 }
-
-/**
- * A [[org.apache.spark.Partitioner]] that caps the number of keys per partition at a
- * specified bound (default 1000). Once every partition holds `boundary` elements, the
- * partitioner allocates one element per partition in round-robin order, so the smaller
- * partitions trail the larger ones by at most one key.
- */
-class BoundaryPartitioner[K : Ordering : ClassTag, V](
-                                                    partitions: Int,
-                                                    @transient rdd: RDD[_ <: Product2[K,V]],
-                                                    private val boundary: Int = 1000)
-  extends Partitioner {
-
-  // Tracks how many keys have been assigned to each partition:
-  // counts(0) is the number of keys in partition 0, and so on.
-  private val counts: Array[Int] = {
-    new Array[Int](numPartitions)
-  }
-
-  def numPartitions = math.abs(partitions)
-
-  /*
-   * Ideally this would be computed from the number of partitions and the total key
-   * count, but we do not call count() on the RDD here to avoid triggering an action.
-   * The user is free to call count() and pass in any appropriate boundary.
-   */
-  def keysPerPartition = boundary
-
-  var currPartition = 0
-
-  /*
-   * Assign keys to the current partition until it reaches the per-partition bound,
-   * then start allocating to the next partition.
-   *
-   * NOTE: Say we have 2000 keys and the user asks for 3 partitions with a boundary
-   * of 500. The first 500 keys go to P1, keys 501-1000 go to P2, and keys 1001-1500
-   * go to P3. After that, keys are spread one per partition: 1501 goes to P1, 1502
-   * goes to P2, 1503 goes to P3, and so on.
-   */
-  def getPartition(key: Any): Int = {
-    val partition = currPartition
-    counts(partition) = counts(partition) + 1
-    /*
-     * Since we fill up a partition before moving on to the next one (this helps
-     * preserve key order in certain cases), it is possible to end up with empty
-     * partitions: with 3 partitions and 500 keys per partition, an RDD of 700 keys
-     * leaves one partition entirely empty.
-     */
-    if (counts(currPartition) >= keysPerPartition)
-      currPartition = (currPartition + 1) % numPartitions
-    partition
-  }
-
-  override def equals(other: Any): Boolean = other match {
-    case r: BoundaryPartitioner[_,_] =>
-      (r.counts.sameElements(counts) && r.boundary == boundary
-        && r.currPartition == currPartition)
-    case _ =>
-      false
-  }
-}
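
For reference, the removed getPartition ignores the key itself and advances an internal
counter, so assignment depends purely on call order. A standalone sketch of the allocation
scheme (plain Scala, no Spark dependency; the object name and values are illustrative,
mirroring the NOTE above):

    // Simulate the reverted scheme: fill each partition up to `bound` keys,
    // then hand out one key per partition, round-robin.
    object BoundarySim extends App {
      val numPartitions = 3
      val bound = 500
      val counts = new Array[Int](numPartitions)
      var curr = 0

      def assign(): Int = {
        val p = curr
        counts(p) += 1
        if (counts(p) >= bound) curr = (curr + 1) % numPartitions
        p
      }

      val assignments = (1 to 2000).map(_ => assign())
      println(assignments.take(3))           // Vector(0, 0, 0)
      println(assignments.slice(1500, 1506)) // Vector(0, 1, 2, 0, 1, 2)
    }

With 700 keys and the same settings, partition 2 never receives a key, which matches the
empty-partition caveat in the removed comment.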

http://git-wip-us.apache.org/repos/asf/spark/blob/7bb9a521/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index 7d40395..7c30626 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -66,40 +66,6 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
     assert(descendingP4 != p4)
   }
 
-  test("BoundaryPartitioner equality") {
-    // Make an RDD of 4000 distinct keys. Freshly constructed partitioners have not
-    // assigned any keys yet, so equality depends only on the constructor arguments.
-    val rdd = sc.parallelize(1.to(4000)).map(x => (x, x))
-
-    val p2 = new BoundaryPartitioner(2, rdd, 1000)
-    val p4 = new BoundaryPartitioner(4, rdd, 1000)
-    val anotherP4 = new BoundaryPartitioner(4, rdd)
-
-    assert(p2 === p2)
-    assert(p4 === p4)
-    assert(p2 != p4)
-    assert(p4 != p2)
-    assert(p4 === anotherP4)
-    assert(anotherP4 === p4)
-  }
-
-  test("BoundaryPartitioner getPartition") {
-    val rdd = sc.parallelize(1.to(2000)).map(x => (x, x))
-    val partitioner = new BoundaryPartitioner(4, rdd, 500)
-    1.to(2000).foreach { element =>
-      val partition = partitioner.getPartition(element)
-      if (element <= 500) {
-        assert(partition === 0)
-      } else if (element > 500 && element <= 1000) {
-        assert(partition === 1)
-      } else if (element > 1000 && element <= 1500) {
-        assert(partition === 2)
-      } else if (element > 1500 && element <= 2000) {
-        assert(partition === 3)
-      }
-    }
-  }
-
   test("RangePartitioner getPartition") {
     val rdd = sc.parallelize(1.to(2000)).map(x => (x, x))
     // We have different behaviour of getPartition for partitions with less than 1000 and more than