Posted to issues@flink.apache.org by mohamagdy <gi...@git.apache.org> on 2017/02/17 22:47:11 UTC

[GitHub] flink pull request #3353: Multithreaded `DataSet#flatMap` function

GitHub user mohamagdy opened a pull request:

    https://github.com/apache/flink/pull/3353

    Multithreaded `DataSet#flatMap` function

    # Multi-Threaded FlatMap
    
    ## Overview
    
    The `DataStream#flatMap` function takes a `FlatMapFunction`, an interface whose `flatMap` method is called by `DataStream#flatMap` for each element.
    
    The `FlatMapFunction#flatMap` method takes an element (a `DataStream` record) and applies a transformation to it. For example, if the element is a string, `flatMap` can split it on spaces or convert it to upper case.
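
As a minimal sketch of the transformation described above, the following Java splits each string on spaces and upper-cases the tokens. `SimpleFlatMap`, `FlatMapSketch`, and `apply` are hypothetical stand-ins introduced here so the example runs without Flink on the classpath; they are not part of the Flink API or of this pull request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.Consumer;

// Hypothetical stand-in for a flat-map function: one input element, zero or more outputs.
interface SimpleFlatMap<IN, OUT> {
    void flatMap(IN value, Consumer<OUT> out);
}

class FlatMapSketch {
    // Splits a line on spaces and emits every non-empty token in upper case.
    static final SimpleFlatMap<String, String> SPLIT_AND_UPPER = (line, out) -> {
        for (String token : line.split(" ")) {
            if (!token.isEmpty()) {
                out.accept(token.toUpperCase(Locale.ROOT));
            }
        }
    };

    // Applies the function to each element in order, on the calling thread.
    static <IN, OUT> List<OUT> apply(SimpleFlatMap<IN, OUT> fn, List<IN> input) {
        List<OUT> result = new ArrayList<>();
        for (IN element : input) {
            fn.flatMap(element, result::add);
        }
        return result;
    }
}
```

This is the single-threaded behaviour: one element is fully transformed before the next one is looked at.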
    
    The current implementation of `DataStream#flatMap` uses a single thread to transform each element from one form to another. The idea of this change is to introduce a new variant of the `DataStream#flatMap` API method that takes a parallelism value, the number of threads used to transform the input elements.
    
    ## Implementation Details
    
    The following diagram shows the multithreaded `flatMap` function. In the diagram, the parallelism (the maximum thread pool size) is set to `3`, so up to 3 threads can run transformations on input elements at the same time.
    
    Briefly, when the `flatMap` function receives an element, it pushes the element to a _buffer_, then spawns a thread per element to run the transformation and write the result to the _output_.
    
    Writes to the _output_ are thread-safe: only 1 thread can write at a time. An `Object` serves as the lock that guards the output.
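
The two paragraphs above can be sketched roughly as follows. This is an illustration under assumptions, not the code in this pull request: `LockedOutputSketch` and its methods are hypothetical names, a plain `ArrayList` stands in for the operator output, and a fixed-size `ExecutorService` approximates "a thread per element, up to the parallelism limit".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class LockedOutputSketch {
    // One lock shared by all workers, so only one of them writes at a time.
    private final Object lock = new Object();
    // Stand-in for the operator output; not thread-safe on its own.
    private final List<String> output = new ArrayList<>();
    private final ExecutorService pool;

    LockedOutputSketch(int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be > 0");
        }
        this.pool = Executors.newFixedThreadPool(parallelism);
    }

    // Hands one element to the pool; the transformation runs off the caller's thread.
    void processElement(String element) {
        pool.execute(() -> {
            String transformed = element.toUpperCase(Locale.ROOT);
            synchronized (lock) {
                output.add(transformed);
            }
        });
    }

    // Stops accepting work, waits for in-flight tasks, and returns a copy of the output.
    List<String> close() {
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        synchronized (lock) {
            return new ArrayList<>(output);
        }
    }
}
```

Note that with more than one worker the order of results in the output is not guaranteed to match the input order.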
    
    The _buffer_ accumulates elements so that, when the _snapshot_ job runs while some element transformations are not yet finished, the _buffer_ writes all its elements, serialized, into an _output stream_. When a _restore_ is called, it deserializes the buffered elements and runs the transformation on them again, because their output was not captured when the snapshot ran.
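
A rough sketch of the buffer's snapshot and restore behaviour follows. Standard Java serialization is used here only as a stand-in, since the serializer used by the pull request is not shown in this message; `BufferSnapshotSketch`, `add`, `markDone`, `snapshot`, and `restore` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

class BufferSnapshotSketch {
    // Elements that were received but whose transformation has not finished yet.
    private final List<String> buffer = new ArrayList<>();

    synchronized void add(String element) {
        buffer.add(element);
    }

    // Called once an element's transformation has succeeded.
    synchronized void markDone(String element) {
        buffer.remove(element);
    }

    // Serializes the still-pending elements into a byte array.
    synchronized byte[] snapshot() {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new ArrayList<>(buffer));
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Deserializes the pending elements so the caller can transform them again.
    static List<String> restore(byte[] state) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(state))) {
            @SuppressWarnings("unchecked")
            List<String> pending = (List<String>) in.readObject();
            return pending;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Only elements still in the buffer are captured, which is why they have to be reprocessed after a restore.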
    
    ![multithreaded flatmap](https://cloud.githubusercontent.com/assets/1228432/23085859/699a2926-f56a-11e6-9146-1be213caaaa7.png)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mohamagdy/flink parallel-dataset-flatmap

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3353.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3353
    
----
commit bf2c710fe8af2be3475d66221f9e6b8e2090bbfc
Author: Mohamed Magdy <mo...@fyber.com>
Date:   2017-02-12T21:35:06Z

    [FLINK-XXXX] Fix JavaDoc class name

commit bbee6c580815190139f289d0309bfdc2f4ca83c6
Author: Mohamed Magdy <mo...@fyber.com>
Date:   2017-02-12T21:36:09Z

    [FLINK-XXXX] Override `close()` method
    
    In order to introduce threads for processing `flatMap` elements,
    the `close` method in `StreamFlatMap` will be overridden to shut down
    the thread pool.

commit e0aff5e241aff97426d8c10e84ba5a932668d815
Author: Mohamed Magdy <mo...@fyber.com>
Date:   2017-02-13T20:09:15Z

    [FLINK-XXXX] Organize imports in test files
    
    Ran IntelliJ "Organize Imports"

commit 73803bc8fc89f48691b4c0270b0061377a3e7e38
Author: Mohamed Magdy <mo...@fyber.com>
Date:   2017-02-14T15:08:30Z

    [FLINK-XXXX] Fix typo in tests

commit b9343317eb35ef41aba82c70648dc8fd8767273e
Author: Mohamed Magdy <mo...@fyber.com>
Date:   2017-02-14T22:21:46Z

    [FLINK-XXXX] Call `close` after `processElement` in tests
    
    This follows the flow described for the `close` method in the `StreamOperator` interface,
    which says the following:
    
    ```
    This method is called after all records have been added to the operators via the methods
    {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator#processElement(StreamRecord)}, or
    {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement1(StreamRecord)} and
    {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement2(StreamRecord)}.
    ```

commit fd0c28adceca9fbe5fc9e3ffd93679495edca02b
Author: Mohamed Magdy <mo...@fyber.com>
Date:   2017-02-14T23:46:01Z

    [FLINK-XXXX] Add tests for multi-threaded `DataStream` `flatMap`
    
    Tweak tests to check results of `flatMap` when processing elements of `DataStream`
    in multiple threads.

commit cbe2fa8059272d8cb49e50056466564d6d8c322d
Author: Mohamed Magdy <mo...@fyber.com>
Date:   2017-02-14T23:50:45Z

    [FLINK-XXXX] Add multiple threads for processing `flatMap` elements
    
    Add an option to the `DataStream` `flatMap` function that sets the parallelism
    for processing the `DataStream` elements. This helps when processing one element
    blocks the main thread and holds up the elements behind it.
    
    The main thread is blocked until all the elements are processed and all the threads
    finish.

commit 357a72d0bc5cca497eb80483ca32a8d958dfdbb5
Author: Mohamed Magdy <mo...@fyber.com>
Date:   2017-02-15T16:44:46Z

    [FLINK-XXXX] Create a new `TimestampedCollector` for each thread
    
    In order to have a collector per thread instead of a single collector shared by all threads.
    The `TimestampedCollector` instances still use the same `Output`, though.

commit 43cf265582ef80e5437da1eaaa3a74113f0cbda2
Author: Mohamed Magdy <mo...@fyber.com>
Date:   2017-02-15T21:19:28Z

    [FLINK-XXXX] Add checkpoint and restore to `StreamFlatMap`
    
    The checkpoint and restore methods are overridden in the `StreamFlatMap`.

commit 532b4a030d8ea820f7630c8b6c96e446b09f81c3
Author: Mohamed Magdy <mo...@fyber.com>
Date:   2017-02-15T23:07:42Z

    [FLINK-XXXX] Implement `InputTypeConfigurable` to enable checkpoints for multithreaded `flatMap`

commit 2cca78cc800ccdcf5af4c01626012a1b485b03e4
Author: Mohamed Magdy <mo...@fyber.com>
Date:   2017-02-16T10:01:16Z

    [FLINK-XXXX] Prevent multiple threads from writing into same `output` at the same time
    
    In order to prevent race conditions when writing into the same `output` of the `flatMap`
    at the same time, added a lock surrounding the call that writes into the `output` so that only
    1 thread can write to the `output` at a time.

commit 8bd5d3487d3420fffe3f79096172611bd3d12473
Author: Mohamed Magdy <mo...@fyber.com>
Date:   2017-02-16T10:36:17Z

    [FLINK-XXXX] Move multi-threaded `streamFlatMap` to new class
    
    In order to organize and separate the logic of the single and multi-threaded
    `StreamFlatMap` class, it is cleaner to separate each into a different class,
    as the logic of the multi-threaded `StreamFlatMap` started to deviate from
    the logic of the existing single-threaded `StreamFlatMap`.

commit b24046bf94f1e1301d463021a598ed2a8afb4f21
Author: Mohamed Magdy <mo...@fyber.com>
Date:   2017-02-16T13:29:09Z

    [FLINK-XXXX] Fix `transformation` message for multithreaded `flatMap`

commit 96aaaf8b3f6a477bf0ad4d3b4d4025b0823da6f5
Author: Mohamed Magdy <mo...@fyber.com>
Date:   2017-02-16T18:49:09Z

    [FLINK-XXXX] Use shared lock for `flatMap` output
    
    Before this change there was one lock per `MultiThreadedTimestampedCollector`
    object created, which does not guarantee that only 1 thread writes
    into the output at a time.

commit b310aeb6fc02a49709fb789d9e73b4c09b321f3b
Author: Mohamed Magdy <mo...@fyber.com>
Date:   2017-02-16T19:33:56Z

    [FLINK-XXXX] Fix `parallelism` check to be > 0
    
    Instead of >= 0: it does not make sense for the multi-threaded
    `flatMap` to accept a parallelism value of 0.

commit e52f6496d1e2d9a2bddacfd5848a51633a4a5576
Author: Mohamed Magdy <mo...@fyber.com>
Date:   2017-02-16T23:00:17Z

    [FLINK-XXXX] Set `lock` object to be `transient`

commit 952d901e30a4d2a43fe5031e51a4c3e58cf279b6
Author: Mohamed Magdy <mo...@fyber.com>
Date:   2017-02-17T17:43:09Z

    [FLINK-XXXX] Add logic for snapshot and notify of snapshot complete
    
    In order to make the multi-threaded `flatMap` fault tolerant, the snapshot
    and notify on snapshot completion logic is implemented.

commit e30be1f41cdda5a83909cc99cdf8fbdd47ec2cbb
Author: Mohamed Magdy <mo...@fyber.com>
Date:   2017-02-17T17:45:24Z

    [FLINK-XXXX] Remove unused variable

commit fd3f08c8936071d8f1e12468995640c2f03fa972
Author: Mohamed Magdy <mo...@fyber.com>
Date:   2017-02-17T18:09:52Z

    [FLINK-XXXX] Invoke the threads right away instead of in the `close` method
    
    Before this change, the `flatMap` element threads all ran at once via `invokeAll`,
    using a list of tasks that was populated as each element was received. That does not
    work for snapshots, because the elements are not processed until the `close` method
    is called.

commit 1f8365cd520c1e7ffb9b3467d595e1dd1e85b6c2
Author: Mohamed Magdy <mo...@fyber.com>
Date:   2017-02-17T19:05:41Z

    [FLINK-XXXX] Use listener executor to handle buffer queue
    
    To handle the buffer queue, use a listener executor with `onSuccess` and
    `onFailure` callbacks (records are removed from the buffer when the
    thread succeeds).

commit c74d465e89899ab1c825c13f2940dba17b895959
Author: Mohamed Magdy <mo...@fyber.com>
Date:   2017-02-17T19:43:09Z

    [FLINK-XXXX] Use same collector for all threads
    
    No need to create a new collector per thread, only one collector is enough.
    The collector's output is synchronized and thread safe.

commit f054bf2539e8f93311c5feba66731bddd06a200c
Author: Mohamed Magdy <mo...@fyber.com>
Date:   2017-02-17T20:40:00Z

    [FLINK-XXXX] Add snapshot/recover object serialization
    
    Add the logic for capturing a snapshot by serializing the elements in
    the buffer and deserializing them in the restore process.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3353: Multithreaded `DataSet#flatMap` function

Posted by mohamagdy <gi...@git.apache.org>.
Github user mohamagdy closed the pull request at:

    https://github.com/apache/flink/pull/3353

