Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2014/12/08 02:16:12 UTC

[jira] [Commented] (FLINK-986) Add intermediate results to distributed runtime

    [ https://issues.apache.org/jira/browse/FLINK-986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14237344#comment-14237344 ] 

ASF GitHub Bot commented on FLINK-986:
--------------------------------------

GitHub user uce opened a pull request:

    https://github.com/apache/incubator-flink/pull/254

    [FLINK-986] [FLINK-25] [Distributed runtime] Add initial support for intermediate results

    ## FLINK-25: Offer buffer-oriented runtime abstraction
    
    - The distributed runtime API was originally record-oriented. The output side has previously been refactored to a buffer-oriented API (`BufferWriter`). With this pull request, the input side offers a similar interface: `BufferReader`.
    
    - This will enable us to work directly on serialized data in the future. Previously, this was impossible because buffers were deserialized immediately after being handed over to an input channel.
    
    - Currently, the buffer-oriented readers and writers are always wrapped by record-oriented readers and writers. The deserialization logic of the input channels has essentially moved to the readers.
    
    - The way of registering I/O has changed. The life cycle of each task (see `AbstractInvokable`) involves two methods: `registerInputOutput()` and `invoke()`. The I/O setup used to be coupled with the creation of readers/writers in `registerInputOutput()`, although the required information is independent of it. Therefore, readers and writers are now instantiated directly by the runtime environment and can be accessed via `getEnvironment().getReader(int)` and `getEnvironment().getWriter(int)`, respectively.
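    To illustrate, here is a minimal sketch of the new life cycle. Only `AbstractInvokable`, the two life-cycle methods, and the environment accessors are from this change; the task class, its body, and the exact return types are illustrative assumptions:
    
    ```java
    // Imports elided; AbstractInvokable, BufferReader and BufferWriter live
    // in the Flink runtime (exact packages omitted here).
    public class MyTask extends AbstractInvokable {
    
        private BufferReader reader;
        private BufferWriter writer;
    
        @Override
        public void registerInputOutput() {
            // No reader/writer creation here any more; the runtime
            // environment has already instantiated them, so the task
            // simply looks them up by index.
            this.reader = getEnvironment().getReader(0);
            this.writer = getEnvironment().getWriter(0);
        }
    
        @Override
        public void invoke() throws Exception {
            // Work on buffers directly, or wrap the reader/writer in
            // record-oriented counterparts for (de)serialization.
        }
    }
    ```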
    
    ## FLINK-986: Add initial support for intermediate results
    
    - This commit introduces the abstraction of intermediate results into the distributed runtime. It is essentially the runtime part of @StephanEwen's rework of the scheduler/execution graph ([FLINK-1094](https://issues.apache.org/jira/browse/FLINK-1094)), which made the scheduler/execution graph aware of intermediate results. Job graphs used to essentially look like this: `Source => Task i => ... => Sink`. With Stephan's rework, they already look like this: `Task:Intermediate Result <= Task i:Intermediate Result <= ... <= Task:Intermediate Result`. Tasks produce intermediate results and attach to them for consumption. These changes logically decoupled produced results from the producing and consuming tasks in the job graph. At the runtime level, however, the intermediate results were still tightly coupled to the producing and consuming task. With this pull request, the producing task, the produced intermediate result, and the consuming task are decoupled as well.
    
    - Previously, the network stack was responsible for packaging buffers and immediately dispatching them to their receivers in a push fashion. The first buffer for a receiver triggered the scheduling of that receiver. With the new model, we gain much more flexibility in how we produce and consume these intermediate results. The current state is feature-equivalent with the current master, i.e. the produced intermediate result is pipelined and not persistent. I ran a few performance comparisons and the performance was in the same ballpark as the current master, although it was (consistently) slightly slower.
    
    ### Changes
    
    #### Buffer management/The life of a Buffer
    
    - The buffers for the network stack are allocated on the heap by each task manager at startup via the `NetworkBufferPool` (previously `GlobalBufferPool`). These network buffers are divided uniformly at runtime among the tasks. The current default for this pool is 2048 buffers (each 32 KB), resulting in a total of 64 MB. Intermediate results get their buffers from this buffer pool. Therefore, for any in-memory persistent data set to make sense, we would have to give the network pool a lot more memory. I think that this is currently not desirable, because our memory management is static, i.e. memory given to the network pool would be missing from operators like sorting or hashing. In my opinion, adaptive memory management ([FLINK-1101](https://issues.apache.org/jira/browse/FLINK-1101)) is therefore a prerequisite for adding more runtime intermediate result variants (see below).
    
    - The buffer management for the pipelined intermediate results has essentially not changed. We need at least a single buffer per outgoing channel (or queue in the terms of the new runtime). When a task is submitted to a task manager, the network buffers are divided as follows: a single buffer pool **per produced result** (previously one for all produced results) and one pool **per consumed result** (unchanged).
    
    - A produced buffer used to take the following high-level path: `BufferWriter => ChannelManager => NETWORK xor local InputChannel`. Now it is `BufferWriter => IntermediateResult`. What happens at the intermediate result is flexible and depends on the `IntermediateResultType`. Early discussions with @StephanEwen suggested the following three dimensions (see the sketch after this list):
    
      1. persistent/ephemeral,
      2. pipelined/blocking, and
      3. back pressure/no back pressure.
    
    The current state offers an *ephemeral-pipelined-back pressure* implementation, which is what we currently have. (I have removed code for a *persistent-blocking-no back pressure* variant for now.)
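    A sketch of how these dimensions might be encoded as result types. The type name `IntermediateResultType` is from this pull request; the constants and accessors below are illustrative assumptions, not the actual API:
    
    ```java
    // Hypothetical encoding of the three dimensions named above.
    public enum IntermediateResultType {
    
        // The variant implemented by this PR: results live only in network
        // buffers (ephemeral), are consumed while being produced (pipelined),
        // and slow producers down when consumers lag (back pressure).
        EPHEMERAL_PIPELINED_BACK_PRESSURE(false, true, true),
    
        // Removed from this PR for now: results are materialized (persistent),
        // consumable only after being fully produced (blocking), and
        // therefore need no back pressure.
        PERSISTENT_BLOCKING_NO_BACK_PRESSURE(true, false, false);
    
        private final boolean persistent;
        private final boolean pipelined;
        private final boolean backPressure;
    
        IntermediateResultType(boolean persistent, boolean pipelined, boolean backPressure) {
            this.persistent = persistent;
            this.pipelined = pipelined;
            this.backPressure = backPressure;
        }
    
        public boolean isPersistent()    { return persistent; }
        public boolean isPipelined()     { return pipelined; }
        public boolean hasBackPressure() { return backPressure; }
    }
    ```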
    
    #### Intermediate results, intermediate result partitions and intermediate result partition queues
    
    - As noted above, tasks at the job graph level are associated with intermediate results. When scheduling these graphs, each intermediate result is divided into result partitions (associated with execution vertices). Each parallel subtask (execution vertex) produces one partition of this result (and possibly one partition for each of several produced results).
    
    - At the runtime level, these partitions are further divided into queues depending on the degree of parallelism of the consumer. For example, a map-reduce job with a degree of parallelism of 2 might look like this on the map side (a code sketch follows the figure):
    
    ```
                                                 +---------+
    +-------+               +-------------+  +=> | Queue 1 |
    | Map 1 | = produces => | Partition 1 | =|   +---------+
    +-------+               +-------------+  +=> | Queue 2 |
                                                 +---------+
    
                                                 +---------+
    +-------+               +-------------+  +=> | Queue 1 |
    | Map 2 | = produces => | Partition 2 | =|   +---------+
    +-------+               +-------------+  +=> | Queue 2 |
                                                 +---------+
    ```
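    In code, the fan-out shown above could look roughly like this (all names invented for illustration; a hash partitioner is just one possible way of assigning a record to a queue):
    
    ```java
    // Hypothetical sketch: route a produced record to one of the
    // partition's queues so that queue i holds exactly the data for
    // consumer subtask i.
    void emit(Record record, IntermediateResultPartition partition) {
        int numQueues = partition.getNumberOfQueues(); // == consumer parallelism
        int queueIndex = Math.abs(record.getKey().hashCode()) % numQueues;
        partition.getQueue(queueIndex).add(serialize(record));
    }
    ```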
    
    #### Scheduling of consumers and receiver-initiated sending
    
    - Depending on the intermediate result type (pipelined vs. blocking), it is necessary to deploy the consumers either when the first buffer is produced or only after all buffers have been produced. The receivers then request the respective intermediate result partition queue from the task managers where the partitions were produced. For a data repartitioning as in map-reduce, each subtask requests the queue matching its subtask number (see the sketch after the figure), e.g.:
    
    ```
                                                 +---------+                         +----------+
    +-------+               +-------------+  +=> | Queue 1 | <=======+=== requests = | Reduce 1 |
    | Map 1 | = produces => | Partition 1 | =|   +---------+         |               +----------+
    +-------+               +-------------+  +=> | Queue 2 | <==+    |
                                                 +---------+    |    |
                                                                |    |
                                                 +---------+    |    |
    +-------+               +-------------+  +=> | Queue 1 | <==+====+
    | Map 2 | = produces => | Partition 2 | =|   +---------+    |                    +----------+
    +-------+               +-------------+  +=> | Queue 2 | <==+======== requests = | Reduce 2 |
                                                 +---------+                         +----------+
    ```
    
    If the partitioning is not known or a task has no known consumers, the partition will consist of a single queue (or as many queues as the default degree of parallelism). When such a result is consumed, a repartitioning task needs to be inserted.
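    A rough sketch of the receiver side (all names hypothetical): each consumer subtask asks every producing partition for the queue with its own subtask index, which is the receiver-initiated sending shown in the figure above.
    
    ```java
    // Hypothetical sketch of receiver-initiated sending: consumer
    // subtask i requests queue i from each partition of the result.
    void requestInputQueues(IntermediateResultDescriptor result, int subtaskIndex) {
        for (PartitionLocation location : result.getPartitionLocations()) {
            // Local partitions can be consumed directly; remote ones are
            // requested over the network from the producing task manager.
            requestQueue(location, subtaskIndex);
        }
    }
    ```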
    
    - The scheduling of consumers requires notifications to the central job manager, which can then deploy them. For a single produced *pipelined* intermediate result, this leads to `number of produced partitions` RPC calls to the job manager (see `ConsumerNotificationProtocol`). The consumer tasks can then be deployed with the necessary information to request the respective queues, which leads to `number of consumer subtasks` task deployment RPC calls. The first notification from a task manager results in the deployment of the receiver. It is possible that not all producer task locations are known at deployment time of a consumer task. At the moment, we follow the naive approach of sending a task update RPC call with the producer's location for every partition. This currently results in a total of `N * M` RPC calls per produced intermediate result (where `N` is the number of produced partitions and `M` the number of consumer subtasks).
    
    - For blocking results, this can be reduced to `2 * N`, as we are guaranteed to know all producer locations once all partitions are finished. It should also be possible to further reduce this number to the number of task managers by collecting the notifications per task manager and then sending them to the job manager at once.
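    To make the numbers concrete: for a result with `N = 100` produced partitions and `M = 100` consumer subtasks, the naive approach issues up to `100 * 100 = 10,000` update RPC calls, while the blocking-result optimization needs only `2 * 100 = 200`; batching the notifications per task manager would bound the count by the number of task managers instead.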
    
    #### More robust shuffle
    
    - Shuffles were previously managed by a large per-task-manager component called the `ChannelManager`, which kept track of all channels and was responsible for many different things (network buffers, channel lookup, buffer routing, TCP connections). This has been refactored into more localized components, where resource acquisition and release are handled within the same local component, which is in my opinion much easier to follow and understand.
    
    - In particular, we had issues with the release of resources like channel lookup tables and TCP connections, which this pull request addresses as well. Especially the Netty-based network code was not reliable in how it propagated errors to the tasks and released its resources. Addressing this was a prerequisite for adding reliable fault-tolerance mechanisms in the future.
    
    ### Next steps
    
    The goal is to merge the current state as soon as possible and then start extending it (I've removed some code that was already in place). I could have opened this pull request much earlier and intend **not** to make that mistake again.
    
    1. I've only tested on a small cluster (4 nodes) with a low degree of parallelism (32). I have to test further on a larger cluster with a higher degree of parallelism. I also haven't tested iterative programs yet. After doing this, I will post the numbers.
    2. I've already discovered a few problems in corner cases, which I need to fix before we review this.
    3. Some tests are failing and I've disabled a few others, which I will adjust and enable again.
    
    I intend to address these points over the course of this week and work on the branch of this pull request.
    
    After I've addressed the above points, reasonable next steps are to reduce the number of redundant RPC calls and to introduce the *persistent-blocking-no back pressure* intermediate result partition variant.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/incubator-flink flink-986-runtime_intermediate_results

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-flink/pull/254.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #254
    
----
commit 5e879773ed214880b0108238765761cc4391281f
Author: Ufuk Celebi <uc...@apache.org>
Date:   2014-10-08T09:40:31Z

    [FLINK-986] [FLINK-25] [Distributed runtime] Add initial support for intermediate results

commit 2a57f078ee6a16e55be44f5c6ee6c3eb66abf5a3
Author: Ufuk Celebi <uc...@apache.org>
Date:   2014-12-05T15:27:45Z

    [Distributed runtime] Move BroadcastVariableManager to NetworkEnvironment

commit 347eaf9c0000ff59a1bf92551fe53cc9a2d481d3
Author: Ufuk Celebi <uc...@apache.org>
Date:   2014-12-05T15:28:54Z

    [Distributed runtime] Add TODO for intermediate result consumer notification RPC calls

commit 00bad69044b801a280fe3dde18671f4ddf184d21
Author: Ufuk Celebi <uc...@apache.org>
Date:   2014-12-05T15:43:05Z

    [Distributed runtime] Log number of update task RPC calls

commit cabd126bc726443770089628036dacae8d06f776
Author: Ufuk Celebi <uc...@apache.org>
Date:   2014-12-05T16:29:58Z

    [Distributed runtime] Fix possible error during release of resources in RuntimeEnvironment

commit d46d2961bba82c53ad1c24419a34584c78e80be3
Author: Ufuk Celebi <uc...@apache.org>
Date:   2014-12-05T20:37:15Z

    [Distributed runtime] Minor code cleanup

commit a77a713fa898b4eeaacc19ef4b0758d89e3dbbf9
Author: Ufuk Celebi <uc...@apache.org>
Date:   2014-12-06T12:50:48Z

    [Distributed runtime] Fix checkstyle and visibility errors

commit 491f2f4cb395074669116ed6fd4b8cab810a53fc
Author: Ufuk Celebi <uc...@apache.org>
Date:   2014-12-06T13:05:37Z

    [Distributed runtime] Use ConcurrentLinkedQueue instead of Deque, which is only available since Java 1.7

commit 27ed29baf79d655f5e72321f2446428a6e92ab59
Author: Ufuk Celebi <uc...@apache.org>
Date:   2014-12-06T13:21:39Z

    [Distributed runtime] Add lock to iterator subscription to prevent possible deadlock

commit 0feca10fb19d8de18eaca35a64c4a08ab30ed1c4
Author: Ufuk Celebi <uc...@apache.org>
Date:   2014-12-07T13:17:09Z

    [Distributed runtime] Fix lifecycle of intermediate result partitions

commit 3d6a26a72fc76e21f936390f3f07ddad8f661069
Author: Ufuk Celebi <uc...@apache.org>
Date:   2014-12-07T13:42:34Z

    [Distributed runtime] Throw proper exception when illegal queue request

commit 200626b6528bf3dc55b29955adc437e18ce64561
Author: Ufuk Celebi <uc...@apache.org>
Date:   2014-12-07T15:24:46Z

    [Distributed runtime] [Tests] Verify buffer reycled when Buffer decoded

commit b049b3190887d6a5e989f51afa89b8e20ebe2614
Author: Ufuk Celebi <uc...@apache.org>
Date:   2014-12-07T15:59:06Z

    [Distributed runtime] Log debug output of partition queue handler

commit 2f9638ba7d84796673913aab847538df96fabc08
Author: Ufuk Celebi <uc...@apache.org>
Date:   2014-12-07T17:01:26Z

    [Distributed runtime] Make BufferReader and BufferWriter final

commit 0a42dfac5517b11f378deb31c78a0111c2e8faaa
Author: Ufuk Celebi <uc...@apache.org>
Date:   2014-12-07T20:01:11Z

    [Distributed runtime] Log debug output of client request handler

----


> Add intermediate results to distributed runtime
> -----------------------------------------------
>
>                 Key: FLINK-986
>                 URL: https://issues.apache.org/jira/browse/FLINK-986
>             Project: Flink
>          Issue Type: New Feature
>          Components: Distributed Runtime
>            Reporter: Ufuk Celebi
>            Assignee: Ufuk Celebi
>            Priority: Blocker
>
> Support for intermediate results in the runtime is currently blocking different efforts like fault tolerance or result collection at the client.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)