You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Dan Drewes <dr...@campus.tu-berlin.de> on 2016/09/02 10:16:02 UTC

Flink Iterations vs. While loop

Hi,

for my bachelor thesis I'm testing an implementation of L-BFGS algorithm 
with Flink Iterations against a version without Flink Iterations but a 
casual while loop instead. Both programs use the same Map and Reduce 
transformations in each iteration. It was expected, that the performance 
of the Flink Iterations would scale better with increasing size of the 
input data set. However, the measured results on an ibm-power-cluster 
are very similar for both versions, e.g. around 30 minutes for 200 GB 
data. The cluster has 8 nodes, was configured with 4 slots per node and 
I used a total parallelism of 32.
In every Iteration of the while loop a new flink job is started and I 
thought, that also the data would be distributed over the network again 
in each iteration which should consume a significant and measurable 
amount of time. Is that thought wrong or what is the computional 
overhead of the flink iterations that is equalizing this disadvantage?
I include the relevant part of both programs and also attach the 
generated execution plans.
Thank you for any ideas as I could not find much about this issue in the 
flink docs.

Best, Dan

*Flink Iterations:*

DataSet<double[]> data = ...

State  state =initialState(m, initweights,0,new double[initweights.length]);
DataSet<State> statedataset = env.fromElements(state);

//start of iteration section IterativeDataSet<State> loop= statedataset.iterate(niter);;


DataSet<State> statewithnewlossgradient = data.map(difffunction).withBroadcastSet(loop,"state")
               .reduce(accumulate)
               .map(new NormLossGradient(datasize))
               .map(new SetLossGradient()).withBroadcastSet(loop,"state")
               .map(new LBFGS());


DataSet<State> converged = statewithnewlossgradient.filter(
    new FilterFunction<State>() {
       @Override public boolean filter(State  value)throws Exception {
          if(value.getIflag()[0] ==0){
             return false;
          }
          return true;
       }
    }
);

DataSet<State> finalstate = loop.closeWith(statewithnewlossgradient,converged);

*
**While loop:

*

DataSet<double[]> data =... State  state =initialState(m, initweights,0,new double[initweights.length]);

int cnt=0;
do{
    LBFGS lbfgs =new LBFGS();
    statedataset=data.map(difffunction).withBroadcastSet(statedataset,"state")
       .reduce(accumulate)
       .map(new NormLossGradient(datasize))
       .map(new SetLossGradient()).withBroadcastSet(statedataset,"state")
       .map(lbfgs);
    cnt++;
}while (cnt<niter && statedataset.collect().get(0).getIflag()[0] !=0);



Re: Flink Iterations vs. While loop

Posted by Dan Drewes <dr...@campus.tu-berlin.de>.
Hi,

I am not broadcasting the data but the model, i.e. the weight vector 
contained in the "State".

You are right, it would be better for the implementation with the while 
loop to have the data on HDFS. But that's exactly the point of my 
question: Why are the Flink Iterations not faster if you don't have the 
data directly available to the workers by HDFS?

-Dan


Am 05.09.2016 um 16:10 schrieb Theodore Vasiloudis:
> Hello Dan,
>
> are you broadcasting the 85GB of data then? I don't get why you 
> wouldn't store that file on HDFS so it's accessible by your workers.
>
>
> If you have the full code available somewhere we might be able to help 
> better.
>
> For L-BFGS you should only be broadcasting the model (i.e. the weight 
> vector), and yes that would happen at each iteration, since you are 
> updating the model at each iteration.
>
> On Fri, Sep 2, 2016 at 5:30 PM, Dan Drewes <drewes@campus.tu-berlin.de 
> <ma...@campus.tu-berlin.de>> wrote:
>
>     Hi Greg,
>
>     thanks for your response!
>
>     I just had a look and realized that it's just about 85 GB of data.
>     Sorry about that wrong information.
>
>     It's read from a csv file on the master node's local file system.
>     The 8 nodes have more than 40 GB available memory each and since
>     the data is equally distributed I assume there should be no need
>     to spill anything on disk.
>
>     There are 9 iterations.
>
>     Is it possible that also with Flink Iterations the data is
>     repeatedly distributed? Or the other way around: Might it be that
>     flink "remembers" somehow that the data is already distributed
>     even for the while loop?
>
>     -Dan
>
>
>
>     Am 02.09.2016 um 16:39 schrieb Greg Hogan:
>>     Hi Dan,
>>
>>     Where are you reading the 200 GB "data" from? How much memory per
>>     node? If the DataSet is read from a distributed filesystem and if
>>     with iterations Flink must spill to disk then I wouldn't expect
>>     much difference. About how many iterations are run in the 30
>>     minutes? I don't know that this is reported explicitly, but if
>>     your convergence function only has one input record per iteration
>>     then the reported total is the iteration count.
>>
>>     One other thought, we should soon have support for object reuse
>>     with arrays (FLINK-3695). This would be implemented as
>>     DoubleValueArray or ValueArray<DoubleValue> rather than double[]
>>     but it would be interesting to test for a change in performance.
>>
>>     Greg
>>
>>     On Fri, Sep 2, 2016 at 6:16 AM, Dan Drewes
>>     <drewes@campus.tu-berlin.de <ma...@campus.tu-berlin.de>>
>>     wrote:
>>
>>         Hi,
>>
>>         for my bachelor thesis I'm testing an implementation of
>>         L-BFGS algorithm with Flink Iterations against a version
>>         without Flink Iterations but a casual while loop instead.
>>         Both programs use the same Map and Reduce transformations in
>>         each iteration. It was expected, that the performance of the
>>         Flink Iterations would scale better with increasing size of
>>         the input data set. However, the measured results on an
>>         ibm-power-cluster are very similar for both versions, e.g.
>>         around 30 minutes for 200 GB data. The cluster has 8 nodes,
>>         was configured with 4 slots per node and I used a total
>>         parallelism of 32.
>>         In every Iteration of the while loop a new flink job is
>>         started and I thought, that also the data would be
>>         distributed over the network again in each iteration which
>>         should consume a significant and measurable amount of time.
>>         Is that thought wrong or what is the computional overhead of
>>         the flink iterations that is equalizing this disadvantage?
>>         I include the relevant part of both programs and also attach
>>         the generated execution plans.
>>         Thank you for any ideas as I could not find much about this
>>         issue in the flink docs.
>>
>>         Best, Dan
>>
>>         *Flink Iterations:*
>>
>>         DataSet<double[]> data = ...
>>
>>         State  state =initialState(m, initweights,0,new double[initweights.length]);
>>         DataSet<State> statedataset = env.fromElements(state);
>>
>>         //start of iteration section IterativeDataSet<State> loop= statedataset.iterate(niter);;
>>
>>
>>         DataSet<State> statewithnewlossgradient = data.map(difffunction).withBroadcastSet(loop,"state")
>>                        .reduce(accumulate)
>>                        .map(new NormLossGradient(datasize))
>>                        .map(new SetLossGradient()).withBroadcastSet(loop,"state")
>>                        .map(new LBFGS());
>>
>>
>>         DataSet<State> converged = statewithnewlossgradient.filter(
>>             new FilterFunction<State>() {
>>                @Override public boolean filter(State  value)throws Exception {
>>                   if(value.getIflag()[0] ==0){
>>                      return false;
>>                   }
>>                   return true;
>>                }
>>             }
>>         );
>>
>>         DataSet<State> finalstate = loop.closeWith(statewithnewlossgradient,converged);
>>
>>         ***While loop: *
>>
>>         DataSet<double[]> data =... State  state =initialState(m, initweights,0,new double[initweights.length]);
>>
>>         int cnt=0;
>>         do{
>>             LBFGS lbfgs =new LBFGS();
>>             statedataset=data.map(difffunction).withBroadcastSet(statedataset,"state")
>>                .reduce(accumulate)
>>                .map(new NormLossGradient(datasize))
>>                .map(new SetLossGradient()).withBroadcastSet(statedataset,"state")
>>                .map(lbfgs);
>>             cnt++;
>>         }while (cnt<niter && statedataset.collect().get(0).getIflag()[0] !=0);
>>


