Posted to commits@spark.apache.org by rx...@apache.org on 2014/02/12 09:42:52 UTC

git commit: Merge pull request #578 from mengxr/rank.

Updated Branches:
  refs/heads/master 68b2c0d02 -> e733d655d


Merge pull request #578 from mengxr/rank.

SPARK-1076: zipWithIndex and zipWithUniqueId to RDD

Assigning ranks to an ordered or unordered data set is a common operation. It can be done by first counting the records in each partition and then assigning ranks in parallel.

The purpose of assigning ranks to an unordered set is usually to get a unique id for each item, e.g., to map feature names to feature indices. In such cases, the assignment can be done without counting records, saving one Spark job.

https://spark-project.atlassian.net/browse/SPARK-1076

== update ==
Because assigning ranks is very similar to Scala's zipWithIndex, I changed the method name to zipWithIndex and put the index in the value field.
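
A minimal usage sketch of the two new methods, assuming a SparkContext named sc and that parallelize splits the five elements into the partitions (a, b) and (c, d, e):

  val data = sc.parallelize(Seq("a", "b", "c", "d", "e"), 2)

  // zipWithIndex assigns consecutive indices ordered by partition index and
  // then by position within each partition; it runs a job to count all but
  // the last partition (skipped when there is only one partition).
  data.zipWithIndex().collect()
  // => Array((a,0), (b,1), (c,2), (d,3), (e,4))

  // zipWithUniqueId gives item i of partition k the id i * numPartitions + k,
  // so ids are unique but may have gaps; no job is triggered.
  data.zipWithUniqueId().collect()
  // => Array((a,0), (b,2), (c,1), (d,3), (e,5))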

Author: Xiangrui Meng <me...@databricks.com>

Closes #578 and squashes the following commits:

52a05e1 [Xiangrui Meng] changed assignRanks to zipWithIndex; changed assignUniqueIds to zipWithUniqueId; minor updates
756881c [Xiangrui Meng] simplified RankedRDD by implementing assignUniqueIds separately; moved counting iterator size to Utils; do not count items in the last partition and skip counting if there is only one partition
630868c [Xiangrui Meng] newline
21b434b [Xiangrui Meng] add assignRanks and assignUniqueIds to RDD


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/e733d655
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/e733d655
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/e733d655

Branch: refs/heads/master
Commit: e733d655df6bf569d3d16fdd65c11ef3d2b9de16
Parents: 68b2c0d
Author: Xiangrui Meng <me...@databricks.com>
Authored: Wed Feb 12 00:42:42 2014 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Wed Feb 12 00:42:42 2014 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/rdd/RDD.scala   | 36 ++++++----
 .../apache/spark/rdd/ZippedWithIndexRDD.scala   | 69 ++++++++++++++++++++
 .../scala/org/apache/spark/util/Utils.scala     | 13 ++++
 .../scala/org/apache/spark/rdd/RDDSuite.scala   | 26 ++++++++
 .../org/apache/spark/util/UtilsSuite.scala      |  7 ++
 5 files changed, 139 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e733d655/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 8010bb6..ec8e311 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -775,18 +775,7 @@ abstract class RDD[T: ClassTag](
   /**
    * Return the number of elements in the RDD.
    */
-  def count(): Long = {
-    sc.runJob(this, (iter: Iterator[T]) => {
-      // Use a while loop to count the number of elements rather than iter.size because
-      // iter.size uses a for loop, which is slightly slower in current version of Scala.
-      var result = 0L
-      while (iter.hasNext) {
-        result += 1L
-        iter.next()
-      }
-      result
-    }).sum
-  }
+  def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
 
   /**
    * (Experimental) Approximate version of count() that returns a potentially incomplete result
@@ -870,6 +859,29 @@ abstract class RDD[T: ClassTag](
   }
 
   /**
+   * Zips this RDD with its element indices. The ordering is first based on the partition index
+   * and then the ordering of items within each partition. So the first item in the first
+   * partition gets index 0, and the last item in the last partition receives the largest index.
+   * This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type.
+   * This method needs to trigger a spark job when this RDD contains more than one partition.
+   */
+  def zipWithIndex(): RDD[(T, Long)] = new ZippedWithIndexRDD(this)
+
+  /**
+   * Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k,
+   * 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method
+   * won't trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].
+   */
+  def zipWithUniqueId(): RDD[(T, Long)] = {
+    val n = this.partitions.size
+    this.mapPartitionsWithIndex { case (k, iter) =>
+      iter.zipWithIndex.map { case (item, i) =>
+        (item, i * n + k)
+      }
+    }
+  }
+
+  /**
    * Take the first num elements of the RDD. It works by first scanning one partition, and use the
    * results from that partition to estimate the number of additional partitions needed to satisfy
    * the limit.

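To make the id scheme of zipWithUniqueId above concrete: item i in partition k of an RDD with n partitions receives id i * n + k. The helper below is a hypothetical restatement of that formula, not part of the patch:

  // With n = 3 partitions: partition 0 yields 0, 3, 6, ...;
  // partition 1 yields 1, 4, 7, ...; partition 2 yields 2, 5, 8, ...
  def uniqueId(itemIndex: Int, partitionIndex: Int, numPartitions: Int): Long =
    itemIndex.toLong * numPartitions + partitionIndex

Because the ids depend only on a partition's own contents, no counting job is needed, but gaps appear whenever partitions differ in size.
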
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e733d655/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
new file mode 100644
index 0000000..5e08a46
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.{TaskContext, Partition}
+import org.apache.spark.util.Utils
+
+private[spark]
+class ZippedWithIndexRDDPartition(val prev: Partition, val startIndex: Long)
+  extends Partition with Serializable {
+  override val index: Int = prev.index
+}
+
+/**
+ * Represents an RDD zipped with its element indices. The ordering is first based on the partition
+ * index and then the ordering of items within each partition. So the first item in the first
+ * partition gets index 0, and the last item in the last partition receives the largest index.
+ *
+ * @param prev parent RDD
+ * @tparam T parent RDD item type
+ */
+private[spark]
+class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev) {
+
+  override def getPartitions: Array[Partition] = {
+    val n = prev.partitions.size
+    val startIndices: Array[Long] =
+      if (n == 0) {
+        Array[Long]()
+      } else if (n == 1) {
+        Array(0L)
+      } else {
+        prev.context.runJob(
+          prev,
+          Utils.getIteratorSize _,
+          0 until n - 1, // do not need to count the last partition
+          false
+        ).scanLeft(0L)(_ + _)
+      }
+    firstParent[T].partitions.map(x => new ZippedWithIndexRDDPartition(x, startIndices(x.index)))
+  }
+
+  override def getPreferredLocations(split: Partition): Seq[String] =
+    firstParent[T].preferredLocations(split.asInstanceOf[ZippedWithIndexRDDPartition].prev)
+
+  override def compute(splitIn: Partition, context: TaskContext): Iterator[(T, Long)] = {
+    val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition]
+    firstParent[T].iterator(split.prev, context).zipWithIndex.map { x =>
+      (x._1, split.startIndex + x._2)
+    }
+  }
+}
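
A quick illustration of how getPartitions above turns per-partition counts into start indices (the counts are made up for the example; the last partition is never counted):

  val countsOfAllButLastPartition = Array(4L, 3L, 5L)  // a 4-partition RDD
  val startIndices = countsOfAllButLastPartition.scanLeft(0L)(_ + _)
  // => Array(0, 4, 7, 12): partition 0 starts at global index 0,
  //    partition 1 at 4, partition 2 at 7, and the last partition at 12.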

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e733d655/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index c201d0a..8749ab7 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -855,4 +855,17 @@ private[spark] object Utils extends Logging {
     System.currentTimeMillis - start
   }
 
+  /**
+   * Counts the number of elements of an iterator using a while loop rather than calling
+   * [[scala.collection.Iterator#size]] because it uses a for loop, which is slightly slower
+   * in the current version of Scala.
+   */
+  def getIteratorSize[T](iterator: Iterator[T]): Long = {
+    var count = 0L
+    while (iterator.hasNext) {
+      count += 1L
+      iterator.next()
+    }
+    count
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e733d655/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 879c4e5..308c7cc 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -525,4 +525,30 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(a.intersection(b).collect.sorted === intersection)
     assert(b.intersection(a).collect.sorted === intersection)
   }
+
+  test("zipWithIndex") {
+    val n = 10
+    val data = sc.parallelize(0 until n, 3)
+    val ranked = data.zipWithIndex()
+    ranked.collect().foreach { x =>
+      assert(x._1 === x._2)
+    }
+  }
+
+  test("zipWithIndex with a single partition") {
+    val n = 10
+    val data = sc.parallelize(0 until n, 1)
+    val ranked = data.zipWithIndex()
+    ranked.collect().foreach { x =>
+      assert(x._1 === x._2)
+    }
+  }
+
+  test("zipWithUniqueId") {
+    val n = 10
+    val data = sc.parallelize(0 until n, 3)
+    val ranked = data.zipWithUniqueId()
+    val ids = ranked.map(_._1).distinct().collect()
+    assert(ids.length === n)
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e733d655/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 4684c8c..7030ba4 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -146,5 +146,12 @@ class UtilsSuite extends FunSuite {
     assert(bbuf.array.length === 8)
     assert(Utils.deserializeLongValue(bbuf.array) === testval)
   }
+
+  test("get iterator size") {
+    val empty = Seq[Int]()
+    assert(Utils.getIteratorSize(empty.toIterator) === 0L)
+    val iterator = Iterator.range(0, 5)
+    assert(Utils.getIteratorSize(iterator) === 5L)
+  }
 }