You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Prabhu Joseph <pr...@gmail.com> on 2016/02/01 09:32:44 UTC

Spark job does not perform well when some RDD in memory and some on Disk

Hi All,


Sample Spark application which reads a logfile from hadoop (1.2GB - 5 RDD's
created each approx 250MB data) and there are two jobs. Job A gets the line
with "a" and the Job B gets the line with "b". The spark application is ran
multiple times, each time with
different executor memory, and enable/disable cache() function. Job A
performance is same in all the runs as it has to read the entire data first
time from Disk.

Spark Cluster - standalone mode with Spark Master, single worker node (12
cores, 16GB memory)

    val logData = sc.textFile(logFile, 2)
    var numAs = logData.filter(line => line.contains("a")).count()
    var numBs = logData.filter(line => line.contains("b")).count()


*Job B (which has 5 tasks) results below:*

*Run 1:* 1 executor with 2GB memory, 12 cores took 2 seconds [ran1 image]

    Since logData is not cached, the job B has to again read the 1.2GB data
from hadoop into memory and all the 5 tasks started parallel and each took
2 sec (29ms for GC) and the
 overall job completed in 2 seconds.

*Run 2:* 1 executor with 2GB memory, 12 cores and logData is cached took 4
seconds [ran2 image, ran2_cache image]

     val logData = sc.textFile(logFile, 2).cache()

     The Executor does not have enough memory to cache and hence again
needs to read the entire 1.2GB data from hadoop into memory.  But since the
cache() is used, leads to lot of GC pause leading to slowness in task
completion. Each task started parallel and
completed in 4 seconds (more than 1 sec for GC).

*Run 3: 1 executor with 6GB memory, 12 cores and logData is cached took 10
seconds [ran3 image]*

     The Executor has memory that can fit 4 RDD partitions into memory but
5th RDD it has to read from Hadoop. 4 tasks are started parallel and they
completed in 0.3 seconds without GC. But the 5th task which has to read RDD
from disk is started after 4 seconds, and gets completed in 2 seconds.
Analysing why the 5th task is not started parallel with other tasks or at
least why it is not started immediately after the other task completion.

*Run 4:* 1 executor with 16GB memory , 12 cores and logData is cached took
0.3 seconds [ran4 image]

     The executor has enough memory to cache all the 5 RDD. All 5 tasks are
started in parallel and gets completed within 0.3 seconds.


So Spark performs well when entire input data is in Memory or None. In case
of some RDD in memory and some from disk, there is a delay in scheduling
the fifth task, is it a expected behavior or a possible Bug.



Thanks,
Prabhu Joseph

Re: Spark job does not perform well when some RDD in memory and some on Disk

Posted by Prabhu Joseph <pr...@gmail.com>.
If spark.locality.wait is 0, then there are two performance issues:

   1. Task Scheduler won't wait to schedule the tasks as DATA_LOCAL, will
launch it immediately on some node even if it is less local. The
probability of tasks running as less local will be higher
and affect the overall Job Performance.
      2. In case of Executor having not enough heap memory, some tasks
which has RDD on cache and some other has on hadoop, and if
spark.locality.wait is 0, all the tasks will starts parallel and since the
Executor Process is both Memory and IO intensive, the GC will be high and
tasks will be slower.














On Thu, Feb 4, 2016 at 5:13 PM, Alonso Isidoro Roman <al...@gmail.com>
wrote:

