You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Mustafa Elbehery <el...@gmail.com> on 2015/05/18 15:43:46 UTC

Informing the runtime about data already repartitioned using "output contracts"

Hi,

I am writing a flink job, in which I have three datasets.  I have
partitionedByHash the first two before coGrouping them.

My plan is to spill the result of coGrouping to disk, and then re-read it
again before coGrouping with the third dataset.

My question is, is there anyway to inform flink that the first coGroup
result is already partitioned ?!  I know I can re-partition again before
coGrouping but I would like to know if there is anyway to avoid a step
which was already executed,

Regards.

-- 
Mustafa Elbehery
EIT ICT Labs Master School <http://www.masterschool.eitictlabs.eu/home/>
+49(0)15750363097
skype: mustafaelbehery87

Re: Informing the runtime about data already repartitioned using "output contracts"

Posted by Stephan Ewen <se...@apache.org>.
I think we are still talking about the same issue as in a related question.
I suspect that the MutableInputFormatTest does not properly return the
splits in the "createInputSplits()" function.

To validate that, you can write yourself a unit test that checks whether
the input format returns your splits from the method "createInputSplits()".

On Fri, May 29, 2015 at 5:59 PM, Mustafa Elbehery <elbeherymustafa@gmail.com
> wrote:

> Hi Folks,
>
> I am reviving this thread again, as I am stuck in one step to achieve my
> target.
>
> the following code is doing partitioning, before coGrouping, then writing
> to disk.  I am trying to re-read the data from disk, so I have create*LocatableInputSPlits
> [] *with the size of DOP. Find the code Below
>
> inPerson.partitionByHash("name")
>       .map(new TrackHost())
>       .coGroup(inStudent.partitionByHash("name"))
>       .where("name").equalTo("name")
>       .with(new ComputeStudiesProfile())
>       .write(new TextOutputFormat(new Path()), "file:///home/mustafa/Documents/tst/", FileSystem.WriteMode.OVERWRITE);
>
> LocatableInputSplit [] splits = new LocatableInputSplit[env.getParallelism()];
> splits[0] = new LocatableInputSplit(env.getParallelism(),"localhost");
> splits[1] = new LocatableInputSplit(env.getParallelism(),"localhost");
> splits[2] = new LocatableInputSplit(env.getParallelism(),"localhost");
> DataSet<Person> secondIn = env.createInput(new MutableInputFormatTest(new Path("file:///home/mustafa/Documents/tst/1"),splits)).map(new PersonMapper());
> secondIn.print();
>
>
>
> TrackHost is an Accumulator to track the host information, && MutuableInputFormat, is an customInputFormat which extends TextInputFormat && implements StrictlyLocalAssignment ..
>
> I am using LocatableInputSplit as a instanceField, as implementing InputSplit is conflicting with TextInputFormat, on the createInputSplit method, they both have the same method and the compiler complained for that.
>
>
> Again, while debugging I could see the problem in *ExectionJobVertex line 146 . *the execution ignores the Locatables I am shipping with my splits, and re-create inputSplits again which get the hostInfo(Machine Name) from the execution somehow, while the taskManagers prepared by the scheduler waiting for a machine with "LocalHost".
>
> Any Suggestion ??
>
> Regards.
>
>
>
>
> On Tue, May 19, 2015 at 2:16 PM, Fabian Hueske <fh...@gmail.com> wrote:
>
>> Alright, so if both inputs of the CoGroup are read from the file system,
>> there should be a way to do the co-group on co-located data without
>> repartitioning.
>> In fact, I have some code lying around to do co-located joins from local
>> FS [1]. Haven't tested it thoroughly and it also relies on a number of
>> assumptions. If the data is also sorted you can even get around sorting it
>> if you inject a few lines into the optimizer (see change for FLINK-1444)
>> and ensure that each source reads exactly one! input split.
>>
>> Regarding your question about the PACT output contracts, there were three
>> types which were defined wrt to a Key/Value pair data model:
>> - Same key: UDF does not modify the key
>> - Super key: UDF extends the key (Partitioning remains valid, sorting not)
>> - Unique key: Keys from UDF or source are unique
>>
>> Let me know, if you have questions.
>> Cheers, Fabian
>>
>> [1] https://github.com/fhueske/flink-localjoin-utils
>>
>> 2015-05-19 13:49 GMT+02:00 Alexander Alexandrov <
>> alexander.s.alexandrov@gmail.com>:
>>
>>> Thanks for the feedback, Fabian.
>>>
>>> This is related to the question I sent on the user mailing list
>>> yesterday. Mustafa is working on a master thesis where we try to abstract
>>> an operator for the update of stateful datasets (decoupled from the current
>>> native iterations logic) and use it in conjunction with lazy unrolling of
>>> iterations.
>>>
>>> The assumptions are as follows:
>>>
>>>    - Each iteration runs a job with the same structure and the same DOP;
>>>    - Updates a realized through a coGroup with a fixed DOP (let's say
>>>    *N*), which consumes a *(state, updates)* pair of datasets and
>>>    produces a new version of the state (let's call it *state'*);
>>>    - We keep track where the *N* output partitions of *state'* are
>>>    located and use this information for local placement of the corresponding
>>>    *N* DataSource tasks in the next iteration (via FLINK-1478);
>>>    - The remaining piece of the puzzle is to figure out how to tell the
>>>    coGroup that one of the inputs is already partitioned so id avoids an
>>>    unnecessary shuffle;
>>>
>>> If I remember correctly back in the day we had a PACT output contract
>>> that served a similar purpose avoid unnecessary shuffles), but I was not
>>> able to find it yesterday.
>>>
>>> In either case, I think even if that does not work out of the box at the
>>> moment, that most of the logic is in place (e.g. co-location groups in the
>>> scheduler), and we are willing to either hack the code or add the missing
>>> functionality in order to realize the above described goal.
>>>
>>> Suggestions are welcome!
>>>
>>> Regards,
>>> Alex
>>>
>>>
>>>
>>>
>>> 2015-05-18 17:42 GMT+02:00 Fabian Hueske <fh...@gmail.com>:
>>>
>>>> Hi Mustafa,
>>>>
>>>> I'm afraid, this is not possible.
>>>> Although you can annotate DataSources with partitioning information,
>>>> this is not enough to avoid repartitioning for a CoGroup. The reason for
>>>> that is that CoGroup requires co-partitioning of both inputs, i.e., both
>>>> inputs must be equally partitioned (same number of partitions, same
>>>> partitioning function, same location of partitions). Since Flink is
>>>> dynamically assigning tasks to execution slots, it is not possible to
>>>> co-locate data that was read from a data source and data coming from the
>>>> result of another computation.
>>>>
>>>> If you just need the result of the first co-group on disk, you could
>>>> also build a single program that does both co-groups and additional writes
>>>> the result of the first co-group to disk (Flink supports multiple data
>>>> sinks).
>>>>
>>>> Best, Fabian
>>>>
>>>> 2015-05-18 15:43 GMT+02:00 Mustafa Elbehery <el...@gmail.com>
>>>> :
>>>>
>>>>> Hi,
>>>>>
>>>>> I am writing a flink job, in which I have three datasets.  I have
>>>>> partitionedByHash the first two before coGrouping them.
>>>>>
>>>>> My plan is to spill the result of coGrouping to disk, and then re-read
>>>>> it again before coGrouping with the third dataset.
>>>>>
>>>>> My question is, is there anyway to inform flink that the first coGroup
>>>>> result is already partitioned ?!  I know I can re-partition again before
>>>>> coGrouping but I would like to know if there is anyway to avoid a step
>>>>> which was already executed,
>>>>>
>>>>> Regards.
>>>>>
>>>>> --
>>>>> Mustafa Elbehery
>>>>> EIT ICT Labs Master School
>>>>> <http://www.masterschool.eitictlabs.eu/home/>
>>>>> +49(0)15750363097
>>>>> skype: mustafaelbehery87
>>>>>
>>>>>
>>>>
>>>
>>
>
>
> --
> Mustafa Elbehery
> EIT ICT Labs Master School <http://www.masterschool.eitictlabs.eu/home/>
> +49(0)15750363097
> skype: mustafaelbehery87
>
>

