Posted to user@flink.apache.org by Averell <lv...@gmail.com> on 2018/08/15 05:51:07 UTC

CoFlatMapFunction with more than two input streams

Hi,

I have stream_A of type "Dog", which needs to be transformed using data from
stream_C of type "Name_Mapping". Because stream_C is slow (its data is
updated infrequently), I connect the two streams, do a keyBy, and then use a
RichCoFlatMapFunction that saves the mapping data from stream_C into a State
(flatMap1 emits one output per input, while flatMap2 only updates the State
table and emits nothing).

Now I have another stream, stream_B, of type "Cat", which also needs to be
transformed using data from stream_C. After that transformation,
transformed_B will go through a completely different pipeline from
transformed_A.

I can see two approaches for this:
1. Duplicate stream_C and the RichCoFlatMapFunction, and apply them to
stream_B.
2. Create a new stream_D of type "Animal" (the union of stream_A and
stream_B), transform it with stream_C, then split the result into two
streams with split/select, using case class pattern matching.

My question is: which option should I choose?
With option 1, I need to maintain at least two State tables, on top of the
cost of duplicating the stream (I am not sure how expensive this is in terms
of resources) and the need to duplicate the CoFlatMapFunction (*).
With option 2, there is additional cost from unioning, splitting/selecting,
and type-casting at the final streams.
Is there a better option for me?

Thank you very much for your support.
Regards,
Averell

(*) I am using Scala. I tried to create a RichCoFlatMapFunction of type
[Animal, Name_Mapping], but it cannot be used for a stream of [Dog,
Name_Mapping] or [Cat, Name_Mapping], so I needed to duplicate the
function as well.
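
One way to avoid duplicating the function mentioned in (*) might be to make it generic in the element type instead of fixing it to Animal. The following is only a minimal sketch written against the Flink 1.x Scala/Java API. The case classes, the `rename` parameter, the class name `RenameFunction` and the state name "name-mapping" are hypothetical and not taken from the original code.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.util.Collector

// Hypothetical stand-ins for the types described in the post.
final case class NameMapping(name: String, newName: String)
final case class Dog(name: String)
final case class Cat(name: String)

// Generic in T, so one class can serve both [Dog, NameMapping] and
// [Cat, NameMapping]. `rename` tells the function how to apply a mapping to T.
class RenameFunction[T](rename: (T, String) => T)
    extends RichCoFlatMapFunction[T, NameMapping, T] {

  // Keyed state holding the latest mapping for the current key.
  @transient private var mapping: ValueState[NameMapping] = _

  override def open(parameters: Configuration): Unit = {
    mapping = getRuntimeContext.getState(
      new ValueStateDescriptor[NameMapping]("name-mapping", classOf[NameMapping]))
  }

  // Data element: emit exactly one output, renamed if a mapping is known.
  override def flatMap1(value: T, out: Collector[T]): Unit = {
    val m = mapping.value() // null until a mapping has arrived for this key
    out.collect(if (m == null) value else rename(value, m.newName))
  }

  // Mapping element: only update the state, emit nothing.
  override def flatMap2(value: NameMapping, out: Collector[T]): Unit =
    mapping.update(value)
}

// Possible usage, assuming DataStreams named dogs, cats and mappings and
// `import org.apache.flink.streaming.api.scala._` in scope:
//   dogs.connect(mappings).keyBy(_.name, _.name)
//     .flatMap(new RenameFunction[Dog]((d, n) => d.copy(name = n)))
//   cats.connect(mappings).keyBy(_.name, _.name)
//     .flatMap(new RenameFunction[Cat]((c, n) => c.copy(name = n)))
```

This removes only the code duplication. Each use of the function still keeps its own copy of the State, so the other cost of option 1 remains.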



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: CoFlatMapFunction with more than two input streams

Posted by Averell <lv...@gmail.com>.
Thank you Xingcan.
Regarding Either, I still see the need for type casting / case class
pattern matching. Could you please take a look?

val transformed = dog
  .union(cat)
  .connect(transformer)
  .keyBy(r => r.name, r2 => r2.name)
  .process(new TransformerCoProcessFunction)
  .split(_ match {
    case Right(d) => List("dog")
    case Left(c)  => List("cat")
    case _        => List("")
  })

val transformed_dog = transformed.select("dog").map(_ match {
  case Right(d) => d
  case _        => NON_EXIST_DOG
})
val transformed_cat = transformed.select("cat").map(_ match {
  case Left(c) => c
  case _       => NON_EXIST_CAT
})

Thanks!
Averell
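
A possible way to avoid the second round of pattern matching (and the NON_EXIST_* placeholders) is to route by type inside the CoProcessFunction using Flink side outputs, so that each resulting stream is already typed. This is a sketch under assumptions, not the code from the thread: the case classes, `RouteByType`, `Tags` and the state name are hypothetical, and it is written against the Flink 1.x API.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical stand-ins for the types used in the thread.
final case class NameMapping(name: String, newName: String)
final case class Dog(name: String)
final case class Cat(name: String)

object Tags {
  // Cats leave through a side output; dogs use the main output.
  val cats: OutputTag[Cat] = OutputTag[Cat]("cat")
}

class RouteByType
    extends CoProcessFunction[Either[Cat, Dog], NameMapping, Dog] {

  @transient private var mapping: ValueState[NameMapping] = _

  override def open(parameters: Configuration): Unit = {
    mapping = getRuntimeContext.getState(
      new ValueStateDescriptor[NameMapping]("name-mapping", classOf[NameMapping]))
  }

  override def processElement1(
      value: Either[Cat, Dog],
      ctx: CoProcessFunction[Either[Cat, Dog], NameMapping, Dog]#Context,
      out: Collector[Dog]): Unit = {
    val newName = Option(mapping.value()).map(_.newName)
    // The only place where the Either is inspected.
    value match {
      case Right(d) =>
        out.collect(newName.fold(d)(n => d.copy(name = n)))
      case Left(c) =>
        ctx.output(Tags.cats, newName.fold(c)(n => c.copy(name = n)))
    }
  }

  override def processElement2(
      value: NameMapping,
      ctx: CoProcessFunction[Either[Cat, Dog], NameMapping, Dog]#Context,
      out: Collector[Dog]): Unit =
    mapping.update(value)
}

// Possible wiring, reusing the stream names dog, cat and transformer:
//   val animals: DataStream[Either[Cat, Dog]] =
//     cat.map(c => (Left(c): Either[Cat, Dog]))
//       .union(dog.map(d => (Right(d): Either[Cat, Dog])))
//   val transformed_dog: DataStream[Dog] = animals
//     .connect(transformer)
//     .keyBy(a => a.fold(_.name, _.name), m => m.name)
//     .process(new RouteByType)
//   val transformed_cat: DataStream[Cat] =
//     transformed_dog.getSideOutput(Tags.cats)
```

With this layout the State is kept once, and neither split/select nor a fallback case is needed downstream.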





Re: CoFlatMapFunction with more than two input streams

Posted by Xingcan Cui <xi...@gmail.com>.
Hi Averell,

With the CoProcessFunction, you get access to the time-related services, which may be useful for maintaining the elements of stream_C. You could also get rid of the type casting by using the Either class.

Best,
Xingcan
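
To illustrate what the time-related services could be used for, here is a minimal sketch in which a mapping from stream_C expires some time after its last update. The expiry policy itself is an assumption made for the example and was not requested in the thread. The case classes, `ExpiringMappingFunction`, `ttlMillis` and the state names are hypothetical, and the code targets the Flink 1.x API. Timers are only available when the connected streams are keyed.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

// Hypothetical stand-ins for the types used in the thread.
final case class NameMapping(name: String, newName: String)
final case class Dog(name: String)

class ExpiringMappingFunction(ttlMillis: Long)
    extends CoProcessFunction[Dog, NameMapping, Dog] {

  @transient private var mapping: ValueState[NameMapping] = _
  // Time at which the currently stored mapping should be dropped.
  @transient private var expiresAt: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    mapping = getRuntimeContext.getState(
      new ValueStateDescriptor[NameMapping]("name-mapping", classOf[NameMapping]))
    expiresAt = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("expires-at", classOf[java.lang.Long]))
  }

  override def processElement1(
      value: Dog,
      ctx: CoProcessFunction[Dog, NameMapping, Dog]#Context,
      out: Collector[Dog]): Unit = {
    val m = mapping.value()
    out.collect(if (m == null) value else value.copy(name = m.newName))
  }

  override def processElement2(
      value: NameMapping,
      ctx: CoProcessFunction[Dog, NameMapping, Dog]#Context,
      out: Collector[Dog]): Unit = {
    val expiry = ctx.timerService().currentProcessingTime() + ttlMillis
    mapping.update(value)
    expiresAt.update(expiry)
    ctx.timerService().registerProcessingTimeTimer(expiry)
  }

  override def onTimer(
      timestamp: Long,
      ctx: CoProcessFunction[Dog, NameMapping, Dog]#OnTimerContext,
      out: Collector[Dog]): Unit = {
    // Ignore timers that belong to an older, already replaced mapping.
    val expiry = expiresAt.value()
    if (expiry != null && expiry.longValue == timestamp) {
      mapping.clear()
      expiresAt.clear()
    }
  }
}
```

A RichCoFlatMapFunction has no timer callback, so this kind of clean-up is one concrete difference between the two function types.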



Re: CoFlatMapFunction with more than two input streams

Posted by vino yang <ya...@gmail.com>.
Hi Averell,

What I mean is that if you store the stream_C data in an RDBMS, you can
access the RDBMS directly in the CoFlatMapFunction instead of using the
Table API. This is somewhat similar to a join between a stream and a
dimension table.
Of course, this option presumes that the amount of data in stream_C is not
particularly large and that it is not updated frequently.

Thanks, vino.
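
As a rough illustration of the direct-lookup idea, the sketch below puts a small time-based cache in front of the database query so that most elements do not cause a round trip. The cache is an assumption added for the example; the thread only says to access the RDBMS directly. `CachedLookup`, `load`, `ttlMillis` and the NameMapping fields are hypothetical names, and `load` stands in for whatever JDBC query would be used.

```scala
import scala.collection.mutable

// Hypothetical row type for the mapping table.
final case class NameMapping(name: String, newName: String)

/** Caches lookups and reloads an entry once it is older than ttlMillis. */
final class CachedLookup(
    load: String => Option[NameMapping], // assumed to wrap the database query
    ttlMillis: Long,
    now: () => Long = () => System.currentTimeMillis()) {

  // name -> (time the entry was loaded, result of the query)
  private val cache = mutable.Map.empty[String, (Long, Option[NameMapping])]

  def get(name: String): Option[NameMapping] = {
    val t = now()
    cache.get(name) match {
      case Some((loadedAt, cached)) if t - loadedAt < ttlMillis =>
        cached // still fresh, no query
      case _ =>
        val fresh = load(name) // missing or expired, query again
        cache.update(name, (t, fresh))
        fresh
    }
  }
}

// Possible usage with an in-memory table instead of a real database:
//   val table  = Map("rex" -> NameMapping("rex", "Rex I"))
//   val lookup = new CachedLookup(table.get, ttlMillis = 60000L)
//   lookup.get("rex")   // first call runs the query
//   lookup.get("rex")   // served from the cache for the next minute
```

Inside a Flink function, an object like this would typically be created in open() rather than passed in through the constructor, because a database connection is not serializable.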


Re: CoFlatMapFunction with more than two input streams

Posted by Averell <lv...@gmail.com>.
Thank you Vino & Xingcan.
@Vino: could you explain in more detail how to use a DBMS? Would that be
through the Table API, or did you mean reading the DBMS data directly inside
the ProcessFunction?

@Xingcan: I don't see what the benefits of using CoProcess over
RichCoFlatMap are in this case.
Regarding the Either wrapper, as I understand it, I would need to use it
both in my sources (stream_A and stream_B) and in the
CoProcess/CoFlatMapFunction. Wouldn't using a superclass Animal be more
convenient?

Thanks and regards,
Averell




Re: CoFlatMapFunction with more than two input streams

Posted by Xingcan Cui <xi...@gmail.com>.
Hi Averell,

I am also in favor of option 2. Besides, you could use a CoProcessFunction instead of a CoFlatMapFunction and wrap the elements of stream_A and stream_B in the `Either` class.

Best,
Xingcan



Re: CoFlatMapFunction with more than two input streams

Posted by vino yang <ya...@gmail.com>.
Hi Averell,

Of these two solutions, I think you can only choose option 2 because, as
you have stated, the current Flink DataStream API does not support
substituting one of the input stream types of a CoFlatMapFunction. Other
choices:

1. Split it into two separate jobs. Even so, I still think that option 2
is better.
2. Since you said that stream_C is slow and rarely updated, if it is not
very large, you can store it in an RDBMS and then join it with stream_A and
stream_B separately (again using a CoFlatMapFunction).

I think you should give priority to your option 2.

Thanks, vino.
