Posted to user@flink.apache.org by Flavio Pompermaier <po...@okkam.it> on 2018/05/04 12:44:15 UTC

Question about datasource replication

Hi all,
I have a Flink batch job that reads a Parquet dataset and then applies two
flatMaps to it (see the pseudocode below).
The problem is that this dataset is quite big and Flink duplicates it before
sending the data to these two operators (I inferred this from the doubled
amount of sent bytes).
Is there a way to avoid this behaviour?

-------------------------------------------------------
Here's the pseudocode of my job:

DataSet<T> X = readParquetDir();
DataSet<T> X1 = X.flatMap(...);
DataSet<T> X2 = X.flatMap(...);

Best,
Flavio
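
For reference, a minimal runnable sketch of the job above (Flink 1.3 DataSet
API; the Parquet source is replaced by a plain text source to keep the
example self-contained, and the paths, types, and flatMap bodies are
illustrative placeholders, not taken from the original job):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;

public class BranchingJob {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for readParquetDir(): any DataSet source branches the same way.
        DataSet<String> x = env.readTextFile("hdfs:///path/to/input");

        // Both flatMaps consume the same DataSet, i.e. the source is branched.
        DataSet<String> x1 = x.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) {
                out.collect(value.toLowerCase());
            }
        });
        DataSet<String> x2 = x.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) {
                out.collect(value.toUpperCase());
            }
        });

        // Each branch is emitted to its own sink; the branches never merge.
        x1.writeAsText("hdfs:///out/lower");
        x2.writeAsText("hdfs:///out/upper");
        env.execute("branching job");
    }
}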

Re: Question about datasource replication

Posted by Fabian Hueske <fh...@gmail.com>.
The spilling will only happen when joining the branched data sets.
If you keep them separate and eventually emit them, no intermediate data
will be spilled.
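
For example (a sketch continuing the variable names from the code under the
original question; the paths are illustrative):

// Each branch goes straight to its own sink; since the branches never
// merge, neither intermediate result has to be spilled.
x1.writeAsText("hdfs:///out/x1");
x2.writeAsText("hdfs:///out/x2");

// Printing the JSON plan (instead of calling execute()) lets you inspect
// the optimized plan, e.g. in the Flink plan visualizer, and confirm that
// the two branches stay independent:
System.out.println(env.getExecutionPlan());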


Re: Question about datasource replication

Posted by Flavio Pompermaier <po...@okkam.it>.
Does this duplication happen when I write directly to disk after the
flatMaps?


Re: Question about datasource replication

Posted by Fabian Hueske <fh...@gmail.com>.
That will happen if you join (or coGroup) the branched DataSets, i.e., if
you have a branching-and-merging pattern in your program.

The problem in that case is that one of the inputs is pipelined (e.g., the
probe side of a hash join) and the other one is blocking.
In order to execute such a plan, we must spill the pipelined data set to
disk to ensure that the other input can be fully consumed (to build the
hash table).

There's not really a solution to this.
You could change the join strategy to a sort-merge join, but this will sort
both inputs and also spill both to disk.
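
For illustration, a minimal sketch of pinning the join strategy via a join
hint (t1 and t2 stand for two hypothetical Tuple2<String, Integer> branches
keyed on field 0; the names are not from this thread):

import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;

// REPARTITION_SORT_MERGE forces a sort-merge join instead of a hash join;
// as described above, both inputs are then sorted and spilled to disk.
DataSet<Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>> joined =
    t1.join(t2, JoinHint.REPARTITION_SORT_MERGE)
      .where(0)     // key field of the left input
      .equalTo(0);  // key field of the right input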


Re: Question about datasource replication

Posted by Flavio Pompermaier <po...@okkam.it>.
Hi Fabian,
thanks for the detailed reply.
The problem I see is that the source dataset is huge and, since it doesn't
fit in memory, it is spilled to disk twice (I checked the growing disk
usage during the job, and it corresponded exactly to the size estimated by
the Flink UI, that is, twice the dataset's initial size).
There is probably no problem as long as the data stays in memory, but in
my case this memory explosion is very problematic :(

-- 
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 041809

Re: Question about datasource replication

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Flavio,

No, there's no way around it.
A DataSet that is consumed by more than one operator cannot be chained
with its consumers.
The records need to be copied to avoid concurrent modifications. However,
the data should not be shipped over the network if all operators have the
same parallelism; instead, records are serialized and handed over via
local byte[] in-memory channels.

Best, Fabian
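
A short sketch of the parallelism point (the value 8 is illustrative):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// With one global setting, every operator (the source, both flatMaps, and
// the sinks) runs at the same parallelism, so the branched records are
// still serialized and copied, but are handed over through local in-memory
// channels instead of being shipped over the network.
env.setParallelism(8);
// A per-operator override, e.g. x.flatMap(new MyFlatMap()).setParallelism(4),
// would change this for that operator (MyFlatMap is hypothetical).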



Re: Question about datasource replication

Posted by Flavio Pompermaier <po...@okkam.it>.
Flink 1.3.1 (I'm waiting for 1.5 before upgrading...)


Re: Question about datasource replication

Posted by Amit Jain <aj...@gmail.com>.
Hi Flavio,

Which version of Flink are you using?

--
Thanks,
Amit
