You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by WeichenXu123 <gi...@git.apache.org> on 2017/11/10 07:53:13 UTC
[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...
Github user WeichenXu123 commented on a diff in the pull request:
https://github.com/apache/spark/pull/18624#discussion_r150170451
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala ---
@@ -286,40 +288,119 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
srcFeatures: RDD[(Int, Array[Double])],
dstFeatures: RDD[(Int, Array[Double])],
num: Int): RDD[(Int, Array[(Int, Double)])] = {
- val srcBlocks = blockify(srcFeatures)
- val dstBlocks = blockify(dstFeatures)
- val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) =>
- val m = srcIter.size
- val n = math.min(dstIter.size, num)
- val output = new Array[(Int, (Int, Double))](m * n)
+ val srcBlocks = blockify(rank, srcFeatures).zipWithIndex()
+ val dstBlocks = blockify(rank, dstFeatures)
+ val ratings = srcBlocks.cartesian(dstBlocks).map {
+ case (((srcIds, srcFactors), index), (dstIds, dstFactors)) =>
+ val m = srcIds.length
+ val n = dstIds.length
+ val dstIdMatrix = new Array[Int](m * num)
+ val scoreMatrix = Array.fill[Double](m * num)(Double.NegativeInfinity)
+ val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2))
+
+ val ratings = srcFactors.transpose.multiply(dstFactors)
+ var i = 0
+ var j = 0
+ while (i < m) {
+ var k = 0
+ while (k < n) {
+ pq += dstIds(k) -> ratings(i, k)
+ k += 1
+ }
+ k = 0
+ pq.toArray.sortBy(-_._2).foreach { case (id, score) =>
+ dstIdMatrix(j + k) = id
+ scoreMatrix(j + k) = score
+ k += 1
+ }
+ // pq.size maybe less than num, corner case
+ j += num
+ i += 1
+ pq.clear()
+ }
+ (index, (srcIds, dstIdMatrix, new DenseMatrix(m, num, scoreMatrix, true)))
+ }
+ ratings.aggregateByKey(null: Array[Int], null: Array[Int], null: DenseMatrix)(
+ (rateSum, rate) => mergeFunc(rateSum, rate, num),
+ (rateSum1, rateSum2) => mergeFunc(rateSum1, rateSum2, num)
+ ).flatMap { case (index, (srcIds, dstIdMatrix, scoreMatrix)) =>
+ // to avoid corner case that the number of items is less than recommendation num
+ var col: Int = 0
+ while (col < num && scoreMatrix(0, col) > Double.NegativeInfinity) {
+ col += 1
+ }
+ val row = scoreMatrix.numRows
+ val output = new Array[(Int, Array[(Int, Double)])](row)
var i = 0
- val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2))
- srcIter.foreach { case (srcId, srcFactor) =>
- dstIter.foreach { case (dstId, dstFactor) =>
- // We use F2jBLAS which is faster than a call to native BLAS for vector dot product
- val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1)
- pq += dstId -> score
+ while (i < row) {
+ val factors = new Array[(Int, Double)](col)
+ var j = 0
+ while (j < col) {
+ factors(j) = (dstIdMatrix(i * num + j), scoreMatrix(i, j))
+ j += 1
}
- pq.foreach { case (dstId, score) =>
- output(i) = (srcId, (dstId, score))
- i += 1
+ output(i) = (srcIds(i), factors)
+ i += 1
+ }
+ output.toSeq}
+ }
+
+ private def mergeFunc(rateSum: (Array[Int], Array[Int], DenseMatrix),
+ rate: (Array[Int], Array[Int], DenseMatrix),
+ num: Int): (Array[Int], Array[Int], DenseMatrix) = {
+ if (rateSum._1 == null) {
+ rate
+ } else {
+ val row = rateSum._3.numRows
+ var i = 0
+ val tempIdMatrix = new Array[Int](row * num)
+ val tempScoreMatrix = Array.fill[Double](row * num)(Double.NegativeInfinity)
+ while (i < row) {
+ var j = 0
+ var sum_index = 0
+ var rate_index = 0
+ val matrixIndex = i * num
+ while (j < num) {
+ if (rate._3(i, rate_index) > rateSum._3(i, sum_index)) {
+ tempIdMatrix(matrixIndex + j) = rate._2(matrixIndex + rate_index)
+ tempScoreMatrix(matrixIndex + j) = rate._3(i, rate_index)
+ rate_index += 1
+ } else {
+ tempIdMatrix(matrixIndex + j) = rateSum._2(matrixIndex + sum_index)
+ tempScoreMatrix(matrixIndex + j) = rateSum._3(i, sum_index)
+ sum_index += 1
+ }
+ j += 1
}
- pq.clear()
+ i += 1
}
- output.toSeq
+ (rateSum._1, tempIdMatrix, new DenseMatrix(row, num, tempScoreMatrix, true))
}
- ratings.topByKey(num)(Ordering.by(_._2))
}
/**
* Blockifies features to improve the efficiency of cartesian product
* TODO: SPARK-20443 - expose blockSize as a param?
*/
- private def blockify(
- features: RDD[(Int, Array[Double])],
- blockSize: Int = 4096): RDD[Seq[(Int, Array[Double])]] = {
+ def blockify(
+ rank: Int,
+ features: RDD[(Int, Array[Double])]): RDD[(Array[Int], DenseMatrix)] = {
+ val blockSize = 2000 // TODO: tune the block size
--- End diff --
So will you add a parameter for this?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org