---
Diese E-Mail wurde von Avast Antivirus-Software auf Viren geprüft.
https://www.avast.com/antivirus

Re: Flink Iterations vs. While loop

Posted by Theodore Vasiloudis <th...@gmail.com>.
Hello Dan,

are you broadcasting the 85GB of data then? I don't get why you wouldn't
store that file on HDFS so it's accessible by your workers.


If you have the full code available somewhere we might be able to help
better.

For L-BFGS you should only be broadcasting the model (i.e. the weight
vector), and yes that would happen at each iteration, since you are
updating the model at each iteration.

On Fri, Sep 2, 2016 at 5:30 PM, Dan Drewes <dr...@campus.tu-berlin.de>
wrote:

> Hi Greg,
>
> thanks for your response!
>
> I just had a look and realized that it's just about 85 GB of data. Sorry
> about that wrong information.
>
> It's read from a csv file on the master node's local file system. The 8
> nodes have more than 40 GB available memory each and since the data is
> equally distributed I assume there should be no need to spill anything on
> disk.
>
> There are 9 iterations.
>
> Is it possible that also with Flink Iterations the data is repeatedly
> distributed? Or the other way around: Might it be that flink "remembers"
> somehow that the data is already distributed even for the while loop?
>
> -Dan
>
>
>
> Am 02.09.2016 um 16:39 schrieb Greg Hogan:
>
> Hi Dan,
>
> Where are you reading the 200 GB "data" from? How much memory per node? If
> the DataSet is read from a distributed filesystem and if with iterations
> Flink must spill to disk then I wouldn't expect much difference. About how
> many iterations are run in the 30 minutes? I don't know that this is
> reported explicitly, but if your convergence function only has one input
> record per iteration then the reported total is the iteration count.
>
> One other thought, we should soon have support for object reuse with
> arrays (FLINK-3695). This would be implemented as DoubleValueArray or
> ValueArray<DoubleValue> rather than double[] but it would be interesting to
> test for a change in performance.
>
> Greg
>
> On Fri, Sep 2, 2016 at 6:16 AM, Dan Drewes <dr...@campus.tu-berlin.de>
> wrote:
>
>> Hi,
>>
>> for my bachelor thesis I'm testing an implementation of L-BFGS algorithm
>> with Flink Iterations against a version without Flink Iterations but a
>> casual while loop instead. Both programs use the same Map and Reduce
>> transformations in each iteration. It was expected, that the performance of
>> the Flink Iterations would scale better with increasing size of the input
>> data set. However, the measured results on an ibm-power-cluster are very
>> similar for both versions, e.g. around 30 minutes for 200 GB data. The
>> cluster has 8 nodes, was configured with 4 slots per node and I used a
>> total parallelism of 32.
>> In every Iteration of the while loop a new flink job is started and I
>> thought, that also the data would be distributed over the network again in
>> each iteration which should consume a significant and measurable amount of
>> time. Is that thought wrong or what is the computional overhead of the
>> flink iterations that is equalizing this disadvantage?
>> I include the relevant part of both programs and also attach the
>> generated execution plans.
>> Thank you for any ideas as I could not find much about this issue in the
>> flink docs.
>>
>> Best, Dan
>>
>> *Flink Iterations:*
>>
>> DataSet<double[]> data = ...
>>
>> State state = initialState(m, initweights,0,new double[initweights.length]);
>> DataSet<State> statedataset = env.fromElements(state);
>> //start of iteration sectionIterativeDataSet<State> loop= statedataset.iterate(niter);;
>>
>>
>> DataSet<State> statewithnewlossgradient = data.map(difffunction).withBroadcastSet(loop, "state")
>>               .reduce(accumulate)
>>               .map(new NormLossGradient(datasize))
>>               .map(new SetLossGradient()).withBroadcastSet(loop,"state")
>>               .map(new LBFGS());
>>
>>
>> DataSet<State> converged = statewithnewlossgradient.filter(
>>    new FilterFunction<State>() {
>>       @Override      public boolean filter(State value) throws Exception {
>>          if(value.getIflag()[0] == 0){
>>             return false;
>>          }
>>          return true;
>>       }
>>    }
>> );
>>
>> DataSet<State> finalstate = loop.closeWith(statewithnewlossgradient,converged);
>>
>> *While loop: *
>>
>> DataSet<double[]> data =...
>> State state = initialState(m, initweights,0,new double[initweights.length]);
>> int cnt=0;do{
>>    LBFGS lbfgs = new LBFGS();
>>    statedataset=data.map(difffunction).withBroadcastSet(statedataset, "state")
>>       .reduce(accumulate)
>>       .map(new NormLossGradient(datasize))
>>       .map(new SetLossGradient()).withBroadcastSet(statedataset,"state")
>>       .map(lbfgs);
>>    cnt++;
>> }while (cnt<niter && statedataset.collect().get(0).getIflag()[0] != 0);
>>
>>

Re: Flink Iterations vs. While loop

Posted by "Drewes, Dan Benedikt" <dr...@campus.tu-berlin.de>.
Hi Till,

you're right, my implementation wouldn't scale well for a very large number of features.Thank you for that hint! However, i'm not using that much features, so this shouldn't be the cause for the strange behaviour.


Yes, the 30 Minutes is the time for all jobs together. It's the time displayed in the web dashboard and I also thought about this beeing only the time for the last iteration, but then again it matches the time from starting the program until it quits. I.e. all iterations/jobs must be included in this time.


-Dan


Am 07.09.2016 um 17:25 schrieb Till Rohrmann:
Hi Dan,

first a general remark: I fear that your L-BFGS implementation is not well suited for large scale problems. You might wanna take a look at [1].

In the case of the while loop solution you're actually executing n jobs with n being the number of iterations. Thus, you have to add the execution times for all jobs together. Did you do that?

[1] https://papers.nips.cc/paper/5333-large-scale-l-bfgs-using-mapreduce.pdf

On Wed, Sep 7, 2016 at 3:43 PM, Dan Drewes <dr...@campus.tu-berlin.de>> wrote:

Thank you for your replys so far!

I've uploaded the files to github:

iterations: https://github.com/dan-drewes/thesis/blob/master/IterationsLBFGS.java

while loop: https://github.com/dan-drewes/thesis/blob/master/flinkLBFGS.java

The additional classes I wrote are there, too.

I had 32 task slots in total, that's 4 on each node, so their should actually be enough memory.

I don't know if there is a better way of profiling but I have the timelines for both versions attached with this post. As far as I can see it, there is not much difference in reading the data. However, maybe you can see anything i didn't.

Thanks again for your help,

Dan



