You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Gerard Maas <ge...@gmail.com> on 2015/10/06 18:45:25 UTC

Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

Hi,

We recently migrated our streaming jobs to the direct kafka receiver. Our
initial migration went quite fine but now we are seeing a weird zig-zag
performance pattern we cannot explain.
In alternating fashion, one task takes about 1 second to finish and the
next takes 7sec for a stable streaming rate.

Here are comparable metrics for two successive tasks:
*Slow*:


​

Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded Tasks
20151006-044141-2408867082-5050-21047-S0dnode-3.hdfs.private:3686322 s303
20151006-044141-2408867082-5050-21047-S1dnode-0.hdfs.private:4381240 s11011
20151006-044141-2408867082-5050-21047-S4dnode-5.hdfs.private:5994549 s10010
*Fast*:

​

Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded Tasks
20151006-044141-2408867082-5050-21047-S0dnode-3.hdfs.private:368630.6 s404
20151006-044141-2408867082-5050-21047-S1dnode-0.hdfs.private:438121 s909
20151006-044141-2408867082-5050-21047-S4dnode-5.hdfs.private:599451 s11011
We have some custom metrics that measure wall-clock time of execution of
certain blocks of the job, like the time it takes to do the local
computations (RDD.foreachPartition closure) vs total time.
The difference between the slow and fast executing task is on the 'spark
computation time' which is wall-clock for the task scheduling
(DStream.foreachRDD closure)

e.g.
Slow task:

local computation time: 347.60968499999996, *spark computation time: 6930*,
metric collection: 70, total process: 7000, total_records: 4297

Fast task:
local computation time: 281.539042,* spark computation time: 263*, metric
collection: 138, total process: 401, total_records: 5002

We are currently running Spark 1.4.1. The load and the work to be done is
stable -this is on a dev env with that stuff under control.

Any ideas what this behavior could be?

thanks in advance,  Gerard.

Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

Posted by Jeff Nadler <jn...@srcginc.com>.
Gerard - any chance this is related to task locality waiting?    Can you
try (just as a diagnostic) something like this, does the unexpected delay
go away?

.set("spark.locality.wait", "0")


On Tue, Oct 6, 2015 at 12:00 PM, Gerard Maas <ge...@gmail.com> wrote:

> Hi Cody,
>
> The job is doing ETL from Kafka records to Cassandra. After a
> single filtering stage on Spark, the 'TL' part is done using the
> dstream.foreachRDD{rdd.foreachPartition{...TL ...}} pattern.
>
> We have metrics on the executor work which we collect and add together,
> indicated here by 'local computation'.  As you can see, we also measure how
> much it cost us to measure :-)
> See how 'local work'  times are comparable.  What's not visible is the
> task scheduling and consuming the data from Kafka which becomes part of the
> 'spark computation' part.
>
> The pattern we see is 1 fast, 1 slow, 1 fast,... zig...zag...
>
> Are there metrics available somehow on the Kafka reading time?
>
> Slow TaskFast Tasklocal computation347.6281.53spark computation6930263metric
> collection70138wall clock process7000401total records processed42975002
>
> (time in ms)
>
> kr, Gerard.
>
>
> On Tue, Oct 6, 2015 at 8:01 PM, Cody Koeninger <co...@koeninger.org> wrote:
>
>> Can you say anything more about what the job is doing?
>>
>> First thing I'd do is try to get some metrics on the time taken by your
>> code on the executors (e.g. when processing the iterator) to see if it's
>> consistent between the two situations.
>>
>> On Tue, Oct 6, 2015 at 11:45 AM, Gerard Maas <ge...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> We recently migrated our streaming jobs to the direct kafka receiver.
>>> Our initial migration went quite fine but now we are seeing a weird zig-zag
>>> performance pattern we cannot explain.
>>> In alternating fashion, one task takes about 1 second to finish and the
>>> next takes 7sec for a stable streaming rate.
>>>
>>> Here are comparable metrics for two successive tasks:
>>> *Slow*:
>>>
>>>
>>> ​
>>>
>>> Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded Tasks
>>> 20151006-044141-2408867082-5050-21047-S0dnode-3.hdfs.private:3686322 s30
>>> 320151006-044141-2408867082-5050-21047-S1dnode-0.hdfs.private:4381240 s
>>> 1101120151006-044141-2408867082-5050-21047-S4dnode-5.hdfs.private:5994549
>>> s10010
>>> *Fast*:
>>>
>>> ​
>>>
>>> Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded Tasks
>>> 20151006-044141-2408867082-5050-21047-S0dnode-3.hdfs.private:368630.6 s4
>>> 0420151006-044141-2408867082-5050-21047-S1dnode-0.hdfs.private:438121 s9
>>> 0920151006-044141-2408867082-5050-21047-S4dnode-5.hdfs.private:599451 s
>>> 11011
>>> We have some custom metrics that measure wall-clock time of execution of
>>> certain blocks of the job, like the time it takes to do the local
>>> computations (RDD.foreachPartition closure) vs total time.
>>> The difference between the slow and fast executing task is on the 'spark
>>> computation time' which is wall-clock for the task scheduling
>>> (DStream.foreachRDD closure)
>>>
>>> e.g.
>>> Slow task:
>>>
>>> local computation time: 347.60968499999996, *spark computation time:
>>> 6930*, metric collection: 70, total process: 7000, total_records: 4297
>>>
>>> Fast task:
>>> local computation time: 281.539042,* spark computation time: 263*,
>>> metric collection: 138, total process: 401, total_records: 5002
>>>
>>> We are currently running Spark 1.4.1. The load and the work to be done
>>> is stable -this is on a dev env with that stuff under control.
>>>
>>> Any ideas what this behavior could be?
>>>
>>> thanks in advance,  Gerard.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>

Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

Posted by Cody Koeninger <co...@koeninger.org>.
When you say that the largest difference is from metrics.collect, how are
you measuring that? Wouldn't that be the difference between
max(partitionT1) and sparkT1, not sparkT0 and sparkT1?

As for further places to look, what's happening in the logs during that
time?  Are the number of messages per partition roughly equal (you should
be able to see that in the log line I mentioned earlier)?  Is there a
reason one of your executors is handling half as many tasks as the others?

On Wed, Oct 7, 2015 at 5:19 AM, Gerard Maas <ge...@gmail.com> wrote:

