You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/07/20 20:06:10 UTC

git commit: [SPARK-2598] RangePartitioner's binary search does not use the given Ordering

Repository: spark
Updated Branches:
  refs/heads/master 98ab41122 -> fa51b0fb5


[SPARK-2598] RangePartitioner's binary search does not use the given Ordering

We should fix this in branch-1.0 as well.

Author: Reynold Xin <rx...@apache.org>

Closes #1500 from rxin/rangePartitioner and squashes the following commits:

c0a94f5 [Reynold Xin] [SPARK-2598] RangePartitioner's binary search does not use the given Ordering.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fa51b0fb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fa51b0fb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fa51b0fb

Branch: refs/heads/master
Commit: fa51b0fb5bee95a402c7b7f13dcf0b46cf5bb429
Parents: 98ab411
Author: Reynold Xin <rx...@apache.org>
Authored: Sun Jul 20 11:06:06 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Sun Jul 20 11:06:06 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/Partitioner.scala    |  4 +-
 .../org/apache/spark/util/CollectionsUtil.scala | 46 -------------------
 .../apache/spark/util/CollectionsUtils.scala    | 47 ++++++++++++++++++++
 .../org/apache/spark/PartitioningSuite.scala    | 14 ++++++
 4 files changed, 63 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fa51b0fb/core/src/main/scala/org/apache/spark/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index ec99648..52c018b 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -134,8 +134,8 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   def getPartition(key: Any): Int = {
     val k = key.asInstanceOf[K]
     var partition = 0
-    if (rangeBounds.length < 1000) {
-      // If we have less than 100 partitions naive search
+    if (rangeBounds.length <= 128) {
+      // If we have at most 128 range bounds, use a naive linear search
       while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
         partition += 1
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/fa51b0fb/core/src/main/scala/org/apache/spark/util/CollectionsUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/CollectionsUtil.scala b/core/src/main/scala/org/apache/spark/util/CollectionsUtil.scala
deleted file mode 100644
index e4c254b..0000000
--- a/core/src/main/scala/org/apache/spark/util/CollectionsUtil.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import java.util
-
-import scala.Array
-import scala.reflect._
-
-private[spark] object CollectionsUtils {
-  def makeBinarySearch[K : Ordering : ClassTag] : (Array[K], K) => Int = {
-    classTag[K] match {
-      case ClassTag.Float =>
-        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Float]], x.asInstanceOf[Float])
-      case ClassTag.Double =>
-        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Double]], x.asInstanceOf[Double])
-      case ClassTag.Byte =>
-        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Byte]], x.asInstanceOf[Byte])
-      case ClassTag.Char =>
-        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Char]], x.asInstanceOf[Char])
-      case ClassTag.Short =>
-        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Short]], x.asInstanceOf[Short])
-      case ClassTag.Int =>
-        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Int]], x.asInstanceOf[Int])
-      case ClassTag.Long =>
-        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Long]], x.asInstanceOf[Long])
-      case _ =>
-        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[AnyRef]], x)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/fa51b0fb/core/src/main/scala/org/apache/spark/util/CollectionsUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/CollectionsUtils.scala b/core/src/main/scala/org/apache/spark/util/CollectionsUtils.scala
new file mode 100644
index 0000000..85da284
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/CollectionsUtils.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util
+
+import scala.reflect.{classTag, ClassTag}
+
+private[spark] object CollectionsUtils {
+  def makeBinarySearch[K : Ordering : ClassTag] : (Array[K], K) => Int = {
+    // For primitive keys, we can use the natural ordering. Otherwise, use the Ordering comparator.
+    classTag[K] match {
+      case ClassTag.Float =>
+        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Float]], x.asInstanceOf[Float])
+      case ClassTag.Double =>
+        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Double]], x.asInstanceOf[Double])
+      case ClassTag.Byte =>
+        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Byte]], x.asInstanceOf[Byte])
+      case ClassTag.Char =>
+        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Char]], x.asInstanceOf[Char])
+      case ClassTag.Short =>
+        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Short]], x.asInstanceOf[Short])
+      case ClassTag.Int =>
+        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Int]], x.asInstanceOf[Int])
+      case ClassTag.Long =>
+        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Long]], x.asInstanceOf[Long])
+      case _ =>
+        val comparator = implicitly[Ordering[K]].asInstanceOf[java.util.Comparator[Any]]
+        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[AnyRef]], x, comparator)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/fa51b0fb/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index 7c30626..4658a08 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -91,6 +91,17 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
     }
   }
 
+  test("RangePartitioner for keys that are not Comparable (but with Ordering)") {
+    // Row does not extend Comparable, but has an implicit Ordering defined.
+    implicit object RowOrdering extends Ordering[Row] {
+      override def compare(x: Row, y: Row) = x.value - y.value
+    }
+
+    val rdd = sc.parallelize(1 to 4500).map(x => (Row(x), Row(x)))
+    val partitioner = new RangePartitioner(1500, rdd)
+    partitioner.getPartition(Row(100))
+  }
+
   test("HashPartitioner not equal to RangePartitioner") {
     val rdd = sc.parallelize(1 to 10).map(x => (x, x))
     val rangeP2 = new RangePartitioner(2, rdd)
@@ -177,3 +188,6 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
     // Add other tests here for classes that should be able to handle empty partitions correctly
   }
 }
+
+
+private sealed case class Row(value: Int)