Posted to user@mahout.apache.org by jelmer <jk...@gmail.com> on 2016/06/23 13:47:52 UTC

Scaling up Spark item similarity on big data sets

Hi,

I am trying to build a simple recommendation engine using Spark item
similarity (e.g. with
org.apache.mahout.math.cf.SimilarityAnalysis.cooccurrencesIDSs).

Things work fine on comparatively small datasets, but I am having difficulty
scaling up.

The input is CSV data containing 19,988,422 view-item events produced by
1,384,107 users, covering 5,135,845 distinct products.

The CSV data is stored on HDFS and split over 15 files; consequently, the
resulting RDD has 15 partitions.

After tweaking some parameters I managed to get the job to run without
running out of memory, but it takes a very long time.

After running for 15 hours it is still stuck on:

org.apache.spark.rdd.RDD.flatMap(RDD.scala:332)
org.apache.mahout.sparkbindings.blas.AtA$.at_a_nongraph_mmul(AtA.scala:254)
org.apache.mahout.sparkbindings.blas.AtA$.at_a(AtA.scala:61)
org.apache.mahout.sparkbindings.SparkEngine$.tr2phys(SparkEngine.scala:325)
org.apache.mahout.sparkbindings.SparkEngine$.tr2phys(SparkEngine.scala:339)
org.apache.mahout.sparkbindings.SparkEngine$.toPhysical(SparkEngine.scala:123)
org.apache.mahout.math.drm.logical.CheckpointAction.checkpoint(CheckpointAction.scala:41)
org.apache.mahout.math.drm.package$.drm2Checkpointed(package.scala:95)
org.apache.mahout.math.cf.SimilarityAnalysis$$anonfun$3.apply(SimilarityAnalysis.scala:145)
org.apache.mahout.math.cf.SimilarityAnalysis$$anonfun$3.apply(SimilarityAnalysis.scala:143)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:176)
scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
scala.collection.AbstractIterator.to(Iterator.scala:1157)
scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:257)
scala.collection.AbstractIterator.toList(Iterator.scala:1157)


I am using Spark on YARN, and containers cannot use more than 16 GB.

I figured I would be able to speed things up by throwing a larger number of
executors at the problem, but so far that is not working out very well.

I tried assigning 500 executors and repartitioning the input data into 500
partitions. Even changing spark.yarn.driver.memoryOverhead to extreme
values (half of the heap) did not resolve this.

Could someone offer guidance on how best to speed up item similarity
jobs?

Re: Scaling up Spark item similarity on big data sets

Posted by Pat Ferrel <pa...@occamsmachete.com>.
I just ran into the opposite of the case Sebastian mentions: a very large percentage of users had only one interaction. They come from social media or search, see only one thing, and leave. Processing this data turned into a huge job but led to virtually no change in the model, since users with very few interactions also have minimal effect on the math. I removed every user with only one interaction and sped up the model calculation by 10x. The moral of the story is that data prep can really help.

I’ve a mind to put min AND max interactions into the algorithm and save people the trouble of doing it themselves.

Seems like setting the min = 2 should be the default, at least for the primary/conversion event. You could override to any number.
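
The minimum-interaction filter described above can be sketched in a few lines. This is only an illustration in plain Python, not Mahout code: the function name filter_users_by_interactions, the min_interactions parameter, and the (user, item) tuple layout are all assumptions made for the example.

```python
from collections import Counter


def filter_users_by_interactions(events, min_interactions=2):
    """Drop every event that belongs to a user with too few interactions.

    events: iterable of (user_id, item_id) pairs (hypothetical layout).
    """
    events = list(events)
    # Count how many events each user produced.
    counts = Counter(user for user, _ in events)
    # Keep only the events of users that reach the threshold.
    return [(user, item) for user, item in events
            if counts[user] >= min_interactions]


events = [
    ("u1", "a"), ("u1", "b"),
    ("u2", "a"),                      # single-interaction user
    ("u3", "a"), ("u3", "b"), ("u3", "c"),
]
filtered = filter_users_by_interactions(events, min_interactions=2)
```

With the default of 2, the lone event from u2 is removed and the other five events are kept in their original order.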


Re: Scaling up Spark item similarity on big data sets

Posted by Ted Dunning <te...@gmail.com>.
This actually sounds like a very small problem.

My guess is that there are bad settings for the interaction and frequency
cuts.



Re: Scaling up Spark item similarity on big data sets

Posted by Pat Ferrel <pa...@occamsmachete.com>.
In addition to increasing downsampling, there are some other things to note. The original OOM was caused by the use of BiMaps to store your row and column IDs. Their size grows with the total storage needed for two hashmaps per ID type. With only 16 GB you may have very little left for the running algorithm. These data structures are needed on each executor and on the driver, so they won’t benefit from more executors. The only way to shrink them is to somehow decrease the size of your ID strings. I would be surprised if you can run the algorithm with that much data in 16 GB.
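
To picture why the ID maps grow with both the number of distinct IDs and the length of the ID strings, here is a rough sketch of a bidirectional map built from two plain dictionaries. It is not Mahout's BiMap implementation: the class IdBiMap, the helper id_payload_bytes, and the sample SKU strings are hypothetical, and the byte count deliberately ignores all per-entry map overhead.

```python
class IdBiMap:
    """Two plain dicts: external string ID <-> dense integer index."""

    def __init__(self):
        self.to_index = {}   # string ID -> int
        self.to_id = {}      # int -> string ID

    def add(self, external_id):
        """Register an ID if it is new and return its integer index."""
        if external_id not in self.to_index:
            index = len(self.to_index)
            self.to_index[external_id] = index
            self.to_id[index] = external_id
        return self.to_index[external_id]


def id_payload_bytes(bimap):
    """Lower bound on memory: UTF-8 bytes of the ID strings alone."""
    return sum(len(s.encode("utf-8")) for s in bimap.to_index)


items = IdBiMap()
for raw in ["sku-000123", "sku-000124", "sku-000123"]:
    items.add(raw)
```

Every distinct ID adds one entry to each dictionary, and a copy of this structure has to fit on every executor and on the driver, which is why shorter ID strings are the only lever mentioned above.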

I’m working on an implementation that uses RDDs and joins to be more memory efficient when translating IDs, but it will run a fair bit slower. In that case, increasing the number of partitions and executors would help speed it up.

Re: Scaling up Spark item similarity on big data sets

Posted by Sebastian <ss...@apache.org>.
Hi,

Pairwise similarity is a quadratic problem, and it's very easy to run into
a problem size that does not scale anymore, especially with so many items.
Our code downsamples the input data to help with this.
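
As a rough illustration of the quadratic growth and of what a cap on interactions buys: a single user with k interactions contributes k(k-1)/2 unordered item pairs. The sketch below is plain Python and assumes uniform random sampling; the helper names pairs_for and downsample are made up for the example, and the sampling strategy used by the Mahout code may differ.

```python
import random


def pairs_for(k):
    """Number of unordered item pairs produced by one user with k interactions."""
    return k * (k - 1) // 2


def downsample(history, max_interactions, rng):
    """Cap one user's history at max_interactions items (uniform sampling)."""
    if len(history) <= max_interactions:
        return list(history)
    return rng.sample(history, max_interactions)


rng = random.Random(42)
history = [f"item{i}" for i in range(1000)]
capped = downsample(history, 100, rng)

print(pairs_for(len(history)))  # 499500
print(pairs_for(len(capped)))   # 4950
```

Cutting the history by a factor of 10 cuts the pair count by roughly a factor of 100, which is why lowering the cap has such a large effect on runtime.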

One thing you can do is decrease the argument maxNumInteractions to a
lower number to increase the amount of downsampling. Another is to remove
the items with the highest number of interactions from the dataset, as
they are usually not very interesting (everybody knows the top sellers
already) and heavily impact the computation.
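
Removing the most popular items before running the job could look roughly like the following. This is a plain-Python sketch under assumptions: the function drop_top_items, its n_top parameter, and the (user, item) tuple layout are hypothetical and not part of Mahout.

```python
from collections import Counter


def drop_top_items(events, n_top):
    """Remove all events that touch the n_top most frequently seen items.

    events: iterable of (user_id, item_id) pairs (hypothetical layout).
    """
    events = list(events)
    # Count interactions per item and pick the most popular ones.
    counts = Counter(item for _, item in events)
    top = {item for item, _ in counts.most_common(n_top)}
    return [(user, item) for user, item in events if item not in top]


events = [
    ("u1", "bestseller"), ("u1", "x"),
    ("u2", "bestseller"), ("u2", "y"),
    ("u3", "bestseller"),
]
trimmed = drop_top_items(events, n_top=1)
```

Here the single most popular item accounts for three of the five events, so dropping it leaves only the two long-tail interactions.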

Best,
Sebastian

