You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@mahout.apache.org by Gokhan Capan <gk...@gmail.com> on 2014/11/08 21:42:11 UTC

SGD Implementation and Questions for mapBlock like functionality

Hi,

Based on Zinkevich et al.'s Parallelized Stochastic Gradient paper (
http://martin.zinkevich.org/publications/nips2010.pdf), I tried to
implement SGD, and a regularized least squares solution for linear
regression (can easily be extended to other GLMs, too).

How the algorithm works is as follows:
1. Split data into partitions of T examples
2. in parallel, for each partition:
   2.0. shuffle partition
   2.1. initialize parameter vector
   2.2. for each example in the shuffled partition
       2.2.1 update the parameter vector
3. Aggregate all the parameter vectors and return

Here is an initial implementation to illustrate where I am stuck:
https://github.com/gcapan/mahout/compare/optimization

(See TODO in SGD.minimizeWithSgd[K])

I was thinking that using a blockified matrix of training instances, step 2
of the algorithm can run on blocks, and they can be aggregated in
client-side. However, the only operator that I know in the DSL is mapBlock,
and it requires the BlockMapFunction to map a block to another block of the
same row size. In this context, I want to map a block (numRows x n) to the
parameter vector of size n.

The question is:
1- Is it possible to easily implement the above algorithm using DSL's
current functionality? Could you tell me what I'm missing?
2- If there is not an easy way other than using the currently-non-existing
mapBlock-like method, shall we add such an operator?

Best,

Gokhan

RE: SGD Implementation and Questions for mapBlock like functionality

Posted by Andrew Palumbo <ap...@outlook.com>.
Yeah- I don't think this can be easily and efficiently implented in the DSL as is.  You'd have to iterativly rowBind all of the vectors returned by minimizePartial(...)  onto a (k x n) matrix within your mapBlock(Matrix(m x n)) bmf where k is the number of blocks and m is the number of (total) observations.  mapBlock(...) requires that you return a matrix with the same number of rows.  So with k!=m this is difficult to do in a straightforward way.

What about implementing it sequentially in the pure DSL and then extending and  overriding the higher order function (to calculate the bVector at each iteration) to the spark module and like Pat said use the Spark operations?

> Subject: Re: SGD Implementation and Questions for mapBlock like functionality
> From: pat@occamsmachete.com
> Date: Tue, 11 Nov 2014 09:54:52 -0800
> To: dev@mahout.apache.org
> 
> Still not sure what you need but if mapBlock and broadcast vals aren’t enough you’ll have to look at Spark’s available operations like join, reduce, etc. As well as the Spark accumulators. None of these have been made generic enough for the DSL yet AFAIK. I use accumulators in Spark specific code but that doesn’t need to be reflected in the DSL. You’ll have to decide if the new ops you need are worth putting in the DSL or just leaving in your engine-specific implementation.
>  
> On Nov 10, 2014, at 10:47 AM, Gokhan Capan <gk...@gmail.com> wrote:
> 
> Well, in that specific case, I will accumulate in the client side,
> collection of the intermediate parameters is not that big (numBlocks x
> X.ncol). What I need is just mapping (keys, block) to a vector (currently,
> a mapBlock has to map the block to the new block)
> 
> From a general perspective, you are right, this is an accumulation.
> 
> Gokhan
> 
> On Mon, Nov 10, 2014 at 8:26 PM, Pat Ferrel <pa...@occamsmachete.com> wrote:
> 
> > Do you need a reduce or could you use an accumulator? Either is not really
> > supported in the DSL but clearly these are required for certain algos.
> > Broadcast vals supported but are read only.
> > 
> > On Nov 8, 2014, at 12:42 PM, Gokhan Capan <gk...@gmail.com> wrote:
> > 
> > Hi,
> > 
> > Based on Zinkevich et al.'s Parallelized Stochastic Gradient paper (
> > http://martin.zinkevich.org/publications/nips2010.pdf), I tried to
> > implement SGD, and a regularized least squares solution for linear
> > regression (can easily be extended to other GLMs, too).
> > 
> > How the algorithm works is as follows:
> > 1. Split data into partitions of T examples
> > 2. in parallel, for each partition:
> >  2.0. shuffle partition
> >  2.1. initialize parameter vector
> >  2.2. for each example in the shuffled partition
> >      2.2.1 update the parameter vector
> > 3. Aggregate all the parameter vectors and return
> > 
> > Here is an initial implementation to illustrate where I am stuck:
> > https://github.com/gcapan/mahout/compare/optimization
> > 
> > (See TODO in SGD.minimizeWithSgd[K])
> > 
> > I was thinking that using a blockified matrix of training instances, step 2
> > of the algorithm can run on blocks, and they can be aggregated in
> > client-side. However, the only operator that I know in the DSL is mapBlock,
> > and it requires the BlockMapFunction to map a block to another block of the
> > same row size. In this context, I want to map a block (numRows x n) to the
> > parameter vector of size n.
> > 
> > The question is:
> > 1- Is it possible to easily implement the above algorithm using DSL's
> > current functionality? Could you tell me what I'm missing?
> > 2- If there is not an easy way other than using the currently-non-existing
> > mapBlock-like method, shall we add such an operator?
> > 
> > Best,
> > 
> > Gokhan
> > 
> > 
> 
 		 	   		  

Re: SGD Implementation and Questions for mapBlock like functionality

Posted by Pat Ferrel <pa...@occamsmachete.com>.
Still not sure what you need but if mapBlock and broadcast vals aren’t enough you’ll have to look at Spark’s available operations like join, reduce, etc. As well as the Spark accumulators. None of these have been made generic enough for the DSL yet AFAIK. I use accumulators in Spark specific code but that doesn’t need to be reflected in the DSL. You’ll have to decide if the new ops you need are worth putting in the DSL or just leaving in your engine-specific implementation.
 
On Nov 10, 2014, at 10:47 AM, Gokhan Capan <gk...@gmail.com> wrote:

Well, in that specific case, I will accumulate in the client side,
collection of the intermediate parameters is not that big (numBlocks x
X.ncol). What I need is just mapping (keys, block) to a vector (currently,
a mapBlock has to map the block to the new block)

From a general perspective, you are right, this is an accumulation.

Gokhan

