You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by "Ulanov, Alexander" <al...@hp.com> on 2015/01/23 19:00:05 UTC

Maximum size of vector that reduce can handle

Dear Spark developers,

I am trying to measure the Spark reduce performance for big vectors. My motivation is related to machine learning gradient. Gradient is a vector that is computed on each worker and then all results need to be summed up and broadcasted back to workers. For example, present machine learning applications involve very long parameter vectors, for deep neural networks it can be up to 2Billions. So, I want to measure the time that is needed for this operation depending on the size of vector and number of workers. I wrote few lines of code that assume that Spark will distribute partitions among all available workers. I have 6-machine cluster (Xeon 3.3GHz 4 cores, 16GB RAM), each runs 2 Workers.

import org.apache.spark.mllib.rdd.RDDFunctions._
import breeze.linalg._
import org.apache.log4j._
Logger.getRootLogger.setLevel(Level.OFF)
val n = 60000000
val p = 12
val vv = sc.parallelize(0 until p, p).map(i => DenseVector.rand[Double]( n ))
vv.reduce(_ + _)

When executing in shell with 60M vector it crashes after some period of time. One of the node contains the following in stdout:
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x0000000755500000, 2863661056, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (malloc) failed to allocate 2863661056 bytes for committing reserved memory.

I run shell with --executor-memory 8G --driver-memory 8G, so handling 60M vector of Double should not be a problem. Are there any big overheads for this? What is the maximum size of vector that reduce can handle? 

Best regards, Alexander

P.S. 

"spark.driver.maxResultSize 0" needs to set in order to run this code. I also needed to change "java.io.tmpdir" and "spark.local.dir" folders because my /tmp folder which is default, was too small and Spark swaps heavily into this folder. Without these settings I get either "no space left on device" or "out of memory" exceptions.

I also submitted a bug https://issues.apache.org/jira/browse/SPARK-5386

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
For additional commands, e-mail: dev-help@spark.apache.org


Re: Maximum size of vector that reduce can handle

Posted by Boromir Widas <vc...@gmail.com>.
I am running into this issue as well, when storing large Arrays as the
value in a kv pair
and then doing a reducebykey.
Can one of the experts please comment if it would make sense to add an
operation to
add values in place like accumulators do - this would essentially merge the
vectors for
a given key in place, avoiding multiple allocations of temp array/vectors.
This should
be faster for datasets with frequently repeated keys.

On Tue, Jan 27, 2015 at 11:03 AM, Xiangrui Meng <me...@gmail.com> wrote:

