You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Guillaume Pitel <gu...@exensa.com> on 2015/03/20 23:24:40 UTC

Directly broadcasting (sort of) RDDs

Hi,

I have an idea that I would like to discuss with the Spark devs. The 
idea comes from a very real problem that I have struggled with since 
almost a year. My problem is very simple, it's a dense matrix * sparse 
matrix  operation. I have a dense matrix RDD[(Int,FloatMatrix)] which is 
divided in X large blocks (one block per partition), and a sparse matrix 
RDD[((Int,Int),Array[Array[(Int,Float)]]] , divided in X * Y blocks. The 
most efficient way to perform the operation is to collectAsMap() the 
dense matrix and broadcast it, then perform the block-local 
mutliplications, and combine the results by column.

This is quite fine, unless the matrix is too big to fit in memory 
(especially since the multiplication is performed several times 
iteratively, and the broadcasts are not always cleaned from memory as I 
would naively expect).

When the dense matrix is too big, a second solution is to split the big 
sparse matrix in several RDD, and do several broadcasts. Doing this 
creates quite a big overhead, but it mostly works, even though I often 
face some problems with unaccessible broadcast files, for instance.

Then there is the terrible but apparently very effective good old join. 
Since X blocks of the sparse matrix use the same block from the dense 
matrix, I suspect that the dense matrix is somehow replicated X times 
(either on disk or in the network), which is the reason why the join 
takes so much time.

After this bit of a context, here is my idea : would it be possible to 
somehow "broadcast" (or maybe more accurately, share or serve) a 
persisted RDD which is distributed on all workers, in a way that would, 
a bit like the IndexedRDD, allow a task to access a partition or an 
element of a partition in the closure, with a worker-local memory cache 
. i.e. the information about where each block resides would be 
distributed on the workers, to allow them to access parts of the RDD 
directly. I think that's already a bit how RDD are shuffled ?

The RDD could stay distributed (no need to collect then broadcast), and 
only necessary transfers would be required.

Is this a bad idea, is it already implemented somewhere (I would love it 
!) ?or is it something that could add efficiency not only for my use 
case, but maybe for others ? Could someone give me some hint about how I 
could add this possibility to Spark ? I would probably try to extend a 
RDD into a specific SharedIndexedRDD with a special lookup that would be 
allowed from tasks as a special case, and that would try to contact the 
blockManager and reach the corresponding data from the right worker.

Thanks in advance for your advices

Guillaume
-- 
eXenSa

	
*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. <http://www.exensa.com/>
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705


Re: Directly broadcasting (sort of) RDDs

Posted by Sean Owen <so...@cloudera.com>.
Since RDDs aren't designed as random-access maps, and are basically
bits of bookkeeping that make sense only on the driver, I think the
realization of something like this in Spark would realistically be
"collect RDD to local data structure" if anything.

It sounds like you're looking for a distributed cache, and there are
frameworks for that that can be used with Spark without Spark
rebuilding that too.

On Mon, Mar 23, 2015 at 12:00 PM, Guillaume Pitel
<gu...@exensa.com> wrote:
> Not far, but not exactly. The RDD could be too big to fit in memory,
>
> The idea is more like a worker-side rdd.lookup() with local cache.
>
> Guillaume
>
> In a sentence, is this the idea of collecting an RDD to memory on each
> executor directly?
>
> On Sun, Mar 22, 2015 at 10:56 PM, Sandy Ryza <sa...@cloudera.com>
> wrote:
>
> Hi Guillaume,
>
> I've long thought something like this would be useful - i.e. the ability to
> broadcast RDDs directly without first pulling data through the driver.  If I
> understand correctly, your requirement to "block" a matrix up and only fetch
> the needed parts could be implemented on top of this by splitting an RDD
> into a set of smaller RDDs and then broadcasting each one on its own.
>
> Unfortunately nobody is working on this currently (and I couldn't promise to
> have bandwidth to review it at the moment either), but I suspect we'll
> eventually need to add something like this for map joins in Hive on Spark
> and Spark SQL.
>
> -Sandy
>
>
>
> On Sat, Mar 21, 2015 at 3:11 AM, Guillaume Pitel
> <gu...@exensa.com> wrote:
>
> Hi,
>
> Thanks for your answer. This is precisely the use case I'm interested in,
> but I know it already, I should have mentionned it. Unfortunately this
> implementation of BlockMatrix has (in my opinion) some disadvantages (the
> fact that it split the matrix by range instead of using a modulo is bad for
> block skewness). Besides, and more importantly, as I was writing, it uses
> the join solution (actually a cogroup :
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala,
> line 361). The reduplication of the elements of the dense matrix is thus
> dependent on the block size.
>
> Actually I'm wondering if what I want to achieve could be made with a
> simple modification to the join, allowing a partition to be weakly cached
> wafter being retrieved.
>
> Guillaume
>
>
> There is block matrix in Spark 1.3 -
> http://spark.apache.org/docs/latest/mllib-data-types.html#blockmatrix
>
>
>
>
>
> However I believe it only supports dense matrix blocks.
>
>
>
>
> Still, might be possible to use it or exetend
>
>
>
>
> JIRAs:
>
>
> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-3434
>
>
>
>
>
> Was based on
>
>
> https://github.com/amplab/ml-matrix
>
>
>
>
>
> Another lib:
>
>
> https://github.com/PasaLab/marlin/blob/master/README.md
>
>
>
>
>
>
>
> —
> Sent from Mailbox
>
> On Sat, Mar 21, 2015 at 12:24 AM, Guillaume Pitel
> <gu...@exensa.com> wrote:
>
> Hi,
> I have an idea that I would like to discuss with the Spark devs. The
> idea comes from a very real problem that I have struggled with since
> almost a year. My problem is very simple, it's a dense matrix * sparse
> matrix  operation. I have a dense matrix RDD[(Int,FloatMatrix)] which is
> divided in X large blocks (one block per partition), and a sparse matrix
> RDD[((Int,Int),Array[Array[(Int,Float)]]] , divided in X * Y blocks. The
> most efficient way to perform the operation is to collectAsMap() the
> dense matrix and broadcast it, then perform the block-local
> mutliplications, and combine the results by column.
> This is quite fine, unless the matrix is too big to fit in memory
> (especially since the multiplication is performed several times
> iteratively, and the broadcasts are not always cleaned from memory as I
> would naively expect).
> When the dense matrix is too big, a second solution is to split the big
> sparse matrix in several RDD, and do several broadcasts. Doing this
> creates quite a big overhead, but it mostly works, even though I often
> face some problems with unaccessible broadcast files, for instance.
> Then there is the terrible but apparently very effective good old join.
> Since X blocks of the sparse matrix use the same block from the dense
> matrix, I suspect that the dense matrix is somehow replicated X times
> (either on disk or in the network), which is the reason why the join
> takes so much time.
> After this bit of a context, here is my idea : would it be possible to
> somehow "broadcast" (or maybe more accurately, share or serve) a
> persisted RDD which is distributed on all workers, in a way that would,
> a bit like the IndexedRDD, allow a task to access a partition or an
> element of a partition in the closure, with a worker-local memory cache
> . i.e. the information about where each block resides would be
> distributed on the workers, to allow them to access parts of the RDD
> directly. I think that's already a bit how RDD are shuffled ?
> The RDD could stay distributed (no need to collect then broadcast), and
> only necessary transfers would be required.
> Is this a bad idea, is it already implemented somewhere (I would love it
> !) ?or is it something that could add efficiency not only for my use
> case, but maybe for others ? Could someone give me some hint about how I
> could add this possibility to Spark ? I would probably try to extend a
> RDD into a specific SharedIndexedRDD with a special lookup that would be
> allowed from tasks as a special case, and that would try to contact the
> blockManager and reach the corresponding data from the right worker.
> Thanks in advance for your advices
> Guillaume
> --
> eXenSa
> 	
> *Guillaume PITEL, Président*
> +33(0)626 222 431
> eXenSa S.A.S. <http://www.exensa.com/>
> 41, rue Périer - 92120 Montrouge - FRANCE
> Tel +33(0)184 163 677 / Fax +33(0)972 283 705
>
>
>
> --
> Guillaume PITEL, Président
> +33(0)626 222 431
>
> eXenSa S.A.S.
> 41, rue Périer - 92120 Montrouge - FRANCE
> Tel +33(0)184 163 677 / Fax +33(0)972 283 705
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
> For additional commands, e-mail: dev-help@spark.apache.org
>
>
>
> --
> Guillaume PITEL, Président
> +33(0)626 222 431
>
> eXenSa S.A.S.
> 41, rue Périer - 92120 Montrouge - FRANCE
> Tel +33(0)184 163 677 / Fax +33(0)972 283 705

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
For additional commands, e-mail: dev-help@spark.apache.org


Re: Directly broadcasting (sort of) RDDs

Posted by Guillaume Pitel <gu...@exensa.com>.
Not far, but not exactly. The RDD could be too big to fit in memory,

The idea is more like a worker-side rdd.lookup() with local cache.

Guillaume
> In a sentence, is this the idea of collecting an RDD to memory on each
> executor directly?
>
> On Sun, Mar 22, 2015 at 10:56 PM, Sandy Ryza <sa...@cloudera.com> wrote:
>> Hi Guillaume,
>>
>> I've long thought something like this would be useful - i.e. the ability to
>> broadcast RDDs directly without first pulling data through the driver.  If I
>> understand correctly, your requirement to "block" a matrix up and only fetch
>> the needed parts could be implemented on top of this by splitting an RDD
>> into a set of smaller RDDs and then broadcasting each one on its own.
>>
>> Unfortunately nobody is working on this currently (and I couldn't promise to
>> have bandwidth to review it at the moment either), but I suspect we'll
>> eventually need to add something like this for map joins in Hive on Spark
>> and Spark SQL.
>>
>> -Sandy
>>
>>
>>
>> On Sat, Mar 21, 2015 at 3:11 AM, Guillaume Pitel
>> <gu...@exensa.com> wrote:
>>> Hi,
>>>
>>> Thanks for your answer. This is precisely the use case I'm interested in,
>>> but I know it already, I should have mentionned it. Unfortunately this
>>> implementation of BlockMatrix has (in my opinion) some disadvantages (the
>>> fact that it split the matrix by range instead of using a modulo is bad for
>>> block skewness). Besides, and more importantly, as I was writing, it uses
>>> the join solution (actually a cogroup :
>>> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala,
>>> line 361). The reduplication of the elements of the dense matrix is thus
>>> dependent on the block size.
>>>
>>> Actually I'm wondering if what I want to achieve could be made with a
>>> simple modification to the join, allowing a partition to be weakly cached
>>> wafter being retrieved.
>>>
>>> Guillaume
>>>
>>>
>>> There is block matrix in Spark 1.3 -
>>> http://spark.apache.org/docs/latest/mllib-data-types.html#blockmatrix
>>>
>>>
>>>
>>>
>>>
>>> However I believe it only supports dense matrix blocks.
>>>
>>>
>>>
>>>
>>> Still, might be possible to use it or exetend
>>>
>>>
>>>
>>>
>>> JIRAs:
>>>
>>>
>>> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-3434
>>>
>>>
>>>
>>>
>>>
>>> Was based on
>>>
>>>
>>> https://github.com/amplab/ml-matrix
>>>
>>>
>>>
>>>
>>>
>>> Another lib:
>>>
>>>
>>> https://github.com/PasaLab/marlin/blob/master/README.md
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> —
>>> Sent from Mailbox
>>>
>>> On Sat, Mar 21, 2015 at 12:24 AM, Guillaume Pitel
>>> <gu...@exensa.com> wrote:
>>>
>>> Hi,
>>> I have an idea that I would like to discuss with the Spark devs. The
>>> idea comes from a very real problem that I have struggled with since
>>> almost a year. My problem is very simple, it's a dense matrix * sparse
>>> matrix  operation. I have a dense matrix RDD[(Int,FloatMatrix)] which is
>>> divided in X large blocks (one block per partition), and a sparse matrix
>>> RDD[((Int,Int),Array[Array[(Int,Float)]]] , divided in X * Y blocks. The
>>> most efficient way to perform the operation is to collectAsMap() the
>>> dense matrix and broadcast it, then perform the block-local
>>> mutliplications, and combine the results by column.
>>> This is quite fine, unless the matrix is too big to fit in memory
>>> (especially since the multiplication is performed several times
>>> iteratively, and the broadcasts are not always cleaned from memory as I
>>> would naively expect).
>>> When the dense matrix is too big, a second solution is to split the big
>>> sparse matrix in several RDD, and do several broadcasts. Doing this
>>> creates quite a big overhead, but it mostly works, even though I often
>>> face some problems with unaccessible broadcast files, for instance.
>>> Then there is the terrible but apparently very effective good old join.
>>> Since X blocks of the sparse matrix use the same block from the dense
>>> matrix, I suspect that the dense matrix is somehow replicated X times
>>> (either on disk or in the network), which is the reason why the join
>>> takes so much time.
>>> After this bit of a context, here is my idea : would it be possible to
>>> somehow "broadcast" (or maybe more accurately, share or serve) a
>>> persisted RDD which is distributed on all workers, in a way that would,
>>> a bit like the IndexedRDD, allow a task to access a partition or an
>>> element of a partition in the closure, with a worker-local memory cache
>>> . i.e. the information about where each block resides would be
>>> distributed on the workers, to allow them to access parts of the RDD
>>> directly. I think that's already a bit how RDD are shuffled ?
>>> The RDD could stay distributed (no need to collect then broadcast), and
>>> only necessary transfers would be required.
>>> Is this a bad idea, is it already implemented somewhere (I would love it
>>> !) ?or is it something that could add efficiency not only for my use
>>> case, but maybe for others ? Could someone give me some hint about how I
>>> could add this possibility to Spark ? I would probably try to extend a
>>> RDD into a specific SharedIndexedRDD with a special lookup that would be
>>> allowed from tasks as a special case, and that would try to contact the
>>> blockManager and reach the corresponding data from the right worker.
>>> Thanks in advance for your advices
>>> Guillaume
>>> --
>>> eXenSa
>>> 	
>>> *Guillaume PITEL, Président*
>>> +33(0)626 222 431
>>> eXenSa S.A.S. <http://www.exensa.com/>
>>> 41, rue Périer - 92120 Montrouge - FRANCE
>>> Tel +33(0)184 163 677 / Fax +33(0)972 283 705
>>>
>>>
>>>
>>> --
>>> Guillaume PITEL, Président
>>> +33(0)626 222 431
>>>
>>> eXenSa S.A.S.
>>> 41, rue Périer - 92120 Montrouge - FRANCE
>>> Tel +33(0)184 163 677 / Fax +33(0)972 283 705
>>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
> For additional commands, e-mail: dev-help@spark.apache.org
>


-- 
eXenSa

	
*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. <http://www.exensa.com/>
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705


Re: Directly broadcasting (sort of) RDDs

Posted by Sean Owen <so...@cloudera.com>.
In a sentence, is this the idea of collecting an RDD to memory on each
executor directly?

On Sun, Mar 22, 2015 at 10:56 PM, Sandy Ryza <sa...@cloudera.com> wrote:
> Hi Guillaume,
>
> I've long thought something like this would be useful - i.e. the ability to
> broadcast RDDs directly without first pulling data through the driver.  If I
> understand correctly, your requirement to "block" a matrix up and only fetch
> the needed parts could be implemented on top of this by splitting an RDD
> into a set of smaller RDDs and then broadcasting each one on its own.
>
> Unfortunately nobody is working on this currently (and I couldn't promise to
> have bandwidth to review it at the moment either), but I suspect we'll
> eventually need to add something like this for map joins in Hive on Spark
> and Spark SQL.
>
> -Sandy
>
>
>
> On Sat, Mar 21, 2015 at 3:11 AM, Guillaume Pitel
> <gu...@exensa.com> wrote:
>>
>> Hi,
>>
>> Thanks for your answer. This is precisely the use case I'm interested in,
>> but I know it already, I should have mentionned it. Unfortunately this
>> implementation of BlockMatrix has (in my opinion) some disadvantages (the
>> fact that it split the matrix by range instead of using a modulo is bad for
>> block skewness). Besides, and more importantly, as I was writing, it uses
>> the join solution (actually a cogroup :
>> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala,
>> line 361). The reduplication of the elements of the dense matrix is thus
>> dependent on the block size.
>>
>> Actually I'm wondering if what I want to achieve could be made with a
>> simple modification to the join, allowing a partition to be weakly cached
>> wafter being retrieved.
>>
>> Guillaume
>>
>>
>> There is block matrix in Spark 1.3 -
>> http://spark.apache.org/docs/latest/mllib-data-types.html#blockmatrix
>>
>>
>>
>>
>>
>> However I believe it only supports dense matrix blocks.
>>
>>
>>
>>
>> Still, might be possible to use it or exetend
>>
>>
>>
>>
>> JIRAs:
>>
>>
>> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-3434
>>
>>
>>
>>
>>
>> Was based on
>>
>>
>> https://github.com/amplab/ml-matrix
>>
>>
>>
>>
>>
>> Another lib:
>>
>>
>> https://github.com/PasaLab/marlin/blob/master/README.md
>>
>>
>>
>>
>>
>>
>>
>> —
>> Sent from Mailbox
>>
>> On Sat, Mar 21, 2015 at 12:24 AM, Guillaume Pitel
>> <gu...@exensa.com> wrote:
>>
>> Hi,
>> I have an idea that I would like to discuss with the Spark devs. The
>> idea comes from a very real problem that I have struggled with since
>> almost a year. My problem is very simple, it's a dense matrix * sparse
>> matrix  operation. I have a dense matrix RDD[(Int,FloatMatrix)] which is
>> divided in X large blocks (one block per partition), and a sparse matrix
>> RDD[((Int,Int),Array[Array[(Int,Float)]]] , divided in X * Y blocks. The
>> most efficient way to perform the operation is to collectAsMap() the
>> dense matrix and broadcast it, then perform the block-local
>> mutliplications, and combine the results by column.
>> This is quite fine, unless the matrix is too big to fit in memory
>> (especially since the multiplication is performed several times
>> iteratively, and the broadcasts are not always cleaned from memory as I
>> would naively expect).
>> When the dense matrix is too big, a second solution is to split the big
>> sparse matrix in several RDD, and do several broadcasts. Doing this
>> creates quite a big overhead, but it mostly works, even though I often
>> face some problems with unaccessible broadcast files, for instance.
>> Then there is the terrible but apparently very effective good old join.
>> Since X blocks of the sparse matrix use the same block from the dense
>> matrix, I suspect that the dense matrix is somehow replicated X times
>> (either on disk or in the network), which is the reason why the join
>> takes so much time.
>> After this bit of a context, here is my idea : would it be possible to
>> somehow "broadcast" (or maybe more accurately, share or serve) a
>> persisted RDD which is distributed on all workers, in a way that would,
>> a bit like the IndexedRDD, allow a task to access a partition or an
>> element of a partition in the closure, with a worker-local memory cache
>> . i.e. the information about where each block resides would be
>> distributed on the workers, to allow them to access parts of the RDD
>> directly. I think that's already a bit how RDD are shuffled ?
>> The RDD could stay distributed (no need to collect then broadcast), and
>> only necessary transfers would be required.
>> Is this a bad idea, is it already implemented somewhere (I would love it
>> !) ?or is it something that could add efficiency not only for my use
>> case, but maybe for others ? Could someone give me some hint about how I
>> could add this possibility to Spark ? I would probably try to extend a
>> RDD into a specific SharedIndexedRDD with a special lookup that would be
>> allowed from tasks as a special case, and that would try to contact the
>> blockManager and reach the corresponding data from the right worker.
>> Thanks in advance for your advices
>> Guillaume
>> --
>> eXenSa
>> 	
>> *Guillaume PITEL, Président*
>> +33(0)626 222 431
>> eXenSa S.A.S. <http://www.exensa.com/>
>> 41, rue Périer - 92120 Montrouge - FRANCE
>> Tel +33(0)184 163 677 / Fax +33(0)972 283 705
>>
>>
>>
>> --
>> Guillaume PITEL, Président
>> +33(0)626 222 431
>>
>> eXenSa S.A.S.
>> 41, rue Périer - 92120 Montrouge - FRANCE
>> Tel +33(0)184 163 677 / Fax +33(0)972 283 705
>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
For additional commands, e-mail: dev-help@spark.apache.org


Re: Directly broadcasting (sort of) RDDs

Posted by Sandy Ryza <sa...@cloudera.com>.
Hi Guillaume,

I've long thought something like this would be useful - i.e. the ability to
broadcast RDDs directly without first pulling data through the driver.  If
I understand correctly, your requirement to "block" a matrix up and only
fetch the needed parts could be implemented on top of this by splitting an
RDD into a set of smaller RDDs and then broadcasting each one on its own.

Unfortunately nobody is working on this currently (and I couldn't promise
to have bandwidth to review it at the moment either), but I suspect we'll
eventually need to add something like this for map joins in Hive on Spark
and Spark SQL.

-Sandy



On Sat, Mar 21, 2015 at 3:11 AM, Guillaume Pitel <guillaume.pitel@exensa.com
> wrote:

>  Hi,
>
> Thanks for your answer. This is precisely the use case I'm interested in,
> but I know it already, I should have mentionned it. Unfortunately this
> implementation of BlockMatrix has (in my opinion) some disadvantages (the
> fact that it split the matrix by range instead of using a modulo is bad for
> block skewness). Besides, and more importantly, as I was writing, it uses
> the join solution (actually a cogroup :
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala,
> line 361). The reduplication of the elements of the dense matrix is thus
> dependent on the block size.
>
> Actually I'm wondering if what I want to achieve could be made with a
> simple modification to the join, allowing a partition to be weakly cached
> wafter being retrieved.
>
> Guillaume
>
>
>  There is block matrix in Spark 1.3 - http://spark.apache.org/docs/latest/mllib-data-types.html#blockmatrix
>
>
>
>
>
> However I believe it only supports dense matrix blocks.
>
>
>
>
> Still, might be possible to use it or exetend
>
>
>
>
> JIRAs:
>
> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-3434
>
>
>
>
>
> Was based on
>
> https://github.com/amplab/ml-matrix
>
>
>
>
>
> Another lib:
>
> https://github.com/PasaLab/marlin/blob/master/README.md
>
>
>
>
>
>
>
> —
> Sent from Mailbox
>
> On Sat, Mar 21, 2015 at 12:24 AM, Guillaume Pitel<gu...@exensa.com> <gu...@exensa.com> wrote:
>
>
>  Hi,
> I have an idea that I would like to discuss with the Spark devs. The
> idea comes from a very real problem that I have struggled with since
> almost a year. My problem is very simple, it's a dense matrix * sparse
> matrix  operation. I have a dense matrix RDD[(Int,FloatMatrix)] which is
> divided in X large blocks (one block per partition), and a sparse matrix
> RDD[((Int,Int),Array[Array[(Int,Float)]]] , divided in X * Y blocks. The
> most efficient way to perform the operation is to collectAsMap() the
> dense matrix and broadcast it, then perform the block-local
> mutliplications, and combine the results by column.
> This is quite fine, unless the matrix is too big to fit in memory
> (especially since the multiplication is performed several times
> iteratively, and the broadcasts are not always cleaned from memory as I
> would naively expect).
> When the dense matrix is too big, a second solution is to split the big
> sparse matrix in several RDD, and do several broadcasts. Doing this
> creates quite a big overhead, but it mostly works, even though I often
> face some problems with unaccessible broadcast files, for instance.
> Then there is the terrible but apparently very effective good old join.
> Since X blocks of the sparse matrix use the same block from the dense
> matrix, I suspect that the dense matrix is somehow replicated X times
> (either on disk or in the network), which is the reason why the join
> takes so much time.
> After this bit of a context, here is my idea : would it be possible to
> somehow "broadcast" (or maybe more accurately, share or serve) a
> persisted RDD which is distributed on all workers, in a way that would,
> a bit like the IndexedRDD, allow a task to access a partition or an
> element of a partition in the closure, with a worker-local memory cache
> . i.e. the information about where each block resides would be
> distributed on the workers, to allow them to access parts of the RDD
> directly. I think that's already a bit how RDD are shuffled ?
> The RDD could stay distributed (no need to collect then broadcast), and
> only necessary transfers would be required.
> Is this a bad idea, is it already implemented somewhere (I would love it
> !) ?or is it something that could add efficiency not only for my use
> case, but maybe for others ? Could someone give me some hint about how I
> could add this possibility to Spark ? I would probably try to extend a
> RDD into a specific SharedIndexedRDD with a special lookup that would be
> allowed from tasks as a special case, and that would try to contact the
> blockManager and reach the corresponding data from the right worker.
> Thanks in advance for your advices
> Guillaume
> --
> eXenSa
> 	
> *Guillaume PITEL, Président*
> +33(0)626 222 431
> eXenSa S.A.S. <http://www.exensa.com/> <http://www.exensa.com/>
> 41, rue Périer - 92120 Montrouge - FRANCE
> Tel +33(0)184 163 677 / Fax +33(0)972 283 705
>
>
>
> --
>    [image: eXenSa]
>  *Guillaume PITEL, Président*
> +33(0)626 222 431
>
> eXenSa S.A.S. <http://www.exensa.com/>
>  41, rue Périer - 92120 Montrouge - FRANCE
> Tel +33(0)184 163 677 / Fax +33(0)972 283 705
>

Re: Directly broadcasting (sort of) RDDs

Posted by Guillaume Pitel <gu...@exensa.com>.
Hi,

Thanks for your answer. This is precisely the use case I'm interested 
in, but I know it already, I should have mentionned it. Unfortunately 
this implementation of BlockMatrix has (in my opinion) some 
disadvantages (the fact that it split the matrix by range instead of 
using a modulo is bad for block skewness). Besides, and more 
importantly, as I was writing, it uses the join solution (actually a 
cogroup : 
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala, 
line 361). The reduplication of the elements of the dense matrix is thus 
dependent on the block size.

Actually I'm wondering if what I want to achieve could be made with a 
simple modification to the join, allowing a partition to be weakly 
cached wafter being retrieved.

Guillaume


> There is block matrix in Spark 1.3 - http://spark.apache.org/docs/latest/mllib-data-types.html#blockmatrix
>
>
>
>
>
> However I believe it only supports dense matrix blocks.
>
>
>
>
> Still, might be possible to use it or exetend
>
>
>
>
> JIRAs:
>
>
> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-3434
>
>
>
>
>
> Was based on
>
>
> https://github.com/amplab/ml-matrix
>
>
>
>
>
> Another lib:
>
>
> https://github.com/PasaLab/marlin/blob/master/README.md
>
>
>
>
>
>
>
> —
> Sent from Mailbox
>
> On Sat, Mar 21, 2015 at 12:24 AM, Guillaume Pitel
> <gu...@exensa.com> wrote:
>
>> Hi,
>> I have an idea that I would like to discuss with the Spark devs. The
>> idea comes from a very real problem that I have struggled with since
>> almost a year. My problem is very simple, it's a dense matrix * sparse
>> matrix  operation. I have a dense matrix RDD[(Int,FloatMatrix)] which is
>> divided in X large blocks (one block per partition), and a sparse matrix
>> RDD[((Int,Int),Array[Array[(Int,Float)]]] , divided in X * Y blocks. The
>> most efficient way to perform the operation is to collectAsMap() the
>> dense matrix and broadcast it, then perform the block-local
>> mutliplications, and combine the results by column.
>> This is quite fine, unless the matrix is too big to fit in memory
>> (especially since the multiplication is performed several times
>> iteratively, and the broadcasts are not always cleaned from memory as I
>> would naively expect).
>> When the dense matrix is too big, a second solution is to split the big
>> sparse matrix in several RDD, and do several broadcasts. Doing this
>> creates quite a big overhead, but it mostly works, even though I often
>> face some problems with unaccessible broadcast files, for instance.
>> Then there is the terrible but apparently very effective good old join.
>> Since X blocks of the sparse matrix use the same block from the dense
>> matrix, I suspect that the dense matrix is somehow replicated X times
>> (either on disk or in the network), which is the reason why the join
>> takes so much time.
>> After this bit of a context, here is my idea : would it be possible to
>> somehow "broadcast" (or maybe more accurately, share or serve) a
>> persisted RDD which is distributed on all workers, in a way that would,
>> a bit like the IndexedRDD, allow a task to access a partition or an
>> element of a partition in the closure, with a worker-local memory cache
>> . i.e. the information about where each block resides would be
>> distributed on the workers, to allow them to access parts of the RDD
>> directly. I think that's already a bit how RDD are shuffled ?
>> The RDD could stay distributed (no need to collect then broadcast), and
>> only necessary transfers would be required.
>> Is this a bad idea, is it already implemented somewhere (I would love it
>> !) ?or is it something that could add efficiency not only for my use
>> case, but maybe for others ? Could someone give me some hint about how I
>> could add this possibility to Spark ? I would probably try to extend a
>> RDD into a specific SharedIndexedRDD with a special lookup that would be
>> allowed from tasks as a special case, and that would try to contact the
>> blockManager and reach the corresponding data from the right worker.
>> Thanks in advance for your advices
>> Guillaume
>> -- 
>> eXenSa
>> 	
>> *Guillaume PITEL, Président*
>> +33(0)626 222 431
>> eXenSa S.A.S. <http://www.exensa.com/>
>> 41, rue Périer - 92120 Montrouge - FRANCE
>> Tel +33(0)184 163 677 / Fax +33(0)972 283 705


-- 
eXenSa

	
*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. <http://www.exensa.com/>
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705


Re: Directly broadcasting (sort of) RDDs

Posted by Nick Pentreath <ni...@gmail.com>.
There is block matrix in Spark 1.3 - http://spark.apache.org/docs/latest/mllib-data-types.html#blockmatrix





However I believe it only supports dense matrix blocks.




Still, might be possible to use it or exetend 




JIRAs:


https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-3434





Was based on 


https://github.com/amplab/ml-matrix





Another lib:


https://github.com/PasaLab/marlin/blob/master/README.md







—
Sent from Mailbox

On Sat, Mar 21, 2015 at 12:24 AM, Guillaume Pitel
<gu...@exensa.com> wrote:

> Hi,
> I have an idea that I would like to discuss with the Spark devs. The 
> idea comes from a very real problem that I have struggled with since 
> almost a year. My problem is very simple, it's a dense matrix * sparse 
> matrix  operation. I have a dense matrix RDD[(Int,FloatMatrix)] which is 
> divided in X large blocks (one block per partition), and a sparse matrix 
> RDD[((Int,Int),Array[Array[(Int,Float)]]] , divided in X * Y blocks. The 
> most efficient way to perform the operation is to collectAsMap() the 
> dense matrix and broadcast it, then perform the block-local 
> mutliplications, and combine the results by column.
> This is quite fine, unless the matrix is too big to fit in memory 
> (especially since the multiplication is performed several times 
> iteratively, and the broadcasts are not always cleaned from memory as I 
> would naively expect).
> When the dense matrix is too big, a second solution is to split the big 
> sparse matrix in several RDD, and do several broadcasts. Doing this 
> creates quite a big overhead, but it mostly works, even though I often 
> face some problems with unaccessible broadcast files, for instance.
> Then there is the terrible but apparently very effective good old join. 
> Since X blocks of the sparse matrix use the same block from the dense 
> matrix, I suspect that the dense matrix is somehow replicated X times 
> (either on disk or in the network), which is the reason why the join 
> takes so much time.
> After this bit of a context, here is my idea : would it be possible to 
> somehow "broadcast" (or maybe more accurately, share or serve) a 
> persisted RDD which is distributed on all workers, in a way that would, 
> a bit like the IndexedRDD, allow a task to access a partition or an 
> element of a partition in the closure, with a worker-local memory cache 
> . i.e. the information about where each block resides would be 
> distributed on the workers, to allow them to access parts of the RDD 
> directly. I think that's already a bit how RDD are shuffled ?
> The RDD could stay distributed (no need to collect then broadcast), and 
> only necessary transfers would be required.
> Is this a bad idea, is it already implemented somewhere (I would love it 
> !) ?or is it something that could add efficiency not only for my use 
> case, but maybe for others ? Could someone give me some hint about how I 
> could add this possibility to Spark ? I would probably try to extend a 
> RDD into a specific SharedIndexedRDD with a special lookup that would be 
> allowed from tasks as a special case, and that would try to contact the 
> blockManager and reach the corresponding data from the right worker.
> Thanks in advance for your advices
> Guillaume
> -- 
> eXenSa
> 	
> *Guillaume PITEL, Président*
> +33(0)626 222 431
> eXenSa S.A.S. <http://www.exensa.com/>
> 41, rue Périer - 92120 Montrouge - FRANCE
> Tel +33(0)184 163 677 / Fax +33(0)972 283 705