Re: Informing the runtime about data already repartitioned using "output contracts"

Posted by Mustafa Elbehery <el...@gmail.com>.
Hi Folks,

I am reviving this thread again, as I am stuck in one step to achieve my
target.

the following code is doing partitioning, before coGrouping, then writing
to disk.  I am trying to re-read the data from disk, so I have
create*LocatableInputSPlits
[] *with the size of DOP. Find the code Below

inPerson.partitionByHash("name")
      .map(new TrackHost())
      .coGroup(inStudent.partitionByHash("name"))
      .where("name").equalTo("name")
      .with(new ComputeStudiesProfile())
      .write(new TextOutputFormat(new Path()),
"file:///home/mustafa/Documents/tst/",
FileSystem.WriteMode.OVERWRITE);

LocatableInputSplit [] splits = new LocatableInputSplit[env.getParallelism()];
splits[0] = new LocatableInputSplit(env.getParallelism(),"localhost");
splits[1] = new LocatableInputSplit(env.getParallelism(),"localhost");
splits[2] = new LocatableInputSplit(env.getParallelism(),"localhost");
DataSet<Person> secondIn = env.createInput(new
MutableInputFormatTest(new
Path("file:///home/mustafa/Documents/tst/1"),splits)).map(new
PersonMapper());
secondIn.print();



