You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Flavio Pompermaier <po...@okkam.it> on 2014/12/22 11:47:47 UTC

Union of multiple datasets vs Join

Hi guys,

In my use case I have multiple Datasets with the same structure (e.g.
Tuple3) and I want to produce an output Dataset containing all Tuple3
grouped by the first field (0).
I can obtain the same results performing a union of all datasets and then a
group by (simplest implementation) or join all of them pairwise
(((A->B)->C)->D)..) or I don't know if there is any other solution. When
should I use the first or the second approach? Could you help me in
figuring out the internals of the two approaches? I always have some fear
when using multiple joins when I don't know exactly their size..

Best,
Flavio

Re: Union of multiple datasets vs Join

Posted by Flavio Pompermaier <po...@okkam.it>.
I don't know if that could be useful, do you?

On Tue, Mar 17, 2015 at 10:29 PM, Stephan Ewen <se...@apache.org> wrote:

> Yes, that is the way to do it.
>
> This makes me think that it would be nice to have a method that builds the
> union of a list of data sets.
>
> DataSet<T> union(DataSet<T>... sets)
>
> It would be implemented like in your loop. Would that be helpful?
>
> Stephan
>
>
> On Tue, Mar 17, 2015 at 6:03 PM, Flavio Pompermaier <po...@okkam.it>
> wrote:
>
>> As always one minute after I sent the email I found the problem!
>> It was that I should reassign the initial dataset:
>>     ret = ret.union(sourceDs);
>>
>> Bye,
>> Flavio
>>
>> On Tue, Mar 17, 2015 at 5:58 PM, Flavio Pompermaier <pompermaier@okkam.it
>> > wrote:
>>
>>> Hi Fabian,
>>> I was trying to use the strategy you suggested with flink 0.8.1 but it
>>> seems that the union of the datasets cannot be created programmatically
>>> because the union operator gives a name to the generated dataset that is
>>> the name of the calling function so that  only the first dataset is read.
>>> My code looks like:
>>>
>>> private static DataSet<Tuple6<...> getSourceDs(ExecutionEnvironment env,
>>>  final String outputGraph, List<String> tableNames) {
>>> DataSet<Tuple6<...>> ret = null;
>>> for (String tableName : tableNames) {
>>> DataSet<Tuple6<...>> sourceDs = env.createInput(new
>>> MyTableInputFormat(tableName))
>>>                         ....
>>>
>>> if(ret==null)
>>> ret = sourceDs;
>>> else
>>> ret.union(sourceDs);
>>>                }
>>>               return ret;
>>>        }
>>>
>>> Is this a bug or am I'm doing something wrong?
>>> Thanks in advance,
>>> Flavio
>>>
>>> On Mon, Dec 22, 2014 at 2:42 PM, <fh...@gmail.com> wrote:
>>>
>>>>  Union is just combining data from multiple sources into a single
>>>> dataset.
>>>> That’s it. No memory, no disk involved.
>>>>
>>>> In you case you have
>>>>
>>>> input1.union(input2).groupBy(1).reduce(…)
>>>>
>>>> This will translate into:
>>>>
>>>> input1 -> repartition ->
>>>>                                         read-both-inputs ->  sort ->
>>>> reduce
>>>> input2 -> repartition ->
>>>>
>>>> So, in your case not even additional network transfer is involved,
>>>> because both data sets would need to be partitioned for the reduce anyway.
>>>>
>>>> Note, union in Flink has SQL union-all semantics, i.e., there is
>>>> not removal of duplicates.
>>>>
>>>> Cheers, Fabian
>>>>
>>>> *From:* Flavio Pompermaier <po...@okkam.it>
>>>> *Sent:* ‎Monday‎, ‎22‎. ‎December‎, ‎2014 ‎14‎:‎32
>>>> *To:* user@flink.incubator.apache.org
>>>>
>>>> Ok thanks Fabian. I'd like just to know the internals of the union of
>>>> multiple datasets (partitioning, distribution among server, memory/disk,
>>>> etc..). Do you have any ref to this?
>>>>
>>>> Thanks in advance,
>>>> Flavio
>>>>
>>>> On Mon, Dec 22, 2014 at 12:46 PM, Fabian Hueske <fh...@apache.org>
>>>> wrote:
>>>>
>>>>> Follow the first approach.
>>>>> Joins are expensive, union comes for free.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2014-12-22 11:47 GMT+01:00 Flavio Pompermaier <po...@okkam.it>:
>>>>>
>>>>>> Hi guys,
>>>>>>
>>>>>> In my use case I have multiple Datasets with the same structure (e.g.
>>>>>> Tuple3) and I want to produce an output Dataset containing all Tuple3
>>>>>> grouped by the first field (0).
>>>>>> I can obtain the same results performing a union of all datasets and
>>>>>> then a group by (simplest implementation) or join all of them pairwise
>>>>>> (((A->B)->C)->D)..) or I don't know if there is any other solution. When
>>>>>> should I use the first or the second approach? Could you help me in
>>>>>> figuring out the internals of the two approaches? I always have some fear
>>>>>> when using multiple joins when I don't know exactly their size..
>>>>>>
>>>>>> Best,
>>>>>> Flavio
>>>>>>
>>>>>
>>>>>
>>>>
>>
>

