You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@mahout.apache.org by Andres Perez <an...@tresata.com> on 2015/04/06 20:04:29 UTC

spark-itemsimilarity OOM errors

Hi. I was trying to run the spark-itemsimilarity job on a dataset of mine,
and am running into java heap space OOM errors (stack trace below). I
noticed that the job fails on a reduce step that refers to
SparkEngine.scala:79, which reduces on a += operation. Presumably this is
building a DenseVector to contain all the similarity scores against all
items for each item. Is the list meant to be stored in memory? If so, it
would seem that the job will fail if we have a number of items in the tens
of thousands (my guess). Is this a correct assumption? I'm including my
stack trace:

  java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:2271)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
    at
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
    at java.io.DataOutputStream.writeLong(DataOutputStream.java:224)
    at java.io.DataOutputStream.writeDouble(DataOutputStream.java:259)
    at
org.apache.mahout.math.VectorWritable.writeVectorContents(VectorWritable.java:213)
    at
org.apache.mahout.math.MatrixWritable.writeMatrix(MatrixWritable.java:194)
    at org.apache.mahout.math.MatrixWritable.write(MatrixWritable.java:56)
    at
org.apache.mahout.sparkbindings.io.WritableKryoSerializer.write(WritableKryoSerializer.scala:29)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
    at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at
org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128)
    at
org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195)
    at
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:751)
    at
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:750)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:750)
    at
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:746)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at
org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:746)
    at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)
    at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

Thanks,

Andy

Fwd: spark-itemsimilarity OOM errors

Posted by Pat Ferrel <pa...@occamsmachete.com>.
Try allocating more memory on executors with -sem 8g.  More or less as needed. The only large non-rdd objects (rdds can be partially disk based) are guava HashBiMaps, these are broadcast to all workers so one copy is kept on every node. They grow with the memory needed to store all your user and item ids as string values. This means the number of items or users is not unlimited but is seldom reached on the type of cluster machines you get from AWS or other vendor. Ask yourself how much memory to store all user and item ids as strings.

The error below may not be where all the memory is used.


On Apr 6, 2015, at 11:04 AM, Andres Perez <an...@tresata.com> wrote:

Hi. I was trying to run the spark-itemsimilarity job on a dataset of mine,
and am running into java heap space OOM errors (stack trace below). I
noticed that the job fails on a reduce step that refers to
SparkEngine.scala:79, which reduces on a += operation. Presumably this is
building a DenseVector to contain all the similarity scores against all
items for each item. Is the list meant to be stored in memory? If so, it
would seem that the job will fail if we have a number of items in the tens
of thousands (my guess). Is this a correct assumption? I'm including my
stack trace:

java.lang.OutOfMemoryError: Java heap space
  at java.util.Arrays.copyOf(Arrays.java:2271)
  at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
  at
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
  at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
  at java.io.DataOutputStream.writeLong(DataOutputStream.java:224)
  at java.io.DataOutputStream.writeDouble(DataOutputStream.java:259)
  at
org.apache.mahout.math.VectorWritable.writeVectorContents(VectorWritable.java:213)
  at
org.apache.mahout.math.MatrixWritable.writeMatrix(MatrixWritable.java:194)
  at org.apache.mahout.math.MatrixWritable.write(MatrixWritable.java:56)
  at
org.apache.mahout.sparkbindings.io.WritableKryoSerializer.write(WritableKryoSerializer.scala:29)
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
  at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
  at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
  at
org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128)
  at
org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195)
  at
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:751)
  at
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:750)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:750)
  at
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:746)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at
org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:746)
  at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)
  at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
  at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  at org.apache.spark.scheduler.Task.run(Task.scala:56)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
  at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

Thanks,

Andy



Re: spark-itemsimilarity OOM errors

Posted by Pat Ferrel <pa...@occamsmachete.com>.
Yes. 

