Posted to user@flink.apache.org by srikanth flink <fl...@gmail.com> on 2019/09/23 11:04:39 UTC

Approach to match join streams to create unique streams.

 Hi there,

I have two streams sourced from Kafka. Stream1 is a continuous data stream and
stream2 is a periodic update. Stream2 contains only one column.

*Use case*: Every entry from stream1 should be checked for a match in
stream2.
The matched and unmatched records should be separated into two new
streams. For example: column1 and column10 from stream1 are checked against
the stream2 column; matched records go to a new stream safeStream and
unmatched records go to a new stream unSafeStream.

*Implemented solution*: stream2 is used as a temporal table function and
joined with stream1, which is a dynamic table.

   - Ran a time-based query where stream1.column1 = stream2.column and
   stream1.column10 = stream2.column; working.

   - Ran a time-based query where stream1.column1 <> stream2.column and
   stream1.column10 <> stream2.column; not working.
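
Independent of what the engine supports, a `<>` join condition and an "unmatched" check are different operations. The following is a minimal plain-Python sketch (not Flink code) of that difference. The sample values and the function names `inequality_join` and `anti_join` are made up for illustration; only the column name follows the thread.

```python
# Hypothetical sample data; the values are invented for this sketch.
stream1_rows = [{"column1": "a"}, {"column1": "z"}]
stream2_values = ["a", "b"]


def inequality_join(rows, values):
    """Pair each row with every stream2 value it differs from (what `<>` expresses)."""
    return [(row["column1"], v) for row in rows for v in values if row["column1"] != v]


def anti_join(rows, values):
    """Keep only the rows whose column1 equals no stream2 value at all."""
    lookup = set(values)
    return [row["column1"] for row in rows if row["column1"] not in lookup]


print(inequality_join(stream1_rows, stream2_values))  # [('a', 'b'), ('z', 'a'), ('z', 'b')]
print(anti_join(stream1_rows, stream2_values))        # ['z']
```

In this sketch the row with `a` does have a match in stream2, yet it still shows up in the inequality join because it differs from `b`. Only the anti-join form returns the truly unmatched rows.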

Is it possible to load stream2 as a list so that I could do a *contains*
check? Or is there any other approach?
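
The *contains* idea can be sketched in plain Python (not Flink code) as a membership check against a set built from stream2. The function name `split_by_membership` and the rule that a record counts as matched when either column1 or column10 is present in the set are assumptions made for this sketch; the thread does not say whether both columns must match. In a Flink job the lookup set would have to live in some form of state (for example broadcast state) rather than in a local list.

```python
from typing import Dict, Iterable, List, Set, Tuple

Record = Dict[str, str]


def split_by_membership(
    records: Iterable[Record], known: Set[str]
) -> Tuple[List[Record], List[Record]]:
    """Split records into (matched, unmatched) by a set lookup on two columns."""
    safe: List[Record] = []    # matched records, standing in for safeStream
    unsafe: List[Record] = []  # unmatched records, standing in for unSafeStream
    for record in records:
        # Assumption: a match on either column is enough.
        if record["column1"] in known or record["column10"] in known:
            safe.append(record)
        else:
            unsafe.append(record)
    return safe, unsafe


# Hypothetical sample data.
known_values = {"10.0.0.1", "10.0.0.2"}
rows = [
    {"column1": "10.0.0.1", "column10": "x"},
    {"column1": "y", "column10": "z"},
]
safe, unsafe = split_by_membership(rows, known_values)
```

A set is used instead of a list so that each lookup takes constant time on average, which matters when stream1 is continuous.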

Help appreciated.

Thanks
Srikanth

Re: Approach to match join streams to create unique streams.

Posted by srikanth flink <fl...@gmail.com>.
Fabian,

Thanks, already implemented the left join.

Srikanth

On Tue, Sep 24, 2019 at 2:12 PM Fabian Hueske <fh...@gmail.com> wrote:

> Hi,
>
> AFAIK, Flink SQL Temporal table function joins are only supported as inner
> equality joins.
> An extension to left outer joins would be great, but is not on the
> immediate roadmap AFAIK.
>
> If you need the inverse, I'd recommend implementing the logic in a
> DataStream program with a KeyedCoProcessFunction.
>
> Best, Fabian

Re: Approach to match join streams to create unique streams.

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

AFAIK, Flink SQL Temporal table function joins are only supported as inner
equality joins.
An extension to left outer joins would be great, but is not on the
immediate roadmap AFAIK.

If you need the inverse, I'd recommend implementing the logic in a
DataStream program with a KeyedCoProcessFunction.
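
The kind of logic such a function would carry can be sketched in plain Python. This is a stand-in for per-key, two-input processing and not Flink API: the class `KeyedMatcher`, its method names, and keying on column1 only are assumptions made for illustration.

```python
class KeyedMatcher:
    """Plain-Python model of two-input processing with per-key state."""

    def __init__(self):
        self.known_keys = set()  # models keyed state filled from stream2
        self.safe = []           # models the matched output
        self.unsafe = []         # models the unmatched output

    def process_element2(self, value):
        """Called for each stream2 update: remember the value as a known key."""
        self.known_keys.add(value)

    def process_element1(self, record, key_field="column1"):
        """Called for each stream1 record: route it by whether its key is known."""
        if record[key_field] in self.known_keys:
            self.safe.append(record)
        else:
            self.unsafe.append(record)


matcher = KeyedMatcher()
matcher.process_element1({"column1": "a"})  # arrives before any stream2 update
matcher.process_element2("a")
matcher.process_element1({"column1": "a"})  # arrives after the update
```

In this sketch the first record lands in `unsafe` and the second in `safe`, so the outcome depends on arrival order. A real job would need to decide how to treat stream1 records that arrive before the corresponding stream2 update, for example by buffering them in state.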

Best, Fabian
