Posted to user@flink.apache.org by Till Rohrmann <tr...@apache.org> on 2016/09/07 07:59:12 UTC

Re: Re: Re: Re: modify coGroup GlobalWindows_GlobalWindow

Hi Rimin,

I have to admit that I don't really understand what you're trying to
achieve. Could you try again to explain your problem?

Cheers,
Till

On Tue, Sep 6, 2016 at 3:53 PM, <ri...@sina.cn> wrote:

> Thank you for your answer,
> but I don't understand what you mean by "If all elements of "words2" have been
> processed, the right side of your coGroup will always be empty no matter
> what is incoming in your socketTextStream."
> Here is my understanding (it may be wrong):
> the coGroup creates a new DataStream of T1 and T2. It must use
> GlobalWindows to store all elements from T2; if I use a timeWindow or another
> window, not all of T2's elements will be stored.
> ------------------
> T1, T2
> ------------------
> and the apply function is called and produces a result.
> When I input the first element, one element is added to T1:
> ---------------------------------
> T1(+first), T2
> ---------------------------------
> and the apply function is called and produces a result.
> When I input the second element, another element is added to T1:
> ---------------------------------------
> T1(+first+second), T2
> -----------------------------------------
> and the apply function is called and produces a result.
> ***************************************************
> But what I actually want is a data stream like this:
> -------------------------------
> T1, T2
> --------------------------------
> when I input the first element:
> --------------------------------
> T1(+first), T2
> ---------------------------------
> when I input the second element:
> --------------------------------
> T1(+second), T2
> --------------------------------
> So the first element must be fired and then dropped; this is my intention.
> I tried to cut the socket input DataStream with a countWindow or timeWindow,
> but it did not work: with coGroup, T1 and T2 are treated as a whole,
> so I think the window must be applied to the coGroup.
> ----- Original Message -----
> From: Timo Walther <tw...@apache.org>
> To: user@flink.apache.org
> Subject: Re: Re: Re: modify coGroup GlobalWindows_GlobalWindow
> Date: 2016-09-06 20:52
>
> I think you have to rethink your approach. In your example "words2" is a
> stream but only with a fixed set of elements. If all elements of "words2"
> have been processed, the right side of your coGroup will always be empty no
> matter what is incoming in your socketTextStream.
> It is not read in over and over again. Is that your intention?
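To illustrate Timo's point, here is a toy model in plain Scala (not Flink code; the object, trait, and method names are hypothetical). It relies on how a windowed coGroup works in Flink: the elements of both inputs are tagged and assigned to the same per-key window, so anything that purges the window clears T1 and T2 together. `CountTrigger` on its own returns `FIRE`, which keeps the window contents (hence the ever-growing T1 that Rimin observes); wrapping it as `PurgingTrigger.of(CountTrigger.of(1))` turns that into `FIRE_AND_PURGE`, and once the finite `words2` has been consumed the T2 side stays empty.

```scala
// Toy model (not Flink code) of ONE key's GlobalWindow in a windowed coGroup.
// Both inputs are buffered in the same window, which is the point being shown.
object CoGroupWindowModel {
  sealed trait Side
  case object FromT1 extends Side // element from words1 (socket input)
  case object FromT2 extends Side // element from words2 (fixed elements)

  // Returns the (T1, T2) contents the coGroup function would see at each firing.
  // The window fires once per element, like CountTrigger.of(1).
  // purge = false models plain CountTrigger (FIRE: contents are kept);
  // purge = true models PurgingTrigger.of(CountTrigger.of(1)) (FIRE_AND_PURGE).
  def firings(input: Seq[(Side, String)], purge: Boolean): List[(List[String], List[String])] = {
    var t1 = List.empty[String]
    var t2 = List.empty[String]
    val seen = scala.collection.mutable.ListBuffer.empty[(List[String], List[String])]
    for ((side, value) <- input) {
      side match {
        case FromT1 => t1 = t1 :+ value
        case FromT2 => t2 = t2 :+ value
      }
      seen += ((t1, t2))
      // Purging clears the whole window, i.e. BOTH sides at once.
      if (purge) { t1 = Nil; t2 = Nil }
    }
    seen.toList
  }
}
```

Feeding the two `words2` elements for key "a" and then the inputs "1" and "2": without purging, the last firing sees T1 = [1, 2] and T2 = [w1, w2]; with purging, it sees T1 = [2] and an empty T2.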
>
>
> On 06/09/16 at 13:15, rimin515@sina.cn wrote:
>
> I tried reading the data into a List or List[Map] to store T2, but I think
> that with a List or List[Map] there is no parallelization, so I want to use coGroup.
> On the other hand, the coGroup function joins T1 and T2 and must have a window
> and a trigger: the window cuts T1 and T2,
> and the trigger fires the apply function when the input reaches the trigger
> threshold.
> In apply() I use my InnerJoinFunction and print T1 and T2. From the result
> we can see that each time data is input and the trigger fires apply,
> InnerJoinFunction prints T1 and T2:
> T1 keeps growing while T2 does not change, so the way the window cuts T1 and T2
> does not achieve my goal. That is why I want to write my own "GlobalWindows.create()".
> As for Flink's operator state, I have no idea how it works and really do not know
> how to use it. Can you give me an example?
> ----- Original Message -----
> From: Timo Walther <tw...@apache.org>
> To: user@flink.apache.org
> Subject: Re: modify coGroup GlobalWindows GlobalWindow
> Date: 2016-09-06 17:52
>
> Hi,
>
> Will words2 always remain constant? If yes, you don't have to create a
> stream out of it and coGroup it; you could simply pass the collection
> to a Map/FlatMap function and do the joining there without the need for a
> window. Btw., are you aware that non-keyed global windows do not scale?
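Timo's first suggestion can be sketched as follows, assuming `words2` really is a small, constant set. The object and method names are hypothetical, and only plain Scala collections are used so the join logic can be checked on its own. The concatenation mirrors `transaction._2 + snapshot._2` from `InnerJoinFunction`.

```scala
// Minimal sketch: join each incoming (key, value) pair against a constant
// lookup table instead of a windowed coGroup. Names are hypothetical.
object LookupJoin {
  // Build the table once: key -> all right-side values for that key,
  // in their original order.
  def buildLookup(right: Seq[(String, String)]): Map[String, Seq[String]] =
    right.groupBy(_._1).map { case (k, pairs) => k -> pairs.map(_._2) }

  // Join one left element against the table. Nothing is buffered,
  // so there is no growing "T1" history.
  def join(lookup: Map[String, Seq[String]])(left: (String, String)): Seq[(String, String)] =
    lookup.getOrElse(left._1, Seq.empty).map(v => (left._1, left._2 + v))
}
```

Inside the job, something like `words1.flatMap(LookupJoin.join(lookup) _)` with `val lookup = LookupJoin.buildLookup(Seq(("a","w1"), ("a","w2"), ("c","w3"), ("d","w4")))` should do the join, since the Scala DataStream API's `flatMap` accepts a function that returns a collection. Each parallel instance then carries its own copy of the table, which is why this only suits small, fixed data.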
> If I understand your code correctly, you just want to get a stream with
> the last T2, right? I don't think you have to implement your own
> "GlobalWindow" for that. Have you tried using Flink's operator state for
> that? That way, if the state grows, it can be written to disk.
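For the operator-state question, here is a plain-Scala model of a keyed-state join (not Flink code; the class and method names are hypothetical). In a Flink job the same logic would roughly live in a `RichCoFlatMapFunction` applied to `words1.connect(words2).keyBy(_._1, _._1)`, with the mutable map below replaced by keyed `ListState`, so that the state backend manages (and, depending on the backend, writes to disk) the stored T2 values. T1 elements are joined and then dropped, which gives the "T1(+second), T2" behaviour Rimin describes.

```scala
import scala.collection.mutable

// Plain-Scala model (not Flink code) of a keyed-state join.
class KeyedStateJoin {
  // Stand-in for keyed ListState: per key, all T2 values seen so far.
  private val t2State = mutable.Map.empty[String, List[String]]

  // Corresponds to flatMap2 of a CoFlatMapFunction: remember a T2 element.
  def onT2(e: (String, String)): Unit =
    t2State(e._1) = t2State.getOrElse(e._1, Nil) :+ e._2

  // Corresponds to flatMap1: join one T1 element with the stored T2 values
  // for its key. The T1 element itself is NOT stored, so no history builds up.
  def onT1(e: (String, String)): List[(String, String)] =
    t2State.getOrElse(e._1, Nil).map(v => (e._1, e._2 + v))
}
```

Note that a T1 element arriving before any T2 element for its key finds nothing to join; because the two inputs are not synchronized, a real job may need to buffer such early elements.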
>
> Hope that helps.
>
> Timo
>
> On 06/09/16 at 10:05, rimin515@sina.cn wrote:
>
> Hi,
> here is my code:
>
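>     // Flink Scala DataStream API imports used by this snippet.
>     import org.apache.flink.streaming.api.scala._
>     import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
>     import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
>
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     // hostName and port are assumed to be defined elsewhere.
>     val text = env.socketTextStream(hostName, port)
>     // Parse each "key,value" line into a (key, value) tuple.
>     val words1 = text.map { x =>
>       val res = x.split(",")
>       (res(0), res(1))
>     }
>
>     val words2 = env.fromElements(("a","w1"), ("a","w2"), ("c","w3"), ("d","w4"))
>     val joinedStream = words1
>       .coGroup(words2)
>       .where(_._1)
>       .equalTo(_._1)
>       .window(GlobalWindows.create())
>       .trigger(CountTrigger.of(1))
>
>     joinedStream.apply(new InnerJoinFunction).print()
>
>     env.execute()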
>
>
> import org.apache.flink.api.common.functions.CoGroupFunction
> import org.apache.flink.util.Collector
> import scala.collection.JavaConverters._
>
> class InnerJoinFunction extends CoGroupFunction[(String, String), (String, String), (String, String)] {
>
>   override def coGroup(T1: java.lang.Iterable[(String, String)],
>                        T2: java.lang.Iterable[(String, String)],
>                        out: Collector[(String, String)]): Unit = {
>     println("****************************")
>     println("T1=" + T1 + "T2=" + T2)
>     val scalaT1 = T1.asScala.toList
>     val scalaT2 = T2.asScala.toList
>     if (scalaT1.nonEmpty && scalaT2.nonEmpty) {
>       // Only the most recent T1 element is joined with every T2 element.
>       val transaction = scalaT1.last
>       // Despite the "T2 last" label, this prints the last T1 element.
>       println("T2 last=" + transaction)
>       for (snapshot <- scalaT2) {
>         out.collect((transaction._1, transaction._2 + snapshot._2))
>       }
>     }
>   }
> }
> --------------------------------
> The code has no problems and runs. Here is the result (input "a,1",
> then input "a,2"):
>
> ****************************
> T1=[(a,1)]T2=[(a,w2), (a,w1)]
> T2 last=(a,1)
> 2> (a,1w2)
> 2> (a,1w1)
> ****************************
> T1=[(a,1), (a,2)]T2=[(a,w2), (a,w1)]
> T2 last=(a,2)
> 2> (a,2w2)
> 2> (a,2w1)
> --------------------------------------------------
> T1 keeps growing while T2 does not change. I worry that when a lot of data is
> input, T1 will run out of storage.
> So I want to write my own "GlobalWindows.create()" so that T1 stores
> only the latest element from the input (or from Kafka) and the history of T1 is
> cleared (input a,1 gives T1 = [(a,1)]; then input a,2 gives T1 = [(a,2)], not
> T1 = [(a,1), (a,2)]), while T2 stays unchanged.
>
> I rewrote "GlobalWindows", but it did not work. Reading the code, I found that I
> must also rewrite "GlobalWindow" and modify "the class Serializer extends
> TypeSerializer<MyGlobalWindow>", but when I run the job, that code is never
> reached. Why? Can someone tell me?
>
>
>
> --
> Freundliche Grüße / Kind Regards
>
> Timo Walther
>
> Follow me: @twalthr https://www.linkedin.com/in/twalthr
>
>
>