Posted to user@flink.apache.org by Florian Hönicke <ro...@gmail.com> on 2014/10/02 19:29:58 UTC
Re: long runtime
Thanks a lot :)
I set some semantic annotations.
Now it takes 2 minutes.
On 25.09.2014 at 11:32, Fabian Hueske wrote:
> Your program is doing quite a few repartitioning steps, where all data
> comes from a single data source.
> You could try two things:
> - triple the DataSource and Map Function that feed the two
> Signature FlatMaps and the two later CoGroups, so that each FlatMap
> gets its own source->map and the two later CoGroups get another one.
> - check whether SemanticAnnotations can help you avoid expensive
> repartitioning and sorting for the CoGroups
> (http://flink.incubator.apache.org/docs/0.6-incubating/java_api_guide.html).
>
> Best, Fabian
>
> 2014-09-25 10:51 GMT+02:00 Fabian Hueske <fhueske@apache.org>:
>
> Hi,
>
> The plan shows all operator DOPs as 1.
> Did you create the plan locally or on the cluster with the correct
> DOP? The CLI client also accepts the -p parameter for "info -e".
>
> BTW, you could try to set the DOP to the number of cores in your
> cluster. (But that doesn't explain why the job is so slow).
>
> 2014-09-25 10:01 GMT+02:00 Florian Hönicke <rockstarflo@gmail.com>:
>
> Yes, I also ran the massJoin on the cluster on 500MB.
> I attached the execution plan.
>
> Greetings,
> Florian
>
>
> On 25.09.2014 at 00:41, Fabian Hueske wrote:
>> OK, the log shows that the tasks are evenly distributed to
>> all nodes.
>> I assume you also ran the program on the cluster on 500MB,
>> right?
>>
>> Can you please also post the execution plan for the cluster
>> execution?
>> You get it with (See also:
>> http://flink.incubator.apache.org/docs/0.6-incubating/cli.html):
>> ./flink info -e jarfile.jar <parameters>
>>
>> Thanks, Fabian
>>
>> 2014-09-25 0:21 GMT+02:00 Florian Hönicke <rockstarflo@gmail.com>:
>>
>> Thanks for your quick answer.
>> In the following, I roughly sketch the mass-join algorithm.
>> http://www.cs.berkeley.edu/~jnwang/papers/icde14_massjoin.pdf
>>
>> It's an R-S join, which I modified into a self-join.
>> Given a collection of token sets, the massJoin finds all similar
>> sets (with respect to Jaccard similarity, i.e.
>> |intersection| / |union|).
>> First, it calculates a global token grouping, i.e., each
>> token is assigned to one of 30 groups. Each group has
>> almost the same token count.
>> Then, it generates two types of signatures for each input
>> set.
>> If two sets are similar, they must share a common signature.
>> In the next step, we find all candidate pairs (pairs
>> that share a common signature).
>> Some candidate pairs are filtered out using the global token
>> grouping.
>> The remaining candidate pairs are verified to filter out
>> all dissimilar pairs.
>>
>> @Fabian
>> I specified the DOP via the command-line client as follows:
>> /home/hoenicke/flink-0.6-incubating/bin/flink run -p 11 \
>> /home/hoenicke/flink-0.6-incubating/jar/mass6.jar 0.9 \
>> file:///home/hoenicke/flink-0.6-incubating/input/inputNummeriert.txt \
>> file:///home/hoenicke/flink-0.6-incubating/output -v
>>
>> The log file is attached.
>>
>> Best, Florian
>>
>> On 24.09.2014 at 22:45, Fabian Hueske wrote:
>>> Hi,
>>>
>>> How did you specify the degree of parallelism (DOP) for
>>> your program?
>>> Via the command-line client, the system configuration, or
>>> otherwise?
>>>
>>> The JobManager log file (./log/*jobManager*.log)
>>> contains the DOP of each task.
>>>
>>> Best, Fabian
>>>
>>> 2014-09-24 18:41 GMT+02:00 Stephan Ewen <sewen@apache.org>:
>>>
>>> Hi!
>>>
>>> Offhand, that is not easy to say. It depends on your
>>> algorithm and on how much data replication it does...
>>>
>>> We'd need a bit of time to look into the code. It
>>> would help if you could roughly sketch the algorithm
>>> for us and give us a breakdown of how much time is
>>> spent in which operator (like a screenshot of the
>>> runtime web monitor).
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Wed, Sep 24, 2014 at 6:18 PM, Florian Hönicke
>>> <rockstarflo@gmail.com> wrote:
>>>
>>> Hello :)
>>>
>>> My Flink program is extremely slow.
>>> I implemented a set similarity join in Flink
>>> (Mass-Join).
>>> Furthermore, I implemented a local version in Java.
>>> I compared both implementations.
>>> The local version needs one minute to process a
>>> 500MB dataset.
>>> My Flink program needs 5 minutes (cluster: 11
>>> nodes, 20 000 MB RAM).
>>> I use Flink version 0.6.
>>> What could be the cause?
>>>
>>> I would welcome your response,
>>> Florian Hönicke
>>>
>>>
>>>
>>
>>
>
>
>
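The mass-join steps sketched in the thread above (generate signatures, pair up sets that share a signature, verify the candidates by Jaccard similarity) can be illustrated with a small single-machine Python sketch. This is not Florian's Flink program, and it does not use the signature scheme from the MassJoin paper: `signatures` is a hypothetical stand-in that simply treats every token as a signature, the global token grouping filter is left out, and the names `jaccard`, `self_join` and the 0.8 threshold are assumptions made for illustration only.

```python
from collections import defaultdict
from itertools import combinations


def jaccard(a, b):
    """Jaccard similarity |a & b| / |a | b| of two token collections."""
    a, b = set(a), set(b)
    union = a | b
    return len(a & b) / len(union) if union else 1.0


def signatures(tokens):
    # Hypothetical stand-in, NOT the MassJoin scheme: each token is a signature.
    # Two sets with Jaccard similarity > 0 always share at least one token.
    return set(tokens)


def self_join(records, threshold):
    """Return all pairs (id_a, id_b), id_a < id_b, with similarity >= threshold."""
    # Group set IDs by signature.
    buckets = defaultdict(set)
    for set_id, tokens in records.items():
        for sig in signatures(tokens):
            buckets[sig].add(set_id)

    # Candidate pairs are pairs of sets that share at least one signature.
    candidates = set()
    for ids in buckets.values():
        candidates.update(combinations(sorted(ids), 2))

    # Verification: keep only the pairs that are really similar.
    return sorted(
        (a, b)
        for a, b in candidates
        if jaccard(records[a], records[b]) >= threshold
    )


records = {1: [1, 7, 11, 20], 2: [2, 4], 3: [1, 7, 11, 20, 21]}
result = self_join(records, 0.8)
print(result)
```

In this toy input, sets 1 and 3 share four of five distinct tokens, so they are the only pair that passes the assumed 0.8 threshold. In a distributed setting the grouping by signature corresponds to a repartitioning step, which is the kind of operation Fabian points to as expensive.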
Re: Re: long runtime
Posted by Flavio Pompermaier <po...@okkam.it>.
Thanks Florian, I'll try it too in the next weeks!
Re: Re: long runtime
Posted by Florian Hönicke <ro...@gmail.com>.
The code is attached.
Input format:
<SetID=1, token_1, token_7, token_11, token_20...token_i>
<SetID=2, token_2, token_4...token_j>
....
In the file it looks like:
1 1,7,11,20
2 2,4
We assume that all tokens (token_1...token_n) are sorted by their global
token frequency.
Token_1 is the least frequent token and token_n is the most frequent token.
Greetings Florian
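A minimal Python sketch of reading the input format described above, assuming the set ID and the token list are separated by whitespace and the tokens by commas, as in the two sample lines. The function name `parse_line` is hypothetical, and treating IDs and tokens as integers is an assumption based on the sample.

```python
def parse_line(line):
    """Parse '<set_id> <token>,<token>,...' into (set_id, [tokens])."""
    set_id, tokens = line.split(None, 1)
    return int(set_id), [int(t) for t in tokens.split(",")]


# The two sample lines from the message.
sample = ["1 1,7,11,20", "2 2,4"]
records = dict(parse_line(line) for line in sample)
print(records)
```

Since the tokens are assumed to be numbered by global frequency, a smaller token number means a rarer token, and each token list in the sample is already in ascending order.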
-------- Original Message --------
Subject: Re: long runtime
Date: Thu, 2 Oct 2014 19:42:58 +0200
From: Flavio Pompermaier <po...@okkam.it>
Reply-To: user@flink.incubator.apache.org
To: user@flink.incubator.apache.org
Could you share the code? It sounds interesting to try!
On Oct 2, 2014 7:31 PM, "Florian Hönicke" <rockstarflo@gmail.com> wrote:
Thanks a lot :)
I set some semantic annotations.
Now it takes 2 minutes.
Edit: tripling the DataSource has no effect.
Re: long runtime
Posted by Flavio Pompermaier <po...@okkam.it>.
Could you share the code? It sounds interesting to try!