Posted to issues@spark.apache.org by "Peng Cheng (Jira)" <ji...@apache.org> on 2019/11/12 01:57:00 UTC
[jira] [Created] (SPARK-29852) Implement parallel preemptive
RDD.toLocalIterator and Dataset.toLocalIterator
Peng Cheng created SPARK-29852:
----------------------------------
Summary: Implement parallel preemptive RDD.toLocalIterator and Dataset.toLocalIterator
Key: SPARK-29852
URL: https://issues.apache.org/jira/browse/SPARK-29852
Project: Spark
Issue Type: New Feature
Components: Spark Core, SQL
Affects Versions: 2.4.4, 3.0.0
Reporter: Peng Cheng
Both the RDD and Dataset APIs have two methods for collecting data from executors to the driver:
# .collect() sets up multiple threads in a single job and dumps all data from the executors into the driver's memory. This is great if the data needs to be accessible on the driver as soon as possible, but it is less efficient if partitions can only be accessed sequentially, and outright risky if the driver does not have enough memory to hold all the data.
- The fix for SPARK-25224 partially alleviates this by delaying deserialisation of data in InternalRow format, so that only the much smaller serialised data has to be held entirely in driver memory. It still does not achieve O(1) memory consumption, and therefore does not scale to arbitrarily large datasets.
# .toLocalIterator() fetches one partition per job, one job at a time, and does not start fetching the next partition until sequential access to the previous one has finished. This keeps memory consumption at O(1) and works well if access to the data is sequential and significantly slower than the rate at which a single executor can ship partitions using one thread. It becomes inefficient when sequential access has to wait a relatively long time for the next partition to be shipped.
The proposed solution is a crossover between the two existing implementations: a concurrent subroutine that is bounded in both CPU and memory. It allocates a fixed-size resource pool (by default, the number of available CPU cores) that ships partitions concurrently, and blocks sequential access to a partition's data until that partition has been shipped (for partition IDs >= 2 this usually does not block at all, because shipping starts much earlier, preemptively). A tenant of the resource pool can be evicted and GC'ed once sequential access to its data has finished, which allows more partitions to be fetched well before they are accessed. The maximum memory consumption is O(m * n), where m is the predefined concurrency and n is the size of the largest partition.
The following Scala code snippet demonstrates a simple implementation
(requires Scala 2.11+ and ScalaTest):
{code:java}
package org.apache.spark.spike
import java.util.concurrent.ArrayBlockingQueue
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.{FutureAction, SparkContext}
import org.scalatest.FunSpec
import scala.concurrent.Future
import scala.language.implicitConversions
import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}
class ToLocalIteratorPreemptivelySpike extends FunSpec {

  import ToLocalIteratorPreemptivelySpike._

  lazy val sc: SparkContext = SparkSession.builder().master("local[*]").getOrCreate().sparkContext

  it("can be much faster than toLocalIterator") {

    val max = 80
    val delay = 100 // milliseconds of simulated work per element

    // 8 partitions of 10 elements each; every element takes `delay` ms to compute
    val slowRDD = sc.parallelize(1 to max, 8).map { v =>
      Thread.sleep(delay)
      v
    }

    // baseline: one partition is fetched at a time
    val (r1, t1) = timed {
      slowRDD.toLocalIterator.toList
    }

    val capacity = 4
    val (r2, t2) = timed {
      slowRDD.toLocalIteratorPreemptively(capacity).toList
    }

    assert(r1 == r2)
    println(s"linear: $t1, preemptive: $t2")
    assert(t1 > t2 * 2)
    assert(t2 > max * delay / capacity)
  }
}
object ToLocalIteratorPreemptivelySpike {

  // One Spark job that ships a single partition of `self` to the driver
  case class PartitionExecution[T: ClassTag](
      @transient self: RDD[T],
      id: Int
  ) {

    // Submits the job if it has not been submitted yet; does not wait for the result
    def eager: this.type = {
      AsArray.future
      this
    }

    case object AsArray {

      @transient lazy val future: FutureAction[Array[T]] = {
        var result: Array[T] = null

        val future = self.context.submitJob[T, Array[T], Array[T]](
          self,
          _.toArray,
          Seq(id), { (_, data) =>
            result = data
          },
          result
        )

        future
      }

      // Blocks until the partition has been shipped to the driver
      @transient lazy val now: Array[T] = future.get()
    }
  }
  implicit class RDDFunctions[T: ClassTag](self: RDD[T]) {

    import scala.concurrent.ExecutionContext.Implicits.global

    def _toLocalIteratorPreemptively(capacity: Int): Iterator[Array[T]] = {
      val executions = self.partitions.indices.map { ii =>
        PartitionExecution(self, ii)
      }

      val buffer = new ArrayBlockingQueue[Try[PartitionExecution[T]]](capacity)

      // Producer: enqueue partitions in order and start shipping each one as soon as it has a slot
      val producer = Future {
        executions.foreach { exe =>
          buffer.put(Success(exe)) // may be blocking due to capacity
          exe.eager // non-blocking
        }
      }

      // Forward a producer failure to the consumer (`failed.foreach` works on Scala 2.11 through 2.13)
      producer.failed.foreach { e =>
        buffer.put(Failure(e))
      }

      // Consumer: take partitions in order, blocking only if the next one has not arrived yet
      self.partitions.indices.toIterator.map { _ =>
        val exe = buffer.take().get
        exe.AsArray.now
      }
    }

    def toLocalIteratorPreemptively(capacity: Int): Iterator[T] = {
      _toLocalIteratorPreemptively(capacity).flatten
    }
  }
  def timed[T](fn: => T): (T, Long) = {
    val startTime = System.currentTimeMillis()
    val result = fn
    val endTime = System.currentTimeMillis()
    (result, endTime - startTime)
  }
}
{code}
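The bounded prefetching described above can also be tried without a Spark cluster. The following is only a minimal sketch using the Scala standard library and java.util.concurrent, not the proposed Spark implementation: the names BoundedPrefetchSketch and prefetch are hypothetical, plain functions stand in for partitions, and a Semaphore is swapped in for the ArrayBlockingQueue capacity so that the bound on started-but-unconsumed results is explicit.

```scala
import java.util.concurrent.{LinkedBlockingQueue, Semaphore}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Hypothetical helper for illustration only; none of these names exist in Spark.
object BoundedPrefetchSketch {

  // Evaluates `tasks` concurrently but yields their results in the original order.
  // At most `capacity` tasks are started and not yet handed to the consumer.
  def prefetch[A](tasks: Seq[() => A], capacity: Int)(
      implicit ec: ExecutionContext
  ): Iterator[A] = {
    require(capacity > 0, "capacity must be positive")

    val permits = new Semaphore(capacity)
    val started = new LinkedBlockingQueue[Future[A]]()

    // Producer on its own daemon thread, so blocking on a permit never
    // occupies a thread of the pool that runs the tasks.
    val producer = new Thread(new Runnable {
      override def run(): Unit =
        tasks.foreach { task =>
          permits.acquire()           // wait until a slot is free
          started.put(Future(task())) // start the task, keep its position in order
        }
    })
    producer.setDaemon(true)
    producer.start()

    // Consumer: blocks only when the next result in order is not ready yet.
    tasks.indices.iterator.map { _ =>
      val next = started.take()
      try Await.result(next, Duration.Inf)
      finally permits.release()       // frees a slot for a later task
    }
  }
}
```

With an ExecutionContext in scope, BoundedPrefetchSketch.prefetch(tasks, 4) returns an iterator over the results in task order. A permit is returned only after the consumer has received a result, so memory use stays proportional to capacity times the largest result, which mirrors the O(m * n) bound stated in the description.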