TrackHost is an Accumulator to track the host information, &&
MutuableInputFormat, is an customInputFormat which extends
TextInputFormat && implements StrictlyLocalAssignment ..

I am using LocatableInputSplit as a instanceField, as implementing
InputSplit is conflicting with TextInputFormat, on the
createInputSplit method, they both have the same method and the
compiler complained for that.


Again, while debugging I could see the problem in *ExectionJobVertex
line 146 . *the execution ignores the Locatables I am shipping with my
splits, and re-create inputSplits again which get the hostInfo(Machine
Name) from the execution somehow, while the taskManagers prepared by
the scheduler waiting for a machine with "LocalHost".

Any Suggestion ??

Regards.




On Tue, May 19, 2015 at 2:16 PM, Fabian Hueske <fh...@gmail.com> wrote:

> Alright, so if both inputs of the CoGroup are read from the file system,
> there should be a way to do the co-group on co-located data without
> repartitioning.
> In fact, I have some code lying around to do co-located joins from local
> FS [1]. Haven't tested it thoroughly and it also relies on a number of
> assumptions. If the data is also sorted you can even get around sorting it
> if you inject a few lines into the optimizer (see change for FLINK-1444)
> and ensure that each source reads exactly one! input split.
>
> Regarding your question about the PACT output contracts, there were three
> types which were defined wrt to a Key/Value pair data model:
> - Same key: UDF does not modify the key
> - Super key: UDF extends the key (Partitioning remains valid, sorting not)
> - Unique key: Keys from UDF or source are unique
>
> Let me know, if you have questions.
> Cheers, Fabian
>
> [1] https://github.com/fhueske/flink-localjoin-utils
>
> 2015-05-19 13:49 GMT+02:00 Alexander Alexandrov <
> alexander.s.alexandrov@gmail.com>:
>
>> Thanks for the feedback, Fabian.
>>
>> This is related to the question I sent on the user mailing list
>> yesterday. Mustafa is working on a master thesis where we try to abstract
>> an operator for the update of stateful datasets (decoupled from the current
>> native iterations logic) and use it in conjunction with lazy unrolling of
>> iterations.
>>
>> The assumptions are as follows:
>>
>>    - Each iteration runs a job with the same structure and the same DOP;
>>    - Updates a realized through a coGroup with a fixed DOP (let's say *N*),
>>    which consumes a *(state, updates)* pair of datasets and produces a
>>    new version of the state (let's call it *state'*);
>>    - We keep track where the *N* output partitions of *state'* are
>>    located and use this information for local placement of the corresponding
>>    *N* DataSource tasks in the next iteration (via FLINK-1478);
>>    - The remaining piece of the puzzle is to figure out how to tell the
>>    coGroup that one of the inputs is already partitioned so id avoids an
>>    unnecessary shuffle;
>>
>> If I remember correctly back in the day we had a PACT output contract
>> that served a similar purpose avoid unnecessary shuffles), but I was not
>> able to find it yesterday.
>>
>> In either case, I think even if that does not work out of the box at the
>> moment, that most of the logic is in place (e.g. co-location groups in the
>> scheduler), and we are willing to either hack the code or add the missing
>> functionality in order to realize the above described goal.
>>
>> Suggestions are welcome!
>>
>> Regards,
>> Alex
>>
>>
>>
>>
>> 2015-05-18 17:42 GMT+02:00 Fabian Hueske <fh...@gmail.com>:
>>
>>> Hi Mustafa,
>>>
>>> I'm afraid, this is not possible.
>>> Although you can annotate DataSources with partitioning information,
>>> this is not enough to avoid repartitioning for a CoGroup. The reason for
>>> that is that CoGroup requires co-partitioning of both inputs, i.e., both
>>> inputs must be equally partitioned (same number of partitions, same
>>> partitioning function, same location of partitions). Since Flink is
>>> dynamically assigning tasks to execution slots, it is not possible to
>>> co-locate data that was read from a data source and data coming from the
>>> result of another computation.
>>>
>>> If you just need the result of the first co-group on disk, you could
>>> also build a single program that does both co-groups and additional writes
>>> the result of the first co-group to disk (Flink supports multiple data
>>> sinks).
>>>
>>> Best, Fabian
>>>
>>> 2015-05-18 15:43 GMT+02:00 Mustafa Elbehery <el...@gmail.com>:
>>>
>>>> Hi,
>>>>
>>>> I am writing a flink job, in which I have three datasets.  I have
>>>> partitionedByHash the first two before coGrouping them.
>>>>
>>>> My plan is to spill the result of coGrouping to disk, and then re-read
>>>> it again before coGrouping with the third dataset.
>>>>
>>>> My question is, is there anyway to inform flink that the first coGroup
>>>> result is already partitioned ?!  I know I can re-partition again before
>>>> coGrouping but I would like to know if there is anyway to avoid a step
>>>> which was already executed,
>>>>
>>>> Regards.
>>>>
>>>> --
>>>> Mustafa Elbehery
>>>> EIT ICT Labs Master School
>>>> <http://www.masterschool.eitictlabs.eu/home/>
>>>> +49(0)15750363097
>>>> skype: mustafaelbehery87
>>>>
>>>>
>>>
>>
>


