You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "wenwj0 (via GitHub)" <gi...@apache.org> on 2023/05/30 12:00:24 UTC

[GitHub] [spark] wenwj0 opened a new pull request, #41384: [SPARK-43297][ML]Use scala parallel collection ParVector to accelarate LocalKMeans.

wenwj0 opened a new pull request, #41384:
URL: https://github.com/apache/spark/pull/41384

   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the guideline first in
        'core/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   In mllib module, the kMeansPlusPlus is a local method which run in a single thread.
   I use scala parallel collections to change the array to ParVector, which supports parallel calculating.
   
   
   ### Why are the changes needed?
   The kMeansPlusPlus method runs in spark driver in a single thread. 
   If the input vectors are very large, it will take a long time to calculate the distance between every points.
   If we change this single-thread method to run in parallel, it will reduce a lot of time. 
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   All unit test cases passed.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] github-actions[bot] commented on pull request #41384: [SPARK-43297][ML]Use scala parallel collection ParVector to accelarate LocalKMeans.

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #41384:
URL: https://github.com/apache/spark/pull/41384#issuecomment-1728583712

   We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wenwj0 commented on pull request #41384: [SPARK-43297][ML]Use scala parallel collection ParVector to accelarate LocalKMeans.

Posted by "wenwj0 (via GitHub)" <gi...@apache.org>.
wenwj0 commented on PR #41384:
URL: https://github.com/apache/spark/pull/41384#issuecomment-1586519749

   I keep the original single-threaded method `kMeanPlusPlus`, and add `kMeanPlusPlusParallel` to use ParVector for parallelism, Although this implementation seems redundant, I can't find a better way to be compatible with `Array` and `ParVector`.
   
   PTAL @srowen @pan3793 @LuciferYang


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a diff in pull request #41384: [SPARK-43297][ML]Use scala parallel collection ParVector to accelarate LocalKMeans.

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #41384:
URL: https://github.com/apache/spark/pull/41384#discussion_r1210393260


##########
mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala:
##########
@@ -46,7 +47,9 @@ private[mllib] object LocalKMeans extends Logging {
 
     // Initialize centers by sampling using the k-means++ procedure.
     centers(0) = pickWeighted(rand, points, weights).toDense
-    val costArray = points.map(EuclideanDistanceMeasure.fastSquaredDistance(_, centers(0)))
+    val pointsParVector = new ParVector(points.toVector)

Review Comment:
   Can you find the threshold for `points.length` that can get performance improvement?
   
   Not sure this always can get performance improvement, `points.toVector` will also be some costs,  it will create a new Vector and add all elements using tail recursion.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wenwj0 commented on pull request #41384: [SPARK-43297][ML]Use scala parallel collection ParVector to accelarate LocalKMeans.

Posted by "wenwj0 (via GitHub)" <gi...@apache.org>.
wenwj0 commented on PR #41384:
URL: https://github.com/apache/spark/pull/41384#issuecomment-1572033954

   This idea sounds good.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a diff in pull request #41384: [SPARK-43297][ML]Use scala parallel collection ParVector to accelarate LocalKMeans.

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #41384:
URL: https://github.com/apache/spark/pull/41384#discussion_r1210393260


##########
mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala:
##########
@@ -46,7 +47,9 @@ private[mllib] object LocalKMeans extends Logging {
 
     // Initialize centers by sampling using the k-means++ procedure.
     centers(0) = pickWeighted(rand, points, weights).toDense
-    val costArray = points.map(EuclideanDistanceMeasure.fastSquaredDistance(_, centers(0)))
+    val pointsParVector = new ParVector(points.toVector)

Review Comment:
   @wenwj0 Can you find the threshold for `points.length` that can get performance improvement?
   
   Not sure this always can get performance improvement, `points.toVector` will also be some costs,  it will create a new Vector and add all elements using tail recursion.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wenwj0 commented on pull request #41384: [SPARK-43297][ML]Use scala parallel collection ParVector to accelarate LocalKMeans.

Posted by "wenwj0 (via GitHub)" <gi...@apache.org>.
wenwj0 commented on PR #41384:
URL: https://github.com/apache/spark/pull/41384#issuecomment-1570224042

   @pan3793 Thanks for reply, I am not familiar with the ExecutionContext you mentioned above, I have tested in my machine and it seems that it used all cores for calculation. 
   
   Do you think I need to control the threads num?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] github-actions[bot] closed pull request #41384: [SPARK-43297][ML]Use scala parallel collection ParVector to accelarate LocalKMeans.

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed pull request #41384: [SPARK-43297][ML]Use scala parallel collection ParVector to accelarate LocalKMeans.
URL: https://github.com/apache/spark/pull/41384


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] srowen commented on pull request #41384: [SPARK-43297][ML]Use scala parallel collection ParVector to accelarate LocalKMeans.

Posted by "srowen (via GitHub)" <gi...@apache.org>.
srowen commented on PR #41384:
URL: https://github.com/apache/spark/pull/41384#issuecomment-1572048240

   Why use a different config? use spark.driver.cores at best


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] srowen commented on a diff in pull request #41384: [SPARK-43297][ML]Use scala parallel collection ParVector to accelarate LocalKMeans.

