You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this export.
Posted to commits@spark.apache.org by me...@apache.org on 2014/07/22 11:39:13 UTC

git commit: [SPARK-2612] [mllib] Fix data skew in ALS

Repository: spark
Updated Branches:
  refs/heads/master 81fec9922 -> 75db1742a


[SPARK-2612] [mllib] Fix data skew in ALS

Author: peng.zhang <pe...@xiaomi.com>

Closes #1521 from renozhang/fix-als and squashes the following commits:

b5727a4 [peng.zhang] Remove no need argument
1a4f7a0 [peng.zhang] Fix data skew in ALS


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/75db1742
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/75db1742
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/75db1742

Branch: refs/heads/master
Commit: 75db1742abf9e08111ddf8f330e6561c5520a86c
Parents: 81fec99
Author: peng.zhang <pe...@xiaomi.com>
Authored: Tue Jul 22 02:39:07 2014 -0700
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Tue Jul 22 02:39:07 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/mllib/recommendation/ALS.scala      | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/75db1742/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index cc56fd6..15e8855 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -252,14 +252,14 @@ class ALS private (
         val YtY = Some(sc.broadcast(computeYtY(users)))
         val previousProducts = products
         products = updateFeatures(numProductBlocks, users, userOutLinks, productInLinks,
-          userPartitioner, rank, lambda, alpha, YtY)
+          rank, lambda, alpha, YtY)
         previousProducts.unpersist()
         logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
         products.setName(s"products-$iter").persist()
         val XtX = Some(sc.broadcast(computeYtY(products)))
         val previousUsers = users
         users = updateFeatures(numUserBlocks, products, productOutLinks, userInLinks,
-          productPartitioner, rank, lambda, alpha, XtX)
+          rank, lambda, alpha, XtX)
         previousUsers.unpersist()
       }
     } else {
@@ -267,11 +267,11 @@ class ALS private (
         // perform ALS update
         logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations))
         products = updateFeatures(numProductBlocks, users, userOutLinks, productInLinks,
-          userPartitioner, rank, lambda, alpha, YtY = None)
+          rank, lambda, alpha, YtY = None)
         products.setName(s"products-$iter")
         logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
         users = updateFeatures(numUserBlocks, products, productOutLinks, userInLinks,
-          productPartitioner, rank, lambda, alpha, YtY = None)
+          rank, lambda, alpha, YtY = None)
         users.setName(s"users-$iter")
       }
     }
@@ -464,7 +464,6 @@ class ALS private (
       products: RDD[(Int, Array[Array[Double]])],
       productOutLinks: RDD[(Int, OutLinkBlock)],
       userInLinks: RDD[(Int, InLinkBlock)],
-      productPartitioner: Partitioner,
       rank: Int,
       lambda: Double,
       alpha: Double,
@@ -477,7 +476,7 @@ class ALS private (
           }
         }
         toSend.zipWithIndex.map{ case (buf, idx) => (idx, (bid, buf.toArray)) }
-    }.groupByKey(productPartitioner)
+    }.groupByKey(new HashPartitioner(numUserBlocks))
      .join(userInLinks)
      .mapValues{ case (messages, inLinkBlock) =>
         updateBlock(messages, inLinkBlock, rank, lambda, alpha, YtY)