Posted to dev@spark.apache.org by Lloyd Haris <ll...@gmail.com> on 2015/10/08 03:16:43 UTC

SparkSQL: First query execution is always slower than subsequent queries

Hi Spark Devs,

I am doing a performance evaluation of Spark using pyspark. I am using
Spark 1.5 with a Hadoop 2.6 cluster of 4 nodes and ran these tests in local
mode.

After a few dozen test executions, it turned out that the very first
SparkSQL query execution is always slower than the subsequent executions of
the same query.

First run:

15/10/08 11:15:35 INFO ParseDriver: Parsing command: SELECT CATAID, RA, DEC
FROM InputCatA
15/10/08 11:15:36 INFO ParseDriver: Parse Completed
15/10/08 11:15:36 INFO MemoryStore: ensureFreeSpace(484576) called with
curMem=0, maxMem=556038881
15/10/08 11:15:36 INFO MemoryStore: Block broadcast_0 stored as values in
memory (estimated size 473.2 KB, free 529.8 MB)
15/10/08 11:15:37 INFO MemoryStore: ensureFreeSpace(45559) called with
curMem=484576, maxMem=556038881
15/10/08 11:15:37 INFO MemoryStore: Block broadcast_0_piece0 stored as
bytes in memory (estimated size 44.5 KB, free 529.8 MB)
15/10/08 11:15:37 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory
on localhost:59594 (size: 44.5 KB, free: 530.2 MB)
15/10/08 11:15:37 INFO SparkContext: Created broadcast 0 from collect at
<stdin>:3
15/10/08 11:15:37 INFO FileInputFormat: Total input paths to process : 4
15/10/08 11:15:37 INFO SparkContext: Starting job: collect at <stdin>:3
15/10/08 11:15:37 INFO DAGScheduler: Got job 0 (collect at <stdin>:3) with
4 output partitions
15/10/08 11:15:37 INFO DAGScheduler: Final stage: ResultStage 0(collect at
<stdin>:3)
15/10/08 11:15:37 INFO DAGScheduler: Parents of final stage: List()
15/10/08 11:15:37 INFO DAGScheduler: Missing parents: List()
15/10/08 11:15:37 INFO DAGScheduler: Submitting ResultStage 0
(MapPartitionsRDD[4] at collect at <stdin>:3), which has no missing parents
15/10/08 11:15:37 INFO MemoryStore: ensureFreeSpace(8896) called with
curMem=530135, maxMem=556038881
15/10/08 11:15:37 INFO MemoryStore: Block broadcast_1 stored as values in
memory (estimated size 8.7 KB, free 529.8 MB)
15/10/08 11:15:37 INFO MemoryStore: ensureFreeSpace(4679) called with
curMem=539031, maxMem=556038881
15/10/08 11:15:37 INFO MemoryStore: Block broadcast_1_piece0 stored as
bytes in memory (estimated size 4.6 KB, free 529.8 MB)
15/10/08 11:15:37 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory
on localhost:59594 (size: 4.6 KB, free: 530.2 MB)
15/10/08 11:15:37 INFO SparkContext: Created broadcast 1 from broadcast at
DAGScheduler.scala:861
15/10/08 11:15:37 INFO DAGScheduler: Submitting 4 missing tasks from
ResultStage 0 (MapPartitionsRDD[4] at collect at <stdin>:3)
15/10/08 11:15:37 INFO TaskSchedulerImpl: Adding task set 0.0 with 4 tasks
15/10/08 11:15:37 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID
0, localhost, ANY, 2184 bytes)
15/10/08 11:15:37 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID
1, localhost, ANY, 2184 bytes)
15/10/08 11:15:37 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID
2, localhost, ANY, 2184 bytes)
15/10/08 11:15:37 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID
3, localhost, ANY, 2184 bytes)
15/10/08 11:15:37 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
15/10/08 11:15:37 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
15/10/08 11:15:37 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
15/10/08 11:15:37 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/10/08 11:15:37 INFO HadoopRDD: Input split:
hdfs://<url>:8020/user/hive/warehouse/inputcata/part-m-00003:0+55353265
15/10/08 11:15:37 INFO HadoopRDD: Input split:
hdfs://<url>:8020/user/hive/warehouse/inputcata/part-m-00000:0+55149172
15/10/08 11:15:37 INFO HadoopRDD: Input split:
hdfs://<url>:8020/user/hive/warehouse/inputcata/part-m-00002:0+55418752
15/10/08 11:15:37 INFO HadoopRDD: Input split:
hdfs://<url>:8020/user/hive/warehouse/inputcata/part-m-00001:0+55083905
15/10/08 11:15:37 INFO deprecation: mapred.tip.id is deprecated. Instead,
use mapreduce.task.id
15/10/08 11:15:37 INFO deprecation: mapred.task.id is deprecated. Instead,
use mapreduce.task.attempt.id
15/10/08 11:15:37 INFO deprecation: mapred.task.is.map is deprecated.
Instead, use mapreduce.task.ismap
15/10/08 11:15:37 INFO deprecation: mapred.task.partition is deprecated.
Instead, use mapreduce.task.partition
15/10/08 11:15:37 INFO deprecation: mapred.job.id is deprecated. Instead,
use mapreduce.job.id
15/10/08 11:15:39 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1).
8919569 bytes result sent to driver
15/10/08 11:15:39 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3).
8919548 bytes result sent to driver
15/10/08 11:15:39 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2).
8926124 bytes result sent to driver
15/10/08 11:15:39 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID
1) in 2186 ms on localhost (1/4)
15/10/08 11:15:39 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID
3) in 2203 ms on localhost (2/4)
15/10/08 11:15:39 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID
2) in 2208 ms on localhost (3/4)
15/10/08 11:15:39 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0).
8794038 bytes result sent to driver
15/10/08 11:15:39 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID
0) in 2395 ms on localhost (4/4)
15/10/08 11:15:39 INFO DAGScheduler: ResultStage 0 (collect at <stdin>:3)
finished in 2.403 s
15/10/08 11:15:39 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks
have all completed, from pool
15/10/08 11:15:40 INFO DAGScheduler: Job 0 finished: collect at <stdin>:3,
took 2.468262 s
query executed
8.088552951812744

Second run:

15/10/08 11:16:39 INFO ParseDriver: Parsing command: SELECT CATAID, RA, DEC
FROM InputCatA
15/10/08 11:16:39 INFO ParseDriver: Parse Completed
15/10/08 11:16:40 INFO MemoryStore: ensureFreeSpace(484576) called with
curMem=543710, maxMem=556038881
15/10/08 11:16:40 INFO MemoryStore: Block broadcast_2 stored as values in
memory (estimated size 473.2 KB, free 529.3 MB)
15/10/08 11:16:40 INFO MemoryStore: ensureFreeSpace(45559) called with
curMem=1028286, maxMem=556038881
15/10/08 11:16:40 INFO MemoryStore: Block broadcast_2_piece0 stored as
bytes in memory (estimated size 44.5 KB, free 529.3 MB)
15/10/08 11:16:40 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory
on localhost:59594 (size: 44.5 KB, free: 530.2 MB)
15/10/08 11:16:40 INFO SparkContext: Created broadcast 2 from collect at
<stdin>:3
15/10/08 11:16:40 INFO FileInputFormat: Total input paths to process : 4
15/10/08 11:16:40 INFO SparkContext: Starting job: collect at <stdin>:3
15/10/08 11:16:40 INFO DAGScheduler: Got job 1 (collect at <stdin>:3) with
4 output partitions
15/10/08 11:16:40 INFO DAGScheduler: Final stage: ResultStage 1(collect at
<stdin>:3)
15/10/08 11:16:40 INFO DAGScheduler: Parents of final stage: List()
15/10/08 11:16:40 INFO DAGScheduler: Missing parents: List()
15/10/08 11:16:40 INFO DAGScheduler: Submitting ResultStage 1
(MapPartitionsRDD[9] at collect at <stdin>:3), which has no missing parents
15/10/08 11:16:40 INFO MemoryStore: ensureFreeSpace(8896) called with
curMem=1073845, maxMem=556038881
15/10/08 11:16:40 INFO MemoryStore: Block broadcast_3 stored as values in
memory (estimated size 8.7 KB, free 529.2 MB)
15/10/08 11:16:40 INFO MemoryStore: ensureFreeSpace(4688) called with
curMem=1082741, maxMem=556038881
15/10/08 11:16:40 INFO MemoryStore: Block broadcast_3_piece0 stored as
bytes in memory (estimated size 4.6 KB, free 529.2 MB)
15/10/08 11:16:40 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory
on localhost:59594 (size: 4.6 KB, free: 530.2 MB)
15/10/08 11:16:40 INFO SparkContext: Created broadcast 3 from broadcast at
DAGScheduler.scala:861
15/10/08 11:16:40 INFO DAGScheduler: Submitting 4 missing tasks from
ResultStage 1 (MapPartitionsRDD[9] at collect at <stdin>:3)
15/10/08 11:16:40 INFO TaskSchedulerImpl: Adding task set 1.0 with 4 tasks
15/10/08 11:16:40 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID
4, localhost, ANY, 2184 bytes)
15/10/08 11:16:40 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID
5, localhost, ANY, 2184 bytes)
15/10/08 11:16:40 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID
6, localhost, ANY, 2184 bytes)
15/10/08 11:16:40 INFO TaskSetManager: Starting task 3.0 in stage 1.0 (TID
7, localhost, ANY, 2184 bytes)
15/10/08 11:16:40 INFO Executor: Running task 0.0 in stage 1.0 (TID 4)
15/10/08 11:16:40 INFO Executor: Running task 1.0 in stage 1.0 (TID 5)
15/10/08 11:16:40 INFO Executor: Running task 3.0 in stage 1.0 (TID 7)
15/10/08 11:16:40 INFO HadoopRDD: Input split:
hdfs://<url>:8020/user/hive/warehouse/inputcata/part-m-00001:0+55083905
15/10/08 11:16:40 INFO HadoopRDD: Input split:
hdfs://<url>:8020/user/hive/warehouse/inputcata/part-m-00000:0+55149172
15/10/08 11:16:40 INFO Executor: Running task 2.0 in stage 1.0 (TID 6)
15/10/08 11:16:40 INFO HadoopRDD: Input split:
hdfs://<url>:8020/user/hive/warehouse/inputcata/part-m-00003:0+55353265
15/10/08 11:16:40 INFO HadoopRDD: Input split:
hdfs://<url>:8020/user/hive/warehouse/inputcata/part-m-00002:0+55418752
15/10/08 11:16:41 INFO Executor: Finished task 1.0 in stage 1.0 (TID 5).
8925439 bytes result sent to driver
15/10/08 11:16:41 INFO Executor: Finished task 0.0 in stage 1.0 (TID 4).
8793850 bytes result sent to driver
15/10/08 11:16:41 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID
4) in 1238 ms on localhost (1/4)
15/10/08 11:16:41 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID
5) in 1265 ms on localhost (2/4)
15/10/08 11:16:41 INFO Executor: Finished task 3.0 in stage 1.0 (TID 7).
8925361 bytes result sent to driver
15/10/08 11:16:41 INFO TaskSetManager: Finished task 3.0 in stage 1.0 (TID
7) in 1283 ms on localhost (3/4)
15/10/08 11:16:41 INFO Executor: Finished task 2.0 in stage 1.0 (TID 6).
8926039 bytes result sent to driver
15/10/08 11:16:41 INFO TaskSetManager: Finished task 2.0 in stage 1.0 (TID
6) in 1419 ms on localhost (4/4)
15/10/08 11:16:41 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks
have all completed, from pool
15/10/08 11:16:41 INFO DAGScheduler: ResultStage 1 (collect at <stdin>:3)
finished in 1.421 s
15/10/08 11:16:41 INFO DAGScheduler: Job 1 finished: collect at <stdin>:3,
took 1.435883 s
query executed
5.115309000015259

As you can see from the log messages, the major difference I can see is the
highlighted part (the "mapred.* is deprecated" messages). It seems to me
(yes, I am a newbie to Spark) that it is trying to create some Map/Reduce
jobs and is trying to use MR1 as opposed to MR2. Those messages are not
printed on any subsequent query execution. If you run a different query
involving a completely different table or tables several times, there is no
significant difference in execution times among those runs. So the delay is
only with the very first run.
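
One way to read the two logs above is to separate the time Spark reports for
the job from the total time printed after "query executed". The sketch below
uses only the numbers shown in the logs. Treating the final printed value as
the script's wall-clock time in seconds is an assumption, and the names
(runs, outside_job) are made up for illustration.

```python
# Numbers copied from the two logs above. Treating the value printed after
# "query executed" as wall-clock seconds measured by the script is an assumption.
runs = {
    "first": {"wall_s": 8.088552951812744, "job_s": 2.468262},
    "second": {"wall_s": 5.115309000015259, "job_s": 1.435883},
}


def outside_job(run):
    """Time not accounted for by the Spark job itself (illustrative helper)."""
    return run["wall_s"] - run["job_s"]


for name, run in runs.items():
    print(f"{name}: job {run['job_s']:.2f} s, outside job {outside_job(run):.2f} s")

# How much of the first-vs-second gap is inside the Spark job?
wall_gap = runs["first"]["wall_s"] - runs["second"]["wall_s"]
job_gap = runs["first"]["job_s"] - runs["second"]["job_s"]
print(f"wall gap {wall_gap:.2f} s, of which job {job_gap:.2f} s")
```

Under that assumption, about 1 s of the roughly 3 s gap is inside the Spark
job, and the rest is spent outside it in both runs.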

So my questions are:

1). Is that the reason why it's always slow in the first run? Or are there
any other reasons? Apparently it loads data to memory every time so it
shouldn't be something to do with disk read should it?

2). Does Spark use the Hadoop's Map Reduce engine under the hood? If so can
we configure it to use MR2 instead of MR1.

I might be completely wrong, so if anyone can tell me why this is happening
that would be highly appreciated.

Cheers
Lloyd

Re: SparkSQL: First query execution is always slower than subsequent queries

Posted by Xiao Li <ga...@gmail.com>.
Hi, Lloyd,

Are both runs cold or warm? Memory/cache hits and misses could be a big
factor if your application is IO intensive. You need to monitor your system
to understand what your bottleneck is.

Good luck,

Xiao Li

Re: SparkSQL: First query execution is always slower than subsequent queries

Posted by Michael Armbrust <mi...@databricks.com>.
-dev +user

> 1). Is that the reason why it's always slow in the first run? Or are there
> any other reasons? Apparently it loads data to memory every time so it
> shouldn't be something to do with disk read should it?
>

You are probably seeing the effect of the JVM's JIT.  The first run is
executing in interpreted mode.  Once the JVM sees it's a hot piece of code
it will compile it to native code.  This applies both to Spark / Spark SQL
itself and (as of Spark 1.5) the code that we dynamically generate for
doing expression evaluation.  Multiple runs with the same expressions will
use cached code that might have been JITed.
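
If JIT warm-up is the cause, a benchmark can account for it by timing a few
warm-up runs separately from the runs it reports. Below is a minimal sketch
in plain Python. The helper names (time_runs, summarize) are hypothetical and
not part of Spark, and the commented usage assumes an existing sqlContext
with the InputCatA table from this thread.

```python
import time
from statistics import median


def time_runs(fn, warmup=1, repeats=5):
    """Time fn() for `warmup` runs, then for `repeats` more runs.

    Returns (warmup_times, measured_times), both lists of seconds.
    Warm-up times are kept separate so they can be inspected or dropped.
    """
    def timed():
        start = time.perf_counter()
        fn()
        return time.perf_counter() - start

    warmup_times = [timed() for _ in range(warmup)]
    measured_times = [timed() for _ in range(repeats)]
    return warmup_times, measured_times


def summarize(measured_times):
    """Summarize the measured runs; the median is less sensitive to outliers."""
    return {
        "min": min(measured_times),
        "median": median(measured_times),
        "max": max(measured_times),
    }


# Hypothetical usage with the query from this thread (assumes `sqlContext`
# already exists and InputCatA is registered):
#
#   query = "SELECT CATAID, RA, DEC FROM InputCatA"
#   warm, runs = time_runs(lambda: sqlContext.sql(query).collect())
#   print("warm-up:", warm)
#   print("steady state:", summarize(runs))
```

Comparing the warm-up times with the steady-state summary shows how much of
the first-run cost disappears on repeated runs. It does not by itself tell
JIT compilation apart from other one-time costs such as class loading or
cache effects.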


> 2). Does Spark use the Hadoop's Map Reduce engine under the hood? If so
> can we configure it to use MR2 instead of MR1.
>

No, we do not use the MapReduce engine for execution.  You can, however,
compile Spark to work with either version of Hadoop so you can access
HDFS, etc.