> 60m-vector costs 480MB memory. You have 12 of them to be reduced to the
> driver. So you need ~6GB memory not counting the temp vectors generated
> from '_+_'. You need to increase driver memory to make it work. That being
> said, ~10^7 hits the limit for the current impl of glm. -Xiangrui
> On Jan 23, 2015 2:19 PM, "DB Tsai" <db...@dbtsai.com> wrote:
>
> > Hi Alexander,
> >
> > For `reduce`, it's an action that will collect all the data from
> > mapper to driver, and perform the aggregation in driver. As a result,
> > if the output from the mapper is very large, and the numbers of
> > partitions in mapper are large, it might cause a problem.
> >
> > For `treeReduce`, as the name indicates, the way it works is in the
> > first layer, it aggregates the output of the mappers two by two
> > resulting half of the numbers of output. And then, we continuously do
> > the aggregation layer by layer. The final aggregation will be done in
> > driver but in this time, the numbers of data are small.
> >
> > By default, depth 2 is used, so if you have so many partitions of
> > large vector, this may still cause issue. You can increase the depth
> > into higher numbers such that in the final reduce in driver, the
> > number of partitions are very small.
> >
> > Sincerely,
> >
> > DB Tsai
> > -------------------------------------------------------
> > Blog: https://www.dbtsai.com
> > LinkedIn: https://www.linkedin.com/in/dbtsai
> >
> >
> >
> > On Fri, Jan 23, 2015 at 12:07 PM, Ulanov, Alexander
> > <al...@hp.com> wrote:
> > > Hi DB Tsai,
> > >
> > > Thank you for your suggestion. Actually, I've started my experiments
> > with "treeReduce". Originally, I had "vv.treeReduce(_ + _, 2)" in my
> script
> > exactly because MLlib optimizers are using it, as you pointed out with
> > LBFGS. However, it leads to the same problems as "reduce", but presumably
> > not so directly. As far as I understand, treeReduce limits the number of
> > communications between workers and master forcing workers to partially
> > compute the reduce operation.
> > >
> > > Are you sure that driver will first collect all results (or all partial
> > results in treeReduce) and ONLY then perform aggregation? If that is the
> > problem, then how to force it to do aggregation after receiving each
> > portion of data from Workers?
> > >
> > > Best regards, Alexander
> > >
> > > -----Original Message-----
> > > From: DB Tsai [mailto:dbtsai@dbtsai.com]
> > > Sent: Friday, January 23, 2015 11:53 AM
> > > To: Ulanov, Alexander
> > > Cc: dev@spark.apache.org
> > > Subject: Re: Maximum size of vector that reduce can handle
> > >
> > > Hi Alexander,
> > >
> > > When you use `reduce` to aggregate the vectors, those will actually be
> > pulled into driver, and merged over there. Obviously, it's not scaleable
> > given you are doing deep neural networks which have so many coefficients.
> > >
> > > Please try treeReduce instead which is what we do in linear regression
> > and logistic regression.
> > >
> > > See
> >
> https://github.com/apache/spark/blob/branch-1.1/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
> > > for example.
> > >
> > > val (gradientSum, lossSum) = data.treeAggregate((Vectors.zeros(n),
> > 0.0))( seqOp = (c, v) => (c, v) match { case ((grad, loss), (label,
> > features)) => val l = localGradient.compute( features, label, bcW.value,
> > grad) (grad, loss + l) }, combOp = (c1, c2) => (c1, c2) match { case
> > ((grad1, loss1), (grad2, loss2)) => axpy(1.0, grad2, grad1) (grad1,
> loss1 +
> > loss2)
> > > })
> > >
> > > Sincerely,
> > >
> > > DB Tsai
> > > -------------------------------------------------------
> > > Blog: https://www.dbtsai.com
> > > LinkedIn: https://www.linkedin.com/in/dbtsai
> > >
> > >
> > >
> > > On Fri, Jan 23, 2015 at 10:00 AM, Ulanov, Alexander <
> > alexander.ulanov@hp.com> wrote:
> > >> Dear Spark developers,
> > >>
> > >> I am trying to measure the Spark reduce performance for big vectors.
> My
> > motivation is related to machine learning gradient. Gradient is a vector
> > that is computed on each worker and then all results need to be summed up
> > and broadcasted back to workers. For example, present machine learning
> > applications involve very long parameter vectors, for deep neural
> networks
> > it can be up to 2Billions. So, I want to measure the time that is needed
> > for this operation depending on the size of vector and number of
> workers. I
> > wrote few lines of code that assume that Spark will distribute partitions
> > among all available workers. I have 6-machine cluster (Xeon 3.3GHz 4
> cores,
> > 16GB RAM), each runs 2 Workers.
> > >>
> > >> import org.apache.spark.mllib.rdd.RDDFunctions._
> > >> import breeze.linalg._
> > >> import org.apache.log4j._
> > >> Logger.getRootLogger.setLevel(Level.OFF)
> > >> val n = 60000000
> > >> val p = 12
> > >> val vv = sc.parallelize(0 until p, p).map(i =>
> > >> DenseVector.rand[Double]( n )) vv.reduce(_ + _)
> > >>
> > >> When executing in shell with 60M vector it crashes after some period
> of
> > time. One of the node contains the following in stdout:
> > >> Java HotSpot(TM) 64-Bit Server VM warning: INFO:
> > >> os::commit_memory(0x0000000755500000, 2863661056, 0) failed;
> > >> error='Cannot allocate memory' (errno=12) # # There is insufficient
> > memory for the Java Runtime Environment to continue.
> > >> # Native memory allocation (malloc) failed to allocate 2863661056
> bytes
> > for committing reserved memory.
> > >>
> > >> I run shell with --executor-memory 8G --driver-memory 8G, so handling
> > 60M vector of Double should not be a problem. Are there any big overheads
> > for this? What is the maximum size of vector that reduce can handle?
> > >>
> > >> Best regards, Alexander
> > >>
> > >> P.S.
> > >>
> > >> "spark.driver.maxResultSize 0" needs to set in order to run this code.
> > I also needed to change "java.io.tmpdir" and "spark.local.dir" folders
> > because my /tmp folder which is default, was too small and Spark swaps
> > heavily into this folder. Without these settings I get either "no space
> > left on device" or "out of memory" exceptions.
> > >>
> > >> I also submitted a bug
> > >> https://issues.apache.org/jira/browse/SPARK-5386
> > >>
> > >> ---------------------------------------------------------------------
> > >> To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org For
> > >> additional commands, e-mail: dev-help@spark.apache.org
> > >>
> >
> > ---------------------------------------------------------------------
> > To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
> > For additional commands, e-mail: dev-help@spark.apache.org
> >
> >
>

