Posted to user@spark.apache.org by Alexandre Rodrigues <al...@gmail.com> on 2015/07/02 18:32:18 UTC

Fwd: map vs foreach for sending data to external system

Hi Spark devs,

I'm writing a Spark job, and at a certain point in its execution I need to
send some data held in an RDD to an external system.

val myRdd = ....

myRdd.foreach { record =>
  sendToWhtv(record)
}

The thing is that foreach forces materialization of the RDD and it seems to
be executed on the driver program, which is not very beneficial in my case.
So I changed the logic to a map (mapPartitions, actually, but the idea is
the same).

val newRdd = myRdd.map { record =>
  sendToWhtv(record)
}
newRdd.count()

My understanding is that map is a transformation, so I then have to force
materialization by invoking some action (such as count). Is this the
correct way to do this kind of distributed foreach, or is there some other
function that achieves this without implying a data transformation or a
returned RDD?


Thanks,
Alex

Re: map vs foreach for sending data to external system

Posted by Alexandre Rodrigues <al...@gmail.com>.
What I'm doing in the RDD is parsing a text file and sending things to the
external system. I guess that it does both immediately when the action
(count) is triggered, instead of this being a two-step process.

So I guess I should have parsing logic + sending to external system inside
the foreach (with partitions) instead of transforming things into a case
class and then applying a foreach to the RDD[MyCaseClass].

Thanks,
Alex

On Thu, Jul 2, 2015 at 6:07 PM, Eugen Cepoi <ce...@gmail.com> wrote:

> Heh, an action, or materialization, means that it will trigger the
> computation over the RDD. A transformation like map means that it will
> create the transformation chain that must be applied to the data, but it is
> not actually executed. It is executed only when an action is triggered on
> that RDD. That's why you have the impression that map is so fast: it
> doesn't actually do anything :)

Re: map vs foreach for sending data to external system

Posted by Eugen Cepoi <ce...@gmail.com>.
Heh, an action, or materialization, means that it will trigger the
computation over the RDD. A transformation like map means that it will
create the transformation chain that must be applied to the data, but it is
not actually executed. It is executed only when an action is triggered on
that RDD. That's why you have the impression that map is so fast: it
doesn't actually do anything :)
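
A minimal sketch of the laziness described above, assuming Spark 2.0 or
later (for sc.longAccumulator) and a local master. The object name
LazyMapDemo and the counter are made up for illustration; the counter stands
in for a call like sendToWhtv. It is read once after map has been declared
and once after count() has run:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LazyMapDemo {
  /** Returns the side-effect counter as read (before, after) the action runs. */
  def run(): (Long, Long) = {
    // Assumption: a local master, so the sketch runs without a cluster.
    val conf = new SparkConf().setAppName("lazy-map-demo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Counts how often the function passed to map is really invoked.
      val calls = sc.longAccumulator("calls")
      val myRdd = sc.parallelize(1 to 100, numSlices = 4)

      // Transformation: only records the lineage; nothing is executed yet.
      val newRdd = myRdd.map { record =>
        calls.add(1L) // stand-in for sendToWhtv(record)
        record
      }
      val before = calls.sum

      // Action: schedules a job, and the map function now runs inside its tasks.
      newRdd.count()
      val after = calls.sum

      (before, after)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (before, after) = run()
    println(s"calls after map: $before, calls after count: $after")
  }
}
```

Because the work only happens inside the job that count() starts, timing the
map call on its own says nothing about how long the sends take. Accumulator
updates made inside a transformation can also be applied more than once if a
task is retried, so the counter here is for illustration only.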

2015-07-02 18:59 GMT+02:00 Alexandre Rodrigues <
alex.jose.rodrigues@gmail.com>:

> Foreach is listed as an action[1]. I guess an *action* just means that it
> forces materialization of the RDD.
>
> I just noticed much faster executions with map although I don't like the
> map approach. I'll look at it with new eyes if foreach is the way to go.
>
> [1] – https://spark.apache.org/docs/latest/programming-guide.html#actions
>
> Thanks guys!

Re: map vs foreach for sending data to external system

Posted by Alexandre Rodrigues <al...@gmail.com>.
Foreach is listed as an action[1]. I guess an *action* just means that it
forces materialization of the RDD.

I just noticed much faster executions with map although I don't like the
map approach. I'll look at it with new eyes if foreach is the way to go.

[1] – https://spark.apache.org/docs/latest/programming-guide.html#actions

Thanks guys!




--
Alexandre Rodrigues

On Thu, Jul 2, 2015 at 5:37 PM, Eugen Cepoi <ce...@gmail.com> wrote:

>
>
> *"The thing is that foreach forces materialization of the RDD and it seems
> to be executed on the driver program"*
> What makes you think that? No, foreach is run in the executors
> (distributed) and not in the driver.

Re: map vs foreach for sending data to external system

Posted by Eugen Cepoi <ce...@gmail.com>.
*"The thing is that foreach forces materialization of the RDD and it seems
to be executed on the driver program"*
What makes you think that? No, foreach is run in the executors
(distributed) and not in the driver.


Re: map vs foreach for sending data to external system

Posted by Silvio Fiorito <si...@granturing.com>.
foreach absolutely runs on the executors. For sending data to an external system, you should probably use foreachPartition so that you can batch the output. Also, if you want to limit the parallelism of the output action, you can use coalesce.

What makes you think foreach is running on the driver?
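
A rough sketch of the foreachPartition plus coalesce suggestion, assuming Spark 2.0 or later and a local master. WhtvClient, sendBatch, the batch size and the number of writers are all hypothetical placeholders for whatever the external system actually provides; the accumulators are only there to make the result observable:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical client for the external system; replace with the real one.
class WhtvClient extends AutoCloseable {
  def sendBatch(batch: Seq[String]): Unit = () // one network call per batch would go here
  override def close(): Unit = ()
}

object BatchedSend {
  /** Sends every record in batches and returns (records sent, batches sent). */
  def run(batchSize: Int = 100, writers: Int = 4): (Long, Long) = {
    val conf = new SparkConf().setAppName("batched-send").setMaster("local[4]")
    val sc = new SparkContext(conf)
    try {
      val recordsSent = sc.longAccumulator("recordsSent")
      val batchesSent = sc.longAccumulator("batchesSent")
      val myRdd = sc.parallelize((1 to 1000).map(i => s"record-$i"), numSlices = 16)

      myRdd
        .coalesce(writers) // at most `writers` partitions, so at most that many concurrent senders
        .foreachPartition { records =>
          // Runs on an executor, once per partition: one client per partition.
          val client = new WhtvClient()
          try {
            records.grouped(batchSize).foreach { batch =>
              client.sendBatch(batch)
              recordsSent.add(batch.size.toLong)
              batchesSent.add(1L)
            }
          } finally {
            client.close()
          }
        }

      (recordsSent.sum, batchesSent.sum)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (records, batches) = run()
    println(s"sent $records records in $batches batches")
  }
}
```

Creating the client inside foreachPartition matters because the closure is serialized and shipped to the executors; a client built on the driver would have to be serializable, and connections usually are not.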
