You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Johann Kovacs (JIRA)" <ji...@apache.org> on 2015/06/17 17:03:00 UTC

[jira] [Commented] (FLINK-2105) Implement Sort-Merge Outer Join algorithm

    [ https://issues.apache.org/jira/browse/FLINK-2105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14589892#comment-14589892 ] 

Johann Kovacs commented on FLINK-2105:
--------------------------------------

@[~aalexandrov] and watchers: Based on our understanding of flink internals and [~fhueske]'s hints we decided for now to implement the outer join operator as a special case of the sort merge inner join, as implemented in {{MatchDriver}} and the mentioned iterator classes.
During discussion, the question came up of how and where to handle, or protect against, cases where the partitioning of the inputs is invalid for the selected operator and operator strategy (please see examples below). I believe the broader question would be how the optimizer works internally and how to extend it, which may be answered by completing [these|https://cwiki.apache.org/confluence/display/FLINK/Adding+a+new+Operator+Step-by-step] [wiki|https://cwiki.apache.org/confluence/display/FLINK/Optimizer+Internals] pages (hint hint ;)). 

Our assumptions and conclusions so far follow below. Please let us know if any of those are wrong:

* In the case where one input side is broadcast and you try to perform a sort-merge outer join on the _same side that was broadcast_ (or a full outer join), you don't know whether to emit (x, null) or whether there is maybe a matching key x on the right side on some other node.
* Similarly, the same problem occurs if you were to perform a sort-merge based cogroup, with one side being broadcast.
* Is there even a case in the current implementation where you would broadcast one side and perform a sort-merge join as opposed to a hash join with the broadcast side as the build side? This scenario wouldn't make a lot of sense IMO; I don't think we found the option for that in the source code either. (E.g. there is no {{BROADCAST_SORT_MERGE}} in the {{JoinHint}} enum.)
* Similarly, co group is only implemented as a sort-merge strategy, which knows nothing about the actual partitioning of the data, thus it will only work for both sides repartitioned.

Because of that we assume that:

* there must be some component in the flink runtime which decides which partitioning makes sense for which operator and operator strategy. For example, if the optimizer chooses the left side shall be broadcast, then the {{MatchDriver}} should perform a {{HYBRIDHASH_BUILD_FIRST}} join, and so on
* Similarly, if it encounters a CoGroup operator, both sides will need to be repartitioned
* We believe this is somehow part of the optimizer/dag package, but would appreciate a hint where to look for this

Because of _this_:

* keeping track of which side was broadcast, repartitioned, sorted or grouped doesn't appear to be the responsibility of the Driver implementation or, in the case of the {{MatchDriver}}, the iterator implementations that perform the sort-merge or hash joins, correct?
* Similarly, for example, the {{BuildFirst-}} and {{BuildSecondHashMatchIterators}} don't actually check which side was broadcast and which was repartitioned. Apparently it just assumes the optimizer did its job correctly in constructing the data flow graph and stupidly does as is told. Same for the CoGroup driver and iterator.
* I would assume we can make the same assumption for our outer join implementation?

Thanks

> Implement Sort-Merge Outer Join algorithm
> -----------------------------------------
>
>                 Key: FLINK-2105
>                 URL: https://issues.apache.org/jira/browse/FLINK-2105
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Local Runtime
>            Reporter: Fabian Hueske
>            Assignee: Ricky Pogalz
>            Priority: Minor
>             Fix For: pre-apache
>
>
> Flink does not natively support outer joins at the moment. 
> This issue proposes to implement a sort-merge outer join algorithm that can cover left, right, and full outer joins.
> The implementation can be based on the regular sort-merge join iterator ({{ReusingMergeMatchIterator}} and {{NonReusingMergeMatchIterator}}, see also {{MatchDriver}} class)
> The Reusing and NonReusing variants differ in whether object instances are reused or new objects are created. I would start with the NonReusing variant which is safer from a user's point of view and should also be easier to implement.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)