You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Abhimanyu Kumar Singh <ab...@gmail.com> on 2022/01/26 15:32:24 UTC

[Spark UDF]: Where does UDF stores temporary Arrays/Sets

I'm doing some complex operations inside spark UDF (parsing huge XML).

Dataframe:
| value |
| Content of XML File 1 |
| Content of XML File 2 |
| Content of XML File N |

val df = Dataframe.select(UDF_to_parse_xml(value))

UDF looks something like:

val XMLelements : Array[MyClass1] = getXMLelements(xmlContent)
val myResult: Array[MyClass2] = XMLelements.map(myfunction).distinct

Parsing requires creation and de-duplication of arrays from the XML
containing
around 0.1 million elements (consisting of MyClass(Strings, Maps, Integers,
.... )).

In the Spark UI "executor memory used" is barely 60-70 MB. But still Spark
processing fails
with *ExecutorLostFailure *error for XMLs of size around 2GB.
When I increase the executor size (say 15GB to 25 GB) it works fine. One
partition can contain only
one XML file (with max size 2GB) and 1 task/executor runs in parallel.

*My question is which memory is being used by UDF for storing arrays, maps
or sets while parsing?*
*And how can I configure it?*

Should I increase *spark*.*memory*.*offHeap*.size,
spark.yarn.executor.memoryOverhead or spark.executor.memoryOverhead?

Thanks a lot,
Abhimanyu

PS: I know I shouldn't use UDF this way, but I don't have any other
alternative here.

Re: [Spark UDF]: Where does UDF stores temporary Arrays/Sets

Posted by Gourav Sengupta <go...@gmail.com>.
Hi,

Can you please try to see if you can increase the number of cores per task,
and therefore give each task more memory per executor?

I do not understand what is the XML, what is the data in it, and what is
the problem that you are trying to solve writing UDF's to parse XML. So
maybe we are not actually solving the problem and just addressing the issue.


Regards,
Gourav Sengupta

On Wed, Jan 26, 2022 at 4:07 PM Sean Owen <sr...@gmail.com> wrote:

> Really depends on what your UDF is doing. You could read 2GB of XML into
> much more than that as a DOM representation in memory.
> Remember 15GB of executor memory is shared across tasks.
> You need to get a handle on what memory your code is using to begin with
> to start to reason about whether that's enough, first.
>
> On Wed, Jan 26, 2022 at 10:03 AM Abhimanyu Kumar Singh <
> abhimanyu.kr.singh00@gmail.com> wrote:
>
>> Thanks for your quick response.
>>
>> For some reasons I can't use spark-xml (schema related issue).
>>
>> I've tried reducing number of tasks per executor by increasing the number
>> of executors, but it still throws same error.
>>
>> I can't understand why does even 15gb of executor memory is not
>> sufficient to parse just 2gb XML file.
>> How can I check the max amount of JVM memory utilised for each task?
>>
>> Do I need to tweak some other configurations for increasing JVM memory
>> rather than spark.executor.memory?
>>
>> On Wed, Jan 26, 2022, 9:23 PM Sean Owen <sr...@gmail.com> wrote:
>>
>>> Executor memory used shows data that is cached, not the VM usage. You're
>>> running out of memory somewhere, likely in your UDF, which probably parses
>>> massive XML docs as a DOM first or something. Use more memory, fewer tasks
>>> per executor, or consider using spark-xml if you are really just parsing
>>> pieces of it. It'll be more efficient.
>>>
>>> On Wed, Jan 26, 2022 at 9:47 AM Abhimanyu Kumar Singh <
>>> abhimanyu.kr.singh00@gmail.com> wrote:
>>>
>>>> I'm doing some complex operations inside spark UDF (parsing huge XML).
>>>>
>>>> Dataframe:
>>>> | value |
>>>> | Content of XML File 1 |
>>>> | Content of XML File 2 |
>>>> | Content of XML File N |
>>>>
>>>> val df = Dataframe.select(UDF_to_parse_xml(value))
>>>>
>>>> UDF looks something like:
>>>>
>>>> val XMLelements : Array[MyClass1] = getXMLelements(xmlContent)
>>>> val myResult: Array[MyClass2] = XMLelements.map(myfunction).distinct
>>>>
>>>> Parsing requires creation and de-duplication of arrays from the XML
>>>> containing
>>>> around 0.1 million elements (consisting of MyClass(Strings, Maps,
>>>> Integers, .... )).
>>>>
>>>> In the Spark UI "executor memory used" is barely 60-70 MB. But still
>>>> Spark processing fails
>>>> with *ExecutorLostFailure *error for XMLs of size around 2GB.
>>>> When I increase the executor size (say 15GB to 25 GB) it works fine.
>>>> One partition can contain only
>>>> one XML file (with max size 2GB) and 1 task/executor runs in parallel.
>>>>
>>>> *My question is which memory is being used by UDF for storing arrays,
>>>> maps or sets while parsing?*
>>>> *And how can I configure it?*
>>>>
>>>> Should I increase *spark*.*memory*.*offHeap*.size,
>>>> spark.yarn.executor.memoryOverhead or spark.executor.memoryOverhead?
>>>>
>>>> Thanks a lot,
>>>> Abhimanyu
>>>>
>>>> PS: I know I shouldn't use UDF this way, but I don't have any other
>>>> alternative here.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>

