Posted to user@spark.apache.org by Gustavo Enrique Salazar Torres <gs...@ime.usp.br> on 2015/03/03 05:39:30 UTC

LBFGS optimizer performance

Hi there:

I'm using the LBFGS optimizer to train a logistic regression model. The code I
implemented follows the pattern shown in
https://spark.apache.org/docs/1.2.0/mllib-linear-methods.html, but the training
data is obtained from a Spark SQL RDD.
The problem I'm having is that LBFGS tries to count the elements in my RDD,
and that results in an OOM exception since my dataset is huge.
I'm running on an AWS EMR cluster with 16 c3.2xlarge instances on Hadoop
YARN. My dataset is about 150 GB, but I sample it (taking only 1% of the data)
in order to scale logistic regression.
The exception I'm getting is this:

15/03/03 04:21:44 WARN scheduler.TaskSetManager: Lost task 108.0 in stage 2.0 (TID 7600, ip-10-155-20-71.ec2.internal): java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOfRange(Arrays.java:2694)
        at java.lang.String.<init>(String.java:203)
        at com.esotericsoftware.kryo.io.Input.readString(Input.java:448)
        at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:157)
        at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:146)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651)
        at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
        at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
        at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
        at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144)
        at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.sql.execution.joins.HashOuterJoin.org$apache$spark$sql$execution$joins$HashOuterJoin$$buildHashTable(HashOuterJoin.scala:179)
        at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:199)
        at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:196)
        at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)

I'm using these parameters at runtime:
--num-executors 128 --executor-memory 1G --driver-memory 4G
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.storage.memoryFraction=0.2

I also persist my dataset using MEMORY_AND_DISK_SER, but I get the same error.
I would appreciate any help with this problem. I have been trying to solve it
for days and I'm running out of time and hair.
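For a rough sense of how little cache space the flags above leave, the sketch below multiplies them out. It assumes Spark 1.x's static memory model, in which each executor's storage pool is about heap * spark.storage.memoryFraction * spark.storage.safetyFraction (0.9 by default), and it assumes YARN spreads executors evenly over the 16 nodes. The object and method names are hypothetical, and since the JVM's usable heap is slightly below -Xmx, treat the results as upper bounds.

```scala
// Back-of-the-envelope cache capacity under Spark 1.x's static memory model.
// Assumption: storage per executor ~= heap * storage.memoryFraction * storage.safetyFraction,
// with the Spark 1.x default safetyFraction of 0.9. Names here are hypothetical.
object StorageEstimate {
  val SafetyFraction = 0.9

  /** Approximate RDD cache capacity, in MB, summed over all executors. */
  def clusterStorageMB(numExecutors: Int, executorHeapMB: Int, memoryFraction: Double): Double =
    numExecutors * executorHeapMB * memoryFraction * SafetyFraction

  /** Executors per node, assuming an even spread across nodes. */
  def executorsPerNode(numExecutors: Int, nodes: Int): Double =
    numExecutors.toDouble / nodes

  def main(args: Array[String]): Unit = {
    // Flags from the original message: 128 executors, 1G heap, memoryFraction 0.2.
    val original = clusterStorageMB(128, 1024, 0.2)
    // Flags from the later message in this thread: 200 executors, 800M heap, memoryFraction 0.3.
    val later = clusterStorageMB(200, 800, 0.3)
    println(f"original: ~${original / 1024}%.1f GB cache, ${executorsPerNode(128, 16)}%.1f executors/node")
    println(f"later:    ~${later / 1024}%.1f GB cache, ${executorsPerNode(200, 16)}%.1f executors/node")
  }
}
```

Under these assumptions each 1G executor has only about 184 MB of storage memory, roughly 23 GB across the cluster; the later configuration in this thread roughly doubles that total.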

Thanks
Gustavo

Re: LBFGS optimizer performance

Posted by Gustavo Enrique Salazar Torres <gs...@ime.usp.br>.
Yeah, without caching it gets really slow. I will try to minimize the
number of columns in my tables; that may save a lot of memory and let it
eventually work.
I will let you know.

Thanks!
Gustavo

On Tue, Mar 3, 2015 at 8:58 PM, Joseph Bradley <jo...@databricks.com>
wrote:

> I would recommend caching; if you can't persist, iterative algorithms will
> not work well.
>
> I don't think calling count on the dataset is problematic; every iteration
> in LBFGS iterates over the whole dataset and does a lot more computation
> than count().
>
> It would be helpful to see some error occurring within LBFGS.  With the
> given stack trace, I'm not sure what part of LBFGS it's happening in.
>
> On Tue, Mar 3, 2015 at 2:27 PM, Gustavo Enrique Salazar Torres <
> gsalazar@ime.usp.br> wrote:
>
>> Yeah, I can call count before that and it works. I was also over-caching
>> tables, but I removed those. Now there is no caching, but it gets really
>> slow since it recomputes my table RDD many times.
>> I also hacked the LBFGS code to pass in the number of examples, which I
>> calculated separately with a Spark SQL query, but that just moved the
>> location of the problem.
>>
>> The query I'm running looks like this:
>>
>> s"SELECT $mappedFields, tableB.id as b_id FROM tableA LEFT JOIN tableB ON tableA.id=tableB.table_a_id WHERE tableA.status='' OR tableB.status='' "
>>
>> mappedFields contains the list of fields I'm interested in. The result
>> of that query goes through some transformations (including sampling)
>> before being fed into LBFGS.
>>
>> My dataset is 180 GB just for feature selection, and I'm planning to use
>> 450 GB to train the final model. I'm using 16 c3.2xlarge EC2 instances,
>> which means I have 240 GB of RAM available.
>>
>> Any suggestions? I'm starting to look into the algorithm because I don't
>> understand why it needs to count the dataset.
>>
>> Thanks
>>
>> Gustavo
>>
>> On Tue, Mar 3, 2015 at 6:08 PM, Joseph Bradley <jo...@databricks.com>
>> wrote:
>>
>>> Is that error actually occurring in LBFGS?  It looks like it might be
>>> happening before the data even gets to LBFGS.  (Perhaps the outer join
>>> you're trying to do is making the dataset size explode a bit.)  Are you
>>> able to call count() (or any RDD action) on the data before you pass it to
>>> LBFGS?
>>>
>>> On Tue, Mar 3, 2015 at 8:55 AM, Gustavo Enrique Salazar Torres <
>>> gsalazar@ime.usp.br> wrote:
>>>
>>>> Just did, with the same error.
>>>> I think the problem is the "data.count()" call in LBFGS, because for
>>>> huge datasets that's a naive thing to do.
>>>> I'm thinking of writing my own version of LBFGS that, instead of calling
>>>> data.count(), takes that value as a parameter, which I will calculate
>>>> from a Spark SQL query.
>>>>
>>>> I will let you know.
>>>>
>>>> Thanks
>>>>
>>>>
>>>> On Tue, Mar 3, 2015 at 3:25 AM, Akhil Das <ak...@sigmoidanalytics.com>
>>>> wrote:
>>>>
>>>>> Can you try increasing your driver memory, reducing the executors and
>>>>> increasing the executor memory?
>>>>>
>>>>> Thanks
>>>>> Best Regards
>>>>>
>>>>> On Tue, Mar 3, 2015 at 10:09 AM, Gustavo Enrique Salazar Torres <
>>>>> gsalazar@ime.usp.br> wrote:
>>>>>
>>>>>> Hi there:
>>>>>>
>>>>>> I'm using LBFGS optimizer to train a logistic regression model. The
>>>>>> code I implemented follows the pattern showed in
>>>>>> https://spark.apache.org/docs/1.2.0/mllib-linear-methods.html but
>>>>>> training data is obtained from a Spark SQL RDD.
>>>>>> The problem I'm having is that LBFGS tries to count the elements in
>>>>>> my RDD and that results in a OOM exception since my dataset is huge.
>>>>>> I'm running on a AWS EMR cluster with 16 c3.2xlarge instances on
>>>>>> Hadoop YARN. My dataset is about 150 GB but I sample (I take only 1% of the
>>>>>> data) it in order to scale logistic regression.
>>>>>> The exception I'm getting is this:
>>>>>>
>>>>>> 15/03/03 04:21:44 WARN scheduler.TaskSetManager: Lost task 108.0 in
>>>>>> stage 2.0 (TID 7600, ip-10-155-20-71.ec2.internal):
>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>         at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>>>>         at java.lang.String.<init>(String.java:203)
>>>>>>         at
>>>>>> com.esotericsoftware.kryo.io.Input.readString(Input.java:448)
>>>>>>         at
>>>>>> com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:157)
>>>>>>         at
>>>>>> com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:146)
>>>>>>         at
>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>>>>>>         at
>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>>>>>>         at
>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>>>>>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651)
>>>>>>         at
>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>         at
>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>         at
>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>>>>>>         at
>>>>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
>>>>>>         at
>>>>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>>>>>>         at
>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>>>>>>         at
>>>>>> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144)
>>>>>>         at
>>>>>> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>>>>>>         at
>>>>>> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>         at
>>>>>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>         at
>>>>>> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>         at
>>>>>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>         at
>>>>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>>         at
>>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>         at
>>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>         at org.apache.spark.sql.execution.joins.HashOuterJoin.org
>>>>>> $apache$spark$sql$execution$joins$HashOuterJoin$$buildHashTable(HashOuterJoin.scala:179)
>>>>>>         at
>>>>>> org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:199)
>>>>>>         at
>>>>>> org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:196)
>>>>>>         at
>>>>>> org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>>>>>>         at
>>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>>>>>         at
>>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>         at
>>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>>>>
>>>>>> I'm using this parameters at runtime:
>>>>>> --num-executors 128 --executor-memory 1G --driver-memory 4G
>>>>>> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
>>>>>> --conf spark.storage.memoryFraction=0.2
>>>>>>
>>>>>> I also persist my dataset using MEMORY_AND_DISK_SER but get the same
>>>>>> error.
>>>>>> I will appreciate any help on this problem. I have been trying to
>>>>>> solve it for days and I'm running out of time and hair.
>>>>>>
>>>>>> Thanks
>>>>>> Gustavo
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
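Joseph's hypothesis that the outer join may be inflating the data can be checked on toy data. The sketch below uses plain Scala collections, not Spark SQL; the row classes, field names and sample rows are hypothetical stand-ins for tableA and tableB in the query Gustavo quotes, with Option modeling SQL NULLs. It shows that a LEFT JOIN emits one row per matching tableB row (or one NULL-padded row when nothing matches), so a table_a_id with many matches multiplies the output before any sampling happens.

```scala
// Toy LEFT JOIN on plain Scala collections (not Spark SQL) showing how the
// output can outgrow the left table. Option[RowB] plays the role of SQL NULLs.
object LeftJoinGrowth {
  // Hypothetical, simplified stand-ins for tableA and tableB.
  final case class RowA(id: Long, status: String)
  final case class RowB(id: Long, tableAId: Long, status: String)

  /** tableA LEFT JOIN tableB ON tableA.id = tableB.table_a_id */
  def leftJoin(tableA: Seq[RowA], tableB: Seq[RowB]): Seq[(RowA, Option[RowB])] = {
    // Hash the right side by join key, similar in spirit to a hash join's build phase.
    val byAId: Map[Long, Seq[RowB]] = tableB.groupBy(_.tableAId)
    tableA.flatMap { a =>
      byAId.get(a.id) match {
        case Some(matches) => matches.map(b => (a, Option(b))) // one output row per match
        case None          => Seq((a, Option.empty[RowB]))     // unmatched: NULL-padded row
      }
    }
  }

  /** WHERE tableA.status = '' OR tableB.status = '' (a NULL tableB row never satisfies it). */
  def applyWhere(rows: Seq[(RowA, Option[RowB])]): Seq[(RowA, Option[RowB])] =
    rows.filter { case (a, b) => a.status == "" || b.exists(_.status == "") }

  val sampleA: Seq[RowA] = Seq(RowA(1, ""), RowA(2, "x"), RowA(3, ""))
  val sampleB: Seq[RowB] =
    Seq(RowB(10, 1, ""), RowB(11, 1, "y"), RowB(12, 1, ""), RowB(13, 3, "z"))

  def main(args: Array[String]): Unit = {
    val joined = leftJoin(sampleA, sampleB)
    println(s"tableA rows: ${sampleA.size}, joined rows: ${joined.size}, after WHERE: ${applyWhere(joined).size}")
  }
}
```

Here three tableA rows become five joined rows because id 1 has three matches; with real data, a few heavily matched keys can have the same effect at a much larger scale.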

Re: LBFGS optimizer performance

Posted by Joseph Bradley <jo...@databricks.com>.
I would recommend caching; if you can't persist, iterative algorithms will
not work well.

I don't think calling count on the dataset is problematic; every iteration
in LBFGS iterates over the whole dataset and does a lot more computation
than count().

It would be helpful to see some error occurring within LBFGS.  With the
given stack trace, I'm not sure what part of LBFGS it's happening in.
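Joseph's point about cost can be made concrete with a local sketch of what an L-BFGS iteration needs from the data for logistic regression: the average log-loss and its gradient over all n examples. This is plain Scala, not MLlib's implementation; the Example class and lossAndGradient function are hypothetical. Counting touches each example once with constant work, while every loss/gradient evaluation touches each example again with work proportional to the number of features.

```scala
// Local sketch (plain Scala, not MLlib) of one loss/gradient evaluation for
// binary logistic regression. An L-BFGS iteration needs at least one such
// full pass; counting the data is a pass with far less work per example.
object LogisticPass {
  // Hypothetical example type; label is 0.0 or 1.0.
  final case class Example(label: Double, features: Array[Double])

  private def dot(w: Array[Double], x: Array[Double]): Double = {
    var s = 0.0
    var i = 0
    while (i < w.length) { s += w(i) * x(i); i += 1 }
    s
  }

  /** Average log-loss and its gradient at weights w, over all n examples. */
  def lossAndGradient(data: Seq[Example], w: Array[Double]): (Double, Array[Double]) = {
    require(data.nonEmpty, "need at least one example")
    val n = data.size // the count: at most one pass with O(1) work per example
    val grad = new Array[Double](w.length)
    var loss = 0.0
    data.foreach { ex =>
      val margin = dot(w, ex.features) // O(d) work per example
      val p = 1.0 / (1.0 + math.exp(-margin))
      // log(1 + e^margin) - label * margin, computed in a numerically stable way
      val softplus =
        if (margin > 0) margin + math.log1p(math.exp(-margin)) else math.log1p(math.exp(margin))
      loss += softplus - ex.label * margin
      var i = 0
      while (i < w.length) { grad(i) += (p - ex.label) * ex.features(i); i += 1 } // O(d) again
    }
    (loss / n, grad.map(_ / n))
  }

  def main(args: Array[String]): Unit = {
    val data = Seq(Example(1.0, Array(1.0, 2.0)), Example(0.0, Array(3.0, -1.0)))
    val (loss, grad) = lossAndGradient(data, Array(0.0, 0.0))
    println(s"loss = $loss, gradient = ${grad.mkString("[", ", ", "]")}")
  }
}
```

Because the optimizer repeats evaluations like this on every iteration, a single count is a small share of the total work; uncached input, which has to be recomputed on every pass, costs far more.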

Re: LBFGS optimizer performance

Posted by Gustavo Enrique Salazar Torres <gs...@ime.usp.br>.
Hi there:

Yeah, I came to the same conclusion after tuning the Spark SQL shuffle
parameter. I also cut out some classes I was using to parse my dataset and
finally created the schema with only the fields needed for my model (before
that, I was creating it with 63 fields when I needed just 15).
So I came up with this set of parameters:

--num-executors 200
--executor-memory 800M
--conf spark.executor.extraJavaOptions="-XX:+UseCompressedOops -XX:+AggressiveOpts"
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.storage.memoryFraction=0.3
--conf spark.rdd.compress=true
--conf spark.sql.shuffle.partitions=4000
--driver-memory 4G
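The switch to spark.sql.shuffle.partitions=4000 is plausibly a large part of why the OOM went away: in the original stack trace the failure happens while HashOuterJoin builds an in-memory hash table for a partition, so per-partition size is what matters. The sketch below simply divides a data volume by the partition count. It assumes an even spread (skewed keys break this), uses the 270 GB Gustavo reports processing as the volume, and ignores that rows deserialized into a JVM hash table usually take more space than on disk. 200 is Spark SQL's default for spark.sql.shuffle.partitions; the object name is hypothetical.

```scala
// Average shuffle partition size for a given data volume. Assumes an even spread
// over partitions; skewed keys and JVM object overhead both make real partitions bigger.
object ShufflePartitionSize {
  /** Average size in MB of one shuffle partition. */
  def avgPartitionMB(totalGB: Double, partitions: Int): Double =
    totalGB * 1024 / partitions

  def main(args: Array[String]): Unit = {
    val totalGB = 270.0 // volume reported in this message
    // 200 is the default spark.sql.shuffle.partitions; 4000 is the value used here.
    for (p <- Seq(200, 4000))
      println(f"$p%5d partitions -> ~${avgPartitionMB(totalGB, p)}%.1f MB per partition")
  }
}
```

At the default of 200, each partition averages about 1.35 GB, more than an entire 800M executor heap; at 4000 it is about 69 MB.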

Now I processed 270 GB in 35 minutes with no OOM errors.
I have one question, though: does Spark SQL handle skewed tables? I'm
wondering because my data is skewed, so maybe there is more room for
performance improvement.
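On the skew question: independent of any built-in support, hash partitioning sends every row with the same join key to the same partition, so raising the partition count cannot shrink a hot key. A common manual workaround is key salting, which splits a hot key into several sub-keys. The sketch below is plain Scala, not a Spark API; the function names and sample keys are hypothetical, and for a join the other table would also need to be replicated once per salt value so that salted keys still find their matches.

```scala
// Toy illustration (plain Scala, not Spark SQL) of key skew under hash
// partitioning, and of "salting" a key into sub-keys to spread it out.
object SkewSalting {
  /** Hash partitioner: every row with the same key lands in the same partition. */
  def partitionOf(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  /** Row count of the most loaded partition. */
  def maxPartitionLoad(keys: Seq[String], numPartitions: Int): Int =
    keys.groupBy(partitionOf(_, numPartitions)).values.map(_.size).max

  /** Append a salt in [0, salts) to each key, round-robin by position. */
  def salt(keys: Seq[String], salts: Int): Seq[String] =
    keys.zipWithIndex.map { case (k, i) => s"$k#${i % salts}" }

  /** Largest number of rows sharing one key. */
  def maxRowsPerKey(keys: Seq[String]): Int =
    keys.groupBy(identity).values.map(_.size).max

  // Hypothetical skewed data: one hot key plus 100 rare keys.
  val sample: Seq[String] = Seq.fill(1000)("hot") ++ (1 to 100).map(i => s"k$i")

  def main(args: Array[String]): Unit = {
    val salted = salt(sample, 10)
    println(s"max rows per key: ${maxRowsPerKey(sample)} -> ${maxRowsPerKey(salted)}")
    println(s"max partition load (4000 partitions): ${maxPartitionLoad(sample, 4000)} -> ${maxPartitionLoad(salted, 4000)}")
  }
}
```

Even with 4000 partitions, the 1000 "hot" rows all land in one partition; after salting into 10 buckets no key holds more than 100 rows, so they can spread across up to 10 partitions.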

Thanks again.

Gustavo


On Thu, Mar 5, 2015 at 6:45 PM, DB Tsai <db...@dbtsai.com> wrote:

> PS: I recommend compressing the data when you cache the RDD.
> There will be some overhead from compression/decompression and
> serialization/deserialization, but it helps a lot for iterative
> algorithms because you can cache more data.
>
> Sincerely,
>
> DB Tsai
> -------------------------------------------------------
> Blog: https://www.dbtsai.com
>

Re: LBFGS optimizer performance

Posted by DB Tsai <db...@dbtsai.com>.
PS: I recommend compressing the data when you cache the RDD.
There will be some overhead from compression/decompression and
serialization/deserialization, but it helps a lot for iterative
algorithms because you can cache more data.

Sincerely,

DB Tsai
-------------------------------------------------------
Blog: https://www.dbtsai.com
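The compression trade-off DB Tsai describes can be seen outside Spark with the JDK's own compressor. The sketch uses java.util.zip.Deflater as a stand-in for whichever codec Spark is configured with, and the sample rows are made up. In Spark, the setting that applies this to cached data is spark.rdd.compress=true, which only affects serialized storage levels such as MEMORY_AND_DISK_SER. Repetitive serialized rows shrink considerably, at the cost of CPU time for every compress and decompress.

```scala
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets
import java.util.zip.{Deflater, DeflaterOutputStream}

// Measures how much a block of repetitive, serialized-looking rows shrinks under
// Deflate. java.util.zip stands in for Spark's configured codec here.
object CompressionTradeoff {
  /** Compress bytes with Deflate at its fastest level. */
  def deflate(bytes: Array[Byte]): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val deflater = new Deflater(Deflater.BEST_SPEED)
    try {
      val out = new DeflaterOutputStream(bos, deflater)
      out.write(bytes)
      out.close() // finishes the stream and flushes the compressed bytes into bos
    } finally deflater.end() // release native resources of the caller-supplied Deflater
    bos.toByteArray
  }

  // Hypothetical rows with a few low-cardinality columns, as text for simplicity.
  val sampleBytes: Array[Byte] =
    (1 to 10000).map(i => s"$i,active,US,${i % 7},0.0\n").mkString.getBytes(StandardCharsets.UTF_8)

  def main(args: Array[String]): Unit = {
    val compressed = deflate(sampleBytes)
    val pct = 100.0 * compressed.length / sampleBytes.length
    println(f"raw: ${sampleBytes.length} bytes, compressed: ${compressed.length} bytes ($pct%.1f%%)")
  }
}
```

The ratio on real cached data depends heavily on the schema and on how repetitive the values are, so it is worth measuring on a sample before relying on it.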


On Tue, Mar 3, 2015 at 2:27 PM, Gustavo Enrique Salazar Torres
<gs...@ime.usp.br> wrote:
> Yeah, I can call count before that and it works. Also I was over caching
> tables but I removed those. Now there is no caching but it gets really slow
> since it calculates my table RDD many times.
> Also hacked the LBFGS code to pass the number of examples which I calculated
> outside in a Spark SQL query but just moved the location of the problem.
>
> The query I'm running looks like this:
>
> s"SELECT $mappedFields, tableB.id as b_id FROM tableA LEFT JOIN tableB  ON
> tableA.id=tableB.table_a_id WHERE tableA.status='' OR tableB.status='' "
>
> mappedFields contains a list of fields which I'm interested in. The result
> of that query goes through (including sampling) some transformations before
> being input to LBFGS.
>
> My dataset has 180GB just for feature selection, I'm planning to use 450GB
> to train the final model and I'm using 16 c3.2xlarge EC2 instances, that
> means I have 240GB of RAM available.
>
> Any suggestion? I'm starting to check the algorithm because I don't
> understand why it needs to count the dataset.
>
> Thanks
>
> Gustavo
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: LBGFS optimizer performace

Posted by Gustavo Enrique Salazar Torres <gs...@ime.usp.br>.
Yeah, I can call count() before that and it works. I was also over-caching
tables, but I removed that. Now nothing is cached, and it gets really slow
because my table RDD is recomputed many times.
I also hacked the LBFGS code to take the number of examples, which I
calculated separately with a Spark SQL query, as a parameter, but that just
moved the problem somewhere else.
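Without persist(), every action on an RDD re-runs its whole lineage (here, the SQL query plus the later transformations), which is consistent with the slowdown described above. The plain-Scala analogy below is not Spark code: a lazy `view` stands in for an uncached RDD, `toVector` stands in for materializing a cache, and the counter shows how many times the per-row work runs.

```scala
// Plain-Scala analogy, not Spark code: a lazy view re-runs its map on every
// traversal, much like an RDD without persist() re-runs its lineage on every action.
object RecomputeSketch {
  var evaluations = 0

  // Stand-in for the expensive per-row work (query plus transformations).
  def expensive(x: Int): Int = { evaluations += 1; x * 2 }

  def main(args: Array[String]): Unit = {
    val lazyRows = (1 to 5).view.map(expensive)
    lazyRows.sum // first "action": 5 evaluations
    lazyRows.sum // second "action": 5 more evaluations
    val lazyEvaluations = evaluations

    val cached = lazyRows.toVector // one pass to build the "cache": 5 more
    cached.sum
    cached.sum // no further evaluations
    println(s"two lazy passes: $lazyEvaluations evaluations; total after caching: $evaluations")
  }
}
```

Assuming the sampled training data is much smaller than the joined tables, the Spark counterpart would be to persist only that final sampled RDD rather than the large intermediate tables.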

The query I'm running looks like this:

s"SELECT $mappedFields, tableB.id as b_id FROM tableA LEFT JOIN tableB  ON
tableA.id=tableB.table_a_id WHERE tableA.status='' OR tableB.status='' "

mappedFields contains the list of fields I'm interested in. The result of
that query goes through some transformations (including sampling) before
being passed to LBFGS.
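To see how the LEFT JOIN and the OR filter in that query shape the rows, here is a local sketch of the same semantics over in-memory collections. The case classes and sample rows are hypothetical; only the join condition and the WHERE clause mirror the query above, and SQL NULL is modeled as `None`.

```scala
// Local model of:
//   SELECT ... FROM tableA LEFT JOIN tableB ON tableA.id = tableB.table_a_id
//   WHERE tableA.status = '' OR tableB.status = ''
// Hypothetical rows; SQL NULL is modeled with Option.
object LeftJoinSketch {
  case class RowA(id: Int, status: String)
  case class RowB(id: Int, tableAId: Int, status: String)

  val tableA = Seq(RowA(1, ""), RowA(2, "done"), RowA(3, "done"))
  val tableB = Seq(RowB(10, 1, "x"), RowB(11, 1, "y"), RowB(12, 2, ""), RowB(13, 2, "z"))

  // LEFT JOIN: each A row appears once per matching B row,
  // or once paired with None if nothing matches.
  val joined: Seq[(RowA, Option[RowB])] = tableA.flatMap { a =>
    val matches = tableB.filter(_.tableAId == a.id)
    if (matches.isEmpty) Seq(a -> Option.empty[RowB]) else matches.map(b => a -> Option(b))
  }

  // WHERE: NULL = '' is not true in SQL, so None never satisfies the second condition.
  val filtered: Seq[(RowA, Option[RowB])] =
    joined.filter { case (a, b) => a.status == "" || b.exists(_.status == "") }

  def main(args: Array[String]): Unit =
    println(s"A rows: ${tableA.size}, joined rows: ${joined.size}, after WHERE: ${filtered.size}")
}
```

Every B row that matches an A row adds an output row, so the joined result can be larger than tableA before the WHERE clause removes anything.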

My dataset is 180 GB just for feature selection, and I'm planning to use
450 GB to train the final model. I'm using 16 c3.2xlarge EC2 instances,
which means I have 240 GB of RAM available.
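A quick back-of-the-envelope check of these numbers, in GB. It assumes 15 GB per c3.2xlarge (which is what 240 GB / 16 nodes implies) and uses the executor settings from the original post (--num-executors 128 --executor-memory 1G). On-disk size is only a rough proxy for in-memory size, since deserialized JVM objects are often larger.

```scala
// Back-of-the-envelope memory check; all sizes in GB.
// Assumption: 15 GB RAM per c3.2xlarge (240 GB / 16 nodes, as stated above).
object CapacitySketch {
  val nodes = 16
  val ramPerNodeGb = 15.0
  val clusterRamGb = nodes * ramPerNodeGb // physical RAM across the cluster

  // Settings from the original post: --num-executors 128 --executor-memory 1G
  val executors = 128
  val executorHeapGb = 1.0
  val totalExecutorHeapGb = executors * executorHeapGb // heap actually requested for executors

  val featureSelectionGb = 180.0
  val finalTrainingGb = 450.0

  def main(args: Array[String]): Unit = {
    println(f"cluster RAM: $clusterRamGb%.0f GB, total executor heap: $totalExecutorHeapGb%.0f GB")
    println(f"feature-selection data / heap: ${featureSelectionGb / totalExecutorHeapGb}%.2f")
    println(f"final training data / heap: ${finalTrainingGb / totalExecutorHeapGb}%.2f")
  }
}
```

With those settings Spark requests only about 128 GB of executor heap in total, which is already less than the 180 GB input, and each 1 GB executor must hold whatever its running tasks materialize.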

Any suggestions? I'm starting to dig into the algorithm because I don't
understand why it needs to count the dataset.
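One likely reason an L-BFGS cost function needs the example count: the objective is typically the average loss over the n examples plus a regularization term, so the summed loss and gradient have to be divided by n. My understanding is that MLlib's LBFGS does this with a count taken up front, but the sketch below is only a plain-Scala illustration on a local collection, not MLlib's code. The names `Point` and `lossAndGradient` are hypothetical.

```scala
// Averaged logistic loss and gradient over n labeled points (labels 0/1),
// plus L2 regularization. Plain-Scala sketch; names are hypothetical.
object AveragedLossSketch {
  case class Point(label: Double, features: Array[Double])

  def dot(a: Array[Double], b: Array[Double]): Double =
    a.indices.map(i => a(i) * b(i)).sum

  /** Returns (loss, gradient) for weights w; n is the number of examples. */
  def lossAndGradient(data: Seq[Point], w: Array[Double], n: Long,
                      regParam: Double): (Double, Array[Double]) = {
    val gradSum = new Array[Double](w.length)
    var lossSum = 0.0
    data.foreach { p =>
      val margin = dot(w, p.features)
      // log(1 + e^m) - y * m is the logistic loss for y in {0, 1}
      // (no overflow guard for very large margins in this sketch).
      lossSum += math.log1p(math.exp(margin)) - p.label * margin
      val multiplier = 1.0 / (1.0 + math.exp(-margin)) - p.label
      p.features.indices.foreach(i => gradSum(i) += multiplier * p.features(i))
    }
    // Dividing the sums by n is where the example count is needed.
    val loss = lossSum / n + 0.5 * regParam * dot(w, w)
    val grad = w.indices.map(i => gradSum(i) / n + regParam * w(i)).toArray
    (loss, grad)
  }

  def main(args: Array[String]): Unit = {
    val data = Seq(Point(1.0, Array(1.0, 0.0)), Point(0.0, Array(0.0, 1.0)))
    val (loss, grad) = lossAndGradient(data, Array(0.0, 0.0), data.size.toLong, 0.0)
    println(s"loss = $loss, gradient = ${grad.mkString(", ")}")
  }
}
```

In Spark the loop over `data` would be a distributed aggregation, while n is a single number; that is why a count can be computed once and reused across iterations.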

Thanks

Gustavo

On Tue, Mar 3, 2015 at 6:08 PM, Joseph Bradley <jo...@databricks.com>
wrote:

> Is that error actually occurring in LBFGS?  It looks like it might be
> happening before the data even gets to LBFGS.  (Perhaps the outer join
> you're trying to do is making the dataset size explode a bit.)  Are you
> able to call count() (or any RDD action) on the data before you pass it to
> LBFGS?

Re: LBGFS optimizer performace

Posted by Joseph Bradley <jo...@databricks.com>.
Is that error actually occurring in LBFGS?  It looks like it might be
happening before the data even gets to LBFGS.  (Perhaps the outer join
you're trying to do is making the dataset size explode a bit.)  Are you
able to call count() (or any RDD action) on the data before you pass it to
LBFGS?
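Joseph's hypothesis can be checked without materializing the join. Before any WHERE filter, a LEFT JOIN produces countA(k) * max(1, countB(k)) rows for each key k, so per-key counts on both sides (for example from two GROUP BY queries) are enough to estimate the output size. The arithmetic is sketched below in plain Scala over local maps whose contents are hypothetical. The HashOuterJoin buildHashTable frames in the stack trace also suggest that rows are collected into an in-memory hash table per partition, so one very frequent key could concentrate a lot of data in a single task. For that reason the sketch also reports the heaviest key.

```scala
// Estimate LEFT JOIN output size from per-key row counts, without building the join.
// For each key: rows = countA(key) * max(1, countB(key)); unmatched A rows still yield one row.
object JoinSizeSketch {
  /** Rows produced by A LEFT JOIN B on a key, before any WHERE filter. */
  def leftJoinRows(countsA: Map[Long, Long], countsB: Map[Long, Long]): Long =
    countsA.toSeq.map { case (key, nA) => nA * math.max(1L, countsB.getOrElse(key, 0L)) }.sum

  /** Key with the most joined rows (a rough skew indicator); assumes countsA is non-empty. */
  def heaviestKey(countsA: Map[Long, Long], countsB: Map[Long, Long]): (Long, Long) =
    countsA.toSeq
      .map { case (key, nA) => key -> nA * math.max(1L, countsB.getOrElse(key, 0L)) }
      .maxBy(_._2)

  def main(args: Array[String]): Unit = {
    // Hypothetical per-key counts: join key -> number of rows with that key.
    val countsA = Map(1L -> 1L, 2L -> 1L, 3L -> 1L)
    val countsB = Map(1L -> 2L, 2L -> 5000L)
    println(s"A rows: ${countsA.values.sum}, joined rows: ${leftJoinRows(countsA, countsB)}")
    println(s"heaviest key (key, rows): ${heaviestKey(countsA, countsB)}")
  }
}
```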

On Tue, Mar 3, 2015 at 8:55 AM, Gustavo Enrique Salazar Torres <
gsalazar@ime.usp.br> wrote:

> I just tried that and got the same error.
> I think the problem is the "data.count()" call in LBFGS, because that's a
> naive thing to do on huge datasets.
> I'm thinking of writing my own version of LBFGS that, instead of calling
> data.count(), takes the number of examples as a parameter, which I will
> calculate with a Spark SQL query.
>
> I will let you know.
>
> Thanks

Re: LBGFS optimizer performace

Posted by Gustavo Enrique Salazar Torres <gs...@ime.usp.br>.
I just tried that and got the same error.
I think the problem is the "data.count()" call in LBFGS, because that's a
naive thing to do on huge datasets.
I'm thinking of writing my own version of LBFGS that, instead of calling
data.count(), takes the number of examples as a parameter, which I will
calculate with a Spark SQL query.
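If the count passed into a modified LBFGS (call it n') differs from the number of examples n actually in the RDD, the effect can be worked out. This assumes the objective has the averaged-loss-plus-regularization form, with per-example losses \ell_i, regularizer R and regularization parameter \lambda:

```latex
F_{n'}(w) = \frac{1}{n'}\sum_{i=1}^{n}\ell_i(w) + \lambda R(w)
          = \frac{n}{n'}\left[\frac{1}{n}\sum_{i=1}^{n}\ell_i(w) + \lambda\,\frac{n'}{n}\,R(w)\right]
```

The positive factor n/n' does not move the minimizer. Under this assumption, a wrong count therefore behaves like training with regularization parameter \lambda n'/n instead of \lambda, and it also rescales the gradients that the convergence test sees. For example, a count taken before the 1% sampling would make n' roughly 100n, i.e. about 100 times stronger regularization.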

I will let you know.

Thanks


On Tue, Mar 3, 2015 at 3:25 AM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> Can you try increasing your driver memory, reducing the number of executors,
> and increasing the executor memory?
>
> Thanks
> Best Regards

Re: LBGFS optimizer performace

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
Can you try increasing your driver memory, reducing the number of executors,
and increasing the executor memory?
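This suggestion trades executor count for heap per executor. The rough per-executor calculation below makes some assumptions: 15 GB of RAM per node, a hypothetical 3 GB per node set aside for the OS, YARN daemons and container overhead, and the spark.storage.memoryFraction of 0.2 from the original post. The memory that is actually usable depends on the EMR YARN configuration and on Spark's own overhead settings, so treat the numbers as illustrations only. When an executor has several cores, its concurrently running tasks share that heap.

```scala
// Rough heap-per-executor arithmetic for different executor counts; sizes in GB.
object ExecutorLayoutSketch {
  val nodes = 16
  val ramPerNodeGb = 15.0
  val reservedPerNodeGb = 3.0 // hypothetical: OS, YARN daemons, container overhead
  val storageFraction = 0.2   // spark.storage.memoryFraction from the original post

  /** Heap per executor if `executors` are spread evenly over the nodes. */
  def heapPerExecutorGb(executors: Int): Double = {
    val perNode = math.ceil(executors.toDouble / nodes)
    (ramPerNodeGb - reservedPerNodeGb) / perNode
  }

  def main(args: Array[String]): Unit = {
    for (executors <- Seq(128, 32, 16)) {
      val heap = heapPerExecutorGb(executors)
      // heap * storageFraction is only an upper bound on the cache region.
      println(f"$executors%3d executors: up to $heap%.1f GB heap each, " +
        f"at most ${heap * storageFraction}%.1f GB of it for cached blocks")
    }
  }
}
```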

Thanks
Best Regards

On Tue, Mar 3, 2015 at 10:09 AM, Gustavo Enrique Salazar Torres <
gsalazar@ime.usp.br> wrote:

> Hi there:
>
> I'm using the LBFGS optimizer to train a logistic regression model. The
> code I implemented follows the pattern shown in
> https://spark.apache.org/docs/1.2.0/mllib-linear-methods.html
> but the training data is obtained from a Spark SQL RDD.
> The problem I'm having is that LBFGS tries to count the elements in my RDD,
> and that results in an OOM exception since my dataset is huge.
> I'm running on an AWS EMR cluster with 16 c3.2xlarge instances on Hadoop
> YARN. My dataset is about 150 GB, but I sample it (taking only 1% of the
> data) in order to scale logistic regression.
> The exception I'm getting is this:
>
> 15/03/03 04:21:44 WARN scheduler.TaskSetManager: Lost task 108.0 in stage 2.0 (TID 7600, ip-10-155-20-71.ec2.internal): java.lang.OutOfMemoryError: Java heap space
>         at java.util.Arrays.copyOfRange(Arrays.java:2694)
>         at java.lang.String.<init>(String.java:203)
>         at com.esotericsoftware.kryo.io.Input.readString(Input.java:448)
>         at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:157)
>         at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:146)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>         at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
>         at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>         at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144)
>         at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>         at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>         at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at org.apache.spark.sql.execution.joins.HashOuterJoin.org$apache$spark$sql$execution$joins$HashOuterJoin$$buildHashTable(HashOuterJoin.scala:179)
>         at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:199)
>         at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:196)
>         at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>
> I'm using these parameters at runtime:
> --num-executors 128 --executor-memory 1G --driver-memory 4G
> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
> --conf spark.storage.memoryFraction=0.2
>
> I also persist my dataset using MEMORY_AND_DISK_SER but get the same error.
> I would appreciate any help with this problem. I have been trying to solve it
> for days and I'm running out of time and hair.
>
> Thanks
> Gustavo
>