Posted to dev@flink.apache.org by Florian Schmidt <fl...@data-artisans.com> on 2018/08/13 11:18:18 UTC

[Discuss] Outer join support and timestamp assignment for IntervalJoin

Hello Community,

I’ve recently been working on adding support for outer joins [1] and timestamp assignment [2] to the IntervalJoin in the DataStream API.
As this is a public API, it should be simple and understandable for users, so I wanted to gather feedback on some variations that I drafted:

1. Add outer joins

Approach A

keyedStreamA.intervalJoin(keyedStreamB)
		.leftOuter() // or .rightOuter(), .fullOuter()
		.between(<Time>, <Time>)
		.process(new ProcessJoinFunction() { /* … */ });

Approach B

keyedStreamA.intervalLeftJoin(keyedStreamB) // or intervalRightJoin, intervalFullOuterJoin
		.between(<Time>, <Time>)
		.process(new ProcessJoinFunction() { /* … */ });

Approach C

keyedStreamA.intervalJoin(keyedStreamB)
		.joinType(JoinType.INNER) // Reuse existing (internally used) JoinType


Personally, I feel like C is the cleanest approach, but it has the problem that invalid combinations of timestamp strategy and join type can only be detected at runtime, whereas A and B would allow us to express the valid combinations through the type system.
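
To illustrate what "expressing valid combinations through the type system" could look like, here is a minimal sketch of a staged builder. All class and method names below are hypothetical and not part of Flink. The rule that a left outer join may only use the left timestamp (because the right element can be missing) is an assumption made for the example, not something decided in this proposal.

```java
/** Hypothetical staged builder; none of these types or methods exist in Flink. */
final class StagedJoinSketch {

    /** The finished configuration, kept as plain strings to stay minimal. */
    static final class Configured {
        final String joinType;
        final String timestampSource;

        Configured(String joinType, String timestampSource) {
            this.joinType = joinType;
            this.timestampSource = timestampSource;
        }
    }

    /** Inner join stage: both elements always exist, so either timestamp is offered. */
    static final class InnerStage {
        LeftOuterStage leftOuter() { return new LeftOuterStage(); }
        RightOuterStage rightOuter() { return new RightOuterStage(); }
        Configured assignLeftTimestamp() { return new Configured("INNER", "LEFT"); }
        Configured assignRightTimestamp() { return new Configured("INNER", "RIGHT"); }
    }

    /** Assumption: the right element may be missing, so only the left timestamp is offered. */
    static final class LeftOuterStage {
        Configured assignLeftTimestamp() { return new Configured("LEFT_OUTER", "LEFT"); }
    }

    /** Assumption: the left element may be missing, so only the right timestamp is offered. */
    static final class RightOuterStage {
        Configured assignRightTimestamp() { return new Configured("RIGHT_OUTER", "RIGHT"); }
    }

    /** Entry point standing in for keyedStreamA.intervalJoin(keyedStreamB). */
    static InnerStage intervalJoin() { return new InnerStage(); }
}
```

With this shape, StagedJoinSketch.intervalJoin().leftOuter().assignRightTimestamp() does not compile, because LeftOuterStage has no such method. The cost is one extra class per join type.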

2. Assign timestamps to the joined pairs

When two elements are joined together, this will add support for specifying which of the elements' timestamps should be assigned as the result's timestamp.
The four options are MIN, MAX, LEFT and RIGHT, where MIN selects the minimum of the two elements' timestamps, MAX the maximum, LEFT the left element's timestamp and RIGHT the right element's timestamp.
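
The semantics of the four options can be sketched as a small enum. This is only an illustration: the enum below is hypothetical (it borrows the TimestampStrategy name from approach B further down), and the select method is an invented helper, not an existing Flink API.

```java
import java.util.function.LongBinaryOperator;

/** Hypothetical enum mirroring the four options; not part of Flink's API. */
enum TimestampStrategy {
    MIN(Math::min),
    MAX(Math::max),
    LEFT((left, right) -> left),
    RIGHT((left, right) -> right);

    private final LongBinaryOperator selector;

    TimestampStrategy(LongBinaryOperator selector) {
        this.selector = selector;
    }

    /** Returns the timestamp to assign to a joined pair of elements. */
    long select(long leftTimestamp, long rightTimestamp) {
        return selector.applyAsLong(leftTimestamp, rightTimestamp);
    }
}
```

For a left element with timestamp 10 and a right element with timestamp 15, MIN and LEFT both select 10, while MAX and RIGHT both select 15.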

Approach A

keyedStreamA.intervalJoin(keyedStreamB)
		.between(<Time>, <Time>)
		.assignLeftTimestamp() // or assignRightTimestamp(), assignMinTimestamp(), assignMaxTimestamp()
		.process(new ProcessJoinFunction() { /* … */ });

Approach B

keyedStreamA.intervalJoin(keyedStreamB)
		.between(<Time>, <Time>)
		.assignTimestamp(TimestampStrategy.LEFT) // .RIGHT, .MIN, .MAX

Again, I feel like B is the cleanest approach, but it has the same caveat regarding runtime vs. type system checks as the approach above. This could be especially relevant when it comes to combinations of join types and timestamp assignments, where a few combinations will not be possible.
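
For comparison, this is roughly what the runtime check for the enum-based variants could look like. It is a sketch under assumptions: the nested enums are hypothetical stand-ins (not Flink's internal JoinType), and the table of allowed combinations is invented for illustration, since which combinations are actually invalid is still open.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical runtime validation; the rules below are assumptions, not Flink behavior. */
final class RuntimeCheckSketch {

    enum JoinType { INNER, LEFT_OUTER, RIGHT_OUTER, FULL_OUTER }

    enum TimestampStrategy { MIN, MAX, LEFT, RIGHT }

    /** Assumed rule: a strategy is allowed only if every element it reads is guaranteed to exist. */
    static Set<TimestampStrategy> allowedFor(JoinType joinType) {
        switch (joinType) {
            case INNER:
                return EnumSet.allOf(TimestampStrategy.class);
            case LEFT_OUTER:
                return EnumSet.of(TimestampStrategy.LEFT);
            case RIGHT_OUTER:
                return EnumSet.of(TimestampStrategy.RIGHT);
            default:
                // Assumption: for FULL_OUTER either side may be missing; a real design might define a fallback.
                return EnumSet.noneOf(TimestampStrategy.class);
        }
    }

    /** Throws when the job is assembled, i.e. at runtime rather than at compile time. */
    static void validate(JoinType joinType, TimestampStrategy strategy) {
        if (!allowedFor(joinType).contains(strategy)) {
            throw new IllegalArgumentException(
                "Timestamp strategy " + strategy + " is not supported for join type " + joinType);
        }
    }
}
```

Here validate(JoinType.LEFT_OUTER, TimestampStrategy.RIGHT) compiles without complaint and only fails with an IllegalArgumentException when it is executed, which is the caveat described above.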

Any feedback would be greatly appreciated. I also updated the design doc at [3] if anyone wants to hop in on further discussions!

Florian

[1] https://issues.apache.org/jira/browse/FLINK-8483
[2] https://issues.apache.org/jira/browse/FLINK-8482
[3] https://docs.google.com/document/d/16GMH5VM6JJiWj_N0W8y3PtQ1aoJFxsKoOTSYOfqlsRE/edit#heading=h.6pxr0kgtqp3c


Re: [Discuss] Outer join support and timestamp assignment for IntervalJoin

Posted by Fabian Hueske <fh...@gmail.com>.
Thanks for starting the discussion Florian.

I'm also in favor of both A options.
Option A for the outer joins is also closest to the join syntax of the
DataSet API.

Thanks,
Fabian



2018-08-13 20:50 GMT+02:00 Elias Levy <fe...@gmail.com>:

> As a developer, while not quite as succinct, I feel that option A in both
> cases is easier to read.

Re: [Discuss] Outer join support and timestamp assignment for IntervalJoin

Posted by Elias Levy <fe...@gmail.com>.
As a developer, while not quite as succinct, I feel that option A in both
cases is easier to read.