Am 07.09.2016 um 10:23 schrieb Till Rohrmann:
Usually, the while loop solution should perform much worse since it will execute with each new iteration all previous iterations steps without persisting the intermediate results. Thus, it should have a quadratic complexity in terms of iteration step operations instead of a linear complexity. Additionally the while loop will suffer from memory fragmentation because of the explicit DAG unrolling.

I agree with Theo that access to the full code would help a lot to pinpoint the problem.

Cheers,
Till

On Tue, Sep 6, 2016 at 6:50 PM, Theodore Vasiloudis <th...@gmail.com>> wrote:

Have you tried profiling the application to see where most of the time is spent during the runs?

If most of the time is spent reading in the data maybe any difference between the two methods is being obscured.

--
Sent from a mobile device. May contain autocorrect errors.

On Sep 6, 2016 4:55 PM, "Greg Hogan" <co...@greghogan.com>> wrote:
Hi Dan,

Flink currently allocates each task slot an equal portion of managed memory. I don't know the best way to count task slots.
  https://ci.apache.org/projects/flink/flink-docs-master/concepts/index.html#workers-slots-resources

If you assign TaskManagers less memory then Linux will use the memory to cache spill files.

Greg

On Fri, Sep 2, 2016 at 11:30 AM, Dan Drewes <dr...@campus.tu-berlin.de>> wrote:

Hi Greg,

thanks for your response!

I just had a look and realized that it's just about 85 GB of data. Sorry about that wrong information.

It's read from a csv file on the master node's local file system. The 8 nodes have more than 40 GB available memory each and since the data is equally distributed I assume there should be no need to spill anything on disk.

There are 9 iterations.

Is it possible that also with Flink Iterations the data is repeatedly distributed? Or the other way around: Might it be that flink "remembers" somehow that the data is already distributed even for the while loop?

-Dan


Am 02.09.2016 um 16:39 schrieb Greg Hogan:
Hi Dan,

Where are you reading the 200 GB "data" from? How much memory per node? If the DataSet is read from a distributed filesystem and if with iterations Flink must spill to disk then I wouldn't expect much difference. About how many iterations are run in the 30 minutes? I don't know that this is reported explicitly, but if your convergence function only has one input record per iteration then the reported total is the iteration count.

One other thought, we should soon have support for object reuse with arrays (FLINK-3695). This would be implemented as DoubleValueArray or ValueArray<DoubleValue> rather than double[] but it would be interesting to test for a change in performance.

Greg

On Fri, Sep 2, 2016 at 6:16 AM, Dan Drewes <dr...@campus.tu-berlin.de>> wrote:
Hi,

for my bachelor thesis I'm testing an implementation of L-BFGS algorithm with Flink Iterations against a version without Flink Iterations but a casual while loop instead. Both programs use the same Map and Reduce transformations in each iteration. It was expected, that the performance of the Flink Iterations would scale better with increasing size of the input data set. However, the measured results on an ibm-power-cluster are very similar for both versions, e.g. around 30 minutes for 200 GB data. The cluster has 8 nodes, was configured with 4 slots per node and I used a total parallelism of 32.
In every Iteration of the while loop a new flink job is started and I thought, that also the data would be distributed over the network again in each iteration which should consume a significant and measurable amount of time. Is that thought wrong or what is the computional overhead of the flink iterations that is equalizing this disadvantage?
I include the relevant part of both programs and also attach the generated execution plans.
Thank you for any ideas as I could not find much about this issue in the flink docs.

Best, Dan

Flink Iterations:


DataSet<double[]> data = ...


State state = initialState(m, initweights,0,new double[initweights.length]);
DataSet<State> statedataset = env.fromElements(state);

//start of iteration section

IterativeDataSet<State> loop= statedataset.iterate(niter);;


DataSet<State> statewithnewlossgradient = data.map(difffunction).withBroadcastSet(loop, "state")
              .reduce(accumulate)
              .map(new NormLossGradient(datasize))
              .map(new SetLossGradient()).withBroadcastSet(loop,"state")
              .map(new LBFGS());


DataSet<State> converged = statewithnewlossgradient.filter(
   new FilterFunction<State>() {
      @Override
      public boolean filter(State value) throws Exception {
         if(value.getIflag()[0] == 0){
            return false;
         }
         return true;
      }
   }
);

DataSet<State> finalstate = loop.closeWith(statewithnewlossgradient,converged);

While loop:

DataSet<double[]> data =...
State state = initialState(m, initweights,0,new double[initweights.length]);

int cnt=0;
do{
   LBFGS lbfgs = new LBFGS();
   statedataset=data.map(difffunction).withBroadcastSet(statedataset, "state")
      .reduce(accumulate)
      .map(new NormLossGradient(datasize))
      .map(new SetLossGradient()).withBroadcastSet(statedataset,"state")
      .map(lbfgs);
   cnt++;
}while (cnt<niter && statedataset.collect().get(0).getIflag()[0] != 0);