On Mon, Nov 10, 2014 at 8:26 PM, Pat Ferrel <pa...@occamsmachete.com> wrote:

> Do you need a reduce or could you use an accumulator? Either is not really
> supported in the DSL but clearly these are required for certain algos.
> Broadcast vals supported but are read only.
> 
> On Nov 8, 2014, at 12:42 PM, Gokhan Capan <gk...@gmail.com> wrote:
> 
> Hi,
> 
> Based on Zinkevich et al.'s Parallelized Stochastic Gradient paper (
> http://martin.zinkevich.org/publications/nips2010.pdf), I tried to
> implement SGD, and a regularized least squares solution for linear
> regression (can easily be extended to other GLMs, too).
> 
> How the algorithm works is as follows:
> 1. Split data into partitions of T examples
> 2. in parallel, for each partition:
>  2.0. shuffle partition
>  2.1. initialize parameter vector
>  2.2. for each example in the shuffled partition
>      2.2.1 update the parameter vector
> 3. Aggregate all the parameter vectors and return
> 
> Here is an initial implementation to illustrate where I am stuck:
> https://github.com/gcapan/mahout/compare/optimization
> 
> (See TODO in SGD.minimizeWithSgd[K])
> 
> I was thinking that using a blockified matrix of training instances, step 2
> of the algorithm can run on blocks, and they can be aggregated in
> client-side. However, the only operator that I know in the DSL is mapBlock,
> and it requires the BlockMapFunction to map a block to another block of the
> same row size. In this context, I want to map a block (numRows x n) to the
> parameter vector of size n.
> 
> The question is:
> 1- Is it possible to easily implement the above algorithm using DSL's
> current functionality? Could you tell me what I'm missing?
> 2- If there is not an easy way other than using the currently-non-existing
> mapBlock-like method, shall we add such an operator?
> 
> Best,
> 
> Gokhan
> 
> 


Re: SGD Implementation and Questions for mapBlock like functionality

Posted by Gokhan Capan <gk...@gmail.com>.
Well, in that specific case, I will accumulate in the client side,
collection of the intermediate parameters is not that big (numBlocks x
X.ncol). What I need is just mapping (keys, block) to a vector (currently,
a mapBlock has to map the block to the new block)

>From a general perspective, you are right, this is an accumulation.

Gokhan

On Mon, Nov 10, 2014 at 8:26 PM, Pat Ferrel <pa...@occamsmachete.com> wrote:

> Do you need a reduce or could you use an accumulator? Either is not really
> supported in the DSL but clearly these are required for certain algos.
> Broadcast vals supported but are read only.
>
> On Nov 8, 2014, at 12:42 PM, Gokhan Capan <gk...@gmail.com> wrote:
>
> Hi,
>
> Based on Zinkevich et al.'s Parallelized Stochastic Gradient paper (
> http://martin.zinkevich.org/publications/nips2010.pdf), I tried to
> implement SGD, and a regularized least squares solution for linear
> regression (can easily be extended to other GLMs, too).
>
> How the algorithm works is as follows:
> 1. Split data into partitions of T examples
> 2. in parallel, for each partition:
>   2.0. shuffle partition
>   2.1. initialize parameter vector
>   2.2. for each example in the shuffled partition
>       2.2.1 update the parameter vector
> 3. Aggregate all the parameter vectors and return
>
> Here is an initial implementation to illustrate where I am stuck:
> https://github.com/gcapan/mahout/compare/optimization
>
> (See TODO in SGD.minimizeWithSgd[K])
>
> I was thinking that using a blockified matrix of training instances, step 2
> of the algorithm can run on blocks, and they can be aggregated in
> client-side. However, the only operator that I know in the DSL is mapBlock,
> and it requires the BlockMapFunction to map a block to another block of the
> same row size. In this context, I want to map a block (numRows x n) to the
> parameter vector of size n.
>
> The question is:
> 1- Is it possible to easily implement the above algorithm using DSL's
> current functionality? Could you tell me what I'm missing?
> 2- If there is not an easy way other than using the currently-non-existing
> mapBlock-like method, shall we add such an operator?
>
> Best,
>
> Gokhan
>
>

Re: SGD Implementation and Questions for mapBlock like functionality

Posted by Pat Ferrel <pa...@occamsmachete.com>.
Do you need a reduce or could you use an accumulator? Either is not really supported in the DSL but clearly these are required for certain algos. Broadcast vals supported but are read only. 

On Nov 8, 2014, at 12:42 PM, Gokhan Capan <gk...@gmail.com> wrote:

Hi,

Based on Zinkevich et al.'s Parallelized Stochastic Gradient paper (
http://martin.zinkevich.org/publications/nips2010.pdf), I tried to
implement SGD, and a regularized least squares solution for linear
regression (can easily be extended to other GLMs, too).

How the algorithm works is as follows:
1. Split data into partitions of T examples
2. in parallel, for each partition:
  2.0. shuffle partition
  2.1. initialize parameter vector
  2.2. for each example in the shuffled partition
      2.2.1 update the parameter vector
3. Aggregate all the parameter vectors and return

Here is an initial implementation to illustrate where I am stuck:
https://github.com/gcapan/mahout/compare/optimization

(See TODO in SGD.minimizeWithSgd[K])

I was thinking that using a blockified matrix of training instances, step 2
of the algorithm can run on blocks, and they can be aggregated in
client-side. However, the only operator that I know in the DSL is mapBlock,
and it requires the BlockMapFunction to map a block to another block of the
same row size. In this context, I want to map a block (numRows x n) to the
parameter vector of size n.

The question is:
1- Is it possible to easily implement the above algorithm using DSL's
current functionality? Could you tell me what I'm missing?
2- If there is not an easy way other than using the currently-non-existing
mapBlock-like method, shall we add such an operator?

Best,

Gokhan


Re: SGD Implementation and Questions for mapBlock like functionality

Posted by Dmitriy Lyubimov <dl...@gmail.com>.
On Wed, Nov 12, 2014 at 2:04 PM, Ted Dunning <te...@gmail.com> wrote:

