Posted to user@spark.apache.org by Sergey Ivanychev <se...@gmail.com> on 2021/11/03 14:23:19 UTC

PySpark: toPandas() vs collect() execution graph differences

Hi, 

Spark 3.1.2 K8s.

I encountered an OOM error while trying to create a Pandas DataFrame from a Spark DataFrame. My Spark driver has 60G of RAM, but the executors are tiny compared to that (8G).

If I do `spark.table(…).limit(1000000).collect()` I get the following plan




If I do `spark.table(…).limit(1000000).toPandas()` I get a more complicated plan with an extra shuffle



IIUC, in the `toPandas` case all the data gets shuffled to a single executor, which fails with OOM; this doesn’t happen in the `collect` case. Does it really work like that? How do I collect a large dataset that fits into the memory of the driver?

Re: PySpark: toPandas() vs collect() execution graph differences

Posted by Gourav Sengupta <go...@gmail.com>.
Hi,

I am copying Dr. Zaharia in this email as I am quoting from his book (once
again I may be wrong):
Chapter 5: Basic Structured Operations >> Creating Rows

You can create rows by manually instantiating a Row object with the values
that belong in each column. It’s important to note that only DataFrames
have schemas. Rows themselves do not have schemas. This means that if you
create a Row manually, you must specify the values in the same order as the
schema of the DataFrame to which they might be appended (we will see this
when we discuss creating DataFrames):

Chapter 6: Working with different types of data
Starting this Python process is expensive, but the real cost is in
serializing the data to Python. This is costly for two reasons: it is an
expensive computation, but also, after the data enters Python, Spark cannot
manage the memory of the worker. This means that you could potentially
cause a worker to fail if it becomes resource constrained (because both the
JVM and Python are competing for memory on the same machine).

Chapter 18: Monitoring and Debugging (as Sean was mentioning this is about
Driver OOM error)
Issues with JVMs running out of memory can happen if you are using another
language binding, such as Python, due to data conversion between the two
requiring too much memory in the JVM. Try to see whether your issue is
specific to your chosen language and bring back less data to the driver
node, or write it to a file instead of bringing it back as in-memory
objects.
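
One way to follow the "write it to a file" advice is to let the executors write Parquet and then read the files with pandas on the driver, which is also the workaround reported elsewhere in this thread. A minimal sketch, assuming a helper named `to_pandas_via_parquet` (hypothetical, not part of PySpark) and a storage location that both the executors and the driver can reach:

```python
from typing import Any, Callable, Optional


def to_pandas_via_parquet(
    spark_df: Any,
    write_path: str,
    read_path: Optional[str] = None,
    read_parquet: Optional[Callable[[str], Any]] = None,
) -> Any:
    """Write a Spark DataFrame as Parquet, then load it with pandas.

    Hypothetical helper for illustration. `read_parquet` is injectable so
    the function can be exercised without pandas installed.
    """
    # Each executor writes its own partitions; no single task has to hold
    # the whole result.
    spark_df.write.mode("overwrite").parquet(write_path)

    if read_parquet is None:
        import pandas as pd  # imported lazily, only needed on the driver

        read_parquet = pd.read_parquet

    # The driver reads the files directly, so only driver memory is used
    # for the final pandas DataFrame.
    return read_parquet(read_path or write_path)
```

The separate `read_path` is there because Spark and pandas may need different URI schemes for the same location (for S3, Spark commonly uses `s3a://` while pandas expects `s3://` together with an installed filesystem package); whether that applies depends on the deployment.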

Regards,
Gourav Sengupta


On Wed, Nov 3, 2021 at 10:09 PM Sergey Ivanychev <se...@gmail.com>
wrote:

> I want to further clarify the use case I have: an ML engineer collects
> data so as to use it for training an ML model. The driver is created within
> a Jupyter notebook and has 64G of RAM for fetching the training set and
> feeding it to the model. Naturally, in this case executors shouldn’t be as
> big as the driver.
>
> Currently, the best solution I found is to write the dataframe to S3, and
> then read it via pd.read_parquet.
>
> Best regards,
>
>
> Sergey Ivanychev
>
> On 4 Nov 2021, at 00:18, Mich Talebzadeh <mi...@gmail.com>
> wrote:
>
> 
> Thanks for clarification on the koalas case.
>
> The thread owner states and I quote: .. IIUC, in the `toPandas` case all
> the data gets shuffled to a single executor that fails with OOM,....
>
> I still believe that this may be related to the way k8s handles shuffling.
> In a balanced k8s cluster this could be avoided which does not seem to be
> the case here as the so called driver node has 8 times more RAM than the
> other nodes.
>
> HTH
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 3 Nov 2021 at 21:00, Sean Owen <sr...@gmail.com> wrote:
>
>> I think you're talking about koalas, which is in Spark 3.2, but that is
>> unrelated to toPandas(), nor to the question of how it differs from
>> collect().
>> Shuffle is also unrelated.
>>
>> On Wed, Nov 3, 2021 at 3:45 PM Mich Talebzadeh <mi...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> As I understood in the previous versions of Spark the data could not be
>>> processed and stored in Pandas data frames in a distributed mode as these
>>> data frames store data in RAM which is the driver in this case.
>>> However, I was under the impression that this limitation no longer
>>> exists in 3.2? So if you have a k8s cluster with 64GB of RAM for one node
>>> and 8GB of RAM for others, and PySpark running in cluster mode,  how do you
>>> expect the process to confine itself to the master node? What will happen
>>> if you increase executor node(s) RAM to 64GB temporarily (balanced k8s
>>> cluster) and run the job again?
>>>
>>> Worth noting that the current Spark on k8s  does not support external
>>> shuffle. For now we have two parameters for Dynamic Resource Allocation.
>>> These are
>>>
>>>  --conf spark.dynamicAllocation.enabled=true \
>>>  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
>>>
>>>
>>> The idea is to use dynamic resource allocation where the driver tracks
>>> the shuffle files and evicts only executors not storing active shuffle
>>> files. So in a nutshell these shuffle files are stored in the executors
>>> themselves in the absence of the external shuffle. The model works on the
>>> basis of the "one-container-per-Pod" model
>>> <https://kubernetes.io/docs/concepts/workloads/pods/> meaning that one
>>> node of the cluster runs the driver and each remaining node runs one
>>> executor.
>>>
>>>
>>>
>>> HTH
>>> ,
>>>
>>>
>>>
>>>
>>>

Re: PySpark: toPandas() vs collect() execution graph differences

Posted by Hyukjin Kwon <gu...@gmail.com>.
To add some more, if the number of rows to collect is large,
DataFrame.collect can be slower because it launches multiple Spark jobs
sequentially.

Given that DataFrame.toPandas does not take the number of rows to collect
as an argument, it is debatable whether the same optimization as in
DataFrame.collect should be applied here.

We could add a configuration to enable or disable it, but implementing
this in DataFrame.toPandas would be complicated because of existing
optimizations such as Arrow. I haven't taken a deeper look, but my gut
says it's not worthwhile.

On Sat, Nov 13, 2021 at 12:05 PM Hyukjin Kwon <gu...@gmail.com> wrote:

> Thanks for pinging me Sean.
>
> Yes, there's an optimization in DataFrame.collect which tries to collect
> the first few partitions and checks whether the requested number of rows
> has been found (and repeats if not).
>
> DataFrame.toPandas does not have such an optimization.
>
> I suspect that the shuffle isn't an actual shuffle but just collects the
> local limits from the executors onto one executor to calculate the global
> limit.
>
> On Fri, Nov 12, 2021 at 11:16 PM Sean Owen <sr...@gmail.com> wrote:
>
>> Hyukjin can you weigh in?
>> Is this exchange due to something in your operations or clearly unique to
>> the toPandas operation?
>> I didn't think it worked that way, but maybe there is some good reason it
>> does.
>>
>> On Fri, Nov 12, 2021 at 7:34 AM Sergey Ivanychev <
>> sergeyivanychev@gmail.com> wrote:
>>
>>> Hi Sean,
>>>
>>> According to the plan I’m observing, this is indeed what happens.
>>> There’s an exchange operation that sends data to a single partition/task
>>> in the toPandas() + PyArrow-enabled case.
>>>
>>> On 12 Nov 2021, at 16:31, Sean Owen <sr...@gmail.com> wrote:
>>>
>>> Yes, none of the responses are addressing your question.
>>> I do not think it's a bug necessarily; do you end up with one partition
>>> in your execution somewhere?
>>>
>>> On Fri, Nov 12, 2021 at 3:38 AM Sergey Ivanychev <
>>> sergeyivanychev@gmail.com> wrote:
>>>
>>>> Of course if I give 64G of ram to each executor they will work. But
>>>> what’s the point? Collecting results in the driver should cause a high RAM
>>>> usage in the driver and that’s what is happening in collect() case. In the
>>>> case where pyarrow serialization is enabled all the data is being collected
>>>> on a single executor, which is clearly a wrong way to collect the result on
>>>> the driver.
>>>>
>>>> I guess I’ll open an issue about it in Spark Jira. It clearly looks
>>>> like a bug.
>>>>
>>>> On 12 Nov 2021, at 11:59, Mich Talebzadeh <mi...@gmail.com>
>>>> wrote:
>>>>
>>>> OK, your findings do not imply those settings are incorrect. Those
>>>> settings will work if you set-up your k8s cluster in peer-to-peer mode with
>>>> equal amounts of RAM for each node which is common practice.
>>>>
>>>> HTH
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, 11 Nov 2021 at 21:39, Sergey Ivanychev <
>>>> sergeyivanychev@gmail.com> wrote:
>>>>
>>>>> Yes, in fact those are the settings that cause this behaviour. If set
>>>>> to false, everything goes fine since the implementation in spark sources in
>>>>> this case is
>>>>>
>>>>> pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
>>>>>
>>>>> Best regards,
>>>>>
>>>>>
>>>>> Sergey Ivanychev
>>>>>
>>>>> On 11 Nov 2021, at 13:58, Mich Talebzadeh <mi...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> 
>>>>> Have you tried the following settings:
>>>>>
>>>>> spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
>>>>>
>>>>> spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled","true")
>>>>>
>>>>> HTH
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Thu, 4 Nov 2021 at 18:06, Mich Talebzadeh <
>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>
>>>>>> Ok so it boils down on how spark does create toPandas() DF under the
>>>>>> bonnet. How many executors are involved in k8s cluster. In this model spark
>>>>>> will create executors = no of nodes - 1
>>>>>>
>>>>>> On Thu, 4 Nov 2021 at 17:42, Sergey Ivanychev <
>>>>>> sergeyivanychev@gmail.com> wrote:
>>>>>>
>>>>>>> > Just to confirm with Collect() alone, this is all on the driver?
>>>>>>>
>>>>>>> I shared the screenshot with the plan in the first email. In the
>>>>>>> collect() case the data gets fetched to the driver without problems.
>>>>>>>
>>>>>>> Best regards,
>>>>>>>
>>>>>>>
>>>>>>> Sergey Ivanychev
>>>>>>>
>>>>>>> On 4 Nov 2021, at 20:37, Mich Talebzadeh <mi...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Just to confirm with Collect() alone, this is all on the driver?
>>>>>>>
>>>>>>> --
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>

Re: PySpark: toPandas() vs collect() execution graph differences

Posted by Hyukjin Kwon <gu...@gmail.com>.
Thanks for pinging me Sean.

Yes, there's an optimization in DataFrame.collect which tries to collect
the first few partitions and checks whether the requested number of rows
has been found (and repeats if not).

DataFrame.toPandas does not have such an optimization.

I suspect that the shuffle isn't an actual shuffle but just collects the
local limits from the executors onto one executor to calculate the global
limit.
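
The two strategies described above can be contrasted with a small pure-Python simulation. This is only a sketch of the idea, not Spark's implementation: partitions are modelled as plain lists, the function names are made up for illustration, and the growth factor of 4 between rounds is an assumption.

```python
from typing import List, Tuple

Partition = List[int]


def incremental_collect(
    partitions: List[Partition], limit: int, scale_up: int = 4
) -> Tuple[List[int], int]:
    """Scan partitions in growing batches until `limit` rows are found.

    Returns the rows and the number of sequential "jobs" that were needed.
    """
    rows: List[int] = []
    jobs = 0
    scanned = 0
    batch = 1  # start with a single partition
    while len(rows) < limit and scanned < len(partitions):
        jobs += 1
        for part in partitions[scanned:scanned + batch]:
            # Take only as many rows as are still missing.
            rows.extend(part[: limit - len(rows)])
        scanned += batch
        batch *= scale_up  # look at more partitions in the next round
    return rows, jobs


def single_task_global_limit(
    partitions: List[Partition], limit: int
) -> Tuple[List[int], int]:
    """Apply a local limit per partition, then merge everything in one task.

    Returns the rows and how many rows had to be moved to that single task.
    """
    local = [part[:limit] for part in partitions]
    moved = sum(len(part) for part in local)
    merged = [row for part in local for row in part]
    return merged[:limit], moved


# Ten partitions of 100 rows each, and a limit of 250 rows.
demo_partitions = [list(range(i * 100, (i + 1) * 100)) for i in range(10)]
collected, job_count = incremental_collect(demo_partitions, 250)
limited, rows_moved = single_task_global_limit(demo_partitions, 250)
```

In this toy setup the incremental approach needs two rounds and touches only the first few partitions, while the single-task approach moves every locally limited partition (1000 rows here) to one place before cutting down to 250. That is the pattern that can overload a small executor when the limit is large.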

On Fri, Nov 12, 2021 at 11:16 PM Sean Owen <sr...@gmail.com> wrote:

> Hyukjin can you weigh in?
> Is this exchange due to something in your operations or clearly unique to
> the toPandas operation?
> I didn't think it worked that way, but maybe there is some good reason it
> does.
>
> On Fri, Nov 12, 2021 at 7:34 AM Sergey Ivanychev <
> sergeyivanychev@gmail.com> wrote:
>
>> Hi Sean,
>>
>> According to the plan I’m observing, this is indeed what happens. There’s
>> an exchange operation that sends data to a single partition/task in the
>> toPandas() + PyArrow-enabled case.
>>
>> On 12 Nov 2021, at 16:31, Sean Owen <sr...@gmail.com> wrote:
>>
>> Yes, none of the responses are addressing your question.
>> I do not think it's a bug necessarily; do you end up with one partition
>> in your execution somewhere?
>>
>> On Fri, Nov 12, 2021 at 3:38 AM Sergey Ivanychev <
>> sergeyivanychev@gmail.com> wrote:
>>
>>> Of course if I give 64G of ram to each executor they will work. But
>>> what’s the point? Collecting results in the driver should cause a high RAM
>>> usage in the driver and that’s what is happening in collect() case. In the
>>> case where pyarrow serialization is enabled all the data is being collected
>>> on a single executor, which is clearly a wrong way to collect the result on
>>> the driver.
>>>
>>> I guess I’ll open an issue about it in Spark Jira. It clearly looks like
>>> a bug.
>>>
>>> On 12 Nov 2021, at 11:59, Mich Talebzadeh <mi...@gmail.com>
>>> wrote:
>>>
>>> OK, your findings do not imply those settings are incorrect. Those
>>> settings will work if you set-up your k8s cluster in peer-to-peer mode with
>>> equal amounts of RAM for each node which is common practice.
>>>
>>> HTH
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Thu, 11 Nov 2021 at 21:39, Sergey Ivanychev <
>>> sergeyivanychev@gmail.com> wrote:
>>>
>>>> Yes, in fact those are the settings that cause this behaviour. If set
>>>> to false, everything goes fine since the implementation in spark sources in
>>>> this case is
>>>>
>>>> pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
>>>>
>>>> Best regards,
>>>>
>>>>
>>>> Sergey Ivanychev
>>>>
>>>> On 11 Nov 2021, at 13:58, Mich Talebzadeh <mi...@gmail.com>
>>>> wrote:
>>>>
>>>> 
>>>> Have you tried the following settings:
>>>>
>>>> spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
>>>>
>>>> spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled","true")
>>>>
>>>> HTH
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, 4 Nov 2021 at 18:06, Mich Talebzadeh <mi...@gmail.com>
>>>> wrote:
>>>>
>>>>> Ok so it boils down on how spark does create toPandas() DF under the
>>>>> bonnet. How many executors are involved in k8s cluster. In this model spark
>>>>> will create executors = no of nodes - 1
>>>>>
>>>>> On Thu, 4 Nov 2021 at 17:42, Sergey Ivanychev <
>>>>> sergeyivanychev@gmail.com> wrote:
>>>>>
>>>>>> > Just to confirm with Collect() alone, this is all on the driver?
>>>>>>
>>>>>> I shared the screenshot with the plan in the first email. In the
>>>>>> collect() case the data gets fetched to the driver without problems.
>>>>>>
>>>>>> Best regards,
>>>>>>
>>>>>>
>>>>>> Sergey Ivanychev
>>>>>>
>>>>>> On 4 Nov 2021, at 20:37, Mich Talebzadeh <mi...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> Just to confirm with Collect() alone, this is all on the driver?
>>>>>>
>>>>>> --
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>

Re: PySpark: toPandas() vs collect() execution graph differences

Posted by Sean Owen <sr...@gmail.com>.
Hyukjin can you weigh in?
Is this exchange due to something in your operations or clearly unique to
the toPandas operation?
I didn't think it worked that way, but maybe there is some good reason it
does.

On Fri, Nov 12, 2021 at 7:34 AM Sergey Ivanychev <se...@gmail.com>
wrote:

> Hi Sean,
>
> According to the plan I’m observing, this is indeed what happens. There’s
> an exchange operation that sends data to a single partition/task in the
> toPandas() + PyArrow-enabled case.
>
> On 12 Nov 2021, at 16:31, Sean Owen <sr...@gmail.com> wrote:
>
> Yes, none of the responses are addressing your question.
> I do not think it's a bug necessarily; do you end up with one partition in
> your execution somewhere?
>
> On Fri, Nov 12, 2021 at 3:38 AM Sergey Ivanychev <
> sergeyivanychev@gmail.com> wrote:
>
>> Of course if I give 64G of ram to each executor they will work. But
>> what’s the point? Collecting results in the driver should cause a high RAM
>> usage in the driver and that’s what is happening in collect() case. In the
>> case where pyarrow serialization is enabled all the data is being collected
>> on a single executor, which is clearly a wrong way to collect the result on
>> the driver.
>>
>> I guess I’ll open an issue about it in Spark Jira. It clearly looks like
>> a bug.
>>
>> On 12 Nov 2021, at 11:59, Mich Talebzadeh <mi...@gmail.com>
>> wrote:
>>
>> OK, your findings do not imply those settings are incorrect. Those
>> settings will work if you set-up your k8s cluster in peer-to-peer mode with
>> equal amounts of RAM for each node which is common practice.
>>
>> HTH
>>
>>
>>
>>
>>
>>
>> On Thu, 11 Nov 2021 at 21:39, Sergey Ivanychev <se...@gmail.com>
>> wrote:
>>
>>> Yes, in fact those are the settings that cause this behaviour. If set to
>>> false, everything goes fine since the implementation in spark sources in
>>> this case is
>>>
>>> pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
>>>
>>> Best regards,
>>>
>>>
>>> Sergey Ivanychev
>>>
>>> On 11 Nov 2021, at 13:58, Mich Talebzadeh <mi...@gmail.com>
>>> wrote:
>>>
>>> 
>>> Have you tried the following settings:
>>>
>>> spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
>>>
>>> spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled","true")
>>>
>>> HTH
>>>
>>>
>>>
>>>
>>>
>>> On Thu, 4 Nov 2021 at 18:06, Mich Talebzadeh <mi...@gmail.com>
>>> wrote:
>>>
>>>> Ok so it boils down on how spark does create toPandas() DF under the
>>>> bonnet. How many executors are involved in k8s cluster. In this model spark
>>>> will create executors = no of nodes - 1
>>>>
>>>> On Thu, 4 Nov 2021 at 17:42, Sergey Ivanychev <
>>>> sergeyivanychev@gmail.com> wrote:
>>>>
>>>>> > Just to confirm with Collect() alone, this is all on the driver?
>>>>>
>>>>> I shared the screenshot with the plan in the first email. In the
>>>>> collect() case the data gets fetched to the driver without problems.
>>>>>
>>>>> Best regards,
>>>>>
>>>>>
>>>>> Sergey Ivanychev
>>>>>
>>>>> On 4 Nov 2021, at 20:37, Mich Talebzadeh <mi...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Just to confirm with Collect() alone, this is all on the driver?
>>>>>
>>>>> --
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>
>

Re: PySpark: toPandas() vs collect() execution graph differences

Posted by Sergey Ivanychev <se...@gmail.com>.
Hi Sean,

According to the plan I’m observing, this is indeed what happens. There’s an exchange operation that sends data to a single partition/task in the toPandas() + PyArrow-enabled case.
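
Since the exchange shows up only when Arrow is enabled, and disabling it was reported earlier in this thread to make `toPandas()` go through the plain `collect()` path, one option is to switch Arrow off just for the one call. A minimal sketch, assuming a helper named `to_pandas_without_arrow` (hypothetical) that receives the active `SparkSession` and the DataFrame:

```python
from typing import Any

ARROW_KEY = "spark.sql.execution.arrow.pyspark.enabled"


def to_pandas_without_arrow(spark: Any, df: Any) -> Any:
    """Call df.toPandas() with Arrow disabled, then restore the setting.

    Hypothetical helper for illustration; `spark` is expected to be a
    SparkSession and `df` a Spark DataFrame.
    """
    # Remember the current value; "false" is used if the key was never set.
    previous = spark.conf.get(ARROW_KEY, "false")
    spark.conf.set(ARROW_KEY, "false")
    try:
        # Without Arrow the rows are collected to the driver and converted
        # to pandas there.
        return df.toPandas()
    finally:
        # Restore the previous setting even if the conversion fails.
        spark.conf.set(ARROW_KEY, previous)
```

The trade-off is that the non-Arrow conversion is usually slower and builds Python row objects on the driver first, so it makes sense only when the driver really has the memory to spare, as in the setup described here.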

> On 12 Nov 2021, at 16:31, Sean Owen <sr...@gmail.com> wrote:
> 
> Yes, none of the responses are addressing your question.
> I do not think it's a bug necessarily; do you end up with one partition in your execution somewhere?
> 
> On Fri, Nov 12, 2021 at 3:38 AM Sergey Ivanychev <sergeyivanychev@gmail.com <ma...@gmail.com>> wrote:
> Of course if I give 64G of ram to each executor they will work. But what’s the point? Collecting results in the driver should cause a high RAM usage in the driver and that’s what is happening in collect() case. In the case where pyarrow serialization is enabled all the data is being collected on a single executor, which is clearly a wrong way to collect the result on the driver.
> 
> I guess I’ll open an issue about it in Spark Jira. It clearly looks like a bug.
> 
>> On 12 Nov 2021, at 11:59, Mich Talebzadeh <mich.talebzadeh@gmail.com> wrote:
>> 
>> OK, your findings do not imply those settings are incorrect. Those settings will work if you set-up your k8s cluster in peer-to-peer mode with equal amounts of RAM for each node which is common practice.
>> 
>> HTH
>> 
>> 
>>  
>> 
>> 
>> On Thu, 11 Nov 2021 at 21:39, Sergey Ivanychev <sergeyivanychev@gmail.com <ma...@gmail.com>> wrote:
>> Yes, in fact those are the settings that cause this behaviour. If set to false, everything goes fine since the implementation in spark sources in this case is
>> 
>> pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
>> 
>> Best regards,
>> 
>> 
>> Sergey Ivanychev
>> 
>>> On 11 Nov 2021, at 13:58, Mich Talebzadeh <mich.talebzadeh@gmail.com> wrote:
>>> 
>>> 
>>> Have you tried the following settings:
>>> 
>>> spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
>>> spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled","true")
>>> 
>>> HTH
>>> 
>>>  
>>> 
>>> 
>>> On Thu, 4 Nov 2021 at 18:06, Mich Talebzadeh <mich.talebzadeh@gmail.com <ma...@gmail.com>> wrote:
>>> Ok so it boils down on how spark does create toPandas() DF under the bonnet. How many executors are involved in k8s cluster. In this model spark will create executors = no of nodes - 1
>>> 
>>> On Thu, 4 Nov 2021 at 17:42, Sergey Ivanychev <sergeyivanychev@gmail.com <ma...@gmail.com>> wrote:
>>> > Just to confirm with Collect() alone, this is all on the driver?
>>> 
>>> I shared the screenshot with the plan in the first email. In the collect() case the data gets fetched to the driver without problems.
>>> 
>>> Best regards,
>>> 
>>> 
>>> Sergey Ivanychev
>>> 
>>>> On 4 Nov 2021, at 20:37, Mich Talebzadeh <mich.talebzadeh@gmail.com> wrote:
>>>> 
>>> 
>>>> Just to confirm with Collect() alone, this is all on the driver?
>>> -- 
>>> 
>>> 
>>>  
> 


Re: PySpark: toPandas() vs collect() execution graph differences

Posted by Sean Owen <sr...@gmail.com>.
Yes, none of the responses are addressing your question.
I do not think it's a bug necessarily; do you end up with one partition in
your execution somewhere?

On Fri, Nov 12, 2021 at 3:38 AM Sergey Ivanychev <se...@gmail.com>
wrote:

> Of course if I give 64G of ram to each executor they will work. But what’s
> the point? Collecting results in the driver should cause a high RAM usage
> in the driver and that’s what is happening in collect() case. In the case
> where pyarrow serialization is enabled all the data is being collected on a
> single executor, which is clearly a wrong way to collect the result on the
> driver.
>
> I guess I’ll open an issue about it in Spark Jira. It clearly looks like a
> bug.
>
> On 12 Nov 2021, at 11:59, Mich Talebzadeh <mi...@gmail.com>
> wrote:
>
> OK, your findings do not imply those settings are incorrect. Those
> settings will work if you set-up your k8s cluster in peer-to-peer mode with
> equal amounts of RAM for each node which is common practice.
>
> HTH
>
>
>
>
>
>
> On Thu, 11 Nov 2021 at 21:39, Sergey Ivanychev <se...@gmail.com>
> wrote:
>
>> Yes, in fact those are the settings that cause this behaviour. If set to
>> false, everything goes fine since the implementation in spark sources in
>> this case is
>>
>> pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
>>
>> Best regards,
>>
>>
>> Sergey Ivanychev
>>
>> On 11 Nov 2021, at 13:58, Mich Talebzadeh <mi...@gmail.com>
>> wrote:
>>
>> 
>> Have you tried the following settings:
>>
>> spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
>>
>> spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled","true")
>>
>> HTH
>>
>>
>>
>>
>>
>> On Thu, 4 Nov 2021 at 18:06, Mich Talebzadeh <mi...@gmail.com>
>> wrote:
>>
>>> Ok so it boils down on how spark does create toPandas() DF under the
>>> bonnet. How many executors are involved in k8s cluster. In this model spark
>>> will create executors = no of nodes - 1
>>>
>>> On Thu, 4 Nov 2021 at 17:42, Sergey Ivanychev <se...@gmail.com>
>>> wrote:
>>>
>>>> > Just to confirm with Collect() alone, this is all on the driver?
>>>>
>>>> I shared the screenshot with the plan in the first email. In the
>>>> collect() case the data gets fetched to the driver without problems.
>>>>
>>>> Best regards,
>>>>
>>>>
>>>> Sergey Ivanychev
>>>>
>>>> On 4 Nov 2021, at 20:37, Mich Talebzadeh <mi...@gmail.com>
>>>> wrote:
>>>>
>>>> Just to confirm with Collect() alone, this is all on the driver?
>>>>
>>>> --
>>>
>>>
>>>
>>>
>>>
>>
>

Re: PySpark: toPandas() vs collect() execution graph differences

Posted by Sergey Ivanychev <se...@gmail.com>.
Of course, if I give 64G of RAM to each executor they will work. But what’s the point? Collecting results in the driver should cause high RAM usage in the driver, and that’s what happens in the collect() case. In the case where pyarrow serialization is enabled, all the data is collected on a single executor, which is clearly the wrong way to collect the result on the driver.

I guess I’ll open an issue about it in Spark Jira. It clearly looks like a bug.

> On 12 Nov 2021, at 11:59, Mich Talebzadeh <mi...@gmail.com> wrote:
> 
> OK, your findings do not imply those settings are incorrect. Those settings will work if you set-up your k8s cluster in peer-to-peer mode with equal amounts of RAM for each node which is common practice.
> 
> HTH
> 
> 
>  
> 
> 
> On Thu, 11 Nov 2021 at 21:39, Sergey Ivanychev <sergeyivanychev@gmail.com <ma...@gmail.com>> wrote:
> Yes, in fact those are the settings that cause this behaviour. If set to false, everything goes fine since the implementation in spark sources in this case is
> 
> pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
> 
> Best regards,
> 
> 
> Sergey Ivanychev
> 
>> 11 нояб. 2021 г., в 13:58, Mich Talebzadeh <mich.talebzadeh@gmail.com <ma...@gmail.com>> написал(а):
>> 
>> 
>> Have you tried the following settings:
>> 
>> spark.conf.set("spark.sql.execution.arrow.pysppark.enabled", "true")
>> spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled","true")
>> 
>> HTH
>> 
>>    view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>  
>> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. 
>>  
>> 
>> 
>> On Thu, 4 Nov 2021 at 18:06, Mich Talebzadeh <mich.talebzadeh@gmail.com <ma...@gmail.com>> wrote:
>> Ok so it boils down on how spark does create toPandas() DF under the bonnet. How many executors are involved in k8s cluster. In this model spark will create executors = no of nodes - 1
>> 
>> On Thu, 4 Nov 2021 at 17:42, Sergey Ivanychev <sergeyivanychev@gmail.com <ma...@gmail.com>> wrote:
>> > Just to confirm with Collect() alone, this is all on the driver?
>> 
>> I shared the screenshot with the plan in the first email. In the collect() case the data gets fetched to the driver without problems.
>> 
>> Best regards,
>> 
>> 
>> Sergey Ivanychev
>> 
>>> 4 нояб. 2021 г., в 20:37, Mich Talebzadeh <mich.talebzadeh@gmail.com <ma...@gmail.com>> написал(а):
>>> 
>> 
>>> Just to confirm with Collect() alone, this is all on the driver?
>> -- 
>> 
>> 
>>    view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>  
>> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
>>  


Re: PySpark: toPandas() vs collect() execution graph differences

Posted by Mich Talebzadeh <mi...@gmail.com>.
OK, your findings do not imply those settings are incorrect. Those settings
will work if you set up your k8s cluster in peer-to-peer mode with equal
amounts of RAM for each node, which is common practice.

HTH



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 11 Nov 2021 at 21:39, Sergey Ivanychev <se...@gmail.com>
wrote:

> Yes, in fact those are the settings that cause this behaviour. If set to
> false, everything goes fine since the implementation in spark sources in
> this case is
>
> pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
>
> Best regards,
>
>
> Sergey Ivanychev

Re: PySpark: toPandas() vs collect() execution graph differences

Posted by Gourav Sengupta <go...@gmail.com>.
Hi,

Sorry

Regards,
Gourav Sengupta


On Fri, Nov 12, 2021 at 6:48 AM Sergey Ivanychev <se...@gmail.com>
wrote:

> Hi Gourav,
>
> Please, read my question thoroughly. My problem is with the plan of the
> execution and with the fact that toPandas collects all the data not on the
> driver but on an executor, not with the fact that there’s some memory
> overhead.
>
> I don’t understand how your excerpts answer my question. The chapters
> you’ve shared describe that serialization is costly, that workers can fail
> due to the memory constraints and inter-language serialization.
>
> This is irrelevant to my question — building pandas DataFrame using
> Spark’s collect() works fine and this operation itself involves much
> deserialization of Row objects.
>
> Best regards,
>
>
> Sergey Ivanychev

Re: PySpark: toPandas() vs collect() execution graph differences

Posted by Georg Heiler <ge...@gmail.com>.
https://stackoverflow.com/questions/46832394/spark-access-first-n-rows-take-vs-limit
might be related

Best,
Georg

On Fri, 12 Nov 2021 at 07:48, Sergey Ivanychev <
sergeyivanychev@gmail.com> wrote:

> Hi Gourav,
>
> Please, read my question thoroughly. My problem is with the plan of the
> execution and with the fact that toPandas collects all the data not on the
> driver but on an executor, not with the fact that there’s some memory
> overhead.
>
> I don’t understand how your excerpts answer my question. The chapters
> you’ve shared describe that serialization is costly, that workers can fail
> due to the memory constraints and inter-language serialization.
>
> This is irrelevant to my question — building pandas DataFrame using
> Spark’s collect() works fine and this operation itself involves much
> deserialization of Row objects.
>
> Best regards,
>
>
> Sergey Ivanychev

Re: PySpark: toPandas() vs collect() execution graph differences

Posted by Sergey Ivanychev <se...@gmail.com>.
Hi Gourav,

Please, read my question thoroughly. My problem is with the plan of the execution and with the fact that toPandas collects all the data not on the driver but on an executor, not with the fact that there’s some memory overhead.

I don’t understand how your excerpts answer my question. The chapters you’ve shared describe that serialization is costly, that workers can fail due to the memory constraints and inter-language serialization.

This is irrelevant to my question — building pandas DataFrame using Spark’s collect() works fine and this operation itself involves much deserialization of Row objects.

Best regards,


Sergey Ivanychev

> On 12 Nov 2021, at 05:05, Gourav Sengupta <go...@gmail.com> wrote:
> 
> Hi Sergey,
> 
> Please read the excerpts from the book of Dr. Zaharia that I had sent, they explain these fundamentals clearly.
> 
> Regards,
> Gourav Sengupta

Re: PySpark: toPandas() vs collect() execution graph differences

Posted by Gourav Sengupta <go...@gmail.com>.
Hi Sergey,

Please read the excerpts from the book of Dr. Zaharia that I had sent, they
explain these fundamentals clearly.

Regards,
Gourav Sengupta

On Thu, Nov 11, 2021 at 9:40 PM Sergey Ivanychev <se...@gmail.com>
wrote:

> Yes, in fact those are the settings that cause this behaviour. If set to
> false, everything goes fine since the implementation in spark sources in
> this case is
>
> pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
>
> Best regards,
>
>
> Sergey Ivanychev

Re: PySpark: toPandas() vs collect() execution graph differences

Posted by Sergey Ivanychev <se...@gmail.com>.
Yes, in fact those are the settings that cause this behaviour. If set to false, everything goes fine since the implementation in spark sources in this case is

pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
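
A minimal sketch of switching between the two code paths discussed here, assuming an active SparkSession is passed in as spark. The config key is the Arrow flag discussed in this thread; the helper name set_arrow_conversion is hypothetical. With the flag set to false, toPandas() is described above as taking the collect()-based line.

```python
ARROW_ENABLED = "spark.sql.execution.arrow.pyspark.enabled"


def set_arrow_conversion(spark, enabled):
    """Enable or disable Arrow-based toPandas() conversion for this session.

    `spark` is assumed to be a SparkSession (anything exposing conf.set and
    conf.get works). Returns the value read back from the config as a string.
    """
    spark.conf.set(ARROW_ENABLED, "true" if enabled else "false")
    return spark.conf.get(ARROW_ENABLED)


# Hypothetical usage, assuming `spark` and a DataFrame `df` already exist:
#   set_arrow_conversion(spark, False)
#   pdf = df.toPandas()  # expected to take the collect()-based path
```

Reading the value back is a cheap way to confirm that the key was spelled as intended.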

Best regards,


Sergey Ivanychev

> On 11 Nov 2021, at 13:58, Mich Talebzadeh <mi...@gmail.com> wrote:
> 
> 
> Have you tried the following settings:
> 
> spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
> spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled","true")
> 
> HTH

Re: PySpark: toPandas() vs collect() execution graph differences

Posted by Mich Talebzadeh <mi...@gmail.com>.
Have you tried the following settings:

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled","true")

HTH


On Thu, 4 Nov 2021 at 18:06, Mich Talebzadeh <mi...@gmail.com>
wrote:

> OK, so it boils down to how Spark creates the toPandas() DataFrame under
> the bonnet. How many executors are involved in the k8s cluster? In this
> model Spark will create executors = number of nodes - 1

Re: PySpark: toPandas() vs collect() execution graph differences

Posted by Mich Talebzadeh <mi...@gmail.com>.
OK, so it boils down to how Spark creates the toPandas() DataFrame under
the bonnet. How many executors are involved in the k8s cluster? In this
model Spark will create executors = number of nodes - 1

On Thu, 4 Nov 2021 at 17:42, Sergey Ivanychev <se...@gmail.com>
wrote:

> > Just to confirm with Collect() alone, this is all on the driver?
>
> I shared the screenshot with the plan in the first email. In the collect()
> case the data gets fetched to the driver without problems.
>
> Best regards,
>
>
> Sergey Ivanychev

Re: PySpark: toPandas() vs collect() execution graph differences

Posted by Sergey Ivanychev <se...@gmail.com>.
> Just to confirm with Collect() alone, this is all on the driver?

I shared the screenshot with the plan in the first email. In the collect() case the data gets fetched to the driver without problems.

Best regards,


Sergey Ivanychev

> On 4 Nov 2021, at 20:37, Mich Talebzadeh <mi...@gmail.com> wrote:
> 
> Just to confirm with Collect() alone, this is all on the driver?

Re: PySpark: toPandas() vs collect() execution graph differences

Posted by Mich Talebzadeh <mi...@gmail.com>.
Well evidently the indication is that this is happening on the executor and
not on the driver node as assumed. Just to confirm with Collect() alone,
this is all on the driver?

HTH

On Thu, 4 Nov 2021 at 16:10, Sergey Ivanychev <se...@gmail.com>
wrote:

> I’m sure that it’s running in client mode. I don’t want to have the same
> amount of RAM on drivers and executors since there’s no point in giving 64G
> of RAM to executors in my case.
> My question is why the collect and toPandas actions produce such different
> plans, which cause toPandas to fail on executors.
>
> Best regards,
>
>
> Sergey Ivanychev
>
> On 4 Nov 2021, at 15:17, Mich Talebzadeh <mi...@gmail.com>
> wrote:
>
>
>
> From your notes: ".. IIUC, in the `toPandas` case all the data gets
> shuffled to a single executor that fails with OOM, which doesn’t happen in
> `collect` case. Does it work like that? How do I collect a large
> dataset that fits into memory of the driver?"
>
> The acid test would be to use pandas and ensure that all nodes have the
> same amount of RAM. The assumption here is that the master node has a
> larger amount of RAM that in theory should handle the work for Jupyter
> with pandas. You can easily find out which mode Spark is deployed in by
> looking at the Spark GUI page.
>
> HTH
>
> On Thu, 4 Nov 2021 at 11:25, Sergey Ivanychev <se...@gmail.com>
> wrote:
>
>> I will follow up with the output, but I suppose Jupyter runs in client
>> mode since it’s created via getOrCreate with a K8s api server as master.
>> Also note that I tried both “collect” and “toPandas” in the same
>> conditions (Jupyter client mode), so IIUC your theory doesn’t explain that
>> difference in execution plans.
>>
>> Best regards,
>>
>>
>> Sergey Ivanychev
>>
>> On 4 Nov 2021, at 13:12, Mich Talebzadeh <mi...@gmail.com>
>> wrote:
>>
>> Do you have the output for executors from spark GUI, the one that
>> eventually ends up with OOM?
>>
>> Also what does
>>
>> kubectl get pods -n $NAMESPACE
>> DRIVER_POD_NAME=`kubectl get pods -n $NAMESPACE | grep driver | awk '{print $1}'`
>> kubectl logs $DRIVER_POD_NAME -n $NAMESPACE
>> kubectl logs $EXECUTOR_WITH_OOM  -n $NAMESPACE
>>
>>
>> say?
>>
>>
>> My guess is that Jupyter notebook like Zeppelin notebook does a two stage
>> spark-submit under the bonnet. The job starts on the driver where the
>> Jupyter notebook is on but the actual job runs on the cluster itself in
>> cluster mode. If your assertion is right (executors don't play much of a
>> role), just run the whole thing in local mode!
>>
>>
>> HTH
>>
>> On Wed, 3 Nov 2021 at 22:08, Sergey Ivanychev <se...@gmail.com>
>> wrote:
>>
>>> I want to further clarify the use case I have: an ML engineer collects
>>> data to use it for training an ML model. The driver is created within a
>>> Jupyter notebook and has 64G of RAM for fetching the training set and
>>> feeding it to the model. Naturally, in this case executors shouldn’t be as
>>> big as the driver.
>>>
>>> Currently, the best solution I found is to write the dataframe to S3,
>>> and then read it via pd.read_parquet.
>>>
>>> Best regards,
>>>
>>>
>>> Sergey Ivanychev
>>>
>>> On 4 Nov 2021, at 00:18, Mich Talebzadeh <mi...@gmail.com>
>>> wrote:
>>>
>>> Thanks for clarification on the koalas case.
>>>
>>> The thread owner states and I quote: .. IIUC, in the `toPandas` case all
>>> the data gets shuffled to a single executor that fails with OOM,....
>>>
>>> I still believe that this may be related to the way k8s handles
>>> shuffling. In a balanced k8s cluster this could be avoided which does not
>>> seem to be the case here as the so called driver node has 8 times more
>>> RAM than the other nodes.
>>>
>>> HTH
>>>
>>> On Wed, 3 Nov 2021 at 21:00, Sean Owen <sr...@gmail.com> wrote:
>>>
>>>> I think you're talking about koalas, which is in Spark 3.2, but that is
>>>> unrelated to toPandas(), nor to the question of how it differs from
>>>> collect().
>>>> Shuffle is also unrelated.
>>>>
>>>> On Wed, Nov 3, 2021 at 3:45 PM Mich Talebzadeh <
>>>> mich.talebzadeh@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> As I understood in the previous versions of Spark the data could not
>>>>> be processed and stored in Pandas data frames in a distributed mode as
>>>>> these data frames store data in RAM which is the driver in this case.
>>>>> However, I was under the impression that this limitation no longer
>>>>> exists in 3.2? So if you have a k8s cluster with 64GB of RAM for one node
>>>>> and 8GB of RAM for others, and PySpark running in cluster mode,  how do you
>>>>> expect the process to confine itself to the master node? What will happen
>>>>> if you increase executor node(s) RAM to 64GB temporarily (balanced k8s
>>>>> cluster) and run the job again?
>>>>>
>>>>> Worth noting that the current Spark on k8s  does not support external
>>>>> shuffle. For now we have two parameters for Dynamic Resource Allocation.
>>>>> These are
>>>>>
>>>>>  --conf spark.dynamicAllocation.enabled=true \
>>>>>  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
>>>>>
>>>>>
>>>>> The idea is to use dynamic resource allocation where the driver
>>>>> tracks the shuffle files and evicts only executors not storing active
>>>>> shuffle files. So in a nutshell these shuffle files are stored in the
>>>>> executors themselves in the absence of the external shuffle. The model
>>>>> works on the basis of the "one-container-per-Pod" model
>>>>> <https://kubernetes.io/docs/concepts/workloads/pods/>, meaning that
>>>>> one node of the cluster runs the driver and each remaining node runs
>>>>> one executor.
>>>>>
>>>>>
>>>>>
>>>>> HTH
>>>>> ,
>>>>>
>>>>>    view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>> --




Re: PySpark: toPandas() vs collect() execution graph differences

Posted by Sergey Ivanychev <se...@gmail.com>.
> did you get to read the excerpts from the book of Dr. Zaharia?

I read what you have shared but didn’t manage to get your point.

Best regards,


Sergey Ivanychev

> On 4 Nov 2021, at 20:38, Gourav Sengupta <go...@gmail.com> wrote:
> 
> did you get to read the excerpts from the book of Dr. Zaharia?

Re: PySpark: toPandas() vs collect() execution graph differences

Posted by Gourav Sengupta <go...@gmail.com>.
Hi,

Did you get to read the excerpts from Dr. Zaharia's book?

Regards,
Gourav

On Thu, Nov 4, 2021 at 4:11 PM Sergey Ivanychev <se...@gmail.com>
wrote:

> I’m sure that its running in client mode. I don’t want to have the same
> amount of RAM on drivers and executors since there’s no point in giving 64G
> of ram to executors in my case.
>
> My question is why collect and toPandas actions produce so different
> plans, which cause toPandas to fail on executors.
>
> Best regards,
>
>
> Sergey Ivanychev
>
> On 4 Nov 2021, at 15:17, Mich Talebzadeh <mi...@gmail.com>
> wrote:
>
> 
>
> From your notes ".. IIUC, in the `toPandas` case all the data gets
> shuffled to a single executor that fails with OOM, which doesn’t happen in
> `collect` case. This does it work like that? How do I collect a large
> dataset that fits into memory of the driver?.
>
> The acid test would be to use pandas and ensure that all nodes have the
> same amount of RAM. The assumption here is that the master node has a
> larger amount of RAM that in theory should handle the work. for Jupiter
> with Pandas. You can easily find out which mode Spark is deploying by
> looking at Spark GUI page.
>
> HTH
>
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 4 Nov 2021 at 11:25, Sergey Ivanychev <se...@gmail.com>
> wrote:
>
>> I will follow up with the output, but I suppose Jupyter runs in client
>> mode since it’s created via getOrCreate with a K8s api server as master.
>> Also note that I tried both “collect” and “toPandas” in the same
>> conditions (Jupyter client mode), so IIUC your theory doesn’t explain that
>> difference in execution plans.
>>
>> Best regards,
>>
>>
>> Sergey Ivanychev
>>
>> On 4 Nov 2021, at 13:12, Mich Talebzadeh <mi...@gmail.com>
>> wrote:
>>
>> 
>> Do you have the output for executors from spark GUI, the one that
>> eventually ends up with OOM?
>>
>> Also what does
>>
>> kubectl get pods -n $NAMESPACE
>> DRIVER_POD_NAME=`kubectl get pods -n $NAMESPACE | grep driver | awk '{print $1}'`
>> kubectl logs $DRIVER_POD_NAME -n $NAMESPACE
>> kubectl logs $EXECUTOR_WITH_OOM  -n $NAMESPACE
>>
>>
>> say?
>>
>>
>> My guess is that Jupyter notebook like Zeppelin notebook does a two stage
>> spark-submit under the bonnet. The job starts on the driver where the
>> Jupyter notebook is on but the actual job runs on the cluster itself in
>> cluster mode. If your assertion is right (executors don't play much of a
>> role), just run the whole thing in local mode!
>>
>>
>> HTH
>>
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Wed, 3 Nov 2021 at 22:08, Sergey Ivanychev <se...@gmail.com>
>> wrote:
>>
>>> I want to further clarify the use case I have: an ML engineer collects
>>> data so as to use it for training an ML model. The driver is created within
>>> Jupiter notebook and has 64G of ram for fetching the training set and
>>> feeding it to the model. Naturally, in this case executors shouldn’t be as
>>> big as the driver.
>>>
>>> Currently, the best solution I found is to write the dataframe to S3,
>>> and then read it via pd.read_parquet.
>>>
>>> Best regards,
>>>
>>>
>>> Sergey Ivanychev
>>>
>>> On 4 Nov 2021, at 00:18, Mich Talebzadeh <mi...@gmail.com>
>>> wrote:
>>>
>>> 
>>> Thanks for clarification on the koalas case.
>>>
>>> The thread owner states and I quote: .. IIUC, in the `toPandas` case all
>>> the data gets shuffled to a single executor that fails with OOM,....
>>>
>>> I still believe that this may be related to the way k8s handles
>>> shuffling. In a balanced k8s cluster this could be avoided which does not
>>> seem to be the case here as the so called driver node has 8 times more
>>> RAM than the other nodes.
>>>
>>> HTH
>>>
>>>    view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Wed, 3 Nov 2021 at 21:00, Sean Owen <sr...@gmail.com> wrote:
>>>
>>>> I think you're talking about koalas, which is in Spark 3.2, but that is
>>>> unrelated to toPandas(), nor to the question of how it differs from
>>>> collect().
>>>> Shuffle is also unrelated.
>>>>
>>>> On Wed, Nov 3, 2021 at 3:45 PM Mich Talebzadeh <
>>>> mich.talebzadeh@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> As I understood in the previous versions of Spark the data could not
>>>>> be processed and stored in Pandas data frames in a distributed mode as
>>>>> these data frames store data in RAM which is the driver in this case.
>>>>> However, I was under the impression that this limitation no longer
>>>>> exists in 3.2? So if you have a k8s cluster with 64GB of RAM for one node
>>>>> and 8GB of RAM for others, and PySpark running in cluster mode,  how do you
>>>>> expect the process to confine itself to the master node? What will happen
>>>>> if you increase executor node(s) RAM to 64GB temporarily (balanced k8s
>>>>> cluster) and run the job again?
>>>>>
>>>>> Worth noting that the current Spark on k8s  does not support external
>>>>> shuffle. For now we have two parameters for Dynamic Resource Allocation.
>>>>> These are
>>>>>
>>>>>  --conf spark.dynamicAllocation.enabled=true \
>>>>>  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
>>>>>
>>>>>
>>>>> The idea is to use dynamic resource allocation where the driver
>>>>> tracks the shuffle files and evicts only executors not storing active
>>>>> shuffle files. So in a nutshell these shuffle files are stored in the
>>>>> executors themselves in the absence of the external shuffle. The model
>>>>> works on the basis of the "one-container-per-Pod" model
>>>>> <https://kubernetes.io/docs/concepts/workloads/pods/> meaning that
>>>>> for each node of the cluster there will be one node running the driver and
>>>>> each remaining node running one executor each.
>>>>>
>>>>>
>>>>>
>>>>> HTH
>>>>> ,
>>>>>
>>>>>    view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>>

Re: PySpark: toPandas() vs collect() execution graph differences

Posted by Sergey Ivanychev <se...@gmail.com>.
I’m sure that it’s running in client mode. I don’t want to have the same amount of RAM on the driver and the executors, since there’s no point in giving 64G of RAM to executors in my case.

My question is why the collect and toPandas actions produce such different plans, which causes toPandas to fail on the executors.
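
One way to narrow this down, sketched under assumptions: in PySpark 3.x, `toPandas()` takes an Arrow-based path when `spark.sql.execution.arrow.pyspark.enabled` is true and otherwise builds the pandas DataFrame from `collect()`. Whether that setting is what changes the plan here is an assumption, not something confirmed in this thread. The helper name `arrow_disabled` is hypothetical; it only uses `spark.conf.get` and `spark.conf.set`.

```python
from contextlib import contextmanager

ARROW_KEY = "spark.sql.execution.arrow.pyspark.enabled"


@contextmanager
def arrow_disabled(spark):
    """Temporarily turn off Arrow-based conversion for toPandas().

    `spark` is expected to be a SparkSession (hypothetical helper, not a Spark API).
    """
    previous = spark.conf.get(ARROW_KEY, "false")  # remember the current setting
    spark.conf.set(ARROW_KEY, "false")
    try:
        yield spark
    finally:
        spark.conf.set(ARROW_KEY, previous)  # restore whatever was set before


# Usage in a notebook (assumes an existing SparkSession `spark`; the table name is a placeholder):
# with arrow_disabled(spark):
#     pdf = spark.table("some_table").limit(1000000).toPandas()
```

If the plan shown in the Spark UI matches the `collect()` plan with Arrow disabled, the extra shuffle would be tied to the Arrow path rather than to the cluster layout.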

Best regards,


Sergey Ivanychev

> On 4 Nov 2021, at 15:17, Mich Talebzadeh <mi...@gmail.com> wrote:
> 
> 
> 
> From your notes ".. IIUC, in the `toPandas` case all the data gets shuffled to a single executor that fails with OOM, which doesn’t happen in `collect` case. This does it work like that? How do I collect a large dataset that fits into memory of the driver?.
> 
> The acid test would be to use pandas and ensure that all nodes have the same amount of RAM. The assumption here is that the master node has a larger amount of RAM that in theory should handle the work. for Jupiter with Pandas. You can easily find out which mode Spark is deploying by looking at Spark GUI page.
> 
> HTH
> 
> 
> 
>    view my Linkedin profile
> 
>  
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
>  
> 
> 
>> On Thu, 4 Nov 2021 at 11:25, Sergey Ivanychev <se...@gmail.com> wrote:
>> I will follow up with the output, but I suppose Jupyter runs in client mode since it’s created via getOrCreate with a K8s api server as master.
>> Also note that I tried both “collect” and “toPandas” in the same conditions (Jupyter client mode), so IIUC your theory doesn’t explain that difference in execution plans.
>> 
>> Best regards,
>> 
>> 
>> Sergey Ivanychev
>> 
>>>> On 4 Nov 2021, at 13:12, Mich Talebzadeh <mi...@gmail.com> wrote:
>>>> 
>>> 
>>> Do you have the output for executors from spark GUI, the one that eventually ends up with OOM?
>>> 
>>> Also what does 
>>> 
>>> kubectl get pods -n $NAMESPACE 
>>> DRIVER_POD_NAME=`kubectl get pods -n $NAMESPACE |grep driver|awk '{print $1}'`
>>> kubectl logs $DRIVER_POD_NAME -n $NAMESPACE
>>> kubectl logs $EXECUTOR_WITH_OOM  -n $NAMESPACE
>>> 
>>> say?
>>> 
>>> My guess is that Jupyter notebook like Zeppelin notebook does a two stage spark-submit under the bonnet. The job starts on the driver where the Jupyter notebook is on but the actual job runs on the cluster itself in cluster mode. If your assertion is right (executors don't play much of a role), just run the whole thing in local mode!
>>> 
>>> HTH
>>> 
>>>    view my Linkedin profile
>>> 
>>>  
>>> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
>>>  
>>> 
>>> 
>>>> On Wed, 3 Nov 2021 at 22:08, Sergey Ivanychev <se...@gmail.com> wrote:
>>>> I want to further clarify the use case I have: an ML engineer collects data so as to use it for training an ML model. The driver is created within Jupiter notebook and has 64G of ram for fetching the training set and feeding it to the model. Naturally, in this case executors shouldn’t be as big as the driver.
>>>> 
>>>> Currently, the best solution I found is to write the dataframe to S3, and then read it via pd.read_parquet.
>>>> 
>>>> Best regards,
>>>> 
>>>> 
>>>> Sergey Ivanychev
>>>> 
>>>>>> On 4 Nov 2021, at 00:18, Mich Talebzadeh <mi...@gmail.com> wrote:
>>>>>> 
>>>>> 
>>>>> Thanks for clarification on the koalas case.
>>>>> 
>>>>> The thread owner states and I quote: .. IIUC, in the `toPandas` case all the data gets shuffled to a single executor that fails with OOM,.... 
>>>>> 
>>>>> I still believe that this may be related to the way k8s handles shuffling. In a balanced k8s cluster this could be avoided which does not seem to be the case here as the so called driver node has 8 times more RAM than the other nodes. 
>>>>> 
>>>>> HTH
>>>>> 
>>>>>    view my Linkedin profile
>>>>>  
>>>>> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
>>>>>  
>>>>> 
>>>>> 
>>>>>> On Wed, 3 Nov 2021 at 21:00, Sean Owen <sr...@gmail.com> wrote:
>>>>>> I think you're talking about koalas, which is in Spark 3.2, but that is unrelated to toPandas(), nor to the question of how it differs from collect().
>>>>>> Shuffle is also unrelated.
>>>>>> 
>>>>>>> On Wed, Nov 3, 2021 at 3:45 PM Mich Talebzadeh <mi...@gmail.com> wrote:
>>>>>>> Hi,
>>>>>>> 
>>>>>>> As I understood in the previous versions of Spark the data could not be processed and stored in Pandas data frames in a distributed mode as these data frames store data in RAM which is the driver in this case. 
>>>>>>> However, I was under the impression that this limitation no longer exists in 3.2? So if you have a k8s cluster with 64GB of RAM for one node and 8GB of RAM for others, and PySpark running in cluster mode,  how do you expect the process to confine itself to the master node? What will happen if you increase executor node(s) RAM to 64GB temporarily (balanced k8s cluster) and run the job again?
>>>>>>> 
>>>>>>> Worth noting that the current Spark on k8s  does not support external shuffle. For now we have two parameters for Dynamic Resource Allocation. These are 
>>>>>>> 
>>>>>>>  --conf spark.dynamicAllocation.enabled=true \
>>>>>>>  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
>>>>>>>  
>>>>>>> The idea is to use dynamic resource allocation where the driver tracks the shuffle files and evicts only executors not storing active shuffle files. So in a nutshell these shuffle files are stored in the executors themselves in the absence of the external shuffle. The model works on the basis of the "one-container-per-Pod" model  meaning that for each node of the cluster there will be one node running the driver and each remaining node running one executor each. 
>>>>>>> 
>>>>>>> 
>>>>>>> HTH
>>>>>>> , 
>>>>>>>    view my Linkedin profile
>>>>>>> 
>>>>>>>  
>>>>>>> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
>>>>>>>  
>>>>>>> 

Re: PySpark: toPandas() vs collect() execution graph differences

Posted by Mich Talebzadeh <mi...@gmail.com>.
From your notes: ".. IIUC, in the `toPandas` case all the data gets shuffled
to a single executor that fails with OOM, which doesn’t happen in `collect`
case. This does it work like that? How do I collect a large dataset that
fits into memory of the driver?"

The acid test would be to use pandas and ensure that all nodes have the
same amount of RAM. The assumption here is that the master node has a
larger amount of RAM, which in theory should handle the work for Jupyter
with Pandas. You can easily find out which deploy mode Spark is using by
looking at the Spark GUI page.

HTH







On Thu, 4 Nov 2021 at 11:25, Sergey Ivanychev <se...@gmail.com>
wrote:

> I will follow up with the output, but I suppose Jupyter runs in client
> mode since it’s created via getOrCreate with a K8s api server as master.
> Also note that I tried both “collect” and “toPandas” in the same
> conditions (Jupyter client mode), so IIUC your theory doesn’t explain that
> difference in execution plans.
>
> Best regards,
>
>
> Sergey Ivanychev
>
> On 4 Nov 2021, at 13:12, Mich Talebzadeh <mi...@gmail.com>
> wrote:
>
> 
> Do you have the output for executors from spark GUI, the one that
> eventually ends up with OOM?
>
> Also what does
>
> kubectl get pods -n $NAMESPACE
> DRIVER_POD_NAME=`kubectl get pods -n $NAMESPACE | grep driver | awk '{print $1}'`
> kubectl logs $DRIVER_POD_NAME -n $NAMESPACE
> kubectl logs $EXECUTOR_WITH_OOM  -n $NAMESPACE
>
>
> say?
>
>
> My guess is that Jupyter notebook like Zeppelin notebook does a two stage
> spark-submit under the bonnet. The job starts on the driver where the
> Jupyter notebook is on but the actual job runs on the cluster itself in
> cluster mode. If your assertion is right (executors don't play much of a
> role), just run the whole thing in local mode!
>
>
> HTH
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 3 Nov 2021 at 22:08, Sergey Ivanychev <se...@gmail.com>
> wrote:
>
>> I want to further clarify the use case I have: an ML engineer collects
>> data so as to use it for training an ML model. The driver is created within
>> Jupiter notebook and has 64G of ram for fetching the training set and
>> feeding it to the model. Naturally, in this case executors shouldn’t be as
>> big as the driver.
>>
>> Currently, the best solution I found is to write the dataframe to S3, and
>> then read it via pd.read_parquet.
>>
>> Best regards,
>>
>>
>> Sergey Ivanychev
>>
>> On 4 Nov 2021, at 00:18, Mich Talebzadeh <mi...@gmail.com>
>> wrote:
>>
>> 
>> Thanks for clarification on the koalas case.
>>
>> The thread owner states and I quote: .. IIUC, in the `toPandas` case all
>> the data gets shuffled to a single executor that fails with OOM,....
>>
>> I still believe that this may be related to the way k8s handles
>> shuffling. In a balanced k8s cluster this could be avoided which does not
>> seem to be the case here as the so called driver node has 8 times more
>> RAM than the other nodes.
>>
>> HTH
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Wed, 3 Nov 2021 at 21:00, Sean Owen <sr...@gmail.com> wrote:
>>
>>> I think you're talking about koalas, which is in Spark 3.2, but that is
>>> unrelated to toPandas(), nor to the question of how it differs from
>>> collect().
>>> Shuffle is also unrelated.
>>>
>>> On Wed, Nov 3, 2021 at 3:45 PM Mich Talebzadeh <
>>> mich.talebzadeh@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> As I understood in the previous versions of Spark the data could not
>>>> be processed and stored in Pandas data frames in a distributed mode as
>>>> these data frames store data in RAM which is the driver in this case.
>>>> However, I was under the impression that this limitation no longer
>>>> exists in 3.2? So if you have a k8s cluster with 64GB of RAM for one node
>>>> and 8GB of RAM for others, and PySpark running in cluster mode,  how do you
>>>> expect the process to confine itself to the master node? What will happen
>>>> if you increase executor node(s) RAM to 64GB temporarily (balanced k8s
>>>> cluster) and run the job again?
>>>>
>>>> Worth noting that the current Spark on k8s  does not support external
>>>> shuffle. For now we have two parameters for Dynamic Resource Allocation.
>>>> These are
>>>>
>>>>  --conf spark.dynamicAllocation.enabled=true \
>>>>  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
>>>>
>>>>
>>>> The idea is to use dynamic resource allocation where the driver tracks
>>>> the shuffle files and evicts only executors not storing active shuffle
>>>> files. So in a nutshell these shuffle files are stored in the executors
>>>> themselves in the absence of the external shuffle. The model works on the
>>>> basis of the "one-container-per-Pod" model
>>>> <https://kubernetes.io/docs/concepts/workloads/pods/> meaning that for
>>>> each node of the cluster there will be one node running the driver and each
>>>> remaining node running one executor each.
>>>>
>>>>
>>>>
>>>> HTH
>>>> ,
>>>>
>>>>    view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>

Re: PySpark: toPandas() vs collect() execution graph differences

Posted by Sergey Ivanychev <se...@gmail.com>.
I will follow up with the output, but I suppose Jupyter runs in client mode, since the session is created via getOrCreate with the K8s API server as master.
Also note that I tried both “collect” and “toPandas” under the same conditions (Jupyter client mode), so IIUC your theory doesn’t explain the difference in execution plans.
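
The deploy mode can also be checked from inside the notebook rather than inferred. A minimal sketch, assuming an existing SparkSession named `spark`; the helper name `describe_deployment` is hypothetical, and the `"client"` fallback is used because `spark.submit.deployMode` is normally unset unless it was passed explicitly.

```python
def describe_deployment(spark):
    """Return the master URL and deploy mode of a running SparkSession."""
    conf = spark.sparkContext.getConf()
    return {
        "master": spark.sparkContext.master,
        # Unset means the driver was not launched in cluster mode via spark-submit.
        "deploy_mode": conf.get("spark.submit.deployMode", "client"),
    }


# Usage in a notebook:
# print(describe_deployment(spark))
```

With a session created through `getOrCreate` against a K8s API server, the master value would be expected to start with `k8s://`.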

Best regards,


Sergey Ivanychev

> On 4 Nov 2021, at 13:12, Mich Talebzadeh <mi...@gmail.com> wrote:
> 
> 
> Do you have the output for executors from spark GUI, the one that eventually ends up with OOM?
> 
> Also what does 
> 
> kubectl get pods -n $NAMESPACE 
> DRIVER_POD_NAME=`kubectl get pods -n $NAMESPACE |grep driver|awk '{print $1}'`
> kubectl logs $DRIVER_POD_NAME -n $NAMESPACE
> kubectl logs $EXECUTOR_WITH_OOM  -n $NAMESPACE
> 
> say?
> 
> My guess is that Jupyter notebook like Zeppelin notebook does a two stage spark-submit under the bonnet. The job starts on the driver where the Jupyter notebook is on but the actual job runs on the cluster itself in cluster mode. If your assertion is right (executors don't play much of a role), just run the whole thing in local mode!
> 
> HTH
> 
>    view my Linkedin profile
> 
>  
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
>  
> 
> 
>> On Wed, 3 Nov 2021 at 22:08, Sergey Ivanychev <se...@gmail.com> wrote:
>> I want to further clarify the use case I have: an ML engineer collects data so as to use it for training an ML model. The driver is created within Jupiter notebook and has 64G of ram for fetching the training set and feeding it to the model. Naturally, in this case executors shouldn’t be as big as the driver.
>> 
>> Currently, the best solution I found is to write the dataframe to S3, and then read it via pd.read_parquet.
>> 
>> Best regards,
>> 
>> 
>> Sergey Ivanychev
>> 
>>>> On 4 Nov 2021, at 00:18, Mich Talebzadeh <mi...@gmail.com> wrote:
>>>> 
>>> 
>>> Thanks for clarification on the koalas case.
>>> 
>>> The thread owner states and I quote: .. IIUC, in the `toPandas` case all the data gets shuffled to a single executor that fails with OOM,.... 
>>> 
>>> I still believe that this may be related to the way k8s handles shuffling. In a balanced k8s cluster this could be avoided which does not seem to be the case here as the so called driver node has 8 times more RAM than the other nodes. 
>>> 
>>> HTH
>>> 
>>>    view my Linkedin profile
>>>  
>>> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
>>>  
>>> 
>>> 
>>>> On Wed, 3 Nov 2021 at 21:00, Sean Owen <sr...@gmail.com> wrote:
>>>> I think you're talking about koalas, which is in Spark 3.2, but that is unrelated to toPandas(), nor to the question of how it differs from collect().
>>>> Shuffle is also unrelated.
>>>> 
>>>>> On Wed, Nov 3, 2021 at 3:45 PM Mich Talebzadeh <mi...@gmail.com> wrote:
>>>>> Hi,
>>>>> 
>>>>> As I understood in the previous versions of Spark the data could not be processed and stored in Pandas data frames in a distributed mode as these data frames store data in RAM which is the driver in this case. 
>>>>> However, I was under the impression that this limitation no longer exists in 3.2? So if you have a k8s cluster with 64GB of RAM for one node and 8GB of RAM for others, and PySpark running in cluster mode,  how do you expect the process to confine itself to the master node? What will happen if you increase executor node(s) RAM to 64GB temporarily (balanced k8s cluster) and run the job again?
>>>>> 
>>>>> Worth noting that the current Spark on k8s  does not support external shuffle. For now we have two parameters for Dynamic Resource Allocation. These are 
>>>>> 
>>>>>  --conf spark.dynamicAllocation.enabled=true \
>>>>>  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
>>>>>  
>>>>> The idea is to use dynamic resource allocation where the driver tracks the shuffle files and evicts only executors not storing active shuffle files. So in a nutshell these shuffle files are stored in the executors themselves in the absence of the external shuffle. The model works on the basis of the "one-container-per-Pod" model  meaning that for each node of the cluster there will be one node running the driver and each remaining node running one executor each. 
>>>>> 
>>>>> 
>>>>> HTH
>>>>> , 
>>>>>    view my Linkedin profile
>>>>> 
>>>>>  
>>>>> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
>>>>>  
>>>>> 

Re: PySpark: toPandas() vs collect() execution graph differences

Posted by Mich Talebzadeh <mi...@gmail.com>.
Do you have the output from the Spark GUI for the executor that
eventually ends up with OOM?

Also, what do

kubectl get pods -n $NAMESPACE
DRIVER_POD_NAME=`kubectl get pods -n $NAMESPACE | grep driver | awk '{print $1}'`
kubectl logs $DRIVER_POD_NAME -n $NAMESPACE
kubectl logs $EXECUTOR_WITH_OOM -n $NAMESPACE

say?


My guess is that a Jupyter notebook, like a Zeppelin notebook, does a
two-stage spark-submit under the bonnet. The job starts on the driver, where
the Jupyter notebook is, but the actual job runs on the cluster itself in
cluster mode. If your assertion is right (executors don't play much of a
role), just run the whole thing in local mode!


HTH






On Wed, 3 Nov 2021 at 22:08, Sergey Ivanychev <se...@gmail.com>
wrote:

> I want to further clarify the use case I have: an ML engineer collects
> data so as to use it for training an ML model. The driver is created within
> Jupiter notebook and has 64G of ram for fetching the training set and
> feeding it to the model. Naturally, in this case executors shouldn’t be as
> big as the driver.
>
> Currently, the best solution I found is to write the dataframe to S3, and
> then read it via pd.read_parquet.
>
> Best regards,
>
>
> Sergey Ivanychev
>
> On 4 Nov 2021, at 00:18, Mich Talebzadeh <mi...@gmail.com>
> wrote:
>
> 
> Thanks for clarification on the koalas case.
>
> The thread owner states and I quote: .. IIUC, in the `toPandas` case all
> the data gets shuffled to a single executor that fails with OOM,....
>
> I still believe that this may be related to the way k8s handles shuffling.
> In a balanced k8s cluster this could be avoided which does not seem to be
> the case here as the so called driver node has 8 times more RAM than the
> other nodes.
>
> HTH
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 3 Nov 2021 at 21:00, Sean Owen <sr...@gmail.com> wrote:
>
>> I think you're talking about koalas, which is in Spark 3.2, but that is
>> unrelated to toPandas(), nor to the question of how it differs from
>> collect().
>> Shuffle is also unrelated.
>>
>> On Wed, Nov 3, 2021 at 3:45 PM Mich Talebzadeh <mi...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> As I understood in the previous versions of Spark the data could not be
>>> processed and stored in Pandas data frames in a distributed mode as these
>>> data frames store data in RAM which is the driver in this case.
>>> However, I was under the impression that this limitation no longer
>>> exists in 3.2? So if you have a k8s cluster with 64GB of RAM for one node
>>> and 8GB of RAM for others, and PySpark running in cluster mode,  how do you
>>> expect the process to confine itself to the master node? What will happen
>>> if you increase executor node(s) RAM to 64GB temporarily (balanced k8s
>>> cluster) and run the job again?
>>>
>>> Worth noting that the current Spark on k8s  does not support external
>>> shuffle. For now we have two parameters for Dynamic Resource Allocation.
>>> These are
>>>
>>>  --conf spark.dynamicAllocation.enabled=true \
>>>  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
>>>
>>>
>>> The idea is to use dynamic resource allocation where the driver tracks
>>> the shuffle files and evicts only executors not storing active shuffle
>>> files. So in a nutshell these shuffle files are stored in the executors
>>> themselves in the absence of the external shuffle. The model works on the
>>> basis of the "one-container-per-Pod" model
>>> <https://kubernetes.io/docs/concepts/workloads/pods/> meaning that for
>>> each node of the cluster there will be one node running the driver and each
>>> remaining node running one executor each.
>>>
>>>
>>>
>>> HTH

Re: PySpark: toPandas() vs collect() execution graph differences

Posted by Sergey Ivanychev <se...@gmail.com>.
I want to further clarify the use case I have: an ML engineer collects data in order to use it for training an ML model. The driver is created within a Jupyter notebook and has 64G of RAM for fetching the training set and feeding it to the model. Naturally, in this case executors shouldn’t be as big as the driver.

Currently, the best solution I found is to write the dataframe to S3, and then read it via pd.read_parquet.
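
A minimal sketch of that Parquet round trip, under stated assumptions: the helper name `spark_to_pandas_via_parquet` and both path parameters are hypothetical, pandas on the driver is assumed to have a Parquet engine (pyarrow or fastparquet) and S3 filesystem support (s3fs) installed, and the URI scheme each side expects depends on the deployment.

```python
import pandas as pd


def spark_to_pandas_via_parquet(sdf, spark_path, pandas_path=None):
    """Write a Spark DataFrame as Parquet, then load it with pandas.

    Hypothetical helper, not part of PySpark. `spark_path` is the URI Spark
    writes to; `pandas_path` is the URI pandas reads on the driver. It
    defaults to `spark_path`, but the two may need different schemes
    (for example "s3a://..." for Spark and "s3://..." for pandas).
    """
    # Executors write the partitions they hold directly to storage;
    # the driver does not receive any rows in this step.
    sdf.write.mode("overwrite").parquet(spark_path)
    # pandas then reads the directory of part files into driver memory.
    return pd.read_parquet(pandas_path or spark_path)


# Hypothetical usage (bucket and table names are placeholders):
#   sdf = spark.table("some_table").limit(1000000)
#   pdf = spark_to_pandas_via_parquet(
#       sdf, "s3a://some-bucket/tmp/train", "s3://some-bucket/tmp/train"
#   )
```

The trade-off is an extra write and read of the data set, in exchange for never materialising the whole result inside a single small executor.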

Best regards,


Sergey Ivanychev

> On 4 Nov 2021, at 00:18, Mich Talebzadeh <mi...@gmail.com> wrote:
> 
> 
> Thanks for clarification on the koalas case.
> 
> The thread owner states and I quote: .. IIUC, in the `toPandas` case all the data gets shuffled to a single executor that fails with OOM,.... 
> 
> I still believe that this may be related to the way k8s handles shuffling. In a balanced k8s cluster this could be avoided which does not seem to be the case here as the so called driver node has 8 times more RAM than the other nodes. 
> 
> HTH
> 
>> On Wed, 3 Nov 2021 at 21:00, Sean Owen <sr...@gmail.com> wrote:
>> I think you're talking about koalas, which is in Spark 3.2, but that is unrelated to toPandas(), nor to the question of how it differs from collect().
>> Shuffle is also unrelated.
>> 
>>> On Wed, Nov 3, 2021 at 3:45 PM Mich Talebzadeh <mi...@gmail.com> wrote:
>>> Hi,
>>> 
>>> As I understood in the previous versions of Spark the data could not be processed and stored in Pandas data frames in a distributed mode as these data frames store data in RAM which is the driver in this case. 
>>> However, I was under the impression that this limitation no longer exists in 3.2? So if you have a k8s cluster with 64GB of RAM for one node and 8GB of RAM for others, and PySpark running in cluster mode,  how do you expect the process to confine itself to the master node? What will happen if you increase executor node(s) RAM to 64GB temporarily (balanced k8s cluster) and run the job again?
>>> 
>>> Worth noting that the current Spark on k8s  does not support external shuffle. For now we have two parameters for Dynamic Resource Allocation. These are 
>>> 
>>>  --conf spark.dynamicAllocation.enabled=true \
>>>  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
>>>  
>>> The idea is to use dynamic resource allocation where the driver tracks the shuffle files and evicts only executors not storing active shuffle files. So in a nutshell these shuffle files are stored in the executors themselves in the absence of the external shuffle. The model works on the basis of the "one-container-per-Pod" model  meaning that for each node of the cluster there will be one node running the driver and each remaining node running one executor each. 
>>> 
>>> 
>>> HTH

Re: PySpark: toPandas() vs collect() execution graph differences

Posted by Mich Talebzadeh <mi...@gmail.com>.
Thanks for clarification on the koalas case.

The thread owner states, and I quote: "IIUC, in the `toPandas` case all
the data gets shuffled to a single executor that fails with OOM".

I still believe that this may be related to the way k8s handles shuffling.
In a balanced k8s cluster this could be avoided, but that does not seem to be
the case here, as the so-called driver node has 8 times more RAM than the
other nodes.

HTH

On Wed, 3 Nov 2021 at 21:00, Sean Owen <sr...@gmail.com> wrote:

> I think you're talking about koalas, which is in Spark 3.2, but that is
> unrelated to toPandas(), nor to the question of how it differs from
> collect().
> Shuffle is also unrelated.
>
> On Wed, Nov 3, 2021 at 3:45 PM Mich Talebzadeh <mi...@gmail.com>
> wrote:
>
>> Hi,
>>
>> As I understood in the previous versions of Spark the data could not be
>> processed and stored in Pandas data frames in a distributed mode as these
>> data frames store data in RAM which is the driver in this case.
>> However, I was under the impression that this limitation no longer exists
>> in 3.2? So if you have a k8s cluster with 64GB of RAM for one node and 8GB
>> of RAM for others, and PySpark running in cluster mode,  how do you expect
>> the process to confine itself to the master node? What will happen if you
>> increase executor node(s) RAM to 64GB temporarily (balanced k8s cluster)
>> and run the job again?
>>
>> Worth noting that the current Spark on k8s  does not support external
>> shuffle. For now we have two parameters for Dynamic Resource Allocation.
>> These are
>>
>>  --conf spark.dynamicAllocation.enabled=true \
>>  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
>>
>>
>> The idea is to use dynamic resource allocation where the driver tracks
>> the shuffle files and evicts only executors not storing active shuffle
>> files. So in a nutshell these shuffle files are stored in the executors
>> themselves in the absence of the external shuffle. The model works on the
>> basis of the "one-container-per-Pod" model
>> <https://kubernetes.io/docs/concepts/workloads/pods/> meaning that for
>> each node of the cluster there will be one node running the driver and each
>> remaining node running one executor each.
>>
>>
>>
>> HTH

Re: PySpark: toPandas() vs collect() execution graph differences

Posted by Sean Owen <sr...@gmail.com>.
I think you're talking about koalas, which is in Spark 3.2, but that is
unrelated to toPandas(), nor to the question of how it differs from
collect().
Shuffle is also unrelated.

On Wed, Nov 3, 2021 at 3:45 PM Mich Talebzadeh <mi...@gmail.com>
wrote:

> Hi,
>
> As I understood in the previous versions of Spark the data could not be
> processed and stored in Pandas data frames in a distributed mode as these
> data frames store data in RAM which is the driver in this case.
> However, I was under the impression that this limitation no longer exists
> in 3.2? So if you have a k8s cluster with 64GB of RAM for one node and 8GB
> of RAM for others, and PySpark running in cluster mode,  how do you expect
> the process to confine itself to the master node? What will happen if you
> increase executor node(s) RAM to 64GB temporarily (balanced k8s cluster)
> and run the job again?
>
> Worth noting that the current Spark on k8s  does not support external
> shuffle. For now we have two parameters for Dynamic Resource Allocation.
> These are
>
>  --conf spark.dynamicAllocation.enabled=true \
>  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
>
>
> The idea is to use dynamic resource allocation where the driver tracks
> the shuffle files and evicts only executors not storing active shuffle
> files. So in a nutshell these shuffle files are stored in the executors
> themselves in the absence of the external shuffle. The model works on the
> basis of the "one-container-per-Pod" model
> <https://kubernetes.io/docs/concepts/workloads/pods/> meaning that for
> each node of the cluster there will be one node running the driver and each
> remaining node running one executor each.
>
>
>
> HTH

Re: PySpark: toPandas() vs collect() execution graph differences

Posted by Mich Talebzadeh <mi...@gmail.com>.
Hi,

As I understand it, in previous versions of Spark data could not be
processed and stored in Pandas data frames in a distributed mode, because these
data frames store their data in RAM, which is the driver's RAM in this case.
However, I was under the impression that this limitation no longer exists
in 3.2? So if you have a k8s cluster with 64GB of RAM for one node and 8GB
of RAM for the others, and PySpark running in cluster mode, how do you expect
the process to confine itself to the master node? What will happen if you
temporarily increase the executor node(s) RAM to 64GB (balanced k8s cluster)
and run the job again?

Worth noting that Spark on k8s currently does not support the external
shuffle service. For now we have two parameters for Dynamic Resource
Allocation. These are

 --conf spark.dynamicAllocation.enabled=true \
 --conf spark.dynamicAllocation.shuffleTracking.enabled=true \


The idea is to use dynamic resource allocation where the driver tracks the
shuffle files and evicts only executors not storing active shuffle files.
So in a nutshell, these shuffle files are stored on the executors themselves
in the absence of the external shuffle service. The model works on the basis
of the "one-container-per-Pod" model
<https://kubernetes.io/docs/concepts/workloads/pods/>, meaning that one node
of the cluster runs the driver and each remaining node runs one executor.



HTH

On Wed, 3 Nov 2021 at 14:24, Sergey Ivanychev <se...@gmail.com>
wrote:

> Hi,
>
> Spark 3.1.2 K8s.
>
> I encountered OOM error while trying to create a Pandas DataFrame from
> Spark DataFrame. My Spark driver has 60G of ram, but the executors are tiny
> compared to that (8G)
>
> If I do `spark.table(…).limit(1000000).collect()` I get the following plan
>
>
>
> If I do `spark.table(…).limit(1000000).toPandas()` I get a more
> complicated plan with an extra shuffle
>
>
> IIUC, in the `toPandas` case all the data gets shuffled to a single
> executor that fails with OOM, which doesn’t happen in the `collect` case. Why
> does it work like that? How do I collect a large dataset that fits into
> memory of the driver?
>

Re: PySpark: toPandas() vs collect() execution graph differences

Posted by Sergey Ivanychev <se...@gmail.com>.
I’m pretty sure it’s the executor:

WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 20) (10.20.167.28 executor 2): java.lang.OutOfMemoryError
	at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)

If you look at the `toPandas` plan you can see an exchange stage that doesn’t occur in the `collect` example. I suppose it tries to pull all the data to a single executor for some reason.
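
Since `collect()` is reported to succeed here while `toPandas()` does not, one possible workaround is to collect the rows and build the pandas DataFrame on the driver. The sketch below is only that: `rows_to_pandas` is a hypothetical helper, it relies on PySpark `Row` objects being tuples, and it lets pandas infer the dtypes instead of using the Spark schema.

```python
import pandas as pd


def rows_to_pandas(rows, columns):
    """Build a pandas DataFrame on the driver from already-collected rows.

    `rows` is any iterable of tuple-like records (PySpark Row objects are
    tuples); `columns` is the list of column names, e.g. `sdf.columns`.
    """
    return pd.DataFrame.from_records(list(rows), columns=columns)


# Hypothetical usage against a live session (table name is a placeholder):
#   sdf = spark.table("some_table").limit(1000000)
#   pdf = rows_to_pandas(sdf.collect(), sdf.columns)
```

Note that the driver briefly holds both the list of rows and the resulting DataFrame, so this needs noticeably more driver memory than the final DataFrame alone.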

> On 3 Nov 2021, at 21:34, Sean Owen <sr...@gmail.com> wrote:
> 
> No, you can collect() a DataFrame. You get Rows. collect() must create an in-memory representation - how else could it work? Those aren't differences.
> Are you sure it's the executor that's OOM, not the driver? 
> 
> On Wed, Nov 3, 2021 at 1:32 PM Gourav Sengupta <go...@gmail.com> wrote:
> Hi,
> 
> I might be wrong but toPandas() works with dataframes, where as collect works at RDD. Also toPandas() converts to Python objects in memory I do not think that collect does it.
> 
> Regards,
> Gourav
> 
> On Wed, Nov 3, 2021 at 2:24 PM Sergey Ivanychev <se...@gmail.com> wrote:
> Hi, 
> 
> Spark 3.1.2 K8s.
> 
> I encountered OOM error while trying to create a Pandas DataFrame from Spark DataFrame. My Spark driver has 60G of ram, but the executors are tiny compared to that (8G)
> 
> If I do `spark.table(…).limit(1000000).collect()` I get the following plan
> 
> <2021-11-03_17-07-55.png>
> 
> 
> If I do `spark.table(…).limit(1000000).toPandas()` I get a more complicated plan with an extra shuffle
> 
> <2021-11-03_17-08-31.png>
> 
> IIUC, in the `toPandas` case all the data gets shuffled to a single executor that fails with OOM, which doesn’t happen in the `collect` case. Why does it work like that? How do I collect a large dataset that fits into memory of the driver?


Re: PySpark: toPandas() vs collect() execution graph differences

Posted by Sean Owen <sr...@gmail.com>.
No, you can collect() a DataFrame. You get Rows. collect() must create an
in-memory representation - how else could it work? Those aren't differences.
Are you sure it's the executor that's OOM, not the driver?

On Wed, Nov 3, 2021 at 1:32 PM Gourav Sengupta <go...@gmail.com>
wrote:

> Hi,
>
> I might be wrong but toPandas() works with dataframes, where as collect
> works at RDD. Also toPandas() converts to Python objects in memory I do not
> think that collect does it.
>
> Regards,
> Gourav
>
> On Wed, Nov 3, 2021 at 2:24 PM Sergey Ivanychev <se...@gmail.com>
> wrote:
>
>> Hi,
>>
>> Spark 3.1.2 K8s.
>>
>> I encountered OOM error while trying to create a Pandas DataFrame from
>> Spark DataFrame. My Spark driver has 60G of ram, but the executors are tiny
>> compared to that (8G)
>>
>> If I do `spark.table(…).limit(1000000).collect()` I get the following plan
>>
>>
>>
>> If I do `spark.table(…).limit(1000000).toPandas()` I get a more
>> complicated plan with an extra shuffle
>>
>>
>> IIUC, in the `toPandas` case all the data gets shuffled to a single
>> executor that fails with OOM, which doesn’t happen in the `collect` case. Why
>> does it work like that? How do I collect a large dataset that fits into
>> memory of the driver?
>>
>

Re: PySpark: toPandas() vs collect() execution graph differences

Posted by Gourav Sengupta <go...@gmail.com>.
Hi,

I might be wrong, but toPandas() works with DataFrames, whereas collect()
works at the RDD level. Also, toPandas() converts to Python objects in memory;
I do not think that collect() does that.

Regards,
Gourav

On Wed, Nov 3, 2021 at 2:24 PM Sergey Ivanychev <se...@gmail.com>
wrote:

> Hi,
>
> Spark 3.1.2 K8s.
>
> I encountered OOM error while trying to create a Pandas DataFrame from
> Spark DataFrame. My Spark driver has 60G of ram, but the executors are tiny
> compared to that (8G)
>
> If I do `spark.table(…).limit(1000000).collect()` I get the following plan
>
>
>
> If I do `spark.table(…).limit(1000000).toPandas()` I get a more
> complicated plan with an extra shuffle
>
>
> IIUC, in the `toPandas` case all the data gets shuffled to a single
> executor that fails with OOM, which doesn’t happen in the `collect` case. Why
> does it work like that? How do I collect a large dataset that fits into
> memory of the driver?
>