-- 
Mustafa Elbehery
EIT ICT Labs Master School <http://www.masterschool.eitictlabs.eu/home/>
+49(0)15750363097
skype: mustafaelbehery87

Re: Informing the runtime about data already repartitioned using "output contracts"

Posted by Fabian Hueske <fh...@gmail.com>.
Alright, so if both inputs of the CoGroup are read from the file system,
there should be a way to do the co-group on co-located data without
repartitioning.
In fact, I have some code lying around to do co-located joins from local FS
[1]. Haven't tested it thoroughly and it also relies on a number of
assumptions. If the data is also sorted you can even get around sorting it
if you inject a few lines into the optimizer (see change for FLINK-1444)
and ensure that each source reads exactly one! input split.

Regarding your question about the PACT output contracts, there were three
types which were defined wrt to a Key/Value pair data model:
- Same key: UDF does not modify the key
- Super key: UDF extends the key (Partitioning remains valid, sorting not)
- Unique key: Keys from UDF or source are unique

Let me know, if you have questions.
Cheers, Fabian

[1] https://github.com/fhueske/flink-localjoin-utils

2015-05-19 13:49 GMT+02:00 Alexander Alexandrov <
alexander.s.alexandrov@gmail.com>:

> Thanks for the feedback, Fabian.
>
> This is related to the question I sent on the user mailing list yesterday.
> Mustafa is working on a master thesis where we try to abstract an operator
> for the update of stateful datasets (decoupled from the current native
> iterations logic) and use it in conjunction with lazy unrolling of
> iterations.
>
> The assumptions are as follows:
>
>    - Each iteration runs a job with the same structure and the same DOP;
>    - Updates a realized through a coGroup with a fixed DOP (let's say *N*),
>    which consumes a *(state, updates)* pair of datasets and produces a
>    new version of the state (let's call it *state'*);
>    - We keep track where the *N* output partitions of *state'* are
>    located and use this information for local placement of the corresponding
>    *N* DataSource tasks in the next iteration (via FLINK-1478);
>    - The remaining piece of the puzzle is to figure out how to tell the
>    coGroup that one of the inputs is already partitioned so id avoids an
>    unnecessary shuffle;
>
> If I remember correctly back in the day we had a PACT output contract that
> served a similar purpose avoid unnecessary shuffles), but I was not able to
> find it yesterday.
>
> In either case, I think even if that does not work out of the box at the
> moment, that most of the logic is in place (e.g. co-location groups in the
> scheduler), and we are willing to either hack the code or add the missing
> functionality in order to realize the above described goal.
>
> Suggestions are welcome!
>
> Regards,
> Alex
>
>
>
>
> 2015-05-18 17:42 GMT+02:00 Fabian Hueske <fh...@gmail.com>:
>
>> Hi Mustafa,
>>
>> I'm afraid, this is not possible.
>> Although you can annotate DataSources with partitioning information, this
>> is not enough to avoid repartitioning for a CoGroup. The reason for that is
>> that CoGroup requires co-partitioning of both inputs, i.e., both inputs
>> must be equally partitioned (same number of partitions, same partitioning
>> function, same location of partitions). Since Flink is dynamically
>> assigning tasks to execution slots, it is not possible to co-locate data
>> that was read from a data source and data coming from the result of another
>> computation.
>>
>> If you just need the result of the first co-group on disk, you could also
>> build a single program that does both co-groups and additional writes the
>> result of the first co-group to disk (Flink supports multiple data sinks).
>>
>> Best, Fabian
>>
>> 2015-05-18 15:43 GMT+02:00 Mustafa Elbehery <el...@gmail.com>:
>>
>>> Hi,
>>>
>>> I am writing a flink job, in which I have three datasets.  I have
>>> partitionedByHash the first two before coGrouping them.
>>>
>>> My plan is to spill the result of coGrouping to disk, and then re-read
>>> it again before coGrouping with the third dataset.
>>>
>>> My question is, is there anyway to inform flink that the first coGroup
>>> result is already partitioned ?!  I know I can re-partition again before
>>> coGrouping but I would like to know if there is anyway to avoid a step
>>> which was already executed,
>>>
>>> Regards.
>>>
>>> --
>>> Mustafa Elbehery
>>> EIT ICT Labs Master School <http://www.masterschool.eitictlabs.eu/home/>
>>> +49(0)15750363097
>>> skype: mustafaelbehery87
>>>
>>>
>>
>

