You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Jaswin Shah <ja...@outlook.com> on 2020/05/11 16:03:43 UTC

Not able to implement an usecase

Hi,
I want to implement the below use case in my application:
I am doing an interval join between two data streams and then, in process function catching up the discrepant results on joining. Joining is done on key orderId. Now, I want to identify all the messages in both datastreams which are not joined. Means, for a message in left stream if I do not find any message in right stream over the interval defined, then, that message should be caught and same for right stream if there are messages which do not have corresponding messages in left streams then, catch them.Need an help how can I achieve the use case. I know this can be done with outer join but interval join or tumbling event time window joins only support inner join as per my knowledge. I do not want to use table/sql api here but want to work on this datastream apis only.

Currently I am using this which is working for 90 % of the cases but 10 % of the cases where large large delay can happen and messages in left or right streams are missing are not getting supported with my this implementaions:

/**
 * Join cart and pg streams on mid and orderId, and the interval specified.
 *
 * @param leftStream
 * @param rightStream
 * @return
 */
public SingleOutputStreamOperator<ResultMessage> intervalJoinCartAndPGStreams(DataStream<CartMessage> leftStream, DataStream<PGMessage> rightStream, ParameterTool parameter) {
    //Descripant results are sent to kafka from CartPGProcessFunction.
    return leftStream
        .keyBy(new CartJoinColumnsSelector())
        .intervalJoin(rightStream.keyBy(new PGJoinColumnsSelector()))
        .between(Time.milliseconds(Long.parseLong(parameter.get(Constants.INTERVAL_JOIN_LOWERBOUND))), Time.milliseconds(Long.parseLong(parameter.get(Constants.INTERVAL_JOIN_UPPERBOUND))))
        .process(new CartPGProcessFunction());

}


Secondly, I am unable to find the streaming support to stream out the datastreams I am reading from kafka to hive which I want to batch process with Flink

Please help me on resolving this use cases.

Thanks,
Jaswin


Get Outlook for Android<https://aka.ms/ghei36>

Re: Not able to implement an usecase

Posted by Rui Li <li...@apache.org>.
The hive table sink is only for batch processing in Flink 1.10. There're
some on-going efforts to support writing streams to hive and we intend to
make it available in 1.11. Stay tuned :)

On Tue, May 12, 2020 at 3:52 PM Khachatryan Roman <
khachatryan.roman@gmail.com> wrote:

> AFAIK, yes, you can write streams.
>
> I'm pulling in Jingsong Li and Rui Li as they might know better.
>
> Regards,
> Roman
>
>
> On Mon, May 11, 2020 at 10:21 PM Jaswin Shah <ja...@outlook.com>
> wrote:
>
>> If I go with table apis, can I write the streams to hive or it is only
>> for batch processing as of now.
>>
>> Get Outlook for Android <https://aka.ms/ghei36>
>>
>> ------------------------------
>> *From:* Khachatryan Roman <kh...@gmail.com>
>> *Sent:* Tuesday, May 12, 2020 1:49:10 AM
>> *To:* Jaswin Shah <ja...@outlook.com>
>> *Cc:* user@flink.apache.org <us...@flink.apache.org>
>> *Subject:* Re: Not able to implement an usecase
>>
>> Hi Jaswin,
>>
>> Currently, DataStream API doesn't support outer joins.
>> As a workaround, you can use coGroup function [1].
>>
>> Hive is also not supported by DataStream API though it's supported by
>> Table API [2].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/CoGroupFunction.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/read_write_hive.html
>>
>> Regards,
>> Roman
>>
>>
>> On Mon, May 11, 2020 at 6:03 PM Jaswin Shah <ja...@outlook.com>
>> wrote:
>>
>> Hi,
>> I want to implement the below use case in my application:
>> I am doing an interval join between two data streams and then, in process
>> function catching up the discrepant results on joining. Joining is done on
>> key orderId. Now, I want to identify all the messages in both datastreams
>> which are not joined. Means, for a message in left stream if I do not
>> find any message in right stream over the interval defined, then, that
>> message should be caught and same for right stream if there are messages
>> which do not have corresponding messages in left streams then, catch
>> them.Need an help how can I achieve the use case. I know this can be
>> done with outer join but interval join or tumbling event time window joins
>> only support inner join as per my knowledge. I do not want to use table/sql
>> api here but want to work on this datastream apis only.
>>
>> Currently I am using this which is working for 90 % of the cases but 10 %
>> of the cases where large large delay can happen and messages in left or
>> right streams are missing are not getting supported with my this
>> implementaions:
>>
>> /**
>>  * Join cart and pg streams on mid and orderId, and the interval specified.
>>  *
>>  * @param leftStream
>>  * @param rightStream
>>  * @return
>>  */
>> public SingleOutputStreamOperator<ResultMessage> intervalJoinCartAndPGStreams(DataStream<CartMessage> leftStream, DataStream<PGMessage> rightStream, ParameterTool parameter) {
>>     //Descripant results are sent to kafka from CartPGProcessFunction.
>>     return leftStream
>>         .keyBy(new CartJoinColumnsSelector())
>>         .intervalJoin(rightStream.keyBy(new PGJoinColumnsSelector()))
>>         .between(Time.milliseconds(Long.parseLong(parameter.get(Constants.INTERVAL_JOIN_LOWERBOUND))), Time.milliseconds(Long.parseLong(parameter.get(Constants.INTERVAL_JOIN_UPPERBOUND))))
>>         .process(new CartPGProcessFunction());
>>
>> }
>>
>>
>>
>> Secondly, I am unable to find the streaming support to stream out the
>> datastreams I am reading from kafka to hive which I want to batch process
>> with Flink
>>
>> Please help me on resolving this use cases.
>>
>> Thanks,
>> Jaswin
>>
>>
>> Get Outlook for Android <https://aka.ms/ghei36>
>>
>>

