Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2015/06/01 23:49:17 UTC

[jira] [Commented] (FLINK-2119) Add ExecutionGraph support for leg-wise scheduling

    [ https://issues.apache.org/jira/browse/FLINK-2119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14568087#comment-14568087 ] 

ASF GitHub Bot commented on FLINK-2119:
---------------------------------------

GitHub user uce opened a pull request:

    https://github.com/apache/flink/pull/754

    [FLINK-2119] Add ExecutionGraph support for batch scheduling

    This PR adds support for a newly introduced scheduling mode `BATCH_FROM_SOURCES`. My goal was to keep this change *minimally invasive* so as not to touch too much core code shortly before the release.
    
    Essentially, this only touches two parts of the codebase: the scheduling action for blocking results and the job vertices.
    
    If you set the scheduling mode to `BATCH_FROM_SOURCES`, you can manually configure which input vertices are used as the sources when scheduling (`setAsBatchSource`). You can then manually specify the successor vertices (`addBatchSuccessor`), which are scheduled after the blocking results have finished. If no successors are specified manually, the result consumers are scheduled as before. Mixing pipelined and blocking results currently leads to unspecified behaviour (i.e., it is not a good idea to do this at the moment).
    
    When you have something like this:
    ```
            O sink
            |
            . <------------- denotes a pipelined result
            O union
      +----´|`----+
      |     |     |
      ■     ■     ■ <------- denotes a blocking result
      O     O     O
     src0  src1  src2
    ```
    You can first schedule `src0`, `src1`, and `src2` one after another, and then continue with the `union-sink` pipeline.
    
    ```java
    src[0].setAsBatchSource(); // src0 is the first to go...
    
    src[0].addBatchSuccessors(src[1]); // src0 => src1
    
    src[1].addBatchSuccessors(src[2]); // src1 => src2
    
    src[2].addBatchSuccessors(union); // src2 => [union => sink]
    ```
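    
    The scheduling rule described above can be illustrated with a minimal, hypothetical sketch. It is not the code from this PR: the `Vertex` class, its fields, and the `schedule`/`onResultFinished` helpers are assumptions made for illustration. It models the example graph, simulates running tasks as finishing one at a time in deployment order, and (simplifying) deploys a pipelined consumer together with its producer.
    
    ```java
    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Deque;
    import java.util.List;
    
    // Hypothetical, simplified model of BATCH_FROM_SOURCES; Vertex and its fields
    // are illustrative and not Flink's JobVertex/ExecutionGraph API.
    public class BatchFromSourcesSketch {
    
        static final class Vertex {
            final String name;
            final List<Vertex> consumers = new ArrayList<>();       // consume a blocking result
            final List<Vertex> batchSuccessors = new ArrayList<>(); // set via addBatchSuccessors
            Vertex pipelinedConsumer;                               // consumes a pipelined result
            boolean batchSource;
            boolean scheduled;
    
            Vertex(String name) { this.name = name; }
    
            void setAsBatchSource() { batchSource = true; }
    
            void addBatchSuccessors(Vertex... successors) {
                batchSuccessors.addAll(Arrays.asList(successors));
            }
        }
    
        static final List<String> log = new ArrayList<>();       // deployment order
        static final Deque<Vertex> running = new ArrayDeque<>(); // deployed, not yet finished
    
        static void schedule(Vertex v) {
            if (v.scheduled) return; // deploy each vertex at most once
            v.scheduled = true;
            log.add(v.name);
            running.add(v);
            // A pipelined consumer runs concurrently with its producer
            // (simplified: deployed right away instead of on the first buffer).
            if (v.pipelinedConsumer != null) schedule(v.pipelinedConsumer);
        }
    
        // Called once a vertex has finished producing its result.
        static void onResultFinished(Vertex v) {
            // Manually configured successors take precedence; otherwise the
            // result consumers are scheduled as before.
            List<Vertex> next = v.batchSuccessors.isEmpty() ? v.consumers : v.batchSuccessors;
            next.forEach(BatchFromSourcesSketch::schedule);
        }
    
        static List<String> runExample() {
            log.clear();
            running.clear();
            Vertex[] src = { new Vertex("src0"), new Vertex("src1"), new Vertex("src2") };
            Vertex union = new Vertex("union");
            Vertex sink = new Vertex("sink");
            for (Vertex s : src) s.consumers.add(union); // blocking results
            union.pipelinedConsumer = sink;             // pipelined result
    
            src[0].setAsBatchSource();
            src[0].addBatchSuccessors(src[1]);
            src[1].addBatchSuccessors(src[2]);
            src[2].addBatchSuccessors(union);
    
            // Only the batch sources are scheduled initially.
            for (Vertex s : src) if (s.batchSource) schedule(s);
            // Simulate tasks finishing one at a time, in FIFO deployment order.
            while (!running.isEmpty()) onResultFinished(running.poll());
            return new ArrayList<>(log);
        }
    
        public static void main(String[] args) {
            System.out.println(runExample());
        }
    }
    ```
    
    Under these assumptions, `main` prints `[src0, src1, src2, union, sink]`: each source is deployed only after its predecessor has finished, and the `union-sink` pipeline starts last. Removing the `addBatchSuccessors` calls makes `onResultFinished` fall back to the result consumers, so `src0` finishing would schedule the union directly.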
    
    @StephanEwen or @tillrohrmann will work on the Optimizer/JobGraph counterpart of this and will build the `JobGraph` for programs in batch mode using the methods introduced in this PR. Do you guys think that this minimal support is sufficient for the first version?
    
    (Going over the result partition notification code, I think refactoring it is pressing: it is very hard to understand. The corresponding issue [FLINK-1833](https://issues.apache.org/jira/browse/FLINK-1833) was created a while back. I want to do this after the release.)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/incubator-flink legs-2119

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/754.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #754
    
----
commit 4ac15982700257d3deb2d55a389afd0531f7f8be
Author: Ufuk Celebi <uc...@apache.org>
Date:   2015-06-01T21:12:47Z

    [FLINK-2119] Add ExecutionGraph support for batch scheduling

----


> Add ExecutionGraph support for leg-wise scheduling
> --------------------------------------------------
>
>                 Key: FLINK-2119
>                 URL: https://issues.apache.org/jira/browse/FLINK-2119
>             Project: Flink
>          Issue Type: Improvement
>          Components: Scheduler
>    Affects Versions: master
>            Reporter: Ufuk Celebi
>            Assignee: Ufuk Celebi
>
> Scheduling currently happens by lazily unrolling the ExecutionGraph from the sources.
> 1. All sources are scheduled for execution.
> 2. Their results trigger scheduling and deployment of the receiving tasks (either when the first buffer is available or when the result has been fully produced, for pipelined and blocking exchanges respectively).
> For certain batch jobs this can be problematic, as many tasks will be running at the same time and consume task manager resources like execution slots and memory. For these jobs, it is desirable to schedule the ExecutionGraph with different strategies.
> With respect to the ExecutionGraph, the current limitation is that data availability for a result always triggers scheduling of the consuming tasks. This needs to be more general to allow different scheduling strategies.
> Consider the following example:
> {code}
>           [ union ]
>          /         \
>         /           \
>   [ source 1 ]  [ source 2 ]
> {code}
> Currently, both sources are scheduled concurrently and the "faster" one triggers scheduling of the union. It is desirable to first allow source 1 to completely produce its result, then trigger scheduling of source 2, and only then schedule the union.
> The required changes in the ExecutionGraph are conceptually straightforward: instead of going through the list of result consumers and scheduling them, we need to be able to run a more general action. For normal operation, this will still schedule the consumer task, but we can also configure it to kick off the next source etc.
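
The generalization proposed in the issue above (running a configurable action when a result has finished, instead of always scheduling the result consumers) can be sketched as follows for the union example. This is a hypothetical, simplified model, not Flink's ExecutionGraph code: `Vertex`, `onResultFinished`, `schedule`, and `run` are illustrative names. Running tasks are simulated as finishing in FIFO order, and each consumer is deployed at most once.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified model; not Flink's ExecutionGraph API.
public class ResultActionSketch {

    static final class Vertex {
        final String name;
        final List<Vertex> consumers = new ArrayList<>();
        Consumer<Vertex> onResultFinished; // the generalized action
        boolean scheduled;

        Vertex(String name) {
            this.name = name;
            // Default action: schedule the result consumers (current behaviour).
            this.onResultFinished = self -> self.consumers.forEach(ResultActionSketch::schedule);
        }
    }

    static final List<String> log = new ArrayList<>();
    // Deployed tasks; polled in FIFO order to simulate them finishing.
    static final Deque<Vertex> running = new ArrayDeque<>();

    static void schedule(Vertex v) {
        if (v.scheduled) return; // a consumer is deployed at most once
        v.scheduled = true;
        log.add("deploy " + v.name);
        running.add(v);
    }

    static List<String> run(boolean sequential) {
        log.clear();
        running.clear();
        Vertex src1 = new Vertex("source 1");
        Vertex src2 = new Vertex("source 2");
        Vertex union = new Vertex("union");
        src1.consumers.add(union);
        src2.consumers.add(union);

        if (sequential) {
            // Custom action: source 1 kicks off source 2 instead of the union;
            // source 2 keeps the default action and schedules the union.
            src1.onResultFinished = self -> schedule(src2);
            schedule(src1);
        } else {
            // Current behaviour: all sources are scheduled up front.
            schedule(src1);
            schedule(src2);
        }

        while (!running.isEmpty()) {
            Vertex done = running.poll();
            log.add("finish " + done.name);
            done.onResultFinished.accept(done);
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println("default:    " + run(false));
        System.out.println("sequential: " + run(true));
    }
}
```

With the default action, the union is deployed as soon as source 1 finishes, while source 2 is still running. With the custom action on source 1, source 2 runs only after source 1 has finished, and the union is deployed last.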



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)