You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Dmitriy Lyubimov <dl...@gmail.com> on 2016/03/22 21:56:37 UTC

a typical ML algorithm flow

Hi,

probably more of a question for Till:

Imagine a common ML algorithm flow that runs until convergence.

typical distributed flow would be something like that (e.g. GMM EM would be
exactly like that):

A: input

do {

   stat1 = A.map.reduce
   A = A.update-map(stat1)
   conv = A.map.reduce
} until conv > convThreshold

There probably could be 1 map-reduce step originating on A to compute both
convergence criteria statistics and udpate statistics in one step. not the
point.

The point is that update and map.reduce originate on the same dataset
intermittently.

In spark we would normally commit A to a object tree cache so that data is
available to subsequent map passes without any I/O or serialization
operations, thus insuring high rate of iterations.

We observe the same pattern pretty much everywhere. clustering,
probabilistic algorithms, even batch gradient descent of quasi newton
algorithms fitting.

How do we do something like that, for example, in FlinkML?

Thoughts?

thanks.

-Dmitriy

Re: a typical ML algorithm flow

Posted by Till Rohrmann <ti...@gmail.com>.
I agree that Flink’s concept of the closed loop iteration does not
translate so easily to a more general distributed linear algebra DSL such
as Samsara. There one usually writes loops using the for and while
primitives. Unfortunately, it is not so trivial to automatically translate
a for loop into Flink’s closed loop primitive.

Flink does not support butterfly mixing communication patterns out of the
box. The basic communication patterns of the runtime are pointwise and
all-to-all communication. But you can write your own Partitioner which will
distribute the elements in your cluster as you want to. You have to set it
via the DataSet.partitionCustom API call. Alternatively, you could
calculate the next butterfly mixing step in a map function, assign a
corresponding destination key and then group by this key.

Cheers,
Till
​

On Wed, Mar 30, 2016 at 3:03 AM, Dmitriy Lyubimov <dl...@gmail.com> wrote:

> BTW thank you for educating me on this.
>
> I think it's actually a wonderful capability, along with the capability of
> broadcasting distributed sets to map operators, it means (I hope) that
> fine-grained, centralized scheduling and centralized broadcasting we find
> in Spark analogous algorithms could be all but eliminated.
>
> Just to explain the rationale. It does present a problem for some things in
> Samsara though. Some times we do want to load inputs and eagerly evaluate
> some of their heuristics in order to help the plan construction itself.
> Since we have already loaded the datasets and hopefully are about to
> execute them, it would be a waste to load them again once the actual
> evaluation plan is built.
>
> This is a very basic technique of database optimizers: being able to infer
> the execution plan based on _input dataset heuristics_. In Samsara, we find
> that just algebra optimizes not unlike relational algebra, and that
> algebraic computations could be executed not unlike, say, Hive sql-like
> statements.
>
> I guess something similar happens inside Flink itself, too: it may decide
> on certain operations based on data-inferred heuristics.
>
> Like i said, i think iterations and dataset broadcasts are very cool ideas
> for the sake of ML; although truly capitalizing on them in Samsara APIs
> could be a bit of a challenge as it stands.
> It certainly would be a challenge for our platform-agnostic code.
>
> BTW, while we are at it, are there any schemes in Flink that leverage
> something like "butterfly mixing" communication patterns for power law
> algorithms? and hopefully without excessive spilling?
>
> Thank you very much.
> -d
>
>
> On Tue, Mar 29, 2016 at 5:31 PM, Dmitriy Lyubimov <dl...@gmail.com>
> wrote:
>
> > Thanks.
> >
> > Regardless of the rationale, i wanted to confirm if the iteration is
> > lazily evaluated-only thing and it sounds eager evaluation inside (and
> > collection) is not possible, and the algorithms that need it, just will
> > have to work around this. I think this answers my question -- thanks!
> >
> > -d
> >
> >
> > On Tue, Mar 29, 2016 at 2:53 AM, Till Rohrmann <tr...@apache.org>
> > wrote:
> >
> >> Hi,
> >>
> >> Chiwan’s example is perfectly fine and it should also work with general
> EM
> >> algorithms. Moreover, it is the recommended way how to implement
> >> iterations
> >> with Flink. The iterateWithTermination API call generates a lazily
> >> evaluated data flow with an iteration operator. This plan will only be
> >> executed when you call env.execute, collect or count which depends on
> this
> >> data flow. In the example it would be triggered by result.print. You can
> >> also take a look at the KMeans implementation of Flink. It does not use
> a
> >> dynamic convergence criterion but it could easily be added.
> >>
> >> If you really need to trigger the execution of the data flow for each
> >> iteration (e.g. because you have different data flows depending on the
> >> result), then you should persist the intermediate result every n
> >> iteration.
> >> Otherwise you will over and over re-trigger the execution of previous
> >> operators.
> >>
> >> Cheers,
> >> Till
> >> ​
> >>
> >> On Tue, Mar 29, 2016 at 1:26 AM, Dmitriy Lyubimov <dl...@gmail.com>
> >> wrote:
> >>
> >> > Thanks Chiwan.
> >> >
> >> > I think this example still creates a lazy-evaluated plan. And if i
> need
> >> to
> >> > collect statistics to front end (and use it in subsequent iteration
> >> > evaluation) as my example with computing column-wise averages
> suggests?
> >> >
> >> > problem generally is, what if I need to eagerly evaluate the
> statistics
> >> > inside the iteration in order to proceed with further computations
> (and
> >> > even plan construction). typically, that would be result of M-step in
> EM
> >> > algorithm.
> >> >
> >> > On Sun, Mar 27, 2016 at 3:26 AM, Chiwan Park <ch...@apache.org>
> >> > wrote:
> >> >
> >> > > Hi Dmitriy,
> >> > >
> >> > > I think you can implement it with iterative API with custom
> >> convergence
> >> > > criterion. You can express the convergence criterion by two methods.
> >> One
> >> > is
> >> > > using a convergence criterion data set [1][2] and the other is
> >> > registering
> >> > > an aggregator with custom implementation of `ConvergenceCriterion`
> >> > > interface [3].
> >> > >
> >> > > Here is an example using a convergence criterion data set in Scala
> >> API:
> >> > >
> >> > > ```
> >> > > package flink.sample
> >> > >
> >> > > import org.apache.flink.api.scala._
> >> > >
> >> > > import scala.util.Random
> >> > >
> >> > > object SampleApp extends App {
> >> > >   val env = ExecutionEnvironment.getExecutionEnvironment
> >> > >
> >> > >   val data = env.fromElements[Double](1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
> >> > >
> >> > >   val result = data.iterateWithTermination(5000) { prev =>
> >> > >     // calculate sub solution
> >> > >     val rand = Random.nextDouble()
> >> > >     val subSolution = prev.map(_ * rand)
> >> > >
> >> > >     // calculate convergent condition
> >> > >     val convergence = subSolution.reduce(_ + _).map(_ / 10).filter(_
> >> > 8)
> >> > >
> >> > >     (subSolution, convergence)
> >> > >   }
> >> > >
> >> > >   result.print()
> >> > > }
> >> > > ```
> >> > >
> >> > > Regards,
> >> > > Chiwan Park
> >> > >
> >> > > [1]:
> >> > >
> >> >
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/java/org/apache/flink/api/java/operators/IterativeDataSet.html#closeWith%28org.apache.flink.api.java.DataSet,%20org.apache.flink.api.java.DataSet%29
> >> > > [2]: iterateWithTermination method in
> >> > >
> >> >
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/scala/index.html#org.apache.flink.api.scala.DataSet
> >> > > [3]:
> >> > >
> >> >
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/java/org/apache/flink/api/java/operators/IterativeDataSet.html#registerAggregationConvergenceCriterion%28java.lang.String,%20org.apache.flink.api.common.aggregators.Aggregator,%20org.apache.flink.api.common.aggregators.ConvergenceCriterion%29
> >> > >
> >> > > > On Mar 26, 2016, at 2:51 AM, Dmitriy Lyubimov <dl...@gmail.com>
> >> > wrote:
> >> > > >
> >> > > > Thank you, all :)
> >> > > >
> >> > > > yes, that's my question. How do we construct such a loop with a
> >> > concrete
> >> > > > example?
> >> > > >
> >> > > > Let's take something nonsensical yet specific.
> >> > > >
> >> > > > Say, in samsara terms we do something like that :
> >> > > >
> >> > > > var avg = Double.PositiveInfinity
> >> > > > var drmA = ... (construct elsewhere)
> >> > > >
> >> > > >
> >> > > >
> >> > > > do {
> >> > > >   avg = drmA.colMeans.mean // average of col-wise means
> >> > > >   drmA = drmA - avg // elementwise subtract of average
> >> > > >
> >> > > > } while (avg > 1e-10)
> >> > > >
> >> > > > (which probably does not converge in reality).
> >> > > >
> >> > > > How would we implement that with native iterations in flink?
> >> > > >
> >> > > >
> >> > > >
> >> > > > On Wed, Mar 23, 2016 at 2:50 AM, Till Rohrmann <
> >> trohrmann@apache.org>
> >> > > wrote:
> >> > > >
> >> > > >> Hi Dmitriy,
> >> > > >>
> >> > > >> I’m not sure whether I’ve understood your question correctly, so
> >> > please
> >> > > >> correct me if I’m wrong.
> >> > > >>
> >> > > >> So you’re asking whether it is a problem that
> >> > > >>
> >> > > >> stat1 = A.map.reduce
> >> > > >> A = A.update.map(stat1)
> >> > > >>
> >> > > >> are executed on the same input data set A and whether we have to
> >> > cache A
> >> > > >> for that, right? I assume you’re worried that A is calculated
> >> twice.
> >> > > >>
> >> > > >> Since you don’t have a API call which triggers eager execution of
> >> the
> >> > > data
> >> > > >> flow, the map.reduce and map(stat1) call will only construct the
> >> data
> >> > > flow
> >> > > >> of your program. Both operators will depend on the result of A
> >> which
> >> > is
> >> > > >> only once calculated (when execute, collect or count is called)
> and
> >> > then
> >> > > >> sent to the map.reduce and map(stat1) operator.
> >> > > >>
> >> > > >> However, it is not recommended using an explicit loop to do
> >> iterative
> >> > > >> computations with Flink. The problem here is that you will
> >> basically
> >> > > unroll
> >> > > >> the loop and construct a long pipeline with the operations of
> each
> >> > > >> iterations. Once you execute this long pipeline you will face
> >> > > considerable
> >> > > >> memory fragmentation, because every operator will get a
> >> proportional
> >> > > >> fraction of the available memory assigned. Even worse, if you
> >> trigger
> >> > > the
> >> > > >> execution of your data flow to evaluate the convergence
> criterion,
> >> you
> >> > > will
> >> > > >> execute for each iteration the complete pipeline which has been
> >> built
> >> > > up so
> >> > > >> far. Thus, you’ll end up with a quadratic complexity in the
> number
> >> of
> >> > > >> iterations. Therefore, I would highly recommend using Flink’s
> >> built in
> >> > > >> support for native iterations which won’t suffer from this
> problem
> >> or
> >> > to
> >> > > >> materialize at least for every n iterations the intermediate
> >> result.
> >> > At
> >> > > the
> >> > > >> moment this would mean to write the data to some sink and then
> >> reading
> >> > > it
> >> > > >> from there again.
> >> > > >>
> >> > > >> I hope this answers your question. If not, then don’t hesitate to
> >> ask
> >> > me
> >> > > >> again.
> >> > > >>
> >> > > >> Cheers,
> >> > > >> Till
> >> > > >> ​
> >> > > >>
> >> > > >> On Wed, Mar 23, 2016 at 10:19 AM, Theodore Vasiloudis <
> >> > > >> theodoros.vasiloudis@gmail.com> wrote:
> >> > > >>
> >> > > >>> Hello Dmitriy,
> >> > > >>>
> >> > > >>> If I understood correctly what you are basically talking about
> >> > > modifying
> >> > > >> a
> >> > > >>> DataSet as you iterate over it.
> >> > > >>>
> >> > > >>> AFAIK this is currently not possible in Flink, and indeed it's a
> >> real
> >> > > >>> bottleneck for ML algorithms. This is the reason our current
> >> > > >>> SGD implementation does a pass over the whole dataset at each
> >> > > iteration,
> >> > > >>> since we cannot take a sample from the dataset
> >> > > >>> and iterate only over that (so it's not really stochastic).
> >> > > >>>
> >> > > >>> The relevant JIRA is here:
> >> > > >>> https://issues.apache.org/jira/browse/FLINK-2396
> >> > > >>>
> >> > > >>> I would love to start a discussion on how we can proceed to fix
> >> this.
> >> > > >>>
> >> > > >>> Regards,
> >> > > >>> Theodore
> >> > > >>>
> >> > > >>> On Tue, Mar 22, 2016 at 9:56 PM, Dmitriy Lyubimov <
> >> dlieu.7@gmail.com
> >> > >
> >> > > >>> wrote:
> >> > > >>>
> >> > > >>>> Hi,
> >> > > >>>>
> >> > > >>>> probably more of a question for Till:
> >> > > >>>>
> >> > > >>>> Imagine a common ML algorithm flow that runs until convergence.
> >> > > >>>>
> >> > > >>>> typical distributed flow would be something like that (e.g. GMM
> >> EM
> >> > > >> would
> >> > > >>> be
> >> > > >>>> exactly like that):
> >> > > >>>>
> >> > > >>>> A: input
> >> > > >>>>
> >> > > >>>> do {
> >> > > >>>>
> >> > > >>>>   stat1 = A.map.reduce
> >> > > >>>>   A = A.update-map(stat1)
> >> > > >>>>   conv = A.map.reduce
> >> > > >>>> } until conv > convThreshold
> >> > > >>>>
> >> > > >>>> There probably could be 1 map-reduce step originating on A to
> >> > compute
> >> > > >>> both
> >> > > >>>> convergence criteria statistics and udpate statistics in one
> >> step.
> >> > not
> >> > > >>> the
> >> > > >>>> point.
> >> > > >>>>
> >> > > >>>> The point is that update and map.reduce originate on the same
> >> > dataset
> >> > > >>>> intermittently.
> >> > > >>>>
> >> > > >>>> In spark we would normally commit A to a object tree cache so
> >> that
> >> > > data
> >> > > >>> is
> >> > > >>>> available to subsequent map passes without any I/O or
> >> serialization
> >> > > >>>> operations, thus insuring high rate of iterations.
> >> > > >>>>
> >> > > >>>> We observe the same pattern pretty much everywhere. clustering,
> >> > > >>>> probabilistic algorithms, even batch gradient descent of quasi
> >> > newton
> >> > > >>>> algorithms fitting.
> >> > > >>>>
> >> > > >>>> How do we do something like that, for example, in FlinkML?
> >> > > >>>>
> >> > > >>>> Thoughts?
> >> > > >>>>
> >> > > >>>> thanks.
> >> > > >>>>
> >> > > >>>> -Dmitriy
> >> > > >>>>
> >> > > >>>
> >> > > >>
> >> > >
> >> > >
> >> >
> >>
> >
> >
>

Re: a typical ML algorithm flow

Posted by Dmitriy Lyubimov <dl...@gmail.com>.
BTW thank you for educating me on this.

I think it's actually a wonderful capability, along with the capability of
broadcasting distributed sets to map operators, it means (I hope) that
fine-grained, centralized scheduling and centralized broadcasting we find
in Spark analogous algorithms could be all but eliminated.

Just to explain the rationale. It does present a problem for some things in
Samsara though. Some times we do want to load inputs and eagerly evaluate
some of their heuristics in order to help the plan construction itself.
Since we have already loaded the datasets and hopefully are about to
execute them, it would be a waste to load them again once the actual
evaluation plan is built.

This is a very basic technique of database optimizers: being able to infer
the execution plan based on _input dataset heuristics_. In Samsara, we find
that just algebra optimizes not unlike relational algebra, and that
algebraic computations could be executed not unlike, say, Hive sql-like
statements.

I guess something similar happens inside Flink itself, too: it may decide
on certain operations based on data-inferred heuristics.

Like i said, i think iterations and dataset broadcasts are very cool ideas
for the sake of ML; although truly capitalizing on them in Samsara APIs
could be a bit of a challenge as it stands.
It certainly would be a challenge for our platform-agnostic code.

BTW, while we are at it, are there any schemes in Flink that leverage
something like "butterfly mixing" communication patterns for power law
algorithms? and hopefully without excessive spilling?

Thank you very much.
-d


On Tue, Mar 29, 2016 at 5:31 PM, Dmitriy Lyubimov <dl...@gmail.com> wrote:

> Thanks.
>
> Regardless of the rationale, i wanted to confirm if the iteration is
> lazily evaluated-only thing and it sounds eager evaluation inside (and
> collection) is not possible, and the algorithms that need it, just will
> have to work around this. I think this answers my question -- thanks!
>
> -d
>
>
> On Tue, Mar 29, 2016 at 2:53 AM, Till Rohrmann <tr...@apache.org>
> wrote:
>
>> Hi,
>>
>> Chiwan’s example is perfectly fine and it should also work with general EM
>> algorithms. Moreover, it is the recommended way how to implement
>> iterations
>> with Flink. The iterateWithTermination API call generates a lazily
>> evaluated data flow with an iteration operator. This plan will only be
>> executed when you call env.execute, collect or count which depends on this
>> data flow. In the example it would be triggered by result.print. You can
>> also take a look at the KMeans implementation of Flink. It does not use a
>> dynamic convergence criterion but it could easily be added.
>>
>> If you really need to trigger the execution of the data flow for each
>> iteration (e.g. because you have different data flows depending on the
>> result), then you should persist the intermediate result every n
>> iteration.
>> Otherwise you will over and over re-trigger the execution of previous
>> operators.
>>
>> Cheers,
>> Till
>> ​
>>
>> On Tue, Mar 29, 2016 at 1:26 AM, Dmitriy Lyubimov <dl...@gmail.com>
>> wrote:
>>
>> > Thanks Chiwan.
>> >
>> > I think this example still creates a lazy-evaluated plan. And if i need
>> to
>> > collect statistics to front end (and use it in subsequent iteration
>> > evaluation) as my example with computing column-wise averages suggests?
>> >
>> > problem generally is, what if I need to eagerly evaluate the statistics
>> > inside the iteration in order to proceed with further computations (and
>> > even plan construction). typically, that would be result of M-step in EM
>> > algorithm.
>> >
>> > On Sun, Mar 27, 2016 at 3:26 AM, Chiwan Park <ch...@apache.org>
>> > wrote:
>> >
>> > > Hi Dmitriy,
>> > >
>> > > I think you can implement it with iterative API with custom
>> convergence
>> > > criterion. You can express the convergence criterion by two methods.
>> One
>> > is
>> > > using a convergence criterion data set [1][2] and the other is
>> > registering
>> > > an aggregator with custom implementation of `ConvergenceCriterion`
>> > > interface [3].
>> > >
>> > > Here is an example using a convergence criterion data set in Scala
>> API:
>> > >
>> > > ```
>> > > package flink.sample
>> > >
>> > > import org.apache.flink.api.scala._
>> > >
>> > > import scala.util.Random
>> > >
>> > > object SampleApp extends App {
>> > >   val env = ExecutionEnvironment.getExecutionEnvironment
>> > >
>> > >   val data = env.fromElements[Double](1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
>> > >
>> > >   val result = data.iterateWithTermination(5000) { prev =>
>> > >     // calculate sub solution
>> > >     val rand = Random.nextDouble()
>> > >     val subSolution = prev.map(_ * rand)
>> > >
>> > >     // calculate convergent condition
>> > >     val convergence = subSolution.reduce(_ + _).map(_ / 10).filter(_
>> > 8)
>> > >
>> > >     (subSolution, convergence)
>> > >   }
>> > >
>> > >   result.print()
>> > > }
>> > > ```
>> > >
>> > > Regards,
>> > > Chiwan Park
>> > >
>> > > [1]:
>> > >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/java/org/apache/flink/api/java/operators/IterativeDataSet.html#closeWith%28org.apache.flink.api.java.DataSet,%20org.apache.flink.api.java.DataSet%29
>> > > [2]: iterateWithTermination method in
>> > >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/scala/index.html#org.apache.flink.api.scala.DataSet
>> > > [3]:
>> > >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/java/org/apache/flink/api/java/operators/IterativeDataSet.html#registerAggregationConvergenceCriterion%28java.lang.String,%20org.apache.flink.api.common.aggregators.Aggregator,%20org.apache.flink.api.common.aggregators.ConvergenceCriterion%29
>> > >
>> > > > On Mar 26, 2016, at 2:51 AM, Dmitriy Lyubimov <dl...@gmail.com>
>> > wrote:
>> > > >
>> > > > Thank you, all :)
>> > > >
>> > > > yes, that's my question. How do we construct such a loop with a
>> > concrete
>> > > > example?
>> > > >
>> > > > Let's take something nonsensical yet specific.
>> > > >
>> > > > Say, in samsara terms we do something like that :
>> > > >
>> > > > var avg = Double.PositiveInfinity
>> > > > var drmA = ... (construct elsewhere)
>> > > >
>> > > >
>> > > >
>> > > > do {
>> > > >   avg = drmA.colMeans.mean // average of col-wise means
>> > > >   drmA = drmA - avg // elementwise subtract of average
>> > > >
>> > > > } while (avg > 1e-10)
>> > > >
>> > > > (which probably does not converge in reality).
>> > > >
>> > > > How would we implement that with native iterations in flink?
>> > > >
>> > > >
>> > > >
>> > > > On Wed, Mar 23, 2016 at 2:50 AM, Till Rohrmann <
>> trohrmann@apache.org>
>> > > wrote:
>> > > >
>> > > >> Hi Dmitriy,
>> > > >>
>> > > >> I’m not sure whether I’ve understood your question correctly, so
>> > please
>> > > >> correct me if I’m wrong.
>> > > >>
>> > > >> So you’re asking whether it is a problem that
>> > > >>
>> > > >> stat1 = A.map.reduce
>> > > >> A = A.update.map(stat1)
>> > > >>
>> > > >> are executed on the same input data set A and whether we have to
>> > cache A
>> > > >> for that, right? I assume you’re worried that A is calculated
>> twice.
>> > > >>
>> > > >> Since you don’t have a API call which triggers eager execution of
>> the
>> > > data
>> > > >> flow, the map.reduce and map(stat1) call will only construct the
>> data
>> > > flow
>> > > >> of your program. Both operators will depend on the result of A
>> which
>> > is
>> > > >> only once calculated (when execute, collect or count is called) and
>> > then
>> > > >> sent to the map.reduce and map(stat1) operator.
>> > > >>
>> > > >> However, it is not recommended using an explicit loop to do
>> iterative
>> > > >> computations with Flink. The problem here is that you will
>> basically
>> > > unroll
>> > > >> the loop and construct a long pipeline with the operations of each
>> > > >> iterations. Once you execute this long pipeline you will face
>> > > considerable
>> > > >> memory fragmentation, because every operator will get a
>> proportional
>> > > >> fraction of the available memory assigned. Even worse, if you
>> trigger
>> > > the
>> > > >> execution of your data flow to evaluate the convergence criterion,
>> you
>> > > will
>> > > >> execute for each iteration the complete pipeline which has been
>> built
>> > > up so
>> > > >> far. Thus, you’ll end up with a quadratic complexity in the number
>> of
>> > > >> iterations. Therefore, I would highly recommend using Flink’s
>> built in
>> > > >> support for native iterations which won’t suffer from this problem
>> or
>> > to
>> > > >> materialize at least for every n iterations the intermediate
>> result.
>> > At
>> > > the
>> > > >> moment this would mean to write the data to some sink and then
>> reading
>> > > it
>> > > >> from there again.
>> > > >>
>> > > >> I hope this answers your question. If not, then don’t hesitate to
>> ask
>> > me
>> > > >> again.
>> > > >>
>> > > >> Cheers,
>> > > >> Till
>> > > >> ​
>> > > >>
>> > > >> On Wed, Mar 23, 2016 at 10:19 AM, Theodore Vasiloudis <
>> > > >> theodoros.vasiloudis@gmail.com> wrote:
>> > > >>
>> > > >>> Hello Dmitriy,
>> > > >>>
>> > > >>> If I understood correctly what you are basically talking about
>> > > modifying
>> > > >> a
>> > > >>> DataSet as you iterate over it.
>> > > >>>
>> > > >>> AFAIK this is currently not possible in Flink, and indeed it's a
>> real
>> > > >>> bottleneck for ML algorithms. This is the reason our current
>> > > >>> SGD implementation does a pass over the whole dataset at each
>> > > iteration,
>> > > >>> since we cannot take a sample from the dataset
>> > > >>> and iterate only over that (so it's not really stochastic).
>> > > >>>
>> > > >>> The relevant JIRA is here:
>> > > >>> https://issues.apache.org/jira/browse/FLINK-2396
>> > > >>>
>> > > >>> I would love to start a discussion on how we can proceed to fix
>> this.
>> > > >>>
>> > > >>> Regards,
>> > > >>> Theodore
>> > > >>>
>> > > >>> On Tue, Mar 22, 2016 at 9:56 PM, Dmitriy Lyubimov <
>> dlieu.7@gmail.com
>> > >
>> > > >>> wrote:
>> > > >>>
>> > > >>>> Hi,
>> > > >>>>
>> > > >>>> probably more of a question for Till:
>> > > >>>>
>> > > >>>> Imagine a common ML algorithm flow that runs until convergence.
>> > > >>>>
>> > > >>>> typical distributed flow would be something like that (e.g. GMM
>> EM
>> > > >> would
>> > > >>> be
>> > > >>>> exactly like that):
>> > > >>>>
>> > > >>>> A: input
>> > > >>>>
>> > > >>>> do {
>> > > >>>>
>> > > >>>>   stat1 = A.map.reduce
>> > > >>>>   A = A.update-map(stat1)
>> > > >>>>   conv = A.map.reduce
>> > > >>>> } until conv > convThreshold
>> > > >>>>
>> > > >>>> There probably could be 1 map-reduce step originating on A to
>> > compute
>> > > >>> both
>> > > >>>> convergence criteria statistics and udpate statistics in one
>> step.
>> > not
>> > > >>> the
>> > > >>>> point.
>> > > >>>>
>> > > >>>> The point is that update and map.reduce originate on the same
>> > dataset
>> > > >>>> intermittently.
>> > > >>>>
>> > > >>>> In spark we would normally commit A to a object tree cache so
>> that
>> > > data
>> > > >>> is
>> > > >>>> available to subsequent map passes without any I/O or
>> serialization
>> > > >>>> operations, thus insuring high rate of iterations.
>> > > >>>>
>> > > >>>> We observe the same pattern pretty much everywhere. clustering,
>> > > >>>> probabilistic algorithms, even batch gradient descent of quasi
>> > newton
>> > > >>>> algorithms fitting.
>> > > >>>>
>> > > >>>> How do we do something like that, for example, in FlinkML?
>> > > >>>>
>> > > >>>> Thoughts?
>> > > >>>>
>> > > >>>> thanks.
>> > > >>>>
>> > > >>>> -Dmitriy
>> > > >>>>
>> > > >>>
>> > > >>
>> > >
>> > >
>> >
>>
>
>

Re: a typical ML algorithm flow

Posted by Dmitriy Lyubimov <dl...@gmail.com>.
Thanks.

Regardless of the rationale, i wanted to confirm if the iteration is lazily
evaluated-only thing and it sounds eager evaluation inside (and collection)
is not possible, and the algorithms that need it, just will have to work
around this. I think this answers my question -- thanks!

-d

On Tue, Mar 29, 2016 at 2:53 AM, Till Rohrmann <tr...@apache.org> wrote:

> Hi,
>
> Chiwan’s example is perfectly fine and it should also work with general EM
> algorithms. Moreover, it is the recommended way how to implement iterations
> with Flink. The iterateWithTermination API call generates a lazily
> evaluated data flow with an iteration operator. This plan will only be
> executed when you call env.execute, collect or count which depends on this
> data flow. In the example it would be triggered by result.print. You can
> also take a look at the KMeans implementation of Flink. It does not use a
> dynamic convergence criterion but it could easily be added.
>
> If you really need to trigger the execution of the data flow for each
> iteration (e.g. because you have different data flows depending on the
> result), then you should persist the intermediate result every n iteration.
> Otherwise you will over and over re-trigger the execution of previous
> operators.
>
> Cheers,
> Till
> ​
>
> On Tue, Mar 29, 2016 at 1:26 AM, Dmitriy Lyubimov <dl...@gmail.com>
> wrote:
>
> > Thanks Chiwan.
> >
> > I think this example still creates a lazy-evaluated plan. And if i need
> to
> > collect statistics to front end (and use it in subsequent iteration
> > evaluation) as my example with computing column-wise averages suggests?
> >
> > problem generally is, what if I need to eagerly evaluate the statistics
> > inside the iteration in order to proceed with further computations (and
> > even plan construction). typically, that would be result of M-step in EM
> > algorithm.
> >
> > On Sun, Mar 27, 2016 at 3:26 AM, Chiwan Park <ch...@apache.org>
> > wrote:
> >
> > > Hi Dmitriy,
> > >
> > > I think you can implement it with iterative API with custom convergence
> > > criterion. You can express the convergence criterion by two methods.
> One
> > is
> > > using a convergence criterion data set [1][2] and the other is
> > registering
> > > an aggregator with custom implementation of `ConvergenceCriterion`
> > > interface [3].
> > >
> > > Here is an example using a convergence criterion data set in Scala API:
> > >
> > > ```
> > > package flink.sample
> > >
> > > import org.apache.flink.api.scala._
> > >
> > > import scala.util.Random
> > >
> > > object SampleApp extends App {
> > >   val env = ExecutionEnvironment.getExecutionEnvironment
> > >
> > >   val data = env.fromElements[Double](1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
> > >
> > >   val result = data.iterateWithTermination(5000) { prev =>
> > >     // calculate sub solution
> > >     val rand = Random.nextDouble()
> > >     val subSolution = prev.map(_ * rand)
> > >
> > >     // calculate convergent condition
> > >     val convergence = subSolution.reduce(_ + _).map(_ / 10).filter(_ >
> 8)
> > >
> > >     (subSolution, convergence)
> > >   }
> > >
> > >   result.print()
> > > }
> > > ```
> > >
> > > Regards,
> > > Chiwan Park
> > >
> > > [1]:
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/java/org/apache/flink/api/java/operators/IterativeDataSet.html#closeWith%28org.apache.flink.api.java.DataSet,%20org.apache.flink.api.java.DataSet%29
> > > [2]: iterateWithTermination method in
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/scala/index.html#org.apache.flink.api.scala.DataSet
> > > [3]:
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/java/org/apache/flink/api/java/operators/IterativeDataSet.html#registerAggregationConvergenceCriterion%28java.lang.String,%20org.apache.flink.api.common.aggregators.Aggregator,%20org.apache.flink.api.common.aggregators.ConvergenceCriterion%29
> > >
> > > > On Mar 26, 2016, at 2:51 AM, Dmitriy Lyubimov <dl...@gmail.com>
> > wrote:
> > > >
> > > > Thank you, all :)
> > > >
> > > > yes, that's my question. How do we construct such a loop with a
> > concrete
> > > > example?
> > > >
> > > > Let's take something nonsensical yet specific.
> > > >
> > > > Say, in samsara terms we do something like that :
> > > >
> > > > var avg = Double.PositiveInfinity
> > > > var drmA = ... (construct elsewhere)
> > > >
> > > >
> > > >
> > > > do {
> > > >   avg = drmA.colMeans.mean // average of col-wise means
> > > >   drmA = drmA - avg // elementwise subtract of average
> > > >
> > > > } while (avg > 1e-10)
> > > >
> > > > (which probably does not converge in reality).
> > > >
> > > > How would we implement that with native iterations in flink?
> > > >
> > > >
> > > >
> > > > On Wed, Mar 23, 2016 at 2:50 AM, Till Rohrmann <trohrmann@apache.org
> >
> > > wrote:
> > > >
> > > >> Hi Dmitriy,
> > > >>
> > > >> I’m not sure whether I’ve understood your question correctly, so
> > please
> > > >> correct me if I’m wrong.
> > > >>
> > > >> So you’re asking whether it is a problem that
> > > >>
> > > >> stat1 = A.map.reduce
> > > >> A = A.update.map(stat1)
> > > >>
> > > >> are executed on the same input data set A and whether we have to
> > cache A
> > > >> for that, right? I assume you’re worried that A is calculated twice.
> > > >>
> > > >> Since you don’t have a API call which triggers eager execution of
> the
> > > data
> > > >> flow, the map.reduce and map(stat1) call will only construct the
> data
> > > flow
> > > >> of your program. Both operators will depend on the result of A which
> > is
> > > >> only once calculated (when execute, collect or count is called) and
> > then
> > > >> sent to the map.reduce and map(stat1) operator.
> > > >>
> > > >> However, it is not recommended using an explicit loop to do
> iterative
> > > >> computations with Flink. The problem here is that you will basically
> > > unroll
> > > >> the loop and construct a long pipeline with the operations of each
> > > >> iterations. Once you execute this long pipeline you will face
> > > considerable
> > > >> memory fragmentation, because every operator will get a proportional
> > > >> fraction of the available memory assigned. Even worse, if you
> trigger
> > > the
> > > >> execution of your data flow to evaluate the convergence criterion,
> you
> > > will
> > > >> execute for each iteration the complete pipeline which has been
> built
> > > up so
> > > >> far. Thus, you’ll end up with a quadratic complexity in the number
> of
> > > >> iterations. Therefore, I would highly recommend using Flink’s built
> in
> > > >> support for native iterations which won’t suffer from this problem
> or
> > to
> > > >> materialize at least for every n iterations the intermediate result.
> > At
> > > the
> > > >> moment this would mean to write the data to some sink and then
> reading
> > > it
> > > >> from there again.
> > > >>
> > > >> I hope this answers your question. If not, then don’t hesitate to
> ask
> > me
> > > >> again.
> > > >>
> > > >> Cheers,
> > > >> Till
> > > >> ​
> > > >>
> > > >> On Wed, Mar 23, 2016 at 10:19 AM, Theodore Vasiloudis <
> > > >> theodoros.vasiloudis@gmail.com> wrote:
> > > >>
> > > >>> Hello Dmitriy,
> > > >>>
> > > >>> If I understood correctly what you are basically talking about
> > > modifying
> > > >> a
> > > >>> DataSet as you iterate over it.
> > > >>>
> > > >>> AFAIK this is currently not possible in Flink, and indeed it's a
> real
> > > >>> bottleneck for ML algorithms. This is the reason our current
> > > >>> SGD implementation does a pass over the whole dataset at each
> > > iteration,
> > > >>> since we cannot take a sample from the dataset
> > > >>> and iterate only over that (so it's not really stochastic).
> > > >>>
> > > >>> The relevant JIRA is here:
> > > >>> https://issues.apache.org/jira/browse/FLINK-2396
> > > >>>
> > > >>> I would love to start a discussion on how we can proceed to fix
> this.
> > > >>>
> > > >>> Regards,
> > > >>> Theodore
> > > >>>
> > > >>> On Tue, Mar 22, 2016 at 9:56 PM, Dmitriy Lyubimov <
> dlieu.7@gmail.com
> > >
> > > >>> wrote:
> > > >>>
> > > >>>> Hi,
> > > >>>>
> > > >>>> probably more of a question for Till:
> > > >>>>
> > > >>>> Imagine a common ML algorithm flow that runs until convergence.
> > > >>>>
> > > >>>> typical distributed flow would be something like that (e.g. GMM EM
> > > >> would
> > > >>> be
> > > >>>> exactly like that):
> > > >>>>
> > > >>>> A: input
> > > >>>>
> > > >>>> do {
> > > >>>>
> > > >>>>   stat1 = A.map.reduce
> > > >>>>   A = A.update-map(stat1)
> > > >>>>   conv = A.map.reduce
> > > >>>> } until conv > convThreshold
> > > >>>>
> > > >>>> There probably could be 1 map-reduce step originating on A to
> > compute
> > > >>> both
> > > >>>> convergence criteria statistics and udpate statistics in one step.
> > not
> > > >>> the
> > > >>>> point.
> > > >>>>
> > > >>>> The point is that update and map.reduce originate on the same
> > dataset
> > > >>>> intermittently.
> > > >>>>
> > > >>>> In spark we would normally commit A to a object tree cache so that
> > > data
> > > >>> is
> > > >>>> available to subsequent map passes without any I/O or
> serialization
> > > >>>> operations, thus insuring high rate of iterations.
> > > >>>>
> > > >>>> We observe the same pattern pretty much everywhere. clustering,
> > > >>>> probabilistic algorithms, even batch gradient descent of quasi
> > newton
> > > >>>> algorithms fitting.
> > > >>>>
> > > >>>> How do we do something like that, for example, in FlinkML?
> > > >>>>
> > > >>>> Thoughts?
> > > >>>>
> > > >>>> thanks.
> > > >>>>
> > > >>>> -Dmitriy
> > > >>>>
> > > >>>
> > > >>
> > >
> > >
> >
>

Re: a typical ML algorithm flow

Posted by Theodore Vasiloudis <th...@gmail.com>.
@Shannon

What you are talking about is available for the DataSet API through the
iterateWithTermination function. See the API docs
<https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#iteration-operators>
and Iterations page
<https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/iterations.html>
.

On Tue, Mar 29, 2016 at 3:14 PM, Shannon Quinn <sq...@gatech.edu> wrote:

> Apologies for hijacking, but this thread hits right at my last message to
> this list (looking to implement native iterations in the PyFlink API).
>
> I'm particularly interested in custom convergence criteria, often centered
> around measuring some sort of squared loss and checking if it falls below a
> threshold. Is this what you mean by a "dynamic convergence criterion"?
> Certainly having a max-iterations cut-off as a "just in case" measure is a
> good thing, but I'm curious if there's a native way of using a
> threshold-based criterion that doesn't involve simply iterating 10 or so
> times, checking the criterion, and iterating some more.
>
> Shannon
>
>
> On 3/29/16 5:53 AM, Till Rohrmann wrote:
>
>> Hi,
>>
>> Chiwan’s example is perfectly fine and it should also work with general EM
>> algorithms. Moreover, it is the recommended way how to implement
>> iterations
>> with Flink. The iterateWithTermination API call generates a lazily
>> evaluated data flow with an iteration operator. This plan will only be
>> executed when you call env.execute, collect or count which depends on this
>> data flow. In the example it would be triggered by result.print. You can
>> also take a look at the KMeans implementation of Flink. It does not use a
>> dynamic convergence criterion but it could easily be added.
>>
>> If you really need to trigger the execution of the data flow for each
>> iteration (e.g. because you have different data flows depending on the
>> result), then you should persist the intermediate result every n
>> iteration.
>> Otherwise you will over and over re-trigger the execution of previous
>> operators.
>>
>> Cheers,
>> Till
>> ​
>>
>> On Tue, Mar 29, 2016 at 1:26 AM, Dmitriy Lyubimov <dl...@gmail.com>
>> wrote:
>>
>> Thanks Chiwan.
>>>
>>> I think this example still creates a lazy-evaluated plan. And if i need
>>> to
>>> collect statistics to front end (and use it in subsequent iteration
>>> evaluation) as my example with computing column-wise averages suggests?
>>>
>>> problem generally is, what if I need to eagerly evaluate the statistics
>>> inside the iteration in order to proceed with further computations (and
>>> even plan construction). typically, that would be result of M-step in EM
>>> algorithm.
>>>
>>> On Sun, Mar 27, 2016 at 3:26 AM, Chiwan Park <ch...@apache.org>
>>> wrote:
>>>
>>> Hi Dmitriy,
>>>>
>>>> I think you can implement it with iterative API with custom convergence
>>>> criterion. You can express the convergence criterion by two methods. One
>>>>
>>> is
>>>
>>>> using a convergence criterion data set [1][2] and the other is
>>>>
>>> registering
>>>
>>>> an aggregator with custom implementation of `ConvergenceCriterion`
>>>> interface [3].
>>>>
>>>> Here is an example using a convergence criterion data set in Scala API:
>>>>
>>>> ```
>>>> package flink.sample
>>>>
>>>> import org.apache.flink.api.scala._
>>>>
>>>> import scala.util.Random
>>>>
>>>> object SampleApp extends App {
>>>>    val env = ExecutionEnvironment.getExecutionEnvironment
>>>>
>>>>    val data = env.fromElements[Double](1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
>>>>
>>>>    val result = data.iterateWithTermination(5000) { prev =>
>>>>      // calculate sub solution
>>>>      val rand = Random.nextDouble()
>>>>      val subSolution = prev.map(_ * rand)
>>>>
>>>>      // calculate convergent condition
>>>>      val convergence = subSolution.reduce(_ + _).map(_ / 10).filter(_ >
>>>> 8)
>>>>
>>>>      (subSolution, convergence)
>>>>    }
>>>>
>>>>    result.print()
>>>> }
>>>> ```
>>>>
>>>> Regards,
>>>> Chiwan Park
>>>>
>>>> [1]:
>>>>
>>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/java/org/apache/flink/api/java/operators/IterativeDataSet.html#closeWith%28org.apache.flink.api.java.DataSet,%20org.apache.flink.api.java.DataSet%29
>>>
>>>> [2]: iterateWithTermination method in
>>>>
>>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/scala/index.html#org.apache.flink.api.scala.DataSet
>>>
>>>> [3]:
>>>>
>>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/java/org/apache/flink/api/java/operators/IterativeDataSet.html#registerAggregationConvergenceCriterion%28java.lang.String,%20org.apache.flink.api.common.aggregators.Aggregator,%20org.apache.flink.api.common.aggregators.ConvergenceCriterion%29
>>>
>>>> On Mar 26, 2016, at 2:51 AM, Dmitriy Lyubimov <dl...@gmail.com>
>>>>>
>>>> wrote:
>>>
>>>> Thank you, all :)
>>>>>
>>>>> yes, that's my question. How do we construct such a loop with a
>>>>>
>>>> concrete
>>>
>>>> example?
>>>>>
>>>>> Let's take something nonsensical yet specific.
>>>>>
>>>>> Say, in samsara terms we do something like that :
>>>>>
>>>>> var avg = Double.PositiveInfinity
>>>>> var drmA = ... (construct elsewhere)
>>>>>
>>>>>
>>>>>
>>>>> do {
>>>>>    avg = drmA.colMeans.mean // average of col-wise means
>>>>>    drmA = drmA - avg // elementwise subtract of average
>>>>>
>>>>> } while (avg > 1e-10)
>>>>>
>>>>> (which probably does not converge in reality).
>>>>>
>>>>> How would we implement that with native iterations in flink?
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Mar 23, 2016 at 2:50 AM, Till Rohrmann <tr...@apache.org>
>>>>>
>>>> wrote:
>>>>
>>>>> Hi Dmitriy,
>>>>>>
>>>>>> I’m not sure whether I’ve understood your question correctly, so
>>>>>>
>>>>> please
>>>
>>>> correct me if I’m wrong.
>>>>>>
>>>>>> So you’re asking whether it is a problem that
>>>>>>
>>>>>> stat1 = A.map.reduce
>>>>>> A = A.update.map(stat1)
>>>>>>
>>>>>> are executed on the same input data set A and whether we have to
>>>>>>
>>>>> cache A
>>>
>>>> for that, right? I assume you’re worried that A is calculated twice.
>>>>>>
>>>>>> Since you don’t have a API call which triggers eager execution of the
>>>>>>
>>>>> data
>>>>
>>>>> flow, the map.reduce and map(stat1) call will only construct the data
>>>>>>
>>>>> flow
>>>>
>>>>> of your program. Both operators will depend on the result of A which
>>>>>>
>>>>> is
>>>
>>>> only once calculated (when execute, collect or count is called) and
>>>>>>
>>>>> then
>>>
>>>> sent to the map.reduce and map(stat1) operator.
>>>>>>
>>>>>> However, it is not recommended using an explicit loop to do iterative
>>>>>> computations with Flink. The problem here is that you will basically
>>>>>>
>>>>> unroll
>>>>
>>>>> the loop and construct a long pipeline with the operations of each
>>>>>> iterations. Once you execute this long pipeline you will face
>>>>>>
>>>>> considerable
>>>>
>>>>> memory fragmentation, because every operator will get a proportional
>>>>>> fraction of the available memory assigned. Even worse, if you trigger
>>>>>>
>>>>> the
>>>>
>>>>> execution of your data flow to evaluate the convergence criterion, you
>>>>>>
>>>>> will
>>>>
>>>>> execute for each iteration the complete pipeline which has been built
>>>>>>
>>>>> up so
>>>>
>>>>> far. Thus, you’ll end up with a quadratic complexity in the number of
>>>>>> iterations. Therefore, I would highly recommend using Flink’s built in
>>>>>> support for native iterations which won’t suffer from this problem or
>>>>>>
>>>>> to
>>>
>>>> materialize at least for every n iterations the intermediate result.
>>>>>>
>>>>> At
>>>
>>>> the
>>>>
>>>>> moment this would mean to write the data to some sink and then reading
>>>>>>
>>>>> it
>>>>
>>>>> from there again.
>>>>>>
>>>>>> I hope this answers your question. If not, then don’t hesitate to ask
>>>>>>
>>>>> me
>>>
>>>> again.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>> ​
>>>>>>
>>>>>> On Wed, Mar 23, 2016 at 10:19 AM, Theodore Vasiloudis <
>>>>>> theodoros.vasiloudis@gmail.com> wrote:
>>>>>>
>>>>>> Hello Dmitriy,
>>>>>>>
>>>>>>> If I understood correctly what you are basically talking about
>>>>>>>
>>>>>> modifying
>>>>
>>>>> a
>>>>>>
>>>>>>> DataSet as you iterate over it.
>>>>>>>
>>>>>>> AFAIK this is currently not possible in Flink, and indeed it's a real
>>>>>>> bottleneck for ML algorithms. This is the reason our current
>>>>>>> SGD implementation does a pass over the whole dataset at each
>>>>>>>
>>>>>> iteration,
>>>>
>>>>> since we cannot take a sample from the dataset
>>>>>>> and iterate only over that (so it's not really stochastic).
>>>>>>>
>>>>>>> The relevant JIRA is here:
>>>>>>> https://issues.apache.org/jira/browse/FLINK-2396
>>>>>>>
>>>>>>> I would love to start a discussion on how we can proceed to fix this.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Theodore
>>>>>>>
>>>>>>> On Tue, Mar 22, 2016 at 9:56 PM, Dmitriy Lyubimov <dlieu.7@gmail.com
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>>
>>>>>>>> probably more of a question for Till:
>>>>>>>>
>>>>>>>> Imagine a common ML algorithm flow that runs until convergence.
>>>>>>>>
>>>>>>>> typical distributed flow would be something like that (e.g. GMM EM
>>>>>>>>
>>>>>>> would
>>>>>>
>>>>>>> be
>>>>>>>
>>>>>>>> exactly like that):
>>>>>>>>
>>>>>>>> A: input
>>>>>>>>
>>>>>>>> do {
>>>>>>>>
>>>>>>>>    stat1 = A.map.reduce
>>>>>>>>    A = A.update-map(stat1)
>>>>>>>>    conv = A.map.reduce
>>>>>>>> } until conv > convThreshold
>>>>>>>>
>>>>>>>> There probably could be 1 map-reduce step originating on A to
>>>>>>>>
>>>>>>> compute
>>>
>>>> both
>>>>>>>
>>>>>>>> convergence criteria statistics and udpate statistics in one step.
>>>>>>>>
>>>>>>> not
>>>
>>>> the
>>>>>>>
>>>>>>>> point.
>>>>>>>>
>>>>>>>> The point is that update and map.reduce originate on the same
>>>>>>>>
>>>>>>> dataset
>>>
>>>> intermittently.
>>>>>>>>
>>>>>>>> In spark we would normally commit A to a object tree cache so that
>>>>>>>>
>>>>>>> data
>>>>
>>>>> is
>>>>>>>
>>>>>>>> available to subsequent map passes without any I/O or serialization
>>>>>>>> operations, thus insuring high rate of iterations.
>>>>>>>>
>>>>>>>> We observe the same pattern pretty much everywhere. clustering,
>>>>>>>> probabilistic algorithms, even batch gradient descent of quasi
>>>>>>>>
>>>>>>> newton
>>>
>>>> algorithms fitting.
>>>>>>>>
>>>>>>>> How do we do something like that, for example, in FlinkML?
>>>>>>>>
>>>>>>>> Thoughts?
>>>>>>>>
>>>>>>>> thanks.
>>>>>>>>
>>>>>>>> -Dmitriy
>>>>>>>>
>>>>>>>>
>>>>
>

Re: a typical ML algorithm flow

Posted by Shannon Quinn <sq...@gatech.edu>.
Apologies for hijacking, but this thread hits right at my last message 
to this list (looking to implement native iterations in the PyFlink API).

I'm particularly interested in custom convergence criteria, often 
centered around measuring some sort of squared loss and checking if it 
falls below a threshold. Is this what you mean by a "dynamic convergence 
criterion"? Certainly having a max-iterations cut-off as a "just in 
case" measure is a good thing, but I'm curious if there's a native way 
of using a threshold-based criterion that doesn't involve simply 
iterating 10 or so times, checking the criterion, and iterating some more.

Shannon

On 3/29/16 5:53 AM, Till Rohrmann wrote:
> Hi,
>
> Chiwan’s example is perfectly fine and it should also work with general EM
> algorithms. Moreover, it is the recommended way how to implement iterations
> with Flink. The iterateWithTermination API call generates a lazily
> evaluated data flow with an iteration operator. This plan will only be
> executed when you call env.execute, collect or count which depends on this
> data flow. In the example it would be triggered by result.print. You can
> also take a look at the KMeans implementation of Flink. It does not use a
> dynamic convergence criterion but it could easily be added.
>
> If you really need to trigger the execution of the data flow for each
> iteration (e.g. because you have different data flows depending on the
> result), then you should persist the intermediate result every n iteration.
> Otherwise you will over and over re-trigger the execution of previous
> operators.
>
> Cheers,
> Till
> ​
>
> On Tue, Mar 29, 2016 at 1:26 AM, Dmitriy Lyubimov <dl...@gmail.com> wrote:
>
>> Thanks Chiwan.
>>
>> I think this example still creates a lazy-evaluated plan. And if i need to
>> collect statistics to front end (and use it in subsequent iteration
>> evaluation) as my example with computing column-wise averages suggests?
>>
>> problem generally is, what if I need to eagerly evaluate the statistics
>> inside the iteration in order to proceed with further computations (and
>> even plan construction). typically, that would be result of M-step in EM
>> algorithm.
>>
>> On Sun, Mar 27, 2016 at 3:26 AM, Chiwan Park <ch...@apache.org>
>> wrote:
>>
>>> Hi Dmitriy,
>>>
>>> I think you can implement it with iterative API with custom convergence
>>> criterion. You can express the convergence criterion by two methods. One
>> is
>>> using a convergence criterion data set [1][2] and the other is
>> registering
>>> an aggregator with custom implementation of `ConvergenceCriterion`
>>> interface [3].
>>>
>>> Here is an example using a convergence criterion data set in Scala API:
>>>
>>> ```
>>> package flink.sample
>>>
>>> import org.apache.flink.api.scala._
>>>
>>> import scala.util.Random
>>>
>>> object SampleApp extends App {
>>>    val env = ExecutionEnvironment.getExecutionEnvironment
>>>
>>>    val data = env.fromElements[Double](1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
>>>
>>>    val result = data.iterateWithTermination(5000) { prev =>
>>>      // calculate sub solution
>>>      val rand = Random.nextDouble()
>>>      val subSolution = prev.map(_ * rand)
>>>
>>>      // calculate convergent condition
>>>      val convergence = subSolution.reduce(_ + _).map(_ / 10).filter(_ > 8)
>>>
>>>      (subSolution, convergence)
>>>    }
>>>
>>>    result.print()
>>> }
>>> ```
>>>
>>> Regards,
>>> Chiwan Park
>>>
>>> [1]:
>>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/java/org/apache/flink/api/java/operators/IterativeDataSet.html#closeWith%28org.apache.flink.api.java.DataSet,%20org.apache.flink.api.java.DataSet%29
>>> [2]: iterateWithTermination method in
>>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/scala/index.html#org.apache.flink.api.scala.DataSet
>>> [3]:
>>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/java/org/apache/flink/api/java/operators/IterativeDataSet.html#registerAggregationConvergenceCriterion%28java.lang.String,%20org.apache.flink.api.common.aggregators.Aggregator,%20org.apache.flink.api.common.aggregators.ConvergenceCriterion%29
>>>> On Mar 26, 2016, at 2:51 AM, Dmitriy Lyubimov <dl...@gmail.com>
>> wrote:
>>>> Thank you, all :)
>>>>
>>>> yes, that's my question. How do we construct such a loop with a
>> concrete
>>>> example?
>>>>
>>>> Let's take something nonsensical yet specific.
>>>>
>>>> Say, in samsara terms we do something like that :
>>>>
>>>> var avg = Double.PositiveInfinity
>>>> var drmA = ... (construct elsewhere)
>>>>
>>>>
>>>>
>>>> do {
>>>>    avg = drmA.colMeans.mean // average of col-wise means
>>>>    drmA = drmA - avg // elementwise subtract of average
>>>>
>>>> } while (avg > 1e-10)
>>>>
>>>> (which probably does not converge in reality).
>>>>
>>>> How would we implement that with native iterations in flink?
>>>>
>>>>
>>>>
>>>> On Wed, Mar 23, 2016 at 2:50 AM, Till Rohrmann <tr...@apache.org>
>>> wrote:
>>>>> Hi Dmitriy,
>>>>>
>>>>> I’m not sure whether I’ve understood your question correctly, so
>> please
>>>>> correct me if I’m wrong.
>>>>>
>>>>> So you’re asking whether it is a problem that
>>>>>
>>>>> stat1 = A.map.reduce
>>>>> A = A.update.map(stat1)
>>>>>
>>>>> are executed on the same input data set A and whether we have to
>> cache A
>>>>> for that, right? I assume you’re worried that A is calculated twice.
>>>>>
>>>>> Since you don’t have a API call which triggers eager execution of the
>>> data
>>>>> flow, the map.reduce and map(stat1) call will only construct the data
>>> flow
>>>>> of your program. Both operators will depend on the result of A which
>> is
>>>>> only once calculated (when execute, collect or count is called) and
>> then
>>>>> sent to the map.reduce and map(stat1) operator.
>>>>>
>>>>> However, it is not recommended using an explicit loop to do iterative
>>>>> computations with Flink. The problem here is that you will basically
>>> unroll
>>>>> the loop and construct a long pipeline with the operations of each
>>>>> iterations. Once you execute this long pipeline you will face
>>> considerable
>>>>> memory fragmentation, because every operator will get a proportional
>>>>> fraction of the available memory assigned. Even worse, if you trigger
>>> the
>>>>> execution of your data flow to evaluate the convergence criterion, you
>>> will
>>>>> execute for each iteration the complete pipeline which has been built
>>> up so
>>>>> far. Thus, you’ll end up with a quadratic complexity in the number of
>>>>> iterations. Therefore, I would highly recommend using Flink’s built in
>>>>> support for native iterations which won’t suffer from this problem or
>> to
>>>>> materialize at least for every n iterations the intermediate result.
>> At
>>> the
>>>>> moment this would mean to write the data to some sink and then reading
>>> it
>>>>> from there again.
>>>>>
>>>>> I hope this answers your question. If not, then don’t hesitate to ask
>> me
>>>>> again.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>> ​
>>>>>
>>>>> On Wed, Mar 23, 2016 at 10:19 AM, Theodore Vasiloudis <
>>>>> theodoros.vasiloudis@gmail.com> wrote:
>>>>>
>>>>>> Hello Dmitriy,
>>>>>>
>>>>>> If I understood correctly what you are basically talking about
>>> modifying
>>>>> a
>>>>>> DataSet as you iterate over it.
>>>>>>
>>>>>> AFAIK this is currently not possible in Flink, and indeed it's a real
>>>>>> bottleneck for ML algorithms. This is the reason our current
>>>>>> SGD implementation does a pass over the whole dataset at each
>>> iteration,
>>>>>> since we cannot take a sample from the dataset
>>>>>> and iterate only over that (so it's not really stochastic).
>>>>>>
>>>>>> The relevant JIRA is here:
>>>>>> https://issues.apache.org/jira/browse/FLINK-2396
>>>>>>
>>>>>> I would love to start a discussion on how we can proceed to fix this.
>>>>>>
>>>>>> Regards,
>>>>>> Theodore
>>>>>>
>>>>>> On Tue, Mar 22, 2016 at 9:56 PM, Dmitriy Lyubimov <dlieu.7@gmail.com
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> probably more of a question for Till:
>>>>>>>
>>>>>>> Imagine a common ML algorithm flow that runs until convergence.
>>>>>>>
>>>>>>> typical distributed flow would be something like that (e.g. GMM EM
>>>>> would
>>>>>> be
>>>>>>> exactly like that):
>>>>>>>
>>>>>>> A: input
>>>>>>>
>>>>>>> do {
>>>>>>>
>>>>>>>    stat1 = A.map.reduce
>>>>>>>    A = A.update-map(stat1)
>>>>>>>    conv = A.map.reduce
>>>>>>> } until conv > convThreshold
>>>>>>>
>>>>>>> There probably could be 1 map-reduce step originating on A to
>> compute
>>>>>> both
>>>>>>> convergence criteria statistics and udpate statistics in one step.
>> not
>>>>>> the
>>>>>>> point.
>>>>>>>
>>>>>>> The point is that update and map.reduce originate on the same
>> dataset
>>>>>>> intermittently.
>>>>>>>
>>>>>>> In spark we would normally commit A to a object tree cache so that
>>> data
>>>>>> is
>>>>>>> available to subsequent map passes without any I/O or serialization
>>>>>>> operations, thus insuring high rate of iterations.
>>>>>>>
>>>>>>> We observe the same pattern pretty much everywhere. clustering,
>>>>>>> probabilistic algorithms, even batch gradient descent of quasi
>> newton
>>>>>>> algorithms fitting.
>>>>>>>
>>>>>>> How do we do something like that, for example, in FlinkML?
>>>>>>>
>>>>>>> Thoughts?
>>>>>>>
>>>>>>> thanks.
>>>>>>>
>>>>>>> -Dmitriy
>>>>>>>
>>>


Re: a typical ML algorithm flow

Posted by Till Rohrmann <tr...@apache.org>.
Hi,

Chiwan’s example is perfectly fine and it should also work with general EM
algorithms. Moreover, it is the recommended way how to implement iterations
with Flink. The iterateWithTermination API call generates a lazily
evaluated data flow with an iteration operator. This plan will only be
executed when you call env.execute, collect or count which depends on this
data flow. In the example it would be triggered by result.print. You can
also take a look at the KMeans implementation of Flink. It does not use a
dynamic convergence criterion but it could easily be added.

If you really need to trigger the execution of the data flow for each
iteration (e.g. because you have different data flows depending on the
result), then you should persist the intermediate result every n iteration.
Otherwise you will over and over re-trigger the execution of previous
operators.

Cheers,
Till
​

On Tue, Mar 29, 2016 at 1:26 AM, Dmitriy Lyubimov <dl...@gmail.com> wrote:

> Thanks Chiwan.
>
> I think this example still creates a lazy-evaluated plan. And if i need to
> collect statistics to front end (and use it in subsequent iteration
> evaluation) as my example with computing column-wise averages suggests?
>
> problem generally is, what if I need to eagerly evaluate the statistics
> inside the iteration in order to proceed with further computations (and
> even plan construction). typically, that would be result of M-step in EM
> algorithm.
>
> On Sun, Mar 27, 2016 at 3:26 AM, Chiwan Park <ch...@apache.org>
> wrote:
>
> > Hi Dmitriy,
> >
> > I think you can implement it with iterative API with custom convergence
> > criterion. You can express the convergence criterion by two methods. One
> is
> > using a convergence criterion data set [1][2] and the other is
> registering
> > an aggregator with custom implementation of `ConvergenceCriterion`
> > interface [3].
> >
> > Here is an example using a convergence criterion data set in Scala API:
> >
> > ```
> > package flink.sample
> >
> > import org.apache.flink.api.scala._
> >
> > import scala.util.Random
> >
> > object SampleApp extends App {
> >   val env = ExecutionEnvironment.getExecutionEnvironment
> >
> >   val data = env.fromElements[Double](1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
> >
> >   val result = data.iterateWithTermination(5000) { prev =>
> >     // calculate sub solution
> >     val rand = Random.nextDouble()
> >     val subSolution = prev.map(_ * rand)
> >
> >     // calculate convergent condition
> >     val convergence = subSolution.reduce(_ + _).map(_ / 10).filter(_ > 8)
> >
> >     (subSolution, convergence)
> >   }
> >
> >   result.print()
> > }
> > ```
> >
> > Regards,
> > Chiwan Park
> >
> > [1]:
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/java/org/apache/flink/api/java/operators/IterativeDataSet.html#closeWith%28org.apache.flink.api.java.DataSet,%20org.apache.flink.api.java.DataSet%29
> > [2]: iterateWithTermination method in
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/scala/index.html#org.apache.flink.api.scala.DataSet
> > [3]:
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/java/org/apache/flink/api/java/operators/IterativeDataSet.html#registerAggregationConvergenceCriterion%28java.lang.String,%20org.apache.flink.api.common.aggregators.Aggregator,%20org.apache.flink.api.common.aggregators.ConvergenceCriterion%29
> >
> > > On Mar 26, 2016, at 2:51 AM, Dmitriy Lyubimov <dl...@gmail.com>
> wrote:
> > >
> > > Thank you, all :)
> > >
> > > yes, that's my question. How do we construct such a loop with a
> concrete
> > > example?
> > >
> > > Let's take something nonsensical yet specific.
> > >
> > > Say, in samsara terms we do something like that :
> > >
> > > var avg = Double.PositiveInfinity
> > > var drmA = ... (construct elsewhere)
> > >
> > >
> > >
> > > do {
> > >   avg = drmA.colMeans.mean // average of col-wise means
> > >   drmA = drmA - avg // elementwise subtract of average
> > >
> > > } while (avg > 1e-10)
> > >
> > > (which probably does not converge in reality).
> > >
> > > How would we implement that with native iterations in flink?
> > >
> > >
> > >
> > > On Wed, Mar 23, 2016 at 2:50 AM, Till Rohrmann <tr...@apache.org>
> > wrote:
> > >
> > >> Hi Dmitriy,
> > >>
> > >> I’m not sure whether I’ve understood your question correctly, so
> please
> > >> correct me if I’m wrong.
> > >>
> > >> So you’re asking whether it is a problem that
> > >>
> > >> stat1 = A.map.reduce
> > >> A = A.update.map(stat1)
> > >>
> > >> are executed on the same input data set A and whether we have to
> cache A
> > >> for that, right? I assume you’re worried that A is calculated twice.
> > >>
> > >> Since you don’t have a API call which triggers eager execution of the
> > data
> > >> flow, the map.reduce and map(stat1) call will only construct the data
> > flow
> > >> of your program. Both operators will depend on the result of A which
> is
> > >> only once calculated (when execute, collect or count is called) and
> then
> > >> sent to the map.reduce and map(stat1) operator.
> > >>
> > >> However, it is not recommended using an explicit loop to do iterative
> > >> computations with Flink. The problem here is that you will basically
> > unroll
> > >> the loop and construct a long pipeline with the operations of each
> > >> iterations. Once you execute this long pipeline you will face
> > considerable
> > >> memory fragmentation, because every operator will get a proportional
> > >> fraction of the available memory assigned. Even worse, if you trigger
> > the
> > >> execution of your data flow to evaluate the convergence criterion, you
> > will
> > >> execute for each iteration the complete pipeline which has been built
> > up so
> > >> far. Thus, you’ll end up with a quadratic complexity in the number of
> > >> iterations. Therefore, I would highly recommend using Flink’s built in
> > >> support for native iterations which won’t suffer from this problem or
> to
> > >> materialize at least for every n iterations the intermediate result.
> At
> > the
> > >> moment this would mean to write the data to some sink and then reading
> > it
> > >> from there again.
> > >>
> > >> I hope this answers your question. If not, then don’t hesitate to ask
> me
> > >> again.
> > >>
> > >> Cheers,
> > >> Till
> > >> ​
> > >>
> > >> On Wed, Mar 23, 2016 at 10:19 AM, Theodore Vasiloudis <
> > >> theodoros.vasiloudis@gmail.com> wrote:
> > >>
> > >>> Hello Dmitriy,
> > >>>
> > >>> If I understood correctly what you are basically talking about
> > modifying
> > >> a
> > >>> DataSet as you iterate over it.
> > >>>
> > >>> AFAIK this is currently not possible in Flink, and indeed it's a real
> > >>> bottleneck for ML algorithms. This is the reason our current
> > >>> SGD implementation does a pass over the whole dataset at each
> > iteration,
> > >>> since we cannot take a sample from the dataset
> > >>> and iterate only over that (so it's not really stochastic).
> > >>>
> > >>> The relevant JIRA is here:
> > >>> https://issues.apache.org/jira/browse/FLINK-2396
> > >>>
> > >>> I would love to start a discussion on how we can proceed to fix this.
> > >>>
> > >>> Regards,
> > >>> Theodore
> > >>>
> > >>> On Tue, Mar 22, 2016 at 9:56 PM, Dmitriy Lyubimov <dlieu.7@gmail.com
> >
> > >>> wrote:
> > >>>
> > >>>> Hi,
> > >>>>
> > >>>> probably more of a question for Till:
> > >>>>
> > >>>> Imagine a common ML algorithm flow that runs until convergence.
> > >>>>
> > >>>> typical distributed flow would be something like that (e.g. GMM EM
> > >> would
> > >>> be
> > >>>> exactly like that):
> > >>>>
> > >>>> A: input
> > >>>>
> > >>>> do {
> > >>>>
> > >>>>   stat1 = A.map.reduce
> > >>>>   A = A.update-map(stat1)
> > >>>>   conv = A.map.reduce
> > >>>> } until conv > convThreshold
> > >>>>
> > >>>> There probably could be 1 map-reduce step originating on A to
> compute
> > >>> both
> > >>>> convergence criteria statistics and udpate statistics in one step.
> not
> > >>> the
> > >>>> point.
> > >>>>
> > >>>> The point is that update and map.reduce originate on the same
> dataset
> > >>>> intermittently.
> > >>>>
> > >>>> In spark we would normally commit A to a object tree cache so that
> > data
> > >>> is
> > >>>> available to subsequent map passes without any I/O or serialization
> > >>>> operations, thus insuring high rate of iterations.
> > >>>>
> > >>>> We observe the same pattern pretty much everywhere. clustering,
> > >>>> probabilistic algorithms, even batch gradient descent of quasi
> newton
> > >>>> algorithms fitting.
> > >>>>
> > >>>> How do we do something like that, for example, in FlinkML?
> > >>>>
> > >>>> Thoughts?
> > >>>>
> > >>>> thanks.
> > >>>>
> > >>>> -Dmitriy
> > >>>>
> > >>>
> > >>
> >
> >
>

Re: a typical ML algorithm flow

Posted by Dmitriy Lyubimov <dl...@gmail.com>.
Thanks Chiwan.

I think this example still creates a lazy-evaluated plan. And if i need to
collect statistics to front end (and use it in subsequent iteration
evaluation) as my example with computing column-wise averages suggests?

problem generally is, what if I need to eagerly evaluate the statistics
inside the iteration in order to proceed with further computations (and
even plan construction). typically, that would be result of M-step in EM
algorithm.

On Sun, Mar 27, 2016 at 3:26 AM, Chiwan Park <ch...@apache.org> wrote:

> Hi Dmitriy,
>
> I think you can implement it with iterative API with custom convergence
> criterion. You can express the convergence criterion by two methods. One is
> using a convergence criterion data set [1][2] and the other is registering
> an aggregator with custom implementation of `ConvergenceCriterion`
> interface [3].
>
> Here is an example using a convergence criterion data set in Scala API:
>
> ```
> package flink.sample
>
> import org.apache.flink.api.scala._
>
> import scala.util.Random
>
> object SampleApp extends App {
>   val env = ExecutionEnvironment.getExecutionEnvironment
>
>   val data = env.fromElements[Double](1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
>
>   val result = data.iterateWithTermination(5000) { prev =>
>     // calculate sub solution
>     val rand = Random.nextDouble()
>     val subSolution = prev.map(_ * rand)
>
>     // calculate convergent condition
>     val convergence = subSolution.reduce(_ + _).map(_ / 10).filter(_ > 8)
>
>     (subSolution, convergence)
>   }
>
>   result.print()
> }
> ```
>
> Regards,
> Chiwan Park
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/java/org/apache/flink/api/java/operators/IterativeDataSet.html#closeWith%28org.apache.flink.api.java.DataSet,%20org.apache.flink.api.java.DataSet%29
> [2]: iterateWithTermination method in
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/scala/index.html#org.apache.flink.api.scala.DataSet
> [3]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/java/org/apache/flink/api/java/operators/IterativeDataSet.html#registerAggregationConvergenceCriterion%28java.lang.String,%20org.apache.flink.api.common.aggregators.Aggregator,%20org.apache.flink.api.common.aggregators.ConvergenceCriterion%29
>
> > On Mar 26, 2016, at 2:51 AM, Dmitriy Lyubimov <dl...@gmail.com> wrote:
> >
> > Thank you, all :)
> >
> > yes, that's my question. How do we construct such a loop with a concrete
> > example?
> >
> > Let's take something nonsensical yet specific.
> >
> > Say, in samsara terms we do something like that :
> >
> > var avg = Double.PositiveInfinity
> > var drmA = ... (construct elsewhere)
> >
> >
> >
> > do {
> >   avg = drmA.colMeans.mean // average of col-wise means
> >   drmA = drmA - avg // elementwise subtract of average
> >
> > } while (avg > 1e-10)
> >
> > (which probably does not converge in reality).
> >
> > How would we implement that with native iterations in flink?
> >
> >
> >
> > On Wed, Mar 23, 2016 at 2:50 AM, Till Rohrmann <tr...@apache.org>
> wrote:
> >
> >> Hi Dmitriy,
> >>
> >> I’m not sure whether I’ve understood your question correctly, so please
> >> correct me if I’m wrong.
> >>
> >> So you’re asking whether it is a problem that
> >>
> >> stat1 = A.map.reduce
> >> A = A.update.map(stat1)
> >>
> >> are executed on the same input data set A and whether we have to cache A
> >> for that, right? I assume you’re worried that A is calculated twice.
> >>
> >> Since you don’t have a API call which triggers eager execution of the
> data
> >> flow, the map.reduce and map(stat1) call will only construct the data
> flow
> >> of your program. Both operators will depend on the result of A which is
> >> only once calculated (when execute, collect or count is called) and then
> >> sent to the map.reduce and map(stat1) operator.
> >>
> >> However, it is not recommended using an explicit loop to do iterative
> >> computations with Flink. The problem here is that you will basically
> unroll
> >> the loop and construct a long pipeline with the operations of each
> >> iterations. Once you execute this long pipeline you will face
> considerable
> >> memory fragmentation, because every operator will get a proportional
> >> fraction of the available memory assigned. Even worse, if you trigger
> the
> >> execution of your data flow to evaluate the convergence criterion, you
> will
> >> execute for each iteration the complete pipeline which has been built
> up so
> >> far. Thus, you’ll end up with a quadratic complexity in the number of
> >> iterations. Therefore, I would highly recommend using Flink’s built in
> >> support for native iterations which won’t suffer from this problem or to
> >> materialize at least for every n iterations the intermediate result. At
> the
> >> moment this would mean to write the data to some sink and then reading
> it
> >> from there again.
> >>
> >> I hope this answers your question. If not, then don’t hesitate to ask me
> >> again.
> >>
> >> Cheers,
> >> Till
> >> ​
> >>
> >> On Wed, Mar 23, 2016 at 10:19 AM, Theodore Vasiloudis <
> >> theodoros.vasiloudis@gmail.com> wrote:
> >>
> >>> Hello Dmitriy,
> >>>
> >>> If I understood correctly what you are basically talking about
> modifying
> >> a
> >>> DataSet as you iterate over it.
> >>>
> >>> AFAIK this is currently not possible in Flink, and indeed it's a real
> >>> bottleneck for ML algorithms. This is the reason our current
> >>> SGD implementation does a pass over the whole dataset at each
> iteration,
> >>> since we cannot take a sample from the dataset
> >>> and iterate only over that (so it's not really stochastic).
> >>>
> >>> The relevant JIRA is here:
> >>> https://issues.apache.org/jira/browse/FLINK-2396
> >>>
> >>> I would love to start a discussion on how we can proceed to fix this.
> >>>
> >>> Regards,
> >>> Theodore
> >>>
> >>> On Tue, Mar 22, 2016 at 9:56 PM, Dmitriy Lyubimov <dl...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi,
> >>>>
> >>>> probably more of a question for Till:
> >>>>
> >>>> Imagine a common ML algorithm flow that runs until convergence.
> >>>>
> >>>> typical distributed flow would be something like that (e.g. GMM EM
> >> would
> >>> be
> >>>> exactly like that):
> >>>>
> >>>> A: input
> >>>>
> >>>> do {
> >>>>
> >>>>   stat1 = A.map.reduce
> >>>>   A = A.update-map(stat1)
> >>>>   conv = A.map.reduce
> >>>> } until conv > convThreshold
> >>>>
> >>>> There probably could be 1 map-reduce step originating on A to compute
> >>> both
> >>>> convergence criteria statistics and udpate statistics in one step. not
> >>> the
> >>>> point.
> >>>>
> >>>> The point is that update and map.reduce originate on the same dataset
> >>>> intermittently.
> >>>>
> >>>> In spark we would normally commit A to a object tree cache so that
> data
> >>> is
> >>>> available to subsequent map passes without any I/O or serialization
> >>>> operations, thus insuring high rate of iterations.
> >>>>
> >>>> We observe the same pattern pretty much everywhere. clustering,
> >>>> probabilistic algorithms, even batch gradient descent of quasi newton
> >>>> algorithms fitting.
> >>>>
> >>>> How do we do something like that, for example, in FlinkML?
> >>>>
> >>>> Thoughts?
> >>>>
> >>>> thanks.
> >>>>
> >>>> -Dmitriy
> >>>>
> >>>
> >>
>
>

Re: a typical ML algorithm flow

Posted by Chiwan Park <ch...@apache.org>.
Hi Dmitriy,

I think you can implement it with iterative API with custom convergence criterion. You can express the convergence criterion by two methods. One is using a convergence criterion data set [1][2] and the other is registering an aggregator with custom implementation of `ConvergenceCriterion` interface [3].

Here is an example using a convergence criterion data set in Scala API:

```
package flink.sample

import org.apache.flink.api.scala._

import scala.util.Random

object SampleApp extends App {
  val env = ExecutionEnvironment.getExecutionEnvironment

  val data = env.fromElements[Double](1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

  val result = data.iterateWithTermination(5000) { prev =>
    // calculate sub solution
    val rand = Random.nextDouble()
    val subSolution = prev.map(_ * rand)

    // calculate convergent condition
    val convergence = subSolution.reduce(_ + _).map(_ / 10).filter(_ > 8)

    (subSolution, convergence)
  }

  result.print()
}
```

Regards,
Chiwan Park

[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/java/org/apache/flink/api/java/operators/IterativeDataSet.html#closeWith%28org.apache.flink.api.java.DataSet,%20org.apache.flink.api.java.DataSet%29
[2]: iterateWithTermination method in https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/scala/index.html#org.apache.flink.api.scala.DataSet
[3]: https://ci.apache.org/projects/flink/flink-docs-release-1.0/api/java/org/apache/flink/api/java/operators/IterativeDataSet.html#registerAggregationConvergenceCriterion%28java.lang.String,%20org.apache.flink.api.common.aggregators.Aggregator,%20org.apache.flink.api.common.aggregators.ConvergenceCriterion%29

> On Mar 26, 2016, at 2:51 AM, Dmitriy Lyubimov <dl...@gmail.com> wrote:
> 
> Thank you, all :)
> 
> yes, that's my question. How do we construct such a loop with a concrete
> example?
> 
> Let's take something nonsensical yet specific.
> 
> Say, in samsara terms we do something like that :
> 
> var avg = Double.PositiveInfinity
> var drmA = ... (construct elsewhere)
> 
> 
> 
> do {
>   avg = drmA.colMeans.mean // average of col-wise means
>   drmA = drmA - avg // elementwise subtract of average
> 
> } while (avg > 1e-10)
> 
> (which probably does not converge in reality).
> 
> How would we implement that with native iterations in flink?
> 
> 
> 
> On Wed, Mar 23, 2016 at 2:50 AM, Till Rohrmann <tr...@apache.org> wrote:
> 
>> Hi Dmitriy,
>> 
>> I’m not sure whether I’ve understood your question correctly, so please
>> correct me if I’m wrong.
>> 
>> So you’re asking whether it is a problem that
>> 
>> stat1 = A.map.reduce
>> A = A.update.map(stat1)
>> 
>> are executed on the same input data set A and whether we have to cache A
>> for that, right? I assume you’re worried that A is calculated twice.
>> 
>> Since you don’t have a API call which triggers eager execution of the data
>> flow, the map.reduce and map(stat1) call will only construct the data flow
>> of your program. Both operators will depend on the result of A which is
>> only once calculated (when execute, collect or count is called) and then
>> sent to the map.reduce and map(stat1) operator.
>> 
>> However, it is not recommended using an explicit loop to do iterative
>> computations with Flink. The problem here is that you will basically unroll
>> the loop and construct a long pipeline with the operations of each
>> iterations. Once you execute this long pipeline you will face considerable
>> memory fragmentation, because every operator will get a proportional
>> fraction of the available memory assigned. Even worse, if you trigger the
>> execution of your data flow to evaluate the convergence criterion, you will
>> execute for each iteration the complete pipeline which has been built up so
>> far. Thus, you’ll end up with a quadratic complexity in the number of
>> iterations. Therefore, I would highly recommend using Flink’s built in
>> support for native iterations which won’t suffer from this problem or to
>> materialize at least for every n iterations the intermediate result. At the
>> moment this would mean to write the data to some sink and then reading it
>> from there again.
>> 
>> I hope this answers your question. If not, then don’t hesitate to ask me
>> again.
>> 
>> Cheers,
>> Till
>> ​
>> 
>> On Wed, Mar 23, 2016 at 10:19 AM, Theodore Vasiloudis <
>> theodoros.vasiloudis@gmail.com> wrote:
>> 
>>> Hello Dmitriy,
>>> 
>>> If I understood correctly what you are basically talking about modifying
>> a
>>> DataSet as you iterate over it.
>>> 
>>> AFAIK this is currently not possible in Flink, and indeed it's a real
>>> bottleneck for ML algorithms. This is the reason our current
>>> SGD implementation does a pass over the whole dataset at each iteration,
>>> since we cannot take a sample from the dataset
>>> and iterate only over that (so it's not really stochastic).
>>> 
>>> The relevant JIRA is here:
>>> https://issues.apache.org/jira/browse/FLINK-2396
>>> 
>>> I would love to start a discussion on how we can proceed to fix this.
>>> 
>>> Regards,
>>> Theodore
>>> 
>>> On Tue, Mar 22, 2016 at 9:56 PM, Dmitriy Lyubimov <dl...@gmail.com>
>>> wrote:
>>> 
>>>> Hi,
>>>> 
>>>> probably more of a question for Till:
>>>> 
>>>> Imagine a common ML algorithm flow that runs until convergence.
>>>> 
>>>> typical distributed flow would be something like that (e.g. GMM EM
>> would
>>> be
>>>> exactly like that):
>>>> 
>>>> A: input
>>>> 
>>>> do {
>>>> 
>>>>   stat1 = A.map.reduce
>>>>   A = A.update-map(stat1)
>>>>   conv = A.map.reduce
>>>> } until conv > convThreshold
>>>> 
>>>> There probably could be 1 map-reduce step originating on A to compute
>>> both
>>>> convergence criteria statistics and udpate statistics in one step. not
>>> the
>>>> point.
>>>> 
>>>> The point is that update and map.reduce originate on the same dataset
>>>> intermittently.
>>>> 
>>>> In spark we would normally commit A to a object tree cache so that data
>>> is
>>>> available to subsequent map passes without any I/O or serialization
>>>> operations, thus insuring high rate of iterations.
>>>> 
>>>> We observe the same pattern pretty much everywhere. clustering,
>>>> probabilistic algorithms, even batch gradient descent of quasi newton
>>>> algorithms fitting.
>>>> 
>>>> How do we do something like that, for example, in FlinkML?
>>>> 
>>>> Thoughts?
>>>> 
>>>> thanks.
>>>> 
>>>> -Dmitriy
>>>> 
>>> 
>> 


Re: a typical ML algorithm flow

Posted by Dmitriy Lyubimov <dl...@gmail.com>.
Thank you, all :)

yes, that's my question. How do we construct such a loop with a concrete
example?

Let's take something nonsensical yet specific.

Say, in samsara terms we do something like that :

var avg = Double.PositiveInfinity
var drmA = ... (construct elsewhere)



do {
   avg = drmA.colMeans.mean // average of col-wise means
   drmA = drmA - avg // elementwise subtract of average

} while (avg > 1e-10)

(which probably does not converge in reality).

How would we implement that with native iterations in flink?



On Wed, Mar 23, 2016 at 2:50 AM, Till Rohrmann <tr...@apache.org> wrote:

> Hi Dmitriy,
>
> I’m not sure whether I’ve understood your question correctly, so please
> correct me if I’m wrong.
>
> So you’re asking whether it is a problem that
>
> stat1 = A.map.reduce
> A = A.update.map(stat1)
>
> are executed on the same input data set A and whether we have to cache A
> for that, right? I assume you’re worried that A is calculated twice.
>
> Since you don’t have a API call which triggers eager execution of the data
> flow, the map.reduce and map(stat1) call will only construct the data flow
> of your program. Both operators will depend on the result of A which is
> only once calculated (when execute, collect or count is called) and then
> sent to the map.reduce and map(stat1) operator.
>
> However, it is not recommended using an explicit loop to do iterative
> computations with Flink. The problem here is that you will basically unroll
> the loop and construct a long pipeline with the operations of each
> iterations. Once you execute this long pipeline you will face considerable
> memory fragmentation, because every operator will get a proportional
> fraction of the available memory assigned. Even worse, if you trigger the
> execution of your data flow to evaluate the convergence criterion, you will
> execute for each iteration the complete pipeline which has been built up so
> far. Thus, you’ll end up with a quadratic complexity in the number of
> iterations. Therefore, I would highly recommend using Flink’s built in
> support for native iterations which won’t suffer from this problem or to
> materialize at least for every n iterations the intermediate result. At the
> moment this would mean to write the data to some sink and then reading it
> from there again.
>
> I hope this answers your question. If not, then don’t hesitate to ask me
> again.
>
> Cheers,
> Till
> ​
>
> On Wed, Mar 23, 2016 at 10:19 AM, Theodore Vasiloudis <
> theodoros.vasiloudis@gmail.com> wrote:
>
> > Hello Dmitriy,
> >
> > If I understood correctly what you are basically talking about modifying
> a
> > DataSet as you iterate over it.
> >
> > AFAIK this is currently not possible in Flink, and indeed it's a real
> > bottleneck for ML algorithms. This is the reason our current
> > SGD implementation does a pass over the whole dataset at each iteration,
> > since we cannot take a sample from the dataset
> > and iterate only over that (so it's not really stochastic).
> >
> > The relevant JIRA is here:
> > https://issues.apache.org/jira/browse/FLINK-2396
> >
> > I would love to start a discussion on how we can proceed to fix this.
> >
> > Regards,
> > Theodore
> >
> > On Tue, Mar 22, 2016 at 9:56 PM, Dmitriy Lyubimov <dl...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > probably more of a question for Till:
> > >
> > > Imagine a common ML algorithm flow that runs until convergence.
> > >
> > > typical distributed flow would be something like that (e.g. GMM EM
> would
> > be
> > > exactly like that):
> > >
> > > A: input
> > >
> > > do {
> > >
> > >    stat1 = A.map.reduce
> > >    A = A.update-map(stat1)
> > >    conv = A.map.reduce
> > > } until conv > convThreshold
> > >
> > > There probably could be 1 map-reduce step originating on A to compute
> > both
> > > convergence criteria statistics and udpate statistics in one step. not
> > the
> > > point.
> > >
> > > The point is that update and map.reduce originate on the same dataset
> > > intermittently.
> > >
> > > In spark we would normally commit A to a object tree cache so that data
> > is
> > > available to subsequent map passes without any I/O or serialization
> > > operations, thus insuring high rate of iterations.
> > >
> > > We observe the same pattern pretty much everywhere. clustering,
> > > probabilistic algorithms, even batch gradient descent of quasi newton
> > > algorithms fitting.
> > >
> > > How do we do something like that, for example, in FlinkML?
> > >
> > > Thoughts?
> > >
> > > thanks.
> > >
> > > -Dmitriy
> > >
> >
>

Re: a typical ML algorithm flow

Posted by Theodore Vasiloudis <th...@gmail.com>.
Just realized what I wrote is wrong and probably doesn't apply here.

The problem I described relates to modifying a *secondary* dataset
as you iterate over a primary one.

Taking SGD as an example, you would iterate over a weights dataset,
modifying  it using the native Flink iterations that Till talked about.
The problem comes from the fact that we need at every iteration to take
a different sample from *another* dataset (which is our training data),
in a sense modifying it as well at every iteration; *that *is not currently
possible AFAIK.

On Wed, Mar 23, 2016 at 10:50 AM, Till Rohrmann <tr...@apache.org>
wrote:

> Hi Dmitriy,
>
> I’m not sure whether I’ve understood your question correctly, so please
> correct me if I’m wrong.
>
> So you’re asking whether it is a problem that
>
> stat1 = A.map.reduce
> A = A.update.map(stat1)
>
> are executed on the same input data set A and whether we have to cache A
> for that, right? I assume you’re worried that A is calculated twice.
>
> Since you don’t have a API call which triggers eager execution of the data
> flow, the map.reduce and map(stat1) call will only construct the data flow
> of your program. Both operators will depend on the result of A which is
> only once calculated (when execute, collect or count is called) and then
> sent to the map.reduce and map(stat1) operator.
>
> However, it is not recommended using an explicit loop to do iterative
> computations with Flink. The problem here is that you will basically unroll
> the loop and construct a long pipeline with the operations of each
> iterations. Once you execute this long pipeline you will face considerable
> memory fragmentation, because every operator will get a proportional
> fraction of the available memory assigned. Even worse, if you trigger the
> execution of your data flow to evaluate the convergence criterion, you will
> execute for each iteration the complete pipeline which has been built up so
> far. Thus, you’ll end up with a quadratic complexity in the number of
> iterations. Therefore, I would highly recommend using Flink’s built in
> support for native iterations which won’t suffer from this problem or to
> materialize at least for every n iterations the intermediate result. At the
> moment this would mean to write the data to some sink and then reading it
> from there again.
>
> I hope this answers your question. If not, then don’t hesitate to ask me
> again.
>
> Cheers,
> Till
> ​
>
> On Wed, Mar 23, 2016 at 10:19 AM, Theodore Vasiloudis <
> theodoros.vasiloudis@gmail.com> wrote:
>
> > Hello Dmitriy,
> >
> > If I understood correctly what you are basically talking about modifying
> a
> > DataSet as you iterate over it.
> >
> > AFAIK this is currently not possible in Flink, and indeed it's a real
> > bottleneck for ML algorithms. This is the reason our current
> > SGD implementation does a pass over the whole dataset at each iteration,
> > since we cannot take a sample from the dataset
> > and iterate only over that (so it's not really stochastic).
> >
> > The relevant JIRA is here:
> > https://issues.apache.org/jira/browse/FLINK-2396
> >
> > I would love to start a discussion on how we can proceed to fix this.
> >
> > Regards,
> > Theodore
> >
> > On Tue, Mar 22, 2016 at 9:56 PM, Dmitriy Lyubimov <dl...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > probably more of a question for Till:
> > >
> > > Imagine a common ML algorithm flow that runs until convergence.
> > >
> > > typical distributed flow would be something like that (e.g. GMM EM
> would
> > be
> > > exactly like that):
> > >
> > > A: input
> > >
> > > do {
> > >
> > >    stat1 = A.map.reduce
> > >    A = A.update-map(stat1)
> > >    conv = A.map.reduce
> > > } until conv > convThreshold
> > >
> > > There probably could be 1 map-reduce step originating on A to compute
> > both
> > > convergence criteria statistics and udpate statistics in one step. not
> > the
> > > point.
> > >
> > > The point is that update and map.reduce originate on the same dataset
> > > intermittently.
> > >
> > > In spark we would normally commit A to a object tree cache so that data
> > is
> > > available to subsequent map passes without any I/O or serialization
> > > operations, thus insuring high rate of iterations.
> > >
> > > We observe the same pattern pretty much everywhere. clustering,
> > > probabilistic algorithms, even batch gradient descent of quasi newton
> > > algorithms fitting.
> > >
> > > How do we do something like that, for example, in FlinkML?
> > >
> > > Thoughts?
> > >
> > > thanks.
> > >
> > > -Dmitriy
> > >
> >
>

Re: a typical ML algorithm flow

Posted by Till Rohrmann <tr...@apache.org>.
Hi Dmitriy,

I’m not sure whether I’ve understood your question correctly, so please
correct me if I’m wrong.

So you’re asking whether it is a problem that

stat1 = A.map.reduce
A = A.update.map(stat1)

are executed on the same input data set A and whether we have to cache A
for that, right? I assume you’re worried that A is calculated twice.

Since you don’t have a API call which triggers eager execution of the data
flow, the map.reduce and map(stat1) call will only construct the data flow
of your program. Both operators will depend on the result of A which is
only once calculated (when execute, collect or count is called) and then
sent to the map.reduce and map(stat1) operator.

However, it is not recommended using an explicit loop to do iterative
computations with Flink. The problem here is that you will basically unroll
the loop and construct a long pipeline with the operations of each
iterations. Once you execute this long pipeline you will face considerable
memory fragmentation, because every operator will get a proportional
fraction of the available memory assigned. Even worse, if you trigger the
execution of your data flow to evaluate the convergence criterion, you will
execute for each iteration the complete pipeline which has been built up so
far. Thus, you’ll end up with a quadratic complexity in the number of
iterations. Therefore, I would highly recommend using Flink’s built in
support for native iterations which won’t suffer from this problem or to
materialize at least for every n iterations the intermediate result. At the
moment this would mean to write the data to some sink and then reading it
from there again.

I hope this answers your question. If not, then don’t hesitate to ask me
again.

Cheers,
Till
​

On Wed, Mar 23, 2016 at 10:19 AM, Theodore Vasiloudis <
theodoros.vasiloudis@gmail.com> wrote:

> Hello Dmitriy,
>
> If I understood correctly what you are basically talking about modifying a
> DataSet as you iterate over it.
>
> AFAIK this is currently not possible in Flink, and indeed it's a real
> bottleneck for ML algorithms. This is the reason our current
> SGD implementation does a pass over the whole dataset at each iteration,
> since we cannot take a sample from the dataset
> and iterate only over that (so it's not really stochastic).
>
> The relevant JIRA is here:
> https://issues.apache.org/jira/browse/FLINK-2396
>
> I would love to start a discussion on how we can proceed to fix this.
>
> Regards,
> Theodore
>
> On Tue, Mar 22, 2016 at 9:56 PM, Dmitriy Lyubimov <dl...@gmail.com>
> wrote:
>
> > Hi,
> >
> > probably more of a question for Till:
> >
> > Imagine a common ML algorithm flow that runs until convergence.
> >
> > typical distributed flow would be something like that (e.g. GMM EM would
> be
> > exactly like that):
> >
> > A: input
> >
> > do {
> >
> >    stat1 = A.map.reduce
> >    A = A.update-map(stat1)
> >    conv = A.map.reduce
> > } until conv > convThreshold
> >
> > There probably could be 1 map-reduce step originating on A to compute
> both
> > convergence criteria statistics and udpate statistics in one step. not
> the
> > point.
> >
> > The point is that update and map.reduce originate on the same dataset
> > intermittently.
> >
> > In spark we would normally commit A to a object tree cache so that data
> is
> > available to subsequent map passes without any I/O or serialization
> > operations, thus insuring high rate of iterations.
> >
> > We observe the same pattern pretty much everywhere. clustering,
> > probabilistic algorithms, even batch gradient descent of quasi newton
> > algorithms fitting.
> >
> > How do we do something like that, for example, in FlinkML?
> >
> > Thoughts?
> >
> > thanks.
> >
> > -Dmitriy
> >
>

Re: a typical ML algorithm flow

Posted by Theodore Vasiloudis <th...@gmail.com>.
Hello Dmitriy,

If I understood correctly what you are basically talking about modifying a
DataSet as you iterate over it.

AFAIK this is currently not possible in Flink, and indeed it's a real
bottleneck for ML algorithms. This is the reason our current
SGD implementation does a pass over the whole dataset at each iteration,
since we cannot take a sample from the dataset
and iterate only over that (so it's not really stochastic).

The relevant JIRA is here: https://issues.apache.org/jira/browse/FLINK-2396

I would love to start a discussion on how we can proceed to fix this.

Regards,
Theodore

On Tue, Mar 22, 2016 at 9:56 PM, Dmitriy Lyubimov <dl...@gmail.com> wrote:

> Hi,
>
> probably more of a question for Till:
>
> Imagine a common ML algorithm flow that runs until convergence.
>
> typical distributed flow would be something like that (e.g. GMM EM would be
> exactly like that):
>
> A: input
>
> do {
>
>    stat1 = A.map.reduce
>    A = A.update-map(stat1)
>    conv = A.map.reduce
> } until conv > convThreshold
>
> There probably could be 1 map-reduce step originating on A to compute both
> convergence criteria statistics and udpate statistics in one step. not the
> point.
>
> The point is that update and map.reduce originate on the same dataset
> intermittently.
>
> In spark we would normally commit A to a object tree cache so that data is
> available to subsequent map passes without any I/O or serialization
> operations, thus insuring high rate of iterations.
>
> We observe the same pattern pretty much everywhere. clustering,
> probabilistic algorithms, even batch gradient descent of quasi newton
> algorithms fitting.
>
> How do we do something like that, for example, in FlinkML?
>
> Thoughts?
>
> thanks.
>
> -Dmitriy
>