-- 
Cheers,
Rui Li

Re: Not able to implement an usecase

Posted by Jingsong Li <ji...@gmail.com>.
Thanks Roman for involving me.

Hi Jaswin,

FLIP-115[1] will finish Kafka -> Hive/Filesystem. And will be released in
1.11.

We will provide two connectors in table:
- file system connector, this connector manage partitions and files by file
system paths. You can define a file system table with parquet/orc format,
this should be consistent with hive exclude hive metastore support.
- hive connector, this connector manage partitions and files by hive
metastore, support automatic adding partition to hive metastore.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table

Best,
Jingsong Lee

On Tue, May 12, 2020 at 3:52 PM Khachatryan Roman <
khachatryan.roman@gmail.com> wrote:

> AFAIK, yes, you can write streams.
>
> I'm pulling in Jingsong Li and Rui Li as they might know better.
>
> Regards,
> Roman
>
>
> On Mon, May 11, 2020 at 10:21 PM Jaswin Shah <ja...@outlook.com>
> wrote:
>
>> If I go with table apis, can I write the streams to hive or it is only
>> for batch processing as of now.
>>
>> Get Outlook for Android <https://aka.ms/ghei36>
>>
>> ------------------------------
>> *From:* Khachatryan Roman <kh...@gmail.com>
>> *Sent:* Tuesday, May 12, 2020 1:49:10 AM
>> *To:* Jaswin Shah <ja...@outlook.com>
>> *Cc:* user@flink.apache.org <us...@flink.apache.org>
>> *Subject:* Re: Not able to implement an usecase
>>
>> Hi Jaswin,
>>
>> Currently, DataStream API doesn't support outer joins.
>> As a workaround, you can use coGroup function [1].
>>
>> Hive is also not supported by DataStream API though it's supported by
>> Table API [2].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/CoGroupFunction.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/read_write_hive.html
>>
>> Regards,
>> Roman
>>
>>
>> On Mon, May 11, 2020 at 6:03 PM Jaswin Shah <ja...@outlook.com>
>> wrote:
>>
>> Hi,
>> I want to implement the below use case in my application:
>> I am doing an interval join between two data streams and then, in process
>> function catching up the discrepant results on joining. Joining is done on
>> key orderId. Now, I want to identify all the messages in both datastreams
>> which are not joined. Means, for a message in left stream if I do not
>> find any message in right stream over the interval defined, then, that
>> message should be caught and same for right stream if there are messages
>> which do not have corresponding messages in left streams then, catch
>> them.Need an help how can I achieve the use case. I know this can be
>> done with outer join but interval join or tumbling event time window joins
>> only support inner join as per my knowledge. I do not want to use table/sql
>> api here but want to work on this datastream apis only.
>>
>> Currently I am using this which is working for 90 % of the cases but 10 %
>> of the cases where large large delay can happen and messages in left or
>> right streams are missing are not getting supported with my this
>> implementaions:
>>
>> /**
>>  * Join cart and pg streams on mid and orderId, and the interval specified.
>>  *
>>  * @param leftStream
>>  * @param rightStream
>>  * @return
>>  */
>> public SingleOutputStreamOperator<ResultMessage> intervalJoinCartAndPGStreams(DataStream<CartMessage> leftStream, DataStream<PGMessage> rightStream, ParameterTool parameter) {
>>     //Descripant results are sent to kafka from CartPGProcessFunction.
>>     return leftStream
>>         .keyBy(new CartJoinColumnsSelector())
>>         .intervalJoin(rightStream.keyBy(new PGJoinColumnsSelector()))
>>         .between(Time.milliseconds(Long.parseLong(parameter.get(Constants.INTERVAL_JOIN_LOWERBOUND))), Time.milliseconds(Long.parseLong(parameter.get(Constants.INTERVAL_JOIN_UPPERBOUND))))
>>         .process(new CartPGProcessFunction());
>>
>> }
>>
>>
>>
>> Secondly, I am unable to find the streaming support to stream out the
>> datastreams I am reading from kafka to hive which I want to batch process
>> with Flink
>>
>> Please help me on resolving this use cases.
>>
>> Thanks,
>> Jaswin
>>
>>
>> Get Outlook for Android <https://aka.ms/ghei36>
>>
>>