Re: Union of multiple datasets vs Join

Posted by Stephan Ewen <se...@apache.org>.
Yes, that is the way to do it.

This makes me think that it would be nice to have a method that builds the
union of a list of data sets.

DataSet<T> union(DataSet<T>... sets)

It would be implemented like in your loop. Would that be helpful?

Stephan


On Tue, Mar 17, 2015 at 6:03 PM, Flavio Pompermaier <po...@okkam.it>
wrote:

> As always one minute after I sent the email I found the problem!
> It was that I should reassign the initial dataset:
>     ret = ret.union(sourceDs);
>
> Bye,
> Flavio
>
> On Tue, Mar 17, 2015 at 5:58 PM, Flavio Pompermaier <po...@okkam.it>
> wrote:
>
>> Hi Fabian,
>> I was trying to use the strategy you suggested with flink 0.8.1 but it
>> seems that the union of the datasets cannot be created programmatically
>> because the union operator gives a name to the generated dataset that is
>> the name of the calling function so that  only the first dataset is read.
>> My code looks like:
>>
>> private static DataSet<Tuple6<...> getSourceDs(ExecutionEnvironment env, final
>> String outputGraph, List<String> tableNames) {
>> DataSet<Tuple6<...>> ret = null;
>> for (String tableName : tableNames) {
>> DataSet<Tuple6<...>> sourceDs = env.createInput(new
>> MyTableInputFormat(tableName))
>>                         ....
>>
>> if(ret==null)
>> ret = sourceDs;
>> else
>> ret.union(sourceDs);
>>                }
>>               return ret;
>>        }
>>
>> Is this a bug or am I'm doing something wrong?
>> Thanks in advance,
>> Flavio
>>
>> On Mon, Dec 22, 2014 at 2:42 PM, <fh...@gmail.com> wrote:
>>
>>>  Union is just combining data from multiple sources into a single
>>> dataset.
>>> That’s it. No memory, no disk involved.
>>>
>>> In you case you have
>>>
>>> input1.union(input2).groupBy(1).reduce(…)
>>>
>>> This will translate into:
>>>
>>> input1 -> repartition ->
>>>                                         read-both-inputs ->  sort ->
>>> reduce
>>> input2 -> repartition ->
>>>
>>> So, in your case not even additional network transfer is involved,
>>> because both data sets would need to be partitioned for the reduce anyway.
>>>
>>> Note, union in Flink has SQL union-all semantics, i.e., there is
>>> not removal of duplicates.
>>>
>>> Cheers, Fabian
>>>
>>> *From:* Flavio Pompermaier <po...@okkam.it>
>>> *Sent:* ‎Monday‎, ‎22‎. ‎December‎, ‎2014 ‎14‎:‎32
>>> *To:* user@flink.incubator.apache.org
>>>
>>> Ok thanks Fabian. I'd like just to know the internals of the union of
>>> multiple datasets (partitioning, distribution among server, memory/disk,
>>> etc..). Do you have any ref to this?
>>>
>>> Thanks in advance,
>>> Flavio
>>>
>>> On Mon, Dec 22, 2014 at 12:46 PM, Fabian Hueske <fh...@apache.org>
>>> wrote:
>>>
>>>> Follow the first approach.
>>>> Joins are expensive, union comes for free.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2014-12-22 11:47 GMT+01:00 Flavio Pompermaier <po...@okkam.it>:
>>>>
>>>>> Hi guys,
>>>>>
>>>>> In my use case I have multiple Datasets with the same structure (e.g.
>>>>> Tuple3) and I want to produce an output Dataset containing all Tuple3
>>>>> grouped by the first field (0).
>>>>> I can obtain the same results performing a union of all datasets and
>>>>> then a group by (simplest implementation) or join all of them pairwise
>>>>> (((A->B)->C)->D)..) or I don't know if there is any other solution. When
>>>>> should I use the first or the second approach? Could you help me in
>>>>> figuring out the internals of the two approaches? I always have some fear
>>>>> when using multiple joins when I don't know exactly their size..
>>>>>
>>>>> Best,
>>>>> Flavio
>>>>>
>>>>
>>>>
>>>
>

