Posted to dev@flink.apache.org by Rakesh Kumar <ra...@gmail.com> on 2018/12/06 11:09:58 UTC

Delay one of the data streams when performing a join operation on event time and watermarks

Hi,
I have two data sources: one for order data and another for invoice data.
I push both into Kafka topics in JSON form. I want to delay the order data
by 5 minutes, because invoice data arrives only after the order data is
generated. For that, I have written a Flink program that reads both streams
from Kafka, assigns watermarks, and delays the order data by 5 minutes.
After assigning watermarks, I want to join the two streams on order_id,
which is present in both the order and the invoice data. After joining, I
want to push the result to a different Kafka topic.
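The watermarking step described above can be illustrated with a minimal plain-Java sketch (no Flink dependency) of a bounded-out-of-orderness watermark: the watermark trails the largest event timestamp seen so far by a fixed bound, here 5 minutes. It is similar in spirit to Flink's `BoundedOutOfOrdernessTimestampExtractor`, but the class and method names below are hypothetical and this is not Flink's implementation. One point it is meant to show: a watermark does not hold records back; it only tells event-time operators (windows, interval joins) how far event time has progressed.

```java
import java.time.Duration;

// Hypothetical helper illustrating a bounded-out-of-orderness watermark.
// Not Flink code: it only models how the watermark trails the max timestamp.
class BoundedWatermarkSketch {
    private final long maxOutOfOrdernessMs;
    // Largest event timestamp observed so far (Long.MIN_VALUE means "none yet").
    private long maxTimestampMs = Long.MIN_VALUE;

    BoundedWatermarkSketch(Duration maxOutOfOrderness) {
        this.maxOutOfOrdernessMs = maxOutOfOrderness.toMillis();
    }

    // Record an event timestamp (epoch millis) and return the updated watermark.
    long onEvent(long eventTimestampMs) {
        maxTimestampMs = Math.max(maxTimestampMs, eventTimestampMs);
        return currentWatermark();
    }

    // Watermark = largest timestamp seen minus the out-of-orderness bound.
    // Before any event arrives, event time has not progressed at all.
    long currentWatermark() {
        if (maxTimestampMs == Long.MIN_VALUE) {
            return Long.MIN_VALUE;
        }
        return maxTimestampMs - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch wm = new BoundedWatermarkSketch(Duration.ofMinutes(5));
        long t0 = 1_000_000L;
        System.out.println(wm.onEvent(t0));           // 700000
        System.out.println(wm.onEvent(t0 - 60_000));  // late event: still 700000
        System.out.println(wm.onEvent(t0 + 600_000)); // 1300000
    }
}
```

With a 5-minute bound, an order stamped at time t is only considered "on time" by event-time operators until an event with a timestamp beyond t + 5 min has been seen; the order record itself is forwarded immediately.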

But I am not able to join these data streams with the 5-minute delay, and I
cannot figure out why.

I am attaching my Flink program and its dependencies below.

Re: Delay one of the data streams when performing a join operation on event time and watermarks

Posted by jincheng sun <su...@gmail.com>.
Hi Rakesh Kumar,
I think you can use an interval join, e.g.:

orderStream
    .keyBy(<KeySelector>)
    .intervalJoin(invoiceStream.keyBy(<KeySelector>))
    .between(Time.minutes(-5), Time.minutes(5))
    .process(<ProcessJoinFunction>);

For the semantics of the interval join and a detailed description of its
usage, please refer to
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/joining.html#interval-join
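To make the semantics of `between(Time.minutes(-5), Time.minutes(5))` concrete: for records with the same key, an order and an invoice are joined when the invoice timestamp lies in [order timestamp - 5 min, order timestamp + 5 min], with both bounds inclusive by default. The following is a minimal in-memory Java sketch of that matching rule only; it is not Flink's operator, and the `Order`/`Invoice` types, their fields, and the output string format are hypothetical.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory illustration of interval-join matching (not Flink's implementation):
// same orderId AND order.ts + lower <= invoice.ts <= order.ts + upper.
class IntervalJoinSketch {
    // Hypothetical event types; field names are assumptions for illustration.
    static final class Order {
        final String orderId;
        final long ts; // event timestamp in epoch millis
        Order(String orderId, long ts) { this.orderId = orderId; this.ts = ts; }
    }

    static final class Invoice {
        final String orderId;
        final long ts; // event timestamp in epoch millis
        Invoice(String orderId, long ts) { this.orderId = orderId; this.ts = ts; }
    }

    static List<String> join(List<Order> orders, List<Invoice> invoices,
                             Duration lower, Duration upper) {
        // Group invoices by key, analogous to keyBy(orderId) on both streams.
        Map<String, List<Invoice>> invoicesByKey = new HashMap<>();
        for (Invoice inv : invoices) {
            invoicesByKey.computeIfAbsent(inv.orderId, k -> new ArrayList<>()).add(inv);
        }
        List<String> joined = new ArrayList<>();
        for (Order o : orders) {
            long from = o.ts + lower.toMillis(); // inclusive lower bound
            long to = o.ts + upper.toMillis();   // inclusive upper bound
            for (Invoice inv : invoicesByKey.getOrDefault(o.orderId, List.of())) {
                if (inv.ts >= from && inv.ts <= to) {
                    joined.add(o.orderId + ": order@" + o.ts + ", invoice@" + inv.ts);
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        List<Order> orders = List.of(new Order("o1", 0), new Order("o2", 0));
        List<Invoice> invoices = List.of(
            new Invoice("o1", 3 * min),  // 3 min after its order: inside the interval
            new Invoice("o2", 7 * min)); // 7 min after its order: outside the interval
        // prints [o1: order@0, invoice@180000]
        System.out.println(join(orders, invoices, Duration.ofMinutes(-5), Duration.ofMinutes(5)));
    }
}
```

In an actual Flink job the matching happens incrementally rather than over complete lists: each side is buffered in keyed state, the watermark is used to discard buffered elements that can no longer find a partner, and each matched pair is handed to the `ProcessJoinFunction` passed to `.process(...)`.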

I hope this helps, and any feedback is welcome!

Best,
Jincheng


