Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/10/18 08:50:00 UTC

[jira] [Commented] (FLINK-8483) Implement and expose outer joins

    [ https://issues.apache.org/jira/browse/FLINK-8483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16654883#comment-16654883 ] 

ASF GitHub Bot commented on FLINK-8483:
---------------------------------------

florianschmidt1994 opened a new pull request #6874: [FLINK-8483][DataStream] Implement and expose outer joins
URL: https://github.com/apache/flink/pull/6874
 
 
   ## What is the purpose of the change
   *This depends on FLINK-8482*
    
   This PR adds leftOuter, rightOuter and fullOuter joins to the
   IntervalJoin in the DataStream API.
   
   The usage is as follows:
   
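   ```java
   leftKeyedStream.intervalJoin(rightKeyedStream)
       .between(Time.milliseconds(-2), Time.milliseconds(1))
       .process(new ProcessJoinFunction<Left, Right, String>() {
           @Override
           public void processElement(Left left, Right right, Context ctx, Collector<String> out) {
               out.collect(left + "," + right);
           }
       });
   ```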
   
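   Note that some combinations of outer join type and timestamp strategy
   are not valid. For example, TimestampStrategy.RIGHT cannot be combined
   with JoinType.LEFT_OUTER, because a left element that is emitted without
   a match has no right-side timestamp that could be assigned to the result.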
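   
   A minimal sketch of such a validity check is shown below. The names
   JoinType and TimestampStrategy come from the text above, but their exact
   constants, the class JoinConfigCheck, and the rule for every combination
   other than RIGHT with LEFT_OUTER are assumptions, extrapolated from the
   idea that a result's timestamp must come from a side that is present.
   
   ```java
   import java.util.EnumSet;
   
   /** Hypothetical check; enum names mirror the PR text, the rule itself is an assumption. */
   class JoinConfigCheck {
       enum JoinType { INNER, LEFT_OUTER, RIGHT_OUTER, FULL_OUTER } // assumed constants
       enum TimestampStrategy { LEFT, RIGHT }                       // assumed constants
   
       /** True if every emitted record has an element on the side the strategy reads its timestamp from. */
       static boolean isValid(JoinType type, TimestampStrategy strategy) {
           switch (strategy) {
               case LEFT:  return !EnumSet.of(JoinType.RIGHT_OUTER, JoinType.FULL_OUTER).contains(type);
               case RIGHT: return !EnumSet.of(JoinType.LEFT_OUTER, JoinType.FULL_OUTER).contains(type);
               default:    throw new IllegalArgumentException("unknown strategy " + strategy);
           }
       }
   
       public static void main(String[] args) {
           for (JoinType t : JoinType.values())
               for (TimestampStrategy s : TimestampStrategy.values())
                   System.out.println(t + " + " + s + " -> " + (isValid(t, s) ? "ok" : "invalid"));
       }
   }
   ```
   
   Under this assumed rule, INNER is valid with either strategy, while
   FULL_OUTER is rejected with both, since either side may be missing.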
   
   Internally, the outer joins are implemented as follows:
   
   1.) For each incoming element, a timer is registered with the timestamp
   at which the element can safely be removed from the buffer because it
   will never be joined anymore (derived from the element's timestamp and
   the join boundaries; the timer fires once the watermark passes it).
   This timer is bound to the namespace which represents the side to which
   the element and its buffer belong.
   
   2.) Each element that is added to the buffer has a flag that indicates
   whether this element has been joined yet. Whenever elements are joined,
   this flag is set.
   
   3.) When a timer fires, the corresponding elements are removed from the
   respective buffer (as indicated by the timer's namespace) and, if they
   have not been joined yet and that side is outer-joined, emitted.
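   
   The three steps can be sketched as a small, single-key simulation in
   plain Java. This is a hypothetical illustration, not the operator code
   from this PR: the class name, the JoinType constants, the in-memory
   TreeMap buffers (standing in for keyed state) and the two TreeSets of
   timers (standing in for the per-side timer namespaces) are assumptions.
   It also assumes lower <= upper and that no element arrives behind the
   watermark.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.TreeMap;
   import java.util.TreeSet;
   
   /** Hypothetical single-key simulation of the outer interval join steps (not Flink code). */
   class OuterIntervalJoinSketch {
       enum JoinType { INNER, LEFT_OUTER, RIGHT_OUTER, FULL_OUTER } // assumed constants
   
       /** A buffered element plus the "has been joined" flag (step 2). */
       static final class Entry {
           final String value;
           boolean joined;
           Entry(String value) { this.value = value; }
       }
   
       // A left element at ts joins right elements with timestamps in [ts + lower, ts + upper].
       private final long lower, upper;
       private final JoinType type;
       // Per-side buffers, bucketed by element timestamp.
       private final TreeMap<Long, List<Entry>> leftBuffer = new TreeMap<>(), rightBuffer = new TreeMap<>();
       // Per-side cleanup timers; the set de-duplicates elements that share a timestamp.
       private final TreeSet<Long> leftTimers = new TreeSet<>(), rightTimers = new TreeSet<>();
       final List<String> out = new ArrayList<>();
   
       OuterIntervalJoinSketch(long lower, long upper, JoinType type) {
           this.lower = lower; this.upper = upper; this.type = type;
       }
   
       private static Entry add(TreeMap<Long, List<Entry>> buffer, long ts, String value) {
           Entry e = new Entry(value);
           buffer.computeIfAbsent(ts, k -> new ArrayList<>()).add(e);
           return e;
       }
   
       void processLeft(long ts, String v) {
           Entry e = add(leftBuffer, ts, v);
           for (List<Entry> bucket : rightBuffer.subMap(ts + lower, true, ts + upper, true).values())
               for (Entry r : bucket) { out.add("(" + v + "," + r.value + ")"); e.joined = r.joined = true; }
           leftTimers.add(upper > 0 ? ts + upper : ts); // step 1: no right element can match after this
       }
   
       void processRight(long ts, String v) {
           Entry e = add(rightBuffer, ts, v);
           for (List<Entry> bucket : leftBuffer.subMap(ts - upper, true, ts - lower, true).values())
               for (Entry l : bucket) { out.add("(" + l.value + "," + v + ")"); e.joined = l.joined = true; }
           rightTimers.add(lower < 0 ? ts - lower : ts); // step 1 for the right side
       }
   
       /** Step 3: fire timers up to the watermark, drop their bucket, emit unjoined outer-side entries. */
       void advanceWatermark(long wm) {
           boolean emitLeft = type == JoinType.LEFT_OUTER || type == JoinType.FULL_OUTER;
           boolean emitRight = type == JoinType.RIGHT_OUTER || type == JoinType.FULL_OUTER;
           while (!leftTimers.isEmpty() && leftTimers.first() <= wm) {
               long timer = leftTimers.pollFirst();
               for (Entry e : leftBuffer.remove(upper > 0 ? timer - upper : timer))
                   if (emitLeft && !e.joined) out.add("(" + e.value + ",null)");
           }
           while (!rightTimers.isEmpty() && rightTimers.first() <= wm) {
               long timer = rightTimers.pollFirst();
               for (Entry e : rightBuffer.remove(lower < 0 ? timer + lower : timer))
                   if (emitRight && !e.joined) out.add("(null," + e.value + ")");
           }
       }
   
       static List<String> demo() {
           OuterIntervalJoinSketch j = new OuterIntervalJoinSketch(-1, 1, JoinType.LEFT_OUTER);
           j.processLeft(1, "a");
           j.processRight(2, "x");   // 2 is in [1 - 1, 1 + 1], so "a" and "x" are joined
           j.processLeft(5, "b");    // no right element in [4, 6]
           j.advanceWatermark(10);   // all timers fire; only "b" is still unjoined
           return j.out;
       }
   
       public static void main(String[] args) {
           System.out.println(demo()); // prints [(a,x), (b,null)]
       }
   }
   ```
   
   The bucket key is recovered from the timer timestamp by undoing the
   offset added in step 1, so each firing timer touches exactly one bucket.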
   
   This approach means that the number of timers is roughly the same as the
   number of elements (slightly fewer if many elements share the same
   timestamp). In exchange, cleanup only needs to access one bucket in
   each side's buffer, instead of iterating over all entries each time.
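   
   To make the tradeoff concrete, the following hypothetical sketch counts
   the work for one side's buffer under both strategies. Per-bucket timers
   are modelled as described above; the alternative is assumed to be a full
   scan of the buffer on every watermark. Class and method names are
   illustrative, entries are reduced to per-timestamp counts, and upper > 0
   is assumed.
   
   ```java
   import java.util.TreeMap;
   import java.util.TreeSet;
   
   /** Hypothetical work-count comparison of two cleanup strategies for one buffer side. */
   class CleanupTradeoffSketch {
       /** Per-bucket timers: returns {timers registered, entries visited during cleanup}. */
       static long[] perBucketTimers(long[] timestamps, long upper, long[] watermarks) {
           TreeMap<Long, Integer> buffer = new TreeMap<>(); // bucket timestamp -> number of entries
           TreeSet<Long> timers = new TreeSet<>();
           for (long ts : timestamps) {
               buffer.merge(ts, 1, Integer::sum);
               timers.add(ts + upper); // elements sharing a timestamp share one timer
           }
           long registered = timers.size(), visited = 0;
           for (long wm : watermarks)
               while (!timers.isEmpty() && timers.first() <= wm)
                   visited += buffer.remove(timers.pollFirst() - upper); // exactly one bucket per timer
           return new long[] {registered, visited};
       }
   
       /** Assumed alternative without per-element timers: every watermark scans all remaining entries. */
       static long[] scanOnEveryWatermark(long[] timestamps, long upper, long[] watermarks) {
           TreeMap<Long, Integer> buffer = new TreeMap<>();
           for (long ts : timestamps) buffer.merge(ts, 1, Integer::sum);
           long visited = 0;
           for (long wm : watermarks) {
               for (int n : buffer.values()) visited += n; // touch every buffered entry
               buffer.entrySet().removeIf(e -> e.getKey() + upper <= wm);
           }
           return new long[] {0, visited};
       }
   
       public static void main(String[] args) {
           long[] ts = {1, 1, 2, 3, 3, 3, 7};
           long[] wms = {3, 5, 9};
           long[] a = perBucketTimers(ts, 2, wms), b = scanOnEveryWatermark(ts, 2, wms);
           System.out.println("per-bucket: " + a[0] + " timers, " + a[1] + " entries visited");
           System.out.println("full scan:  " + b[1] + " entries visited");
       }
   }
   ```
   
   With these inputs, 7 elements need 4 timers, and per-bucket cleanup
   visits 7 entries versus 13 for the repeated full scan.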
   
   ## Brief change log
   
   This feature is implemented and tested by
   - adding the implementation of outer joins to the IntervalJoin Operator
   - adding the implementation of outer joins to the DataStream Java API
   - adding the implementation of outer joins to the DataStream Scala API
   - adding unit tests in the IntervalJoinOperatorTest
   - adding IT tests in the IntervalJoinITTest
   
   
   ## Verifying this change
   
   This change adds tests and can be verified as follows:
   - unit tests in the IntervalJoinOperatorTest
   - IT tests in the IntervalJoinITTest
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): yes
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? JavaDocs


> Implement and expose outer joins
> --------------------------------
>
>                 Key: FLINK-8483
>                 URL: https://issues.apache.org/jira/browse/FLINK-8483
>             Project: Flink
>          Issue Type: Sub-task
>            Reporter: Florian Schmidt
>            Assignee: Florian Schmidt
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>