-- 
Best, Jingsong Lee

Re: Not able to implement an usecase

Posted by Khachatryan Roman <kh...@gmail.com>.
AFAIK, yes, you can write streams.

I'm pulling in Jingsong Li and Rui Li as they might know better.

Regards,
Roman


On Mon, May 11, 2020 at 10:21 PM Jaswin Shah <ja...@outlook.com>
wrote:

> If I go with table apis, can I write the streams to hive or it is only for
> batch processing as of now.
>
> Get Outlook for Android <https://aka.ms/ghei36>
>
> ------------------------------
> *From:* Khachatryan Roman <kh...@gmail.com>
> *Sent:* Tuesday, May 12, 2020 1:49:10 AM
> *To:* Jaswin Shah <ja...@outlook.com>
> *Cc:* user@flink.apache.org <us...@flink.apache.org>
> *Subject:* Re: Not able to implement an usecase
>
> Hi Jaswin,
>
> Currently, DataStream API doesn't support outer joins.
> As a workaround, you can use coGroup function [1].
>
> Hive is also not supported by DataStream API though it's supported by
> Table API [2].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/CoGroupFunction.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/read_write_hive.html
>
> Regards,
> Roman
>
>
> On Mon, May 11, 2020 at 6:03 PM Jaswin Shah <ja...@outlook.com>
> wrote:
>
> Hi,
> I want to implement the below use case in my application:
> I am doing an interval join between two data streams and then, in process
> function catching up the discrepant results on joining. Joining is done on
> key orderId. Now, I want to identify all the messages in both datastreams
> which are not joined. Means, for a message in left stream if I do not
> find any message in right stream over the interval defined, then, that
> message should be caught and same for right stream if there are messages
> which do not have corresponding messages in left streams then, catch
> them.Need an help how can I achieve the use case. I know this can be done
> with outer join but interval join or tumbling event time window joins only
> support inner join as per my knowledge. I do not want to use table/sql api
> here but want to work on this datastream apis only.
>
> Currently I am using this which is working for 90 % of the cases but 10 %
> of the cases where large large delay can happen and messages in left or
> right streams are missing are not getting supported with my this
> implementaions:
>
> /**
>  * Join cart and pg streams on mid and orderId, and the interval specified.
>  *
>  * @param leftStream
>  * @param rightStream
>  * @return
>  */
> public SingleOutputStreamOperator<ResultMessage> intervalJoinCartAndPGStreams(DataStream<CartMessage> leftStream, DataStream<PGMessage> rightStream, ParameterTool parameter) {
>     //Descripant results are sent to kafka from CartPGProcessFunction.
>     return leftStream
>         .keyBy(new CartJoinColumnsSelector())
>         .intervalJoin(rightStream.keyBy(new PGJoinColumnsSelector()))
>         .between(Time.milliseconds(Long.parseLong(parameter.get(Constants.INTERVAL_JOIN_LOWERBOUND))), Time.milliseconds(Long.parseLong(parameter.get(Constants.INTERVAL_JOIN_UPPERBOUND))))
>         .process(new CartPGProcessFunction());
>
> }
>
>
>
> Secondly, I am unable to find the streaming support to stream out the
> datastreams I am reading from kafka to hive which I want to batch process
> with Flink
>
> Please help me on resolving this use cases.
>
> Thanks,
> Jaswin
>
>
> Get Outlook for Android <https://aka.ms/ghei36>
>
>

Re: Not able to implement an usecase

Posted by Jaswin Shah <ja...@outlook.com>.
If I go with table apis, can I write the streams to hive or it is only for batch processing as of now.

Get Outlook for Android<https://aka.ms/ghei36>

________________________________
From: Khachatryan Roman <kh...@gmail.com>
Sent: Tuesday, May 12, 2020 1:49:10 AM
To: Jaswin Shah <ja...@outlook.com>
Cc: user@flink.apache.org <us...@flink.apache.org>
Subject: Re: Not able to implement an usecase

Hi Jaswin,

Currently, DataStream API doesn't support outer joins.
As a workaround, you can use coGroup function [1].

