Posted to user@spark.apache.org by Boromir Widas <vc...@gmail.com> on 2014/09/30 23:12:38 UTC

Handling tree reduction algorithm with Spark in parallel

Hello Folks,

I have been trying to implement a tree reduction algorithm in Spark
recently but could not find suitable parallel operations. Assume I have a
general tree like the following -



I have to do the following -
1) Do some computation at each leaf node to get an array of doubles. (This
can be precomputed.)
2) For each non-leaf node, starting with the root node, compute the sum of
these arrays over all child nodes. So to get the array for node B, I need to
get the array for E, which is the sum of G + H.

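////////////////////// Start Snippet
// values is a var so that a non-leaf node can store the sum of its children
case class Node(name: String, children: Array[Node], var values: Array[Double]) {
  def isLeafNode: Boolean = children.isEmpty
}

// read in the tree here

def getSumOfChildren(node: Node): Array[Double] = {
  if (node.isLeafNode) {
    return node.values
  }
  // assumes a non-leaf node starts with a zero array of the same length as the leaf arrays
  for (child <- node.children) {
    // can use an accumulator here
    node.values = node.values.zip(getSumOfChildren(child)).map { case (a, b) => a + b }
  }
  node.values
}
////////////////////////// End Snippet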

Any pointers to how this can be done in parallel to use all cores will be
greatly appreciated.

Thanks,
Boromir.

Re: Handling tree reduction algorithm with Spark in parallel

Posted by Boromir Widas <vc...@gmail.com>.
Thanks Matei, will check out the MLlib implementation.

On Wed, Oct 1, 2014 at 2:24 PM, Andy Twigg <an...@gmail.com> wrote:

> Yes, that makes sense. It's similar to the allreduce pattern in Vowpal Wabbit (vw).

Re: Handling tree reduction algorithm with Spark in parallel

Posted by Andy Twigg <an...@gmail.com>.
Yes, that makes sense. It's similar to the allreduce pattern in Vowpal Wabbit (vw).

On Wednesday, 1 October 2014, Matei Zaharia <ma...@gmail.com> wrote:

> Some of the MLlib algorithms do tree reduction in 1.1:
> http://databricks.com/blog/2014/09/22/spark-1-1-mllib-performance-improvements.html.
> You can check out how they implemented it -- it is a series of reduce
> operations.
>
> Matei

Re: Handling tree reduction algorithm with Spark in parallel

Posted by Matei Zaharia <ma...@gmail.com>.
Some of the MLlib algorithms do tree reduction in 1.1: http://databricks.com/blog/2014/09/22/spark-1-1-mllib-performance-improvements.html. You can check out how they implemented it -- it is a series of reduce operations.
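
As a rough illustration of that kind of multi-level reduce, here is a minimal sketch that sums an RDD of arrays with treeAggregate. It assumes a Spark version in which treeAggregate is a method on RDD, and that all arrays have the same length. The names TreeAggregateSum, add and sumArrays are made up for this example. Note that the "tree" here is the aggregation tree over partitions, so this yields one total (for example the root's array) rather than a value per node of the user's tree.

```scala
import org.apache.spark.rdd.RDD

// Hypothetical helper, named for illustration only.
object TreeAggregateSum {

  // Element-wise sum; assumes both arrays have the same length.
  def add(a: Array[Double], b: Array[Double]): Array[Double] =
    a.zip(b).map { case (x, y) => x + y }

  // Sums all arrays in the RDD. Partial sums are combined in a multi-level
  // tree (of the suggested depth) instead of being merged in a single step.
  def sumArrays(arrays: RDD[Array[Double]], length: Int, depth: Int = 2): Array[Double] =
    arrays.treeAggregate(new Array[Double](length))(
      (acc: Array[Double], a: Array[Double]) => add(acc, a), // fold one element into a partial sum
      (x: Array[Double], y: Array[Double]) => add(x, y),     // merge two partial sums
      depth
    )
}
```

A call such as TreeAggregateSum.sumArrays(leafArrays, length = n) would then return the element-wise total, where leafArrays and n are whatever RDD and array length the caller has.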

Matei

On Oct 1, 2014, at 11:02 AM, Boromir Widas <vc...@gmail.com> wrote:

> Thanks a lot Andy and Debasish, your suggestions were of great help.


Re: Handling tree reduction algorithm with Spark in parallel

Posted by Boromir Widas <vc...@gmail.com>.
Thanks a lot Andy and Debasish, your suggestions were of great help.

On Tue, Sep 30, 2014 at 6:44 PM, Debasish Das <de...@gmail.com>
wrote:

> If the tree is too big, build it on GraphX, but it will need thorough
> analysis so that the partitions are well balanced.

Re: Handling tree reduction algorithm with Spark in parallel

Posted by Debasish Das <de...@gmail.com>.
If the tree is too big, build it on GraphX, but it will need thorough
analysis so that the partitions are well balanced.

On Tue, Sep 30, 2014 at 2:45 PM, Andy Twigg <an...@gmail.com> wrote:

> Hi Boromir,
>
> Assuming the tree fits in memory, and what you want to do is parallelize
> the computation, the 'obvious' way is the following:
>
> * broadcast the tree T to each worker (OK since it fits in memory)
> * construct an RDD for the deepest level, where each element is
> (parent, data_at_node)
> * aggregate this by key (= parent) to get an RDD[(parent, data)]
> * map each element (p, data) -> (parent(p), data) using T
> * repeat until you have an RDD of size 1 (assuming T is connected)
>
> If T cannot fit in memory, or is very deep, then there are more exotic
> techniques, but hopefully this suffices.
>
> Andy

Re: Handling tree reduction algorithm with Spark in parallel

Posted by Andy Twigg <an...@gmail.com>.
Hi Boromir,

Assuming the tree fits in memory, and what you want to do is parallelize
the computation, the 'obvious' way is the following:

* broadcast the tree T to each worker (OK since it fits in memory)
* construct an RDD for the deepest level, where each element is
(parent, data_at_node)
* aggregate this by key (= parent) to get an RDD[(parent, data)]
* map each element (p, data) -> (parent(p), data) using T
* repeat until you have an RDD of size 1 (assuming T is connected)

If T cannot fit in memory, or is very deep, then there are more exotic
techniques, but hopefully this suffices.
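
The steps above can be sketched roughly as follows. This is a minimal sketch under stated assumptions, not a tuned implementation: the names LevelWiseTreeSum, add and sumTree are made up for illustration, T is represented as a plain child -> parent Map keyed by node name, and all arrays are assumed to have the same length. It also spells out one detail the steps leave implicit: leaves that sit at a shallower depth are unioned into the frontier when the reduction reaches their level.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical helper, named for illustration only.
object LevelWiseTreeSum {

  // Element-wise sum; assumes both arrays have the same length.
  def add(a: Array[Double], b: Array[Double]): Array[Double] =
    a.zip(b).map { case (x, y) => x + y }

  // parent: child name -> parent name (the root has no entry); plays the role of T.
  // leaves: precomputed (leaf name, array) pairs; assumed non-empty.
  // Returns the array for every node, keyed by node name.
  def sumTree(sc: SparkContext,
              parent: Map[String, String],
              leaves: Seq[(String, Array[Double])]): Map[String, Array[Double]] = {
    val parentB = sc.broadcast(parent)

    // Depth of a node, computed on the driver since T fits in memory.
    def depthOf(node: String): Int =
      parent.get(node).map(p => 1 + depthOf(p)).getOrElse(0)

    val leavesByDepth = leaves.groupBy { case (name, _) => depthOf(name) }
    val maxDepth = leavesByDepth.keys.max

    // Start from the deepest level and move up one level per iteration.
    var level: RDD[(String, Array[Double])] =
      sc.parallelize(leavesByDepth(maxDepth)).cache()
    var levels = List(level)
    for (d <- (maxDepth - 1) to 0 by -1) {
      val internal = level
        .map { case (node, data) => (parentB.value(node), data) } // re-key by parent
        .reduceByKey(add)                                         // sum the siblings
      // Leaves that live at depth d join the frontier here.
      val leavesHere =
        sc.parallelize(leavesByDepth.getOrElse(d, Seq.empty[(String, Array[Double])]))
      level = internal.union(leavesHere).cache()
      levels = level :: levels
    }
    // One entry per node; the root is the only node in the last level.
    sc.union(levels).collect().toMap
  }
}
```

Each pass runs one reduceByKey, so the number of Spark stages grows with the depth of the tree, which is presumably why a very deep T calls for a different technique.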

Andy


--
http://www.cs.ox.ac.uk/people/andy.twigg/
