Posted to user@spark.apache.org by Gourav Sengupta <go...@gmail.com> on 2018/04/03 17:04:34 UTC

Re: How to pass sparkSession from driver to executor

Hi,

I think that what you are facing is documented in SPARK:
http://spark.apache.org/docs/latest/rdd-programming-guide.html#understanding-closures-


May I ask what you are trying to achieve here? From what I understand, you
have a list of JSON files which you want to read separately, as they have
different schemas. Is that right?

Can you please let me know the following details as well:
1. SPARK Version
2. The full code
3. The path to the files


Regards,
Gourav Sengupta

On Thu, Sep 21, 2017 at 4:55 PM, ayan guha <gu...@gmail.com> wrote:

> The point here is that the Spark session is not available in executors, so
> you have to use appropriate storage clients.
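>
> A minimal sketch of that idea for HDFS (assuming WebHDFS is reachable; the
> namenode URL, the `hdfs` Python package and the `file_names` list below are
> placeholders, not something from this thread):
>
> ```
> import json
> from hdfs import InsecureClient
>
> def read_files(paths):
>     # runs on the executors; one client per partition, no SparkSession needed
>     client = InsecureClient("http://namenode:50070")
>     for p in paths:
>         with client.read(p, encoding="utf-8") as reader:
>             yield json.loads(reader.read())
>
> # file_names: a plain Python list of HDFS paths, collected on the driver
> records = spark.sparkContext.parallelize(file_names, 32).mapPartitions(read_files)
> ```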
>
> On Fri, Sep 22, 2017 at 1:44 AM, lucas.gary@gmail.com <
> lucas.gary@gmail.com> wrote:
>
>> I'm not sure what you're doing, but I have in the past used Spark to
>> consume a manifest file and then execute a .mapPartitions() on the result,
>> like this:
>>
>>
>> import json
>> import time
>>
>> import boto3
>>
>>
>> def map_key_to_event(s3_events_data_lake):
>>
>>     # Returns the function each partition will run on the executors.
>>     # `s3_client` is injectable so a test stub can be passed in.
>>     def _map_key_to_event(event_key_list, s3_client=None):
>>         print("Events in list")
>>         start = time.time()
>>
>>         return_list = []
>>
>>         if s3_client is None:
>>             # Create the client once per partition, not once per record.
>>             s3_client = boto3.Session().client('s3')
>>
>>         for event_key in event_key_list:
>>             contents = None
>>             try:
>>                 response = s3_client.get_object(
>>                     Bucket=s3_events_data_lake, Key=event_key)
>>                 contents = response['Body'].read().decode('utf-8')
>>                 entity = json.loads(contents)
>>                 entity["Message"] = json.loads(entity["Message"])
>>                 event_type = entity["Message"]["type"]
>>                 # json.dumps here because Spark doesn't have a good json
>>                 # datatype.
>>                 return_list.append((event_type, json.dumps(entity)))
>>             except Exception:
>>                 print("Key: {k} did not yield a valid object: "
>>                       "{o}".format(k=event_key, o=contents))
>>
>>         end = time.time()
>>         print('time elapsed:')
>>         print(end - start)
>>
>>         return return_list
>>
>>     return _map_key_to_event
>>
>>
>> pkeys = spark.sparkContext.parallelize(full_list_for_time_slice, 32)
>> print("partitions: ")
>> print(pkeys.getNumPartitions())
>> events = pkeys.mapPartitions(map_key_to_event(s3_events_data_lake))
>>
>>
>>
>>
>>
>> In this case I'm loading heterogeneous json files with wildly different
>> schemas, then saving them into a parquet file per event type (i.e. turning
>> one big heterogeneous dump into numerous smaller homogeneous dumps).
>>
>> I'm sure this isn't the only or even best way to do it.
>>
>> The underlying issue is that you're trying to violate the programming
>> model.  The model in this case consists of telling the driver what you want
>> and then having the executors go do it.
>>
>> The Spark context is a driver-level abstraction; it doesn't really make
>> sense in the executor context. The executor is acting on behalf of the
>> driver and shouldn't need a back reference to it; you'd end up with some
>> interesting execution graphs.
>>
>> This is a common pattern in Spark as far as I can tell, i.e. calling a map
>> and then doing something with the items in the executor, either computing
>> or enriching.  My case above is a bit weird and I'm not certain it's the
>> right mechanism, in that I'm literally taking a manifest file and turning
>> it into 'n' actual records.
>>
>> Also, if you're going to be constructing a connection string / jdbc call
>> / s3 client, you really don't want to use a straight .map(func): you'll
>> end up instantiating a connection on every iteration.
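>>
>> A tiny illustration of that point (the names `keys` and `bucket` are just
>> placeholders, and client creation stands in for any expensive connection
>> setup):
>>
>> ```
>> import boto3
>>
>> # .map: builds one client per record -- avoid this
>> bad = keys.map(
>>     lambda k: boto3.Session().client('s3').get_object(Bucket=bucket, Key=k))
>>
>> # .mapPartitions: one client per partition, reused for every record in it
>> def fetch(key_iter):
>>     s3 = boto3.Session().client('s3')
>>     for k in key_iter:
>>         yield s3.get_object(Bucket=bucket, Key=k)
>>
>> good = keys.mapPartitions(fetch)
>> ```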
>>
>> Hope this is somewhat helpful.
>>
>> Gary
>>
>> On 21 September 2017 at 06:31, Weichen Xu <we...@databricks.com>
>> wrote:
>>
>>> Spark does not allow executor code to use `sparkSession`.
>>> But I think you can move all the json files to one directory, and then run:
>>>
>>> ```
>>> spark.read.json("/path/to/jsonFileDir")
>>> ```
>>> But if you want to get the filename at the same time, you can use
>>> ```
>>> spark.sparkContext.wholeTextFiles("/path/to/jsonFileDir")...
>>> ```
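>>>
>>> For example, a minimal sketch of the wholeTextFiles route (assuming each
>>> file holds a single JSON document; the path is a placeholder):
>>>
>>> ```
>>> import json
>>>
>>> # RDD of (fileName, fileContent) pairs
>>> files = spark.sparkContext.wholeTextFiles("/path/to/jsonFileDir")
>>>
>>> # parse each file with plain json on the executors, keeping the file name
>>> parsed = files.map(lambda kv: (kv[0], json.loads(kv[1])))
>>> ```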
>>>
>>> On Thu, Sep 21, 2017 at 9:18 PM, Riccardo Ferrari <fe...@gmail.com>
>>> wrote:
>>>
>>>> It depends on your use case; however, broadcasting
>>>> <https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#broadcast-variables>
>>>> could be a better option.
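>>>>
>>>> A minimal sketch of a broadcast variable (the lookup data and `some_rdd`
>>>> are made-up placeholders, just to show executors reading the broadcast
>>>> value without touching the session):
>>>>
>>>> ```
>>>> lookup = {"click": 1, "view": 2}                 # small data, built on the driver
>>>> b_lookup = spark.sparkContext.broadcast(lookup)  # shipped once to each executor
>>>>
>>>> # executors read b_lookup.value locally; some_rdd is any RDD of keys
>>>> labelled = some_rdd.map(lambda k: (k, b_lookup.value.get(k)))
>>>> ```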
>>>>
>>>> On Thu, Sep 21, 2017 at 2:03 PM, Chackravarthy Esakkimuthu <
>>>> chaku.mitcs@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I want to know how to pass sparkSession from driver to executor.
>>>>>
>>>>> I have a spark program (batch job) which does following,
>>>>>
>>>>> #################
>>>>>
>>>>> val spark = SparkSession.builder().appName("SampleJob")
>>>>>   .config("spark.master", "local").getOrCreate()
>>>>>
>>>>> val df = ... // dataframe which has the list of file names (hdfs)
>>>>>
>>>>> df.foreach { fileName =>
>>>>>
>>>>>       *spark.read.json(fileName)*
>>>>>
>>>>>       ...... some logic here....
>>>>> }
>>>>>
>>>>> #################
>>>>>
>>>>>
>>>>> *spark.read.json(fileName) --- this fails as it runs in the executor.
>>>>> When I put it outside the foreach, i.e. in the driver, it works.*
>>>>>
>>>>> I understand this is because I am trying to use spark (sparkSession) in
>>>>> the executor, while the session is only visible in the driver. But I want
>>>>> to read the hdfs files inside the foreach; how do I do it?
>>>>>
>>>>> Can someone help how to do this.
>>>>>
>>>>> Thanks,
>>>>> Chackra
>>>>>
>>>>
>>>>
>>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>

Re: How to pass sparkSession from driver to executor

Posted by Gourav Sengupta <go...@gmail.com>.
Hi,

the other thing that you may try is using the following in your SQL, and
then filtering out records with regular expressions based on which directory
they came from. But I would be very interested to know the details which I
asked for in my earlier email.

input_file_name()
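
For instance, a minimal PySpark sketch of that idea (shown with the DataFrame
API rather than SQL; the directory path, column name and regular expression
below are placeholders, not from this thread):

```
from pyspark.sql.functions import input_file_name

# read everything in one pass and record which file each row came from
df = (spark.read.json("/path/to/jsonFileDir")
        .withColumn("source_file", input_file_name()))

# keep only records whose source file sits under a given directory
subset = df.filter(df["source_file"].rlike(".*/some/dir/.*"))
```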


Regards,
Gourav