Hive is also not supported by DataStream API though it's supported by Table API [2].

[1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/CoGroupFunction.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/read_write_hive.html

Regards,
Roman


On Mon, May 11, 2020 at 6:03 PM Jaswin Shah <ja...@outlook.com>> wrote:
Hi,
I want to implement the below use case in my application:
I am doing an interval join between two data streams and then, in process function catching up the discrepant results on joining. Joining is done on key orderId. Now, I want to identify all the messages in both datastreams which are not joined. Means, for a message in left stream if I do not find any message in right stream over the interval defined, then, that message should be caught and same for right stream if there are messages which do not have corresponding messages in left streams then, catch them.Need an help how can I achieve the use case. I know this can be done with outer join but interval join or tumbling event time window joins only support inner join as per my knowledge. I do not want to use table/sql api here but want to work on this datastream apis only.

Currently I am using this which is working for 90 % of the cases but 10 % of the cases where large large delay can happen and messages in left or right streams are missing are not getting supported with my this implementaions:

/**
 * Join cart and pg streams on mid and orderId, and the interval specified.
 *
 * @param leftStream
 * @param rightStream
 * @return
 */
public SingleOutputStreamOperator<ResultMessage> intervalJoinCartAndPGStreams(DataStream<CartMessage> leftStream, DataStream<PGMessage> rightStream, ParameterTool parameter) {
    //Descripant results are sent to kafka from CartPGProcessFunction.
    return leftStream
        .keyBy(new CartJoinColumnsSelector())
        .intervalJoin(rightStream.keyBy(new PGJoinColumnsSelector()))
        .between(Time.milliseconds(Long.parseLong(parameter.get(Constants.INTERVAL_JOIN_LOWERBOUND))), Time.milliseconds(Long.parseLong(parameter.get(Constants.INTERVAL_JOIN_UPPERBOUND))))
        .process(new CartPGProcessFunction());

}


Secondly, I am unable to find the streaming support to stream out the datastreams I am reading from kafka to hive which I want to batch process with Flink

Please help me on resolving this use cases.

Thanks,
Jaswin


Get Outlook for Android<https://aka.ms/ghei36>

Re: Not able to implement an usecase

Posted by Khachatryan Roman <kh...@gmail.com>.
Hi Jaswin,

Currently, DataStream API doesn't support outer joins.
As a workaround, you can use coGroup function [1].

Hive is also not supported by DataStream API though it's supported by Table
API [2].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/CoGroupFunction.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/read_write_hive.html

Regards,
Roman


On Mon, May 11, 2020 at 6:03 PM Jaswin Shah <ja...@outlook.com> wrote:

> Hi,
> I want to implement the below use case in my application:
> I am doing an interval join between two data streams and then, in process
> function catching up the discrepant results on joining. Joining is done on
> key orderId. Now, I want to identify all the messages in both datastreams
> which are not joined. Means, for a message in left stream if I do not
> find any message in right stream over the interval defined, then, that
> message should be caught and same for right stream if there are messages
> which do not have corresponding messages in left streams then, catch
> them.Need an help how can I achieve the use case. I know this can be done
> with outer join but interval join or tumbling event time window joins only
> support inner join as per my knowledge. I do not want to use table/sql api
> here but want to work on this datastream apis only.
>
> Currently I am using this which is working for 90 % of the cases but 10 %
> of the cases where large large delay can happen and messages in left or
> right streams are missing are not getting supported with my this
> implementaions:
>
> /**
>  * Join cart and pg streams on mid and orderId, and the interval specified.
>  *
>  * @param leftStream
>  * @param rightStream
>  * @return
>  */
> public SingleOutputStreamOperator<ResultMessage> intervalJoinCartAndPGStreams(DataStream<CartMessage> leftStream, DataStream<PGMessage> rightStream, ParameterTool parameter) {
>     //Descripant results are sent to kafka from CartPGProcessFunction.
>     return leftStream
>         .keyBy(new CartJoinColumnsSelector())
>         .intervalJoin(rightStream.keyBy(new PGJoinColumnsSelector()))
>         .between(Time.milliseconds(Long.parseLong(parameter.get(Constants.INTERVAL_JOIN_LOWERBOUND))), Time.milliseconds(Long.parseLong(parameter.get(Constants.INTERVAL_JOIN_UPPERBOUND))))
>         .process(new CartPGProcessFunction());
>
> }
>
>
>
> Secondly, I am unable to find the streaming support to stream out the
> datastreams I am reading from kafka to hive which I want to batch process
> with Flink
>
> Please help me on resolving this use cases.
>
> Thanks,
> Jaswin
>
>
> Get Outlook for Android <https://aka.ms/ghei36>
>