You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/09/17 00:22:02 UTC

[GitHub] [spark] zhengruifeng opened a new pull request, #37918: [SPARK-40476][ML][SQL] Reduce the shuffle size of ALS

zhengruifeng opened a new pull request, #37918:
URL: https://github.com/apache/spark/pull/37918

   ### What changes were proposed in this pull request?
   implement a new expression `CollectTopK`, which uses `Array` instead of `BoundedPriorityQueue` in ser/deser 
   
   
   ### Why are the changes needed?
   Reduce the shuffle size of ALS
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   existing testsuites
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] dongjoon-hyun commented on pull request #37918: [SPARK-40476][ML][SQL] Reduce the shuffle size of ALS

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on PR #37918:
URL: https://github.com/apache/spark/pull/37918#issuecomment-1250413275

   Thanks. If the PR title is clear, +1 for that.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #37918: [SPARK-40476][ML][SQL] Reduce the shuffle size of ALS

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #37918:
URL: https://github.com/apache/spark/pull/37918#discussion_r974838677


##########
mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala:
##########
@@ -496,18 +499,23 @@ class ALSModel private[ml] (
               .iterator.map { j => (srcId, dstIds(j), scores(j)) }
           }
         }
-      }
-    // We'll force the IDs to be Int. Unfortunately this converts IDs to Int in the output.
-    val topKAggregator = new TopByKeyAggregator[Int, Int, Float](num, Ordering.by(_._2))
-    val recs = ratings.as[(Int, Int, Float)].groupByKey(_._1).agg(topKAggregator.toColumn)
-      .toDF("id", "recommendations")
+      }.toDF(srcOutputColumn, dstOutputColumn, ratingColumn)
+
+    val aggFunc = CollectOrdered(struct(ratingColumn, dstOutputColumn).expr, num, true)
+      .toAggregateExpression(false)

Review Comment:
   sure, I think we don't want to expose it, so mark it `private[spark]`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] WeichenXu123 commented on pull request #37918: [SPARK-40476][ML][SQL] Reduce the shuffle size of ALS

Posted by GitBox <gi...@apache.org>.
WeichenXu123 commented on PR #37918:
URL: https://github.com/apache/spark/pull/37918#issuecomment-1256260659

   Thanks! :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] srowen closed pull request #37918: [SPARK-40476][ML][SQL] Reduce the shuffle size of ALS

Posted by GitBox <gi...@apache.org>.
srowen closed pull request #37918: [SPARK-40476][ML][SQL] Reduce the shuffle size of ALS
URL: https://github.com/apache/spark/pull/37918


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on pull request #37918: [SPARK-40476][ML][SQL] Reduce the shuffle size of ALS

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on PR #37918:
URL: https://github.com/apache/spark/pull/37918#issuecomment-1249957261

   take the [`ALSExample`](https://github.com/apache/spark/blob/e1ea806b3075d279b5f08a29fe4c1ad6d3c4191a/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala) for example:
   
   ```
   import org.apache.spark.ml.recommendation._
   
   case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
   
   def parseRating(str: String): Rating = {
       val fields = str.split("::")
       assert(fields.size == 4)
       Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
   }
   
   val ratings = spark.read.textFile("data/mllib/als/sample_movielens_ratings.txt").map(parseRating).toDF()
   
   val als = new ALS().setMaxIter(1).setRegParam(0.01).setUserCol("userId").setItemCol("movieId").setRatingCol("rating")
   
   val model = als.fit(ratings)
   
   model.recommendForAllItems(10).collect()
   ```
   
   before:
   <img width="1704" alt="image" src="https://user-images.githubusercontent.com/7322292/190832964-3f31bcd4-6bfb-445a-a339-b415f82719e3.png">
   
   
   
   after:
   <img width="1708" alt="image" src="https://user-images.githubusercontent.com/7322292/190832980-d0ccc2d3-f4d9-4801-9046-9d56f2fbab3c.png">
   
   
   the shuffle size in this case was reduced from `298.4 KiB` to `130.3 KiB`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on pull request #37918: [SPARK-40476][ML][SQL] Reduce the shuffle size of ALS

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on PR #37918:
URL: https://github.com/apache/spark/pull/37918#issuecomment-1250789841

   cc @srowen @WeichenXu123 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #37918: [SPARK-40476][ML][SQL] Reduce the shuffle size of ALS

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #37918:
URL: https://github.com/apache/spark/pull/37918#discussion_r977468867


##########
sql/core/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -367,6 +367,9 @@ object functions {
    */
   def collect_set(columnName: String): Column = collect_set(Column(columnName))
 
+  private[spark] def collect_top_k(e: Column, num: Int, reverse: Boolean): Column =

Review Comment:
   Let's keep it private for now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] srowen commented on pull request #37918: [SPARK-40476][ML][SQL] Reduce the shuffle size of ALS

Posted by GitBox <gi...@apache.org>.
srowen commented on PR #37918:
URL: https://github.com/apache/spark/pull/37918#issuecomment-1255026390

   Merged to master


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37918: [SPARK-40476][ML][SQL] Reduce the shuffle size of ALS

Posted by GitBox <gi...@apache.org>.
WeichenXu123 commented on code in PR #37918:
URL: https://github.com/apache/spark/pull/37918#discussion_r974197580


##########
mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala:
##########
@@ -496,18 +499,23 @@ class ALSModel private[ml] (
               .iterator.map { j => (srcId, dstIds(j), scores(j)) }
           }
         }
-      }
-    // We'll force the IDs to be Int. Unfortunately this converts IDs to Int in the output.
-    val topKAggregator = new TopByKeyAggregator[Int, Int, Float](num, Ordering.by(_._2))
-    val recs = ratings.as[(Int, Int, Float)].groupByKey(_._1).agg(topKAggregator.toColumn)
-      .toDF("id", "recommendations")
+      }.toDF(srcOutputColumn, dstOutputColumn, ratingColumn)
+
+    val aggFunc = CollectOrdered(struct(ratingColumn, dstOutputColumn).expr, num, true)
+      .toAggregateExpression(false)