> Thanks for the feedback.
>
> Cassandra does not seem to be the issue. The time for writing to Cassandra
> is in the same order of magnitude (see below)
>
> The code structure is roughly as follows:
>
> dstream.filter(pred).foreachRDD{rdd =>
>   val sparkT0 = currentTimeMs
>   val metrics = rdd.mapPartitions{partition =>
>      val partitionT0 = currentTimeMs
>       partition.foreach{ transform andThen storeInCassandra _}
>      val partitionT1 = currentTimeMs
>      Seq(Metric( "local time", executor, partitionT1 - partitionT0,
> records)).iterator
>   }
>   //materialize the rdd
>   val allMetrics = metrics.collect()
>   val sparkT1 = currentTimeMs
>   val totalizedMetrics = // group by and reduce with sum
>   val sparkT2 = currentTimeMs
>   totalizedMetrics.foreach{ metric => gmetric.report(metric)}
> }
>
> Relating this code with the time table presented before (time in ms):
>
> How measured?Slow TaskFast Taskexecutor local totalizedMetrics347.6281.53spark
> computationsparkT1 - sparkT06930263metric collectionsparkT2 - sparkT170138wall
> clock processsparkT2 - sparkT07000401total records processed
> totalizedMetrics42975002
>
> What we observe is that the largest difference comes from the
> materialization of the RDD. This pattern repeats cyclically one on, one off.
>
> Any ideas where to further look?
>
> kr, Gerard.
>
>
> On Wed, Oct 7, 2015 at 1:33 AM, Tathagata Das <td...@databricks.com> wrote:
>
>> Good point!
>>
>> On Tue, Oct 6, 2015 at 4:23 PM, Cody Koeninger <co...@koeninger.org>
>> wrote:
>>
>>> I agree getting cassandra out of the picture is a good first step.
>>>
>>> But if you just do foreachRDD { _.count } recent versions of direct
>>> stream shouldn't do any work at all on the executor (since the number of
>>> messages in the rdd is known already)
>>>
>>> do a foreachPartition and println or count the iterator manually.
>>>
>>> On Tue, Oct 6, 2015 at 6:02 PM, Tathagata Das <td...@databricks.com>
>>> wrote:
>>>
>>>> Are sure that this is not related to Cassandra inserts? Could you just
>>>> do foreachRDD { _.count } instead  to keep Cassandra out of the picture and
>>>> then test this agian.
>>>>
>>>> On Tue, Oct 6, 2015 at 12:33 PM, Adrian Tanase <at...@adobe.com>
>>>> wrote:
>>>>
>>>>> Also check if the Kafka cluster is still balanced. Maybe one of the
>>>>> brokers manages too many partitions, all the work will stay on that
>>>>> executor unless you repartition right after kakfka (and I'm not saying you
>>>>> should).
>>>>>
>>>>> Sent from my iPhone
>>>>>
>>>>> On 06 Oct 2015, at 22:17, Cody Koeninger <co...@koeninger.org> wrote:
>>>>>
>>>>> I'm not clear on what you're measuring.  Can you post relevant code
>>>>> snippets including the measurement code?
>>>>>
>>>>> As far as kafka metrics, nothing currently.  There is an info-level
>>>>> log message every time a kafka rdd iterator is instantiated,
>>>>>
>>>>>     log.info(s"Computing topic ${part.topic}, partition
>>>>> ${part.partition} " +
>>>>>
>>>>>       s"offsets ${part.fromOffset} -> ${part.untilOffset}")
>>>>>
>>>>>
>>>>> If you log once you're done with an iterator you should be able to see
>>>>> the delta.
>>>>>
>>>>> The other thing to try is reduce the number of parts involved in the
>>>>> job to isolate it ... first thing I'd do there is take cassandra out of the
>>>>> equation.
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Oct 6, 2015 at 2:00 PM, Gerard Maas <ge...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Cody,
>>>>>>
>>>>>> The job is doing ETL from Kafka records to Cassandra. After a
>>>>>> single filtering stage on Spark, the 'TL' part is done using the
>>>>>> dstream.foreachRDD{rdd.foreachPartition{...TL ...}} pattern.
>>>>>>
>>>>>> We have metrics on the executor work which we collect and add
>>>>>> together, indicated here by 'local computation'.  As you can see, we also
>>>>>> measure how much it cost us to measure :-)
>>>>>> See how 'local work'  times are comparable.  What's not visible is
>>>>>> the task scheduling and consuming the data from Kafka which becomes part of
>>>>>> the 'spark computation' part.
>>>>>>
>>>>>> The pattern we see is 1 fast, 1 slow, 1 fast,... zig...zag...
>>>>>>
>>>>>> Are there metrics available somehow on the Kafka reading time?
>>>>>>
>>>>>> Slow Task Fast Task local computation 347.6 281.53 spark computation
>>>>>> 6930 263 metric collection 70 138 wall clock process 7000 401 total
>>>>>> records processed 4297 5002
>>>>>>
>>>>>> (time in ms)
>>>>>>
>>>>>> kr, Gerard.
>>>>>>
>>>>>>
>>>>>> On Tue, Oct 6, 2015 at 8:01 PM, Cody Koeninger <co...@koeninger.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Can you say anything more about what the job is doing?
>>>>>>>
>>>>>>> First thing I'd do is try to get some metrics on the time taken by
>>>>>>> your code on the executors (e.g. when processing the iterator) to see if
>>>>>>> it's consistent between the two situations.
>>>>>>>
>>>>>>> On Tue, Oct 6, 2015 at 11:45 AM, Gerard Maas <ge...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> We recently migrated our streaming jobs to the direct kafka
>>>>>>>> receiver. Our initial migration went quite fine but now we are seeing a
>>>>>>>> weird zig-zag performance pattern we cannot explain.
>>>>>>>> In alternating fashion, one task takes about 1 second to finish and
>>>>>>>> the next takes 7sec for a stable streaming rate.
>>>>>>>>
>>>>>>>> Here are comparable metrics for two successive tasks:
>>>>>>>> *Slow*:
>>>>>>>>
>>>>>>>>
>>>>>>>> ​
>>>>>>>>
>>>>>>>> Executor ID Address Task Time Total Tasks Failed Tasks Succeeded
>>>>>>>> Tasks 20151006-044141-2408867082-5050-21047-S0
>>>>>>>> dnode-3.hdfs.private:36863 22 s 3 0 3
>>>>>>>> 20151006-044141-2408867082-5050-21047-S1 dnode-0.hdfs.private:43812 40
>>>>>>>> s 11 0 11 20151006-044141-2408867082-5050-21047-S4
>>>>>>>> dnode-5.hdfs.private:59945 49 s 10 0 10
>>>>>>>> *Fast*:
>>>>>>>>
>>>>>>>> ​
>>>>>>>>
>>>>>>>> Executor ID Address Task Time Total Tasks Failed Tasks Succeeded
>>>>>>>> Tasks 20151006-044141-2408867082-5050-21047-S0
>>>>>>>> dnode-3.hdfs.private:36863 0.6 s 4 0 4
>>>>>>>> 20151006-044141-2408867082-5050-21047-S1 dnode-0.hdfs.private:43812 1
>>>>>>>> s 9 0 9 20151006-044141-2408867082-5050-21047-S4
>>>>>>>> dnode-5.hdfs.private:59945 1 s 11 0 11
>>>>>>>> We have some custom metrics that measure wall-clock time of
>>>>>>>> execution of certain blocks of the job, like the time it takes to do the
>>>>>>>> local computations (RDD.foreachPartition closure) vs total time.
>>>>>>>> The difference between the slow and fast executing task is on the
>>>>>>>> 'spark computation time' which is wall-clock for the task scheduling
>>>>>>>> (DStream.foreachRDD closure)
>>>>>>>>
>>>>>>>> e.g.
>>>>>>>> Slow task:
>>>>>>>>
>>>>>>>> local computation time: 347.60968499999996, *spark computation
>>>>>>>> time: 6930*, metric collection: 70, total process:
>>>>>>>> 7000, total_records: 4297
>>>>>>>>
>>>>>>>> Fast task:
>>>>>>>> local computation time: 281.539042,* spark computation time: 263*,
>>>>>>>> metric collection: 138, total process: 401, total_records: 5002
>>>>>>>>
>>>>>>>> We are currently running Spark 1.4.1. The load and the work to be
>>>>>>>> done is stable -this is on a dev env with that stuff under control.
>>>>>>>>
>>>>>>>> Any ideas what this behavior could be?
>>>>>>>>
>>>>>>>> thanks in advance,  Gerard.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

RE: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

Posted by "Goodall, Mark (UK)" <ma...@baesystems.com>.
I would like to say that I have also had this issue.

In two situations, one using Accumulo to store information and also when running multiple streaming jobs within the same streaming context (e.g. multiple save to hdfs). In my case the situation worsens when one of the jobs, which has a long slideduration executes. After this, all other jobs take longer to execute. The situation continues until the batches are too delayed and the system is unstable.


From: Gerard Maas [mailto:gerard.maas@gmail.com]
Sent: 07 October 2015 11:19
To: Tathagata Das
Cc: Cody Koeninger; Adrian Tanase; spark users
Subject: Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka


*** WARNING ***
This message originates from outside our organisation, either from an external partner or the internet.
Consider carefully whether you should click on any links, open any attachments or reply.
For information regarding Red Flags that you can look out for in emails you receive, click here<http://intranet.ent.baesystems.com/howwework/security/spotlights/Documents/Red%20Flags.pdf>.
If you feel the email is suspicious, please follow this process<http://intranet.ent.baesystems.com/howwework/security/spotlights/Documents/Dealing%20With%20Suspicious%20Emails.pdf>.
Thanks for the feedback.

