Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2017/11/03 12:18:00 UTC

[jira] [Resolved] (SPARK-22438) OutOfMemoryError on very small data sets

     [ https://issues.apache.org/jira/browse/SPARK-22438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen resolved SPARK-22438.
-------------------------------
    Resolution: Duplicate

Please search JIRA before filing. This looks like a duplicate of https://issues.apache.org/jira/browse/SPARK-21033

> OutOfMemoryError on very small data sets
> ----------------------------------------
>
>                 Key: SPARK-22438
>                 URL: https://issues.apache.org/jira/browse/SPARK-22438
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.0
>            Reporter: Morten Hornbech
>            Priority: Critical
>
> We have a customer who uses Spark as an engine for running SQL on a collection of small datasets, typically no more than a few thousand rows each. Recently we started observing out-of-memory errors on some new workloads. Even though the datasets were only a few kilobytes, the job would almost immediately spike to > 10 GB of memory usage, producing an out-of-memory error on the modest hardware (2 CPUs, 16 GB RAM) that is used. Using larger hardware and allocating more memory to Spark (4 CPUs, 32 GB RAM) made the job complete, but still with unreasonably high memory usage.
> The query involved was a left join on two datasets. In some, but not all, cases we were able to remove or reduce the problem by rewriting the query to use an exists sub-select instead. After a lot of debugging we were able to reproduce the problem locally with the following test:
> {code:java}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.storage.StorageLevel
>
> case class Data(value: String)
> val session = SparkSession.builder.master("local[1]").getOrCreate()
> import session.implicits._
> // foo: 500 single-column rows, cached in memory; bar: a single row
> val foo = session.createDataset((1 to 500).map(i => Data(i.toString)))
> val bar = session.createDataset((1 to 1).map(i => Data(i.toString)))
> foo.persist(StorageLevel.MEMORY_ONLY)
> foo.createTempView("foo")
> bar.createTempView("bar")
> // Left join, then coalesce to 2 partitions before collecting
> val result = session.sql("select * from bar left join foo on bar.value = foo.value")
> result.coalesce(2).collect()
> {code}
> Running this produces the error below:
> {code:java}
> java.lang.OutOfMemoryError: Unable to acquire 28 bytes of memory, got 0
>    at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:127)
>    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:372)
>    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:396)
>    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:109)
>    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source)
>    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
>    at org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
>    at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedBufferedToRowWithNullFreeJoinKey(SortMergeJoinExec.scala:774)
>    at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.<init>(SortMergeJoinExec.scala:649)
>    at org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1.apply(SortMergeJoinExec.scala:198)
>    at org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1.apply(SortMergeJoinExec.scala:136)
>    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
>    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:100)
>    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:99)
>    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
>    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
>    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>    at org.apache.spark.scheduler.Task.run(Task.scala:108)
>    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>    at java.lang.Thread.run(Thread.java:745)
> {code}
> The exact failure point varies with the number of threads given to Spark, the "coalesce" value and the number of rows in "foo". Using an inner join, removing the call to persist, or removing the call to coalesce (or using repartition instead) will each independently make the error go away.
> The reason persist and coalesce are used in the workload at all is that they are part of a more general Spark-based processing engine, not limited to these small datasets. Therefore the workaround is not as simple as it may seem, since we cannot tailor the Spark code to this specific case.
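> As an illustration of the repartition variant mentioned above, here is a minimal sketch of the same test written as a standalone program. The object name Spark22438Repartition is hypothetical, and the only intended change from the failing test is repartition(2) in place of coalesce(2). Whether this avoids the error on other Spark versions or data shapes is not established here.
> {code:java}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.storage.StorageLevel
>
> // Top-level case class so that an Encoder can be derived for it
> case class Data(value: String)
>
> // Hypothetical wrapper object, not part of the original test
> object Spark22438Repartition {
>   def main(args: Array[String]): Unit = {
>     val session = SparkSession.builder.master("local[1]").getOrCreate()
>     import session.implicits._
>
>     // Same inputs as the failing test: 500 cached rows and a single row
>     val foo = session.createDataset((1 to 500).map(i => Data(i.toString)))
>     val bar = session.createDataset((1 to 1).map(i => Data(i.toString)))
>     foo.persist(StorageLevel.MEMORY_ONLY)
>     foo.createTempView("foo")
>     bar.createTempView("bar")
>
>     // Same left join as before
>     val result = session.sql("select * from bar left join foo on bar.value = foo.value")
>
>     // Only change: repartition(2) instead of coalesce(2)
>     result.repartition(2).collect()
>
>     session.stop()
>   }
> }
> {code}
> Note that repartition always performs a shuffle, which coalesce avoids, so this variant trades extra data movement for a different execution plan.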