Posted by "srowen (via GitHub)" <gi...@apache.org>.
srowen commented on code in PR #41384:
URL: https://github.com/apache/spark/pull/41384#discussion_r1214326896


##########
mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala:
##########
@@ -38,15 +41,29 @@ private[mllib] object LocalKMeans extends Logging {
       points: Array[VectorWithNorm],
       weights: Array[Double],
       k: Int,
-      maxIterations: Int
+      maxIterations: Int,
+      threadNumber: Int

Review Comment:
   Maybe make this an optional arg defaulting to -1 (meaning use max cores)?



##########
mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala:
##########
@@ -38,15 +41,29 @@ private[mllib] object LocalKMeans extends Logging {
       points: Array[VectorWithNorm],
       weights: Array[Double],
       k: Int,
-      maxIterations: Int
+      maxIterations: Int,
+      threadNumber: Int
   ): Array[VectorWithNorm] = {
     val rand = new Random(seed)
     val dimensions = points(0).vector.size
     val centers = new Array[VectorWithNorm](k)
 
     // Initialize centers by sampling using the k-means++ procedure.
     centers(0) = pickWeighted(rand, points, weights).toDense
-    val costArray = points.map(EuclideanDistanceMeasure.fastSquaredDistance(_, centers(0)))
+
+    val pointsParVector: Seq[VectorWithNorm] = threadNumber match {
+      case 1 => points
+      case _ =>
+        logInfo(s"kMeansPlusPlus run in thread nums: $threadNumber.")
+        val pointsPar = new ParVector(points.toVector)
+        val taskSupport = new ForkJoinTaskSupport(
+          ThreadUtils.newForkJoinPool("localKMeans-task-support", threadNumber))

Review Comment:
   shutdown() the ForkJoinPool with try-finally here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wenwj0 commented on pull request #41384: [SPARK-43297][ML]Use scala parallel collection ParVector to accelarate LocalKMeans.

Posted by "wenwj0 (via GitHub)" <gi...@apache.org>.
wenwj0 commented on PR #41384:
URL: https://github.com/apache/spark/pull/41384#issuecomment-1573287111

   I add a taskSupport to control the threadNumber and tested on my own machine, the configuration worked.
   
   PTAL @srowen @pan3793 @LuciferYang 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] srowen commented on pull request #41384: [SPARK-43297][ML]Use scala parallel collection ParVector to accelarate LocalKMeans.

Posted by "srowen (via GitHub)" <gi...@apache.org>.
srowen commented on PR #41384:
URL: https://github.com/apache/spark/pull/41384#issuecomment-1572017926

   I think the idea is that you're sharing a global thread pool to do this, which may have side effects and may use more cores than the driver is configured to use. It isn't terrible. But you could also just make and stop an executioncontext for this to separate it and configure it


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wenwj0 commented on pull request #41384: [SPARK-43297][ML]Use scala parallel collection ParVector to accelarate LocalKMeans.

Posted by "wenwj0 (via GitHub)" <gi...@apache.org>.
wenwj0 commented on PR #41384:
URL: https://github.com/apache/spark/pull/41384#issuecomment-1568318816

   PTAL @srowen


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] pan3793 commented on pull request #41384: [SPARK-43297][ML]Use scala parallel collection ParVector to accelarate LocalKMeans.

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on PR #41384:
URL: https://github.com/apache/spark/pull/41384#issuecomment-1568405597

   This change causes the Driver to use `scala.concurrent.ExecutionContext.global` for calculation, is it reasonable to occupy more cores than `spark.driver.cores`?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] pan3793 commented on pull request #41384: [SPARK-43297][ML]Use scala parallel collection ParVector to accelarate LocalKMeans.

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on PR #41384:
URL: https://github.com/apache/spark/pull/41384#issuecomment-1572027161

   I think we should make it configurable.
   
   For resource manager which typically does not enable cgroups, like YARN, the default global ExecutionContext may exhaust the bare-metal machine cores, it's too dangerous.
   
   The default value should be the `spark.driver.cores`(it typically is 1, in such case, we should fallback the existing code to avoid creating a new collection)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wenwj0 commented on a diff in pull request #41384: [SPARK-43297][ML]Use scala parallel collection ParVector to accelarate LocalKMeans.

Posted by "wenwj0 (via GitHub)" <gi...@apache.org>.
wenwj0 commented on code in PR #41384:
URL: https://github.com/apache/spark/pull/41384#discussion_r1211708341


##########
mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala:
##########
@@ -46,7 +47,9 @@ private[mllib] object LocalKMeans extends Logging {
 
     // Initialize centers by sampling using the k-means++ procedure.
     centers(0) = pickWeighted(rand, points, weights).toDense
-    val costArray = points.map(EuclideanDistanceMeasure.fastSquaredDistance(_, centers(0)))
+    val pointsParVector = new ParVector(points.toVector)

Review Comment:
   Thanks for reply, I think the benefits of parallelism outweigh the costs of vector copying.
   It's not easy to find the threshold, because the points are two-dimensionsal.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org