Cassandra does not seem to be the issue. The time for writing to Cassandra is in the same order of magnitude (see below)

The code structure is roughly as follows:

dstream.filter(pred).foreachRDD{rdd =>
  val sparkT0 = currentTimeMs
  val metrics = rdd.mapPartitions{partition =>
     val partitionT0 = currentTimeMs
      partition.foreach{ transform andThen storeInCassandra _}
     val partitionT1 = currentTimeMs
     Seq(Metric( "local time", executor, partitionT1 - partitionT0, records)).iterator
  }
  //materialize the rdd
  val allMetrics = metrics.collect()
  val sparkT1 = currentTimeMs
  val totalizedMetrics = // group by and reduce with sum
  val sparkT2 = currentTimeMs
  totalizedMetrics.foreach{ metric => gmetric.report(metric)}
}

Relating this code with the time table presented before (time in ms):


How measured?

Slow Task

Fast Task

executor local

totalizedMetrics

347.6

281.53

spark computation

sparkT1 - sparkT0

6930

263

metric collection

sparkT2 - sparkT1

70

138

wall clock process

sparkT2 - sparkT0

7000

401

total records processed

totalizedMetrics

4297

5002


What we observe is that the largest difference comes from the materialization of the RDD. This pattern repeats cyclically one on, one off.

Any ideas where to further look?

kr, Gerard.


On Wed, Oct 7, 2015 at 1:33 AM, Tathagata Das <td...@databricks.com>> wrote:
Good point!

On Tue, Oct 6, 2015 at 4:23 PM, Cody Koeninger <co...@koeninger.org>> wrote:
I agree getting cassandra out of the picture is a good first step.

But if you just do foreachRDD { _.count } recent versions of direct stream shouldn't do any work at all on the executor (since the number of messages in the rdd is known already)

do a foreachPartition and println or count the iterator manually.

On Tue, Oct 6, 2015 at 6:02 PM, Tathagata Das <td...@databricks.com>> wrote:
Are sure that this is not related to Cassandra inserts? Could you just do foreachRDD { _.count } instead  to keep Cassandra out of the picture and then test this agian.

On Tue, Oct 6, 2015 at 12:33 PM, Adrian Tanase <at...@adobe.com>> wrote:
Also check if the Kafka cluster is still balanced. Maybe one of the brokers manages too many partitions, all the work will stay on that executor unless you repartition right after kakfka (and I'm not saying you should).

Sent from my iPhone

On 06 Oct 2015, at 22:17, Cody Koeninger <co...@koeninger.org>> wrote:
I'm not clear on what you're measuring.  Can you post relevant code snippets including the measurement code?

As far as kafka metrics, nothing currently.  There is an info-level log message every time a kafka rdd iterator is instantiated,


    log.info<http://log.info>(s"Computing topic ${part.topic}, partition ${part.partition} " +

      s"offsets ${part.fromOffset} -> ${part.untilOffset}")



If you log once you're done with an iterator you should be able to see the delta.

The other thing to try is reduce the number of parts involved in the job to isolate it ... first thing I'd do there is take cassandra out of the equation.



On Tue, Oct 6, 2015 at 2:00 PM, Gerard Maas <ge...@gmail.com>> wrote:
Hi Cody,

The job is doing ETL from Kafka records to Cassandra. After a single filtering stage on Spark, the 'TL' part is done using the dstream.foreachRDD{rdd.foreachPartition{...TL ...}} pattern.

We have metrics on the executor work which we collect and add together, indicated here by 'local computation'.  As you can see, we also measure how much it cost us to measure :-)
See how 'local work'  times are comparable.  What's not visible is the task scheduling and consuming the data from Kafka which becomes part of the 'spark computation' part.

The pattern we see is 1 fast, 1 slow, 1 fast,... zig...zag...

Are there metrics available somehow on the Kafka reading time?


Slow Task

Fast Task

local computation

347.6

281.53

spark computation

6930

263

metric collection

70

138

wall clock process

7000

401

total records processed

4297

5002


(time in ms)

kr, Gerard.


On Tue, Oct 6, 2015 at 8:01 PM, Cody Koeninger <co...@koeninger.org>> wrote:
Can you say anything more about what the job is doing?

First thing I'd do is try to get some metrics on the time taken by your code on the executors (e.g. when processing the iterator) to see if it's consistent between the two situations.

On Tue, Oct 6, 2015 at 11:45 AM, Gerard Maas <ge...@gmail.com>> wrote:
Hi,

We recently migrated our streaming jobs to the direct kafka receiver. Our initial migration went quite fine but now we are seeing a weird zig-zag performance pattern we cannot explain.
In alternating fashion, one task takes about 1 second to finish and the next takes 7sec for a stable streaming rate.

Here are comparable metrics for two successive tasks:
Slow:

[cid:image001.jpg@01D10113.2392E190]
​

Executor ID

Address

Task Time

Total Tasks

Failed Tasks

Succeeded Tasks

20151006-044141-2408867082-5050-21047-S0

dnode-3.hdfs.private:36863

22 s

3

0

3

20151006-044141-2408867082-5050-21047-S1

dnode-0.hdfs.private:43812

40 s

11

0

11

20151006-044141-2408867082-5050-21047-S4

dnode-5.hdfs.private:59945

49 s

10

0

10

Fast:
[cid:image002.jpg@01D10113.2392E190]
​

Executor ID

Address

Task Time

Total Tasks

Failed Tasks

Succeeded Tasks

20151006-044141-2408867082-5050-21047-S0

dnode-3.hdfs.private:36863

0.6 s

4

0

4

20151006-044141-2408867082-5050-21047-S1

dnode-0.hdfs.private:43812

1 s

9

0

9

20151006-044141-2408867082-5050-21047-S4

dnode-5.hdfs.private:59945

1 s

11

0

11

We have some custom metrics that measure wall-clock time of execution of certain blocks of the job, like the time it takes to do the local computations (RDD.foreachPartition closure) vs total time.
The difference between the slow and fast executing task is on the 'spark computation time' which is wall-clock for the task scheduling (DStream.foreachRDD closure)

e.g.
Slow task:

local computation time: 347.60968499999996, spark computation time: 6930, metric collection: 70, total process: 7000, total_records: 4297

Fast task:
local computation time: 281.539042, spark computation time: 263, metric collection: 138, total process: 401, total_records: 5002

We are currently running Spark 1.4.1. The load and the work to be done is stable -this is on a dev env with that stuff under control.

Any ideas what this behavior could be?

thanks in advance,  Gerard.













********************************************************************
This email and any attachments are confidential to the intended
recipient and may also be privileged. If you are not the intended
recipient please delete it from your system and notify the sender.
You should not copy it or use it for any purpose nor disclose or
distribute its contents to any other person.
********************************************************************

Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

Posted by Gerard Maas <ge...@gmail.com>.
Thanks for the feedback.

Cassandra does not seem to be the issue. The time for writing to Cassandra
is in the same order of magnitude (see below)

The code structure is roughly as follows:

dstream.filter(pred).foreachRDD{rdd =>
  val sparkT0 = currentTimeMs
  val metrics = rdd.mapPartitions{partition =>
     val partitionT0 = currentTimeMs
      partition.foreach{ transform andThen storeInCassandra _}
     val partitionT1 = currentTimeMs
     Seq(Metric( "local time", executor, partitionT1 - partitionT0,
records)).iterator
  }
  //materialize the rdd
  val allMetrics = metrics.collect()
  val sparkT1 = currentTimeMs
  val totalizedMetrics = // group by and reduce with sum
  val sparkT2 = currentTimeMs
  totalizedMetrics.foreach{ metric => gmetric.report(metric)}
}

Relating this code with the time table presented before (time in ms):

