Posted to user@flink.apache.org by "LINZ, Arnaud" <AL...@bouyguestelecom.fr> on 2015/05/04 10:47:22 UTC

Best way to join with inequalities (historical data)

Hello,

I was wondering how to join large data sets on inequalities.

Let's say I have a data set whose “keys” are two timestamps (start time and end time of validity) and whose value is a label:
        final DataSet<Tuple3<Long, Long, String>> historical = …;

I also have events, each with an event name and a timestamp:
        final DataSet<Tuple2<String, Long>> events = …;

I want to join my events with my historical data to get the “active” label at the time of each event.
The simple way is to use a cross product plus a filter:

        // Keep only (event, interval) pairs where start <= event timestamp <= end.
        events.cross(historical).filter(crossedRow ->
                crossedRow.f0.f1 >= crossedRow.f1.f0 && crossedRow.f0.f1 <= crossedRow.f1.f1);

But that’s not efficient with two big data sets…

How would you code that?

Greetings,
Arnaud





________________________________

The integrity of this message cannot be guaranteed on the Internet. The company that sent this message cannot therefore be held liable for its content nor attachments. Any unauthorized use or dissemination is prohibited. If you are not the intended recipient of this message, then please delete it and notify the sender.

Re: Best way to join with inequalities (historical data)

Posted by Stephan Ewen <se...@apache.org>.
When you use cross(), it is always good to use crossWithHuge or
crossWithTiny to tell the system which side is small. It cannot always
infer that automagically at this point.

If you have an optimized structure for the lookups, go with a broadcast
variable and a map() function.
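
As a rough illustration of what such an "optimized structure for the lookups" might look like, here is a minimal plain-Java sketch built on java.util.TreeMap. The class and method names (IntervalLookup, add, labelAt) are made up for this example and are not part of Flink, and the sketch assumes the validity intervals are inclusive and do not overlap.

```java
import java.util.Map;
import java.util.TreeMap;

/** Illustrative in-memory interval index; assumes non-overlapping, inclusive intervals. */
final class IntervalLookup {

    /** End time and label of one validity interval. */
    private static final class Validity {
        final long end;
        final String label;

        Validity(long end, String label) {
            this.end = end;
            this.label = label;
        }
    }

    // Keyed by interval start time, so floorEntry() finds the candidate interval.
    private final TreeMap<Long, Validity> byStart = new TreeMap<>();

    /** Registers the interval [start, end] with its label. */
    public void add(long start, long end, String label) {
        byStart.put(start, new Validity(end, label));
    }

    /** Returns the label active at the timestamp, or null if no interval covers it. */
    public String labelAt(long timestamp) {
        // Latest interval starting at or before the timestamp: O(log n).
        Map.Entry<Long, Validity> candidate = byStart.floorEntry(timestamp);
        if (candidate == null || timestamp > candidate.getValue().end) {
            return null;
        }
        return candidate.getValue().label;
    }

    public static void main(String[] args) {
        IntervalLookup lookup = new IntervalLookup();
        lookup.add(0L, 99L, "A");
        lookup.add(100L, 199L, "B");
        System.out.println(lookup.labelAt(150L));
    }
}
```

In a Flink job, one plausible wiring would be to attach the historical data set with withBroadcastSet(...), fill such an index once in the open() method of a RichMapFunction from getRuntimeContext().getBroadcastVariable(...), and call labelAt() for each event in map(). If intervals can overlap, a single floorEntry() probe is no longer enough and an interval tree or similar structure would be needed.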


RE: Best way to join with inequalities (historical data)

Posted by "LINZ, Arnaud" <AL...@bouyguestelecom.fr>.
Hi,

Thanks. The use case I have right now does not require too much magic; my historical data set is small enough to fit in RAM, so I'll spread it over each node and use a simple mapping with an O(log n) lookup. It was more of a theoretical question.
If my data set becomes too large, I may use some hashing techniques (for instance at day level) and cut the intervals at hash frontiers, duplicating the row so that no interval spans two hash buckets.
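
To make the day-level idea concrete, here is a small plain-Java sketch of cutting one interval at day boundaries. Everything in it (DaySplitter, Piece, split, dayOf) is a hypothetical helper written for this example, and it assumes timestamps are epoch milliseconds, intervals are inclusive, and start <= end.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative helper that cuts validity intervals at day boundaries. */
final class DaySplitter {
    static final long DAY_MILLIS = 24L * 60 * 60 * 1000;

    /** One fragment of an interval, fully contained in a single day. */
    static final class Piece {
        final long day;
        final long start;
        final long end;
        final String label;

        Piece(long day, long start, long end, String label) {
            this.day = day;
            this.start = start;
            this.end = end;
            this.label = label;
        }
    }

    /** Day bucket of a timestamp; this is the value used as the hash key. */
    static long dayOf(long timestamp) {
        return Math.floorDiv(timestamp, DAY_MILLIS);
    }

    /** Duplicates the row once per day it touches, clipping each copy to that day. */
    static List<Piece> split(long start, long end, String label) {
        List<Piece> pieces = new ArrayList<>();
        for (long day = dayOf(start); day <= dayOf(end); day++) {
            long pieceStart = Math.max(start, day * DAY_MILLIS);
            long pieceEnd = Math.min(end, (day + 1) * DAY_MILLIS - 1);
            pieces.add(new Piece(day, pieceStart, pieceEnd, label));
        }
        return pieces;
    }

    public static void main(String[] args) {
        for (Piece p : split(DAY_MILLIS - 10, 2 * DAY_MILLIS + 5, "X")) {
            System.out.println(p.day + ": [" + p.start + ", " + p.end + "] " + p.label);
        }
    }
}
```

With the pieces keyed by day and the events keyed by dayOf(timestamp), the inequality join turns into an equality join on the day followed by a range check inside each day. The price is one extra row per day boundary an interval crosses.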

Arnaud






Re: Best way to join with inequalities (historical data)

Posted by "Matthias J. Sax" <mj...@informatik.hu-berlin.de>.
Hi,

There is no other system support to express this join.

However, you could perform some "hand wired" optimization by
partitioning your input data into distinct intervals. It might be tricky
though, especially if the time ranges in your "range-key" dataset overlap
everywhere (in that case the overlapping parts have to be replicated into
every partition they touch).

But it might be worth the effort if you can't get the job done using
cross-product. How large are your data sets? What hardware are you using?
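
For illustration only, the partitioning idea could be simulated in plain Java as below. This is a sketch under assumptions, not Flink code: BucketedIntervalJoin, Interval, Event and the fixed bucketWidth are invented for the example, intervals are taken as inclusive, and the in-memory map stands in for what would be a partitioned equi-join on the bucket id in a distributed job.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative interval join that only compares rows sharing a time bucket. */
final class BucketedIntervalJoin {

    static final class Interval {
        final long start;
        final long end;
        final String label;

        Interval(long start, long end, String label) {
            this.start = start;
            this.end = end;
            this.label = label;
        }
    }

    static final class Event {
        final String name;
        final long timestamp;

        Event(String name, long timestamp) {
            this.name = name;
            this.timestamp = timestamp;
        }
    }

    /** Returns "eventName=label" for every pair with start <= timestamp <= end. */
    static List<String> join(List<Event> events, List<Interval> intervals, long bucketWidth) {
        // Replicate each interval into every bucket it overlaps.
        Map<Long, List<Interval>> byBucket = new HashMap<>();
        for (Interval iv : intervals) {
            long first = Math.floorDiv(iv.start, bucketWidth);
            long last = Math.floorDiv(iv.end, bucketWidth);
            for (long b = first; b <= last; b++) {
                byBucket.computeIfAbsent(b, k -> new ArrayList<>()).add(iv);
            }
        }
        // Each event belongs to exactly one bucket, so every match is emitted once.
        List<String> result = new ArrayList<>();
        for (Event e : events) {
            long bucket = Math.floorDiv(e.timestamp, bucketWidth);
            for (Interval iv : byBucket.getOrDefault(bucket, Collections.<Interval>emptyList())) {
                if (e.timestamp >= iv.start && e.timestamp <= iv.end) {
                    result.add(e.name + "=" + iv.label);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Interval> intervals = new ArrayList<>();
        intervals.add(new Interval(0L, 150L, "A"));
        List<Event> events = new ArrayList<>();
        events.add(new Event("e1", 120L));
        System.out.println(join(events, intervals, 100L));
    }
}
```

The bucket width is the tuning knob: narrow buckets mean fewer comparisons per bucket but more replicated copies of long or heavily overlapping intervals, which is the tricky part mentioned above.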


-Matthias