[https://ipmcdn.avast.com/images/icons/icon-envelope-tick-round-orange-animated-tick-v1.gif]<https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=emailclient>   Virenfrei. www.avast.com<https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=emailclient>

Re: Flink Iterations vs. While loop

Posted by Till Rohrmann <tr...@apache.org>.
Hi Dan,

first a general remark: I fear that your L-BFGS implementation is not well
suited for large scale problems. You might wanna take a look at [1].

In the case of the while loop solution you're actually executing n jobs
with n being the number of iterations. Thus, you have to add the execution
times for all jobs together. Did you do that?

[1] https://papers.nips.cc/paper/5333-large-scale-l-bfgs-using-mapreduce.pdf

On Wed, Sep 7, 2016 at 3:43 PM, Dan Drewes <dr...@campus.tu-berlin.de>
wrote:

> Thank you for your replys so far!
>
> I've uploaded the files to github:
>
> iterations: https://github.com/dan-drewes/thesis/blob/master/
> IterationsLBFGS.java
>
> while loop: https://github.com/dan-drewes/thesis/blob/master/flinkLBFGS.
> java
>
> The additional classes I wrote are there, too.
>
> I had 32 task slots in total, that's 4 on each node, so their should
> actually be enough memory.
>
> I don't know if there is a better way of profiling but I have the
> timelines for both versions attached with this post. As far as I can see
> it, there is not much difference in reading the data. However, maybe you
> can see anything i didn't.
>
> Thanks again for your help,
> Dan
>
>
>
> Am 07.09.2016 um 10:23 schrieb Till Rohrmann:
>
> Usually, the while loop solution should perform much worse since it will
> execute with each new iteration all previous iterations steps without
> persisting the intermediate results. Thus, it should have a quadratic
> complexity in terms of iteration step operations instead of a linear
> complexity. Additionally the while loop will suffer from memory
> fragmentation because of the explicit DAG unrolling.
>
> I agree with Theo that access to the full code would help a lot to
> pinpoint the problem.
>
> Cheers,
> Till
>
> On Tue, Sep 6, 2016 at 6:50 PM, Theodore Vasiloudis <
> theodoros.vasiloudis@gmail.com> wrote:
>
>> Have you tried profiling the application to see where most of the time is
>> spent during the runs?
>>
>> If most of the time is spent reading in the data maybe any difference
>> between the two methods is being obscured.
>>
>> --
>> Sent from a mobile device. May contain autocorrect errors.
>>
>> On Sep 6, 2016 4:55 PM, "Greg Hogan" <co...@greghogan.com> wrote:
>>
>>> Hi Dan,
>>>
>>> Flink currently allocates each task slot an equal portion of managed
>>> memory. I don't know the best way to count task slots.
>>>   https://ci.apache.org/projects/flink/flink-docs-master/conce
>>> pts/index.html#workers-slots-resources
>>>
>>> If you assign TaskManagers less memory then Linux will use the memory to
>>> cache spill files.
>>>
>>> Greg
>>>
>>> On Fri, Sep 2, 2016 at 11:30 AM, Dan Drewes <dr...@campus.tu-berlin.de>
>>> wrote:
>>>
>>>> Hi Greg,
>>>>
>>>> thanks for your response!
>>>>
>>>> I just had a look and realized that it's just about 85 GB of data.
>>>> Sorry about that wrong information.
>>>>
>>>> It's read from a csv file on the master node's local file system. The 8
>>>> nodes have more than 40 GB available memory each and since the data is
>>>> equally distributed I assume there should be no need to spill anything on
>>>> disk.
>>>>
>>>> There are 9 iterations.
>>>>
>>>> Is it possible that also with Flink Iterations the data is repeatedly
>>>> distributed? Or the other way around: Might it be that flink "remembers"
>>>> somehow that the data is already distributed even for the while loop?
>>>>
>>>> -Dan
>>>>
>>>>
>>>>
>>>> Am 02.09.2016 um 16:39 schrieb Greg Hogan:
>>>>
>>>> Hi Dan,
>>>>
>>>> Where are you reading the 200 GB "data" from? How much memory per node?
>>>> If the DataSet is read from a distributed filesystem and if with iterations
>>>> Flink must spill to disk then I wouldn't expect much difference. About how
>>>> many iterations are run in the 30 minutes? I don't know that this is
>>>> reported explicitly, but if your convergence function only has one input
>>>> record per iteration then the reported total is the iteration count.
>>>>
>>>> One other thought, we should soon have support for object reuse with
>>>> arrays (FLINK-3695). This would be implemented as DoubleValueArray or
>>>> ValueArray<DoubleValue> rather than double[] but it would be interesting to
>>>> test for a change in performance.
>>>>
>>>> Greg
>>>>
>>>> On Fri, Sep 2, 2016 at 6:16 AM, Dan Drewes <dr...@campus.tu-berlin.de>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> for my bachelor thesis I'm testing an implementation of L-BFGS
>>>>> algorithm with Flink Iterations against a version without Flink Iterations
>>>>> but a casual while loop instead. Both programs use the same Map and Reduce
>>>>> transformations in each iteration. It was expected, that the performance of
>>>>> the Flink Iterations would scale better with increasing size of the input
>>>>> data set. However, the measured results on an ibm-power-cluster are very
>>>>> similar for both versions, e.g. around 30 minutes for 200 GB data. The
>>>>> cluster has 8 nodes, was configured with 4 slots per node and I used a
>>>>> total parallelism of 32.
>>>>> In every Iteration of the while loop a new flink job is started and I
>>>>> thought, that also the data would be distributed over the network again in
>>>>> each iteration which should consume a significant and measurable amount of
>>>>> time. Is that thought wrong or what is the computional overhead of the
>>>>> flink iterations that is equalizing this disadvantage?
>>>>> I include the relevant part of both programs and also attach the
>>>>> generated execution plans.
>>>>> Thank you for any ideas as I could not find much about this issue in
>>>>> the flink docs.
>>>>>
>>>>> Best, Dan
>>>>>
>>>>> *Flink Iterations:*
>>>>>
>>>>> DataSet<double[]> data = ...
>>>>>
>>>>> State state = initialState(m, initweights,0,new double[initweights.length]);
>>>>> DataSet<State> statedataset = env.fromElements(state);
>>>>> //start of iteration sectionIterativeDataSet<State> loop= statedataset.iterate(niter);;
>>>>>
>>>>>
>>>>> DataSet<State> statewithnewlossgradient = data.map(difffunction).withBroadcastSet(loop, "state")
>>>>>               .reduce(accumulate)
>>>>>               .map(new NormLossGradient(datasize))
>>>>>               .map(new SetLossGradient()).withBroadcastSet(loop,"state")
>>>>>               .map(new LBFGS());
>>>>>
>>>>>
>>>>> DataSet<State> converged = statewithnewlossgradient.filter(
>>>>>    new FilterFunction<State>() {
>>>>>       @Override      public boolean filter(State value) throws Exception {
>>>>>          if(value.getIflag()[0] == 0){
>>>>>             return false;
>>>>>          }
>>>>>          return true;
>>>>>       }
>>>>>    }
>>>>> );
>>>>>
>>>>> DataSet<State> finalstate = loop.closeWith(statewithnewlossgradient,converged);
>>>>>
>>>>> *While loop: *
>>>>>
>>>>> DataSet<double[]> data =...
>>>>> State state = initialState(m, initweights,0,new double[initweights.length]);
>>>>> int cnt=0;do{
>>>>>    LBFGS lbfgs = new LBFGS();
>>>>>    statedataset=data.map(difffunction).withBroadcastSet(statedataset, "state")
>>>>>       .reduce(accumulate)
>>>>>       .map(new NormLossGradient(datasize))
>>>>>       .map(new SetLossGradient()).withBroadcastSet(statedataset,"state")
>>>>>       .map(lbfgs);
>>>>>    cnt++;
>>>>> }while (cnt<niter && statedataset.collect().get(0).getIflag()[0] != 0);
>>>>>
>>>>>
>
> <https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=emailclient> Virenfrei.
> www.avast.com
> <https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=emailclient>
>

Re: Flink Iterations vs. While loop

Posted by Dan Drewes <dr...@campus.tu-berlin.de>.
Thank you for your replys so far!

I've uploaded the files to github:

iterations: 
https://github.com/dan-drewes/thesis/blob/master/IterationsLBFGS.java

while loop: https://github.com/dan-drewes/thesis/blob/master/flinkLBFGS.java

The additional classes I wrote are there, too.

I had 32 task slots in total, that's 4 on each node, so their should 
actually be enough memory.

I don't know if there is a better way of profiling but I have the 
timelines for both versions attached with this post. As far as I can see 
it, there is not much difference in reading the data. However, maybe you 
can see anything i didn't.

Thanks again for your help,

Dan


Am 07.09.2016 um 10:23 schrieb Till Rohrmann:
> Usually, the while loop solution should perform much worse since it 
> will execute with each new iteration all previous iterations steps 
> without persisting the intermediate results. Thus, it should have a 
> quadratic complexity in terms of iteration step operations instead of 
> a linear complexity. Additionally the while loop will suffer from 
> memory fragmentation because of the explicit DAG unrolling.
>
> I agree with Theo that access to the full code would help a lot to 
> pinpoint the problem.
>
> Cheers,
> Till
>
> On Tue, Sep 6, 2016 at 6:50 PM, Theodore Vasiloudis 
> <theodoros.vasiloudis@gmail.com 
> <ma...@gmail.com>> wrote:
>
>     Have you tried profiling the application to see where most of the
>     time is spent during the runs?
>
>     If most of the time is spent reading in the data maybe any
>     difference between the two methods is being obscured.
>
>     -- 
>     Sent from a mobile device. May contain autocorrect errors.
>
>
>     On Sep 6, 2016 4:55 PM, "Greg Hogan" <code@greghogan.com
>     <ma...@greghogan.com>> wrote:
>
>         Hi Dan,
>
>         Flink currently allocates each task slot an equal portion of
>         managed memory. I don't know the best way to count task slots.
>         https://ci.apache.org/projects/flink/flink-docs-master/concepts/index.html#workers-slots-resources
>         <https://ci.apache.org/projects/flink/flink-docs-master/concepts/index.html#workers-slots-resources>
>
>         If you assign TaskManagers less memory then Linux will use the
>         memory to cache spill files.
>
>         Greg
>
>         On Fri, Sep 2, 2016 at 11:30 AM, Dan Drewes
>         <drewes@campus.tu-berlin.de
>         <ma...@campus.tu-berlin.de>> wrote:
>
>             Hi Greg,
>
>             thanks for your response!
>
>             I just had a look and realized that it's just about 85 GB
>             of data. Sorry about that wrong information.
>
>             It's read from a csv file on the master node's local file
>             system. The 8 nodes have more than 40 GB available memory
>             each and since the data is equally distributed I assume
>             there should be no need to spill anything on disk.
>
>             There are 9 iterations.
>
>             Is it possible that also with Flink Iterations the data is
>             repeatedly distributed? Or the other way around: Might it
>             be that flink "remembers" somehow that the data is already
>             distributed even for the while loop?
>
>             -Dan
>
>
>
>             Am 02.09.2016 um 16:39 schrieb Greg Hogan:
>>             Hi Dan,
>>
>>             Where are you reading the 200 GB "data" from? How much
>>             memory per node? If the DataSet is read from a
>>             distributed filesystem and if with iterations Flink must
>>             spill to disk then I wouldn't expect much difference.
>>             About how many iterations are run in the 30 minutes? I
>>             don't know that this is reported explicitly, but if your
>>             convergence function only has one input record per
>>             iteration then the reported total is the iteration count.
>>
>>             One other thought, we should soon have support for object
>>             reuse with arrays (FLINK-3695). This would be implemented
>>             as DoubleValueArray or ValueArray<DoubleValue> rather
>>             than double[] but it would be interesting to test for a
>>             change in performance.
>>
>>             Greg
>>
>>             On Fri, Sep 2, 2016 at 6:16 AM, Dan Drewes
>>             <drewes@campus.tu-berlin.de
>>             <ma...@campus.tu-berlin.de>> wrote:
>>
>>                 Hi,
>>
>>                 for my bachelor thesis I'm testing an implementation
>>                 of L-BFGS algorithm with Flink Iterations against a
>>                 version without Flink Iterations but a casual while
>>                 loop instead. Both programs use the same Map and
>>                 Reduce transformations in each iteration. It was
>>                 expected, that the performance of the Flink
>>                 Iterations would scale better with increasing size of
>>                 the input data set. However, the measured results on
>>                 an ibm-power-cluster are very similar for both
>>                 versions, e.g. around 30 minutes for 200 GB data. The
>>                 cluster has 8 nodes, was configured with 4 slots per
>>                 node and I used a total parallelism of 32.
>>                 In every Iteration of the while loop a new flink job
>>                 is started and I thought, that also the data would be
>>                 distributed over the network again in each iteration
>>                 which should consume a significant and measurable
>>                 amount of time. Is that thought wrong or what is the
>>                 computional overhead of the flink iterations that is
>>                 equalizing this disadvantage?
>>                 I include the relevant part of both programs and also
>>                 attach the generated execution plans.
>>                 Thank you for any ideas as I could not find much
>>                 about this issue in the flink docs.
>>
>>                 Best, Dan
>>
>>                 *Flink Iterations:*
>>
>>                 DataSet<double[]> data = ...
>>
>>                 State  state =initialState(m, initweights,0,new double[initweights.length]);
>>                 DataSet<State> statedataset = env.fromElements(state);
>>
>>                 //start of iteration section IterativeDataSet<State> loop= statedataset.iterate(niter);;
>>
>>
>>                 DataSet<State> statewithnewlossgradient = data.map(difffunction).withBroadcastSet(loop,"state")
>>                                .reduce(accumulate)
>>                                .map(new NormLossGradient(datasize))
>>                                .map(new SetLossGradient()).withBroadcastSet(loop,"state")
>>                                .map(new LBFGS());
>>
>>
>>                 DataSet<State> converged = statewithnewlossgradient.filter(
>>                     new FilterFunction<State>() {
>>                        @Override public boolean filter(State  value)throws Exception {
>>                           if(value.getIflag()[0] ==0){
>>                              return false;
>>                           }
>>                           return true;
>>                        }
>>                     }
>>                 );
>>
>>                 DataSet<State> finalstate = loop.closeWith(statewithnewlossgradient,converged);
>>
>>                 ***While loop: *
>>
>>                 DataSet<double[]> data =... State  state =initialState(m, initweights,0,new double[initweights.length]);
>>
>>                 int cnt=0;
>>                 do{
>>                     LBFGS lbfgs =new LBFGS();
>>                     statedataset=data.map(difffunction).withBroadcastSet(statedataset,"state")
>>                        .reduce(accumulate)
>>                        .map(new NormLossGradient(datasize))
>>                        .map(new SetLossGradient()).withBroadcastSet(statedataset,"state")
>>                        .map(lbfgs);
>>                     cnt++;
>>                 }while (cnt<niter && statedataset.collect().get(0).getIflag()[0] !=0);
>>


---
Diese E-Mail wurde von Avast Antivirus-Software auf Viren geprüft.
https://www.avast.com/antivirus

Re: Flink Iterations vs. While loop

Posted by Till Rohrmann <tr...@apache.org>.
Usually, the while loop solution should perform much worse since it will
execute with each new iteration all previous iterations steps without
persisting the intermediate results. Thus, it should have a quadratic
complexity in terms of iteration step operations instead of a linear
complexity. Additionally the while loop will suffer from memory
fragmentation because of the explicit DAG unrolling.

I agree with Theo that access to the full code would help a lot to pinpoint
the problem.

Cheers,
Till

On Tue, Sep 6, 2016 at 6:50 PM, Theodore Vasiloudis <
theodoros.vasiloudis@gmail.com> wrote:

> Have you tried profiling the application to see where most of the time is
> spent during the runs?
>
> If most of the time is spent reading in the data maybe any difference
> between the two methods is being obscured.
>
> --
> Sent from a mobile device. May contain autocorrect errors.
>
> On Sep 6, 2016 4:55 PM, "Greg Hogan" <co...@greghogan.com> wrote:
>
>> Hi Dan,
>>
>> Flink currently allocates each task slot an equal portion of managed
>> memory. I don't know the best way to count task slots.
>>   https://ci.apache.org/projects/flink/flink-docs-master/
>> concepts/index.html#workers-slots-resources
>>
>> If you assign TaskManagers less memory then Linux will use the memory to
>> cache spill files.
>>
>> Greg
>>
>> On Fri, Sep 2, 2016 at 11:30 AM, Dan Drewes <dr...@campus.tu-berlin.de>
>> wrote:
>>
>>> Hi Greg,
>>>
>>> thanks for your response!
>>>
>>> I just had a look and realized that it's just about 85 GB of data. Sorry
>>> about that wrong information.
>>>
>>> It's read from a csv file on the master node's local file system. The 8
>>> nodes have more than 40 GB available memory each and since the data is
>>> equally distributed I assume there should be no need to spill anything on
>>> disk.
>>>
>>> There are 9 iterations.
>>>
>>> Is it possible that also with Flink Iterations the data is repeatedly
>>> distributed? Or the other way around: Might it be that flink "remembers"
>>> somehow that the data is already distributed even for the while loop?
>>>
>>> -Dan
>>>
>>>
>>>
>>> Am 02.09.2016 um 16:39 schrieb Greg Hogan:
>>>
>>> Hi Dan,
>>>
>>> Where are you reading the 200 GB "data" from? How much memory per node?
>>> If the DataSet is read from a distributed filesystem and if with iterations
>>> Flink must spill to disk then I wouldn't expect much difference. About how
>>> many iterations are run in the 30 minutes? I don't know that this is
>>> reported explicitly, but if your convergence function only has one input
>>> record per iteration then the reported total is the iteration count.
>>>
>>> One other thought, we should soon have support for object reuse with
>>> arrays (FLINK-3695). This would be implemented as DoubleValueArray or
>>> ValueArray<DoubleValue> rather than double[] but it would be interesting to
>>> test for a change in performance.
>>>
>>> Greg
>>>
>>> On Fri, Sep 2, 2016 at 6:16 AM, Dan Drewes <dr...@campus.tu-berlin.de>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> for my bachelor thesis I'm testing an implementation of L-BFGS
>>>> algorithm with Flink Iterations against a version without Flink Iterations
>>>> but a casual while loop instead. Both programs use the same Map and Reduce
>>>> transformations in each iteration. It was expected, that the performance of
>>>> the Flink Iterations would scale better with increasing size of the input
>>>> data set. However, the measured results on an ibm-power-cluster are very
>>>> similar for both versions, e.g. around 30 minutes for 200 GB data. The
>>>> cluster has 8 nodes, was configured with 4 slots per node and I used a
>>>> total parallelism of 32.
>>>> In every Iteration of the while loop a new flink job is started and I
>>>> thought, that also the data would be distributed over the network again in
>>>> each iteration which should consume a significant and measurable amount of
>>>> time. Is that thought wrong or what is the computional overhead of the
>>>> flink iterations that is equalizing this disadvantage?
>>>> I include the relevant part of both programs and also attach the
>>>> generated execution plans.
>>>> Thank you for any ideas as I could not find much about this issue in
>>>> the flink docs.
>>>>
>>>> Best, Dan
>>>>
>>>> *Flink Iterations:*
>>>>
>>>> DataSet<double[]> data = ...
>>>>
>>>> State state = initialState(m, initweights,0,new double[initweights.length]);
>>>> DataSet<State> statedataset = env.fromElements(state);
>>>> //start of iteration sectionIterativeDataSet<State> loop= statedataset.iterate(niter);;
>>>>
>>>>
>>>> DataSet<State> statewithnewlossgradient = data.map(difffunction).withBroadcastSet(loop, "state")
>>>>               .reduce(accumulate)
>>>>               .map(new NormLossGradient(datasize))
>>>>               .map(new SetLossGradient()).withBroadcastSet(loop,"state")
>>>>               .map(new LBFGS());
>>>>
>>>>
>>>> DataSet<State> converged = statewithnewlossgradient.filter(
>>>>    new FilterFunction<State>() {
>>>>       @Override      public boolean filter(State value) throws Exception {
>>>>          if(value.getIflag()[0] == 0){
>>>>             return false;
>>>>          }
>>>>          return true;
>>>>       }
>>>>    }
>>>> );
>>>>
>>>> DataSet<State> finalstate = loop.closeWith(statewithnewlossgradient,converged);
>>>>
>>>> *While loop: *
>>>>
>>>> DataSet<double[]> data =...
>>>> State state = initialState(m, initweights,0,new double[initweights.length]);
>>>> int cnt=0;do{
>>>>    LBFGS lbfgs = new LBFGS();
>>>>    statedataset=data.map(difffunction).withBroadcastSet(statedataset, "state")
>>>>       .reduce(accumulate)
>>>>       .map(new NormLossGradient(datasize))
>>>>       .map(new SetLossGradient()).withBroadcastSet(statedataset,"state")
>>>>       .map(lbfgs);
>>>>    cnt++;
>>>> }while (cnt<niter && statedataset.collect().get(0).getIflag()[0] != 0);
>>>>
>>>>
>>

Re: Flink Iterations vs. While loop

Posted by Theodore Vasiloudis <th...@gmail.com>.
Have you tried profiling the application to see where most of the time is
spent during the runs?

If most of the time is spent reading in the data maybe any difference
between the two methods is being obscured.

-- 
Sent from a mobile device. May contain autocorrect errors.

On Sep 6, 2016 4:55 PM, "Greg Hogan" <co...@greghogan.com> wrote:

> Hi Dan,
>
> Flink currently allocates each task slot an equal portion of managed
> memory. I don't know the best way to count task slots.
>   https://ci.apache.org/projects/flink/flink-docs-
> master/concepts/index.html#workers-slots-resources
>
> If you assign TaskManagers less memory then Linux will use the memory to
> cache spill files.
>
> Greg
>
> On Fri, Sep 2, 2016 at 11:30 AM, Dan Drewes <dr...@campus.tu-berlin.de>
> wrote:
>
>> Hi Greg,
>>
>> thanks for your response!
>>
>> I just had a look and realized that it's just about 85 GB of data. Sorry
>> about that wrong information.
>>
>> It's read from a csv file on the master node's local file system. The 8
>> nodes have more than 40 GB available memory each and since the data is
>> equally distributed I assume there should be no need to spill anything on
>> disk.
>>
>> There are 9 iterations.
>>
>> Is it possible that also with Flink Iterations the data is repeatedly
>> distributed? Or the other way around: Might it be that flink "remembers"
>> somehow that the data is already distributed even for the while loop?
>>
>> -Dan
>>
>>
>>
>> Am 02.09.2016 um 16:39 schrieb Greg Hogan:
>>
>> Hi Dan,
>>
>> Where are you reading the 200 GB "data" from? How much memory per node?
>> If the DataSet is read from a distributed filesystem and if with iterations
>> Flink must spill to disk then I wouldn't expect much difference. About how
>> many iterations are run in the 30 minutes? I don't know that this is
>> reported explicitly, but if your convergence function only has one input
>> record per iteration then the reported total is the iteration count.
>>
>> One other thought, we should soon have support for object reuse with
>> arrays (FLINK-3695). This would be implemented as DoubleValueArray or
>> ValueArray<DoubleValue> rather than double[] but it would be interesting to
>> test for a change in performance.
>>
>> Greg
>>
>> On Fri, Sep 2, 2016 at 6:16 AM, Dan Drewes <dr...@campus.tu-berlin.de>
>> wrote:
>>
>>> Hi,
>>>
>>> for my bachelor thesis I'm testing an implementation of L-BFGS algorithm
>>> with Flink Iterations against a version without Flink Iterations but a
>>> casual while loop instead. Both programs use the same Map and Reduce
>>> transformations in each iteration. It was expected, that the performance of
>>> the Flink Iterations would scale better with increasing size of the input
>>> data set. However, the measured results on an ibm-power-cluster are very
>>> similar for both versions, e.g. around 30 minutes for 200 GB data. The
>>> cluster has 8 nodes, was configured with 4 slots per node and I used a
>>> total parallelism of 32.
>>> In every Iteration of the while loop a new flink job is started and I
>>> thought, that also the data would be distributed over the network again in
>>> each iteration which should consume a significant and measurable amount of
>>> time. Is that thought wrong or what is the computional overhead of the
>>> flink iterations that is equalizing this disadvantage?
>>> I include the relevant part of both programs and also attach the
>>> generated execution plans.
>>> Thank you for any ideas as I could not find much about this issue in the
>>> flink docs.
>>>
>>> Best, Dan
>>>
>>> *Flink Iterations:*
>>>
>>> DataSet<double[]> data = ...
>>>
>>> State state = initialState(m, initweights,0,new double[initweights.length]);
>>> DataSet<State> statedataset = env.fromElements(state);
>>> //start of iteration sectionIterativeDataSet<State> loop= statedataset.iterate(niter);;
>>>
>>>
>>> DataSet<State> statewithnewlossgradient = data.map(difffunction).withBroadcastSet(loop, "state")
>>>               .reduce(accumulate)
>>>               .map(new NormLossGradient(datasize))
>>>               .map(new SetLossGradient()).withBroadcastSet(loop,"state")
>>>               .map(new LBFGS());
>>>
>>>
>>> DataSet<State> converged = statewithnewlossgradient.filter(
>>>    new FilterFunction<State>() {
>>>       @Override      public boolean filter(State value) throws Exception {
>>>          if(value.getIflag()[0] == 0){
>>>             return false;
>>>          }
>>>          return true;
>>>       }
>>>    }
>>> );
>>>
>>> DataSet<State> finalstate = loop.closeWith(statewithnewlossgradient,converged);
>>>
>>> *While loop: *
>>>
>>> DataSet<double[]> data =...
>>> State state = initialState(m, initweights,0,new double[initweights.length]);
>>> int cnt=0;do{
>>>    LBFGS lbfgs = new LBFGS();
>>>    statedataset=data.map(difffunction).withBroadcastSet(statedataset, "state")
>>>       .reduce(accumulate)
>>>       .map(new NormLossGradient(datasize))
>>>       .map(new SetLossGradient()).withBroadcastSet(statedataset,"state")
>>>       .map(lbfgs);
>>>    cnt++;
>>> }while (cnt<niter && statedataset.collect().get(0).getIflag()[0] != 0);
>>>
>>>
>

Re: Flink Iterations vs. While loop

Posted by Greg Hogan <co...@greghogan.com>.
Hi Dan,

Flink currently allocates each task slot an equal portion of managed
memory. I don't know the best way to count task slots.

https://ci.apache.org/projects/flink/flink-docs-master/concepts/index.html#workers-slots-resources

If you assign TaskManagers less memory then Linux will use the memory to
cache spill files.

Greg

On Fri, Sep 2, 2016 at 11:30 AM, Dan Drewes <dr...@campus.tu-berlin.de>
wrote:

> Hi Greg,
>
> thanks for your response!
>
> I just had a look and realized that it's just about 85 GB of data. Sorry
> about that wrong information.
>
> It's read from a csv file on the master node's local file system. The 8
> nodes have more than 40 GB available memory each and since the data is
> equally distributed I assume there should be no need to spill anything on
> disk.
>
> There are 9 iterations.
>
> Is it possible that also with Flink Iterations the data is repeatedly
> distributed? Or the other way around: Might it be that flink "remembers"
> somehow that the data is already distributed even for the while loop?
>
> -Dan
>
>
>
> Am 02.09.2016 um 16:39 schrieb Greg Hogan:
>
> Hi Dan,
>
> Where are you reading the 200 GB "data" from? How much memory per node? If
> the DataSet is read from a distributed filesystem and if with iterations
> Flink must spill to disk then I wouldn't expect much difference. About how
> many iterations are run in the 30 minutes? I don't know that this is
> reported explicitly, but if your convergence function only has one input
> record per iteration then the reported total is the iteration count.
>
> One other thought, we should soon have support for object reuse with
> arrays (FLINK-3695). This would be implemented as DoubleValueArray or
> ValueArray<DoubleValue> rather than double[] but it would be interesting to
> test for a change in performance.
>
> Greg
>
> On Fri, Sep 2, 2016 at 6:16 AM, Dan Drewes <dr...@campus.tu-berlin.de>
> wrote:
>
>> Hi,
>>
>> for my bachelor thesis I'm testing an implementation of L-BFGS algorithm
>> with Flink Iterations against a version without Flink Iterations but a
>> casual while loop instead. Both programs use the same Map and Reduce
>> transformations in each iteration. It was expected, that the performance of
>> the Flink Iterations would scale better with increasing size of the input
>> data set. However, the measured results on an ibm-power-cluster are very
>> similar for both versions, e.g. around 30 minutes for 200 GB data. The
>> cluster has 8 nodes, was configured with 4 slots per node and I used a
>> total parallelism of 32.
>> In every Iteration of the while loop a new flink job is started and I
>> thought, that also the data would be distributed over the network again in
>> each iteration which should consume a significant and measurable amount of
>> time. Is that thought wrong or what is the computional overhead of the
>> flink iterations that is equalizing this disadvantage?
>> I include the relevant part of both programs and also attach the
>> generated execution plans.
>> Thank you for any ideas as I could not find much about this issue in the
>> flink docs.
>>
>> Best, Dan
>>
>> *Flink Iterations:*
>>
>> DataSet<double[]> data = ...
>>
>> State state = initialState(m, initweights,0,new double[initweights.length]);
>> DataSet<State> statedataset = env.fromElements(state);
>> //start of iteration sectionIterativeDataSet<State> loop= statedataset.iterate(niter);;
>>
>>
>> DataSet<State> statewithnewlossgradient = data.map(difffunction).withBroadcastSet(loop, "state")
>>               .reduce(accumulate)
>>               .map(new NormLossGradient(datasize))
>>               .map(new SetLossGradient()).withBroadcastSet(loop,"state")
>>               .map(new LBFGS());
>>
>>
>> DataSet<State> converged = statewithnewlossgradient.filter(
>>    new FilterFunction<State>() {
>>       @Override      public boolean filter(State value) throws Exception {
>>          if(value.getIflag()[0] == 0){
>>             return false;
>>          }
>>          return true;
>>       }
>>    }
>> );
>>
>> DataSet<State> finalstate = loop.closeWith(statewithnewlossgradient,converged);
>>
>> *While loop: *
>>
>> DataSet<double[]> data =...
>> State state = initialState(m, initweights,0,new double[initweights.length]);
>> int cnt=0;do{
>>    LBFGS lbfgs = new LBFGS();
>>    statedataset=data.map(difffunction).withBroadcastSet(statedataset, "state")
>>       .reduce(accumulate)
>>       .map(new NormLossGradient(datasize))
>>       .map(new SetLossGradient()).withBroadcastSet(statedataset,"state")
>>       .map(lbfgs);
>>    cnt++;
>> }while (cnt<niter && statedataset.collect().get(0).getIflag()[0] != 0);
>>
>>

Re: Flink Iterations vs. While loop

Posted by Dan Drewes <dr...@campus.tu-berlin.de>.
Hi Greg,

thanks for your response!

I just had a look and realized that it's just about 85 GB of data. Sorry 
about that wrong information.

It's read from a csv file on the master node's local file system. The 8 
nodes have more than 40 GB available memory each and since the data is 
equally distributed I assume there should be no need to spill anything 
on disk.

There are 9 iterations.

Is it possible that also with Flink Iterations the data is repeatedly 
distributed? Or the other way around: Might it be that flink "remembers" 
somehow that the data is already distributed even for the while loop?

-Dan



Am 02.09.2016 um 16:39 schrieb Greg Hogan:
> Hi Dan,
>
> Where are you reading the 200 GB "data" from? How much memory per 
> node? If the DataSet is read from a distributed filesystem and if with 
> iterations Flink must spill to disk then I wouldn't expect much 
> difference. About how many iterations are run in the 30 minutes? I 
> don't know that this is reported explicitly, but if your convergence 
> function only has one input record per iteration then the reported 
> total is the iteration count.
>
> One other thought, we should soon have support for object reuse with 
> arrays (FLINK-3695). This would be implemented as DoubleValueArray or 
> ValueArray<DoubleValue> rather than double[] but it would be 
> interesting to test for a change in performance.
>
> Greg
>
> On Fri, Sep 2, 2016 at 6:16 AM, Dan Drewes <drewes@campus.tu-berlin.de 
> <ma...@campus.tu-berlin.de>> wrote:
>
>     Hi,
>
>     for my bachelor thesis I'm testing an implementation of L-BFGS
>     algorithm with Flink Iterations against a version without Flink
>     Iterations but a casual while loop instead. Both programs use the
>     same Map and Reduce transformations in each iteration. It was
>     expected, that the performance of the Flink Iterations would scale
>     better with increasing size of the input data set. However, the
>     measured results on an ibm-power-cluster are very similar for both
>     versions, e.g. around 30 minutes for 200 GB data. The cluster has
>     8 nodes, was configured with 4 slots per node and I used a total
>     parallelism of 32.
>     In every Iteration of the while loop a new flink job is started
>     and I thought, that also the data would be distributed over the
>     network again in each iteration which should consume a significant
>     and measurable amount of time. Is that thought wrong or what is
>     the computional overhead of the flink iterations that is
>     equalizing this disadvantage?
>     I include the relevant part of both programs and also attach the
>     generated execution plans.
>     Thank you for any ideas as I could not find much about this issue
>     in the flink docs.
>
>     Best, Dan
>
>     *Flink Iterations:*
>
>     DataSet<double[]> data = ...
>
>     State  state =initialState(m, initweights,0,new double[initweights.length]);
>     DataSet<State> statedataset = env.fromElements(state);
>
>     //start of iteration section IterativeDataSet<State> loop= statedataset.iterate(niter);;
>
>
>     DataSet<State> statewithnewlossgradient = data.map(difffunction).withBroadcastSet(loop,"state")
>                    .reduce(accumulate)
>                    .map(new NormLossGradient(datasize))
>                    .map(new SetLossGradient()).withBroadcastSet(loop,"state")
>                    .map(new LBFGS());
>
>
>     DataSet<State> converged = statewithnewlossgradient.filter(
>         new FilterFunction<State>() {
>            @Override public boolean filter(State  value)throws Exception {
>               if(value.getIflag()[0] ==0){
>                  return false;
>               }
>               return true;
>            }
>         }
>     );
>
>     DataSet<State> finalstate = loop.closeWith(statewithnewlossgradient,converged);
>
>     ***While loop: *
>
>     DataSet<double[]> data =... State  state =initialState(m, initweights,0,new double[initweights.length]);
>
>     int cnt=0;
>     do{
>         LBFGS lbfgs =new LBFGS();
>         statedataset=data.map(difffunction).withBroadcastSet(statedataset,"state")
>            .reduce(accumulate)
>            .map(new NormLossGradient(datasize))
>            .map(new SetLossGradient()).withBroadcastSet(statedataset,"state")
>            .map(lbfgs);
>         cnt++;
>     }while (cnt<niter && statedataset.collect().get(0).getIflag()[0] !=0);
>

Re: Flink Iterations vs. While loop

Posted by Greg Hogan <co...@greghogan.com>.
Hi Dan,

Where are you reading the 200 GB "data" from? How much memory per node? If
the DataSet is read from a distributed filesystem and if with iterations
Flink must spill to disk then I wouldn't expect much difference. About how
many iterations are run in the 30 minutes? I don't know that this is
reported explicitly, but if your convergence function only has one input
record per iteration then the reported total is the iteration count.

One other thought, we should soon have support for object reuse with arrays
(FLINK-3695). This would be implemented as DoubleValueArray or
ValueArray<DoubleValue> rather than double[] but it would be interesting to
test for a change in performance.

Greg

On Fri, Sep 2, 2016 at 6:16 AM, Dan Drewes <dr...@campus.tu-berlin.de>
wrote:

> Hi,
>
> for my bachelor thesis I'm testing an implementation of L-BFGS algorithm
> with Flink Iterations against a version without Flink Iterations but a
> casual while loop instead. Both programs use the same Map and Reduce
> transformations in each iteration. It was expected, that the performance of
> the Flink Iterations would scale better with increasing size of the input
> data set. However, the measured results on an ibm-power-cluster are very
> similar for both versions, e.g. around 30 minutes for 200 GB data. The
> cluster has 8 nodes, was configured with 4 slots per node and I used a
> total parallelism of 32.
> In every Iteration of the while loop a new flink job is started and I
> thought, that also the data would be distributed over the network again in
> each iteration which should consume a significant and measurable amount of
> time. Is that thought wrong or what is the computional overhead of the
> flink iterations that is equalizing this disadvantage?
> I include the relevant part of both programs and also attach the generated
> execution plans.
> Thank you for any ideas as I could not find much about this issue in the
> flink docs.
>
> Best, Dan
>
> *Flink Iterations:*
>
> DataSet<double[]> data = ...
>
> State state = initialState(m, initweights,0,new double[initweights.length]);
> DataSet<State> statedataset = env.fromElements(state);
> //start of iteration sectionIterativeDataSet<State> loop= statedataset.iterate(niter);;
>
>
> DataSet<State> statewithnewlossgradient = data.map(difffunction).withBroadcastSet(loop, "state")
>               .reduce(accumulate)
>               .map(new NormLossGradient(datasize))
>               .map(new SetLossGradient()).withBroadcastSet(loop,"state")
>               .map(new LBFGS());
>
>
> DataSet<State> converged = statewithnewlossgradient.filter(
>    new FilterFunction<State>() {
>       @Override      public boolean filter(State value) throws Exception {
>          if(value.getIflag()[0] == 0){
>             return false;
>          }
>          return true;
>       }
>    }
> );
>
> DataSet<State> finalstate = loop.closeWith(statewithnewlossgradient,converged);
>
>
>
>
> *While loop: *
>
> DataSet<double[]> data =...
> State state = initialState(m, initweights,0,new double[initweights.length]);
> int cnt=0;do{
>    LBFGS lbfgs = new LBFGS();
>    statedataset=data.map(difffunction).withBroadcastSet(statedataset, "state")
>       .reduce(accumulate)
>       .map(new NormLossGradient(datasize))
>       .map(new SetLossGradient()).withBroadcastSet(statedataset,"state")
>       .map(lbfgs);
>    cnt++;
> }while (cnt<niter && statedataset.collect().get(0).getIflag()[0] != 0);
>
>
>