Re: Union of multiple datasets vs Join

Posted by Flavio Pompermaier <po...@okkam.it>.
As always one minute after I sent the email I found the problem!
It was that I should reassign the initial dataset:
    ret = ret.union(sourceDs);

Bye,
Flavio

On Tue, Mar 17, 2015 at 5:58 PM, Flavio Pompermaier <po...@okkam.it>
wrote:

> Hi Fabian,
> I was trying to use the strategy you suggested with flink 0.8.1 but it
> seems that the union of the datasets cannot be created programmatically
> because the union operator gives a name to the generated dataset that is
> the name of the calling function so that  only the first dataset is read.
> My code looks like:
>
> private static DataSet<Tuple6<...> getSourceDs(ExecutionEnvironment env, final
> String outputGraph, List<String> tableNames) {
> DataSet<Tuple6<...>> ret = null;
> for (String tableName : tableNames) {
> DataSet<Tuple6<...>> sourceDs = env.createInput(new
> MyTableInputFormat(tableName))
>                         ....
>
> if(ret==null)
> ret = sourceDs;
> else
> ret.union(sourceDs);
>                }
>               return ret;
>        }
>
> Is this a bug or am I'm doing something wrong?
> Thanks in advance,
> Flavio
>
> On Mon, Dec 22, 2014 at 2:42 PM, <fh...@gmail.com> wrote:
>
>>  Union is just combining data from multiple sources into a single
>> dataset.
>> That’s it. No memory, no disk involved.
>>
>> In you case you have
>>
>> input1.union(input2).groupBy(1).reduce(…)
>>
>> This will translate into:
>>
>> input1 -> repartition ->
>>                                         read-both-inputs ->  sort ->
>> reduce
>> input2 -> repartition ->
>>
>> So, in your case not even additional network transfer is involved,
>> because both data sets would need to be partitioned for the reduce anyway.
>>
>> Note, union in Flink has SQL union-all semantics, i.e., there is
>> not removal of duplicates.
>>
>> Cheers, Fabian
>>
>> *From:* Flavio Pompermaier <po...@okkam.it>
>> *Sent:* ‎Monday‎, ‎22‎. ‎December‎, ‎2014 ‎14‎:‎32
>> *To:* user@flink.incubator.apache.org
>>
>> Ok thanks Fabian. I'd like just to know the internals of the union of
>> multiple datasets (partitioning, distribution among server, memory/disk,
>> etc..). Do you have any ref to this?
>>
>> Thanks in advance,
>> Flavio
>>
>> On Mon, Dec 22, 2014 at 12:46 PM, Fabian Hueske <fh...@apache.org>
>> wrote:
>>
>>> Follow the first approach.
>>> Joins are expensive, union comes for free.
>>>
>>> Best, Fabian
>>>
>>> 2014-12-22 11:47 GMT+01:00 Flavio Pompermaier <po...@okkam.it>:
>>>
>>>> Hi guys,
>>>>
>>>> In my use case I have multiple Datasets with the same structure (e.g.
>>>> Tuple3) and I want to produce an output Dataset containing all Tuple3
>>>> grouped by the first field (0).
>>>> I can obtain the same results performing a union of all datasets and
>>>> then a group by (simplest implementation) or join all of them pairwise
>>>> (((A->B)->C)->D)..) or I don't know if there is any other solution. When
>>>> should I use the first or the second approach? Could you help me in
>>>> figuring out the internals of the two approaches? I always have some fear
>>>> when using multiple joins when I don't know exactly their size..
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>
>>>
>>

