Posted to user@flink.apache.org by Stefano Bortoli <s....@gmail.com> on 2014/11/06 09:19:28 UTC

PartitionByHash and usage of KeySelector

Hi all,

I am taking my first steps toward becoming an Apache Flink user! I have
configured and run some simple jobs on a small cluster, and everything
has worked quite well so far.

What I am trying to do right now is run a duplicate detection task on a
dataset of about 9.5M records. The records are well structured, so we can
exploit the semantics of the attributes to narrow down the expensive match
executions.

My idea is the following:
1. Partition the dataset according to a macro-parameter written in the
record. This gives me 7 partitions of different sizes that are certainly
disjoint. I do that by filtering on a specific type.
2. Split each of the partitions created in step 1 into sub-partitions based
on some simple similarity, which would reduce the number of expensive
function calls. I would like to do that using partitionByHash and a
KeySelector.
3. Compute the cross product for each of the partitions defined in step 2.
4. Filter each pair of the cross product by applying an expensive boolean
matching function. Only positively matching duplicates are retained.

Currently I am working on step 2, and I have some problems understanding
how to use the partitionByHash function. The main problem is that I need a
'rich key' to support the partitioning. I discovered ExpressionKeys, which
would allow me to define hash keys from sets of Strings that I can collect
from the record. However, the partitionByHash function does not accept
these objects, because the key must implement Comparable.

So, here is my question: how can I partition on hash keys made of more
than one String?

Is there a better strategy for implementing de-duplication with Flink?


thanks a lot for your support.

kind regards,

Stefano Bortoli, PhD


Re: PartitionByHash and usage of KeySelector

Posted by Stefano Bortoli <s....@gmail.com>.
Hi Fabian,

the rich map with Tuple creation was exactly what I did to interact with
the global index, and then filter match and group results. No problem for
the moment.

Thanks for your help anyway.

saluti,
Stefano

2014-11-10 16:24 GMT+01:00 Fabian Hueske <fh...@apache.org>:

> Hi Stefano,
>
> right now, there is no such thing as a RichKeyExtractor.
>
> However, KeySelector functions are serialized and passed to the execution
> engine. That means you can configure your KeySelector via its constructor
> at program construction time, and the "same" object is passed to the engine
> at runtime.
> The Configuration object is something of a legacy feature from the time when
> user functions were not serializable; instead, new objects were created and
> configured.
>
> Another alternative is to use a RichMapFunction instead of a KeySelector
> and convert a type A into a Tuple2<Key, A>. In fact, this is what happens
> internally when a key selector function is used.
>
> Best, Fabian

Re: PartitionByHash and usage of KeySelector

Posted by Fabian Hueske <fh...@apache.org>.
Hi Stefano,

right now, there is no such thing as a RichKeyExtractor.

However, KeySelector functions are serialized and passed to the execution
engine. That means you can configure your KeySelector via its constructor
at program construction time, and the "same" object is passed to the engine
at runtime.
The Configuration object is something of a legacy feature from the time when
user functions were not serializable; instead, new objects were created and
configured.
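
As a rough illustration of the constructor-based configuration described above, here is a plain-Java sketch with no Flink dependency. The Selector interface, the AttributeKeySelector class and the roundTrip helper are hypothetical names invented for this example; they only mimic the idea that a serializable function object carries its configuration with it when it is shipped to a worker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConfiguredSelector {

    // Hypothetical stand-in for a key-selector interface; this is not Flink's own type.
    public interface Selector<IN, KEY> extends Serializable {
        KEY getKey(IN value);
    }

    // The configuration (which attributes form the key) is passed to the
    // constructor and kept in a serializable field.
    public static final class AttributeKeySelector implements Selector<Map<String, String>, String> {
        private static final long serialVersionUID = 1L;
        private final ArrayList<String> keyAttributes;

        public AttributeKeySelector(List<String> keyAttributes) {
            this.keyAttributes = new ArrayList<>(keyAttributes);
        }

        @Override
        public String getKey(Map<String, String> record) {
            StringBuilder key = new StringBuilder();
            for (String attribute : keyAttributes) {
                // Missing attributes contribute an empty value to the key.
                key.append(record.getOrDefault(attribute, "")).append('|');
            }
            return key.toString();
        }
    }

    // Serializes and deserializes an object in memory, which is roughly what
    // shipping a function object to a remote worker involves.
    public static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> record = new HashMap<>();
        record.put("name", "n1");
        record.put("city", "c1");
        AttributeKeySelector shipped = roundTrip(new AttributeKeySelector(Arrays.asList("name", "city")));
        System.out.println(shipped.getKey(record));
    }
}
```

The deserialized copy is a different object but carries the same attribute list, which is the sense in which the "same" configured selector reaches the runtime.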

Another alternative is to use a RichMapFunction instead of a KeySelector
and convert a type A into a Tuple2<Key, A>. In fact, this is what happens
internally when a key selector function is used.
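
The "convert A into a pair of key and A" step can be pictured with the following plain-Java sketch. It does not use Flink; Map.Entry stands in for Tuple2, and the attachKeys helper and the first-character key function are assumptions made up for this example.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class KeyedRecords {

    // Pairs every record with the key computed for it. Map.Entry plays the
    // role of Tuple2<Key, A> here.
    public static <A, K> List<Map.Entry<K, A>> attachKeys(List<A> records, Function<A, K> keyFn) {
        List<Map.Entry<K, A>> keyed = new ArrayList<>();
        for (A record : records) {
            keyed.add(new SimpleImmutableEntry<>(keyFn.apply(record), record));
        }
        return keyed;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("apple", "avocado", "banana");
        // Hypothetical key function: the first character of the record.
        System.out.println(attachKeys(names, s -> s.substring(0, 1)));
    }
}
```

Once the key travels with the record, later steps can group or join on the first component without recomputing it.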

Best, Fabian

2014-11-10 14:36 GMT+01:00 Stefano Bortoli <s....@gmail.com>:

> Hi Fabian,
>
> Is it possible to create a RichKeySelector? I would need to read some
> configuration files to process the record and build the 'key' using a
> custom function. There is no interface or abstract class to implement or
> extend, and I wonder whether this is the right way to do it. That is, maybe
> there is a reason I am missing for not having a rich key selector. Thanks a
> lot in advance for your time!
>
> saluti,
> Stefano

Re: PartitionByHash and usage of KeySelector

Posted by Stefano Bortoli <s....@gmail.com>.
Hi Fabian,

Is it possible to create a RichKeySelector? I would need to read some
configuration files to process the record and build the 'key' using a
custom function. There is no interface or abstract class to implement or
extend, and I wonder whether this is the right way to do it. That is, maybe
there is a reason I am missing for not having a rich key selector. Thanks a
lot in advance for your time!

saluti,
Stefano

2014-11-10 12:05 GMT+01:00 Fabian Hueske <fh...@apache.org>:

> Yes, if you split the data set manually (maybe using filter) into
> multiple data sets, you could use Cross.
> However, Cross is a binary operation, so you would need to use it as a
> self-cross, which would result in symmetric pairs, just like the join.
>
> I'm not sure I would do this in a single job, i.e., run all cross
> operations concurrently.
> It might be better to partition the data up front and run multiple jobs,
> one per group.
>
> Best, Fabian

Re: PartitionByHash and usage of KeySelector

Posted by Fabian Hueske <fh...@apache.org>.
Yes, if you split the data set manually (maybe using filter) into
multiple data sets, you could use Cross.
However, Cross is a binary operation, so you would need to use it as a
self-cross, which would result in symmetric pairs, just like the join.

I'm not sure I would do this in a single job, i.e., run all cross
operations concurrently.
It might be better to partition the data up front and run multiple jobs,
one per group.

Best, Fabian

2014-11-10 11:08 GMT+01:00 Stefano Bortoli <s....@gmail.com>:

> Thanks a lot, Fabian. You clarified many points. Currently I am trying to
> run the job relying on a global index built with SOLR. It worked on a
> dataset of about 1M records, but it failed with an obscure exception on the
> one with 9.2M. If I cannot make it work, I will go back to the grouping
> approach.
>
> Just a question: if I create a dataset for each group of a dataset, then I
> could use the cross on each of the groups, right? However, I guess it would
> be smarter to have a reduceGroup capable of generating just the pairs that
> need to be compared.
>
> Thanks a lot again. Keep up the great work! :-)
>
> saluti,
> Stefano

Re: PartitionByHash and usage of KeySelector

Posted by Stefano Bortoli <s....@gmail.com>.
Thanks a lot, Fabian. You clarified many points. Currently I am trying to
run the job relying on a global index built with SOLR. It worked on a
dataset of about 1M records, but it failed with an obscure exception on the
one with 9.2M. If I cannot make it work, I will go back to the grouping
approach.

Just a question: if I create a dataset for each group of a dataset, then I
could use the cross on each of the groups, right? However, I guess it would
be smarter to have a reduceGroup capable of generating just the pairs that
need to be compared.

Thanks a lot again. Keep up the great work! :-)

saluti,
Stefano


2014-11-10 10:50 GMT+01:00 Fabian Hueske <fh...@apache.org>:

> Hi Stefano,
>
> I'm not sure if we use the same terminology here. What you call
> partitioning might be called grouping in Flink's API / documentation.
>
> Grouping builds groups of elements that share the same key. This is a
> deterministic operation.
> Partitioning distributes elements over a set of machines / parallel
> workers. If this is done using hash partitioning, Flink determines the
> parallel worker for an element by hashing the element's partition key
> ( mod(hash(key), #workers) ). Consequently, all elements with the same
> partition key will be shipped to the same worker, BUT so will all other
> elements for which mod(hash(key), #workers) is the same. If you map over
> these partitions (e.g. with mapPartition), all of these elements will be
> mixed. If the number of workers (or the hash function) changes, the
> partitions will look different. With grouping, all elements of a group have
> the same key (and all elements with that key are in the group).
>
> Flink's cross operator builds a dataset-wide cross product. It does not
> respect groups (or partitions). If you want to build a cross product within
> a group, you can do that with a groupReduce, which requires you to hold all
> elements of the group in memory or to manually spill them to disk in your
> UDF. Alternatively, you can use a self join (join a data set with itself),
> which will give you all pairs of the cross product in individual function
> calls. However, Flink currently does not treat self joins specially, so
> there is room for the performance to be optimized. You'll also get
> symmetric pairs (a-b, b-a, a-a, b-b, for two elements a, b with the same
> join key).
>
> If it is possible to combine the macro-parameter keys and the
> minor-blocking keys into a single key, you could specify a key-selector
> function x() and do either
> - dataSet.groupBy(x).reduceGroup( *read full group into memory, and apply
> expensive function to each pair of elements* ); or
> - dataSet.join(dataSet).where(x).equalTo(x).with( *check for symmetric pair
> and apply expensive compare function* ).
>
> BTW, there was a similar use case a few days back on the mailing list.
> It might be worth reading that thread [1].
> Since this is the second time this issue has come up, we might
> consider adding better support for group-wise cross operations.
>
> Cheers, Fabian
>
> [1]
> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/load-balancing-groups-td2287.html
>
>
>

Re: PartitionByHash and usage of KeySelector

Posted by Fabian Hueske <fh...@apache.org>.
Hi Stefano,

I'm not sure if we use the same terminology here. What you call
partitioning might be called grouping in Flink's API / documentation.

Grouping builds groups of elements that share the same key. This is a
deterministic operation.
Partitioning distributes elements over a set of machines / parallel
workers. If this is done using hash partitioning, Flink determines the
parallel worker for an element by hashing the element's partition key
( mod(hash(key), #workers) ). Consequently, all elements with the same
partition key will be shipped to the same worker, BUT so will all other
elements for which mod(hash(key), #workers) is the same. If you map over
these partitions (e.g. with mapPartition), all of these elements will be
mixed. If the number of workers (or the hash function) changes, the
partitions will look different. With grouping, all elements of a group have
the same key (and all elements with that key are in the group).
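
The difference between the two can be sketched in plain Java without Flink. This is only an illustration under stated assumptions: String.hashCode() stands in for whatever hash function the engine really uses, and the class and method names are invented for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionVsGroup {

    // Worker index for a key: mod(hash(key), #workers), kept non-negative.
    public static int workerFor(String key, int workers) {
        return Math.floorMod(key.hashCode(), workers);
    }

    // Partitioning: each key is routed to one of `workers` buckets, so a
    // single bucket can end up holding several different keys.
    public static Map<Integer, List<String>> partition(List<String> keys, int workers) {
        Map<Integer, List<String>> byWorker = new LinkedHashMap<>();
        for (String key : keys) {
            byWorker.computeIfAbsent(workerFor(key, workers), w -> new ArrayList<>()).add(key);
        }
        return byWorker;
    }

    // Grouping: one group per distinct key, independent of the worker count.
    public static Map<String, List<String>> group(List<String> keys) {
        Map<String, List<String>> byKey = new LinkedHashMap<>();
        for (String key : keys) {
            byKey.computeIfAbsent(key, k -> new ArrayList<>()).add(key);
        }
        return byKey;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "c", "a", "c");
        System.out.println("2 workers: " + partition(keys, 2));
        System.out.println("3 workers: " + partition(keys, 3));
        System.out.println("groups:    " + group(keys));
    }
}
```

With two workers the keys "a" and "c" share a partition (their hash codes 97 and 99 are both odd), while with three workers every key lands on its own worker. The groups are the same in both cases.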

Flink's cross operator builds a dataset-wide cross product. It does not
respect groups (or partitions). If you want to build a cross product within
a group, you can do that with a groupReduce, which requires you to hold all
elements of the group in memory or to manually spill them to disk in your
UDF. Alternatively, you can use a self join (join a data set with itself),
which will give you all pairs of the cross product in individual function
calls. However, Flink currently does not treat self joins specially, so
there is room for the performance to be optimized. You'll also get
symmetric pairs (a-b, b-a, a-a, b-b, for two elements a, b with the same
join key).
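
To make the symmetric-pair point concrete, here is a small plain-Java sketch (no Flink involved; class and method names are made up for illustration). It contrasts the pairs a self join would emit for one key with the unique pairs that actually need comparing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class GroupPairs {

    // All ordered pairs within one group, as a self join on the group key
    // would produce them: includes (a,a) and both (a,b) and (b,a).
    public static List<String> orderedPairs(List<String> group) {
        List<String> pairs = new ArrayList<>();
        for (String left : group) {
            for (String right : group) {
                pairs.add(left + "-" + right);
            }
        }
        return pairs;
    }

    // Each unordered pair exactly once (i < j): no self pairs, no mirrored pairs.
    public static List<String> uniquePairs(List<String> group) {
        List<String> pairs = new ArrayList<>();
        for (int i = 0; i < group.size(); i++) {
            for (int j = i + 1; j < group.size(); j++) {
                pairs.add(group.get(i) + "-" + group.get(j));
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        System.out.println(orderedPairs(Arrays.asList("a", "b")));
        System.out.println(uniquePairs(Arrays.asList("a", "b", "c")));
    }
}
```

For a group of n elements the self join yields n * n pairs, whereas only n * (n - 1) / 2 comparisons are needed, so filtering out mirrored and self pairs before the expensive function roughly halves the work.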

If it is possible to combine the macro-parameter keys and the
minor-blocking keys into a single key, you could specify a key-selector
function x() and do either
- dataSet.groupBy(x).reduceGroup( *read full group into memory, and apply
expensive function to each pair of elements* ); or
- dataSet.join(dataSet).where(x).equalTo(x).with( *check for symmetric pair
and apply expensive compare function* ).
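
The first option (one combined key, then pairwise comparison inside each group) can be sketched in plain Java as follows. Nothing here is Flink API: the Rec class, the first-letter blocking rule and the case-insensitive name comparison are hypothetical placeholders for the real record type, blocking key and expensive matching function.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

public class BlockingSketch {

    // Hypothetical record: `type` is the macro-parameter, `name` a non-empty attribute.
    public static final class Rec {
        final String type;
        final String name;

        public Rec(String type, String name) {
            this.type = type;
            this.name = name;
        }
    }

    // Single composite key: macro-parameter plus a cheap blocking key
    // (assumed here to be the lower-cased first letter of the name).
    public static String compositeKey(Rec r) {
        return r.type + "|" + r.name.substring(0, 1).toLowerCase(Locale.ROOT);
    }

    // Placeholder for the expensive boolean matching function.
    public static boolean expensiveMatch(Rec a, Rec b) {
        return a.name.equalsIgnoreCase(b.name);
    }

    // Groups records by composite key, then compares each unordered pair
    // inside a group exactly once and keeps the positive matches.
    public static List<String> findDuplicates(List<Rec> records) {
        Map<String, List<Rec>> groups = new LinkedHashMap<>();
        for (Rec r : records) {
            groups.computeIfAbsent(compositeKey(r), k -> new ArrayList<>()).add(r);
        }
        List<String> matches = new ArrayList<>();
        for (List<Rec> group : groups.values()) {
            for (int i = 0; i < group.size(); i++) {
                for (int j = i + 1; j < group.size(); j++) {
                    if (expensiveMatch(group.get(i), group.get(j))) {
                        matches.add(group.get(i).name + " ~ " + group.get(j).name);
                    }
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<Rec> records = Arrays.asList(
            new Rec("person", "Anna"), new Rec("person", "ANNA"),
            new Rec("company", "Anna"), new Rec("person", "Bob"));
        System.out.println(findDuplicates(records));
    }
}
```

Like the reduceGroup variant, this holds each whole group in memory, so it only works when the blocking key keeps the groups small.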

BTW, there was a similar use case a few days back on the mailing list.
It might be worth reading that thread [1].
Since this is the second time this issue has come up, we might
consider adding better support for group-wise cross operations.

Cheers, Fabian

[1]
http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/load-balancing-groups-td2287.html

Re: PartitionByHash and usage of KeySelector

Posted by Stefano Bortoli <s....@gmail.com>.
Hi Aljoscha,

with "creating partitions of each of the partitions" I mean that I need to
further partition the sets of objects that result from the preliminary
macro-filter.

My problem is that the objects I deal with are relatively complex and
rich. Furthermore, they are schema-less when it comes to 'selection of keys'.
Namely, I have sets of Attributes, and each attribute has a name and a
value. What I would like to do is create rich KEY objects by processing
these sets of Attributes according to some logic. In practice, I would have
to extract keys from each object, and then let the partition function
group records around these keys. Hopefully, the partitions generated by
these keys will be small enough to make the cross product manageable, and
therefore reduce the cost of the expensive matching functions implemented as
filters.

In my head, what I am trying to do is prune the search space for duplicates
as much as possible (performing some sort of distributed blocking), and
then run the expensive cross product comparison on relatively small sets.

Do you think it is feasible? Does it make sense?

Meanwhile I am creating a global index I can query in a map function, but
then Flink loses a bit of its appeal for this task. :-)

saluti,
Stefano


2014-11-06 10:18 GMT+01:00 Aljoscha Krettek <al...@apache.org>:

> Hi Stefano,
> what do you mean by "creating partitions of each of the partitions"?
>
> The expression keys can be used to specify fields of objects that
> should be used for hashing. So if you had objects of this class in a
> DataSet:
>
> class Foo {
>   public String bar;
>   public Integer baz;
>   public Float bat;
> }
>
> You could use for example:
>
> input.partitionByHash("baz", "bat")
>
> to perform the partitioning only on those two fields of the objects.
>
> Regards,
> Aljoscha

Re: PartitionByHash and usage of KeySelector

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Stefano,
what do you mean by "creating partitions of each of the partitions"?

The expression keys can be used to specify fields of objects that
should be used for hashing. So if you had objects of this class in a
DataSet:

class Foo {
  public String bar;
  public Integer baz;
  public Float bat;
}

You could use for example:

input.partitionByHash("baz", "bat")

to perform the partitioning only on those two fields of the objects.
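
What "partitioning only on those two fields" means can be sketched in plain Java. This is not how Flink computes its hash internally; Objects.hash and the partitionOf helper are assumptions chosen purely to illustrate that records agreeing on baz and bat end up together regardless of bar.

```java
import java.util.Objects;

public class FieldHashSketch {

    // Same shape as the Foo class above, with a constructor added for convenience.
    public static final class Foo {
        public String bar;
        public Integer baz;
        public Float bat;

        public Foo(String bar, Integer baz, Float bat) {
            this.bar = bar;
            this.baz = baz;
            this.bat = bat;
        }
    }

    // Partition index computed from baz and bat only; bar is ignored.
    public static int partitionOf(Foo foo, int workers) {
        return Math.floorMod(Objects.hash(foo.baz, foo.bat), workers);
    }

    public static void main(String[] args) {
        Foo first = new Foo("x", 1, 2.0f);
        Foo second = new Foo("y", 1, 2.0f);
        // Both records agree on baz and bat, so they map to the same partition.
        System.out.println(partitionOf(first, 4) == partitionOf(second, 4));
    }
}
```

Several String attributes could be combined into one hash key in the same way, by listing each of them as a key field.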

Regards,
Aljoscha
