You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2015/08/04 18:41:04 UTC

[jira] [Commented] (FLINK-2398) Decouple StreamGraph Building from the API

    [ https://issues.apache.org/jira/browse/FLINK-2398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14653942#comment-14653942 ] 

ASF GitHub Bot commented on FLINK-2398:
---------------------------------------

GitHub user aljoscha opened a pull request:

    https://github.com/apache/flink/pull/988

    [FLINK-2398][api-breaking] Introduce StreamGraphGenerator

    This decouples the building of the StreamGraph from the API methods.
    Before, the methods would build the StreamGraph as they go. Now the API
    methods build a hierachy of StreamTransformation nodes. From these a
    StreamGraph is generated upon execution.
    
    This also introduces some API breaking changes:
    
     - The result of methods that create sinks is now DataStreamSink instead
       of DataStream
     - Iterations cannot have feedback edges with differing parallelism
     - "Preserve partitioning" is not the default for feedback edges. The
       previous option for this is removed.
     - You can close an iteration several times, no need for a union.
     - Strict checking of whether partitioning and parallelism work
       together. I.e. if upstream and downstream parallelism don't match it
       is not legal to have Forward partitioning anymore. This was not very
       transparent: When you went from low parallelism to high dop some
       downstream  operators would never get any input. When you went from high
       parallelism to low dop you would get skew in the downstream operators
       because all elements that would be forwarded to an operator that is not
       "there" go to another operator. This requires insertion of global()
       or rebalance() in some places. For example with most sources which
       have parallelism one.
    
    This is from the Javadoc of StreamTransformation, it describes quite well how it works:
    ```
    A {@code StreamTransformation} represents the operation that creates a
     * {@link org.apache.flink.streaming.api.datastream.DataStream}. Every
     * {@link org.apache.flink.streaming.api.datastream.DataStream} has an underlying
     * {@code StreamTransformation} that is the origin of said DataStream.
     *
     * <p>
     * API operations such as {@link org.apache.flink.streaming.api.datastream.DataStream#map} create
     * a tree of {@code StreamTransformation}s underneath. When the stream program is to be executed this
     * graph is translated to a {@link StreamGraph} using
     * {@link org.apache.flink.streaming.api.graph.StreamGraphGenerator}.
     *
     * <p>
     * A {@code StreamTransformation} does not necessarily correspond to a physical operation
     * at runtime. Some operations are only logical concepts. Examples of this are union,
     * split/select data stream, partitioning.
     *
     * <p>
     * The following graph of {@code StreamTransformations}:
     *
     * <pre>
     *   Source              Source        
     *      +                   +           
     *      |                   |           
     *      v                   v           
     *  Rebalance          HashPartition    
     *      +                   +           
     *      |                   |           
     *      |                   |           
     *      +------>Union<------+           
     *                +                     
     *                |                     
     *                v                     
     *              Split                   
     *                +                     
     *                |                     
     *                v                     
     *              Select                  
     *                +                     
     *                v                     
     *               Map                    
     *                +                     
     *                |                     
     *                v                     
     *              Sink 
     * </pre>
     *
     * Would result in this graph of operations at runtime:
     *
     * <pre>
     *  Source              Source
     *    +                   +
     *    |                   |
     *    |                   |
     *    +------->Map<-------+
     *              +
     *              |
     *              v
     *             Sink
     * </pre>
     *
     * The information about partitioning, union, split/select end up being encoded in the edges
     * that connect the sources to the map operation.
    ```
    
    I still have to fix the Scala examples, but you can already comment on the overall idea and implementation.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink stream-api-rework

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/988.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #988
    
----
commit dce02be23fc98390b8b0b98f02ad1dd69be30d4c
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2015-07-23T13:12:38Z

    [FLINK-2398][api-breaking] Introduce StreamGraphGenerator
    
    This decouples the building of the StreamGraph from the API methods.
    Before the methods would build the StreamGraph as they go. Now the API
    methods build a hierachy of StreamTransformation nodes. From these a
    StreamGraph is generated upon execution.
    
    This also introduces some API breaking changes:
    
     - The result of methods that create sinks is now DataStreamSink instead
       of DataStream
     - Iterations cannot have feedback edges with differing parallelism
     - "Preserve partitioning" is not the default for feedback edges. The
       previous option for this is removed.
     - You can close an iteration several times, no need for a union.
     - Strict checking of whether partitioning and parallelism work
       together. I.e. if upstream and downstream parallelism don't match it
       is not legal to have Forward partitioning anymore. This was not very
       transparent: When you went from low parallelism to high dop some
       downstream  operators would never get any input. When you went from high
       parallelism to low dop you would get skew in the downstream operators
       because all elements that would be forwarded to an operator that is not
       "there" go to another operator. This requires insertion of global()
       or rebalance() in some places. For example with most sources which
       have parallelism one.

----


> Decouple StreamGraph Building from the API
> ------------------------------------------
>
>                 Key: FLINK-2398
>                 URL: https://issues.apache.org/jira/browse/FLINK-2398
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> Currently, the building of the StreamGraph is very intertwined with the API methods. DataStream knows about the StreamGraph and keeps track of splitting, selected names, unions and so on. This leads to the problem that is is very hard to understand how the StreamGraph is built because the code that does it is all over the place. This also makes it hard to extend/change parts of the Streaming system.
> I propose to introduce "Transformations". A transformation hold information about one operation: The input streams, types, names, operator and so on. An API method creates a transformation instead of fiddling with the StreamGraph directly. A new component, the StreamGraphGenerator creates a StreamGraph from the tree of transformations that result from program specification using the API methods. This would relieve DataStream from knowing about the StreamGraph and makes unions, splitting, selection visible transformations instead of being scattered across the different API classes as fields.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)