Re: Union of multiple datasets vs Join

Posted by Flavio Pompermaier <po...@okkam.it>.
Hi Fabian,
I was trying to use the strategy you suggested with flink 0.8.1 but it
seems that the union of the datasets cannot be created programmatically
because the union operator gives a name to the generated dataset that is
the name of the calling function so that  only the first dataset is read.
My code looks like:

private static DataSet<Tuple6<...> getSourceDs(ExecutionEnvironment env, final
String outputGraph, List<String> tableNames) {
DataSet<Tuple6<...>> ret = null;
for (String tableName : tableNames) {
DataSet<Tuple6<...>> sourceDs = env.createInput(new
MyTableInputFormat(tableName))
                        ....

if(ret==null)
ret = sourceDs;
else
ret.union(sourceDs);
               }
              return ret;
       }

Is this a bug or am I'm doing something wrong?
Thanks in advance,
Flavio

On Mon, Dec 22, 2014 at 2:42 PM, <fh...@gmail.com> wrote:

>  Union is just combining data from multiple sources into a single
> dataset.
> That’s it. No memory, no disk involved.
>
> In you case you have
>
> input1.union(input2).groupBy(1).reduce(…)
>
> This will translate into:
>
> input1 -> repartition ->
>                                         read-both-inputs ->  sort -> reduce
> input2 -> repartition ->
>
> So, in your case not even additional network transfer is involved, because
> both data sets would need to be partitioned for the reduce anyway.
>
> Note, union in Flink has SQL union-all semantics, i.e., there is
> not removal of duplicates.
>
> Cheers, Fabian
>
> *From:* Flavio Pompermaier <po...@okkam.it>
> *Sent:* ‎Monday‎, ‎22‎. ‎December‎, ‎2014 ‎14‎:‎32
> *To:* user@flink.incubator.apache.org
>
> Ok thanks Fabian. I'd like just to know the internals of the union of
> multiple datasets (partitioning, distribution among server, memory/disk,
> etc..). Do you have any ref to this?
>
> Thanks in advance,
> Flavio
>
> On Mon, Dec 22, 2014 at 12:46 PM, Fabian Hueske <fh...@apache.org>
> wrote:
>
>> Follow the first approach.
>> Joins are expensive, union comes for free.
>>
>> Best, Fabian
>>
>> 2014-12-22 11:47 GMT+01:00 Flavio Pompermaier <po...@okkam.it>:
>>
>>> Hi guys,
>>>
>>> In my use case I have multiple Datasets with the same structure (e.g.
>>> Tuple3) and I want to produce an output Dataset containing all Tuple3
>>> grouped by the first field (0).
>>> I can obtain the same results performing a union of all datasets and
>>> then a group by (simplest implementation) or join all of them pairwise
>>> (((A->B)->C)->D)..) or I don't know if there is any other solution. When
>>> should I use the first or the second approach? Could you help me in
>>> figuring out the internals of the two approaches? I always have some fear
>>> when using multiple joins when I don't know exactly their size..
>>>
>>> Best,
>>> Flavio
>>>
>>
>>
>

