You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Stavros Kontopoulos <st...@lightbend.com> on 2019/05/24 16:14:18 UTC

dynamic allocation manager in SS

Hi,

Some while ago the streaming dynamic allocation part was added in DStreams(
https://issues.apache.org/jira/browse/SPARK-12133)  to improve the issues
with the batch based one. Should this be ported to structured streaming?
Thoughts?
AFAIK there is no support in SS for it.

Best,
Stavros

Re: dynamic allocation manager in SS

Posted by Igor Dvorzhak <id...@google.com.INVALID>.
Hello,

FYI, there are SPARK-24815
<https://issues.apache.org/jira/browse/SPARK-24815> JIRA for adding support
for Dynamic allocation support in Spark Streaming. We plan to work on this
over the summer.

Let's move design discussion to the JIRA so it will be easier to move it
forward.

Best regards,
Igor Dvorzhak


On Mon, May 27, 2019 at 9:41 AM Stavros Kontopoulos <
stavros.kontopoulos@lightbend.com> wrote:

> Sure im not talking about k8s here.
> The discussion is about the heuristics and their drawbacks.
>
> Στις Δευ, 27 Μαΐ 2019, 2:04 μ.μ. ο χρήστης Gabor Somogyi <
> gabor.g.somogyi@gmail.com> έγραψε:
>
>> K8s is a different story, please take a look at the doc "Future Work"
>> part.
>>
>> On Fri, May 24, 2019 at 9:40 PM Stavros Kontopoulos <
>> stavros.kontopoulos@lightbend.com> wrote:
>>
>>> Btw the heuristics for batch mode (
>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L289)
>>> vs
>>> streaming (
>>> https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala#L91-L92)
>>> are different. In batch mode you care about the numRunningOrPendingTasks while
>>> for streaming about the ratio: averageBatchProcTime.toDouble /
>>> batchDurationMs so there are some concerns beyond scaling down when
>>> idle.
>>> A scenario things might now work for batch dynamic allocation with SS is
>>> as follows. I start with a query that reads x kafka partitions and the data
>>> arriving is low and all tasks (1 per partition) are running since there are
>>> enough resources anyway.
>>> At some point the data increases per partition (maxOffsetsPerTrigger is
>>> high enough) and so processing takes more time. AFAIK SS will wait for a
>>> batch to finish before running the next (waits for the trigger to finish,
>>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala#L46
>>> ).
>>> In this case I suspect there is no scaling up with the batch dynamic
>>> allocation mode as there are no pending tasks, only processing time
>>> changed. In this case the streaming dynamic heuristics I think are better.
>>> Batch mode heuristics could work, if not mistaken, if you have multiple
>>> streaming queries and there are batches waiting (using fair-scheduling etc).
>>>
>>> PS. this has been discussed, not in depth, in the past on the list (
>>> https://mail-archives.apache.org/mod_mbox/spark-user/201708.mbox/%3C1503626484779-29104.post@n3.nabble.com%3E
>>> )
>>>
>>>
>>>
>>>
>>> On Fri, May 24, 2019 at 9:22 PM Stavros Kontopoulos <
>>> stavros.kontopoulos@lightbend.com> wrote:
>>>
>>>> I am on k8s where there is no support yet afaik, there is wip wrt the
>>>> shuffle service. So from your experience there are no issues with using the
>>>> batch dynamic allocation version like there was before with dstreams as
>>>> described in the related jira?
>>>>
>>>> Στις Παρ, 24 Μαΐ 2019, 8:28 μ.μ. ο χρήστης Gabor Somogyi <
>>>> gabor.g.somogyi@gmail.com> έγραψε:
>>>>
>>>>> It scales down with yarn. Not sure how you've tested.
>>>>>
>>>>> On Fri, 24 May 2019, 19:10 Stavros Kontopoulos, <
>>>>> stavros.kontopoulos@lightbend.com> wrote:
>>>>>
>>>>>> Yes nothing happens. In this case it could propagate info to the
>>>>>> resource manager to scale down the number of executors no? Just a thought.
>>>>>>
>>>>>> Στις Παρ, 24 Μαΐ 2019, 7:17 μ.μ. ο χρήστης Gabor Somogyi <
>>>>>> gabor.g.somogyi@gmail.com> έγραψε:
>>>>>>
>>>>>>> Structured Streaming works differently. If no data arrives no tasks
>>>>>>> are executed (just had a case in this area).
>>>>>>>
>>>>>>> BR,
>>>>>>> G
>>>>>>>
>>>>>>>
>>>>>>> On Fri, 24 May 2019, 18:14 Stavros Kontopoulos, <
>>>>>>> stavros.kontopoulos@lightbend.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Some while ago the streaming dynamic allocation part was added in
>>>>>>>> DStreams(https://issues.apache.org/jira/browse/SPARK-12133)  to
>>>>>>>> improve the issues with the batch based one. Should this be ported
>>>>>>>> to structured streaming? Thoughts?
>>>>>>>> AFAIK there is no support in SS for it.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Stavros
>>>>>>>>
>>>>>>>>
>>>
>>> --
>>> Stavros Kontopoulos
>>> *Principal Engineer*
>>> *Lightbend Platform <https://www.lightbend.com/lightbend-platform>*
>>> *mob: **+30 6977967274 <+30+6977967274>*
>>>
>>>

Re: dynamic allocation manager in SS

Posted by Stavros Kontopoulos <st...@lightbend.com>.
Sure im not talking about k8s here.
The discussion is about the heuristics and their drawbacks.

Στις Δευ, 27 Μαΐ 2019, 2:04 μ.μ. ο χρήστης Gabor Somogyi <
gabor.g.somogyi@gmail.com> έγραψε:

> K8s is a different story, please take a look at the doc "Future Work" part.
>
> On Fri, May 24, 2019 at 9:40 PM Stavros Kontopoulos <
> stavros.kontopoulos@lightbend.com> wrote:
>
>> Btw the heuristics for batch mode (
>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L289)
>> vs
>> streaming (
>> https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala#L91-L92)
>> are different. In batch mode you care about the numRunningOrPendingTasks while
>> for streaming about the ratio: averageBatchProcTime.toDouble /
>> batchDurationMs so there are some concerns beyond scaling down when
>> idle.
>> A scenario things might now work for batch dynamic allocation with SS is
>> as follows. I start with a query that reads x kafka partitions and the data
>> arriving is low and all tasks (1 per partition) are running since there are
>> enough resources anyway.
>> At some point the data increases per partition (maxOffsetsPerTrigger is
>> high enough) and so processing takes more time. AFAIK SS will wait for a
>> batch to finish before running the next (waits for the trigger to finish,
>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala#L46
>> ).
>> In this case I suspect there is no scaling up with the batch dynamic
>> allocation mode as there are no pending tasks, only processing time
>> changed. In this case the streaming dynamic heuristics I think are better.
>> Batch mode heuristics could work, if not mistaken, if you have multiple
>> streaming queries and there are batches waiting (using fair-scheduling etc).
>>
>> PS. this has been discussed, not in depth, in the past on the list (
>> https://mail-archives.apache.org/mod_mbox/spark-user/201708.mbox/%3C1503626484779-29104.post@n3.nabble.com%3E
>> )
>>
>>
>>
>>
>> On Fri, May 24, 2019 at 9:22 PM Stavros Kontopoulos <
>> stavros.kontopoulos@lightbend.com> wrote:
>>
>>> I am on k8s where there is no support yet afaik, there is wip wrt the
>>> shuffle service. So from your experience there are no issues with using the
>>> batch dynamic allocation version like there was before with dstreams as
>>> described in the related jira?
>>>
>>> Στις Παρ, 24 Μαΐ 2019, 8:28 μ.μ. ο χρήστης Gabor Somogyi <
>>> gabor.g.somogyi@gmail.com> έγραψε:
>>>
>>>> It scales down with yarn. Not sure how you've tested.
>>>>
>>>> On Fri, 24 May 2019, 19:10 Stavros Kontopoulos, <
>>>> stavros.kontopoulos@lightbend.com> wrote:
>>>>
>>>>> Yes nothing happens. In this case it could propagate info to the
>>>>> resource manager to scale down the number of executors no? Just a thought.
>>>>>
>>>>> Στις Παρ, 24 Μαΐ 2019, 7:17 μ.μ. ο χρήστης Gabor Somogyi <
>>>>> gabor.g.somogyi@gmail.com> έγραψε:
>>>>>
>>>>>> Structured Streaming works differently. If no data arrives no tasks
>>>>>> are executed (just had a case in this area).
>>>>>>
>>>>>> BR,
>>>>>> G
>>>>>>
>>>>>>
>>>>>> On Fri, 24 May 2019, 18:14 Stavros Kontopoulos, <
>>>>>> stavros.kontopoulos@lightbend.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Some while ago the streaming dynamic allocation part was added in
>>>>>>> DStreams(https://issues.apache.org/jira/browse/SPARK-12133)  to
>>>>>>> improve the issues with the batch based one. Should this be ported
>>>>>>> to structured streaming? Thoughts?
>>>>>>> AFAIK there is no support in SS for it.
>>>>>>>
>>>>>>> Best,
>>>>>>> Stavros
>>>>>>>
>>>>>>>
>>
>> --
>> Stavros Kontopoulos
>> *Principal Engineer*
>> *Lightbend Platform <https://www.lightbend.com/lightbend-platform>*
>> *mob: **+30 6977967274 <+30+6977967274>*
>>
>>

Re: dynamic allocation manager in SS

Posted by Gabor Somogyi <ga...@gmail.com>.
K8s is a different story, please take a look at the doc "Future Work" part.

On Fri, May 24, 2019 at 9:40 PM Stavros Kontopoulos <
stavros.kontopoulos@lightbend.com> wrote:

> Btw the heuristics for batch mode (
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L289)
> vs
> streaming (
> https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala#L91-L92)
> are different. In batch mode you care about the numRunningOrPendingTasks while
> for streaming about the ratio: averageBatchProcTime.toDouble /
> batchDurationMs so there are some concerns beyond scaling down when idle.
> A scenario things might now work for batch dynamic allocation with SS is
> as follows. I start with a query that reads x kafka partitions and the data
> arriving is low and all tasks (1 per partition) are running since there are
> enough resources anyway.
> At some point the data increases per partition (maxOffsetsPerTrigger is
> high enough) and so processing takes more time. AFAIK SS will wait for a
> batch to finish before running the next (waits for the trigger to finish,
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala#L46
> ).
> In this case I suspect there is no scaling up with the batch dynamic
> allocation mode as there are no pending tasks, only processing time
> changed. In this case the streaming dynamic heuristics I think are better.
> Batch mode heuristics could work, if not mistaken, if you have multiple
> streaming queries and there are batches waiting (using fair-scheduling etc).
>
> PS. this has been discussed, not in depth, in the past on the list (
> https://mail-archives.apache.org/mod_mbox/spark-user/201708.mbox/%3C1503626484779-29104.post@n3.nabble.com%3E
> )
>
>
>
>
> On Fri, May 24, 2019 at 9:22 PM Stavros Kontopoulos <
> stavros.kontopoulos@lightbend.com> wrote:
>
>> I am on k8s where there is no support yet afaik, there is wip wrt the
>> shuffle service. So from your experience there are no issues with using the
>> batch dynamic allocation version like there was before with dstreams as
>> described in the related jira?
>>
>> Στις Παρ, 24 Μαΐ 2019, 8:28 μ.μ. ο χρήστης Gabor Somogyi <
>> gabor.g.somogyi@gmail.com> έγραψε:
>>
>>> It scales down with yarn. Not sure how you've tested.
>>>
>>> On Fri, 24 May 2019, 19:10 Stavros Kontopoulos, <
>>> stavros.kontopoulos@lightbend.com> wrote:
>>>
>>>> Yes nothing happens. In this case it could propagate info to the
>>>> resource manager to scale down the number of executors no? Just a thought.
>>>>
>>>> Στις Παρ, 24 Μαΐ 2019, 7:17 μ.μ. ο χρήστης Gabor Somogyi <
>>>> gabor.g.somogyi@gmail.com> έγραψε:
>>>>
>>>>> Structured Streaming works differently. If no data arrives no tasks
>>>>> are executed (just had a case in this area).
>>>>>
>>>>> BR,
>>>>> G
>>>>>
>>>>>
>>>>> On Fri, 24 May 2019, 18:14 Stavros Kontopoulos, <
>>>>> stavros.kontopoulos@lightbend.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Some while ago the streaming dynamic allocation part was added in
>>>>>> DStreams(https://issues.apache.org/jira/browse/SPARK-12133)  to
>>>>>> improve the issues with the batch based one. Should this be ported
>>>>>> to structured streaming? Thoughts?
>>>>>> AFAIK there is no support in SS for it.
>>>>>>
>>>>>> Best,
>>>>>> Stavros
>>>>>>
>>>>>>
>
> --
> Stavros Kontopoulos
> *Principal Engineer*
> *Lightbend Platform <https://www.lightbend.com/lightbend-platform>*
> *mob: **+30 6977967274 <+30+6977967274>*
>
>

Re: dynamic allocation manager in SS

Posted by Stavros Kontopoulos <st...@lightbend.com>.
Btw the heuristics for batch mode (
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L289)
vs
streaming (
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala#L91-L92)
are different. In batch mode you care about the numRunningOrPendingTasks while
for streaming about the ratio: averageBatchProcTime.toDouble /
batchDurationMs so there are some concerns beyond scaling down when idle.
A scenario things might now work for batch dynamic allocation with SS is as
follows. I start with a query that reads x kafka partitions and the data
arriving is low and all tasks (1 per partition) are running since there are
enough resources anyway.
At some point the data increases per partition (maxOffsetsPerTrigger is
high enough) and so processing takes more time. AFAIK SS will wait for a
batch to finish before running the next (waits for the trigger to finish,
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala#L46
).
In this case I suspect there is no scaling up with the batch dynamic
allocation mode as there are no pending tasks, only processing time
changed. In this case the streaming dynamic heuristics I think are better.
Batch mode heuristics could work, if not mistaken, if you have multiple
streaming queries and there are batches waiting (using fair-scheduling etc).

PS. this has been discussed, not in depth, in the past on the list (
https://mail-archives.apache.org/mod_mbox/spark-user/201708.mbox/%3C1503626484779-29104.post@n3.nabble.com%3E
)




On Fri, May 24, 2019 at 9:22 PM Stavros Kontopoulos <
stavros.kontopoulos@lightbend.com> wrote:

> I am on k8s where there is no support yet afaik, there is wip wrt the
> shuffle service. So from your experience there are no issues with using the
> batch dynamic allocation version like there was before with dstreams as
> described in the related jira?
>
> Στις Παρ, 24 Μαΐ 2019, 8:28 μ.μ. ο χρήστης Gabor Somogyi <
> gabor.g.somogyi@gmail.com> έγραψε:
>
>> It scales down with yarn. Not sure how you've tested.
>>
>> On Fri, 24 May 2019, 19:10 Stavros Kontopoulos, <
>> stavros.kontopoulos@lightbend.com> wrote:
>>
>>> Yes nothing happens. In this case it could propagate info to the
>>> resource manager to scale down the number of executors no? Just a thought.
>>>
>>> Στις Παρ, 24 Μαΐ 2019, 7:17 μ.μ. ο χρήστης Gabor Somogyi <
>>> gabor.g.somogyi@gmail.com> έγραψε:
>>>
>>>> Structured Streaming works differently. If no data arrives no tasks are
>>>> executed (just had a case in this area).
>>>>
>>>> BR,
>>>> G
>>>>
>>>>
>>>> On Fri, 24 May 2019, 18:14 Stavros Kontopoulos, <
>>>> stavros.kontopoulos@lightbend.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Some while ago the streaming dynamic allocation part was added in
>>>>> DStreams(https://issues.apache.org/jira/browse/SPARK-12133)  to
>>>>> improve the issues with the batch based one. Should this be ported to
>>>>> structured streaming? Thoughts?
>>>>> AFAIK there is no support in SS for it.
>>>>>
>>>>> Best,
>>>>> Stavros
>>>>>
>>>>>

-- 
Stavros Kontopoulos
*Principal Engineer*
*Lightbend Platform <https://www.lightbend.com/lightbend-platform>*
*mob: **+30 6977967274 <+30+6977967274>*

Re: dynamic allocation manager in SS

Posted by Stavros Kontopoulos <st...@lightbend.com>.
I am on k8s where there is no support yet afaik, there is wip wrt the
shuffle service. So from your experience there are no issues with using the
batch dynamic allocation version like there was before with dstreams as
described in the related jira?

Στις Παρ, 24 Μαΐ 2019, 8:28 μ.μ. ο χρήστης Gabor Somogyi <
gabor.g.somogyi@gmail.com> έγραψε:

> It scales down with yarn. Not sure how you've tested.
>
> On Fri, 24 May 2019, 19:10 Stavros Kontopoulos, <
> stavros.kontopoulos@lightbend.com> wrote:
>
>> Yes nothing happens. In this case it could propagate info to the resource
>> manager to scale down the number of executors no? Just a thought.
>>
>> Στις Παρ, 24 Μαΐ 2019, 7:17 μ.μ. ο χρήστης Gabor Somogyi <
>> gabor.g.somogyi@gmail.com> έγραψε:
>>
>>> Structured Streaming works differently. If no data arrives no tasks are
>>> executed (just had a case in this area).
>>>
>>> BR,
>>> G
>>>
>>>
>>> On Fri, 24 May 2019, 18:14 Stavros Kontopoulos, <
>>> stavros.kontopoulos@lightbend.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Some while ago the streaming dynamic allocation part was added in
>>>> DStreams(https://issues.apache.org/jira/browse/SPARK-12133)  to
>>>> improve the issues with the batch based one. Should this be ported to
>>>> structured streaming? Thoughts?
>>>> AFAIK there is no support in SS for it.
>>>>
>>>> Best,
>>>> Stavros
>>>>
>>>>

Re: dynamic allocation manager in SS

Posted by Gabor Somogyi <ga...@gmail.com>.
It scales down with yarn. Not sure how you've tested.

On Fri, 24 May 2019, 19:10 Stavros Kontopoulos, <
stavros.kontopoulos@lightbend.com> wrote:

> Yes nothing happens. In this case it could propagate info to the resource
> manager to scale down the number of executors no? Just a thought.
>
> Στις Παρ, 24 Μαΐ 2019, 7:17 μ.μ. ο χρήστης Gabor Somogyi <
> gabor.g.somogyi@gmail.com> έγραψε:
>
>> Structured Streaming works differently. If no data arrives no tasks are
>> executed (just had a case in this area).
>>
>> BR,
>> G
>>
>>
>> On Fri, 24 May 2019, 18:14 Stavros Kontopoulos, <
>> stavros.kontopoulos@lightbend.com> wrote:
>>
>>> Hi,
>>>
>>> Some while ago the streaming dynamic allocation part was added in
>>> DStreams(https://issues.apache.org/jira/browse/SPARK-12133)  to improve
>>> the issues with the batch based one. Should this be ported to
>>> structured streaming? Thoughts?
>>> AFAIK there is no support in SS for it.
>>>
>>> Best,
>>> Stavros
>>>
>>>

Re: dynamic allocation manager in SS

Posted by Stavros Kontopoulos <st...@lightbend.com>.
Yes nothing happens. In this case it could propagate info to the resource
manager to scale down the number of executors no? Just a thought.

Στις Παρ, 24 Μαΐ 2019, 7:17 μ.μ. ο χρήστης Gabor Somogyi <
gabor.g.somogyi@gmail.com> έγραψε:

> Structured Streaming works differently. If no data arrives no tasks are
> executed (just had a case in this area).
>
> BR,
> G
>
>
> On Fri, 24 May 2019, 18:14 Stavros Kontopoulos, <
> stavros.kontopoulos@lightbend.com> wrote:
>
>> Hi,
>>
>> Some while ago the streaming dynamic allocation part was added in
>> DStreams(https://issues.apache.org/jira/browse/SPARK-12133)  to improve
>> the issues with the batch based one. Should this be ported to structured
>> streaming? Thoughts?
>> AFAIK there is no support in SS for it.
>>
>> Best,
>> Stavros
>>
>>

Re: dynamic allocation manager in SS

Posted by Gabor Somogyi <ga...@gmail.com>.
Structured Streaming works differently. If no data arrives no tasks are
executed (just had a case in this area).

BR,
G


On Fri, 24 May 2019, 18:14 Stavros Kontopoulos, <
stavros.kontopoulos@lightbend.com> wrote:

> Hi,
>
> Some while ago the streaming dynamic allocation part was added in DStreams(
> https://issues.apache.org/jira/browse/SPARK-12133)  to improve the issues
> with the batch based one. Should this be ported to structured streaming?
> Thoughts?
> AFAIK there is no support in SS for it.
>
> Best,
> Stavros
>
>