Posted to user@spark.apache.org by Guillermo Ortiz <ko...@gmail.com> on 2015/07/30 14:41:35 UTC

Problems with JobScheduler

I have a problem with the JobScheduler. I have run the same code on two
clusters. I read from three topics in Kafka with DirectStream, so I have
three tasks.

I have checked YARN and there are no other jobs launched.

On the cluster where I have trouble, I get these logs:

15/07/30 14:32:58 INFO TaskSetManager: Starting task 0.0 in stage 24.0 (TID
72, xxxxxxxxx, RACK_LOCAL, 14856 bytes)
15/07/30 14:32:58 INFO TaskSetManager: Starting task 1.0 in stage 24.0 (TID
73, xxxxxxxxxxxxxxx, RACK_LOCAL, 14852 bytes)
15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in
memory on xxxxxxxxxxx:44909 (size: 1802.0 B, free: 530.3 MB)
15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in
memory on xxxxxxxxx:43477 (size: 1802.0 B, free: 530.3 MB)
15/07/30 14:32:59 INFO TaskSetManager: Starting task 2.0 in stage 24.0 (TID
74, xxxxxxxxx, RACK_LOCAL, 14860 bytes)
15/07/30 14:32:59 INFO TaskSetManager: Finished task 0.0 in stage 24.0 (TID
72) in 208 ms on xxxxxxxxx (1/3)
15/07/30 14:32:59 INFO TaskSetManager: Finished task 2.0 in stage 24.0 (TID
74) in 49 ms on xxxxxxxxx (2/3)
*15/07/30 14:33:00 INFO JobScheduler: Added jobs for time 1438259580000 ms*
*15/07/30 14:33:05 INFO JobScheduler: Added jobs for time 1438259585000 ms*
*15/07/30 14:33:10 INFO JobScheduler: Added jobs for time 1438259590000 ms*
*15/07/30 14:33:15 INFO JobScheduler: Added jobs for time 1438259595000 ms*
*15/07/30 14:33:20 INFO JobScheduler: Added jobs for time 1438259600000 ms*
*15/07/30 14:33:25 INFO JobScheduler: Added jobs for time 1438259605000 ms*
*15/07/30 14:33:30 INFO JobScheduler: Added jobs for time 1438259610000 ms*
*15/07/30 14:33:35 INFO JobScheduler: Added jobs for time 1438259615000 ms*
*15/07/30 14:33:40 INFO JobScheduler: Added jobs for time 1438259620000 ms*
*15/07/30 14:33:45 INFO JobScheduler: Added jobs for time 1438259625000 ms*
*15/07/30 14:33:50 INFO JobScheduler: Added jobs for time 1438259630000 ms*
*15/07/30 14:33:55 INFO JobScheduler: Added jobs for time 1438259635000 ms*
15/07/30 14:33:59 INFO TaskSetManager: Finished task 1.0 in stage 24.0 (TID
73) in 60373 ms onxxxxxxxxxxxxxxxx (3/3)
15/07/30 14:33:59 INFO YarnScheduler: Removed TaskSet 24.0, whose tasks
have all completed, from pool
15/07/30 14:33:59 INFO DAGScheduler: Stage 24 (foreachRDD at
MetricsSpark.scala:67) finished in 60.379 s
15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at
MetricsSpark.scala:67, took 60.391761 s
15/07/30 14:33:59 INFO JobScheduler: Finished job streaming job
1438258210000 ms.0 from job set of time 1438258210000 ms
15/07/30 14:33:59 INFO JobScheduler: Total delay: 1429.249 s for time
1438258210000 ms (execution: 60.399 s)
15/07/30 14:33:59 INFO JobScheduler: Starting job streaming job
1438258215000 ms.0 from job set of time 1438258215000 ms

There is *always* a minute of delay in the third task to finish. When I
run the same code on the other cluster, there is no such delay in the
JobScheduler. I checked the YARN configuration on both clusters and it
seems the same.
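
One way to read the JobScheduler lines above, as a small Python sketch: parse the `Total delay` line and derive how long the batch waited before it started, and how many new batches arrive while one job is still running. It assumes that total delay is waiting time plus execution time, and it takes the 5-second batch interval from the spacing of the `Added jobs` timestamps. The function names are made up for illustration.

```python
import re

# JobScheduler summary line from the slow cluster, re-joined into one line.
LOG_LINE = ("15/07/30 14:33:59 INFO JobScheduler: Total delay: 1429.249 s for time "
            "1438258210000 ms (execution: 60.399 s)")

PATTERN = re.compile(
    r"Total delay: (?P<total>[\d.]+) s for time (?P<batch>\d+) ms "
    r"\(execution: (?P<execution>[\d.]+) s\)"
)


def parse_total_delay(line):
    """Return (batch_time_ms, total_delay_s, execution_s), or None if no match."""
    match = PATTERN.search(line)
    if match is None:
        return None
    return (int(match.group("batch")),
            float(match.group("total")),
            float(match.group("execution")))


def waiting_time_s(total_delay_s, execution_s):
    # Assumption: total delay = time spent queued + execution time.
    return round(total_delay_s - execution_s, 3)


def batches_arriving_during(execution_s, batch_interval_s):
    # Whole batch intervals that elapse while one job is still running.
    return int(execution_s // batch_interval_s)


batch_time_ms, total_s, execution_s = parse_total_delay(LOG_LINE)
print(waiting_time_s(total_s, execution_s))
print(batches_arriving_during(execution_s, 5.0))
```

With the values from the log, the batch waited about 1368.85 s before starting, and 12 new batches were added during its 60 s run, which matches the 12 `Added jobs` lines. The backlog therefore grows with every job.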

The log on the cluster that works well is:

15/07/30 14:37:35 INFO YarnScheduler: Adding task set 93.0 with 3 tasks
15/07/30 14:37:35 INFO TaskSetManager: Starting task 0.0 in stage 93.0 (TID
279, xxxxxxxxxxxxxxxxxx, RACK_LOCAL, 14643 bytes)
15/07/30 14:37:35 INFO TaskSetManager: Starting task 1.0 in stage 93.0 (TID
280, xxxxxxxxx, RACK_LOCAL, 14639 bytes)
15/07/30 14:37:35 INFO BlockManagerInfo: Added broadcast_93_piece0 in
memory on xxxxxxxxxxxxxxxxx:45132 (size: 1801.0 B, free: 530.3 MB)
15/07/30 14:37:35 INFO TaskSetManager: Starting task 2.0 in stage 93.0 (TID
281, xxxxxxxxxxxxxxxxxxx, RACK_LOCAL, 14647 bytes)
15/07/30 14:37:35 INFO TaskSetManager: Finished task 0.0 in stage 93.0 (TID
279) in 121 ms on xxxxxxxxxxxxxxxxxxxx (1/3)
15/07/30 14:37:35 INFO BlockManagerInfo: Added broadcast_93_piece0 in
memory on xxxxxxxxx:49886 (size: 1801.0 B, free: 530.3 MB)
15/07/30 14:37:35 INFO TaskSetManager: Finished task 2.0 in stage 93.0 (TID
281) in 261 ms on xxxxxxxxxxxxxxxxxx (2/3)
15/07/30 14:37:35 INFO TaskSetManager: Finished task 1.0 in stage 93.0 (TID
280) in 519 ms on xxxxxxxxx (3/3)
15/07/30 14:37:35 INFO DAGScheduler: Stage 93 (foreachRDD at
MetricsSpark.scala:67) finished in 0.522 s
15/07/30 14:37:35 INFO YarnScheduler: Removed TaskSet 93.0, whose tasks
have all completed, from pool
15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at
MetricsSpark.scala:67, took 0.531323 s
15/07/30 14:37:35 INFO JobScheduler: Finished job streaming job
1438259855000 ms.0 from job set of time 1438259855000 ms
15/07/30 14:37:35 INFO JobScheduler: Total delay: 0.548 s for time
1438259855000 ms (execution: 0.540 s)
15/07/30 14:37:35 INFO KafkaRDD: Removing RDD 184 from persistence list

Any clue about where I could look? The number of CPUs in YARN is sufficient.
I am running on YARN with the same options (--master yarn-server with 1g of
memory on both).

Re: Problems with JobScheduler

Posted by Guillermo Ortiz <ko...@gmail.com>.
I found the error. The final step is to index the data in Elasticsearch, and
the Elasticsearch instance in one of the clusters is overwhelmed and doesn't
work correctly.
I pointed the cluster that doesn't work at another ES and I don't get any
delay.
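
A check that might surface this kind of problem earlier is to time the external write separately from the rest of the batch. The following is a minimal Python sketch of that idea; `index_batch` is a hypothetical placeholder for the real indexing call, not the Elasticsearch client API, and `timed` is an illustrative helper.

```python
import time


def timed(step_name, func, *args, **kwargs):
    """Run func and return (result, elapsed_seconds) so slow steps stand out."""
    start = time.perf_counter()
    result = func(*args, **kwargs)
    elapsed = time.perf_counter() - start
    print("%s took %.3f s" % (step_name, elapsed))
    return result, elapsed


def index_batch(records):
    # Hypothetical stand-in for the call that writes records to the external store.
    return len(records)


indexed, seconds = timed("index_batch", index_batch, ["a", "b", "c"])
```

If the time reported for the write step accounts for most of the job duration, the bottleneck is the external system rather than the scheduler.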

Sorry, it wasn't related to Spark!




2015-07-31 9:15 GMT+02:00 Guillermo Ortiz <ko...@gmail.com>:

> It doesn't make sense to me, because the other cluster processes all the
> data in less than a second.
> Anyway, I'm going to set that parameter.
>
> 2015-07-31 0:36 GMT+02:00 Tathagata Das <td...@databricks.com>:
>
>> Yes, and that is indeed the problem. It is trying to process all the data
>> in Kafka, and therefore taking 60 seconds. You need to set the rate limits
>> for that.
>>
>> On Thu, Jul 30, 2015 at 8:51 AM, Cody Koeninger <co...@koeninger.org>
>> wrote:
>>
>>> If you don't set it, there is no maximum rate; it will get everything
>>> from the end of the last batch to the maximum available offset.
>>>
>>> On Thu, Jul 30, 2015 at 10:46 AM, Guillermo Ortiz <ko...@gmail.com>
>>> wrote:
>>>
>>>> The difference is that one receives more data than the other two. I can
>>>> pass the topics through parameters, so I could run the code with one
>>>> topic at a time and figure out which one is the topic, although I guess
>>>> that it's the topic which gets more data.
>>>>
>>>> Anyway, those delays are pretty weird in just one of the clusters, even
>>>> if the other one is not running.
>>>> I have seen the parameter "spark.streaming.kafka.maxRatePerPartition".
>>>> I haven't set any value for this parameter; how does it work if this
>>>> parameter doesn't have a value?
>>>>
>>>> 2015-07-30 16:32 GMT+02:00 Cody Koeninger <co...@koeninger.org>:
>>>>
>>>>> If the jobs are running on different topicpartitions, what's different
>>>>> about them?  Is one of them 120x the throughput of the other, for
>>>>> instance?  You should be able to eliminate cluster config as a difference
>>>>> by running the same topic partition on the different clusters and comparing
>>>>> the results.
>>>>>
>>>>> On Thu, Jul 30, 2015 at 9:29 AM, Guillermo Ortiz <konstt2000@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I have three topics with one partition each. So each job runs
>>>>>> on one topic.
>>>>>>
>>>>>> 2015-07-30 16:20 GMT+02:00 Cody Koeninger <co...@koeninger.org>:
>>>>>>
>>>>>>> Just so I'm clear, the difference in timing you're talking about is
>>>>>>> this:
>>>>>>>
>>>>>>> 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at
>>>>>>> MetricsSpark.scala:67, took 60.391761 s
>>>>>>>
>>>>>>> 15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at
>>>>>>> MetricsSpark.scala:67, took 0.531323 s
>>>>>>>
>>>>>>>
>>>>>>> Are those jobs running on the same topicpartition?
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jul 30, 2015 at 8:03 AM, Guillermo Ortiz <konstt2000@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I read about the maxRatePerPartition parameter; I haven't set it.
>>>>>>>> Could that be the problem? Although this wouldn't explain why it
>>>>>>>> doesn't work in one of the clusters.
>>>>>>>>
>>>>>>>> 2015-07-30 14:47 GMT+02:00 Guillermo Ortiz <ko...@gmail.com>:
>>>>>>>>
>>>>>>>>> They only share Kafka; the rest of the resources are independent.
>>>>>>>>> I tried stopping one cluster and running only the cluster that
>>>>>>>>> isn't working, but the same thing happens.

Re: Problems with JobScheduler

Posted by Guillermo Ortiz <ko...@gmail.com>.
It doesn't make sense to me, because the other cluster processes all the
data in less than a second.
Anyway, I'm going to set that parameter.


Re: Problems with JobScheduler

Posted by Tathagata Das <td...@databricks.com>.
Yes, and that is indeed the problem. It is trying to process all the data
in Kafka, and therefore taking 60 seconds. You need to set the rate limits
for that.
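
As a rough sketch of what a rate limit implies (Python, for illustration only): `spark.streaming.kafka.maxRatePerPartition` is a per-partition limit in records per second, so the largest batch the direct stream can pull is about rate × batch interval × number of partitions. The rate of 1000 is a made-up value, the 5-second interval is read off the `Added jobs` timestamps in the log, and both helper names are hypothetical.

```python
def max_records_per_batch(max_rate_per_partition, batch_interval_s, num_partitions):
    """Upper bound on records one batch may read once a per-partition rate is set."""
    return int(max_rate_per_partition * batch_interval_s * num_partitions)


def spark_submit_conf_args(max_rate_per_partition):
    # Builds the `--conf key=value` pair that spark-submit accepts.
    return ["--conf",
            "spark.streaming.kafka.maxRatePerPartition=%d" % max_rate_per_partition]


# Three topics with one partition each, 5-second batches,
# and a hypothetical limit of 1000 records per second per partition.
print(max_records_per_batch(1000, 5, 3))
print(" ".join(spark_submit_conf_args(1000)))
```

Without the setting there is no such bound, so a batch that starts after a backlog has built up can be arbitrarily large.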


Re: Problems with JobScheduler

Posted by Cody Koeninger <co...@koeninger.org>.
If you don't set it, there is no maximum rate: each batch will get everything
from the end of the last batch to the maximum available offset.
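
As a rough illustration of the behaviour described above, here is a minimal
sketch of how the end offset of a batch could be chosen for one partition.
This is not Spark's source code: the function name, argument names and the
numbers used below are hypothetical, and the 5-second batch interval is only
inferred from the spacing of the "Added jobs for time" lines in the logs.

```python
from typing import Optional


def until_offset(from_offset: int,
                 latest_offset: int,
                 max_rate_per_partition: Optional[int],
                 batch_seconds: float) -> int:
    """Return the exclusive end offset for one partition's next batch.

    from_offset            -- where the previous batch ended
    latest_offset          -- maximum offset currently available in Kafka
    max_rate_per_partition -- messages per second, or None/0 when unset
    batch_seconds          -- streaming batch interval in seconds
    """
    # No limit configured: the batch takes everything that is available.
    if not max_rate_per_partition or max_rate_per_partition <= 0:
        return latest_offset
    # Limit configured: cap the batch at rate * interval messages,
    # but never read past what Kafka actually has.
    max_messages = int(max_rate_per_partition * batch_seconds)
    return min(from_offset + max_messages, latest_offset)
```

With no limit, a partition that has fallen far behind hands its whole
backlog to a single task; with a limit, the backlog is spread over several
batches instead.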

On Thu, Jul 30, 2015 at 10:46 AM, Guillermo Ortiz <ko...@gmail.com>
wrote:

> The difference is that one topic receives more data than the other two. I
> can pass the topics as parameters, so I could run the code with one topic
> at a time and figure out which one it is, although I guess it's the topic
> that gets the most data.
>
> Anyway, it's pretty weird to see those delays in just one of the clusters,
> even when the other one is not running.
> I have seen the parameter "spark.streaming.kafka.maxRatePerPartition". I
> haven't set any value for it; how does it work if this parameter doesn't
> have a value?

Re: Problems with JobScheduler

Posted by Guillermo Ortiz <ko...@gmail.com>.
The difference is that one topic receives more data than the other two. I
can pass the topics as parameters, so I could run the code with one topic
at a time and figure out which one it is, although I guess it's the topic
that gets the most data.

Anyway, it's pretty weird to see those delays in just one of the clusters,
even when the other one is not running.
I have seen the parameter "spark.streaming.kafka.maxRatePerPartition". I
haven't set any value for it; how does it work if this parameter doesn't
have a value?

2015-07-30 16:32 GMT+02:00 Cody Koeninger <co...@koeninger.org>:

> If the jobs are running on different topicpartitions, what's different
> about them?  Is one of them 120x the throughput of the other, for
> instance?  You should be able to eliminate cluster config as a difference
> by running the same topic partition on the different clusters and comparing
> the results.

Re: Problems with JobScheduler

Posted by Cody Koeninger <co...@koeninger.org>.
If the jobs are running on different topicpartitions, what's different
about them?  Is one of them 120x the throughput of the other, for
instance?  You should be able to eliminate cluster config as a difference
by running the same topic partition on the different clusters and comparing
the results.
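
The JobScheduler lines quoted in this thread already show how far behind the
slow cluster is. The sketch below only does the arithmetic on those figures.
It assumes a 5-second batch interval (inferred from the "Added jobs for time"
timestamps) and that batches run one after another; the function names are
made up for illustration.

```python
BATCH_INTERVAL_S = 5.0  # assumption: taken from the 5000 ms steps in the log


def scheduling_delay(total_delay_s: float, execution_s: float) -> float:
    """Time a batch spent waiting in the queue before it started running."""
    return total_delay_s - execution_s


def backlog_growth_per_batch(execution_s: float,
                             batch_interval_s: float = BATCH_INTERVAL_S) -> float:
    """Extra seconds of lag added by each batch that overruns the interval."""
    return max(0.0, execution_s - batch_interval_s)


# Figures from "Total delay: 1429.249 s ... (execution: 60.399 s)".
slow_wait = scheduling_delay(1429.249, 60.399)
slow_growth = backlog_growth_per_batch(60.399)

# Figures from "Total delay: 0.548 s ... (execution: 0.540 s)".
fast_growth = backlog_growth_per_batch(0.540)
```

On the slow cluster every batch that runs for about a minute pushes the queue
further behind, while on the healthy cluster each batch finishes well inside
the interval and no backlog builds up.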

On Thu, Jul 30, 2015 at 9:29 AM, Guillermo Ortiz <ko...@gmail.com>
wrote:

> I have three topics with one partition each, so each job runs on one
> topic.

Re: Problems with JobScheduler

Posted by Guillermo Ortiz <ko...@gmail.com>.
I have three topics with one partition each, so each job runs on one
topic.

2015-07-30 16:20 GMT+02:00 Cody Koeninger <co...@koeninger.org>:

> Just so I'm clear, the difference in timing you're talking about is this:
>
> 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at
> MetricsSpark.scala:67, took 60.391761 s
>
> 15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at
> MetricsSpark.scala:67, took 0.531323 s
>
>
> Are those jobs running on the same topicpartition?

Re: Problems with JobScheduler

Posted by Cody Koeninger <co...@koeninger.org>.
Just so I'm clear, the difference in timing you're talking about is this:

15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at
MetricsSpark.scala:67, took 60.391761 s

15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at
MetricsSpark.scala:67, took 0.531323 s


Are those jobs running on the same topicpartition?
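
For comparing the two runs without reading the logs by eye, a small script
along these lines could pull the job durations out of the driver output. It
is only a sketch: the regular expression is written against the two
DAGScheduler lines quoted above and may need adjusting for other log formats,
and the helper name is hypothetical.

```python
import re

# Matches e.g. "Job 24 finished: foreachRDD at ..., took 60.391761 s"
JOB_FINISHED = re.compile(r"Job (\d+) finished: .*?took ([\d.]+) s")


def job_durations(log_text: str) -> dict:
    """Map job id -> duration in seconds for every 'Job N finished' entry."""
    # Mail clients wrap long log lines, so collapse all whitespace first.
    flat = " ".join(log_text.split())
    return {int(job): float(secs) for job, secs in JOB_FINISHED.findall(flat)}


log = """
15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at
MetricsSpark.scala:67, took 60.391761 s

15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at
MetricsSpark.scala:67, took 0.531323 s
"""

durations = job_durations(log)
ratio = durations[24] / durations[93]  # slow job time / fast job time
```

For the two lines above the slow job takes a bit more than 110 times as long
as the fast one.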


On Thu, Jul 30, 2015 at 8:03 AM, Guillermo Ortiz <ko...@gmail.com>
wrote:

> I read about the maxRatePerPartition parameter; I haven't set it.
> Could that be the problem? Although this wouldn't explain why it fails
> in only one of the clusters.
>
> 2015-07-30 14:47 GMT+02:00 Guillermo Ortiz <ko...@gmail.com>:
>
>> They share only Kafka; the rest of the resources are independent. I
>> tried stopping one cluster and running only the cluster that isn't working,
>> but the same thing happens.
>>
>> 2015-07-30 14:41 GMT+02:00 Guillermo Ortiz <ko...@gmail.com>:
>>
>>> I have a problem with the JobScheduler. I have executed the same code on
>>> two clusters. I read from three topics in Kafka with DirectStream, so I
>>> have three tasks.
>>>
>>> I have checked YARN and there aren't any other jobs launched.
>>>
>>> On the cluster where I have trouble, I get these logs:
>>>
>>> 15/07/30 14:32:58 INFO TaskSetManager: Starting task 0.0 in stage 24.0
>>> (TID 72, xxxxxxxxx, RACK_LOCAL, 14856 bytes)
>>> 15/07/30 14:32:58 INFO TaskSetManager: Starting task 1.0 in stage 24.0
>>> (TID 73, xxxxxxxxxxxxxxx, RACK_LOCAL, 14852 bytes)
>>> 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in
>>> memory on xxxxxxxxxxx:44909 (size: 1802.0 B, free: 530.3 MB)
>>> 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in
>>> memory on xxxxxxxxx:43477 (size: 1802.0 B, free: 530.3 MB)
>>> 15/07/30 14:32:59 INFO TaskSetManager: Starting task 2.0 in stage 24.0
>>> (TID 74, xxxxxxxxx, RACK_LOCAL, 14860 bytes)
>>> 15/07/30 14:32:59 INFO TaskSetManager: Finished task 0.0 in stage 24.0
>>> (TID 72) in 208 ms on xxxxxxxxx (1/3)
>>> 15/07/30 14:32:59 INFO TaskSetManager: Finished task 2.0 in stage 24.0
>>> (TID 74) in 49 ms on xxxxxxxxx (2/3)
>>> *15/07/30 14:33:00 INFO JobScheduler: Added jobs for time 1438259580000
>>> ms*
>>> *15/07/30 14:33:05 INFO JobScheduler: Added jobs for time 1438259585000
>>> ms*
>>> *15/07/30 14:33:10 INFO JobScheduler: Added jobs for time 1438259590000
>>> ms*
>>> *15/07/30 14:33:15 INFO JobScheduler: Added jobs for time 1438259595000
>>> ms*
>>> *15/07/30 14:33:20 INFO JobScheduler: Added jobs for time 1438259600000
>>> ms*
>>> *15/07/30 14:33:25 INFO JobScheduler: Added jobs for time 1438259605000
>>> ms*
>>> *15/07/30 14:33:30 INFO JobScheduler: Added jobs for time 1438259610000
>>> ms*
>>> *15/07/30 14:33:35 INFO JobScheduler: Added jobs for time 1438259615000
>>> ms*
>>> *15/07/30 14:33:40 INFO JobScheduler: Added jobs for time 1438259620000
>>> ms*
>>> *15/07/30 14:33:45 INFO JobScheduler: Added jobs for time 1438259625000
>>> ms*
>>> *15/07/30 14:33:50 INFO JobScheduler: Added jobs for time 1438259630000
>>> ms*
>>> *15/07/30 14:33:55 INFO JobScheduler: Added jobs for time 1438259635000
>>> ms*
>>> 15/07/30 14:33:59 INFO TaskSetManager: Finished task 1.0 in stage 24.0
>>> (TID 73) in 60373 ms onxxxxxxxxxxxxxxxx (3/3)
>>> 15/07/30 14:33:59 INFO YarnScheduler: Removed TaskSet 24.0, whose tasks
>>> have all completed, from pool
>>> 15/07/30 14:33:59 INFO DAGScheduler: Stage 24 (foreachRDD at
>>> MetricsSpark.scala:67) finished in 60.379 s
>>> 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at
>>> MetricsSpark.scala:67, took 60.391761 s
>>> 15/07/30 14:33:59 INFO JobScheduler: Finished job streaming job
>>> 1438258210000 ms.0 from job set of time 1438258210000 ms
>>> 15/07/30 14:33:59 INFO JobScheduler: Total delay: 1429.249 s for time
>>> 1438258210000 ms (execution: 60.399 s)
>>> 15/07/30 14:33:59 INFO JobScheduler: Starting job streaming job
>>> 1438258215000 ms.0 from job set of time 1438258215000 ms
>>>
>>> There are *always *a minute of delay in the third task, when I have
>>> executed same code in another cluster there isn't this delay in the
>>> JobScheduler. I checked the configuration in YARN in both clusters and it
>>> seems the same.
>>>
>>> The log in the cluster is working good is
>>>
>>> 15/07/30 14:37:35 INFO YarnScheduler: Adding task set 93.0 with 3 tasks
>>> 15/07/30 14:37:35 INFO TaskSetManager: Starting task 0.0 in stage 93.0
>>> (TID 279, xxxxxxxxxxxxxxxxxx, RACK_LOCAL, 14643 bytes)
>>> 15/07/30 14:37:35 INFO TaskSetManager: Starting task 1.0 in stage 93.0
>>> (TID 280, xxxxxxxxx, RACK_LOCAL, 14639 bytes)
>>> 15/07/30 14:37:35 INFO BlockManagerInfo: Added broadcast_93_piece0 in
>>> memory on xxxxxxxxxxxxxxxxx:45132 (size: 1801.0 B, free: 530.3 MB)
>>> 15/07/30 14:37:35 INFO TaskSetManager: Starting task 2.0 in stage 93.0
>>> (TID 281, xxxxxxxxxxxxxxxxxxx, RACK_LOCAL, 14647 bytes)
>>> 15/07/30 14:37:35 INFO TaskSetManager: Finished task 0.0 in stage 93.0
>>> (TID 279) in 121 ms on xxxxxxxxxxxxxxxxxxxx (1/3)
>>> 15/07/30 14:37:35 INFO BlockManagerInfo: Added broadcast_93_piece0 in
>>> memory on xxxxxxxxx:49886 (size: 1801.0 B, free: 530.3 MB)
>>> 15/07/30 14:37:35 INFO TaskSetManager: Finished task 2.0 in stage 93.0
>>> (TID 281) in 261 ms on xxxxxxxxxxxxxxxxxx (2/3)
>>> 15/07/30 14:37:35 INFO TaskSetManager: Finished task 1.0 in stage 93.0
>>> (TID 280) in 519 ms on xxxxxxxxx (3/3)
>>> 15/07/30 14:37:35 INFO DAGScheduler: Stage 93 (foreachRDD at
>>> MetricsSpark.scala:67) finished in 0.522 s
>>> 15/07/30 14:37:35 INFO YarnScheduler: Removed TaskSet 93.0, whose tasks
>>> have all completed, from pool
>>> 15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at
>>> MetricsSpark.scala:67, took 0.531323 s
>>> 15/07/30 14:37:35 INFO JobScheduler: Finished job streaming job
>>> 1438259855000 ms.0 from job set of time 1438259855000 ms
>>> 15/07/30 14:37:35 INFO JobScheduler: Total delay: 0.548 s for time
>>> 1438259855000 ms (execution: 0.540 s)
>>> 15/07/30 14:37:35 INFO KafkaRDD: Removing RDD 184 from persistence list
>>>
>>> Any clue about where I could take a look? Number of cpus in YARN is
>>> enough. I executing YARN with same options (--master yarn-server with 1g of
>>> memory in both)
>>>
>>
>>
>

Re: Problems with JobScheduler

Posted by Guillermo Ortiz <ko...@gmail.com>.
I read about the maxRatePerPartition parameter, which I haven't set.
Could that be the cause? It wouldn't explain why the problem appears in
only one of the clusters, though.
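
For reference, spark.streaming.kafka.maxRatePerPartition caps how many messages per second the direct stream reads from each Kafka partition; when it is unset there is no cap. The arithmetic below is a rough sketch only. It assumes the 5-second batch interval visible in the "Added jobs" lines and three partitions (one per task); the rate of 1000 is a made-up value, and the helper names are hypothetical.

```python
def max_records_per_batch(max_rate_per_partition, batch_interval_s, num_partitions):
    """Upper bound on records in one batch when the per-partition cap is set."""
    return max_rate_per_partition * batch_interval_s * num_partitions


def batches_behind(total_delay_s, batch_interval_s):
    """Roughly how many whole batch intervals fit in the reported total delay."""
    return int(total_delay_s // batch_interval_s)


BATCH_INTERVAL_S = 5       # "Added jobs" lines are 5000 ms apart
NUM_PARTITIONS = 3         # three tasks per stage in the logs
HYPOTHETICAL_RATE = 1000   # messages/sec/partition, illustrative value only

# Largest batch the cap would allow with the hypothetical rate.
print(max_records_per_batch(HYPOTHETICAL_RATE, BATCH_INTERVAL_S, NUM_PARTITIONS))

# Backlog implied by "Total delay: 1429.249 s" in the slow cluster's log.
print(batches_behind(1429.249, BATCH_INTERVAL_S))
```

The cap only bounds the size of each batch, so by itself it would not account for a single task that consistently takes a minute.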

2015-07-30 14:47 GMT+02:00 Guillermo Ortiz <ko...@gmail.com>:

> The clusters share only Kafka; the rest of the resources are independent.
> I tried stopping one cluster and running only the one that isn't working,
> but the same thing happens.

Re: Problems with JobScheduler

Posted by Guillermo Ortiz <ko...@gmail.com>.
The clusters share only Kafka; the rest of the resources are independent.
I tried stopping one cluster and running only the one that isn't working,
but the same thing happens.