Re: [Spark UDF]: Where does UDF stores temporary Arrays/Sets

Posted by Sean Owen <sr...@gmail.com>.
Really depends on what your UDF is doing. You could read 2GB of XML into
much more than that as a DOM representation in memory.
Remember 15GB of executor memory is shared across tasks.
You need to get a handle on what memory your code is using to begin with to
start to reason about whether that's enough, first.

On Wed, Jan 26, 2022 at 10:03 AM Abhimanyu Kumar Singh <
abhimanyu.kr.singh00@gmail.com> wrote:

> Thanks for your quick response.
>
> For some reasons I can't use spark-xml (schema related issue).
>
> I've tried reducing number of tasks per executor by increasing the number
> of executors, but it still throws same error.
>
> I can't understand why does even 15gb of executor memory is not sufficient
> to parse just 2gb XML file.
> How can I check the max amount of JVM memory utilised for each task?
>
> Do I need to tweak some other configurations for increasing JVM memory
> rather than spark.executor.memory?
>
> On Wed, Jan 26, 2022, 9:23 PM Sean Owen <sr...@gmail.com> wrote:
>
>> Executor memory used shows data that is cached, not the VM usage. You're
>> running out of memory somewhere, likely in your UDF, which probably parses
>> massive XML docs as a DOM first or something. Use more memory, fewer tasks
>> per executor, or consider using spark-xml if you are really just parsing
>> pieces of it. It'll be more efficient.
>>
>> On Wed, Jan 26, 2022 at 9:47 AM Abhimanyu Kumar Singh <
>> abhimanyu.kr.singh00@gmail.com> wrote:
>>
>>> I'm doing some complex operations inside spark UDF (parsing huge XML).
>>>
>>> Dataframe:
>>> | value |
>>> | Content of XML File 1 |
>>> | Content of XML File 2 |
>>> | Content of XML File N |
>>>
>>> val df = Dataframe.select(UDF_to_parse_xml(value))
>>>
>>> UDF looks something like:
>>>
>>> val XMLelements : Array[MyClass1] = getXMLelements(xmlContent)
>>> val myResult: Array[MyClass2] = XMLelements.map(myfunction).distinct
>>>
>>> Parsing requires creation and de-duplication of arrays from the XML
>>> containing
>>> around 0.1 million elements (consisting of MyClass(Strings, Maps,
>>> Integers, .... )).
>>>
>>> In the Spark UI "executor memory used" is barely 60-70 MB. But still
>>> Spark processing fails
>>> with *ExecutorLostFailure *error for XMLs of size around 2GB.
>>> When I increase the executor size (say 15GB to 25 GB) it works fine. One
>>> partition can contain only
>>> one XML file (with max size 2GB) and 1 task/executor runs in parallel.
>>>
>>> *My question is which memory is being used by UDF for storing arrays,
>>> maps or sets while parsing?*
>>> *And how can I configure it?*
>>>
>>> Should I increase *spark*.*memory*.*offHeap*.size,
>>> spark.yarn.executor.memoryOverhead or spark.executor.memoryOverhead?
>>>
>>> Thanks a lot,
>>> Abhimanyu
>>>
>>> PS: I know I shouldn't use UDF this way, but I don't have any other
>>> alternative here.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>

Re: [Spark UDF]: Where does UDF stores temporary Arrays/Sets

Posted by Abhimanyu Kumar Singh <ab...@gmail.com>.
Thanks for your quick response.

For some reasons I can't use spark-xml (schema related issue).

I've tried reducing number of tasks per executor by increasing the number
of executors, but it still throws same error.