Re: Maximum size of vector that reduce can handle

Posted by Xiangrui Meng <me...@gmail.com>.
60m-vector costs 480MB memory. You have 12 of them to be reduced to the
driver. So you need ~6GB memory not counting the temp vectors generated
from '_+_'. You need to increase driver memory to make it work. That being
said, ~10^7 hits the limit for the current impl of glm. -Xiangrui
On Jan 23, 2015 2:19 PM, "DB Tsai" <db...@dbtsai.com> wrote:

> Hi Alexander,
>
> For `reduce`, it's an action that will collect all the data from
> mapper to driver, and perform the aggregation in driver. As a result,
> if the output from the mapper is very large, and the numbers of
> partitions in mapper are large, it might cause a problem.
>
> For `treeReduce`, as the name indicates, the way it works is in the
> first layer, it aggregates the output of the mappers two by two
> resulting half of the numbers of output. And then, we continuously do
> the aggregation layer by layer. The final aggregation will be done in
> driver but in this time, the numbers of data are small.
>
> By default, depth 2 is used, so if you have so many partitions of
> large vector, this may still cause issue. You can increase the depth
> into higher numbers such that in the final reduce in driver, the
> number of partitions are very small.
>
> Sincerely,
>
> DB Tsai
> -------------------------------------------------------
> Blog: https://www.dbtsai.com
> LinkedIn: https://www.linkedin.com/in/dbtsai
>
>
>
> On Fri, Jan 23, 2015 at 12:07 PM, Ulanov, Alexander
> <al...@hp.com> wrote:
> > Hi DB Tsai,
> >
> > Thank you for your suggestion. Actually, I've started my experiments
> with "treeReduce". Originally, I had "vv.treeReduce(_ + _, 2)" in my script
> exactly because MLlib optimizers are using it, as you pointed out with
> LBFGS. However, it leads to the same problems as "reduce", but presumably
> not so directly. As far as I understand, treeReduce limits the number of
> communications between workers and master forcing workers to partially
> compute the reduce operation.
> >
> > Are you sure that driver will first collect all results (or all partial
> results in treeReduce) and ONLY then perform aggregation? If that is the
> problem, then how to force it to do aggregation after receiving each
> portion of data from Workers?
> >
> > Best regards, Alexander
> >
> > -----Original Message-----
> > From: DB Tsai [mailto:dbtsai@dbtsai.com]
> > Sent: Friday, January 23, 2015 11:53 AM
> > To: Ulanov, Alexander
> > Cc: dev@spark.apache.org
> > Subject: Re: Maximum size of vector that reduce can handle
> >
> > Hi Alexander,
> >
> > When you use `reduce` to aggregate the vectors, those will actually be
> pulled into driver, and merged over there. Obviously, it's not scaleable
> given you are doing deep neural networks which have so many coefficients.
> >
> > Please try treeReduce instead which is what we do in linear regression
> and logistic regression.
> >
> > See
> https://github.com/apache/spark/blob/branch-1.1/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
> > for example.
> >
> > val (gradientSum, lossSum) = data.treeAggregate((Vectors.zeros(n),
> 0.0))( seqOp = (c, v) => (c, v) match { case ((grad, loss), (label,
> features)) => val l = localGradient.compute( features, label, bcW.value,
> grad) (grad, loss + l) }, combOp = (c1, c2) => (c1, c2) match { case
> ((grad1, loss1), (grad2, loss2)) => axpy(1.0, grad2, grad1) (grad1, loss1 +
> loss2)
> > })
> >
> > Sincerely,
> >
> > DB Tsai
> > -------------------------------------------------------
> > Blog: https://www.dbtsai.com
> > LinkedIn: https://www.linkedin.com/in/dbtsai
> >
> >
> >
> > On Fri, Jan 23, 2015 at 10:00 AM, Ulanov, Alexander <
> alexander.ulanov@hp.com> wrote:
> >> Dear Spark developers,
> >>
> >> I am trying to measure the Spark reduce performance for big vectors. My
> motivation is related to machine learning gradient. Gradient is a vector
> that is computed on each worker and then all results need to be summed up
> and broadcasted back to workers. For example, present machine learning
> applications involve very long parameter vectors, for deep neural networks
> it can be up to 2Billions. So, I want to measure the time that is needed
> for this operation depending on the size of vector and number of workers. I
> wrote few lines of code that assume that Spark will distribute partitions
> among all available workers. I have 6-machine cluster (Xeon 3.3GHz 4 cores,
> 16GB RAM), each runs 2 Workers.
> >>
> >> import org.apache.spark.mllib.rdd.RDDFunctions._
> >> import breeze.linalg._
> >> import org.apache.log4j._
> >> Logger.getRootLogger.setLevel(Level.OFF)
> >> val n = 60000000
> >> val p = 12
> >> val vv = sc.parallelize(0 until p, p).map(i =>
> >> DenseVector.rand[Double]( n )) vv.reduce(_ + _)
> >>
> >> When executing in shell with 60M vector it crashes after some period of
> time. One of the node contains the following in stdout:
> >> Java HotSpot(TM) 64-Bit Server VM warning: INFO:
> >> os::commit_memory(0x0000000755500000, 2863661056, 0) failed;
> >> error='Cannot allocate memory' (errno=12) # # There is insufficient
> memory for the Java Runtime Environment to continue.
> >> # Native memory allocation (malloc) failed to allocate 2863661056 bytes
> for committing reserved memory.
> >>
> >> I run shell with --executor-memory 8G --driver-memory 8G, so handling
> 60M vector of Double should not be a problem. Are there any big overheads
> for this? What is the maximum size of vector that reduce can handle?
> >>
> >> Best regards, Alexander
> >>
> >> P.S.
> >>
> >> "spark.driver.maxResultSize 0" needs to set in order to run this code.
> I also needed to change "java.io.tmpdir" and "spark.local.dir" folders
> because my /tmp folder which is default, was too small and Spark swaps
> heavily into this folder. Without these settings I get either "no space
> left on device" or "out of memory" exceptions.
> >>
> >> I also submitted a bug
> >> https://issues.apache.org/jira/browse/SPARK-5386
> >>
> >> ---------------------------------------------------------------------
> >> To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org For
> >> additional commands, e-mail: dev-help@spark.apache.org
> >>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
> For additional commands, e-mail: dev-help@spark.apache.org
>
>

