Posted to user@spark.apache.org by Dhrubajyoti Hati <dh...@gmail.com> on 2019/09/20 06:22:49 UTC

Collections passed from driver to executors

Hi,

I have a question regarding passing a dictionary from the driver to the
executors in Spark on YARN. This dictionary is needed in a UDF. I am using
PySpark.

As I understand it, this can be done in two ways:

1. Broadcast the variable and then use it in the UDFs (a minimal sketch of
   this follows the code below)

2. Pass the dictionary to the UDF itself, with something like this:

  from pyspark.sql.functions import udf

  def udf1(col1, lookup):
      ...

  def udf1_fn(lookup):
      return udf(lambda col_data: udf1(col_data, lookup))

  df.withColumn("column_new", udf1_fn(my_dict)("old_column"))

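For reference, a minimal sketch of the broadcast approach (option 1); the
names spark, my_dict, and df are illustrative, not from my actual job:

  from pyspark.sql.functions import udf
  from pyspark.sql.types import StringType

  my_dict = {"a": "apple", "b": "banana"}   # dictionary built on the driver

  # Ship the dict to the cluster once; each executor caches a single copy.
  bc = spark.sparkContext.broadcast(my_dict)

  def lookup(col_data):
      # bc.value gives the deserialised dict on the executor side
      return bc.value.get(col_data)

  lookup_udf = udf(lookup, StringType())
  df = df.withColumn("column_new", lookup_udf("old_column"))
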
Well, I have tested both approaches and both work.

Now I am wondering what is fundamentally different between the two. I
understand how broadcast works, but I am not sure how the data is passed
across in the second way. Is the dictionary shipped to an executor every time
a new task runs on that executor, or is it shipped only once? Also, how is the
data passed to the Python processes? These are Python UDFs, so I think they
are executed natively in Python (please correct me if I am wrong), which means
the data has to be serialised and passed over to Python.

So, in summary, my question is: which is the better/more efficient way to
write the whole thing, and why?

Thank you!

Regards,
Dhrub

Re: Collections passed from driver to executors

Posted by Reynold Xin <rx...@databricks.com>.
It was done in 2014 by yours truly: https://github.com/apache/spark/pull/1498

so any modern version would have it.

On Mon, Sep 23, 2019 at 9:04 PM, Dhrubajyoti Hati <dhruba.work@gmail.com> wrote:

> 
> Thanks. Could you please let me know which version of Spark this changed
> in? We are still at 2.2.
> 
> On Tue, 24 Sep, 2019, 9:17 AM Reynold Xin, <rxin@databricks.com> wrote:
> 
> 
>> A while ago we changed it so the task gets broadcast too, so I think the
>> two are fairly similar.
>> 
>> 
>> On Mon, Sep 23, 2019 at 8:17 PM, Dhrubajyoti Hati <dhruba.work@gmail.com>
>> wrote:
>> 
>>> I was wondering if anyone could help with this question.
>>> 
>>> On Fri, 20 Sep, 2019, 11:52 AM Dhrubajyoti Hati, <dhruba.work@gmail.com>
>>> wrote:
>>> 
>>> 
>>>> Hi,
>>>> 
>>>> 
>>>> I have a question regarding passing a dictionary from the driver to the
>>>> executors in Spark on YARN. This dictionary is needed in a UDF. I am
>>>> using PySpark.
>>>> 
>>>> 
>>>> As I understand it, this can be done in two ways:
>>>> 
>>>> 
>>>> 1. Broadcast the variable and then use it in the UDFs
>>>> 
>>>> 
>>>> 2. Pass the dictionary to the UDF itself, with something like this:
>>>> 
>>>> 
>>>>   from pyspark.sql.functions import udf
>>>> 
>>>>   def udf1(col1, lookup):
>>>>       ...
>>>> 
>>>>   def udf1_fn(lookup):
>>>>       return udf(lambda col_data: udf1(col_data, lookup))
>>>> 
>>>>   df.withColumn("column_new", udf1_fn(my_dict)("old_column"))
>>>> 
>>>> 
>>>> Well, I have tested both approaches and both work.
>>>> 
>>>> 
>>>> Now I am wondering what is fundamentally different between the two. I
>>>> understand how broadcast works, but I am not sure how the data is passed
>>>> across in the second way. Is the dictionary shipped to an executor every
>>>> time a new task runs on that executor, or is it shipped only once? Also,
>>>> how is the data passed to the Python processes? These are Python UDFs, so
>>>> I think they are executed natively in Python (please correct me if I am
>>>> wrong), which means the data has to be serialised and passed over to
>>>> Python.
>>>> 
>>>> So, in summary, my question is: which is the better/more efficient way
>>>> to write the whole thing, and why?
>>>> 
>>>> 
>>>> Thank you!
>>>> 
>>>> 
>>>> Regards,
>>>> Dhrub
>>>> 
>>> 
>>> 
>> 
>> 
> 
>

Re: Collections passed from driver to executors

Posted by Dhrubajyoti Hati <dh...@gmail.com>.
Thanks. Could you please let me know which version of Spark this changed in?
We are still at 2.2.

On Tue, 24 Sep, 2019, 9:17 AM Reynold Xin, <rx...@databricks.com> wrote:

> A while ago we changed it so the task gets broadcast too, so I think the
> two are fairly similar.
>
> On Mon, Sep 23, 2019 at 8:17 PM, Dhrubajyoti Hati <dh...@gmail.com>
> wrote:
>
>> I was wondering if anyone could help with this question.
>>
>> On Fri, 20 Sep, 2019, 11:52 AM Dhrubajyoti Hati, <dh...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I have a question regarding passing a dictionary from the driver to the
>>> executors in Spark on YARN. This dictionary is needed in a UDF. I am using
>>> PySpark.
>>>
>>> As I understand it, this can be done in two ways:
>>>
>>> 1. Broadcast the variable and then use it in the UDFs
>>>
>>> 2. Pass the dictionary to the UDF itself, with something like this:
>>>
>>>   from pyspark.sql.functions import udf
>>>
>>>   def udf1(col1, lookup):
>>>       ...
>>>
>>>   def udf1_fn(lookup):
>>>       return udf(lambda col_data: udf1(col_data, lookup))
>>>
>>>   df.withColumn("column_new", udf1_fn(my_dict)("old_column"))
>>>
>>> Well, I have tested both approaches and both work.
>>>
>>> Now I am wondering what is fundamentally different between the two. I
>>> understand how broadcast works, but I am not sure how the data is passed
>>> across in the second way. Is the dictionary shipped to an executor every
>>> time a new task runs on that executor, or is it shipped only once? Also,
>>> how is the data passed to the Python processes? These are Python UDFs, so
>>> I think they are executed natively in Python (please correct me if I am
>>> wrong), which means the data has to be serialised and passed over to Python.
>>>
>>> So, in summary, my question is: which is the better/more efficient way
>>> to write the whole thing, and why?
>>>
>>> Thank you!
>>>
>>> Regards,
>>> Dhrub
>>>
>>
>

Re: Collections passed from driver to executors

Posted by Reynold Xin <rx...@databricks.com>.
A while ago we changed it so the task gets broadcast too, so I think the two are fairly similar.
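
To make that concrete: in the closure style from the original mail, the dict
is pickled into the UDF's closure, the closure is serialised as part of the
task, and the serialised task is itself broadcast, so the dict is not
re-shipped with every task. A minimal sketch of that pattern, using
illustrative names (my_dict, make_udf) rather than anything from this thread:

  from pyspark.sql.functions import udf
  from pyspark.sql.types import StringType

  my_dict = {"a": "apple"}   # illustrative driver-side dictionary

  def make_udf(lookup):
      # 'lookup' is captured in the lambda's closure; the pickled closure
      # travels with the task, and the task binary is broadcast once per
      # stage rather than re-sent with each individual task.
      return udf(lambda col_data: lookup.get(col_data), StringType())

  df = df.withColumn("column_new", make_udf(my_dict)("old_column"))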

On Mon, Sep 23, 2019 at 8:17 PM, Dhrubajyoti Hati <dhruba.work@gmail.com> wrote:

> 
> I was wondering if anyone could help with this question.
> 
> On Fri, 20 Sep, 2019, 11:52 AM Dhrubajyoti Hati, <dhruba.work@gmail.com>
> wrote:
> 
> 
>> Hi,
>> 
>> 
>> I have a question regarding passing a dictionary from the driver to the
>> executors in Spark on YARN. This dictionary is needed in a UDF. I am using
>> PySpark.
>> 
>> 
>> As I understand it, this can be done in two ways:
>> 
>> 
>> 1. Broadcast the variable and then use it in the UDFs
>> 
>> 
>> 2. Pass the dictionary to the UDF itself, with something like this:
>> 
>> 
>>   from pyspark.sql.functions import udf
>> 
>>   def udf1(col1, lookup):
>>       ...
>> 
>>   def udf1_fn(lookup):
>>       return udf(lambda col_data: udf1(col_data, lookup))
>> 
>>   df.withColumn("column_new", udf1_fn(my_dict)("old_column"))
>> 
>> 
>> Well, I have tested both approaches and both work.
>> 
>> 
>> Now I am wondering what is fundamentally different between the two. I
>> understand how broadcast works, but I am not sure how the data is passed
>> across in the second way. Is the dictionary shipped to an executor every
>> time a new task runs on that executor, or is it shipped only once? Also,
>> how is the data passed to the Python processes? These are Python UDFs, so
>> I think they are executed natively in Python (please correct me if I am
>> wrong), which means the data has to be serialised and passed over to Python.
>> 
>> So, in summary, my question is: which is the better/more efficient way
>> to write the whole thing, and why?
>> 
>> 
>> Thank you!
>> 
>> 
>> Regards,
>> Dhrub
>> 
> 
>


Re: Collections passed from driver to executors

Posted by Dhrubajyoti Hati <dh...@gmail.com>.
I was wondering if anyone could help with this question.

On Fri, 20 Sep, 2019, 11:52 AM Dhrubajyoti Hati, <dh...@gmail.com>
wrote:

> Hi,
>
> I have a question regarding passing a dictionary from the driver to the
> executors in Spark on YARN. This dictionary is needed in a UDF. I am using
> PySpark.
>
> As I understand it, this can be done in two ways:
>
> 1. Broadcast the variable and then use it in the UDFs
>
> 2. Pass the dictionary to the UDF itself, with something like this:
>
>   from pyspark.sql.functions import udf
>
>   def udf1(col1, lookup):
>       ...
>
>   def udf1_fn(lookup):
>       return udf(lambda col_data: udf1(col_data, lookup))
>
>   df.withColumn("column_new", udf1_fn(my_dict)("old_column"))
>
> Well, I have tested both approaches and both work.
>
> Now I am wondering what is fundamentally different between the two. I
> understand how broadcast works, but I am not sure how the data is passed
> across in the second way. Is the dictionary shipped to an executor every
> time a new task runs on that executor, or is it shipped only once? Also,
> how is the data passed to the Python processes? These are Python UDFs, so
> I think they are executed natively in Python (please correct me if I am
> wrong), which means the data has to be serialised and passed over to Python.
>
> So, in summary, my question is: which is the better/more efficient way
> to write the whole thing, and why?
>
> Thank you!
>
> Regards,
> Dhrub
>