How measured?Slow TaskFast Taskexecutor local totalizedMetrics347.6281.53spark
computationsparkT1 - sparkT06930263metric collectionsparkT2 - sparkT170138wall
clock processsparkT2 - sparkT07000401total records processedtotalizedMetrics
42975002

What we observe is that the largest difference comes from the
materialization of the RDD. This pattern repeats cyclically one on, one off.

Any ideas where to further look?

kr, Gerard.


On Wed, Oct 7, 2015 at 1:33 AM, Tathagata Das <td...@databricks.com> wrote:

> Good point!
>
> On Tue, Oct 6, 2015 at 4:23 PM, Cody Koeninger <co...@koeninger.org> wrote:
>
>> I agree getting cassandra out of the picture is a good first step.
>>
>> But if you just do foreachRDD { _.count } recent versions of direct
>> stream shouldn't do any work at all on the executor (since the number of
>> messages in the rdd is known already)
>>
>> do a foreachPartition and println or count the iterator manually.
>>
>> On Tue, Oct 6, 2015 at 6:02 PM, Tathagata Das <td...@databricks.com>
>> wrote:
>>
>>> Are sure that this is not related to Cassandra inserts? Could you just
>>> do foreachRDD { _.count } instead  to keep Cassandra out of the picture and
>>> then test this agian.
>>>
>>> On Tue, Oct 6, 2015 at 12:33 PM, Adrian Tanase <at...@adobe.com>
>>> wrote:
>>>
>>>> Also check if the Kafka cluster is still balanced. Maybe one of the
>>>> brokers manages too many partitions, all the work will stay on that
>>>> executor unless you repartition right after kakfka (and I'm not saying you
>>>> should).
>>>>
>>>> Sent from my iPhone
>>>>
>>>> On 06 Oct 2015, at 22:17, Cody Koeninger <co...@koeninger.org> wrote:
>>>>
>>>> I'm not clear on what you're measuring.  Can you post relevant code
>>>> snippets including the measurement code?
>>>>
>>>> As far as kafka metrics, nothing currently.  There is an info-level log
>>>> message every time a kafka rdd iterator is instantiated,
>>>>
>>>>     log.info(s"Computing topic ${part.topic}, partition
>>>> ${part.partition} " +
>>>>
>>>>       s"offsets ${part.fromOffset} -> ${part.untilOffset}")
>>>>
>>>>
>>>> If you log once you're done with an iterator you should be able to see
>>>> the delta.
>>>>
>>>> The other thing to try is reduce the number of parts involved in the
>>>> job to isolate it ... first thing I'd do there is take cassandra out of the
>>>> equation.
>>>>
>>>>
>>>>
>>>> On Tue, Oct 6, 2015 at 2:00 PM, Gerard Maas <ge...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Cody,
>>>>>
>>>>> The job is doing ETL from Kafka records to Cassandra. After a
>>>>> single filtering stage on Spark, the 'TL' part is done using the
>>>>> dstream.foreachRDD{rdd.foreachPartition{...TL ...}} pattern.
>>>>>
>>>>> We have metrics on the executor work which we collect and add
>>>>> together, indicated here by 'local computation'.  As you can see, we also
>>>>> measure how much it cost us to measure :-)
>>>>> See how 'local work'  times are comparable.  What's not visible is the
>>>>> task scheduling and consuming the data from Kafka which becomes part of the
>>>>> 'spark computation' part.
>>>>>
>>>>> The pattern we see is 1 fast, 1 slow, 1 fast,... zig...zag...
>>>>>
>>>>> Are there metrics available somehow on the Kafka reading time?
>>>>>
>>>>> Slow Task Fast Task local computation 347.6 281.53 spark computation
>>>>> 6930 263 metric collection 70 138 wall clock process 7000 401 total
>>>>> records processed 4297 5002
>>>>>
>>>>> (time in ms)
>>>>>
>>>>> kr, Gerard.
>>>>>
>>>>>
>>>>> On Tue, Oct 6, 2015 at 8:01 PM, Cody Koeninger <co...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> Can you say anything more about what the job is doing?
>>>>>>
>>>>>> First thing I'd do is try to get some metrics on the time taken by
>>>>>> your code on the executors (e.g. when processing the iterator) to see if
>>>>>> it's consistent between the two situations.
>>>>>>
>>>>>> On Tue, Oct 6, 2015 at 11:45 AM, Gerard Maas <ge...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> We recently migrated our streaming jobs to the direct kafka
>>>>>>> receiver. Our initial migration went quite fine but now we are seeing a
>>>>>>> weird zig-zag performance pattern we cannot explain.
>>>>>>> In alternating fashion, one task takes about 1 second to finish and
>>>>>>> the next takes 7sec for a stable streaming rate.
>>>>>>>
>>>>>>> Here are comparable metrics for two successive tasks:
>>>>>>> *Slow*:
>>>>>>>
>>>>>>>
>>>>>>> ​
>>>>>>>
>>>>>>> Executor ID Address Task Time Total Tasks Failed Tasks Succeeded
>>>>>>> Tasks 20151006-044141-2408867082-5050-21047-S0
>>>>>>> dnode-3.hdfs.private:36863 22 s 3 0 3
>>>>>>> 20151006-044141-2408867082-5050-21047-S1 dnode-0.hdfs.private:43812 40
>>>>>>> s 11 0 11 20151006-044141-2408867082-5050-21047-S4
>>>>>>> dnode-5.hdfs.private:59945 49 s 10 0 10
>>>>>>> *Fast*:
>>>>>>>
>>>>>>> ​
>>>>>>>
>>>>>>> Executor ID Address Task Time Total Tasks Failed Tasks Succeeded
>>>>>>> Tasks 20151006-044141-2408867082-5050-21047-S0
>>>>>>> dnode-3.hdfs.private:36863 0.6 s 4 0 4
>>>>>>> 20151006-044141-2408867082-5050-21047-S1 dnode-0.hdfs.private:43812 1
>>>>>>> s 9 0 9 20151006-044141-2408867082-5050-21047-S4
>>>>>>> dnode-5.hdfs.private:59945 1 s 11 0 11
>>>>>>> We have some custom metrics that measure wall-clock time of
>>>>>>> execution of certain blocks of the job, like the time it takes to do the
>>>>>>> local computations (RDD.foreachPartition closure) vs total time.
>>>>>>> The difference between the slow and fast executing task is on the
>>>>>>> 'spark computation time' which is wall-clock for the task scheduling
>>>>>>> (DStream.foreachRDD closure)
>>>>>>>
>>>>>>> e.g.
>>>>>>> Slow task:
>>>>>>>
>>>>>>> local computation time: 347.60968499999996, *spark computation
>>>>>>> time: 6930*, metric collection: 70, total process:
>>>>>>> 7000, total_records: 4297
>>>>>>>
>>>>>>> Fast task:
>>>>>>> local computation time: 281.539042,* spark computation time: 263*,
>>>>>>> metric collection: 138, total process: 401, total_records: 5002
>>>>>>>
>>>>>>> We are currently running Spark 1.4.1. The load and the work to be
>>>>>>> done is stable -this is on a dev env with that stuff under control.
>>>>>>>
>>>>>>> Any ideas what this behavior could be?
>>>>>>>
>>>>>>> thanks in advance,  Gerard.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

Posted by Tathagata Das <td...@databricks.com>.
Good point!

On Tue, Oct 6, 2015 at 4:23 PM, Cody Koeninger <co...@koeninger.org> wrote:

> I agree getting cassandra out of the picture is a good first step.
>
> But if you just do foreachRDD { _.count } recent versions of direct stream
> shouldn't do any work at all on the executor (since the number of messages
> in the rdd is known already)
>
> do a foreachPartition and println or count the iterator manually.
>
> On Tue, Oct 6, 2015 at 6:02 PM, Tathagata Das <td...@databricks.com> wrote:
>
>> Are sure that this is not related to Cassandra inserts? Could you just do
>> foreachRDD { _.count } instead  to keep Cassandra out of the picture and
>> then test this agian.
>>
>> On Tue, Oct 6, 2015 at 12:33 PM, Adrian Tanase <at...@adobe.com> wrote:
>>
>>> Also check if the Kafka cluster is still balanced. Maybe one of the
>>> brokers manages too many partitions, all the work will stay on that
>>> executor unless you repartition right after kakfka (and I'm not saying you
>>> should).
>>>
>>> Sent from my iPhone
>>>
>>> On 06 Oct 2015, at 22:17, Cody Koeninger <co...@koeninger.org> wrote:
>>>
>>> I'm not clear on what you're measuring.  Can you post relevant code
>>> snippets including the measurement code?
>>>
>>> As far as kafka metrics, nothing currently.  There is an info-level log
>>> message every time a kafka rdd iterator is instantiated,
>>>
>>>     log.info(s"Computing topic ${part.topic}, partition
>>> ${part.partition} " +
>>>
>>>       s"offsets ${part.fromOffset} -> ${part.untilOffset}")
>>>
>>>
>>> If you log once you're done with an iterator you should be able to see
>>> the delta.
>>>
>>> The other thing to try is reduce the number of parts involved in the job
>>> to isolate it ... first thing I'd do there is take cassandra out of the
>>> equation.
>>>
>>>
>>>
>>> On Tue, Oct 6, 2015 at 2:00 PM, Gerard Maas <ge...@gmail.com>
>>> wrote:
>>>
>>>> Hi Cody,
>>>>
>>>> The job is doing ETL from Kafka records to Cassandra. After a
>>>> single filtering stage on Spark, the 'TL' part is done using the
>>>> dstream.foreachRDD{rdd.foreachPartition{...TL ...}} pattern.
>>>>
>>>> We have metrics on the executor work which we collect and add together,
>>>> indicated here by 'local computation'.  As you can see, we also measure how
>>>> much it cost us to measure :-)
>>>> See how 'local work'  times are comparable.  What's not visible is the
>>>> task scheduling and consuming the data from Kafka which becomes part of the
>>>> 'spark computation' part.
>>>>
>>>> The pattern we see is 1 fast, 1 slow, 1 fast,... zig...zag...
>>>>
>>>> Are there metrics available somehow on the Kafka reading time?
>>>>
>>>> Slow Task Fast Task local computation 347.6 281.53 spark computation
>>>> 6930 263 metric collection 70 138 wall clock process 7000 401 total
>>>> records processed 4297 5002
>>>>
>>>> (time in ms)
>>>>
>>>> kr, Gerard.
>>>>
>>>>
>>>> On Tue, Oct 6, 2015 at 8:01 PM, Cody Koeninger <co...@koeninger.org>
>>>> wrote:
>>>>
>>>>> Can you say anything more about what the job is doing?
>>>>>
>>>>> First thing I'd do is try to get some metrics on the time taken by
>>>>> your code on the executors (e.g. when processing the iterator) to see if
>>>>> it's consistent between the two situations.
>>>>>
>>>>> On Tue, Oct 6, 2015 at 11:45 AM, Gerard Maas <ge...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> We recently migrated our streaming jobs to the direct kafka receiver.
>>>>>> Our initial migration went quite fine but now we are seeing a weird zig-zag
>>>>>> performance pattern we cannot explain.
>>>>>> In alternating fashion, one task takes about 1 second to finish and
>>>>>> the next takes 7sec for a stable streaming rate.
>>>>>>
>>>>>> Here are comparable metrics for two successive tasks:
>>>>>> *Slow*:
>>>>>>
>>>>>>
>>>>>> ​
>>>>>>
>>>>>> Executor ID Address Task Time Total Tasks Failed Tasks Succeeded
>>>>>> Tasks 20151006-044141-2408867082-5050-21047-S0
>>>>>> dnode-3.hdfs.private:36863 22 s 3 0 3
>>>>>> 20151006-044141-2408867082-5050-21047-S1 dnode-0.hdfs.private:43812 40
>>>>>> s 11 0 11 20151006-044141-2408867082-5050-21047-S4
>>>>>> dnode-5.hdfs.private:59945 49 s 10 0 10
>>>>>> *Fast*:
>>>>>>
>>>>>> ​
>>>>>>
>>>>>> Executor ID Address Task Time Total Tasks Failed Tasks Succeeded
>>>>>> Tasks 20151006-044141-2408867082-5050-21047-S0
>>>>>> dnode-3.hdfs.private:36863 0.6 s 4 0 4
>>>>>> 20151006-044141-2408867082-5050-21047-S1 dnode-0.hdfs.private:43812 1
>>>>>> s 9 0 9 20151006-044141-2408867082-5050-21047-S4
>>>>>> dnode-5.hdfs.private:59945 1 s 11 0 11
>>>>>> We have some custom metrics that measure wall-clock time of execution
>>>>>> of certain blocks of the job, like the time it takes to do the local
>>>>>> computations (RDD.foreachPartition closure) vs total time.
>>>>>> The difference between the slow and fast executing task is on the
>>>>>> 'spark computation time' which is wall-clock for the task scheduling
>>>>>> (DStream.foreachRDD closure)
>>>>>>
>>>>>> e.g.
>>>>>> Slow task:
>>>>>>
>>>>>> local computation time: 347.60968499999996, *spark computation time:
>>>>>> 6930*, metric collection: 70, total process: 7000, total_records:
>>>>>> 4297
>>>>>>
>>>>>> Fast task:
>>>>>> local computation time: 281.539042,* spark computation time: 263*,
>>>>>> metric collection: 138, total process: 401, total_records: 5002
>>>>>>
>>>>>> We are currently running Spark 1.4.1. The load and the work to be
>>>>>> done is stable -this is on a dev env with that stuff under control.
>>>>>>
>>>>>> Any ideas what this behavior could be?
>>>>>>
>>>>>> thanks in advance,  Gerard.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

Posted by Cody Koeninger <co...@koeninger.org>.
I agree getting cassandra out of the picture is a good first step.

But if you just do foreachRDD { _.count } recent versions of direct stream
shouldn't do any work at all on the executor (since the number of messages
in the rdd is known already)

do a foreachPartition and println or count the iterator manually.

On Tue, Oct 6, 2015 at 6:02 PM, Tathagata Das <td...@databricks.com> wrote:

> Are sure that this is not related to Cassandra inserts? Could you just do
> foreachRDD { _.count } instead  to keep Cassandra out of the picture and
> then test this agian.
>
> On Tue, Oct 6, 2015 at 12:33 PM, Adrian Tanase <at...@adobe.com> wrote:
>
>> Also check if the Kafka cluster is still balanced. Maybe one of the
>> brokers manages too many partitions, all the work will stay on that
>> executor unless you repartition right after kakfka (and I'm not saying you
>> should).
>>
>> Sent from my iPhone
>>
>> On 06 Oct 2015, at 22:17, Cody Koeninger <co...@koeninger.org> wrote:
>>
>> I'm not clear on what you're measuring.  Can you post relevant code
>> snippets including the measurement code?
>>
>> As far as kafka metrics, nothing currently.  There is an info-level log
>> message every time a kafka rdd iterator is instantiated,
>>
>>     log.info(s"Computing topic ${part.topic}, partition
>> ${part.partition} " +
>>
>>       s"offsets ${part.fromOffset} -> ${part.untilOffset}")
>>
>>
>> If you log once you're done with an iterator you should be able to see
>> the delta.
>>
>> The other thing to try is reduce the number of parts involved in the job
>> to isolate it ... first thing I'd do there is take cassandra out of the
>> equation.
>>
>>
>>
>> On Tue, Oct 6, 2015 at 2:00 PM, Gerard Maas <ge...@gmail.com>
>> wrote:
>>
>>> Hi Cody,
>>>
>>> The job is doing ETL from Kafka records to Cassandra. After a
>>> single filtering stage on Spark, the 'TL' part is done using the
>>> dstream.foreachRDD{rdd.foreachPartition{...TL ...}} pattern.
>>>
>>> We have metrics on the executor work which we collect and add together,
>>> indicated here by 'local computation'.  As you can see, we also measure how
>>> much it cost us to measure :-)
>>> See how 'local work'  times are comparable.  What's not visible is the
>>> task scheduling and consuming the data from Kafka which becomes part of the
>>> 'spark computation' part.
>>>
>>> The pattern we see is 1 fast, 1 slow, 1 fast,... zig...zag...
>>>
>>> Are there metrics available somehow on the Kafka reading time?
>>>
>>> Slow Task Fast Task local computation 347.6 281.53 spark computation
>>> 6930 263 metric collection 70 138 wall clock process 7000 401 total
>>> records processed 4297 5002
>>>
>>> (time in ms)
>>>
>>> kr, Gerard.
>>>
>>>
>>> On Tue, Oct 6, 2015 at 8:01 PM, Cody Koeninger <co...@koeninger.org>
>>> wrote:
>>>
>>>> Can you say anything more about what the job is doing?
>>>>
>>>> First thing I'd do is try to get some metrics on the time taken by your
>>>> code on the executors (e.g. when processing the iterator) to see if it's
>>>> consistent between the two situations.
>>>>
>>>> On Tue, Oct 6, 2015 at 11:45 AM, Gerard Maas <ge...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> We recently migrated our streaming jobs to the direct kafka receiver.
>>>>> Our initial migration went quite fine but now we are seeing a weird zig-zag
>>>>> performance pattern we cannot explain.
>>>>> In alternating fashion, one task takes about 1 second to finish and
>>>>> the next takes 7sec for a stable streaming rate.
>>>>>
>>>>> Here are comparable metrics for two successive tasks:
>>>>> *Slow*:
>>>>>
>>>>>
>>>>> ​
>>>>>
>>>>> Executor ID Address Task Time Total Tasks Failed Tasks Succeeded Tasks
>>>>> 20151006-044141-2408867082-5050-21047-S0 dnode-3.hdfs.private:36863 22
>>>>> s 3 0 3 20151006-044141-2408867082-5050-21047-S1
>>>>> dnode-0.hdfs.private:43812 40 s 11 0 11
>>>>> 20151006-044141-2408867082-5050-21047-S4 dnode-5.hdfs.private:59945 49
>>>>> s 10 0 10
>>>>> *Fast*:
>>>>>
>>>>> ​
>>>>>
>>>>> Executor ID Address Task Time Total Tasks Failed Tasks Succeeded Tasks
>>>>> 20151006-044141-2408867082-5050-21047-S0 dnode-3.hdfs.private:36863 0.6
>>>>> s 4 0 4 20151006-044141-2408867082-5050-21047-S1
>>>>> dnode-0.hdfs.private:43812 1 s 9 0 9
>>>>> 20151006-044141-2408867082-5050-21047-S4 dnode-5.hdfs.private:59945 1
>>>>> s 11 0 11
>>>>> We have some custom metrics that measure wall-clock time of execution
>>>>> of certain blocks of the job, like the time it takes to do the local
>>>>> computations (RDD.foreachPartition closure) vs total time.
>>>>> The difference between the slow and fast executing task is on the
>>>>> 'spark computation time' which is wall-clock for the task scheduling
>>>>> (DStream.foreachRDD closure)
>>>>>
>>>>> e.g.
>>>>> Slow task:
>>>>>
>>>>> local computation time: 347.60968499999996, *spark computation time:
>>>>> 6930*, metric collection: 70, total process: 7000, total_records: 4297
>>>>>
>>>>> Fast task:
>>>>> local computation time: 281.539042,* spark computation time: 263*,
>>>>> metric collection: 138, total process: 401, total_records: 5002
>>>>>
>>>>> We are currently running Spark 1.4.1. The load and the work to be done
>>>>> is stable -this is on a dev env with that stuff under control.
>>>>>
>>>>> Any ideas what this behavior could be?
>>>>>
>>>>> thanks in advance,  Gerard.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

Posted by Tathagata Das <td...@databricks.com>.
Are sure that this is not related to Cassandra inserts? Could you just do
foreachRDD { _.count } instead  to keep Cassandra out of the picture and
then test this agian.

On Tue, Oct 6, 2015 at 12:33 PM, Adrian Tanase <at...@adobe.com> wrote:

> Also check if the Kafka cluster is still balanced. Maybe one of the
> brokers manages too many partitions, all the work will stay on that
> executor unless you repartition right after kakfka (and I'm not saying you
> should).
>
> Sent from my iPhone
>
> On 06 Oct 2015, at 22:17, Cody Koeninger <co...@koeninger.org> wrote:
>
> I'm not clear on what you're measuring.  Can you post relevant code
> snippets including the measurement code?
>
> As far as kafka metrics, nothing currently.  There is an info-level log
> message every time a kafka rdd iterator is instantiated,
>
>     log.info(s"Computing topic ${part.topic}, partition ${part.partition}
> " +
>
>       s"offsets ${part.fromOffset} -> ${part.untilOffset}")
>
>
> If you log once you're done with an iterator you should be able to see the
> delta.
>
> The other thing to try is reduce the number of parts involved in the job
> to isolate it ... first thing I'd do there is take cassandra out of the
> equation.
>
>
>
> On Tue, Oct 6, 2015 at 2:00 PM, Gerard Maas <ge...@gmail.com> wrote:
>
>> Hi Cody,
>>
>> The job is doing ETL from Kafka records to Cassandra. After a
>> single filtering stage on Spark, the 'TL' part is done using the
>> dstream.foreachRDD{rdd.foreachPartition{...TL ...}} pattern.
>>
>> We have metrics on the executor work which we collect and add together,
>> indicated here by 'local computation'.  As you can see, we also measure how
>> much it cost us to measure :-)
>> See how 'local work'  times are comparable.  What's not visible is the
>> task scheduling and consuming the data from Kafka which becomes part of the
>> 'spark computation' part.
>>
>> The pattern we see is 1 fast, 1 slow, 1 fast,... zig...zag...
>>
>> Are there metrics available somehow on the Kafka reading time?
>>
>> Slow Task Fast Task local computation 347.6 281.53 spark computation 6930
>> 263 metric collection 70 138 wall clock process 7000 401 total records
>> processed 4297 5002
>>
>> (time in ms)
>>
>> kr, Gerard.
>>
>>
>> On Tue, Oct 6, 2015 at 8:01 PM, Cody Koeninger <co...@koeninger.org>
>> wrote:
>>
>>> Can you say anything more about what the job is doing?
>>>
>>> First thing I'd do is try to get some metrics on the time taken by your
>>> code on the executors (e.g. when processing the iterator) to see if it's
>>> consistent between the two situations.
>>>
>>> On Tue, Oct 6, 2015 at 11:45 AM, Gerard Maas <ge...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> We recently migrated our streaming jobs to the direct kafka receiver.
>>>> Our initial migration went quite fine but now we are seeing a weird zig-zag
>>>> performance pattern we cannot explain.
>>>> In alternating fashion, one task takes about 1 second to finish and the
>>>> next takes 7sec for a stable streaming rate.
>>>>
>>>> Here are comparable metrics for two successive tasks:
>>>> *Slow*:
>>>>
>>>>
>>>> ​
>>>>
>>>> Executor ID Address Task Time Total Tasks Failed Tasks Succeeded Tasks
>>>> 20151006-044141-2408867082-5050-21047-S0 dnode-3.hdfs.private:36863 22
>>>> s 3 0 3 20151006-044141-2408867082-5050-21047-S1
>>>> dnode-0.hdfs.private:43812 40 s 11 0 11
>>>> 20151006-044141-2408867082-5050-21047-S4 dnode-5.hdfs.private:59945 49
>>>> s 10 0 10
>>>> *Fast*:
>>>>
>>>> ​
>>>>
>>>> Executor ID Address Task Time Total Tasks Failed Tasks Succeeded Tasks
>>>> 20151006-044141-2408867082-5050-21047-S0 dnode-3.hdfs.private:36863 0.6
>>>> s 4 0 4 20151006-044141-2408867082-5050-21047-S1
>>>> dnode-0.hdfs.private:43812 1 s 9 0 9
>>>> 20151006-044141-2408867082-5050-21047-S4 dnode-5.hdfs.private:59945 1 s
>>>> 11 0 11
>>>> We have some custom metrics that measure wall-clock time of execution
>>>> of certain blocks of the job, like the time it takes to do the local
>>>> computations (RDD.foreachPartition closure) vs total time.
>>>> The difference between the slow and fast executing task is on the
>>>> 'spark computation time' which is wall-clock for the task scheduling
>>>> (DStream.foreachRDD closure)
>>>>
>>>> e.g.
>>>> Slow task:
>>>>
>>>> local computation time: 347.60968499999996, *spark computation time:
>>>> 6930*, metric collection: 70, total process: 7000, total_records: 4297
>>>>
>>>> Fast task:
>>>> local computation time: 281.539042,* spark computation time: 263*,
>>>> metric collection: 138, total process: 401, total_records: 5002
>>>>
>>>> We are currently running Spark 1.4.1. The load and the work to be done
>>>> is stable -this is on a dev env with that stuff under control.
>>>>
>>>> Any ideas what this behavior could be?
>>>>
>>>> thanks in advance,  Gerard.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>
>

Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