> On Wed, Nov 12, 2014 at 9:53 AM, Dmitriy Lyubimov <dl...@gmail.com>
> wrote:
>
> > once we start mapping aggregate, there's no reason not to
> > map other engine specific capabilities, which are vast. At this point
> > dilemma is, no matter what we do we are losing coherency: if we map it
> all,
> > then other engines will have trouble supporting all of it. If we don't
> map
> > it all, then we are forcing capability reduction compared to what the
> > engine actually can do.
> >
> > It is obvious to me that all-reduce aggregate will make a lot of sense --
> > even if it means math checkpoint. but then where do we stop in mapping
> > those. E.g. do we do fold? cartesian? And what is that true reason we are
> > remapping everything if it is already natively available? etc. etc. For
> > myself, I still haven't figured a good answer to those .
> >
>
> Actually, I disagree with the premise here.
>
> There *is* a reason not to map all other engine specific capabilities.
> That reason is we don't need them.  Yet.
>

First, i am, sort of, advocating the same thing. I am not in favor of
mapping anything beyond map. I am in favor of going to native capabilities
directly.

Second, actually "we don't need them" is not a good argument because if the
goal is to build an ML environment offering and not just a collection of ML
methodologies, then we must assume 3rd parties will (and in fact, do, on my
own behalf) build much more than there is ever planned or done in Mahout.
And such 3rd parties can attest to the fact that the issue of all-reduce
aggregation is indeed coming up every second full moon.

Re: SGD Implementation and Questions for mapBlock like functionality

Posted by Suneel Marthi <su...@gmail.com>.
Yep it is part of onlinesummarizer

Sent from my iPhone

> On Nov 12, 2014, at 2:23 PM, Ted Dunning <te...@gmail.com> wrote:
> 
>> On Wed, Nov 12, 2014 at 2:08 PM, Gokhan Capan <gk...@gmail.com> wrote:
>> 
>> Can we easily integrate t-digest for descriptives once we have block
>> aggregates? This might count one more reason.
> 
> Presumably.
> 
> T-digest is already in Mahout as part of the OnlineSummarizer.

Re: SGD Implementation and Questions for mapBlock like functionality

Posted by Ted Dunning <te...@gmail.com>.
On Wed, Nov 12, 2014 at 2:08 PM, Gokhan Capan <gk...@gmail.com> wrote:

> Can we easily integrate t-digest for descriptives once we have block
> aggregates? This might count one more reason.
>

Presumably.

T-digest is already in Mahout as part of the OnlineSummarizer.

Re: SGD Implementation and Questions for mapBlock like functionality

Posted by Gokhan Capan <gk...@gmail.com>.
Ted,

Can we easily integrate t-digest for descriptives once we have block
aggregates? This might count one more reason.

Gokhan

On Thu, Nov 13, 2014 at 12:04 AM, Ted Dunning <te...@gmail.com> wrote:

> On Wed, Nov 12, 2014 at 9:53 AM, Dmitriy Lyubimov <dl...@gmail.com>
> wrote:
>
> > once we start mapping aggregate, there's no reason not to
> > map other engine specific capabilities, which are vast. At this point
> > dilemma is, no matter what we do we are losing coherency: if we map it
> all,
> > then other engines will have trouble supporting all of it. If we don't
> map
> > it all, then we are forcing capability reduction compared to what the
> > engine actually can do.
> >
> > It is obvious to me that all-reduce aggregate will make a lot of sense --
> > even if it means math checkpoint. but then where do we stop in mapping
> > those. E.g. do we do fold? cartesian? And what is that true reason we are
> > remapping everything if it is already natively available? etc. etc. For
> > myself, I still haven't figured a good answer to those .
> >
>
> Actually, I disagree with the premise here.
>
> There *is* a reason not to map all other engine specific capabilities.
> That reason is we don't need them.  Yet.
>
> So far, we *clearly* need some sort of block aggregate for a host of
> hog-wild sorts of algorithms.  That doesn't imply that we need all kinds of
> mapping aggregates.  It just means that we are clear on one need for now.
>
> So let's get this one in and see how far we can go.
>
> Also, having one kind of aggregation in the DSL does not restrict anyone
> from using engine specific capabilities.  It just means that one kind of
> idiom can be done without engine specificity.
>

Re: SGD Implementation and Questions for mapBlock like functionality

Posted by Ted Dunning <te...@gmail.com>.
On Wed, Nov 12, 2014 at 9:53 AM, Dmitriy Lyubimov <dl...@gmail.com> wrote:

> once we start mapping aggregate, there's no reason not to
> map other engine specific capabilities, which are vast. At this point
> dilemma is, no matter what we do we are losing coherency: if we map it all,
> then other engines will have trouble supporting all of it. If we don't map
> it all, then we are forcing capability reduction compared to what the
> engine actually can do.
>
> It is obvious to me that all-reduce aggregate will make a lot of sense --
> even if it means math checkpoint. but then where do we stop in mapping
> those. E.g. do we do fold? cartesian? And what is that true reason we are
> remapping everything if it is already natively available? etc. etc. For
> myself, I still haven't figured a good answer to those .
>

Actually, I disagree with the premise here.

There *is* a reason not to map all other engine specific capabilities.
That reason is we don't need them.  Yet.

So far, we *clearly* need some sort of block aggregate for a host of
hog-wild sorts of algorithms.  That doesn't imply that we need all kinds of
mapping aggregates.  It just means that we are clear on one need for now.

So let's get this one in and see how far we can go.

Also, having one kind of aggregation in the DSL does not restrict anyone
from using engine specific capabilities.  It just means that one kind of
idiom can be done without engine specificity.

Re: SGD Implementation and Questions for mapBlock like functionality

Posted by Dmitriy Lyubimov <dl...@gmail.com>.
i promise to make a review of this by next monday. I looked briefly and had
some suggestions, I think it might be ok. My only concern is what i have
already said -- once we start mapping aggregate, there's no reason not to
map other engine specific capabilities, which are vast. At this point
dilemma is, no matter what we do we are losing coherency: if we map it all,
then other engines will have trouble supporting all of it. If we don't map
it all, then we are forcing capability reduction compared to what the
engine actually can do.

It is obvious to me that all-reduce aggregate will make a lot of sense --
even if it means math checkpoint. but then where do we stop in mapping
those. E.g. do we do fold? cartesian? And what is that true reason we are
remapping everything if it is already natively available? etc. etc. For
myself, I still haven't figured a good answer to those .