> "But learned that it is better not to reduce it to 0."
>
> could you explain a bit more this sentence?
>
> thanks
>
> Alonso Isidoro Roman.
>
> Mis citas preferidas (de hoy) :
> "Si depurar es el proceso de quitar los errores de software, entonces
> programar debe ser el proceso de introducirlos..."
>  -  Edsger Dijkstra
>
> My favorite quotes (today):
> "If debugging is the process of removing software bugs, then programming
> must be the process of putting ..."
>   - Edsger Dijkstra
>
> "If you pay peanuts you get monkeys"
>
>
> 2016-02-04 11:33 GMT+01:00 Prabhu Joseph <pr...@gmail.com>:
>
>> Okay, the reason for the task delay within executor when some RDD in
>> memory and some in Hadoop i.e, Multiple Locality Levels NODE_LOCAL and ANY,
>> in this case Scheduler waits
>> for *spark.locality.wait *3 seconds default. During this period,
>> scheduler waits to launch a data-local task before giving up and launching
>> it on a less-local node.
>>
>> So after making it 0, all tasks started parallel. But learned that it is
>> better not to reduce it to 0.
>>
>>
>> On Mon, Feb 1, 2016 at 2:02 PM, Prabhu Joseph <prabhujose.gates@gmail.com
>> > wrote:
>>
>>> Hi All,
>>>
>>>
>>> Sample Spark application which reads a logfile from hadoop (1.2GB - 5
>>> RDD's created each approx 250MB data) and there are two jobs. Job A gets
>>> the line with "a" and the Job B gets the line with "b". The spark
>>> application is ran multiple times, each time with
>>> different executor memory, and enable/disable cache() function. Job A
>>> performance is same in all the runs as it has to read the entire data first
>>> time from Disk.
>>>
>>> Spark Cluster - standalone mode with Spark Master, single worker node
>>> (12 cores, 16GB memory)
>>>
>>>     val logData = sc.textFile(logFile, 2)
>>>     var numAs = logData.filter(line => line.contains("a")).count()
>>>     var numBs = logData.filter(line => line.contains("b")).count()
>>>
>>>
>>> *Job B (which has 5 tasks) results below:*
>>>
>>> *Run 1:* 1 executor with 2GB memory, 12 cores took 2 seconds [ran1
>>> image]
>>>
>>>     Since logData is not cached, the job B has to again read the 1.2GB
>>> data from hadoop into memory and all the 5 tasks started parallel and each
>>> took 2 sec (29ms for GC) and the
>>>  overall job completed in 2 seconds.
>>>
>>> *Run 2:* 1 executor with 2GB memory, 12 cores and logData is cached
>>> took 4 seconds [ran2 image, ran2_cache image]
>>>
>>>      val logData = sc.textFile(logFile, 2).cache()
>>>
>>>      The Executor does not have enough memory to cache and hence again
>>> needs to read the entire 1.2GB data from hadoop into memory.  But since the
>>> cache() is used, leads to lot of GC pause leading to slowness in task
>>> completion. Each task started parallel and
>>> completed in 4 seconds (more than 1 sec for GC).
>>>
>>> *Run 3: 1 executor with 6GB memory, 12 cores and logData is cached took
>>> 10 seconds [ran3 image]*
>>>
>>>      The Executor has memory that can fit 4 RDD partitions into memory
>>> but 5th RDD it has to read from Hadoop. 4 tasks are started parallel and
>>> they completed in 0.3 seconds without GC. But the 5th task which has to
>>> read RDD from disk is started after 4 seconds, and gets completed in 2
>>> seconds. Analysing why the 5th task is not started parallel with other
>>> tasks or at least why it is not started immediately after the other task
>>> completion.
>>>
>>> *Run 4:* 1 executor with 16GB memory , 12 cores and logData is cached
>>> took 0.3 seconds [ran4 image]
>>>
>>>      The executor has enough memory to cache all the 5 RDD. All 5 tasks
>>> are started in parallel and gets completed within 0.3 seconds.
>>>
>>>
>>> So Spark performs well when entire input data is in Memory or None. In
>>> case of some RDD in memory and some from disk, there is a delay in
>>> scheduling the fifth task, is it a expected behavior or a possible Bug.
>>>
>>>
>>>
>>> Thanks,
>>> Prabhu Joseph
>>>
>>>
>>>
>>>
>>
>

Re: Spark job does not perform well when some RDD in memory and some on Disk

Posted by Prabhu Joseph <pr...@gmail.com>.
If spark.locality.wait is 0, then there are two performance issues:

   1. Task Scheduler won't wait to schedule the tasks as DATA_LOCAL, will
launch it immediately on some node even if it is less local. The
probability of tasks running as less local will be higher
and affect the overall Job Performance.
      2. In case of Executor having not enough heap memory, some tasks
which has RDD on cache and some other has on hadoop, and if
spark.locality.wait is 0, all the tasks will starts parallel and since the
Executor Process is both Memory and IO intensive, the GC will be high and
tasks will be slower.














On Thu, Feb 4, 2016 at 5:13 PM, Alonso Isidoro Roman <al...@gmail.com>
wrote:

> "But learned that it is better not to reduce it to 0."
>
> could you explain a bit more this sentence?
>
> thanks
>
> Alonso Isidoro Roman.
>
> Mis citas preferidas (de hoy) :
> "Si depurar es el proceso de quitar los errores de software, entonces
> programar debe ser el proceso de introducirlos..."
>  -  Edsger Dijkstra
>
> My favorite quotes (today):
> "If debugging is the process of removing software bugs, then programming
> must be the process of putting ..."
>   - Edsger Dijkstra
>
> "If you pay peanuts you get monkeys"
>
>
> 2016-02-04 11:33 GMT+01:00 Prabhu Joseph <pr...@gmail.com>:
>
>> Okay, the reason for the task delay within executor when some RDD in
>> memory and some in Hadoop i.e, Multiple Locality Levels NODE_LOCAL and ANY,
>> in this case Scheduler waits
>> for *spark.locality.wait *3 seconds default. During this period,
>> scheduler waits to launch a data-local task before giving up and launching
>> it on a less-local node.
>>
>> So after making it 0, all tasks started parallel. But learned that it is
>> better not to reduce it to 0.
>>
>>
>> On Mon, Feb 1, 2016 at 2:02 PM, Prabhu Joseph <prabhujose.gates@gmail.com
>> > wrote:
>>
>>> Hi All,
>>>
>>>
>>> Sample Spark application which reads a logfile from hadoop (1.2GB - 5
>>> RDD's created each approx 250MB data) and there are two jobs. Job A gets
>>> the line with "a" and the Job B gets the line with "b". The spark
>>> application is ran multiple times, each time with
>>> different executor memory, and enable/disable cache() function. Job A
>>> performance is same in all the runs as it has to read the entire data first
>>> time from Disk.
>>>
>>> Spark Cluster - standalone mode with Spark Master, single worker node
>>> (12 cores, 16GB memory)
>>>
>>>     val logData = sc.textFile(logFile, 2)
>>>     var numAs = logData.filter(line => line.contains("a")).count()
>>>     var numBs = logData.filter(line => line.contains("b")).count()
>>>
>>>
>>> *Job B (which has 5 tasks) results below:*
>>>
>>> *Run 1:* 1 executor with 2GB memory, 12 cores took 2 seconds [ran1
>>> image]
>>>
>>>     Since logData is not cached, the job B has to again read the 1.2GB
>>> data from hadoop into memory and all the 5 tasks started parallel and each
>>> took 2 sec (29ms for GC) and the
>>>  overall job completed in 2 seconds.
>>>
>>> *Run 2:* 1 executor with 2GB memory, 12 cores and logData is cached
>>> took 4 seconds [ran2 image, ran2_cache image]
>>>
>>>      val logData = sc.textFile(logFile, 2).cache()
>>>
>>>      The Executor does not have enough memory to cache and hence again
>>> needs to read the entire 1.2GB data from hadoop into memory.  But since the
>>> cache() is used, leads to lot of GC pause leading to slowness in task
>>> completion. Each task started parallel and
>>> completed in 4 seconds (more than 1 sec for GC).
>>>
>>> *Run 3: 1 executor with 6GB memory, 12 cores and logData is cached took
>>> 10 seconds [ran3 image]*
>>>
>>>      The Executor has memory that can fit 4 RDD partitions into memory
>>> but 5th RDD it has to read from Hadoop. 4 tasks are started parallel and
>>> they completed in 0.3 seconds without GC. But the 5th task which has to
>>> read RDD from disk is started after 4 seconds, and gets completed in 2
>>> seconds. Analysing why the 5th task is not started parallel with other
>>> tasks or at least why it is not started immediately after the other task
>>> completion.
>>>
>>> *Run 4:* 1 executor with 16GB memory , 12 cores and logData is cached
>>> took 0.3 seconds [ran4 image]
>>>
>>>      The executor has enough memory to cache all the 5 RDD. All 5 tasks
>>> are started in parallel and gets completed within 0.3 seconds.
>>>
>>>
>>> So Spark performs well when entire input data is in Memory or None. In
>>> case of some RDD in memory and some from disk, there is a delay in
>>> scheduling the fifth task, is it a expected behavior or a possible Bug.
>>>
>>>
>>>
>>> Thanks,
>>> Prabhu Joseph
>>>
>>>
>>>
>>>
>>
>

Re: Spark job does not perform well when some RDD in memory and some on Disk

Posted by Alonso Isidoro Roman <al...@gmail.com>.
"But learned that it is better not to reduce it to 0."

could you explain a bit more this sentence?

thanks

Alonso Isidoro Roman.

Mis citas preferidas (de hoy) :
"Si depurar es el proceso de quitar los errores de software, entonces
programar debe ser el proceso de introducirlos..."
 -  Edsger Dijkstra

My favorite quotes (today):
"If debugging is the process of removing software bugs, then programming
must be the process of putting ..."
  - Edsger Dijkstra

"If you pay peanuts you get monkeys"


2016-02-04 11:33 GMT+01:00 Prabhu Joseph <pr...@gmail.com>:

> Okay, the reason for the task delay within executor when some RDD in
> memory and some in Hadoop i.e, Multiple Locality Levels NODE_LOCAL and ANY,
> in this case Scheduler waits
> for *spark.locality.wait *3 seconds default. During this period,
> scheduler waits to launch a data-local task before giving up and launching
> it on a less-local node.
>
> So after making it 0, all tasks started parallel. But learned that it is
> better not to reduce it to 0.
>
>
> On Mon, Feb 1, 2016 at 2:02 PM, Prabhu Joseph <pr...@gmail.com>
> wrote:
>
>> Hi All,
>>
>>
>> Sample Spark application which reads a logfile from hadoop (1.2GB - 5
>> RDD's created each approx 250MB data) and there are two jobs. Job A gets
>> the line with "a" and the Job B gets the line with "b". The spark
>> application is ran multiple times, each time with
>> different executor memory, and enable/disable cache() function. Job A
>> performance is same in all the runs as it has to read the entire data first
>> time from Disk.
>>
>> Spark Cluster - standalone mode with Spark Master, single worker node (12
>> cores, 16GB memory)
>>
>>     val logData = sc.textFile(logFile, 2)
>>     var numAs = logData.filter(line => line.contains("a")).count()
>>     var numBs = logData.filter(line => line.contains("b")).count()
>>
>>
>> *Job B (which has 5 tasks) results below:*
>>
>> *Run 1:* 1 executor with 2GB memory, 12 cores took 2 seconds [ran1 image]
>>
>>     Since logData is not cached, the job B has to again read the 1.2GB
>> data from hadoop into memory and all the 5 tasks started parallel and each
>> took 2 sec (29ms for GC) and the
>>  overall job completed in 2 seconds.
>>
>> *Run 2:* 1 executor with 2GB memory, 12 cores and logData is cached took
>> 4 seconds [ran2 image, ran2_cache image]
>>
>>      val logData = sc.textFile(logFile, 2).cache()
>>
>>      The Executor does not have enough memory to cache and hence again
>> needs to read the entire 1.2GB data from hadoop into memory.  But since the
>> cache() is used, leads to lot of GC pause leading to slowness in task
>> completion. Each task started parallel and
>> completed in 4 seconds (more than 1 sec for GC).
>>
>> *Run 3: 1 executor with 6GB memory, 12 cores and logData is cached took
>> 10 seconds [ran3 image]*
>>
>>      The Executor has memory that can fit 4 RDD partitions into memory
>> but 5th RDD it has to read from Hadoop. 4 tasks are started parallel and
>> they completed in 0.3 seconds without GC. But the 5th task which has to
>> read RDD from disk is started after 4 seconds, and gets completed in 2
>> seconds. Analysing why the 5th task is not started parallel with other
>> tasks or at least why it is not started immediately after the other task
>> completion.
>>
>> *Run 4:* 1 executor with 16GB memory , 12 cores and logData is cached
>> took 0.3 seconds [ran4 image]
>>
>>      The executor has enough memory to cache all the 5 RDD. All 5 tasks
>> are started in parallel and gets completed within 0.3 seconds.
>>
>>
>> So Spark performs well when entire input data is in Memory or None. In
>> case of some RDD in memory and some from disk, there is a delay in
>> scheduling the fifth task, is it a expected behavior or a possible Bug.
>>
>>
>>
>> Thanks,
>> Prabhu Joseph
>>
>>
>>
>>
>

Re: Spark job does not perform well when some RDD in memory and some on Disk

Posted by Prabhu Joseph <pr...@gmail.com>.
Okay, the reason for the task delay within executor when some RDD in memory
and some in Hadoop i.e, Multiple Locality Levels NODE_LOCAL and ANY, in
this case Scheduler waits
for *spark.locality.wait *3 seconds default. During this period, scheduler
waits to launch a data-local task before giving up and launching it on a
less-local node.

So after making it 0, all tasks started parallel. But learned that it is
better not to reduce it to 0.


On Mon, Feb 1, 2016 at 2:02 PM, Prabhu Joseph <pr...@gmail.com>
wrote:

> Hi All,
>
>
> Sample Spark application which reads a logfile from hadoop (1.2GB - 5
> RDD's created each approx 250MB data) and there are two jobs. Job A gets
> the line with "a" and the Job B gets the line with "b". The spark
> application is ran multiple times, each time with
> different executor memory, and enable/disable cache() function. Job A
> performance is same in all the runs as it has to read the entire data first
> time from Disk.
>
> Spark Cluster - standalone mode with Spark Master, single worker node (12
> cores, 16GB memory)
>
>     val logData = sc.textFile(logFile, 2)
>     var numAs = logData.filter(line => line.contains("a")).count()
>     var numBs = logData.filter(line => line.contains("b")).count()
>
>
> *Job B (which has 5 tasks) results below:*
>
> *Run 1:* 1 executor with 2GB memory, 12 cores took 2 seconds [ran1 image]
>
>     Since logData is not cached, the job B has to again read the 1.2GB
> data from hadoop into memory and all the 5 tasks started parallel and each
> took 2 sec (29ms for GC) and the
>  overall job completed in 2 seconds.
>
> *Run 2:* 1 executor with 2GB memory, 12 cores and logData is cached took
> 4 seconds [ran2 image, ran2_cache image]
>
>      val logData = sc.textFile(logFile, 2).cache()
>
>      The Executor does not have enough memory to cache and hence again
> needs to read the entire 1.2GB data from hadoop into memory.  But since the
> cache() is used, leads to lot of GC pause leading to slowness in task
> completion. Each task started parallel and
> completed in 4 seconds (more than 1 sec for GC).
>
> *Run 3: 1 executor with 6GB memory, 12 cores and logData is cached took 10
> seconds [ran3 image]*
>
>      The Executor has memory that can fit 4 RDD partitions into memory but
> 5th RDD it has to read from Hadoop. 4 tasks are started parallel and they
> completed in 0.3 seconds without GC. But the 5th task which has to read RDD
> from disk is started after 4 seconds, and gets completed in 2 seconds.
> Analysing why the 5th task is not started parallel with other tasks or at
> least why it is not started immediately after the other task completion.
>
> *Run 4:* 1 executor with 16GB memory , 12 cores and logData is cached
> took 0.3 seconds [ran4 image]
>
>      The executor has enough memory to cache all the 5 RDD. All 5 tasks
> are started in parallel and gets completed within 0.3 seconds.
>
>
> So Spark performs well when entire input data is in Memory or None. In
> case of some RDD in memory and some from disk, there is a delay in
> scheduling the fifth task, is it a expected behavior or a possible Bug.
>
>
>
> Thanks,
> Prabhu Joseph
>
>
>
>

Re: Spark job does not perform well when some RDD in memory and some on Disk

Posted by Prabhu Joseph <pr...@gmail.com>.
Okay, the reason for the task delay within executor when some RDD in memory
and some in Hadoop i.e, Multiple Locality Levels NODE_LOCAL and ANY, in
this case Scheduler waits
for *spark.locality.wait *3 seconds default. During this period, scheduler
waits to launch a data-local task before giving up and launching it on a
less-local node.

So after making it 0, all tasks started parallel. But learned that it is
better not to reduce it to 0.


On Mon, Feb 1, 2016 at 2:02 PM, Prabhu Joseph <pr...@gmail.com>
wrote:

> Hi All,
>
>
> Sample Spark application which reads a logfile from hadoop (1.2GB - 5
> RDD's created each approx 250MB data) and there are two jobs. Job A gets
> the line with "a" and the Job B gets the line with "b". The spark
> application is ran multiple times, each time with
> different executor memory, and enable/disable cache() function. Job A
> performance is same in all the runs as it has to read the entire data first
> time from Disk.
>
> Spark Cluster - standalone mode with Spark Master, single worker node (12
> cores, 16GB memory)
>
>     val logData = sc.textFile(logFile, 2)
>     var numAs = logData.filter(line => line.contains("a")).count()
>     var numBs = logData.filter(line => line.contains("b")).count()
>
>
> *Job B (which has 5 tasks) results below:*
>
> *Run 1:* 1 executor with 2GB memory, 12 cores took 2 seconds [ran1 image]
>
>     Since logData is not cached, the job B has to again read the 1.2GB
> data from hadoop into memory and all the 5 tasks started parallel and each
> took 2 sec (29ms for GC) and the
>  overall job completed in 2 seconds.
>
> *Run 2:* 1 executor with 2GB memory, 12 cores and logData is cached took
> 4 seconds [ran2 image, ran2_cache image]
>
>      val logData = sc.textFile(logFile, 2).cache()
>
>      The Executor does not have enough memory to cache and hence again
> needs to read the entire 1.2GB data from hadoop into memory.  But since the
> cache() is used, leads to lot of GC pause leading to slowness in task
> completion. Each task started parallel and
> completed in 4 seconds (more than 1 sec for GC).
>
> *Run 3: 1 executor with 6GB memory, 12 cores and logData is cached took 10
> seconds [ran3 image]*
>
>      The Executor has memory that can fit 4 RDD partitions into memory but
> 5th RDD it has to read from Hadoop. 4 tasks are started parallel and they
> completed in 0.3 seconds without GC. But the 5th task which has to read RDD
> from disk is started after 4 seconds, and gets completed in 2 seconds.
> Analysing why the 5th task is not started parallel with other tasks or at
> least why it is not started immediately after the other task completion.
>
> *Run 4:* 1 executor with 16GB memory , 12 cores and logData is cached
> took 0.3 seconds [ran4 image]
>
>      The executor has enough memory to cache all the 5 RDD. All 5 tasks
> are started in parallel and gets completed within 0.3 seconds.
>
>
> So Spark performs well when entire input data is in Memory or None. In
> case of some RDD in memory and some from disk, there is a delay in
> scheduling the fifth task, is it a expected behavior or a possible Bug.
>
>
>
> Thanks,
> Prabhu Joseph
>
>
>
>