Posted by Adrian Tanase <at...@adobe.com>.
Also check if the Kafka cluster is still balanced. Maybe one of the brokers manages too many partitions, all the work will stay on that executor unless you repartition right after kakfka (and I'm not saying you should).

Sent from my iPhone

On 06 Oct 2015, at 22:17, Cody Koeninger <co...@koeninger.org>> wrote:

I'm not clear on what you're measuring.  Can you post relevant code snippets including the measurement code?

As far as kafka metrics, nothing currently.  There is an info-level log message every time a kafka rdd iterator is instantiated,


    log.info<http://log.info>(s"Computing topic ${part.topic}, partition ${part.partition} " +

      s"offsets ${part.fromOffset} -> ${part.untilOffset}")


If you log once you're done with an iterator you should be able to see the delta.

The other thing to try is reduce the number of parts involved in the job to isolate it ... first thing I'd do there is take cassandra out of the equation.


On Tue, Oct 6, 2015 at 2:00 PM, Gerard Maas <ge...@gmail.com>> wrote:
Hi Cody,

The job is doing ETL from Kafka records to Cassandra. After a single filtering stage on Spark, the 'TL' part is done using the dstream.foreachRDD{rdd.foreachPartition{...TL ...}} pattern.

We have metrics on the executor work which we collect and add together, indicated here by 'local computation'.  As you can see, we also measure how much it cost us to measure :-)
See how 'local work'  times are comparable.  What's not visible is the task scheduling and consuming the data from Kafka which becomes part of the 'spark computation' part.

The pattern we see is 1 fast, 1 slow, 1 fast,... zig...zag...

Are there metrics available somehow on the Kafka reading time?

        Slow Task       Fast Task
local computation       347.6   281.53
spark computation       6930    263
metric collection       70      138
wall clock process      7000    401
total records processed 4297    5002

(time in ms)

kr, Gerard.


On Tue, Oct 6, 2015 at 8:01 PM, Cody Koeninger <co...@koeninger.org>> wrote:
Can you say anything more about what the job is doing?

First thing I'd do is try to get some metrics on the time taken by your code on the executors (e.g. when processing the iterator) to see if it's consistent between the two situations.

On Tue, Oct 6, 2015 at 11:45 AM, Gerard Maas <ge...@gmail.com>> wrote:
Hi,

We recently migrated our streaming jobs to the direct kafka receiver. Our initial migration went quite fine but now we are seeing a weird zig-zag performance pattern we cannot explain.
In alternating fashion, one task takes about 1 second to finish and the next takes 7sec for a stable streaming rate.

Here are comparable metrics for two successive tasks:
Slow:

[cid:ii_iffl5b190_1503e000655dffb4]
?

Executor ID     Address Task Time       Total Tasks     Failed Tasks    Succeeded Tasks
20151006-044141-2408867082-5050-21047-S0        dnode-3.hdfs.private:36863      22 s    3       0       3
20151006-044141-2408867082-5050-21047-S1        dnode-0.hdfs.private:43812      40 s    11      0       11
20151006-044141-2408867082-5050-21047-S4        dnode-5.hdfs.private:59945      49 s    10      0       10
Fast:
[cid:ii_iffl61cv1_1503e008c35b6709]
?

Executor ID     Address Task Time       Total Tasks     Failed Tasks    Succeeded Tasks
20151006-044141-2408867082-5050-21047-S0        dnode-3.hdfs.private:36863      0.6 s   4       0       4
20151006-044141-2408867082-5050-21047-S1        dnode-0.hdfs.private:43812      1 s     9       0       9
20151006-044141-2408867082-5050-21047-S4        dnode-5.hdfs.private:59945      1 s     11      0       11
We have some custom metrics that measure wall-clock time of execution of certain blocks of the job, like the time it takes to do the local computations (RDD.foreachPartition closure) vs total time.
The difference between the slow and fast executing task is on the 'spark computation time' which is wall-clock for the task scheduling (DStream.foreachRDD closure)

e.g.
Slow task:

local computation time: 347.60968499999996, spark computation time: 6930, metric collection: 70, total process: 7000, total_records: 4297

Fast task:
local computation time: 281.539042, spark computation time: 263, metric collection: 138, total process: 401, total_records: 5002

We are currently running Spark 1.4.1. The load and the work to be done is stable -this is on a dev env with that stuff under control.

Any ideas what this behavior could be?

thanks in advance,  Gerard.










Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

Posted by Cody Koeninger <co...@koeninger.org>.
I'm not clear on what you're measuring.  Can you post relevant code
snippets including the measurement code?

As far as kafka metrics, nothing currently.  There is an info-level log
message every time a kafka rdd iterator is instantiated,

    log.info(s"Computing topic ${part.topic}, partition ${part.partition} "
+

      s"offsets ${part.fromOffset} -> ${part.untilOffset}")


If you log once you're done with an iterator you should be able to see the
delta.

The other thing to try is reduce the number of parts involved in the job to
isolate it ... first thing I'd do there is take cassandra out of the
equation.



On Tue, Oct 6, 2015 at 2:00 PM, Gerard Maas <ge...@gmail.com> wrote:

> Hi Cody,
>
> The job is doing ETL from Kafka records to Cassandra. After a
> single filtering stage on Spark, the 'TL' part is done using the
> dstream.foreachRDD{rdd.foreachPartition{...TL ...}} pattern.
>
> We have metrics on the executor work which we collect and add together,
> indicated here by 'local computation'.  As you can see, we also measure how
> much it cost us to measure :-)
> See how 'local work'  times are comparable.  What's not visible is the
> task scheduling and consuming the data from Kafka which becomes part of the
> 'spark computation' part.
>
> The pattern we see is 1 fast, 1 slow, 1 fast,... zig...zag...
>
> Are there metrics available somehow on the Kafka reading time?
>
> Slow TaskFast Tasklocal computation347.6281.53spark computation6930263metric
> collection70138wall clock process7000401total records processed42975002
>
> (time in ms)
>
> kr, Gerard.
>
>
> On Tue, Oct 6, 2015 at 8:01 PM, Cody Koeninger <co...@koeninger.org> wrote:
>
>> Can you say anything more about what the job is doing?
>>
>> First thing I'd do is try to get some metrics on the time taken by your
>> code on the executors (e.g. when processing the iterator) to see if it's
>> consistent between the two situations.
>>
>> On Tue, Oct 6, 2015 at 11:45 AM, Gerard Maas <ge...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> We recently migrated our streaming jobs to the direct kafka receiver.
>>> Our initial migration went quite fine but now we are seeing a weird zig-zag
>>> performance pattern we cannot explain.
>>> In alternating fashion, one task takes about 1 second to finish and the
>>> next takes 7sec for a stable streaming rate.
>>>
>>> Here are comparable metrics for two successive tasks:
>>> *Slow*:
>>>
>>>
>>> ​
>>>
>>> Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded Tasks
>>> 20151006-044141-2408867082-5050-21047-S0dnode-3.hdfs.private:3686322 s30
>>> 320151006-044141-2408867082-5050-21047-S1dnode-0.hdfs.private:4381240 s
>>> 1101120151006-044141-2408867082-5050-21047-S4dnode-5.hdfs.private:5994549
>>> s10010
>>> *Fast*:
>>>
>>> ​
>>>
>>> Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded Tasks
>>> 20151006-044141-2408867082-5050-21047-S0dnode-3.hdfs.private:368630.6 s4
>>> 0420151006-044141-2408867082-5050-21047-S1dnode-0.hdfs.private:438121 s9
>>> 0920151006-044141-2408867082-5050-21047-S4dnode-5.hdfs.private:599451 s
>>> 11011
>>> We have some custom metrics that measure wall-clock time of execution of
>>> certain blocks of the job, like the time it takes to do the local
>>> computations (RDD.foreachPartition closure) vs total time.
>>> The difference between the slow and fast executing task is on the 'spark
>>> computation time' which is wall-clock for the task scheduling
>>> (DStream.foreachRDD closure)
>>>
>>> e.g.
>>> Slow task:
>>>
>>> local computation time: 347.60968499999996, *spark computation time:
>>> 6930*, metric collection: 70, total process: 7000, total_records: 4297
>>>
>>> Fast task:
>>> local computation time: 281.539042,* spark computation time: 263*,
>>> metric collection: 138, total process: 401, total_records: 5002
>>>
>>> We are currently running Spark 1.4.1. The load and the work to be done
>>> is stable -this is on a dev env with that stuff under control.
>>>
>>> Any ideas what this behavior could be?
>>>
>>> thanks in advance,  Gerard.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>

Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

Posted by Gerard Maas <ge...@gmail.com>.
Hi Cody,

The job is doing ETL from Kafka records to Cassandra. After a
single filtering stage on Spark, the 'TL' part is done using the
dstream.foreachRDD{rdd.foreachPartition{...TL ...}} pattern.

We have metrics on the executor work which we collect and add together,
indicated here by 'local computation'.  As you can see, we also measure how
much it cost us to measure :-)
See how 'local work'  times are comparable.  What's not visible is the task
scheduling and consuming the data from Kafka which becomes part of the
'spark computation' part.

The pattern we see is 1 fast, 1 slow, 1 fast,... zig...zag...

Are there metrics available somehow on the Kafka reading time?

Slow TaskFast Tasklocal computation347.6281.53spark computation6930263metric
collection70138wall clock process7000401total records processed42975002

(time in ms)

kr, Gerard.


On Tue, Oct 6, 2015 at 8:01 PM, Cody Koeninger <co...@koeninger.org> wrote:

> Can you say anything more about what the job is doing?
>
> First thing I'd do is try to get some metrics on the time taken by your
> code on the executors (e.g. when processing the iterator) to see if it's
> consistent between the two situations.
>
> On Tue, Oct 6, 2015 at 11:45 AM, Gerard Maas <ge...@gmail.com>
> wrote:
>
>> Hi,
>>
>> We recently migrated our streaming jobs to the direct kafka receiver. Our
>> initial migration went quite fine but now we are seeing a weird zig-zag
>> performance pattern we cannot explain.
>> In alternating fashion, one task takes about 1 second to finish and the
>> next takes 7sec for a stable streaming rate.
>>
>> Here are comparable metrics for two successive tasks:
>> *Slow*:
>>
>>
>> ​
>>
>> Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded Tasks
>> 20151006-044141-2408867082-5050-21047-S0dnode-3.hdfs.private:3686322 s303
>> 20151006-044141-2408867082-5050-21047-S1dnode-0.hdfs.private:4381240 s110
>> 1120151006-044141-2408867082-5050-21047-S4dnode-5.hdfs.private:5994549 s
>> 10010
>> *Fast*:
>>
>> ​
>>
>> Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded Tasks
>> 20151006-044141-2408867082-5050-21047-S0dnode-3.hdfs.private:368630.6 s40
>> 420151006-044141-2408867082-5050-21047-S1dnode-0.hdfs.private:438121 s909
>> 20151006-044141-2408867082-5050-21047-S4dnode-5.hdfs.private:599451 s110
>> 11
>> We have some custom metrics that measure wall-clock time of execution of
>> certain blocks of the job, like the time it takes to do the local
>> computations (RDD.foreachPartition closure) vs total time.
>> The difference between the slow and fast executing task is on the 'spark
>> computation time' which is wall-clock for the task scheduling
>> (DStream.foreachRDD closure)
>>
>> e.g.
>> Slow task:
>>
>> local computation time: 347.60968499999996, *spark computation time:
>> 6930*, metric collection: 70, total process: 7000, total_records: 4297
>>
>> Fast task:
>> local computation time: 281.539042,* spark computation time: 263*,
>> metric collection: 138, total process: 401, total_records: 5002
>>
>> We are currently running Spark 1.4.1. The load and the work to be done is
>> stable -this is on a dev env with that stuff under control.
>>
>> Any ideas what this behavior could be?
>>
>> thanks in advance,  Gerard.
>>
>>
>>
>>
>>
>>
>>
>

Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

Posted by Cody Koeninger <co...@koeninger.org>.
Can you say anything more about what the job is doing?

First thing I'd do is try to get some metrics on the time taken by your
code on the executors (e.g. when processing the iterator) to see if it's
consistent between the two situations.

On Tue, Oct 6, 2015 at 11:45 AM, Gerard Maas <ge...@gmail.com> wrote:

> Hi,
>
> We recently migrated our streaming jobs to the direct kafka receiver. Our
> initial migration went quite fine but now we are seeing a weird zig-zag
> performance pattern we cannot explain.
> In alternating fashion, one task takes about 1 second to finish and the
> next takes 7sec for a stable streaming rate.
>
> Here are comparable metrics for two successive tasks:
> *Slow*:
>
>
> ​
>
> Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded Tasks
> 20151006-044141-2408867082-5050-21047-S0dnode-3.hdfs.private:3686322 s303
> 20151006-044141-2408867082-5050-21047-S1dnode-0.hdfs.private:4381240 s110
> 1120151006-044141-2408867082-5050-21047-S4dnode-5.hdfs.private:5994549 s10
> 010
> *Fast*:
>
> ​
>
> Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded Tasks
> 20151006-044141-2408867082-5050-21047-S0dnode-3.hdfs.private:368630.6 s404
> 20151006-044141-2408867082-5050-21047-S1dnode-0.hdfs.private:438121 s909
> 20151006-044141-2408867082-5050-21047-S4dnode-5.hdfs.private:599451 s11011
> We have some custom metrics that measure wall-clock time of execution of
> certain blocks of the job, like the time it takes to do the local
> computations (RDD.foreachPartition closure) vs total time.
> The difference between the slow and fast executing task is on the 'spark
> computation time' which is wall-clock for the task scheduling
> (DStream.foreachRDD closure)
>
> e.g.
> Slow task:
>
> local computation time: 347.60968499999996, *spark computation time: 6930*,
> metric collection: 70, total process: 7000, total_records: 4297
>
> Fast task:
> local computation time: 281.539042,* spark computation time: 263*, metric
> collection: 138, total process: 401, total_records: 5002
>
> We are currently running Spark 1.4.1. The load and the work to be done is
> stable -this is on a dev env with that stuff under control.
>
> Any ideas what this behavior could be?
>
> thanks in advance,  Gerard.
>
>
>
>
>
>
>