On each node, so if you have 16 cores per node, only one copy of them. The largest catalog and user list I’ve seen will easily fit in 1G but the job will require more to run well. I think 4g per executor is default but it will benefit and may require more. It’s I/O bound not compute bound so the more kept in memory the better performance. This is a major difference with Hadoop mapreduce, which keeps intermediate results completely on disk.


On Apr 7, 2015, at 8:05 AM, Andres Perez <an...@tresata.com> wrote:

Thanks for the response. So if I understand correctly, the design is such
that all the user and item IDs must fit in memory?

On Mon, Apr 6, 2015 at 3:15 PM, Pat Ferrel <pa...@occamsmachete.com> wrote:

> Try allocating more memory on executors with -sem 8g.  More or less as
> needed. The only large non-rdd objects (rdds can be partially disk based)
> are guava HashBiMaps, these are broadcast to all workers so one copy is
> kept on every node. They grow with the memory needed to store all your user
> and item ids as string values. This means the number of items or users is
> not unlimited but is seldom reached on the type of cluster machines you get
> from AWS or other vendor. Ask yourself how much memory to store all user
> and item ids as strings.
> 
> The error below may not be where all the memory is used.
> 
> 
> On Apr 6, 2015, at 11:04 AM, Andres Perez <an...@tresata.com> wrote:
> 
> Hi. I was trying to run the spark-itemsimilarity job on a dataset of mine,
> and am running into java heap space OOM errors (stack trace below). I
> noticed that the job fails on a reduce step that refers to
> SparkEngine.scala:79, which reduces on a += operation. Presumably this is
> building a DenseVector to contain all the similarity scores against all
> items for each item. Is the list meant to be stored in memory? If so, it
> would seem that the job will fail if we have a number of items in the tens
> of thousands (my guess). Is this a correct assumption? I'm including my
> stack trace:
> 
> java.lang.OutOfMemoryError: Java heap space
>   at java.util.Arrays.copyOf(Arrays.java:2271)
>   at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
>   at
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>   at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
>   at java.io.DataOutputStream.writeLong(DataOutputStream.java:224)
>   at java.io.DataOutputStream.writeDouble(DataOutputStream.java:259)
>   at
> 
> org.apache.mahout.math.VectorWritable.writeVectorContents(VectorWritable.java:213)
>   at
> org.apache.mahout.math.MatrixWritable.writeMatrix(MatrixWritable.java:194)
>   at org.apache.mahout.math.MatrixWritable.write(MatrixWritable.java:56)
>   at
> 
> org.apache.mahout.sparkbindings.io.WritableKryoSerializer.write(WritableKryoSerializer.scala:29)
>   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>   at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
>   at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
>   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>   at
> 
> org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128)
>   at
> 
> org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195)
>   at
> 
> org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:751)
>   at
> 
> org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:750)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at
> 
> org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:750)
>   at
> 
> org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:746)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at
> 
> org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:746)
>   at
> 
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)
>   at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>   at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>   at org.apache.spark.scheduler.Task.run(Task.scala:56)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>   at
> 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at
> 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 
> Thanks,
> 
> Andy
> 
> 


Re: spark-itemsimilarity OOM errors

Posted by Andres Perez <an...@tresata.com>.
Thanks for the response. So if I understand correctly, the design is such
that all the user and item IDs must fit in memory?

On Mon, Apr 6, 2015 at 3:15 PM, Pat Ferrel <pa...@occamsmachete.com> wrote:

> Try allocating more memory on executors with -sem 8g.  More or less as
> needed. The only large non-rdd objects (rdds can be partially disk based)
> are guava HashBiMaps, these are broadcast to all workers so one copy is
> kept on every node. They grow with the memory needed to store all your user
> and item ids as string values. This means the number of items or users is
> not unlimited but is seldom reached on the type of cluster machines you get
> from AWS or other vendor. Ask yourself how much memory to store all user
> and item ids as strings.
>
> The error below may not be where all the memory is used.
>
>
> On Apr 6, 2015, at 11:04 AM, Andres Perez <an...@tresata.com> wrote:
>
> Hi. I was trying to run the spark-itemsimilarity job on a dataset of mine,
> and am running into java heap space OOM errors (stack trace below). I
> noticed that the job fails on a reduce step that refers to
> SparkEngine.scala:79, which reduces on a += operation. Presumably this is
> building a DenseVector to contain all the similarity scores against all
> items for each item. Is the list meant to be stored in memory? If so, it
> would seem that the job will fail if we have a number of items in the tens
> of thousands (my guess). Is this a correct assumption? I'm including my
> stack trace:
>
>  java.lang.OutOfMemoryError: Java heap space
>    at java.util.Arrays.copyOf(Arrays.java:2271)
>    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
>    at
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
>    at java.io.DataOutputStream.writeLong(DataOutputStream.java:224)
>    at java.io.DataOutputStream.writeDouble(DataOutputStream.java:259)
>    at
>
> org.apache.mahout.math.VectorWritable.writeVectorContents(VectorWritable.java:213)
>    at
> org.apache.mahout.math.MatrixWritable.writeMatrix(MatrixWritable.java:194)
>    at org.apache.mahout.math.MatrixWritable.write(MatrixWritable.java:56)
>    at
>
> org.apache.mahout.sparkbindings.io.WritableKryoSerializer.write(WritableKryoSerializer.scala:29)
>    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>    at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
>    at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
>    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>    at
>
> org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128)
>    at
>
> org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195)
>    at
>
> org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:751)
>    at
>
> org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:750)
>    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>    at
>
> org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:750)
>    at
>
> org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:746)
>    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>    at
>
> org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:746)
>    at
>
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)
>    at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>    at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>    at org.apache.spark.scheduler.Task.run(Task.scala:56)
>    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>    at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>    at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
> Thanks,
>
> Andy
>
>

Re: spark-itemsimilarity OOM errors

Posted by Pat Ferrel <pa...@occamsmachete.com>.
Try allocating more memory on executors with -sem 8g.  More or less as needed. The only large non-rdd objects (rdds can be partially disk based) are guava HashBiMaps, these are broadcast to all workers so one copy is kept on every node. They grow with the memory needed to store all your user and item ids as string values. This means the number of items or users is not unlimited but is seldom reached on the type of cluster machines you get from AWS or other vendor. Ask yourself how much memory to store all user and item ids as strings.

The error below may not be where all the memory is used.


On Apr 6, 2015, at 11:04 AM, Andres Perez <an...@tresata.com> wrote:

Hi. I was trying to run the spark-itemsimilarity job on a dataset of mine,
and am running into java heap space OOM errors (stack trace below). I
noticed that the job fails on a reduce step that refers to
SparkEngine.scala:79, which reduces on a += operation. Presumably this is
building a DenseVector to contain all the similarity scores against all
items for each item. Is the list meant to be stored in memory? If so, it
would seem that the job will fail if we have a number of items in the tens
of thousands (my guess). Is this a correct assumption? I'm including my
stack trace:

 java.lang.OutOfMemoryError: Java heap space
   at java.util.Arrays.copyOf(Arrays.java:2271)
   at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
   at
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
   at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
   at java.io.DataOutputStream.writeLong(DataOutputStream.java:224)
   at java.io.DataOutputStream.writeDouble(DataOutputStream.java:259)
   at
org.apache.mahout.math.VectorWritable.writeVectorContents(VectorWritable.java:213)
   at
org.apache.mahout.math.MatrixWritable.writeMatrix(MatrixWritable.java:194)
   at org.apache.mahout.math.MatrixWritable.write(MatrixWritable.java:56)
   at
org.apache.mahout.sparkbindings.io.WritableKryoSerializer.write(WritableKryoSerializer.scala:29)
   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
   at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
   at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
   at
org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128)
   at
org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195)
   at
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:751)
   at
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:750)
   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
   at
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:750)
   at
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:746)
   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
   at
org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:746)
   at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)
   at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
   at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
   at org.apache.spark.scheduler.Task.run(Task.scala:56)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
   at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

Thanks,

Andy