Posted to dev@mahout.apache.org by Pat Ferrel <pa...@gmail.com> on 2014/06/16 01:55:26 UTC

Aggregation api

Seems like a good idea. The current use for aggregation seems pretty limited because it is non-distributed. The DRM and RDD allow for easy construction of closures for processing blocks (like drm.mapBlock), but having an API to plug in closures or functions for aggregations/accumulators might be a nice piece of syntactic sugar.

I’ve only seen the Scala aggregation stuff used in tests to compare the results of small in-core matrix ops to distributed ones. There are separate Matrix methods (sometimes using aggregations) and DRM methods; the former are non-distributed, the latter distributed. DrmLike currently seems to support only row-wise mapBlock, but Dmitriy may know better.

On Jun 14, 2014, at 6:45 PM, Ted Dunning <no...@github.com> wrote:

In math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala:

> @@ -188,8 +188,8 @@ object MatrixOps {
>      def apply(f: Vector): Double = f.sum
>    }
>  
> -  private def vectorCountFunc = new VectorFunction {
> -    def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.greater(0))
> +  private def vectorCountNonZeroElementsFunc = new VectorFunction {
> +    def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.notEqual(0))
The issue I have is with the rowAggregation and columnAggregation API. It enforces row-by-row evaluation. A map-reduce API could evaluate in many different orders, could iterate by rows or by columns for either aggregation, and wouldn't require a custom VectorFunction for simple aggregations.
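
To make the map-reduce style of aggregation concrete, here is a minimal in-core sketch in plain Scala. It does not use Mahout: `Mat`, `aggregate`, `aggregateRows`, and `aggregateColumns` are hypothetical names chosen for illustration, and a matrix is simply an array of rows. The argument order (combiner first, then mapper) mirrors the `f.aggregate(Functions.PLUS, Functions.notEqual(0))` call in the diff. The example also shows why the diff matters: a greater-than-zero mapper skips negative entries, while a not-equal-to-zero mapper counts them.

```scala
object AggregateSketch {
  // Hypothetical stand-in for a dense matrix: an array of equally long rows.
  type Mat = Array[Array[Double]]

  // Map every element, then fold the mapped values with the combiner.
  // With an associative, commutative combiner the traversal order is free.
  def aggregate(xs: Iterable[Double])(combiner: (Double, Double) => Double,
                                      mapper: Double => Double): Double =
    xs.map(mapper).reduceOption(combiner).getOrElse(0.0)

  // One aggregate per row.
  def aggregateRows(m: Mat)(combiner: (Double, Double) => Double,
                            mapper: Double => Double): Array[Double] =
    m.map(row => aggregate(row)(combiner, mapper))

  // One aggregate per column, computed by walking the columns directly.
  def aggregateColumns(m: Mat)(combiner: (Double, Double) => Double,
                               mapper: Double => Double): Array[Double] =
    if (m.isEmpty) Array.empty[Double]
    else m.head.indices.map(j => aggregate(m.map(_(j)))(combiner, mapper)).toArray

  val plus: (Double, Double) => Double = _ + _
  val greaterThanZero: Double => Double = x => if (x > 0) 1.0 else 0.0
  val notEqualToZero: Double => Double = x => if (x != 0) 1.0 else 0.0

  def main(args: Array[String]): Unit = {
    val m: Mat = Array(
      Array(1.0, 0.0, -2.0),
      Array(0.0, 3.0, -4.0))
    // Column counts with each mapper; the third column holds only negative values.
    println(aggregateColumns(m)(plus, greaterThanZero).mkString(", "))
    println(aggregateColumns(m)(plus, notEqualToZero).mkString(", "))
    // The same mapper and combiner reused for a per-row count.
    println(aggregateRows(m)(plus, notEqualToZero).mkString(", "))
  }
}
```

Simple aggregations here need only a mapper and a combiner; no per-vector function object has to be written for each case.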


Re: Aggregation api

Posted by Dmitriy Lyubimov <dl...@gmail.com>.
I've read this issue and I think it makes sense.

Note that the Java API doesn't really have anything to do with Scala or the
Scala DSL, because the mahout-java side notion of a function is not a Scala
function, so it cannot be used as an argument to any Scala APIs that accept
closures. In that sense it is not really directly relevant to what Pat was
raising/suggesting.

Something like DoubleDoubleFunction is not really viewed as a function in
Scala, so if we really wanted to redo it in the Scala DSL APIs, that whole
thing would have to be recast into Scala functions to really be Scala code.
I would not be supportive of that latter effort.
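
The mismatch described above can be sketched in a few lines of plain Scala. `JavaStyleDoubleDoubleFunction` is a hypothetical stand-in for a Java-side function class such as DoubleDoubleFunction; it is not the Mahout type, and `foldWith`, `asScala`, and `asJavaStyle` are made-up names used only for illustration.

```scala
object FunctionBridgeSketch {
  // Hypothetical stand-in for a Java-side binary function class (not the Mahout type).
  abstract class JavaStyleDoubleDoubleFunction {
    def apply(a: Double, b: Double): Double
  }

  // A Scala-side API that accepts an ordinary closure.
  def foldWith(xs: Seq[Double])(f: (Double, Double) => Double): Double =
    xs.reduceOption(f).getOrElse(0.0)

  // Explicit adapters in both directions between the two notions of "function".
  def asScala(f: JavaStyleDoubleDoubleFunction): (Double, Double) => Double =
    (a, b) => f.apply(a, b)

  def asJavaStyle(f: (Double, Double) => Double): JavaStyleDoubleDoubleFunction =
    new JavaStyleDoubleDoubleFunction {
      def apply(a: Double, b: Double): Double = f(a, b)
    }

  val plus: JavaStyleDoubleDoubleFunction = new JavaStyleDoubleDoubleFunction {
    def apply(a: Double, b: Double): Double = a + b
  }

  def main(args: Array[String]): Unit = {
    // plus is not a Scala Function2, so it is adapted before being handed
    // to the closure-taking API.
    println(foldWith(Seq(1.0, 2.0, 3.0))(asScala(plus)))
  }
}
```

Adapters like these keep the Java-side classes usable from Scala code without recasting the Java API itself into Scala functions.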


On Sun, Jun 15, 2014 at 11:39 PM, Ted Dunning <te...@gmail.com> wrote:

> Let's take this in small steps.  I have filed M-1582 which describes my
> suggestion at the sequential level.
>
> I have needed this several times recently in code that has nothing to do
> with Scala or parallel computing.  Since my suggestion meets my needs and
> imposes no burden on any other part of Mahout there should be no issues
> with that.
>
> I will move forward with that issue first.

Re: Aggregation api

Posted by Ted Dunning <te...@gmail.com>.
Let's take this in small steps.  I have filed M-1582 which describes my
suggestion at the sequential level.

I have needed this several times recently in code that has nothing to do
with Scala or parallel computing.  Since my suggestion meets my needs and
imposes no burden on any other part of Mahout there should be no issues
with that.

I will move forward with that issue first.




On Sun, Jun 15, 2014 at 9:23 PM, Dmitriy Lyubimov <dl...@gmail.com> wrote:

> I would be opposed to that. Some arguments, in order of priority.
> (1) As I was mentioning elsewhere, the two opposites here are functional
> programming (Spark) and algebraic explicitness (Matlab). Functional things
> make for messy, hard-to-read code. Non-functional things keep things clean.
>
> (2) I consciously keep things non-functional, with the exception of map. I do
> not want functional things that imply shuffle tasks. Remember, we are not in
> the map-reduce world anymore. Once we introduce shuffle-reduce things,
> there's no reason not to introduce shuffle-combine, shuffle-cogroup,
> shuffle-cartesian, shuffle-zip. At which point we are back to Spark. My
> answer to all those shuffle-dependent operations: take the rdd property and
> do specific shuffles all you want. Also, doing custom shuffle tasks takes
> optimization away from the optimizer, unlike a custom map task (map block)
> with restrictions.
>
> (3) We have been going by two things: keep it R-like, and add things as
> needed. I am still waiting for the case where such an aggregate API would be
> needed in a distributed setting. For example, I don't even see the need for
> this non-zero count thing. The current API is more than enough to do that, e.g.
> I would have done it something along the lines of transforming A into
> an indicator matrix and taking colsums:
>
> val colCounts = a.mapBlock() {
>   case (keys, block) =>
>     // Overwrite every non-zero element with 1.0 (indicator matrix).
>     for (row <- block; el <- row.iterateNonZero) row(el.index) = 1.0
>     keys -> block
> }
>   .colSums()
>
> Assuming it doesn't trigger any fail-fast asserts, we are done here, and in
> a far more efficient way than with the aggregate API in the 1464 patch.
>
> (warning: this code may create side effects in certain pipelines).
>
> I could probably write up another 3 to 5 weaker arguments amounting to
> one strong item, but I've got a cold and am tired and kind of don't care
> that much.

Re: Aggregation api

Posted by Dmitriy Lyubimov <dl...@gmail.com>.
I would be opposed to that. Some arguments, in order of priority.
(1) As I was mentioning elsewhere, the two opposites here are functional
programming (Spark) and algebraic explicitness (Matlab). Functional things
make for messy, hard-to-read code. Non-functional things keep things clean.

(2) I consciously keep things non-functional, with the exception of map. I do
not want functional things that imply shuffle tasks. Remember, we are not in
the map-reduce world anymore. Once we introduce shuffle-reduce things,
there's no reason not to introduce shuffle-combine, shuffle-cogroup,
shuffle-cartesian, shuffle-zip. At which point we are back to Spark. My
answer to all those shuffle-dependent operations: take the rdd property and
do specific shuffles all you want. Also, doing custom shuffle tasks takes
optimization away from the optimizer, unlike a custom map task (map block)
with restrictions.

(3) We have been going by two things: keep it R-like, and add things as
needed. I am still waiting for the case where such an aggregate API would be
needed in a distributed setting. For example, I don't even see the need for
this non-zero count thing. The current API is more than enough to do that, e.g.
I would have done it something along the lines of transforming A into
an indicator matrix and taking colsums:

val colCounts = a.mapBlock() {
  case (keys, block) =>
    // Overwrite every non-zero element with 1.0 (indicator matrix).
    for (row <- block; el <- row.iterateNonZero) row(el.index) = 1.0
    keys -> block
}
  .colSums()

Assuming it doesn't trigger any fail-fast asserts, we are done here, and in
a far more efficient way than with the aggregate API in the 1464 patch.

(warning: this code may create side effects in certain pipelines).
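
The same idea, and the side-effect caveat, can be illustrated in-core with plain Scala arrays. This is only a sketch under stated assumptions: the `Block` type, `toIndicatorInPlace`, `toIndicatorCopy`, and `colSums` are hypothetical names, the blocks stand in for the row-wise partitions of a distributed matrix, and no Mahout or Spark API is involved.

```scala
object IndicatorColSumsSketch {
  // Hypothetical stand-in for one row-wise block of a distributed matrix.
  type Block = Array[Array[Double]]

  // Overwrite every non-zero element with 1.0, mutating the block it is given.
  def toIndicatorInPlace(block: Block): Block = {
    for (row <- block; j <- row.indices if row(j) != 0.0) row(j) = 1.0
    block
  }

  // Same transformation on a copy, leaving the input block untouched.
  def toIndicatorCopy(block: Block): Block =
    block.map(_.map(x => if (x != 0.0) 1.0 else 0.0))

  // Column sums accumulated over every row of every block.
  def colSums(blocks: Seq[Block], ncol: Int): Array[Double] = {
    val sums = new Array[Double](ncol)
    for (block <- blocks; row <- block; j <- row.indices) sums(j) += row(j)
    sums
  }

  def main(args: Array[String]): Unit = {
    val blocks: Seq[Block] = Seq(
      Array(Array(2.0, 0.0, -1.0)),
      Array(Array(0.0, 5.0, 7.0), Array(3.0, 0.0, 0.0)))

    // Non-zero counts per column, computed without touching the source blocks.
    val counts = colSums(blocks.map(toIndicatorCopy), ncol = 3)
    println(counts.mkString(", "))

    // The in-place variant yields the same counts but also rewrites the source
    // blocks, which matters if anything else in the pipeline still reads them.
    colSums(blocks.map(toIndicatorInPlace), ncol = 3)
    println(blocks.head.head.mkString(", "))
  }
}
```

The copying variant trades extra allocation for safety; the in-place variant is cheaper but is only safe when nothing else reads the original values afterwards.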

I could probably write up another 3 to 5 weaker arguments amounting to
one strong item, but I've got a cold and am tired and kind of don't care that much.