Re: Informing the runtime about data already repartitioned using "output contracts"

Posted by Alexander Alexandrov <al...@gmail.com>.
Thanks for the feedback, Fabian.

This is related to the question I sent on the user mailing list yesterday.
Mustafa is working on a master thesis where we try to abstract an operator
for the update of stateful datasets (decoupled from the current native
iterations logic) and use it in conjunction with lazy unrolling of
iterations.

The assumptions are as follows:

   - Each iteration runs a job with the same structure and the same DOP;
   - Updates a realized through a coGroup with a fixed DOP (let's say *N*),
   which consumes a *(state, updates)* pair of datasets and produces a new
   version of the state (let's call it *state'*);
   - We keep track where the *N* output partitions of *state'* are located
   and use this information for local placement of the corresponding *N*
   DataSource tasks in the next iteration (via FLINK-1478);
   - The remaining piece of the puzzle is to figure out how to tell the
   coGroup that one of the inputs is already partitioned so id avoids an
   unnecessary shuffle;

If I remember correctly back in the day we had a PACT output contract that
served a similar purpose avoid unnecessary shuffles), but I was not able to
find it yesterday.

In either case, I think even if that does not work out of the box at the
moment, that most of the logic is in place (e.g. co-location groups in the
scheduler), and we are willing to either hack the code or add the missing
functionality in order to realize the above described goal.

