Posted to user@flink.apache.org by ri...@sina.cn on 2016/09/06 13:53:46 UTC

Re: Re: Re: Re: modify coGroup GlobalWindows_GlobalWindow

Thanks for your answer.
But I do not understand your point: "If all elements of "words2" have been processed, the right side of your coGroup will always be empty no matter what is incoming in your socketTextStream." I cannot work out what this means.

The following is my understanding (it may be wrong):
         The coGroup creates a new data stream from T1 and T2. It has to use GlobalWindows to keep all elements of T2; with timeWindow or another window, not all of T2's elements would be kept.
                   ------------------
                       T1,  T2
                   ------------------
         These go into the apply function, which produces the result.
When the first element is input, T1 gains one element,
                   ---------------------------------
                       T1(+first),  T2
                   ---------------------------------
         and this goes into the apply function, which produces the result.

When the second element is input, T1 gains another element,
                   ---------------------------------------
                       T1(+first+second),  T2
                   ---------------------------------------
         and this goes into the apply function, which produces the result.
         ***************************************************
But what I actually want is a data stream like this:
                   --------------------------------
                       T1,  T2
                   --------------------------------
When the first element is input, it looks like this:
                   --------------------------------
                       T1(+first),  T2
                   --------------------------------
When the second element is input, it looks like this:
                   --------------------------------
                       T1(+second),  T2
                   --------------------------------
So the first element has to be fired and then dropped; that is my intention.

I also tried to cut the socket input stream with countWindow or timeWindow first, but that does not work: with coGroup, the stream consists of T1 and T2 as a whole, so I think the window has to be applied to the coGroup itself.
----- Original Message -----
From: Timo Walther <tw...@apache.org>
To: user@flink.apache.org
Subject: Re: Re: Re: modify coGroup GlobalWindows_GlobalWindow
Date: 2016-09-06 20:52


  
  
    I think you have to rethink your approach. In your example "words2" is a
    stream but only with a fixed set of elements. If all elements of "words2"
    have been processed, the right side of your coGroup will always be empty
    no matter what is incoming in your socketTextStream.

    It is not read in over and over again. Is that your intention?

    On 06/09/16 at 13:15, rimin515@sina.cn wrote:

    
      I tried reading the data into a List or List[Map] to hold T2, but I
      think a List or List[Map] gives no parallelism, so I want to use
      coGroup.

      On the other hand, coGroup joins T1 and T2 and requires a window and a
      trigger: the window cuts T1 and T2, and the trigger invokes the apply
      function once the input reaches the trigger threshold.

      In apply() I use my InnerJoinFunction and print T1 and T2. The output
      shows that each time data arrives and the trigger fires, T1 has grown
      while T2 is unchanged. So the way the window cuts T1 and T2 does not
      achieve my goal, which is why I want to write my own
      "GlobalWindows.create()".

      As for Flink's operator state, I have no idea about it and really do
      not know how to use it. Can you give me an example?

      
        ----- Original Message -----
          From: Timo Walther <tw...@apache.org>
          To: user@flink.apache.org
          Subject: Re: modify coGroup GlobalWindows GlobalWindow
          Date: 2016-09-06 17:52

        
        

        Hi,

          

          will words2 always remain constant? If yes, you don't have to
          create a stream out of it and coGroup it, but you could simply
          pass the collection to Map/FlatMap function and do the joining
          there without the need of a window. Btw. you know that
          non-keyed global windows do not scale? 
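A minimal sketch of the suggestion above, assuming words2 really stays constant: the collection is indexed by key once, and each incoming element is joined against it by a plain function, with no window involved. The names `LookupJoin`, `byKey` and `join` are made up for illustration and do not come from the thread; in a Flink job the function could be handed to something like `words1.flatMap(...)`.

```scala
object LookupJoin {
  // The constant right-hand side from the thread (words2).
  val words2: Seq[(String, String)] =
    Seq(("a", "w1"), ("a", "w2"), ("c", "w3"), ("d", "w4"))

  // Index the constant collection by key once, instead of windowing it.
  val byKey: Map[String, Seq[String]] =
    words2.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2) }

  // Inner join of a single incoming element against the constant collection.
  // Elements whose key is not in words2 produce no output.
  def join(in: (String, String)): Seq[(String, String)] =
    byKey.getOrElse(in._1, Seq.empty).map(w => (in._1, in._2 + w))

  def main(args: Array[String]): Unit = {
    // With Flink this would be roughly: words1.flatMap(LookupJoin.join _)
    Seq(("a", "1"), ("a", "2"), ("x", "9")).foreach(e => println(join(e)))
  }
}
```

Because nothing from the input stream is retained between elements, the left side cannot grow the way T1 does inside a global window.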

          If I understand your code correctly, you just want to get a
          stream with the last T2, right? I don't think you have to
          implement your own "GlobalWindow" for that. Have you tried to
          use Flink's operator state for that? So that if the state is
          growing it can be written to disk.
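To illustrate what "keeping only the last element" means, here is a plain-Scala model. It is not Flink code: `LatestPerKey`, `State` and `put` are hypothetical names, and the in-memory map only stands in for what Flink-managed keyed state (for example a `ValueState` per key) would hold in a real job.

```scala
object LatestPerKey {
  // Stand-in for per-key state: only the newest value per key is retained.
  final case class State(latest: Map[String, String] = Map.empty) {
    // A new element for a key overwrites the previous one instead of appending.
    def put(in: (String, String)): State = copy(latest = latest + in)
  }

  def main(args: Array[String]): Unit = {
    // Same inputs as in the thread: "a,1" followed by "a,2".
    val inputs = Seq(("a", "1"), ("a", "2"))
    val end = inputs.foldLeft(State())((s, e) => s.put(e))
    println(end.latest)
  }
}
```

The point of the design is that the history per key is replaced rather than accumulated, so the stored size is bounded by the number of keys rather than by the number of inputs.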

          

          Hope that helps.

          

          Timo

          

          On 06/09/16 at 10:05, rimin515@sina.cn wrote:

        
        
          
            Hi,
                  here is my code:
            

              
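                import org.apache.flink.streaming.api.scala._
                import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
                import org.apache.flink.streaming.api.windowing.triggers.CountTrigger

                val text = env.socketTextStream(hostName, port)
                // Parse "key,value" lines into (key, value) pairs.
                val words1 = text.map { x =>
                  val res = x.split(",")
                  res(0) -> res(1)
                }

                val words2 =
                  env.fromElements(("a", "w1"), ("a", "w2"), ("c", "w3"), ("d", "w4"))
                val joinedStream = words1
                  .coGroup(words2)
                  .where(_._1)
                  .equalTo(_._1)
                  .window(GlobalWindows.create())
                  .trigger(CountTrigger.of(1))

                val res = joinedStream.apply(new InnerJoinFunction).print()

                env.execute()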
            

            
            

            
            
              import org.apache.flink.api.common.functions.CoGroupFunction
              import org.apache.flink.util.Collector
              import scala.collection.JavaConverters._

              class InnerJoinFunction extends
                  CoGroupFunction[(String, String), (String, String), (String, String)] {

                override def coGroup(T1: java.lang.Iterable[(String, String)],
                    T2: java.lang.Iterable[(String, String)],
                    out: Collector[(String, String)]): Unit = {
                  println("****************************")
                  println("T1=" + T1 + "T2=" + T2)
                  val scalaT1 = T1.asScala.toList
                  val scalaT2 = T2.asScala.toList
                  if (scalaT1.nonEmpty && scalaT2.nonEmpty) {
                    // Only the most recent T1 element is joined against all of T2.
                    val transaction = scalaT1.last
                    // The label says "T2 last", but the value printed is the last T1 element.
                    println("T2 last=" + transaction)
                    for (snapshot <- scalaT2) {
                      out.collect((transaction._1, transaction._2 + snapshot._2))
                    }
                  }
                }
              }
            
            --------------------------------

            
            
              The code has no problem and runs. Here is the result
                (input "a,1", then input "a,2"):
              

              
              
                ****************************
                T1=[(a,1)]T2=[(a,w2), (a,w1)]
                T2 last=(a,1)
                2> (a,1w2)
                2> (a,1w1)
                ****************************
                T1=[(a,1), (a,2)]T2=[(a,w2), (a,w1)]
                T2 last=(a,2)
                2> (a,2w2)
                2> (a,2w1)
              
              --------------------------------------------------
              T1 keeps growing while T2 does not change. I worry that with
                a lot of input, T1 will run out of storage.
              So I want to write my own "GlobalWindows.create()" so that T1
                holds only the latest element from the input (or read from
                Kafka) and T1's history is cleared (input a,1 and T1 is
                [(a,1)]; then input a,2 and T1 is [(a,2)], not
                T1=[(a,1), (a,2)]), while T2 does not change.

              I rewrote "GlobalWindows", but it does not work. Reading the
                code, I found that I must also rewrite "GlobalWindow" and
                modify "the class Serializer extends
                TypeSerializer<MyGlobalWindow>", but when I run it,
                execution never reaches that code. Why? Can someone tell me?
            
          
        
        

        

        
        -- 
Freundliche Grüße / Kind Regards

Timo Walther 

Follow me: @twalthr
https://www.linkedin.com/in/twalthr
      
    
    

    

    
  



Re: Re: Re: Re: modify coGroup GlobalWindows_GlobalWindow

Posted by Till Rohrmann <tr...@apache.org>.
Hi Rimin,

I have to admit that I don't really understand what you're trying to
achieve. Could you try again to explain your problem?

Cheers,
Till
