Posted to user@flink.apache.org by Konstantin Knauf <ko...@tngtech.com> on 2017/01/04 12:30:34 UTC

Question about Scheduling of Batch Jobs

Hi everyone,

I have a basic question regarding scheduling of batch programs. Let's
take the following graph:

          -> Group Combine -> ...
        /
Source ----> Group Combine -> ...
        \
          -> Map -> ...

So, a source followed by three operators with ship strategy
"Forward" and exchange mode "pipelined".

The three flows are later joined again, so that this results in a single
job.

When the job is started, at first only one of the operators immediately
receives the input read by the source and can therefore run concurrently
with the source. Once the source is finished, the other two operators
are scheduled.

Two questions about this:

1) Why doesn't the source forward the records to all three operators
while still running?
2) How does the jobmanager decide which of the three operators
receives the pipelined data first?

Cheers and Thanks,

Konstantin


-- 
Konstantin Knauf * konstantin.knauf@tngtech.com * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082


Re: Question about Scheduling of Batch Jobs

Posted by Konstantin Knauf <ko...@tngtech.com>.
Hi Fabian,

I see, thanks for the quick explanation.

Cheers,

Konstantin





Re: Question about Scheduling of Batch Jobs

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Konstantin,

the DataSet API tries to execute all operators as soon as possible.

I assume that in your case, Flink does not do this because it tries to
avoid a deadlock.
A dataflow which replicates data from the same source and joins it again
might get deadlocked because all pipelines need to make progress in order
to finish the source.

Think of a simple example like this:

           /-- Map1 --\
Src --<                  >-Join
           \-- Map2 --/

If the join is executed as a hash join, one input (Map1) is used to build a
hash table. Only once the hash table is built can the other input (Map2)
be consumed.
If both Map operators ran at the same time, Map2 would stall at some
point because it cannot emit any more data due to the backpressure of the
not-yet-opened probe input of the hash join.
Once Map2 stalls, the Source would stall and Map1 could not continue to
finish the build side. At this point we have a deadlock.

Flink detects these situations and adds an artificial pipeline breaker in
the dataflow to prevent deadlocks. Due to the pipeline breaker, the build
side is completed before the probe side input is processed.
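
The deadlock and the effect of the pipeline breaker can be sketched with a
small plain-Java model. This is not Flink code: the class name, the method
name and the buffer capacity are invented for illustration, and the model
assumes that the probe channel holds a fixed number of records and is not
read until the source (and with it the build side) has finished.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model of Src -> (Map1, Map2) -> hash join. Not Flink code.
final class BranchJoinDeadlockSketch {

    /**
     * Returns true if the source can emit all of its records, false if it
     * stalls because the probe-side channel is full and nobody reads it.
     *
     * pipelineBreaker == true models a breaker that buffers the probe side
     * without a bound, so Map2 never exerts backpressure on the source.
     */
    static boolean sourceCanFinish(int records, int probeBufferCapacity,
                                   boolean pipelineBreaker) {
        Queue<Integer> probeBuffer = new ArrayDeque<>();
        int hashTableEntries = 0;

        for (int r = 0; r < records; r++) {
            // The source must hand every record to BOTH branches.
            if (!pipelineBreaker && probeBuffer.size() == probeBufferCapacity) {
                // Map2 is blocked, so the source is blocked, so Map1 gets no
                // more input and the build side can never finish.
                return false;
            }
            hashTableEntries++;   // Map1 -> build side, always consumed
            probeBuffer.add(r);   // Map2 -> probe side, parked until build ends
        }
        // Source finished: the build side is complete and the probe side can
        // now be drained against the hash table.
        return hashTableEntries == records;
    }

    public static void main(String[] args) {
        System.out.println(sourceCanFinish(1000, 32, false)); // false
        System.out.println(sourceCanFinish(1000, 32, true));  // true
    }
}
```

In this model the source only gets through without a breaker if the whole
input fits into the probe buffer, which is why the problem may not show up
on very small inputs.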

This also answers the question of which operator is executed first: the
operator on the build side of the first join. Hence the join strategy
chosen by the optimizer (BUILD_FIRST, BUILD_SECOND) decides.
You can also give a manual JoinHint to control that. If you give a
SORT_MERGE hint, all three operators should run concurrently because both
join inputs will be consumed concurrently for sorting.
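
To make the scheduling consequence concrete, here is a small plain-Java
model of which join input can run alongside the source for each strategy.
Again this is not Flink code; the enums and the method name are made up for
this illustration. In the DataSet API itself the hint is passed as the
second argument of join, e.g. first.join(second,
JoinHint.REPARTITION_SORT_MERGE).

```java
import java.util.EnumSet;
import java.util.Set;

// Toy model of the rule described above. Not Flink code.
final class BranchSchedulingSketch {

    enum Strategy { HASH_BUILD_FIRST, HASH_BUILD_SECOND, SORT_MERGE }

    enum Input { FIRST, SECOND }

    /** Join inputs that can consume pipelined data while the source runs. */
    static Set<Input> runsWithSource(Strategy strategy) {
        switch (strategy) {
            case HASH_BUILD_FIRST:
                // Only the build side is consumed; the probe side waits.
                return EnumSet.of(Input.FIRST);
            case HASH_BUILD_SECOND:
                return EnumSet.of(Input.SECOND);
            default:
                // Sort-merge reads both inputs concurrently for sorting.
                return EnumSet.allOf(Input.class);
        }
    }

    public static void main(String[] args) {
        for (Strategy s : Strategy.values()) {
            System.out.println(s + " -> " + runsWithSource(s));
        }
    }
}
```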

Best, Fabian