Re: Maximum size of vector that reduce can handle

Posted by DB Tsai <db...@dbtsai.com>.
Hi Alexander,

For `reduce`, it's an action that will collect all the data from
mapper to driver, and perform the aggregation in driver. As a result,
if the output from the mapper is very large, and the numbers of
partitions in mapper are large, it might cause a problem.

For `treeReduce`, as the name indicates, the way it works is in the
first layer, it aggregates the output of the mappers two by two
resulting half of the numbers of output. And then, we continuously do
the aggregation layer by layer. The final aggregation will be done in
driver but in this time, the numbers of data are small.

By default, depth 2 is used, so if you have so many partitions of
large vector, this may still cause issue. You can increase the depth
into higher numbers such that in the final reduce in driver, the
number of partitions are very small.

Sincerely,

DB Tsai
-------------------------------------------------------
Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai



On Fri, Jan 23, 2015 at 12:07 PM, Ulanov, Alexander
<al...@hp.com> wrote:
> Hi DB Tsai,
>
> Thank you for your suggestion. Actually, I've started my experiments with "treeReduce". Originally, I had "vv.treeReduce(_ + _, 2)" in my script exactly because MLlib optimizers are using it, as you pointed out with LBFGS. However, it leads to the same problems as "reduce", but presumably not so directly. As far as I understand, treeReduce limits the number of communications between workers and master forcing workers to partially compute the reduce operation.
>
> Are you sure that driver will first collect all results (or all partial results in treeReduce) and ONLY then perform aggregation? If that is the problem, then how to force it to do aggregation after receiving each portion of data from Workers?
>
> Best regards, Alexander
>
> -----Original Message-----
> From: DB Tsai [mailto:dbtsai@dbtsai.com]
> Sent: Friday, January 23, 2015 11:53 AM
> To: Ulanov, Alexander
> Cc: dev@spark.apache.org
> Subject: Re: Maximum size of vector that reduce can handle
>
> Hi Alexander,
>
> When you use `reduce` to aggregate the vectors, those will actually be pulled into driver, and merged over there. Obviously, it's not scaleable given you are doing deep neural networks which have so many coefficients.
>
> Please try treeReduce instead which is what we do in linear regression and logistic regression.
>
> See https://github.com/apache/spark/blob/branch-1.1/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
> for example.
>
> val (gradientSum, lossSum) = data.treeAggregate((Vectors.zeros(n), 0.0))( seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) => val l = localGradient.compute( features, label, bcW.value, grad) (grad, loss + l) }, combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) => axpy(1.0, grad2, grad1) (grad1, loss1 + loss2)
> })
>
> Sincerely,
>
> DB Tsai
> -------------------------------------------------------
> Blog: https://www.dbtsai.com
> LinkedIn: https://www.linkedin.com/in/dbtsai
>
>
>
> On Fri, Jan 23, 2015 at 10:00 AM, Ulanov, Alexander <al...@hp.com> wrote:
>> Dear Spark developers,
>>
>> I am trying to measure the Spark reduce performance for big vectors. My motivation is related to machine learning gradient. Gradient is a vector that is computed on each worker and then all results need to be summed up and broadcasted back to workers. For example, present machine learning applications involve very long parameter vectors, for deep neural networks it can be up to 2Billions. So, I want to measure the time that is needed for this operation depending on the size of vector and number of workers. I wrote few lines of code that assume that Spark will distribute partitions among all available workers. I have 6-machine cluster (Xeon 3.3GHz 4 cores, 16GB RAM), each runs 2 Workers.
>>
>> import org.apache.spark.mllib.rdd.RDDFunctions._
>> import breeze.linalg._
>> import org.apache.log4j._
>> Logger.getRootLogger.setLevel(Level.OFF)
>> val n = 60000000
>> val p = 12
>> val vv = sc.parallelize(0 until p, p).map(i =>
>> DenseVector.rand[Double]( n )) vv.reduce(_ + _)
>>
>> When executing in shell with 60M vector it crashes after some period of time. One of the node contains the following in stdout:
>> Java HotSpot(TM) 64-Bit Server VM warning: INFO:
>> os::commit_memory(0x0000000755500000, 2863661056, 0) failed;
>> error='Cannot allocate memory' (errno=12) # # There is insufficient memory for the Java Runtime Environment to continue.
>> # Native memory allocation (malloc) failed to allocate 2863661056 bytes for committing reserved memory.
>>
>> I run shell with --executor-memory 8G --driver-memory 8G, so handling 60M vector of Double should not be a problem. Are there any big overheads for this? What is the maximum size of vector that reduce can handle?
>>
>> Best regards, Alexander
>>
>> P.S.
>>
>> "spark.driver.maxResultSize 0" needs to set in order to run this code. I also needed to change "java.io.tmpdir" and "spark.local.dir" folders because my /tmp folder which is default, was too small and Spark swaps heavily into this folder. Without these settings I get either "no space left on device" or "out of memory" exceptions.
>>
>> I also submitted a bug
>> https://issues.apache.org/jira/browse/SPARK-5386
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org For
>> additional commands, e-mail: dev-help@spark.apache.org
>>

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
For additional commands, e-mail: dev-help@spark.apache.org