I can't understand why does even 15gb of executor memory is not sufficient
to parse just 2gb XML file.
How can I check the max amount of JVM memory utilised for each task?

Do I need to tweak some other configurations for increasing JVM memory
rather than spark.executor.memory?

On Wed, Jan 26, 2022, 9:23 PM Sean Owen <sr...@gmail.com> wrote:

> Executor memory used shows data that is cached, not the VM usage. You're
> running out of memory somewhere, likely in your UDF, which probably parses
> massive XML docs as a DOM first or something. Use more memory, fewer tasks
> per executor, or consider using spark-xml if you are really just parsing
> pieces of it. It'll be more efficient.
>
> On Wed, Jan 26, 2022 at 9:47 AM Abhimanyu Kumar Singh <
> abhimanyu.kr.singh00@gmail.com> wrote:
>
>> I'm doing some complex operations inside spark UDF (parsing huge XML).
>>
>> Dataframe:
>> | value |
>> | Content of XML File 1 |
>> | Content of XML File 2 |
>> | Content of XML File N |
>>
>> val df = Dataframe.select(UDF_to_parse_xml(value))
>>
>> UDF looks something like:
>>
>> val XMLelements : Array[MyClass1] = getXMLelements(xmlContent)
>> val myResult: Array[MyClass2] = XMLelements.map(myfunction).distinct
>>
>> Parsing requires creation and de-duplication of arrays from the XML
>> containing
>> around 0.1 million elements (consisting of MyClass(Strings, Maps,
>> Integers, .... )).
>>
>> In the Spark UI "executor memory used" is barely 60-70 MB. But still
>> Spark processing fails
>> with *ExecutorLostFailure *error for XMLs of size around 2GB.
>> When I increase the executor size (say 15GB to 25 GB) it works fine. One
>> partition can contain only
>> one XML file (with max size 2GB) and 1 task/executor runs in parallel.
>>
>> *My question is which memory is being used by UDF for storing arrays,
>> maps or sets while parsing?*
>> *And how can I configure it?*
>>
>> Should I increase *spark*.*memory*.*offHeap*.size,
>> spark.yarn.executor.memoryOverhead or spark.executor.memoryOverhead?
>>
>> Thanks a lot,
>> Abhimanyu
>>
>> PS: I know I shouldn't use UDF this way, but I don't have any other
>> alternative here.
>>
>>
>>
>>
>>
>>
>>
>>

Re: [Spark UDF]: Where does UDF stores temporary Arrays/Sets

Posted by Sean Owen <sr...@gmail.com>.
Executor memory used shows data that is cached, not the VM usage. You're
running out of memory somewhere, likely in your UDF, which probably parses
massive XML docs as a DOM first or something. Use more memory, fewer tasks
per executor, or consider using spark-xml if you are really just parsing
pieces of it. It'll be more efficient.

On Wed, Jan 26, 2022 at 9:47 AM Abhimanyu Kumar Singh <
abhimanyu.kr.singh00@gmail.com> wrote:

> I'm doing some complex operations inside spark UDF (parsing huge XML).
>
> Dataframe:
> | value |
> | Content of XML File 1 |
> | Content of XML File 2 |
> | Content of XML File N |
>
> val df = Dataframe.select(UDF_to_parse_xml(value))
>
> UDF looks something like:
>
> val XMLelements : Array[MyClass1] = getXMLelements(xmlContent)
> val myResult: Array[MyClass2] = XMLelements.map(myfunction).distinct
>
> Parsing requires creation and de-duplication of arrays from the XML
> containing
> around 0.1 million elements (consisting of MyClass(Strings, Maps,
> Integers, .... )).
>
> In the Spark UI "executor memory used" is barely 60-70 MB. But still Spark
> processing fails
> with *ExecutorLostFailure *error for XMLs of size around 2GB.
> When I increase the executor size (say 15GB to 25 GB) it works fine. One
> partition can contain only
> one XML file (with max size 2GB) and 1 task/executor runs in parallel.
>
> *My question is which memory is being used by UDF for storing arrays, maps
> or sets while parsing?*
> *And how can I configure it?*
>
> Should I increase *spark*.*memory*.*offHeap*.size,
> spark.yarn.executor.memoryOverhead or spark.executor.memoryOverhead?
>
> Thanks a lot,
> Abhimanyu
>
> PS: I know I shouldn't use UDF this way, but I don't have any other
> alternative here.
>
>
>
>
>
>
>
>