Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2016/10/17 09:55:58 UTC
[jira] [Resolved] (SPARK-17951) BlockFetch with multiple threads slows down after spark 1.6
[ https://issues.apache.org/jira/browse/SPARK-17951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen resolved SPARK-17951.
-------------------------------
Resolution: Not A Problem
This does not show a slowdown in an actual Spark operation, though, and doesn't show a significant change anyway. To proceed, you'd want to show that calling a user-facing API is significantly slower, ideally with some profiling details that suggest why or what is slow.
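(As an illustration of the kind of benchmark being asked for, a sketch along these lines times only a user-facing shuffle operation rather than BlockManager internals. This is not code from the issue; the data size, partition count, and repetition count are placeholders.)

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical benchmark sketch: measure a user-facing API (reduceByKey,
// which triggers remote block fetches during the shuffle) end to end.
object ShuffleBench {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ShuffleBench"))
    val data = sc.parallelize(1 to 10000000, 64).map(i => (i % 1024, i.toLong))
    // Warm up once so executor startup and connection setup are excluded.
    data.reduceByKey(_ + _).count()
    val times = (1 to 10).map { _ =>
      val start = System.nanoTime()
      data.reduceByKey(_ + _).count() // the user-facing call under test
      (System.nanoTime() - start) / 1e9
    }
    println(s"median reduceByKey time: ${times.sorted.apply(times.size / 2)} s")
    sc.stop()
  }
}
```

Running the same jar against both Spark versions, and attaching executor profiles for the shuffle-fetch phase, would make the comparison actionable.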
> BlockFetch with multiple threads slows down after spark 1.6
> -----------------------------------------------------------
>
> Key: SPARK-17951
> URL: https://issues.apache.org/jira/browse/SPARK-17951
> Project: Spark
> Issue Type: Bug
> Components: Block Manager, Spark Core
> Affects Versions: 1.6.2
> Environment: cluster with 8 nodes, each node has 28 cores; 10Gb network
> Reporter: ding
>
> The following code demonstrates the issue:
> {code}
> import java.nio.ByteBuffer
> import scala.concurrent.{Await, Future}
> import scala.concurrent.ExecutionContext.Implicits.global
> import scala.concurrent.duration.Duration
> import scala.util.Random
> import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
> import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
>
> def main(args: Array[String]): Unit = {
>   val conf = new SparkConf().setAppName(s"BMTest")
>   val size = 3344570
>   val sc = new SparkContext(conf)
>   val data = sc.parallelize(1 to 100, 8)
>   var accum = sc.accumulator(0.0, "get remote bytes")
>   var i = 0
>   while (i < 91) {
>     accum = sc.accumulator(0.0, "get remote bytes")
>     // Each partition stores one block of random bytes in its local BlockManager.
>     val test = data.mapPartitionsWithIndex { (pid, iter) =>
>       val N = size
>       val bm = SparkEnv.get.blockManager
>       val blockId = TaskResultBlockId(10*i + pid)
>       val test = new Array[Byte](N)
>       Random.nextBytes(test)
>       val buffer = ByteBuffer.allocate(N)
>       buffer.limit(N)
>       buffer.put(test)
>       bm.putBytes(blockId, buffer, StorageLevel.MEMORY_ONLY_SER)
>       Iterator(1)
>     }.count()
>
>     // Each partition then fetches all 8 blocks concurrently and records the time.
>     data.mapPartitionsWithIndex { (pid, iter) =>
>       val before = System.nanoTime()
>       val bm = SparkEnv.get.blockManager
>       (0 to 7).map(s => {
>         Future {
>           val result = bm.getRemoteBytes(TaskResultBlockId(10*i + s))
>         }
>       }).map(Await.result(_, Duration.Inf))
>       accum.add((System.nanoTime() - before) / 1e9)
>       Iterator(1)
>     }.count()
>     println("get remote bytes take: " + accum.value / 8)
>     i += 1
>   }
> }
> {code}
> In Spark 1.6.2 the average "get remote bytes" time is 0.19 s, while in Spark 1.5.1 it is 0.09 s.
> However, if blocks are fetched in a single thread, the gap is much smaller:
> Spark 1.6.2 get remote bytes: 0.21 s
> Spark 1.5.1 get remote bytes: 0.20 s
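> (The single-threaded variant referred to above is not shown in the issue; it can be sketched as a plain loop replacing the Futures in the fetch stage. This is a reconstruction, not the reporter's exact code:)
> {code}
> // Single-threaded comparison: fetch the same 8 blocks sequentially.
> val before = System.nanoTime()
> val bm = SparkEnv.get.blockManager
> (0 to 7).foreach { s =>
>   bm.getRemoteBytes(TaskResultBlockId(10*i + s))
> }
> accum.add((System.nanoTime() - before) / 1e9)
> {code}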
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org