RE: Maximum size of vector that reduce can handle

Posted by "Ulanov, Alexander" <al...@hp.com>.
Hi DB Tsai,

Thank you for your suggestion. Actually, I've started my experiments with "treeReduce". Originally, I had "vv.treeReduce(_ + _, 2)" in my script exactly because MLlib optimizers are using it, as you pointed out with LBFGS. However, it leads to the same problems as "reduce", but presumably not so directly. As far as I understand, treeReduce limits the number of communications between workers and master forcing workers to partially compute the reduce operation.

Are you sure that driver will first collect all results (or all partial results in treeReduce) and ONLY then perform aggregation? If that is the problem, then how to force it to do aggregation after receiving each portion of data from Workers?

Best regards, Alexander

-----Original Message-----
From: DB Tsai [mailto:dbtsai@dbtsai.com] 
Sent: Friday, January 23, 2015 11:53 AM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: Maximum size of vector that reduce can handle

Hi Alexander,

When you use `reduce` to aggregate the vectors, those will actually be pulled into driver, and merged over there. Obviously, it's not scaleable given you are doing deep neural networks which have so many coefficients.

Please try treeReduce instead which is what we do in linear regression and logistic regression.

See https://github.com/apache/spark/blob/branch-1.1/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
for example.

val (gradientSum, lossSum) = data.treeAggregate((Vectors.zeros(n), 0.0))( seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) => val l = localGradient.compute( features, label, bcW.value, grad) (grad, loss + l) }, combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) => axpy(1.0, grad2, grad1) (grad1, loss1 + loss2)
})

