Posted to user@flink.apache.org by iñaki williams <ju...@gmail.com> on 2016/06/09 12:36:22 UTC

Maxby() and KeyBy() question

Hi again!

I am working with two DataStreams, and I want to get the maximum value from
each pair of records that share a match name, for example:

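        // Informacion(matchName, localOdd, awayOdd); in the POJO the fields are named
        // in Spanish: nombrePartido = match name, cuotaGanadorLocal = local odd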

        Informacion info1= new Informacion("Match1", 1.10, 3.22);
        Informacion info2= new Informacion("Match2", 2.11, 1.10);
        Informacion info3= new Informacion("Match3", 4.10, 1.05);

        Informacion info11= new Informacion("Match1", 1.80, 2.20);
        Informacion info22= new Informacion("Match2", 3.10, 1.15);
        Informacion info33= new Informacion("Match3", 2.12, 1.25);


        // see is the StreamExecutionEnvironment
        DataStream<Informacion> src = see.fromElements(info1, info2, info3);
        DataStream<Informacion> src2 = see.fromElements(info11, info22, info33);
        DataStream<Informacion> src3 = src.union(src2);

        DataStream<Informacion> maxLocal =
                src3.keyBy("nombrePartido").maxBy("cuotaGanadorLocal");

        maxLocal.print();
        see.execute(); // the job only runs once execute() is called



Let's suppose that these are tennis matches with their names and betting
odds, and that the match names are the same in both streams, i.e.
Match1 = Match1, Match2 = Match2, and so on (imagine that Match1's name is
"Rafa Nadal - Roger Federer").


I want to get the maximum localOdd for matches with the same name. The
output of my code is:


1> Informacion [matchName=Match2, localOdd=2.11, awayOdd=1.1]
1> *Informacion [matchName=Match3, localOdd=4.1, awayOdd=1.05]*
1> Informacion [matchName=Match2, localOdd=3.1, awayOdd=1.15]
1> *Informacion [matchName=Match3, localOdd=4.1, awayOdd=1.05]*
4> Informacion [matchName=Match1, localOdd=1.1, awayOdd=3.22]
4> Informacion [matchName=Match1, localOdd=1.8, awayOdd=2.2]

It seems to be taking the largest value across all matches instead of per
key (per match).


I am looking for this:


Informacion [matchName=Match1, localOdd=1.8, awayOdd=2.2]
Informacion [matchName=Match2, localOdd=3.1, awayOdd=1.15]
Informacion [matchName=Match3, localOdd=4.1, awayOdd=1.05]



How can I get it?


Thanks in advance

Re: Maxby() and KeyBy() question

Posted by iñaki williams <ju...@gmail.com>.
Understood!

I have created a windowed stream and now it is working. Thanks!


On Thursday, June 9, 2016, Fabian Hueske <fh...@gmail.com> wrote:

> Hi,
>
> you are computing a running aggregate, i.e., you get one output record for
> each input record, and that output record is the record with the largest
> value observed so far for its key.
> If the record with the largest value arrives first, it is emitted again for
> every later record with the same key. This is what happened with Match3 in
> your example.
>
> There are two ways to compute aggregates on streams: 1) a running
> aggregate, as you just did, or 2) a windowed aggregate.
> For a windowed aggregate, you need to specify a window. The window can be
> time-based or count-based.
> The following blog post should be a good introduction to Flink's window
> support [1].
>
> Best, Fabian
>
> [1] http://flink.apache.org/news/2015/12/04/Introducing-windows.html

Re: Maxby() and KeyBy() question

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

you are computing a running aggregate, i.e., you get one output record for
each input record, and that output record is the record with the largest
value observed so far for its key.
If the record with the largest value arrives first, it is emitted again for
every later record with the same key. This is what happened with Match3 in
your example.
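To make the running-aggregate behaviour concrete, here is a minimal plain-Java sketch (no Flink dependency) that mimics a keyed running maxBy: one output per input, namely the best record seen so far for that input's key. Info is a hypothetical stand-in for the Informacion POJO, and the sketch assumes the six records arrive in the order listed in the question. In a real Flink job the interleaving of two unioned streams is not guaranteed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of a keyed running maxBy (no Flink involved).
public class RunningMaxBySketch {

    // Hypothetical stand-in for the Informacion POJO from the question.
    public record Info(String matchName, double localOdd, double awayOdd) {}

    // Emits one record per input: the record with the largest localOdd seen
    // so far for that input's key. On ties the earlier record is kept.
    public static List<Info> runningMaxBy(List<Info> input) {
        Map<String, Info> bestPerKey = new HashMap<>();
        List<Info> output = new ArrayList<>();
        for (Info in : input) {
            // merge stores the new record only if it beats the current best
            Info best = bestPerKey.merge(in.matchName(), in,
                    (old, cur) -> cur.localOdd() > old.localOdd() ? cur : old);
            output.add(best); // one output per input, even if unchanged
        }
        return output;
    }

    public static void main(String[] args) {
        // Assumed arrival order: first stream, then second stream.
        List<Info> input = List.of(
                new Info("Match1", 1.10, 3.22),
                new Info("Match2", 2.11, 1.10),
                new Info("Match3", 4.10, 1.05),
                new Info("Match1", 1.80, 2.20),
                new Info("Match2", 3.10, 1.15),
                new Info("Match3", 2.12, 1.25));
        runningMaxBy(input).forEach(System.out::println);
    }
}
```

With that input order, the sketch emits the Match3 record with localOdd 4.1 twice, just like the duplicated line in the question's output, while Match1 and Match2 each emit an intermediate value before their final maximum.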

There are two ways to compute aggregates on streams: 1) a running aggregate,
as you just did, or 2) a windowed aggregate.
For a windowed aggregate, you need to specify a window. The window can be
time-based or count-based.
The following blog post should be a good introduction to Flink's window
support [1].
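A count-based window gives one result per key once the window has collected N records. The plain-Java sketch below simulates a keyed tumbling count window of size 2 followed by maxBy. The window size of 2 is an assumption based on each match appearing once in each of the two streams, and Info is again a hypothetical stand-in for Informacion. In Flink itself this would presumably correspond to something like src3.keyBy("nombrePartido").countWindow(2).maxBy("cuotaGanadorLocal"). The thread does not show the windowed code that was eventually used.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of a keyed tumbling count window + maxBy (no Flink).
public class CountWindowMaxBySketch {

    // Hypothetical stand-in for the Informacion POJO from the question.
    public record Info(String matchName, double localOdd, double awayOdd) {}

    // Buffers records per key; each time a key's buffer reaches windowSize,
    // emits the record with the largest localOdd (earliest wins on ties)
    // and clears the buffer. Incomplete windows emit nothing.
    public static List<Info> countWindowMaxBy(List<Info> input, int windowSize) {
        Map<String, List<Info>> buffers = new HashMap<>();
        List<Info> output = new ArrayList<>();
        for (Info in : input) {
            List<Info> buf = buffers.computeIfAbsent(in.matchName(), k -> new ArrayList<>());
            buf.add(in);
            if (buf.size() == windowSize) {
                Info best = buf.get(0);
                for (Info candidate : buf) {
                    if (candidate.localOdd() > best.localOdd()) {
                        best = candidate;
                    }
                }
                output.add(best);
                buf.clear(); // tumbling window: start a fresh window for this key
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<Info> input = List.of(
                new Info("Match1", 1.10, 3.22),
                new Info("Match2", 2.11, 1.10),
                new Info("Match3", 4.10, 1.05),
                new Info("Match1", 1.80, 2.20),
                new Info("Match2", 3.10, 1.15),
                new Info("Match3", 2.12, 1.25));
        countWindowMaxBy(input, 2).forEach(System.out::println);
    }
}
```

For the question's six records this yields exactly one record per match: Match1 with 1.8, Match2 with 3.1 and Match3 with 4.1. The trade-off is that a match whose window never fills (for example, one that appears in only one stream) never produces a result.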

Best, Fabian

[1] http://flink.apache.org/news/2015/12/04/Introducing-windows.html