Review Comment:
   I think we can define a spark sql function and wrap this part within the function, like:
   
   ```
   def collect_top_k(ratingColumn, outputColumn) = {
      CollectOrdered(struct(ratingColumn, outputColumn).expr, num, true).toAggregateExpression(false)
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37918: [SPARK-40476][ML][SQL] Reduce the shuffle size of ALS

Posted by GitBox <gi...@apache.org>.
WeichenXu123 commented on code in PR #37918:
URL: https://github.com/apache/spark/pull/37918#discussion_r974195241


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##########
@@ -194,3 +194,44 @@ case class CollectSet(
   override protected def withNewChildInternal(newChild: Expression): CollectSet =
     copy(child = newChild)
 }
+
+/**
+ * Collect the top-k elements. This expression is dedicated only for MLLIB.
+ */
+case class CollectOrdered(

Review Comment:
   The naming is not clear,
   Why not call it `collect_top_k` ?
   
   and we can add it into spark.sql.functions `collect_top_k` 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37918: [SPARK-40476][ML][SQL] Reduce the shuffle size of ALS

Posted by GitBox <gi...@apache.org>.
WeichenXu123 commented on code in PR #37918:
URL: https://github.com/apache/spark/pull/37918#discussion_r977291080


##########
sql/core/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -367,6 +367,9 @@ object functions {
    */
   def collect_set(columnName: String): Column = collect_set(Column(columnName))
 
+  private[spark] def collect_top_k(e: Column, num: Int, reverse: Boolean): Column =

Review Comment:
   nit: shall we make it public ? It might be a useful function.
   
   We don't need to  do it in this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #37918: [SPARK-40476][ML][SQL] Reduce the shuffle size of ALS

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #37918:
URL: https://github.com/apache/spark/pull/37918#discussion_r977466819


##########
sql/core/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -367,6 +367,9 @@ object functions {
    */
   def collect_set(columnName: String): Column = collect_set(Column(columnName))
 
+  private[spark] def collect_top_k(e: Column, num: Int, reverse: Boolean): Column =

Review Comment:
   I don't know, I also think it's useful and may further use it in Pandas-API-on-Spark.
   But I don't know whether it is suitable to be public @cloud-fan @HyukjinKwon 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on pull request #37918: [SPARK-40476][ML][SQL] Reduce the shuffle size of ALS

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on PR #37918:
URL: https://github.com/apache/spark/pull/37918#issuecomment-1250409906

   @dongjoon-hyun 
   
   > could you make an independent PR moving TopByKeyAggregator to CollectTopK because that is orthogonal from Reduce the shuffle size of ALS?
   
   It is just the moving from `TopByKeyAggregator` to `CollectTopK` that reduce the shuffle size, since the ser/deser is optimized in `CollectTopK`, let me update the PR description
   
   > In addition, we need a test coverage for CollectTopK because we remove TopByKeyAggregatorSuite.
   
   Sure, will update soon


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] WeichenXu123 commented on a diff in pull request #37918: [SPARK-40476][ML][SQL] Reduce the shuffle size of ALS

Posted by GitBox <gi...@apache.org>.
WeichenXu123 commented on code in PR #37918:
URL: https://github.com/apache/spark/pull/37918#discussion_r974197580


##########
mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala:
##########
@@ -496,18 +499,23 @@ class ALSModel private[ml] (
               .iterator.map { j => (srcId, dstIds(j), scores(j)) }
           }
         }
-      }
-    // We'll force the IDs to be Int. Unfortunately this converts IDs to Int in the output.
-    val topKAggregator = new TopByKeyAggregator[Int, Int, Float](num, Ordering.by(_._2))
-    val recs = ratings.as[(Int, Int, Float)].groupByKey(_._1).agg(topKAggregator.toColumn)
-      .toDF("id", "recommendations")
+      }.toDF(srcOutputColumn, dstOutputColumn, ratingColumn)
+
+    val aggFunc = CollectOrdered(struct(ratingColumn, dstOutputColumn).expr, num, true)
+      .toAggregateExpression(false)

Review Comment:
   I think we can define a spark sql function and wrap this part within the function, like:
   
   ```
   def collect_top_k(ratingColumn, outputColumn) = {
      CollectOrdered(struct(ratingColumn, outputColumn).expr, num, true)
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on pull request #37918: [SPARK-40476][ML][SQL] Reduce the shuffle size of ALS

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on PR #37918:
URL: https://github.com/apache/spark/pull/37918#issuecomment-1255672550

   Thanks for the reviews!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org