Sincerely,

DB Tsai
-------------------------------------------------------
Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai



On Fri, Jan 23, 2015 at 10:00 AM, Ulanov, Alexander <al...@hp.com> wrote:
> Dear Spark developers,
>
> I am trying to measure the Spark reduce performance for big vectors. My motivation is related to machine learning gradient. Gradient is a vector that is computed on each worker and then all results need to be summed up and broadcasted back to workers. For example, present machine learning applications involve very long parameter vectors, for deep neural networks it can be up to 2Billions. So, I want to measure the time that is needed for this operation depending on the size of vector and number of workers. I wrote few lines of code that assume that Spark will distribute partitions among all available workers. I have 6-machine cluster (Xeon 3.3GHz 4 cores, 16GB RAM), each runs 2 Workers.
>
> import org.apache.spark.mllib.rdd.RDDFunctions._
> import breeze.linalg._
> import org.apache.log4j._
> Logger.getRootLogger.setLevel(Level.OFF)
> val n = 60000000
> val p = 12
> val vv = sc.parallelize(0 until p, p).map(i => 
> DenseVector.rand[Double]( n )) vv.reduce(_ + _)
>
> When executing in shell with 60M vector it crashes after some period of time. One of the node contains the following in stdout:
> Java HotSpot(TM) 64-Bit Server VM warning: INFO: 
> os::commit_memory(0x0000000755500000, 2863661056, 0) failed; 
> error='Cannot allocate memory' (errno=12) # # There is insufficient memory for the Java Runtime Environment to continue.
> # Native memory allocation (malloc) failed to allocate 2863661056 bytes for committing reserved memory.
>
> I run shell with --executor-memory 8G --driver-memory 8G, so handling 60M vector of Double should not be a problem. Are there any big overheads for this? What is the maximum size of vector that reduce can handle?
>
> Best regards, Alexander
>
> P.S.
>
> "spark.driver.maxResultSize 0" needs to set in order to run this code. I also needed to change "java.io.tmpdir" and "spark.local.dir" folders because my /tmp folder which is default, was too small and Spark swaps heavily into this folder. Without these settings I get either "no space left on device" or "out of memory" exceptions.
>
> I also submitted a bug 
> https://issues.apache.org/jira/browse/SPARK-5386
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org For 
> additional commands, e-mail: dev-help@spark.apache.org
>

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
For additional commands, e-mail: dev-help@spark.apache.org


