Posted to issues@spark.apache.org by "Sean R. Owen (Jira)" <ji...@apache.org> on 2019/11/27 15:48:00 UTC

[jira] [Resolved] (SPARK-29852) Implement parallel preemptive RDD.toLocalIterator and Dataset.toLocalIterator

     [ https://issues.apache.org/jira/browse/SPARK-29852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean R. Owen resolved SPARK-29852.
----------------------------------
    Resolution: Duplicate

> Implement parallel preemptive RDD.toLocalIterator and Dataset.toLocalIterator
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-29852
>                 URL: https://issues.apache.org/jira/browse/SPARK-29852
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core, SQL
>    Affects Versions: 2.4.4, 3.0.0
>            Reporter: Peng Cheng
>            Priority: Major
>   Original Estimate: 0h
>  Remaining Estimate: 0h
>
> Both the RDD and Dataset APIs have 2 methods for collecting data from executors to the driver:
>  
>  # .collect() sets up multiple threads in a single job and dumps all data from the executors into the driver's memory. This is great if data on the driver needs to be accessible as soon as possible, but not as efficient if partitions can only be accessed sequentially, and outright risky if the driver doesn't have enough memory to hold all the data.
> - the solution for SPARK-25224 partially alleviates this by delaying deserialisation of the data in InternalRow format, so that only the much smaller serialised data needs to be held entirely in driver memory. This does not bound memory consumption at O(1), and thus does not scale to arbitrarily large datasets
>  # .toLocalIterator() fetches one partition per job at a time, and the fetch of the next partition does not start until sequential access to the previous partition has concluded. This keeps memory consumption at O(1) and is great if access to the data is sequential and significantly slower than the rate at which partitions can be shipped from a single executor with 1 thread. It becomes inefficient when the sequential access to the data has to wait a relatively long time for the next partition to be shipped (the sketch after this list contrasts the two approaches)
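> The difference can be seen in the following minimal sketch (the RDD and its sizing below are made up purely for illustration, not taken from this ticket):
> {code:java}
> import org.apache.spark.sql.SparkSession
> 
> val sc = SparkSession.builder().master("local[*]").getOrCreate().sparkContext
> val rdd = sc.parallelize(1 to 1000000, 100) // hypothetical RDD with 100 partitions
> 
> // 1. collect(): a single job fetches all partitions in parallel, but the whole
> //    dataset must fit in driver memory at once.
> val all: Array[Int] = rdd.collect()
> 
> // 2. toLocalIterator(): one job per partition, fetched lazily; only one partition
> //    is held in driver memory at a time, but the fetch of partition i+1 does not
> //    start until partition i has been fully consumed.
> rdd.toLocalIterator.foreach { v =>
>   // sequential (and potentially slow) consumer
> }
> {code}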
> The proposed solution is a crossover between the two existing implementations: a concurrent subroutine that is both CPU- and memory-bounded. It allocates a fixed-size resource pool (by default equal to the number of available CPU cores) that ships partitions concurrently, and blocks sequential access to a partition's data until its shipping has finished (which usually happens without blocking for partition ID >= 2, because shipping starts much earlier and preemptively). A tenant of the resource pool can be GC'ed and evicted once sequential access to its data has finished, which allows further partitions to be fetched well before they are accessed. The maximum memory consumption is O(m * n), where m is the predefined concurrency and n is the size of the largest partition.
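> As a rough illustration of that bound (figures assumed for illustration, not taken from this ticket): with m = 8 concurrent fetches and a largest partition of n = 128 MB, the driver buffers at most about 8 * 128 MB = 1 GB of shipped partition data at any time, regardless of the total size of the dataset.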
> The following Scala code snippet demonstrates a simple implementation:
>  
> (requires Scala 2.11+ and ScalaTest)
>  
> {code:java}
> package org.apache.spark.spike
> import java.util.concurrent.ArrayBlockingQueue
> import org.apache.spark.rdd.RDD
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.{FutureAction, SparkContext}
> import org.scalatest.FunSpec
> import scala.concurrent.Future
> import scala.language.implicitConversions
> import scala.reflect.ClassTag
> import scala.util.{Failure, Success, Try}
> class ToLocalIteratorPreemptivelySpike extends FunSpec {
>   import ToLocalIteratorPreemptivelySpike._
>   lazy val sc: SparkContext = SparkSession.builder().master("local[*]").getOrCreate().sparkContext
>   it("can be much faster than toLocalIterator") {
>     val max = 80
>     val delay = 100
>     val slowRDD = sc.parallelize(1 to max, 8).map { v =>
>       Thread.sleep(delay)
>       v
>     }
>     val (r1, t1) = timed {
>       slowRDD.toLocalIterator.toList
>     }
>     val capacity = 4
>     val (r2, t2) = timed {
>       slowRDD.toLocalIteratorPreemptively(capacity).toList
>     }
>     assert(r1 == r2)
>     println(s"linear: $t1, preemptive: $t2")
>     assert(t1 > t2 * 2)
>     assert(t2 > max * delay / capacity)
>   }
> }
> object ToLocalIteratorPreemptivelySpike {
>   // Wraps one partition of `self`; a dedicated job fetches its contents to the driver.
>   case class PartitionExecution[T: ClassTag](
>       @transient self: RDD[T],
>       id: Int
>   ) {
>     // Force submission of the fetch job for this partition without blocking on the result.
>     def eager: this.type = {
>       AsArray.future
>       this
>     }
>     case object AsArray {
>       @transient lazy val future: FutureAction[Array[T]] = {
>         var result: Array[T] = null
>         val future = self.context.submitJob[T, Array[T], Array[T]](
>           self,
>           _.toArray,
>           Seq(id), { (_, data) =>
>             result = data
>           },
>           result
>         )
>         future
>       }
>       @transient lazy val now: Array[T] = future.get()
>     }
>   }
>   implicit class RDDFunctions[T: ClassTag](self: RDD[T]) {
>     import scala.concurrent.ExecutionContext.Implicits.global
>     // Producer side: submit fetch jobs ahead of the consumer, keeping at most `capacity` in flight.
>     def _toLocalIteratorPreemptively(capacity: Int): Iterator[Array[T]] = {
>       val executions = self.partitions.indices.map { ii =>
>         PartitionExecution(self, ii)
>       }
>       val buffer = new ArrayBlockingQueue[Try[PartitionExecution[T]]](capacity)
>       Future {
>         executions.foreach { exe =>
>           buffer.put(Success(exe)) // may be blocking due to capacity
>           exe.eager // non-blocking
>         }
>       }.onFailure {
>         case e: Throwable =>
>           buffer.put(Failure(e))
>       }
>       // Consumer side: block until the next partition (in submission order) is fully shipped.
>       self.partitions.indices.toIterator.map { _ =>
>         val exe = buffer.take().get
>         exe.AsArray.now
>       }
>     }
>     def toLocalIteratorPreemptively(capacity: Int): Iterator[T] = {
>       _toLocalIteratorPreemptively(capacity).flatten
>     }
>   }
>   def timed[T](fn: => T): (T, Long) = {
>     val startTime = System.currentTimeMillis()
>     val result = fn
>     val endTime = System.currentTimeMillis()
>     (result, endTime - startTime)
>   }
> }
> {code}
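> With the implicit class above in scope, any RDD gains the new method. A hypothetical usage sketch (`rdd` and the capacity value are placeholders):
> {code:java}
> import org.apache.spark.spike.ToLocalIteratorPreemptivelySpike._
> 
> // Ship up to 4 partitions ahead of the consumer; driver memory stays bounded by
> // roughly 4 * (largest partition) while the data is consumed sequentially.
> rdd.toLocalIteratorPreemptively(capacity = 4).foreach(println)
> {code}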



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org