Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/02/19 18:22:37 UTC

[GitHub] zsxwing commented on a change in pull request #21913: [SPARK-24005][CORE] Remove usage of Scala’s parallel collection

URL: https://github.com/apache/spark/pull/21913#discussion_r258168424
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
 ##########
 @@ -254,4 +258,62 @@ private[spark] object ThreadUtils {
       executor.shutdownNow()
     }
   }
+
+  /**
+   * Transforms the input collection by applying the given function to each element in parallel.
+   * Unlike the map() method of Scala parallel collections, this method can be interrupted
+   * at any time, which is useful when canceling task execution, for example.
+   *
+   * @param in the input collection to be transformed in parallel.
+   * @param prefix the prefix assigned to the underlying thread pool.
+   * @param maxThreads the maximum number of threads that can be created during execution.
+   * @param f the lambda function to apply to each element of `in`.
+   * @tparam I the type of elements in the input collection.
+   * @tparam O the type of elements in the resulting collection.
+   * @return a new collection in which each element is the result of applying `f` to the
+   *         corresponding element of `in`.
+   */
+  def parmap[I, O, Col[X] <: TraversableLike[X, Col[X]]]
+      (in: Col[I], prefix: String, maxThreads: Int)
+      (f: I => O)
+      (implicit
+        cbf: CanBuildFrom[Col[I], Future[O], Col[Future[O]]], // For in.map
+        cbf2: CanBuildFrom[Col[Future[O]], O, Col[O]] // for Future.sequence
+      ): Col[O] = {
+    val pool = newForkJoinPool(prefix, maxThreads)
+    try {
+      implicit val ec = ExecutionContext.fromExecutor(pool)
+
+      parmap(in)(f)
+    } finally {
+      pool.shutdownNow()
 
 Review comment:
   @ConeyLiu this line interrupts the tasks in the thread pool. Scala `par` doesn't do this. 
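   
   To make the difference concrete, here is a minimal standalone sketch (demo code only, not part of the PR; all names are placeholders) showing that shutdownNow() sends an interrupt to a task that is still running, whereas the closures started by a Scala parallel collection simply run until they return:
   
   import java.util.concurrent.{Executors, TimeUnit}
   
   object InterruptDemo {
     def main(args: Array[String]): Unit = {
       val pool = Executors.newFixedThreadPool(2)
   
       // A long-running task; Thread.sleep is interruptible.
       pool.submit(new Runnable {
         def run(): Unit =
           try {
             Thread.sleep(60000)
             println("task finished")
           } catch {
             case _: InterruptedException => println("task interrupted")
           }
       })
   
       Thread.sleep(100)   // let the task start
       pool.shutdownNow()  // interrupts the in-flight task, so it prints "task interrupted"
       pool.awaitTermination(5, TimeUnit.SECONDS)
   
       // By contrast, there is no handle for interrupting the closures started by
       // Seq(1, 2, 3).par.map(...); they keep running until they return on their own.
     }
   }
   
   Per the signature quoted above, a call to the new helper would look roughly like
   ThreadUtils.parmap(inputFiles, "copy-files", maxThreads = 8)(copyFile),
   where inputFiles and copyFile are hypothetical names used only for illustration.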
