Posted to issues@spark.apache.org by "Eyal Farago (JIRA)" <ji...@apache.org> on 2017/11/22 07:59:00 UTC

[jira] [Created] (SPARK-22579) BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be implemented using streaming

Eyal Farago created SPARK-22579:
-----------------------------------

             Summary: BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be implemented using streaming
                 Key: SPARK-22579
                 URL: https://issues.apache.org/jira/browse/SPARK-22579
             Project: Spark
          Issue Type: Bug
          Components: Block Manager, Spark Core
    Affects Versions: 2.1.0
            Reporter: Eyal Farago


When an RDD partition is cached on one executor but the task requiring it runs on another executor (process locality ANY), the cached partition is fetched via BlockManager.getRemoteValues, which delegates to BlockManager.getRemoteBytes; both calls are blocking.
In my use case I had a 700GB RDD spread over 1000 partitions on a 6-node cluster, cached to disk. Rough math puts the average partition size at about 700MB.
Looking at the Spark UI it was obvious that tasks running with process locality 'ANY' were much slower than local tasks (roughly 40 seconds for local vs. 8-10 minutes for remote). I was able to capture thread dumps of executors executing remote tasks and got this stack trace:

{quote}Thread ID	Thread Name	Thread State	Thread Locks
1521	Executor task launch worker-1000	WAITING	Lock(java.util.concurrent.ThreadPoolExecutor$Worker@196462978})
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
scala.concurrent.Await$.result(package.scala:190)
org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190)
org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:104)
org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:582)
org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:550)
org.apache.spark.storage.BlockManager.get(BlockManager.scala:638)
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:690)
org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287){quote}

Digging into the code showed that the block manager first fetches all bytes (getRemoteBytes) and then wraps them with a deserialization stream. This has several drawbacks (see the sketch after the list):
1. Blocking: the requesting executor is blocked while the remote executor serves the block.
2. Potentially large memory footprint on the requesting executor: in my use case, ~700MB of raw bytes stored in a ChunkedByteBuffer.
3. Inefficient: the requesting side usually doesn't need all values at once, as it consumes them via an iterator.
4. Potentially large memory footprint on the serving executor: if the block is cached in deserialized form, the serving executor has to serialize it into a ChunkedByteBuffer (BlockManager.doGetLocalBytes). This is both memory and CPU intensive; the memory footprint could be reduced by using a limited buffer for serialization, 'spilling' to the response stream.
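A minimal sketch of the pattern described above, paraphrased rather than copied from the actual Spark source (the names are stand-ins and the serving side is stubbed out):

{code}
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

// Paraphrased sketch of the getRemoteValues/getRemoteBytes flow described above;
// names and bodies are simplified stand-ins, not the actual Spark implementation.
object CurrentFetchSketch {

  // Hypothetical stand-in for the serving side: serializes a few dummy values.
  // In the real case the serving executor may itself have to materialize the whole
  // serialized block before shipping it (drawback 4).
  private def startAsyncFetch(blockId: String, result: Promise[Array[Byte]]): Unit = {
    val bos = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bos)
    (1 to 3).foreach(i => out.writeObject(s"$blockId-value-$i"))
    out.close()
    result.success(bos.toByteArray)
  }

  // Drawback 1: the task thread parks on Await.result until the ENTIRE block has arrived.
  def getRemoteBytes(blockId: String): Array[Byte] = {
    val result = Promise[Array[Byte]]()
    startAsyncFetch(blockId, result)
    Await.result(result.future, Duration.Inf)
  }

  // Drawbacks 2 & 3: the full serialized block (~700MB per partition in the use case
  // above) is held in memory before a value iterator is handed to the task.
  def getRemoteValues(blockId: String, elementCount: Int): Iterator[AnyRef] = {
    val allBytes = getRemoteBytes(blockId)
    val in = new ObjectInputStream(new ByteArrayInputStream(allBytes))
    Iterator.fill(elementCount)(in.readObject())
  }

  def main(args: Array[String]): Unit =
    getRemoteValues("rdd_1_0", elementCount = 3).foreach(println)
}
{code}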

I suggest improving this by implementing either a full streaming mechanism or some kind of pagination mechanism. In addition, the requesting executor should be able to make progress with the data it already has, blocking only when its local buffer is exhausted and the remote side hasn't yet delivered the next chunk of the stream (or the next page, in the case of pagination).
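To illustrate the pagination idea (purely hypothetical names and interface, not a proposed API): the requester keeps only a small bounded buffer of pages, consumes what it already has, and blocks only when that buffer is drained before the next page arrives.

{code}
import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}

// Purely illustrative sketch of the pagination idea suggested above; every name here
// (PagedBlockFetcher, fetchPage, totalPages, ...) is hypothetical, not a proposed API.
class PagedBlockFetcher(blockId: String, totalPages: Int) extends Iterator[Array[Byte]] {

  // Bounded buffer of pages: the requesting executor holds at most a couple of pages
  // in memory instead of the whole block.
  private val pages: BlockingQueue[Array[Byte]] = new ArrayBlockingQueue[Array[Byte]](2)
  private var delivered = 0

  // Hypothetical page fetch; the real mechanism would read a range of the serialized block.
  private def fetchPage(index: Int): Array[Byte] = s"$blockId-page-$index".getBytes("UTF-8")

  // Stand-in for the remote side pushing pages as it produces them.
  private val producer = new Thread(new Runnable {
    override def run(): Unit = (0 until totalPages).foreach(i => pages.put(fetchPage(i)))
  })
  producer.setDaemon(true)
  producer.start()

  override def hasNext: Boolean = delivered < totalPages

  // The task makes progress with pages it already has and blocks (pages.take())
  // only when the local buffer is drained before the next page has arrived.
  override def next(): Array[Byte] = {
    delivered += 1
    pages.take()
  }
}

object PagedBlockFetcherDemo {
  def main(args: Array[String]): Unit =
    new PagedBlockFetcher("rdd_1_0", totalPages = 4)
      .foreach(page => println(new String(page, "UTF-8")))
}
{code}

A full streaming implementation would go further and deserialize values directly off the incoming stream, but the same back-pressure principle applies.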



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org