Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/02/02 21:17:51 UTC

[jira] [Commented] (FLINK-2883) Add documentation to forbid key-modifying ReduceFunction

    [ https://issues.apache.org/jira/browse/FLINK-2883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15850521#comment-15850521 ] 

ASF GitHub Bot commented on FLINK-2883:
---------------------------------------

GitHub user greghogan opened a pull request:

    https://github.com/apache/flink/pull/3256

    [FLINK-2883] [docs] Add documentation to forbid key-modifying ReduceFunction

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/greghogan/flink 2883_add_documentation_to_forbid_keymodifying_reducefunction

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3256.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3256
    
----
commit 2a858819e2163a4c935521670d766ebaaba5b99d
Author: Greg Hogan <co...@greghogan.com>
Date:   2017-02-02T21:15:52Z

    [FLINK-2883] [docs] Add documentation to forbid key-modifying ReduceFunction

----


> Add documentation to forbid key-modifying ReduceFunction
> --------------------------------------------------------
>
>                 Key: FLINK-2883
>                 URL: https://issues.apache.org/jira/browse/FLINK-2883
>             Project: Flink
>          Issue Type: Task
>          Components: DataStream API, Documentation
>    Affects Versions: 0.10.0
>            Reporter: Till Rohrmann
>            Assignee: Greg Hogan
>
> If one uses a combinable reduce operation which also changes the key value of the underlying data element, then the results of the reduce operation can become wrong. The reason is that after the combine phase, another reduce operator is executed which will then reduce the elements based on the new key values. This might not be so surprising if one has explicitly declared one's {{GroupReduceOperation}} as combinable. However, the {{ReduceFunction}} conceals the fact that a combiner is used implicitly. Furthermore, the API does not prevent the user from changing the key fields, which would avoid the problem.
> The following example program illustrates the problem:
> {code}
> val env = ExecutionEnvironment.getExecutionEnvironment
> env.setParallelism(1)
> val input = env.fromElements((1,2), (1,3), (2,3), (3,3), (3,4))
> val result = input.groupBy(0).reduce{
>   (left, right) =>
>     (left._1 + right._1, left._2 + right._2)
> }
> result.output(new PrintingOutputFormat[(Int, Int)]())
> env.execute()
> {code}
> The expected output is:
> {code}
> (2, 5)
> (2, 3)
> (6, 7)
> {code}
> However, the actual output is:
> {code}
> (4, 8)
> (6, 7)
> {code}
> I think that the underlying problem is that associativity and commutativity are not sufficient for a combinable reduce operation. Additionally, we need to make sure that the key stays the same.
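> The two-phase behaviour can be simulated without Flink. The following is a hypothetical plain-Scala sketch (the object and function names are illustrative, not part of Flink's API) that models a combine pass followed by a final reduce, both grouping on field 0; it reproduces the wrong result for the key-modifying reducer above and shows that a key-preserving reducer is safe:
> {code}
> // Plain-Scala simulation of Flink's combine + reduce phases (no Flink dependency).
> object KeyModifyingReduceDemo {
>   type Elem = (Int, Int)
>
>   // The key-modifying reducer from the example above: sums the key field too.
>   val badReduce: (Elem, Elem) => Elem =
>     (l, r) => (l._1 + r._1, l._2 + r._2)
>
>   // A key-preserving reducer: the key field is copied, never summed.
>   val goodReduce: (Elem, Elem) => Elem =
>     (l, r) => (l._1, l._2 + r._2)
>
>   // One combine pass, then a final reduce; both group by field 0.
>   def combineThenReduce(input: Seq[Elem], f: (Elem, Elem) => Elem): Seq[Elem] = {
>     val combined = input.groupBy(_._1).values.map(_.reduce(f)).toSeq
>     // The second grouping sees the (possibly modified) keys produced by the combine.
>     combined.groupBy(_._1).values.map(_.reduce(f)).toSeq.sortBy(_._1)
>   }
>
>   def main(args: Array[String]): Unit = {
>     val input = Seq((1, 2), (1, 3), (2, 3), (3, 3), (3, 4))
>     println(combineThenReduce(input, badReduce))  // List((4,8), (6,7)): wrong, keys drifted
>     println(combineThenReduce(input, goodReduce)) // List((1,5), (2,3), (3,7)): keys preserved
>   }
> }
> {code}
> With {{badReduce}}, the combine pass emits (2,5) for key 1, so the final reduce merges it with (2,3) under the new key 2 — exactly the faulty output shown above.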



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)