Re: Maximum size of vector that reduce can handle

Posted by DB Tsai <db...@dbtsai.com>.
Hi Alexander,

When you use `reduce` to aggregate the vectors, those will actually be
pulled into driver, and merged over there. Obviously, it's not
scaleable given you are doing deep neural networks which have so many
coefficients.

Please try treeReduce instead which is what we do in linear regression
and logistic regression.

See https://github.com/apache/spark/blob/branch-1.1/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
for example.

val (gradientSum, lossSum) = data.treeAggregate((Vectors.zeros(n), 0.0))(
seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
val l = localGradient.compute(
features, label, bcW.value, grad)
(grad, loss + l)
},
combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
axpy(1.0, grad2, grad1)
(grad1, loss1 + loss2)
})

Sincerely,

DB Tsai
-------------------------------------------------------
Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai



On Fri, Jan 23, 2015 at 10:00 AM, Ulanov, Alexander
<al...@hp.com> wrote:
> Dear Spark developers,
>
> I am trying to measure the Spark reduce performance for big vectors. My motivation is related to machine learning gradient. Gradient is a vector that is computed on each worker and then all results need to be summed up and broadcasted back to workers. For example, present machine learning applications involve very long parameter vectors, for deep neural networks it can be up to 2Billions. So, I want to measure the time that is needed for this operation depending on the size of vector and number of workers. I wrote few lines of code that assume that Spark will distribute partitions among all available workers. I have 6-machine cluster (Xeon 3.3GHz 4 cores, 16GB RAM), each runs 2 Workers.
>
> import org.apache.spark.mllib.rdd.RDDFunctions._
> import breeze.linalg._
> import org.apache.log4j._
> Logger.getRootLogger.setLevel(Level.OFF)
> val n = 60000000
> val p = 12
> val vv = sc.parallelize(0 until p, p).map(i => DenseVector.rand[Double]( n ))
> vv.reduce(_ + _)
>
> When executing in shell with 60M vector it crashes after some period of time. One of the node contains the following in stdout:
> Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x0000000755500000, 2863661056, 0) failed; error='Cannot allocate memory' (errno=12)
> #
> # There is insufficient memory for the Java Runtime Environment to continue.
> # Native memory allocation (malloc) failed to allocate 2863661056 bytes for committing reserved memory.
>
> I run shell with --executor-memory 8G --driver-memory 8G, so handling 60M vector of Double should not be a problem. Are there any big overheads for this? What is the maximum size of vector that reduce can handle?
>
> Best regards, Alexander
>
> P.S.
>
> "spark.driver.maxResultSize 0" needs to set in order to run this code. I also needed to change "java.io.tmpdir" and "spark.local.dir" folders because my /tmp folder which is default, was too small and Spark swaps heavily into this folder. Without these settings I get either "no space left on device" or "out of memory" exceptions.
>
> I also submitted a bug https://issues.apache.org/jira/browse/SPARK-5386
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
> For additional commands, e-mail: dev-help@spark.apache.org
>

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
For additional commands, e-mail: dev-help@spark.apache.org