You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by me...@apache.org on 2014/11/04 18:57:13 UTC
git commit: [Spark-4060] [MLlib] exposing special rdd functions to the public
Repository: spark
Updated Branches:
refs/heads/master bcecd73fd -> f90ad5d42
[Spark-4060] [MLlib] exposing special rdd functions to the public
Author: Niklas Wilcke <1w...@informatik.uni-hamburg.de>
Closes #2907 from numbnut/master and squashes the following commits:
7f7c767 [Niklas Wilcke] [Spark-4060] [MLlib] exposing special rdd functions to the public, #2907
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f90ad5d4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f90ad5d4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f90ad5d4
Branch: refs/heads/master
Commit: f90ad5d426cb726079c490a9bb4b1100e2b4e602
Parents: bcecd73
Author: Niklas Wilcke <1w...@informatik.uni-hamburg.de>
Authored: Tue Nov 4 09:57:03 2014 -0800
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Tue Nov 4 09:57:03 2014 -0800
----------------------------------------------------------------------
.../apache/spark/mllib/evaluation/AreaUnderCurve.scala | 2 +-
.../scala/org/apache/spark/mllib/rdd/RDDFunctions.scala | 11 ++++++-----
.../scala/org/apache/spark/mllib/rdd/SlidingRDD.scala | 5 +++--
.../org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala | 6 +++---
4 files changed, 13 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/f90ad5d4/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala
index 7858ec6..078fbfb 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala
@@ -43,7 +43,7 @@ private[evaluation] object AreaUnderCurve {
*/
def of(curve: RDD[(Double, Double)]): Double = {
curve.sliding(2).aggregate(0.0)(
- seqOp = (auc: Double, points: Seq[(Double, Double)]) => auc + trapezoid(points),
+ seqOp = (auc: Double, points: Array[(Double, Double)]) => auc + trapezoid(points),
combOp = _ + _
)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/f90ad5d4/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala
index b5e403b..57c0768 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala
@@ -20,6 +20,7 @@ package org.apache.spark.mllib.rdd
import scala.language.implicitConversions
import scala.reflect.ClassTag
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
@@ -28,8 +29,8 @@ import org.apache.spark.util.Utils
/**
* Machine learning specific RDD functions.
*/
-private[mllib]
-class RDDFunctions[T: ClassTag](self: RDD[T]) {
+@DeveloperApi
+class RDDFunctions[T: ClassTag](self: RDD[T]) extends Serializable {
/**
* Returns a RDD from grouping items of its parent RDD in fixed size blocks by passing a sliding
@@ -39,10 +40,10 @@ class RDDFunctions[T: ClassTag](self: RDD[T]) {
* trigger a Spark job if the parent RDD has more than one partition and the window size is
* greater than 1.
*/
- def sliding(windowSize: Int): RDD[Seq[T]] = {
+ def sliding(windowSize: Int): RDD[Array[T]] = {
require(windowSize > 0, s"Sliding window size must be positive, but got $windowSize.")
if (windowSize == 1) {
- self.map(Seq(_))
+ self.map(Array(_))
} else {
new SlidingRDD[T](self, windowSize)
}
@@ -112,7 +113,7 @@ class RDDFunctions[T: ClassTag](self: RDD[T]) {
}
}
-private[mllib]
+@DeveloperApi
object RDDFunctions {
/** Implicit conversion from an RDD to RDDFunctions. */
http://git-wip-us.apache.org/repos/asf/spark/blob/f90ad5d4/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala
index dd80782..35e81fc 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala
@@ -45,15 +45,16 @@ class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Seq[T]
*/
private[mllib]
class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int)
- extends RDD[Seq[T]](parent) {
+ extends RDD[Array[T]](parent) {
require(windowSize > 1, s"Window size must be greater than 1, but got $windowSize.")
- override def compute(split: Partition, context: TaskContext): Iterator[Seq[T]] = {
+ override def compute(split: Partition, context: TaskContext): Iterator[Array[T]] = {
val part = split.asInstanceOf[SlidingRDDPartition[T]]
(firstParent[T].iterator(part.prev, context) ++ part.tail)
.sliding(windowSize)
.withPartial(false)
+ .map(_.toArray)
}
override def getPreferredLocations(split: Partition): Seq[String] =
http://git-wip-us.apache.org/repos/asf/spark/blob/f90ad5d4/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala
index 27a19f7..4ef67a4 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala
@@ -42,9 +42,9 @@ class RDDFunctionsSuite extends FunSuite with LocalSparkContext {
val data = Seq(Seq(1, 2, 3), Seq.empty[Int], Seq(4), Seq.empty[Int], Seq(5, 6, 7))
val rdd = sc.parallelize(data, data.length).flatMap(s => s)
assert(rdd.partitions.size === data.length)
- val sliding = rdd.sliding(3)
- val expected = data.flatMap(x => x).sliding(3).toList
- assert(sliding.collect().toList === expected)
+ val sliding = rdd.sliding(3).collect().toSeq.map(_.toSeq)
+ val expected = data.flatMap(x => x).sliding(3).toSeq.map(_.toSeq)
+ assert(sliding === expected)
}
test("treeAggregate") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org