Posted to dev@spark.apache.org by Mike Hynes <91...@gmail.com> on 2015/09/26 19:20:31 UTC

treeAggregate timing / SGD performance with miniBatchFraction < 1

Hi Evan,

(I just realized my initial email was a reply to the wrong thread; I'm
very sorry about this).

Thanks for your email, and your thoughts on the sampling. That the
gradient computations are essentially the cost of a pass through each
element of the partition makes sense, especially given the sparsity of
the feature vectors.

Would you have any idea why the communication time is so much larger
in the final level of the aggregation, however? I can't immediately
see why it should take longer to transfer the local gradient vectors
in that level, since they are dense in every level. Furthermore, the
driver is receiving the result of only 4 tasks, which is relatively
small.
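
For reference, a final level of 4 tasks is what one would expect from 256 partitions aggregated with 4 levels. The following is a minimal sketch of how the level sizes shrink, assuming they follow the scale-and-divide loop as understood from the RDD.treeAggregate source. The function name tree_level_sizes is hypothetical and is not part of the Spark API.

```python
import math


def tree_level_sizes(num_partitions, depth):
    """Partition counts per aggregation level; the last entry is the level
    whose tasks return their results to the driver.

    Assumption: this mirrors the scale-and-divide loop in Spark's
    RDD.treeAggregate as understood here. It is an illustration only.
    """
    # Branching factor chosen so that roughly `depth` levels are needed.
    scale = max(math.ceil(num_partitions ** (1.0 / depth)), 2)
    sizes = [num_partitions]
    n = num_partitions
    # Add another level only while it still reduces the partition count enough.
    while n > scale + n // scale:
        n //= scale
        sizes.append(n)
    return sizes


print(tree_level_sizes(256, 4))  # [256, 64, 16, 4]
```

Under these assumptions each level combines 4 partial gradients per task, so the per-task input is the same size at every level, which is why the longer final level looks surprising.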

Mike


On 9/26/15, Evan R. Sparks <ev...@gmail.com> wrote:
> Mike,
>
> I believe the reason you're seeing near identical performance on the
> gradient computations is twofold:
> 1) Gradient computations for GLM models are computationally pretty cheap
> from a FLOPs/byte read perspective. They are essentially a BLAS "gemv" call
> in the dense case, which is well known to be bound by memory bandwidth on
> modern processors. So, you're basically paying the cost of a scan of the
> points you've sampled to do the gradient computation.
> 2) The default sampling mechanism used by the GradientDescent optimizer in
> MLlib is implemented via RDD.sample, which does reservoir sampling on each
> partition. This requires a full scan of each partition at every iteration
> to collect the samples.
>
> So - you're going to pay the cost of a scan to do the sampling anyway, and
> the gradient computation is essentially free at this point (and can be
> pipelined, etc.).
>
> It is quite possible to improve #2 by coming up with a better sampling
> algorithm. One easy algorithm would be to assume the data is already
> randomly shuffled (or do that once) and then use the first
> miniBatchFraction*partitionSize records on the first iteration, the second
> set on the second iteration, and so on. You could
> prototype this algorithm pretty easily by converting your data to an
> RDD[Array[DenseVector]] and doing some bookkeeping at each iteration.
>
> That said - eventually the overheads of the platform catch up to you. As a
> rule of thumb I estimate about 50ms/iteration as a floor for things like
> task serialization and other platform overheads. You've got to balance how
> much computation you want to do vs. the amount of time you want to spend
> waiting for the platform.
>
> - Evan
>
> On Sat, Sep 26, 2015 at 9:27 AM, Mike Hynes <91...@gmail.com> wrote:
>
>> Hello Devs,
>>
>> This email concerns some timing results for a treeAggregate in
>> computing a (stochastic) gradient over an RDD of labelled points, as
>> is currently done in the MLlib optimization routine for SGD.
>>
>> In SGD, the underlying RDD is downsampled by a fraction f \in (0,1],
>> and the subgradients over all the instances in the downsampled RDD are
>> aggregated to the driver as a dense vector. However, we have noticed
>> some unusual behaviour when f < 1: it takes the same amount of time to
>> compute the stochastic gradient for a stochastic minibatch as it does
>> for a full batch (f = 1).
>>
>> Attached are two plots of the mean task timing metrics for each level
>> in the aggregation, which has been performed with 4 levels (level 4 is
>> the final level, in which the results are communicated to the driver).
>> 16 nodes are used, and the RDD has 256 partitions. We run in (client)
>> standalone mode. Here, the total time for the tasks is shown (\tau)
>> alongside the execution time (not counting GC),
>> serialization/deserialization time, the GC time, and the difference
>> between tau and all other times, assumed to be variable
>> IO/communication/waiting time. The RDD in this case is a labelled
>> point representation of the KDD Bridge to Algebra dataset, with 20M
>> (sparse) instances and a problem dimension of 30M. The sparsity of the
>> instances is very high---each individual instance vector may have only
>> a hundred nonzeros. All metrics have been taken from the JSON Spark
>> event logs.
>>
>> The plot gradient_f1.pdf shows the times for a gradient computation
>> with f = 1, and gradient_f-3.pdf shows the same metrics with f = 1e-3.
>> For other f values in {1e-1 1e-2 ... 1e-5}, the same effect is
>> observed.
>>
>> What I would like to mention about these plots, and ask if anyone has
>> experience with, is the following:
>> 1. The times are essentially identical; I would have thought that
>> downsampling the RDD before aggregating the subgradients would at
>> least reduce the execution time required, if not the
>> communication/serialization times.
>> 2. The serialization time in level 4 is almost entirely from the
>> result serialization to the driver, and not the task deserialization.
>> In each level of the treeAggregation, however, the local (dense)
>> gradients have to be communicated between compute nodes, so I am
>> surprised that it takes so much longer to return the vectors to the
>> driver.
>>
>> I initially wondered if the large IO overhead in the last stage had
>> anything to do with client mode vs cluster mode, since, from what I
>> understand, only a single core is allocated to the driver thread in
>> client mode. However, when running tests in the two modes, I have
>> previously seen no appreciable difference in the running time for
>> other (admittedly smaller) problems. Furthermore, I am still very
>> confused about why the execution time for each task is just as large
>> for the downsampled RDD. It seems unlikely that sampling each
>> partition would be as expensive as the gradient computations, even for
>> sparse feature vectors.
>>
>> If anyone has experience working with the sampling in minibatch SGD or
>> has tested the scalability of the treeAggregation operation for
>> vectors, I'd really appreciate your thoughts.
>>
>> Thanks,
>> Mike
>>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
>> For additional commands, e-mail: dev-help@spark.apache.org
>>
>
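
The sequential sampling idea in the quoted message above (shuffle once, then take the next miniBatchFraction*partitionSize records at each iteration) could be prototyped roughly as follows. This is a minimal single-partition sketch in plain Python rather than Spark code. The names shuffle_once and minibatch_slice are hypothetical, and wrapping around after the last full batch is an assumption, since the message does not say what happens once the partition is exhausted.

```python
import random


def shuffle_once(records, seed=0):
    """Return a shuffled copy of the records; intended to run a single time."""
    shuffled = list(records)
    random.Random(seed).shuffle(shuffled)
    return shuffled


def minibatch_slice(partition, fraction, iteration):
    """Return the contiguous mini-batch for this iteration.

    Only the records in the slice are touched, so no full scan of the
    partition is needed. Leftover records that do not fill a whole batch
    are skipped, and iterations wrap around (both are assumptions).
    """
    n = len(partition)
    if n == 0:
        return []
    batch_size = max(1, int(fraction * n))
    num_batches = n // batch_size
    start = (iteration % num_batches) * batch_size
    return partition[start:start + batch_size]


partition = shuffle_once(range(8))
for it in range(3):
    print(it, minibatch_slice(partition, 0.25, it))
```

In a Spark prototype the same bookkeeping would run inside each partition, with the iteration counter passed in from the driver.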


-- 
Thanks,
Mike



Re: treeAggregate timing / SGD performance with miniBatchFraction < 1

Posted by Mike Hynes <91...@gmail.com>.
That is an interesting point; I run the driver as a background process
on the master node so that I can still pipe the stdout/stderr
filestreams to the (network) filesystem.
I should mention that the master is connected to the slaves with a 10
Gb link on the same managed switch that the slaves use.

On 9/26/15, Evan R. Sparks <ev...@gmail.com> wrote:
> Off the top of my head, I'm not sure, but it looks like virtually all the
> extra time between each stage is accounted for with T_{io} in your plot,
> which I'm guessing is time spent communicating results over the network? Is
> your driver running on the master or is it on a different node? If you look
> at the code for treeAggregate, the last stage uses a .reduce() for the
> final combination, which happens on the driver. In this case, the size of
> the gradients is O(1GB) so if you've got to go over a slow link for the
> last portion this could really make a difference.


-- 
Thanks,
Mike



Re: treeAggregate timing / SGD performance with miniBatchFraction < 1

Posted by "Evan R. Sparks" <ev...@gmail.com>.
Off the top of my head, I'm not sure, but it looks like virtually all the
extra time between each stage is accounted for with T_{io} in your plot,
which I'm guessing is time spent communicating results over the network? Is
your driver running on the master or is it on a different node? If you look
at the code for treeAggregate, the last stage uses a .reduce() for the
final combination, which happens on the driver. In this case, the size of
the gradients is O(1GB) so if you've got to go over a slow link for the
last portion this could really make a difference.
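
As a rough check of the O(1GB) figure, the arithmetic can be sketched as below. It assumes each dense gradient is stored as 8-byte doubles with no serialization overhead or compression, uses the problem dimension of 30M and the 4 final tasks mentioned in this thread, and treats the 10 Gb link as fully saturated. The helper names are hypothetical, and the result is an idealized lower bound, not a measurement.

```python
BYTES_PER_DOUBLE = 8  # assumption: gradients are arrays of 64-bit floats


def dense_gradient_bytes(dimension):
    """Raw payload size of one dense gradient vector, in bytes."""
    return dimension * BYTES_PER_DOUBLE


def ideal_transfer_seconds(num_bytes, link_bits_per_second):
    """Best-case time to move num_bytes over a link, ignoring all overheads."""
    return num_bytes * 8 / link_bits_per_second


dimension = 30_000_000   # problem dimension quoted in the thread
final_tasks = 4          # tasks whose results go to the driver

per_task = dense_gradient_bytes(dimension)   # 240,000,000 bytes
total = final_tasks * per_task               # 960,000,000 bytes

print("per gradient (MB):", per_task / 1e6)
print("to driver (MB):", total / 1e6)
print("ideal seconds at 10 Gb/s:", ideal_transfer_seconds(total, 10e9))
```

Under these assumptions the driver receives a bit under 1 GB in total, so on a fast link the raw transfer alone would take under a second. A much longer final level would then point to a slower link or to costs other than the wire time.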
