Posted to user@spark.apache.org by Jean-Pascal Billaud <jp...@tellapart.com> on 2015/03/07 02:32:00 UTC

Spark streaming and executor object reusage

Hi,

Reading through the Spark Streaming Programming Guide, I came across this
in the "Design Patterns for using foreachRDD" section:

"Finally, this can be further optimized by reusing connection objects
across multiple RDDs/batches.
One can maintain a static pool of connection objects that can be reused as
RDDs of multiple batches are pushed to the external system"

I have a connection pool that can be fairly expensive to
instantiate. I don't use it inside foreachRDD but in regular map
operations, to query an API service. I'd like to understand what
"multiple batches" means here. Is this across RDDs of a single DStream?
Across multiple DStreams?

I'd also like to understand how much context is shared across DStreams
over time. Is it expected that the executor that initialized my factory
will keep receiving batches from my streaming job and keep using the same
singleton connection pool? Or does Spark reset executor state after each
DStream completes, potentially to allocate the executors to other
streaming jobs?

Thanks,

Re: Spark streaming and executor object reusage

Posted by Jean-Pascal Billaud <jp...@tellapart.com>.
Thanks a lot.


---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Spark streaming and executor object reusage

Posted by Sean Owen <so...@cloudera.com>.
On Sat, Mar 7, 2015 at 4:17 PM, Jean-Pascal Billaud <jp...@tellapart.com> wrote:
> So given this, let's go a bit further. Imagine my static factory provides a stats collector that my various map() functions use to export metrics while mapping tuples. This stats collector comes with a timer that flushes the stats buffer every 5 minutes, for instance. Given that, in practice, the executor JVM should not be reinitialized over the life of the DStream, it is reasonable to assume that this timer will be able to do its job. Right?

Ignoring failures, the executors should not be restarted over the life
of the streaming app, yes. You will have, potentially, many executors
at once of course.

> Given what you said earlier, that totally makes sense. In general, is there any Spark architecture documentation, other than the code, that gives a good overview of the things we talked about?

http://spark.apache.org/docs/latest/cluster-overview.html

This is entirely applicable to streaming too.
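
The stats collector with a periodic flush described in the question could look roughly like the Java sketch below. It is not code from this thread or from Spark: StatsCollector, increment, flush and the counter names are hypothetical, and printing to stdout stands in for whatever metrics sink is actually used. It assumes one collector per executor JVM, created on first use, with a daemon timer thread so the timer never keeps the JVM alive on its own.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class StatsCollectorSketch {

    /** Hypothetical per-JVM metrics buffer that is flushed on a fixed schedule. */
    static final class StatsCollector {

        // Holder idiom: the JVM creates the instance once, thread-safely, on first use.
        private static final class Holder {
            static final StatsCollector INSTANCE = new StatsCollector(5, TimeUnit.MINUTES);
        }

        private final ConcurrentHashMap<String, AtomicLong> counters = new ConcurrentHashMap<>();
        private final ScheduledExecutorService timer;

        private StatsCollector(long period, TimeUnit unit) {
            // Daemon thread: the flush timer must not prevent the JVM from exiting.
            timer = Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "stats-flush");
                t.setDaemon(true);
                return t;
            });
            timer.scheduleAtFixedRate(this::flushAndReport, period, period, unit);
        }

        static StatsCollector get() {
            return Holder.INSTANCE;
        }

        /** Called from map() code; safe to call from many task threads at once. */
        void increment(String name) {
            counters.computeIfAbsent(name, k -> new AtomicLong()).incrementAndGet();
        }

        /** Drains the buffer and returns the non-zero counts that were drained. */
        Map<String, Long> flush() {
            Map<String, Long> drained = new TreeMap<>();
            for (Map.Entry<String, AtomicLong> e : counters.entrySet()) {
                long n = e.getValue().getAndSet(0); // atomic read-and-reset
                if (n > 0) {
                    drained.put(e.getKey(), n);
                }
            }
            return drained;
        }

        // An exception escaping a scheduled task cancels all later runs, so catch it here.
        private void flushAndReport() {
            try {
                Map<String, Long> drained = flush();
                if (!drained.isEmpty()) {
                    System.out.println("flushed " + drained); // stand-in for a real sink
                }
            } catch (RuntimeException e) {
                System.err.println("stats flush failed: " + e);
            }
        }
    }

    public static void main(String[] args) {
        StatsCollector stats = StatsCollector.get();
        for (int i = 0; i < 3; i++) {
            stats.increment("records.mapped");
        }
        stats.increment("api.errors");
        System.out.println(stats.flush()); // prints {api.errors=1, records.mapped=3}
    }
}
```

Since there can be many executors at once, each one would hold its own collector and flush independently, so the receiving side has to aggregate. Whatever was buffered since the last flush is lost if an executor dies, which matches the "ignoring failures" caveat above.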


Re: Spark streaming and executor object reusage

Posted by Jean-Pascal Billaud <jp...@tellapart.com>.
Thanks Sean, this is really helpful. Please see my comments inline.

> On Mar 7, 2015, at 4:45 AM, Sean Owen <so...@cloudera.com> wrote:
> 
> In the example with "createNewConnection()", a connection is created
> for every partition of every batch of input. You could take the idea
> further and share connections across partitions or batches. This
> requires them to have a lifecycle beyond foreachRDD. That's
> accomplishable with some kind of static / singleton connection,
> presumably a connection pool.

Precisely. I have a static factory class that provides, for instance, a singleton connection, among other objects.

> 
> The pool would be per JVM, which means per executor. Although you're
> not guaranteed that this same executor would process many partitions
> of an RDD, or process a number of batches over time, in practice, both
> are true. So a pool can effectively be shared across partitions and
> batches.
> 
> Spark has no way to police, and therefore can't and doesn't reset, any
> state that you happen to create and use in your code.

So as long as an executor JVM is initialized once with my JAR, my singleton will be created only once over the life of that JVM. Obviously I am not considering failover scenarios, etc.

So given this, let's go a bit further. Imagine my static factory provides a stats collector that my various map() functions use to export metrics while mapping tuples. This stats collector comes with a timer that flushes the stats buffer every 5 minutes, for instance. Given that, in practice, the executor JVM should not be reinitialized over the life of the DStream, it is reasonable to assume that this timer will be able to do its job. Right?

> 
> An executor belongs to a single app, though, so it would not be shared
> with another streaming job, no.

Given what you said earlier, that totally makes sense. In general, is there any Spark architecture documentation, other than the code, that gives a good overview of the things we talked about?

Thanks again for your help,


Re: Spark streaming and executor object reusage

Posted by Sean Owen <so...@cloudera.com>.
In the example with "createNewConnection()", a connection is created
for every partition of every batch of input. You could take the idea
further and share connections across partitions or batches. This
requires them to have a lifecycle beyond foreachRDD. That's
accomplishable with some kind of static / singleton connection,
presumably a connection pool.

The pool would be per JVM, which means per executor. Although you're
not guaranteed that this same executor would process many partitions
of an RDD, or process a number of batches over time, in practice, both
are true. So a pool can effectively be shared across partitions and
batches.

Spark has no way to police, and therefore can't and doesn't reset, any
state that you happen to create and use in your code.

An executor belongs to a single app, though, so it would not be shared
with another streaming job, no.
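
To make the per-JVM pool idea concrete, here is a minimal Java sketch. It is not code from the programming guide or from Spark: Connection, ConnectionPool and processRecord are hypothetical names, and the nested loop in main only stands in for one executor running several partitions of several batches inside a single JVM. The holder idiom has the JVM build the pool once, on first use, which is what would let map or foreachRDD closures share it.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class ConnectionPoolSketch {

    /** Hypothetical stand-in for a client connection that is costly to create. */
    static final class Connection {
        private static final AtomicInteger CREATED = new AtomicInteger();
        final int id = CREATED.incrementAndGet();

        String query(String key) {
            return "conn-" + id + ":" + key;
        }

        static int createdCount() {
            return CREATED.get();
        }
    }

    /** One pool per JVM, which in Spark terms means one pool per executor. */
    static final class ConnectionPool {
        private static final int SIZE = 2;

        // Holder idiom: class initialization is lazy and thread-safe,
        // so the pool is built exactly once, the first time get() is called.
        private static final class Holder {
            static final ConnectionPool INSTANCE = new ConnectionPool();
        }

        private final BlockingQueue<Connection> idle = new ArrayBlockingQueue<>(SIZE);

        private ConnectionPool() {
            for (int i = 0; i < SIZE; i++) {
                idle.add(new Connection());
            }
        }

        static ConnectionPool get() {
            return Holder.INSTANCE;
        }

        /** Blocks until a connection is free. */
        Connection borrow() {
            try {
                return idle.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for a connection", e);
            }
        }

        void giveBack(Connection c) {
            idle.add(c);
        }
    }

    /** Stands in for the body of a map() or foreachPartition() closure. */
    static String processRecord(String record) {
        ConnectionPool pool = ConnectionPool.get();
        Connection c = pool.borrow();
        try {
            return c.query(record);
        } finally {
            pool.giveBack(c); // always return the connection, even on failure
        }
    }

    public static void main(String[] args) {
        // Simulate 3 batches of 4 partitions handled by the same JVM.
        for (int batch = 0; batch < 3; batch++) {
            for (int partition = 0; partition < 4; partition++) {
                processRecord("batch" + batch + "-part" + partition);
            }
        }
        System.out.println("connections created: " + Connection.createdCount());
        // prints: connections created: 2
    }
}
```

However many records go through processRecord, only the two pooled connections are ever created in this JVM. With several executors there would be one such pool in each of them, and a restarted executor would simply build a fresh one.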