Re: Union of multiple datasets vs Join

Posted by fh...@gmail.com.
Union is just combining data from multiple sources into a single dataset. 

That’s it. No memory, no disk involved.


In you case you have


input1.union(input2).groupBy(1).reduce(…)


This will translate into:


input1 -> repartition -> 

                                        read-both-inputs ->  sort -> reduce

input2 -> repartition ->


So, in your case not even additional network transfer is involved, because both data sets would need to be partitioned for the reduce anyway.


Note, union in Flink has SQL union-all semantics, i.e., there is not removal of duplicates.


Cheers, Fabian 






From: Flavio Pompermaier
Sent: ‎Monday‎, ‎22‎. ‎December‎, ‎2014 ‎14‎:‎32
To: user@flink.incubator.apache.org





Ok thanks Fabian. I'd like just to know the internals of the union of multiple datasets (partitioning, distribution among server, memory/disk, etc..). Do you have any ref to this?



Thanks in advance,

Flavio



On Mon, Dec 22, 2014 at 12:46 PM, Fabian Hueske <fh...@apache.org> wrote:


Follow the first approach. 
Joins are expensive, union comes for free.





Best, Fabian





2014-12-22 11:47 GMT+01:00 Flavio Pompermaier <po...@okkam.it>:


Hi guys,






In my use case I have multiple Datasets with the same structure (e.g. Tuple3) and I want to produce an output Dataset containing all Tuple3 grouped by the first field (0).

I can obtain the same results performing a union of all datasets and then a group by (simplest implementation) or join all of them pairwise (((A->B)->C)->D)..) or I don't know if there is any other solution. When should I use the first or the second approach? Could you help me in figuring out the internals of the two approaches? I always have some fear when using multiple joins when I don't know exactly their size..




Best,

Flavio

Re: Union of multiple datasets vs Join

Posted by Flavio Pompermaier <po...@okkam.it>.
Ok thanks Fabian. I'd like just to know the internals of the union of
multiple datasets (partitioning, distribution among server, memory/disk,
etc..). Do you have any ref to this?

Thanks in advance,
Flavio

On Mon, Dec 22, 2014 at 12:46 PM, Fabian Hueske <fh...@apache.org> wrote:

> Follow the first approach.
> Joins are expensive, union comes for free.
>
> Best, Fabian
>
> 2014-12-22 11:47 GMT+01:00 Flavio Pompermaier <po...@okkam.it>:
>
>> Hi guys,
>>
>> In my use case I have multiple Datasets with the same structure (e.g.
>> Tuple3) and I want to produce an output Dataset containing all Tuple3
>> grouped by the first field (0).
>> I can obtain the same results performing a union of all datasets and then
>> a group by (simplest implementation) or join all of them pairwise
>> (((A->B)->C)->D)..) or I don't know if there is any other solution. When
>> should I use the first or the second approach? Could you help me in
>> figuring out the internals of the two approaches? I always have some fear
>> when using multiple joins when I don't know exactly their size..
>>
>> Best,
>> Flavio
>>
>
>

Re: Union of multiple datasets vs Join

Posted by Fabian Hueske <fh...@apache.org>.
Follow the first approach.
Joins are expensive, union comes for free.

Best, Fabian

2014-12-22 11:47 GMT+01:00 Flavio Pompermaier <po...@okkam.it>:

> Hi guys,
>
> In my use case I have multiple Datasets with the same structure (e.g.
> Tuple3) and I want to produce an output Dataset containing all Tuple3
> grouped by the first field (0).
> I can obtain the same results performing a union of all datasets and then
> a group by (simplest implementation) or join all of them pairwise
> (((A->B)->C)->D)..) or I don't know if there is any other solution. When
> should I use the first or the second approach? Could you help me in
> figuring out the internals of the two approaches? I always have some fear
> when using multiple joins when I don't know exactly their size..
>
> Best,
> Flavio
>