Posted to commits@beam.apache.org by aljoscha <gi...@git.apache.org> on 2016/07/26 22:18:07 UTC

[GitHub] incubator-beam pull request #737: Add Side-Input Support in Flink Streaming ...

GitHub user aljoscha opened a pull request:

    https://github.com/apache/incubator-beam/pull/737

    Add Side-Input Support in Flink Streaming Runner

    This does several things in somewhat isolated steps:
     - Unify the operator wrappers. Before, there were four different DoFn/Window operator wrappers with duplicated code. I unified them to make it easier to implement side inputs for both `DoFn` and `CombineFnWithContext`.
     - Change `StateInternals` to use a Flink `AbstractStateBackend` internally. This is essentially Flink's counterpart of `StateInternals`, and building on top of it will let us benefit from parallelism rescaling in the future. It also reduces the amount of code in the runner, because before we did basically all state serialization by hand.
     - Add a generic `SideInputHandler` that uses `StateInternals` to keep the side-input data as well as metadata. It lives in the runners-core package and should be reusable by other runner implementations. The implementation itself is rather simple; the complicated part was all the plumbing and translation around it.
     - Change `ParDoTest.TestDoFnWithContext` to allow `DoFn` reuse. I think this might have been overlooked in the recent change that allowed `DoFn` reuse. @tgroh ?
     - Change the `PAssert` checker `DoFns` to not suppress exceptions in streaming mode. Otherwise the tests do not fail on the Flink runner when they are expected to. I'm not sure this is possible, though, because the Dataflow runner needs the current behavior.
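    To make the `SideInputHandler` idea more concrete, here is a minimal sketch in plain Java, assuming a `HashMap` stands in for `StateInternals` and windows are represented by simple keys. `SimpleSideInputHandler` and its methods are hypothetical names, not the actual runners-core API. The handler stores side-input values per window (the data), records which windows have received their side input (the metadata), and main-input elements for a window that is not ready yet are pushed back and replayed later.
    
    ```java
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    
    // Hypothetical, simplified model of a side-input handler; not the runners-core API.
    // A HashMap stands in for StateInternals. W is a window key, V a side-input value.
    final class SimpleSideInputHandler<W, V> {
      // Side-input data per window.
      private final Map<W, List<V>> sideInputData = new HashMap<>();
      // Metadata: windows for which the side input has arrived.
      private final Set<W> readyWindows = new HashSet<>();
    
      // Stores side-input values for a window and marks the window as ready.
      void addSideInputValue(W window, List<V> values) {
        sideInputData.computeIfAbsent(window, w -> new ArrayList<>()).addAll(values);
        readyWindows.add(window);
      }
    
      // Main-input elements of a window should only be processed once this is true.
      boolean isReady(W window) {
        return readyWindows.contains(window);
      }
    
      // Returns the side-input values visible in the given window.
      List<V> get(W window) {
        return Collections.unmodifiableList(
            sideInputData.getOrDefault(window, Collections.emptyList()));
      }
    
      public static void main(String[] args) {
        SimpleSideInputHandler<String, Integer> handler = new SimpleSideInputHandler<>();
        List<String> pushedBack = new ArrayList<>();
    
        // A main-input element for window "w1" arrives before its side input.
        if (!handler.isReady("w1")) {
          pushedBack.add("elem-a");
        }
    
        // The side input arrives; pushed-back elements can now be replayed.
        handler.addSideInputValue("w1", Arrays.asList(1, 2, 3));
        for (String elem : pushedBack) {
          System.out.println(elem + " sees " + handler.get("w1")); // elem-a sees [1, 2, 3]
        }
      }
    }
    ```
    
    Keeping readiness as separate metadata, rather than inferring it from whether any data exists, means a window whose side input is legitimately empty still counts as ready.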
    
    The rest of the changes aim to make the `RunnableOnService` tests execute without errors on the streaming runner; the commits contain many small fixes toward that goal.
    
    There is still the issue of side-input data not being garbage collected, but I think this is a general problem and not specific to the Flink runner.
    
    There are still two failing tests, but I think the problem might actually be with the tests 😉:
     - `CombineTest.testGlobalCombineWithDefaultsAndTriggers` fails with
    ```
    Expected: a collection containing "2: true"
         but: was "1: true"
    ```
    I think this test works for the `DirectRunner` because both input elements are processed in one bundle, and the trigger is only asked whether it should fire after the bundle has been processed. This produces the expected result. With a runner that asks the trigger after every element whether it should fire, the trigger fires after seeing the first element. It then goes into the finished state, the pane is dropped, and we never emit a pane at GC time, although I would expect that in ACCUMULATING mode. The relevant code is in `ReduceFnRunner.emitIfAppropriate`: `isFinished` is true for the count trigger, which in turn means `shouldDiscard` is true.
     - `CombineTest.testHotKeyCombiningWithAccumulationMode` fails because it expects only one pane to be emitted, while a tuple-at-a-time runner such as the Flink streaming runner emits a pane for every element. With the `DirectRunner` it again works because all elements are processed in one bundle and the trigger fires only once.
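    The bundle-versus-element difference behind both failures can be illustrated with a deliberately simplified model. It is not `ReduceFnRunner`; `TriggerGranularityDemo` and `firedPanes` are hypothetical names. It assumes a single window in ACCUMULATING mode and a count trigger whose threshold the first element already satisfies, either one-shot (finishes after its first firing) or repeated. The only thing that varies is whether the trigger is checked after every element or once after the whole bundle.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    
    // Hypothetical, simplified model of trigger-evaluation granularity; not Beam's
    // ReduceFnRunner. It records the accumulated element count at each firing.
    final class TriggerGranularityDemo {
    
      // perElement: check the trigger after every element (tuple-at-a-time) instead of
      // once after the bundle. repeated: if false, the trigger finishes after firing once.
      static List<Integer> firedPanes(int elements, boolean perElement, boolean repeated) {
        List<Integer> panes = new ArrayList<>();
        int accumulated = 0; // ACCUMULATING mode: the count is never reset after firing
        boolean finished = false;
        for (int i = 0; i < elements; i++) {
          if (finished) {
            break; // finished trigger: the remaining input is dropped, no later pane
          }
          accumulated++;
          boolean lastElement = i == elements - 1;
          if (perElement || lastElement) {
            // Assumed count threshold is already met, so every check fires.
            panes.add(accumulated);
            finished = !repeated;
          }
        }
        return panes;
      }
    
      public static void main(String[] args) {
        // One-shot count trigger (first failing test).
        System.out.println(firedPanes(2, false, false)); // [2]
        System.out.println(firedPanes(2, true, false));  // [1]
        // Repeated trigger (second failing test).
        System.out.println(firedPanes(3, false, true));  // [3]
        System.out.println(firedPanes(3, true, true));   // [1, 2, 3]
      }
    }
    ```
    
    In this model the trigger and input are identical across runs; only the evaluation granularity changes, which is why a test that expects exactly one pane, or a pane containing both elements, implicitly assumes bundle-at-a-time evaluation.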
    
    R: @mxm 
    R: @tgroh for the change to the `ParDoTest` test and also overall
    R: @kennknowles for the `SideInputHandler` and also overall; it turns out this piece was the easiest part of all of this...

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/incubator-beam flink/side-input-streaming

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-beam/pull/737.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #737
    
----
commit 11e685dd379ab41fe04392697b87687e297040e5
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2016-06-11T08:55:55Z

    Fix Emission in startBundle/finishBundle in Flink Wrappers

commit b647e97b2083d7d7c508f58ebad83e7cac2f3dbf
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2016-06-11T09:42:12Z

    [BEAM-253] Unify Flink-Streaming Operator Wrappers

commit ef9e7c480b55764efd03225109819d5a959c825b
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2016-07-08T16:42:00Z

    Replace Flink StateInternals by Partitioned StateInternals

commit f7c09c9021f57d0dbc61534d81c82af7fa9e1ba3
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2016-07-10T15:01:38Z

    Fix Checkstyle Errors in FlinkStreamingTransformTranslators

commit 8203d4f3308d27372c5c31ad4923d4b1ae2662d4
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2016-07-11T12:08:35Z

    [BEAM-102] Add Side Inputs in Flink Streaming Runner
    
    This adds a generic SideInputHandler in runners-core that is only used
    by the Flink runner right now but can be used by other runner
    implementations.

commit dc1376bf66d496a5ecad7043ddb1eb297d660eca
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2016-07-23T08:48:42Z

    Allow DoFn Reuse in ParDoTest.TestDoFnWithContext

commit 6e2286ed91b860d36b2a7c698094a9d1c7dc4634
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2016-07-24T10:04:28Z

    Don't Suppress Throwable in PAssert in Streaming Mode

commit de52aaee2672cc730ae0dcd237247dbbd0db625e
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2016-07-26T12:02:00Z

    Enable Flink Streaming Runner RunnableOnService tests

commit 29cfd7621c2ee1a3d8258e7956f81e6b6504f4f3
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2016-07-26T21:46:19Z

    Fixups after rebase on top of StateInternalsFactory Change

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-beam pull request #737: Add Side-Input Support in Flink Streaming ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-beam/pull/737
