You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by WeichenXu123 <gi...@git.apache.org> on 2017/11/10 07:53:13 UTC

[GitHub] spark pull request #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendFo...

Github user WeichenXu123 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18624#discussion_r150170451
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala ---
    @@ -286,40 +288,119 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
           srcFeatures: RDD[(Int, Array[Double])],
           dstFeatures: RDD[(Int, Array[Double])],
           num: Int): RDD[(Int, Array[(Int, Double)])] = {
    -    val srcBlocks = blockify(srcFeatures)
    -    val dstBlocks = blockify(dstFeatures)
    -    val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) =>
    -      val m = srcIter.size
    -      val n = math.min(dstIter.size, num)
    -      val output = new Array[(Int, (Int, Double))](m * n)
    +    val srcBlocks = blockify(rank, srcFeatures).zipWithIndex()
    +    val dstBlocks = blockify(rank, dstFeatures)
    +    val ratings = srcBlocks.cartesian(dstBlocks).map {
    +      case (((srcIds, srcFactors), index), (dstIds, dstFactors)) =>
    +        val m = srcIds.length
    +        val n = dstIds.length
    +        val dstIdMatrix = new Array[Int](m * num)
    +        val scoreMatrix = Array.fill[Double](m * num)(Double.NegativeInfinity)
    +        val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2))
    +
    +        val ratings = srcFactors.transpose.multiply(dstFactors)
    +        var i = 0
    +        var j = 0
    +        while (i < m) {
    +          var k = 0
    +          while (k < n) {
    +            pq += dstIds(k) -> ratings(i, k)
    +            k += 1
    +          }
    +          k = 0
    +          pq.toArray.sortBy(-_._2).foreach { case (id, score) =>
    +            dstIdMatrix(j + k) = id
    +            scoreMatrix(j + k) = score
    +            k += 1
    +          }
    +          // pq.size maybe less than num, corner case
    +          j += num
    +          i += 1
    +          pq.clear()
    +        }
    +        (index, (srcIds, dstIdMatrix, new DenseMatrix(m, num, scoreMatrix, true)))
    +    }
    +    ratings.aggregateByKey(null: Array[Int], null: Array[Int], null: DenseMatrix)(
    +      (rateSum, rate) => mergeFunc(rateSum, rate, num),
    +      (rateSum1, rateSum2) => mergeFunc(rateSum1, rateSum2, num)
    +    ).flatMap { case (index, (srcIds, dstIdMatrix, scoreMatrix)) =>
    +      // to avoid corner case that the number of items is less than recommendation num
    +      var col: Int = 0
    +      while (col < num && scoreMatrix(0, col) > Double.NegativeInfinity) {
    +        col += 1
    +      }
    +      val row = scoreMatrix.numRows
    +      val output = new Array[(Int, Array[(Int, Double)])](row)
           var i = 0
    -      val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2))
    -      srcIter.foreach { case (srcId, srcFactor) =>
    -        dstIter.foreach { case (dstId, dstFactor) =>
    -          // We use F2jBLAS which is faster than a call to native BLAS for vector dot product
    -          val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1)
    -          pq += dstId -> score
    +      while (i < row) {
    +        val factors = new Array[(Int, Double)](col)
    +        var j = 0
    +        while (j < col) {
    +          factors(j) = (dstIdMatrix(i * num + j), scoreMatrix(i, j))
    +          j += 1
             }
    -        pq.foreach { case (dstId, score) =>
    -          output(i) = (srcId, (dstId, score))
    -          i += 1
    +        output(i) = (srcIds(i), factors)
    +        i += 1
    +      }
    +     output.toSeq}
    +  }
    +
    +  private def mergeFunc(rateSum: (Array[Int], Array[Int], DenseMatrix),
    +                        rate: (Array[Int], Array[Int], DenseMatrix),
    +                        num: Int): (Array[Int], Array[Int], DenseMatrix) = {
    +    if (rateSum._1 == null) {
    +      rate
    +    } else {
    +      val row = rateSum._3.numRows
    +      var i = 0
    +      val tempIdMatrix = new Array[Int](row * num)
    +      val tempScoreMatrix = Array.fill[Double](row * num)(Double.NegativeInfinity)
    +      while (i < row) {
    +        var j = 0
    +        var sum_index = 0
    +        var rate_index = 0
    +        val matrixIndex = i * num
    +        while (j < num) {
    +          if (rate._3(i, rate_index) > rateSum._3(i, sum_index)) {
    +            tempIdMatrix(matrixIndex + j) = rate._2(matrixIndex + rate_index)
    +            tempScoreMatrix(matrixIndex + j) = rate._3(i, rate_index)
    +            rate_index += 1
    +          } else {
    +            tempIdMatrix(matrixIndex + j) = rateSum._2(matrixIndex + sum_index)
    +            tempScoreMatrix(matrixIndex + j) = rateSum._3(i, sum_index)
    +            sum_index += 1
    +          }
    +          j += 1
             }
    -        pq.clear()
    +        i += 1
           }
    -      output.toSeq
    +      (rateSum._1, tempIdMatrix, new DenseMatrix(row, num, tempScoreMatrix, true))
         }
    -    ratings.topByKey(num)(Ordering.by(_._2))
       }
     
       /**
        * Blockifies features to improve the efficiency of cartesian product
        * TODO: SPARK-20443 - expose blockSize as a param?
        */
    -  private def blockify(
    -      features: RDD[(Int, Array[Double])],
    -      blockSize: Int = 4096): RDD[Seq[(Int, Array[Double])]] = {
    +  def blockify(
    +      rank: Int,
    +      features: RDD[(Int, Array[Double])]): RDD[(Array[Int], DenseMatrix)] = {
    +    val blockSize = 2000 // TODO: tune the block size
    --- End diff --
    
    So will you add a parameter for this?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org