On Tue, Nov 11, 2014 at 1:18 PM, Gokhan Capan <gk...@gmail.com> wrote:

> So the alternatives are:
>
> 1- mapBlock to a matrix whose all rows-but-the first are empty, then
> aggregate
> 2- depend on a backend
>
> 1 is obviously OK.
>
> I don't like the idea of depending on a backend since SGD is a generic loss
> minimization, on which other algorithms will possibly depend.
>
> In this context, client-side aggregation is not an overhead, but even if it
> happens to be so, it doesn't have to be a client-side aggregate at all.
>
> Alternative to 1, I am thinking of at least having an aggregation
> operation, which will return an accumulated value anyway, and shouldn't
> affect algebra optimizations.
>
> I quickly implemented a naive one (supporting only Spark- I know I said
> that I don't like depending on a backend, but at least the backends-wide
> interface is consistent, and as a client, I still don't have to deal with
> Spark primitives directly).
>
> Is this nice enough? Is it too bad to have in the DSL?
> https://github.com/gcapan/mahout/compare/accumulateblocks
>
> Best
>
> Gokhan
>
> On Tue, Nov 11, 2014 at 10:45 PM, Dmitriy Lyubimov <dl...@gmail.com>
> wrote:
>
> > Oh. algorithm actually collects the vectors and runs another cycle in the
> > client!
> >
> > Still, technically, you can collect almost-empty blocks to the client
> > (since they are mostly empty, it won't cause THAT huge overhead compared
> to
> > collecting single vectors, after all, how many partitions are we talking
> > about? 1000? ).
> >
> > On Tue, Nov 11, 2014 at 12:41 PM, Dmitriy Lyubimov <dl...@gmail.com>
> > wrote:
> >
> >>
> >>
> >> On Sat, Nov 8, 2014 at 12:42 PM, Gokhan Capan <gk...@gmail.com>
> wrote:
> >>
> >>> Hi,
> >>>
> >>> Based on Zinkevich et al.'s Parallelized Stochastic Gradient paper (
> >>> http://martin.zinkevich.org/publications/nips2010.pdf), I tried to
> >>> implement SGD, and a regularized least squares solution for linear
> >>> regression (can easily be extended to other GLMs, too).
> >>>
> >>> How the algorithm works is as follows:
> >>> 1. Split data into partitions of T examples
> >>> 2. in parallel, for each partition:
> >>>    2.0. shuffle partition
> >>>    2.1. initialize parameter vector
> >>>    2.2. for each example in the shuffled partition
> >>>        2.2.1 update the parameter vector
> >>> 3. Aggregate all the parameter vectors and return
> >>>
> >>
> >> I guess technically it is possible (transform each block to a
> >> SparseRowMatrix or SparseMatrix with only first valid row) and then
> invoke
> >> colSums() or colMeans() (whatever aggregate means).
> >>
> >> However, i am not sure it is worth the ugliness. isn't it easier to
> >> declare these things quasi-algebraic and just do direct spark calls on
> the
> >> matrix RDD (map, aggregate)?
> >>
> >> The real danger is to introduce non-algebra things into algebra so that
> >> the rest of the algebra doesn't optimize any more.
> >>
> >>
> >
>

Re: SGD Implementation and Questions for mapBlock like functionality

Posted by Gokhan Capan <gk...@gmail.com>.
Awesome.

So we are going to implement certain required DistributedOperations, in a
separate trait similar to, but other than the DistributedEngine.

I'll think about this a little more, and propose an initial implementation
that hopefully we can agree on.

Best,

Gokhan

On Thu, Nov 13, 2014 at 1:35 AM, Dmitriy Lyubimov <dl...@gmail.com> wrote:

> On Wed, Nov 12, 2014 at 1:44 PM, Dmitriy Lyubimov <dl...@gmail.com>
> wrote:
>
> >
> >
> > On Wed, Nov 12, 2014 at 1:27 PM, Gokhan Capan <gk...@gmail.com> wrote:
> >
> >> My only concern is to add certain loss minimization tools for people to
> >> write machine learning algorithms.
> >>
> >> mapBlock as you suggested can work equally, but I happened to have
> >> implemented the aggregate op while thinking.
> >>
> >> Apart from this SGD implementation,
> >> blockify-a-matrix-and-run-an-operation-in-parallel-on-blocks is, I
> >> believe,
> >> certainly required, since block level parallelization is really common
> in
> >> matrix computations. Plus, if we are to add, say, a descriptive
> statistics
> >> package, that would require a similar functionality, too.
> >>
> >> If mapBlocks for passing custom operators was more flexible, I'd be more
> >> than happy, but I understand the idea behind its requirement of mapping
> >> should be block-to-block with the same row size.
> >>
> >> Could you give a little more detail on the 'common distributed strategy'
> >> idea?
> >>
> >
> the idea is simple.
>
> First, not use logical plan construction. In practice it means that while
> say "A.%*%(B)" create a logical plan element (which is subsequently run
> thru optimizer), something like aggregate(..) does not do that. Instead, it
> just produces ... whatever it produces, directly. So it doesn't form any
> new logical nor physical plan.
>
> Second, it means that we can define internal strategy trait, something like
> DistributedOperations, which will include this set of operations.
> Subsequently, we will define native implementations of this trait in the
> same way we defined some native stuff for DistributedEngine trait. (but
> don't make it part of DistributedEngine trait please -- maybe an attribute
> perhaps). At run time we will have to ask current engine to provide
> distributed operation implementation and delegate execution of common
> fragments to it .
>

Re: SGD Implementation and Questions for mapBlock like functionality

Posted by Dmitriy Lyubimov <dl...@gmail.com>.
On Wed, Nov 12, 2014 at 1:44 PM, Dmitriy Lyubimov <dl...@gmail.com> wrote:

>
>
> On Wed, Nov 12, 2014 at 1:27 PM, Gokhan Capan <gk...@gmail.com> wrote:
>
>> My only concern is to add certain loss minimization tools for people to
>> write machine learning algorithms.
>>
>> mapBlock as you suggested can work equally, but I happened to have
>> implemented the aggregate op while thinking.
>>
>> Apart from this SGD implementation,
>> blockify-a-matrix-and-run-an-operation-in-parallel-on-blocks is, I
>> believe,
>> certainly required, since block level parallelization is really common in
>> matrix computations. Plus, if we are to add, say, a descriptive statistics
>> package, that would require a similar functionality, too.
>>
>> If mapBlocks for passing custom operators was more flexible, I'd be more
>> than happy, but I understand the idea behind its requirement of mapping
>> should be block-to-block with the same row size.
>>
>> Could you give a little more detail on the 'common distributed strategy'
>> idea?
>>
>
the idea is simple.

First, not use logical plan construction. In practice it means that while
say "A.%*%(B)" create a logical plan element (which is subsequently run
thru optimizer), something like aggregate(..) does not do that. Instead, it
just produces ... whatever it produces, directly. So it doesn't form any
new logical nor physical plan.

Second, it means that we can define internal strategy trait, something like
DistributedOperations, which will include this set of operations.
Subsequently, we will define native implementations of this trait in the
same way we defined some native stuff for DistributedEngine trait. (but
don't make it part of DistributedEngine trait please -- maybe an attribute
perhaps). At run time we will have to ask current engine to provide
distributed operation implementation and delegate execution of common
fragments to it .

Re: SGD Implementation and Questions for mapBlock like functionality

Posted by Dmitriy Lyubimov <dl...@gmail.com>.
On Wed, Nov 12, 2014 at 1:27 PM, Gokhan Capan <gk...@gmail.com> wrote:

> My only concern is to add certain loss minimization tools for people to
> write machine learning algorithms.
>
> mapBlock as you suggested can work equally, but I happened to have
> implemented the aggregate op while thinking.
>
> Apart from this SGD implementation,
> blockify-a-matrix-and-run-an-operation-in-parallel-on-blocks is, I believe,
> certainly required, since block level parallelization is really common in
> matrix computations. Plus, if we are to add, say, a descriptive statistics
> package, that would require a similar functionality, too.
>
> If mapBlocks for passing custom operators was more flexible, I'd be more
> than happy, but I understand the idea behind its requirement of mapping
> should be block-to-block with the same row size.
>
> Could you give a little more detail on the 'common distributed strategy'
> idea?
>
>
> Aside: Do we have certain elementwise Math functions in Matrix DSL? That
> is, how can I do this?
>
> 1 + exp(drmA)
>

if i understand is correctly, you mean you want to compute  a_i,j <- 1+
exp(a_i,j) ?

you can do simple things like 1 + drmA, that works.
to apply more interesting functions elementwise, there's no concise dsl
notation for that. but of course you can always use mapblock such as:

drmA.mapBlock() { case (keys,block) =>
  block := ((r,c,x) => 1+exp(x))
  keys -> block
}

i guess you can figure dsl shortcut for that indeed, as in R, by defining
package-level exp(DrmLike[K]) to expend to this fragment. But i guess then
you'd need to get on the entire set of synonyms for  scala.math functions,
really. Not entirely untreasonable idea IMO. although single mapblock will
probably be faster because it will not be a chain of composed closures.