Suggestions are welcome!

Regards,
Alex




2015-05-18 17:42 GMT+02:00 Fabian Hueske <fh...@gmail.com>:

> Hi Mustafa,
>
> I'm afraid, this is not possible.
> Although you can annotate DataSources with partitioning information, this
> is not enough to avoid repartitioning for a CoGroup. The reason for that is
> that CoGroup requires co-partitioning of both inputs, i.e., both inputs
> must be equally partitioned (same number of partitions, same partitioning
> function, same location of partitions). Since Flink is dynamically
> assigning tasks to execution slots, it is not possible to co-locate data
> that was read from a data source and data coming from the result of another
> computation.
>
> If you just need the result of the first co-group on disk, you could also
> build a single program that does both co-groups and additional writes the
> result of the first co-group to disk (Flink supports multiple data sinks).
>
> Best, Fabian
>
> 2015-05-18 15:43 GMT+02:00 Mustafa Elbehery <el...@gmail.com>:
>
>> Hi,
>>
>> I am writing a flink job, in which I have three datasets.  I have
>> partitionedByHash the first two before coGrouping them.
>>
>> My plan is to spill the result of coGrouping to disk, and then re-read it
>> again before coGrouping with the third dataset.
>>
>> My question is, is there anyway to inform flink that the first coGroup
>> result is already partitioned ?!  I know I can re-partition again before
>> coGrouping but I would like to know if there is anyway to avoid a step
>> which was already executed,
>>
>> Regards.
>>
>> --
>> Mustafa Elbehery
>> EIT ICT Labs Master School <http://www.masterschool.eitictlabs.eu/home/>
>> +49(0)15750363097
>> skype: mustafaelbehery87
>>
>>
>

Re: Informing the runtime about data already repartitioned using "output contracts"

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Mustafa,

I'm afraid, this is not possible.
Although you can annotate DataSources with partitioning information, this
is not enough to avoid repartitioning for a CoGroup. The reason for that is
that CoGroup requires co-partitioning of both inputs, i.e., both inputs
must be equally partitioned (same number of partitions, same partitioning
function, same location of partitions). Since Flink is dynamically
assigning tasks to execution slots, it is not possible to co-locate data
that was read from a data source and data coming from the result of another
computation.

If you just need the result of the first co-group on disk, you could also
build a single program that does both co-groups and additional writes the
result of the first co-group to disk (Flink supports multiple data sinks).

Best, Fabian

2015-05-18 15:43 GMT+02:00 Mustafa Elbehery <el...@gmail.com>:

> Hi,
>
> I am writing a flink job, in which I have three datasets.  I have
> partitionedByHash the first two before coGrouping them.
>
> My plan is to spill the result of coGrouping to disk, and then re-read it
> again before coGrouping with the third dataset.
>
> My question is, is there anyway to inform flink that the first coGroup
> result is already partitioned ?!  I know I can re-partition again before
> coGrouping but I would like to know if there is anyway to avoid a step
> which was already executed,
>
> Regards.
>
> --
> Mustafa Elbehery
> EIT ICT Labs Master School <http://www.masterschool.eitictlabs.eu/home/>
> +49(0)15750363097
> skype: mustafaelbehery87
>
>