Posted to user@spark.apache.org by Babak Alipour <ba...@gmail.com> on 2016/10/02 03:35:45 UTC

Re: DataFrame Sort gives Cannot allocate a page with more than 17179869176 bytes

Do you mean running a multi-JVM 'cluster' on the single machine? How would
that affect performance/memory-consumption? If a multi-JVM setup can handle
such a large input, then why can't a single-JVM break down the job into
smaller tasks?

I also found that SPARK-9411 mentions making the page size configurable,
but it's hard-limited to ((1L << 31) - 1) * 8L bytes [1]

[1]
https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
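
That limit is exactly the number in the exception: a Tungsten page is
backed by a long[], which can hold at most Integer.MAX_VALUE elements of 8
bytes each. A quick sanity check in Scala:

val maxPageBytes = ((1L << 31) - 1) * 8L  // 2^31 - 1 longs, 8 bytes apiece
// maxPageBytes == 17179869176, the exact figure in the error message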

SPARK-9452 also talks about larger page sizes, but I don't know how that
affects my use case. [2]

[2] https://github.com/apache/spark/pull/7891


The reason given in [2] is that the on-heap allocator's maximum page size
is limited by the maximum amount of data that can be stored in a long[].
Is it possible to force this specific operation to go off-heap so that it
can possibly use a bigger page size?
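
For anyone who wants to try, this is the sketch I have in mind, assuming
Spark 2.0's off-heap settings (the 64g is just an example, and whether the
sorter would then pick a larger page is exactly what I don't know):

val spark = org.apache.spark.sql.SparkSession.builder()
  .config("spark.memory.offHeap.enabled", "true") // let Tungsten allocate off-heap
  .config("spark.memory.offHeap.size", "64g")     // size of the off-heap pool
  .getOrCreate()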



>Babak


*Babak Alipour,*
*University of Florida*

On Fri, Sep 30, 2016 at 3:03 PM, Vadim Semenov <va...@datadoghq.com>
wrote:

> Run more, smaller executors: change `spark.executor.memory` to 32g and
> `spark.executor.cores` to 2-4, for example.
>
> Changing the driver's memory won't help because the driver doesn't
> participate in execution.
>
> On Fri, Sep 30, 2016 at 2:58 PM, Babak Alipour <ba...@gmail.com>
> wrote:
>
>> Thank you for your replies.
>>
>> @Mich, using LIMIT 100 in the query prevents the exception, but given
>> that there's enough memory, I don't think this should happen even
>> without LIMIT.
>>
>> @Vadim, here's the full stack trace:
>>
>> Caused by: java.lang.IllegalArgumentException: Cannot allocate a page with more than 17179869176 bytes
>>         at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:241)
>>         at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:121)
>>         at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:374)
>>         at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:396)
>>         at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:94)
>>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source)
>>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
>>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>>         at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>>         at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>>         at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:85)
>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>         at java.lang.Thread.run(Thread.java:745)
>>
>> I'm running Spark in local mode, so the only executor is the driver, and
>> spark.driver.memory is set to 64g. Changing the driver's memory doesn't
>> help.
>>
>> *Babak Alipour,*
>> *University of Florida*
>>
>> On Fri, Sep 30, 2016 at 2:05 PM, Vadim Semenov <vadim.semenov@datadoghq.com> wrote:
>>
>>> Can you post the whole exception stack trace?
>>> What are your executor memory settings?
>>>
>>> Right now I assume that it happens in UnsafeExternalRowSorter ->
>>> UnsafeExternalSorter:insertRecord
>>>
>>> Running more executors with lower `spark.executor.memory` should help.
>>>
>>>
>>> On Fri, Sep 30, 2016 at 12:57 PM, Babak Alipour <babak.alipour@gmail.com> wrote:
>>>
>>>> Greetings everyone,
>>>>
>>>> I'm trying to read a single field of a Hive table stored as Parquet in
>>>> Spark (~140GB for the entire table; this single field should be just a
>>>> few GB) and look at the sorted output using the following:
>>>>
>>>> sql("SELECT " + field + " FROM MY_TABLE ORDER BY " + field + " DESC")
>>>>
>>>> But this simple line of code gives:
>>>>
>>>> Caused by: java.lang.IllegalArgumentException: Cannot allocate a page
>>>> with more than 17179869176 bytes
>>>>
>>>> Same error for:
>>>>
>>>> sql("SELECT " + field + " FROM MY_TABLE).sort(field)
>>>>
>>>> and:
>>>>
>>>> sql("SELECT " + field + " FROM MY_TABLE).orderBy(field)
>>>>
>>>>
>>>> I'm running this on a machine with more than 200GB of RAM, running in
>>>> local mode with spark.driver.memory set to 64g.
>>>>
>>>> I do not know why it cannot allocate a big enough page. Why is it
>>>> trying to allocate such a big page in the first place?
>>>>
>>>> I hope someone with more knowledge of Spark can shed some light on
>>>> this. Thank you!
>>>>
>>>>
>>>> *Best regards,*
>>>> *Babak Alipour,*
>>>> *University of Florida*
>>>>
>>>
>>>
>>
>

Re: DataFrame Sort gives Cannot allocate a page with more than 17179869176 bytes

Posted by amarouni <am...@talend.com>.
You can get some more insights by using the Spark history server
(http://spark.apache.org/docs/latest/monitoring.html); it can show you
which task is failing, along with other information that might help you
debug the issue.
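
If you don't have event logging enabled yet, a minimal sketch of the
settings that produce the logs the history server reads (the directory is
just an example):

val spark = org.apache.spark.sql.SparkSession.builder()
  .config("spark.eventLog.enabled", "true")               // one event log per application
  .config("spark.eventLog.dir", "file:/tmp/spark-events") // example path
  .getOrCreate()

Then point spark.history.fs.logDirectory at the same path when starting
the history server.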



Re: DataFrame Sort gives Cannot allocate a page with more than 17179869176 bytes

Posted by Babak Alipour <ba...@gmail.com>.
The issue seems to lie in the RangePartitioner trying to create equal
ranges. [1]

[1] https://spark.apache.org/docs/2.0.0/api/java/org/apache/spark/RangePartitioner.html

The *Double* values I'm trying to sort are mostly in the range [0,1] (~70%
of the data, roughly 1 billion records); other numbers in the dataset are
as high as 2000. With the RangePartitioner trying to create equal ranges,
some tasks are becoming almost empty while others are extremely large, due
to the heavily skewed distribution.
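
A quick way to see the imbalance (a diagnostic sketch; df and field stand
for the single-column DataFrame and column name from this thread):

import org.apache.spark.sql.functions.desc
val perPartition = df.orderBy(desc(field))
  .rdd
  .mapPartitions(it => Iterator(it.size)) // records landing in each task
  .collect()
// with the skew described above, a few entries dwarf all the others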

This is either a bug in Apache Spark or a major limitation of the
framework. Has anyone else encountered this?

*Babak Alipour,*
*University of Florida*


Re: DataFrame Sort gives Cannot allocate a page with more than 17179869176 bytes

Posted by Babak Alipour <ba...@gmail.com>.
Thanks Vadim for sharing your experience, but I have tried a multi-JVM
setup (2 workers) and various sizes for spark.executor.memory (8g, 16g,
20g, 32g, 64g) and spark.executor.cores (2-4); same error all along.

As for the files, these are all .snappy.parquet files, resulting from
inserting some data from other tables. None of them actually exceeds 25MiB
(I don't know why this number). Setting the DataFrame to persist using
StorageLevel.MEMORY_ONLY shows a size in memory of ~10g. I still cannot
understand why it is trying to create such a big page when sorting. The
entire column (this df has only 1 column) is not that big, and neither are
the original files. Any ideas?
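
For completeness, this is roughly how I checked the in-memory size (a
sketch, with MY_TABLE and field standing in for the real names):

import org.apache.spark.storage.StorageLevel
val df = spark.sql("SELECT " + field + " FROM MY_TABLE")
df.persist(StorageLevel.MEMORY_ONLY)
df.count() // materialize the cache; the UI's Storage tab then shows ~10g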


>Babak



*Babak Alipour,*
*University of Florida*


Re: DataFrame Sort gives Cannot allocate a page with more than 17179869176 bytes

Posted by Vadim Semenov <va...@datadoghq.com>.
oh, and try to run even smaller executors, i.e. with
`spark.executor.memory` <= 16GiB. I wonder what result you're going to get.
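
To be concrete, a sketch of the sizing I mean, against a standalone master
on the same box (the master URL and the numbers are only examples):

val spark = org.apache.spark.sql.SparkSession.builder()
  .master("spark://localhost:7077")       // example standalone master
  .config("spark.executor.memory", "16g") // small executors
  .config("spark.executor.cores", "2")
  .getOrCreate()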


Re: DataFrame Sort gives Cannot allocate a page with more than 17179869176 bytes

Posted by Vadim Semenov <va...@datadoghq.com>.
> Do you mean running a multi-JVM 'cluster' on the single machine?
Yes, that's what I suggested.

You can get some information here:
http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
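
On a single machine that means running the standalone mode with several
worker JVMs, e.g. in conf/spark-env.sh (a sketch; all values are examples):

SPARK_WORKER_INSTANCES=4   # worker JVMs on this host
SPARK_WORKER_MEMORY=32g    # memory per worker
SPARK_WORKER_CORES=4       # cores per worker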

> How would that affect performance/memory-consumption? If a multi-JVM
setup can handle such a large input, then why can't a single-JVM break down
the job into smaller tasks?
I don't have an answer to these questions; it requires understanding of
Spark, the JVM, and your setup's internals.

I ran into the same issue only once, when I tried to read a gzipped file
whose size was >16GiB. That's the only time I ran into this:
https://github.com/apache/spark/blob/5d84c7fd83502aeb551d46a740502db4862508fe/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java#L238-L243
In the end I had to recompress my file into bzip2, which is splittable, to
be able to read it with Spark.


I'd look into the size of your files, and if they're huge I'd try to
connect the error you got to the size of the files (but it's strange to
me, as the block size of a Parquet file is 128MiB). I don't have any other
suggestions, I'm sorry.



Re: DataFrame Sort gives Cannot allocate a page with more than 17179869176 bytes

Posted by Babak Alipour <ba...@gmail.com>.
To add one more note: I tried running more, smaller executors, each with
32-64g of memory and executor.cores set to 2-4 (with 2 workers as well),
and I'm still getting the same exception:

java.lang.IllegalArgumentException: Cannot allocate a page with more than 17179869176 bytes
        at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:241)
        at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:92)
        at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.growPointerArrayIfNecessary(UnsafeExternalSorter.java:343)
        at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:393)
        at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:94)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
        at org.apache.spark.scheduler.Task.run(Task.scala:85)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
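
The only other idea I have left is to explicitly break the data into more
pieces before the sort (a sketch; the partition count is a blind guess):

import org.apache.spark.sql.functions.desc
val df = spark.sql("SELECT " + field + " FROM MY_TABLE")
df.repartition(512)      // more, smaller tasks feeding the sort
  .orderBy(desc(field))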

>Babak

*Babak Alipour,*
*University of Florida*