>
>
>
>
> Gokhan
>
> On Wed, Nov 12, 2014 at 7:55 PM, Dmitriy Lyubimov <dl...@gmail.com>
> wrote:
>
> > yes i usually follow #2 too.
> >
> > The thing is, pretty often algorithm can define its own set of strategies
> > the backend need to support (like this distributedEngine strategy) and
> keep
> > a lot of logic still common accross all strategies. But then if
> all-reduce
> > aggregate operation is incredibly common among such algorithm speicfic
> > strategies, then it stands to reason to implement it only once.
> >
> > I have an idea.
> >
> > Maybe we need a common distributed strategy which is different from
> > algebraic optimizer. That way we don't have to mess with algebraic
> > rewrites. how about that?
> >
> > On Wed, Nov 12, 2014 at 9:12 AM, Pat Ferrel <pa...@occamsmachete.com>
> wrote:
> >
> > > So you are following #2, which is good. #1 seems a bit like a hack.
> For a
> > > long time to come we will have to add things to the DSL if it is to be
> > kept
> > > engine independent. Yours looks pretty general and simple.
> > >
> > > Are you familiar with the existing Mahout aggregate methods? They show
> up
> > > in the SGDHelper.java and other places in legacy code. I don’t know
> much
> > > about them but they seem to be a pre-functional programming attempt at
> > this
> > > kind of thing. It looks like you are proposing a replacement for those
> > > based on rdd.aggregate, if so that would be very useful. For one thing
> it
> > > looks like the old aggregate was not parallel, rdd.aggregate is.
> > >
> > >
> > > On Nov 11, 2014, at 1:18 PM, Gokhan Capan <gk...@gmail.com> wrote:
> > >
> > > So the alternatives are:
> > >
> > > 1- mapBlock to a matrix whose all rows-but-the first are empty, then
> > > aggregate
> > > 2- depend on a backend
> > >
> > > 1 is obviously OK.
> > >
> > > I don't like the idea of depending on a backend since SGD is a generic
> > loss
> > > minimization, on which other algorithms will possibly depend.
> > >
> > > In this context, client-side aggregation is not an overhead, but even
> if
> > it
> > > happens to be so, it doesn't have to be a client-side aggregate at all.
> > >
> > > Alternative to 1, I am thinking of at least having an aggregation
> > > operation, which will return an accumulated value anyway, and shouldn't
> > > affect algebra optimizations.
> > >
> > > I quickly implemented a naive one (supporting only Spark- I know I said
> > > that I don't like depending on a backend, but at least the
> backends-wide
> > > interface is consistent, and as a client, I still don't have to deal
> with
> > > Spark primitives directly).
> > >
> > > Is this nice enough? Is it too bad to have in the DSL?
> > > https://github.com/gcapan/mahout/compare/accumulateblocks
> > >
> > > Best
> > >
> > > Gokhan
> > >
> > > On Tue, Nov 11, 2014 at 10:45 PM, Dmitriy Lyubimov <dl...@gmail.com>
> > > wrote:
> > >
> > > > Oh. algorithm actually collects the vectors and runs another cycle in
> > the
> > > > client!
> > > >
> > > > Still, technically, you can collect almost-empty blocks to the client
> > > > (since they are mostly empty, it won't cause THAT huge overhead
> > compared
> > > to
> > > > collecting single vectors, after all, how many partitions are we
> > talking
> > > > about? 1000? ).
> > > >
> > > > On Tue, Nov 11, 2014 at 12:41 PM, Dmitriy Lyubimov <
> dlieu.7@gmail.com>
> > > > wrote:
> > > >
> > > >>
> > > >>
> > > >> On Sat, Nov 8, 2014 at 12:42 PM, Gokhan Capan <gk...@gmail.com>
> > > wrote:
> > > >>
> > > >>> Hi,
> > > >>>
> > > >>> Based on Zinkevich et al.'s Parallelized Stochastic Gradient paper
> (
> > > >>> http://martin.zinkevich.org/publications/nips2010.pdf), I tried to
> > > >>> implement SGD, and a regularized least squares solution for linear
> > > >>> regression (can easily be extended to other GLMs, too).
> > > >>>
> > > >>> How the algorithm works is as follows:
> > > >>> 1. Split data into partitions of T examples
> > > >>> 2. in parallel, for each partition:
> > > >>>   2.0. shuffle partition
> > > >>>   2.1. initialize parameter vector
> > > >>>   2.2. for each example in the shuffled partition
> > > >>>       2.2.1 update the parameter vector
> > > >>> 3. Aggregate all the parameter vectors and return
> > > >>>
> > > >>
> > > >> I guess technically it is possible (transform each block to a
> > > >> SparseRowMatrix or SparseMatrix with only first valid row) and then
> > > invoke
> > > >> colSums() or colMeans() (whatever aggregate means).
> > > >>
> > > >> However, i am not sure it is worth the ugliness. isn't it easier to
> > > >> declare these things quasi-algebraic and just do direct spark calls
> on
> > > the
> > > >> matrix RDD (map, aggregate)?
> > > >>
> > > >> The real danger is to introduce non-algebra things into algebra so
> > that
> > > >> the rest of the algebra doesn't optimize any more.
> > > >>
> > > >>
> > > >
> > >
> > >
> >
>

Re: SGD Implementation and Questions for mapBlock like functionality

Posted by Gokhan Capan <gk...@gmail.com>.
My only concern is to add certain loss minimization tools for people to
write machine learning algorithms.

mapBlock as you suggested can work equally, but I happened to have
implemented the aggregate op while thinking.

Apart from this SGD implementation,
blockify-a-matrix-and-run-an-operation-in-parallel-on-blocks is, I believe,
certainly required, since block level parallelization is really common in
matrix computations. Plus, if we are to add, say, a descriptive statistics
package, that would require a similar functionality, too.

If mapBlocks for passing custom operators was more flexible, I'd be more
than happy, but I understand the idea behind its requirement of mapping
should be block-to-block with the same row size.

Could you give a little more detail on the 'common distributed strategy'
idea?


Aside: Do we have certain elementwise Math functions in Matrix DSL? That
is, how can I do this?

1 + exp(drmA)




Gokhan

On Wed, Nov 12, 2014 at 7:55 PM, Dmitriy Lyubimov <dl...@gmail.com> wrote:

> yes i usually follow #2 too.
>
> The thing is, pretty often algorithm can define its own set of strategies
> the backend need to support (like this distributedEngine strategy) and keep
> a lot of logic still common accross all strategies. But then if all-reduce
> aggregate operation is incredibly common among such algorithm speicfic
> strategies, then it stands to reason to implement it only once.
>
> I have an idea.
>
> Maybe we need a common distributed strategy which is different from
> algebraic optimizer. That way we don't have to mess with algebraic
> rewrites. how about that?
>
> On Wed, Nov 12, 2014 at 9:12 AM, Pat Ferrel <pa...@occamsmachete.com> wrote:
>
> > So you are following #2, which is good. #1 seems a bit like a hack. For a
> > long time to come we will have to add things to the DSL if it is to be
> kept
> > engine independent. Yours looks pretty general and simple.
> >
> > Are you familiar with the existing Mahout aggregate methods? They show up
> > in the SGDHelper.java and other places in legacy code. I don’t know much
> > about them but they seem to be a pre-functional programming attempt at
> this
> > kind of thing. It looks like you are proposing a replacement for those
> > based on rdd.aggregate, if so that would be very useful. For one thing it
> > looks like the old aggregate was not parallel, rdd.aggregate is.
> >
> >
> > On Nov 11, 2014, at 1:18 PM, Gokhan Capan <gk...@gmail.com> wrote:
> >
> > So the alternatives are:
> >
> > 1- mapBlock to a matrix whose all rows-but-the first are empty, then
> > aggregate
> > 2- depend on a backend
> >
> > 1 is obviously OK.
> >
> > I don't like the idea of depending on a backend since SGD is a generic
> loss
> > minimization, on which other algorithms will possibly depend.
> >
> > In this context, client-side aggregation is not an overhead, but even if
> it
> > happens to be so, it doesn't have to be a client-side aggregate at all.
> >
> > Alternative to 1, I am thinking of at least having an aggregation
> > operation, which will return an accumulated value anyway, and shouldn't
> > affect algebra optimizations.
> >
> > I quickly implemented a naive one (supporting only Spark- I know I said
> > that I don't like depending on a backend, but at least the backends-wide
> > interface is consistent, and as a client, I still don't have to deal with
> > Spark primitives directly).
> >
> > Is this nice enough? Is it too bad to have in the DSL?
> > https://github.com/gcapan/mahout/compare/accumulateblocks
> >
> > Best
> >
> > Gokhan
> >
> > On Tue, Nov 11, 2014 at 10:45 PM, Dmitriy Lyubimov <dl...@gmail.com>
> > wrote:
> >
> > > Oh. algorithm actually collects the vectors and runs another cycle in
> the
> > > client!
> > >
> > > Still, technically, you can collect almost-empty blocks to the client
> > > (since they are mostly empty, it won't cause THAT huge overhead
> compared
> > to
> > > collecting single vectors, after all, how many partitions are we
> talking
> > > about? 1000? ).
> > >
> > > On Tue, Nov 11, 2014 at 12:41 PM, Dmitriy Lyubimov <dl...@gmail.com>
> > > wrote:
> > >
> > >>
> > >>
> > >> On Sat, Nov 8, 2014 at 12:42 PM, Gokhan Capan <gk...@gmail.com>
> > wrote:
> > >>
> > >>> Hi,
> > >>>
> > >>> Based on Zinkevich et al.'s Parallelized Stochastic Gradient paper (
> > >>> http://martin.zinkevich.org/publications/nips2010.pdf), I tried to
> > >>> implement SGD, and a regularized least squares solution for linear
> > >>> regression (can easily be extended to other GLMs, too).
> > >>>
> > >>> How the algorithm works is as follows:
> > >>> 1. Split data into partitions of T examples
> > >>> 2. in parallel, for each partition:
> > >>>   2.0. shuffle partition
> > >>>   2.1. initialize parameter vector
> > >>>   2.2. for each example in the shuffled partition
> > >>>       2.2.1 update the parameter vector
> > >>> 3. Aggregate all the parameter vectors and return
> > >>>
> > >>
> > >> I guess technically it is possible (transform each block to a
> > >> SparseRowMatrix or SparseMatrix with only first valid row) and then
> > invoke
> > >> colSums() or colMeans() (whatever aggregate means).
> > >>
> > >> However, i am not sure it is worth the ugliness. isn't it easier to
> > >> declare these things quasi-algebraic and just do direct spark calls on
> > the
> > >> matrix RDD (map, aggregate)?
> > >>
> > >> The real danger is to introduce non-algebra things into algebra so
> that
> > >> the rest of the algebra doesn't optimize any more.
> > >>
> > >>
> > >
> >
> >
>

Re: SGD Implementation and Questions for mapBlock like functionality

Posted by Dmitriy Lyubimov <dl...@gmail.com>.
yes i usually follow #2 too.

The thing is, pretty often algorithm can define its own set of strategies
the backend need to support (like this distributedEngine strategy) and keep
a lot of logic still common accross all strategies. But then if all-reduce
aggregate operation is incredibly common among such algorithm speicfic
strategies, then it stands to reason to implement it only once.

I have an idea.

Maybe we need a common distributed strategy which is different from
algebraic optimizer. That way we don't have to mess with algebraic
rewrites. how about that?

On Wed, Nov 12, 2014 at 9:12 AM, Pat Ferrel <pa...@occamsmachete.com> wrote:

> So you are following #2, which is good. #1 seems a bit like a hack. For a
> long time to come we will have to add things to the DSL if it is to be kept
> engine independent. Yours looks pretty general and simple.
>
> Are you familiar with the existing Mahout aggregate methods? They show up
> in the SGDHelper.java and other places in legacy code. I don’t know much
> about them but they seem to be a pre-functional programming attempt at this
> kind of thing. It looks like you are proposing a replacement for those
> based on rdd.aggregate, if so that would be very useful. For one thing it
> looks like the old aggregate was not parallel, rdd.aggregate is.
>
>
> On Nov 11, 2014, at 1:18 PM, Gokhan Capan <gk...@gmail.com> wrote:
>
> So the alternatives are:
>
> 1- mapBlock to a matrix whose all rows-but-the first are empty, then
> aggregate
> 2- depend on a backend
>
> 1 is obviously OK.
>
> I don't like the idea of depending on a backend since SGD is a generic loss
> minimization, on which other algorithms will possibly depend.
>
> In this context, client-side aggregation is not an overhead, but even if it
> happens to be so, it doesn't have to be a client-side aggregate at all.
>
> Alternative to 1, I am thinking of at least having an aggregation
> operation, which will return an accumulated value anyway, and shouldn't
> affect algebra optimizations.
>
> I quickly implemented a naive one (supporting only Spark- I know I said
> that I don't like depending on a backend, but at least the backends-wide
> interface is consistent, and as a client, I still don't have to deal with
> Spark primitives directly).
>
> Is this nice enough? Is it too bad to have in the DSL?
> https://github.com/gcapan/mahout/compare/accumulateblocks
>
> Best
>
> Gokhan
>
> On Tue, Nov 11, 2014 at 10:45 PM, Dmitriy Lyubimov <dl...@gmail.com>
> wrote:
>
> > Oh. algorithm actually collects the vectors and runs another cycle in the
> > client!
> >
> > Still, technically, you can collect almost-empty blocks to the client
> > (since they are mostly empty, it won't cause THAT huge overhead compared
> to
> > collecting single vectors, after all, how many partitions are we talking
> > about? 1000? ).
> >
> > On Tue, Nov 11, 2014 at 12:41 PM, Dmitriy Lyubimov <dl...@gmail.com>
> > wrote:
> >
> >>
> >>
> >> On Sat, Nov 8, 2014 at 12:42 PM, Gokhan Capan <gk...@gmail.com>
> wrote:
> >>
> >>> Hi,
> >>>
> >>> Based on Zinkevich et al.'s Parallelized Stochastic Gradient paper (
> >>> http://martin.zinkevich.org/publications/nips2010.pdf), I tried to
> >>> implement SGD, and a regularized least squares solution for linear
> >>> regression (can easily be extended to other GLMs, too).
> >>>
> >>> How the algorithm works is as follows:
> >>> 1. Split data into partitions of T examples
> >>> 2. in parallel, for each partition:
> >>>   2.0. shuffle partition
> >>>   2.1. initialize parameter vector
> >>>   2.2. for each example in the shuffled partition
> >>>       2.2.1 update the parameter vector
> >>> 3. Aggregate all the parameter vectors and return
> >>>
> >>
> >> I guess technically it is possible (transform each block to a
> >> SparseRowMatrix or SparseMatrix with only first valid row) and then
> invoke
> >> colSums() or colMeans() (whatever aggregate means).
> >>
> >> However, i am not sure it is worth the ugliness. isn't it easier to
> >> declare these things quasi-algebraic and just do direct spark calls on
> the
> >> matrix RDD (map, aggregate)?
> >>
> >> The real danger is to introduce non-algebra things into algebra so that
> >> the rest of the algebra doesn't optimize any more.
> >>
> >>
> >
>
>

Re: SGD Implementation and Questions for mapBlock like functionality

Posted by Pat Ferrel <pa...@occamsmachete.com>.
So you are following #2, which is good. #1 seems a bit like a hack. For a long time to come we will have to add things to the DSL if it is to be kept engine independent. Yours looks pretty general and simple. 

Are you familiar with the existing Mahout aggregate methods? They show up in the SGDHelper.java and other places in legacy code. I don’t know much about them but they seem to be a pre-functional programming attempt at this kind of thing. It looks like you are proposing a replacement for those based on rdd.aggregate, if so that would be very useful. For one thing it looks like the old aggregate was not parallel, rdd.aggregate is.


On Nov 11, 2014, at 1:18 PM, Gokhan Capan <gk...@gmail.com> wrote:

So the alternatives are:

1- mapBlock to a matrix whose all rows-but-the first are empty, then
aggregate
2- depend on a backend

1 is obviously OK.

I don't like the idea of depending on a backend since SGD is a generic loss
minimization, on which other algorithms will possibly depend.

In this context, client-side aggregation is not an overhead, but even if it
happens to be so, it doesn't have to be a client-side aggregate at all.

Alternative to 1, I am thinking of at least having an aggregation
operation, which will return an accumulated value anyway, and shouldn't
affect algebra optimizations.

I quickly implemented a naive one (supporting only Spark- I know I said
that I don't like depending on a backend, but at least the backends-wide
interface is consistent, and as a client, I still don't have to deal with
Spark primitives directly).

Is this nice enough? Is it too bad to have in the DSL?
https://github.com/gcapan/mahout/compare/accumulateblocks

Best

Gokhan

On Tue, Nov 11, 2014 at 10:45 PM, Dmitriy Lyubimov <dl...@gmail.com>
wrote:

> Oh. algorithm actually collects the vectors and runs another cycle in the
> client!
> 
> Still, technically, you can collect almost-empty blocks to the client
> (since they are mostly empty, it won't cause THAT huge overhead compared to
> collecting single vectors, after all, how many partitions are we talking
> about? 1000? ).
> 
> On Tue, Nov 11, 2014 at 12:41 PM, Dmitriy Lyubimov <dl...@gmail.com>
> wrote:
> 
>> 
>> 
>> On Sat, Nov 8, 2014 at 12:42 PM, Gokhan Capan <gk...@gmail.com> wrote:
>> 
>>> Hi,
>>> 
>>> Based on Zinkevich et al.'s Parallelized Stochastic Gradient paper (
>>> http://martin.zinkevich.org/publications/nips2010.pdf), I tried to
>>> implement SGD, and a regularized least squares solution for linear
>>> regression (can easily be extended to other GLMs, too).
>>> 
>>> How the algorithm works is as follows:
>>> 1. Split data into partitions of T examples
>>> 2. in parallel, for each partition:
>>>   2.0. shuffle partition
>>>   2.1. initialize parameter vector
>>>   2.2. for each example in the shuffled partition
>>>       2.2.1 update the parameter vector
>>> 3. Aggregate all the parameter vectors and return
>>> 
>> 
>> I guess technically it is possible (transform each block to a
>> SparseRowMatrix or SparseMatrix with only first valid row) and then invoke
>> colSums() or colMeans() (whatever aggregate means).
>> 
>> However, i am not sure it is worth the ugliness. isn't it easier to
>> declare these things quasi-algebraic and just do direct spark calls on the
>> matrix RDD (map, aggregate)?
>> 
>> The real danger is to introduce non-algebra things into algebra so that
>> the rest of the algebra doesn't optimize any more.
>> 
>> 
> 


Re: SGD Implementation and Questions for mapBlock like functionality

Posted by Gokhan Capan <gk...@gmail.com>.
So the alternatives are:

1- mapBlock to a matrix whose all rows-but-the first are empty, then
aggregate
2- depend on a backend

1 is obviously OK.

I don't like the idea of depending on a backend since SGD is a generic loss
minimization, on which other algorithms will possibly depend.

In this context, client-side aggregation is not an overhead, but even if it
happens to be so, it doesn't have to be a client-side aggregate at all.

Alternative to 1, I am thinking of at least having an aggregation
operation, which will return an accumulated value anyway, and shouldn't
affect algebra optimizations.

I quickly implemented a naive one (supporting only Spark- I know I said
that I don't like depending on a backend, but at least the backends-wide
interface is consistent, and as a client, I still don't have to deal with
Spark primitives directly).

Is this nice enough? Is it too bad to have in the DSL?
https://github.com/gcapan/mahout/compare/accumulateblocks

Best

Gokhan

On Tue, Nov 11, 2014 at 10:45 PM, Dmitriy Lyubimov <dl...@gmail.com>
wrote:

> Oh. algorithm actually collects the vectors and runs another cycle in the
> client!
>
> Still, technically, you can collect almost-empty blocks to the client
> (since they are mostly empty, it won't cause THAT huge overhead compared to
> collecting single vectors, after all, how many partitions are we talking
> about? 1000? ).
>
> On Tue, Nov 11, 2014 at 12:41 PM, Dmitriy Lyubimov <dl...@gmail.com>
> wrote:
>
>>
>>
>> On Sat, Nov 8, 2014 at 12:42 PM, Gokhan Capan <gk...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Based on Zinkevich et al.'s Parallelized Stochastic Gradient paper (
>>> http://martin.zinkevich.org/publications/nips2010.pdf), I tried to
>>> implement SGD, and a regularized least squares solution for linear
>>> regression (can easily be extended to other GLMs, too).
>>>
>>> How the algorithm works is as follows:
>>> 1. Split data into partitions of T examples
>>> 2. in parallel, for each partition:
>>>    2.0. shuffle partition
>>>    2.1. initialize parameter vector
>>>    2.2. for each example in the shuffled partition
>>>        2.2.1 update the parameter vector
>>> 3. Aggregate all the parameter vectors and return
>>>
>>
>> I guess technically it is possible (transform each block to a
>> SparseRowMatrix or SparseMatrix with only first valid row) and then invoke
>> colSums() or colMeans() (whatever aggregate means).
>>
>> However, i am not sure it is worth the ugliness. isn't it easier to
>> declare these things quasi-algebraic and just do direct spark calls on the
>> matrix RDD (map, aggregate)?
>>
>> The real danger is to introduce non-algebra things into algebra so that
>> the rest of the algebra doesn't optimize any more.
>>
>>
>