Posted to user@spark.apache.org by Tobias Pfeiffer <tg...@preferred.jp> on 2014/11/14 05:58:13 UTC

Communication between Driver and Executors

Hi,

(this is related to my previous question about stopping the
StreamingContext)

Is there any way to send a message from the driver to the executors? There
is all this Akka machinery running, so it should be easy to have something
like

  sendToAllExecutors(message)

on the driver and

  handleMessage {
    case _ => ...
  }

on the executors, right? Surely at least for Broadcast.unpersist() such a
thing must exist, so can I use it somehow (a dirty way is also OK) to send a
message to my Spark nodes?
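
In plain Akka terms, what I imagine is something like the following (a
sketch in generic Akka, nothing Spark-specific; all names are made up):

  import akka.actor.{Actor, ActorRef, ActorSystem, Props}

  // Hypothetical "executor" endpoint: handleMessage as a receive block.
  class ExecutorEndpoint extends Actor {
    def receive = {
      case msg => println(s"executor received: $msg")
    }
  }

  object Demo extends App {
    val system = ActorSystem("demo")
    val executors: Seq[ActorRef] =
      (1 to 3).map(i => system.actorOf(Props[ExecutorEndpoint], s"exec-$i"))

    // The hoped-for sendToAllExecutors(message):
    executors.foreach(_ ! "shutdown")

    Thread.sleep(500)  // let the messages arrive
    system.shutdown()  // Akka 2.3-era API
  }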

Thanks
Tobias

Re: Communication between Driver and Executors

Posted by Tobias Pfeiffer <tg...@preferred.jp>.
Hi,

So I didn't manage to get the Broadcast variable with a new value
distributed to my executors in YARN mode. In local mode it worked fine, but
when running on YARN either nothing happened (when unpersist() was called
on the driver) or I got a TimeoutException (when called on the executor).
I finally dropped the use of broadcast variables and added an HTTP polling
mechanism from the executors to the driver. I find that a bit suboptimal,
in particular since there is this whole Akka infrastructure already running
and I should be able to just send messages around. However, Spark does not
seem to encourage this. (In general I find that "private" is a bit overused
in the Spark codebase...)
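
For reference, the polling setup looks roughly like this (a simplified
sketch, not the actual code; the endpoint name, port, and object names
are made up):

  import java.net.InetSocketAddress
  import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}

  // Driver side: serve a run/stop flag over plain HTTP.
  object DriverFlagServer {
    @volatile var keepRunning = true

    def start(port: Int): HttpServer = {
      val server = HttpServer.create(new InetSocketAddress(port), 0)
      server.createContext("/status", new HttpHandler {
        def handle(ex: HttpExchange): Unit = {
          val body = if (keepRunning) "run" else "stop"
          ex.sendResponseHeaders(200, body.length)
          ex.getResponseBody.write(body.getBytes("UTF-8"))
          ex.close()
        }
      })
      server.start()
      server
    }
  }

  // Executor side, inside a task (driverHost and port must be known
  // to the executors, e.g. shipped in the closure):
  //   val status = scala.io.Source.fromURL(
  //     s"http://$driverHost:$port/status").mkString
  //   if (status == "stop") { /* skip further processing */ }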

Thanks
Tobias

Re: Communication between Driver and Executors

Posted by Tobias Pfeiffer <tg...@preferred.jp>.
Hi again,

On Mon, Nov 17, 2014 at 8:16 AM, Tobias Pfeiffer <tg...@preferred.jp> wrote:
>
> I have been trying to misuse broadcast as follows:
> - create a class with a boolean var, set to true
> - query this boolean on the executors as a prerequisite to processing the
> next item
> - when I want to shutdown, I set the boolean to false and unpersist the
> broadcast variable (which will trigger re-delivery).
> This is very dirty, but it works with a "local[*]" master. Unfortunately,
> when deployed on YARN, the new value will never arrive at my executors.
>

In fact, it seems as if "change a mutable object (like a mutable list) and
unpersist in order to trigger redeployment" only works locally. When running on
YARN, even after an unpersist, the value will always be identical to what I
shipped first. Now I wonder what unpersist actually does in that case. Must
I call unpersist from an executor or from the driver?

Thanks
Tobias

Re: Communication between Driver and Executors

Posted by Tobias Pfeiffer <tg...@preferred.jp>.
Hi,

On Fri, Nov 14, 2014 at 3:20 PM, Mayur Rustagi <ma...@gmail.com>
wrote:

> I wonder if SparkConf is dynamically updated on all worker nodes or only
> set during initialization. It can be used to piggyback information.
> Otherwise I guess you are stuck with Broadcast.
> I have mainly had these issues when moving legacy MapReduce operators to
> Spark, where MR piggybacks on the Hadoop conf pretty heavily; in a native
> Spark application it's rarely required. Do you have a use case like that?
>

My "usecase" is
http://apache-spark-user-list.1001560.n3.nabble.com/StreamingContext-does-not-stop-td18826.html
– that is, notifying my Spark executors that the StreamingContext has been
shut down. (Even with a non-graceful shutdown, Spark doesn't seem to end the
actual execution, only the Spark-internal timers etc.) I need to do
this properly or processing will go on for a very long time.

I have been trying to misuse broadcast as follows:
- create a class with a boolean var, set to true
- query this boolean on the executors as a prerequisite to processing the
next item
- when I want to shutdown, I set the boolean to false and unpersist the
broadcast variable (which will trigger re-delivery).
This is very dirty, but it works with a "local[*]" master. Unfortunately,
when deployed on YARN, the new value will never arrive at my executors.
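
In code, the hack looks roughly like this (a simplified sketch with
made-up names, reconstructed from the steps above):

  import org.apache.spark.{SparkConf, SparkContext}

  // Serializable holder for the mutable flag.
  class Flag extends Serializable {
    @volatile var keepRunning: Boolean = true
  }

  object BroadcastFlagHack {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("broadcast-flag"))
      val flag = new Flag
      val bc = sc.broadcast(flag)

      sc.parallelize(1 to 100).foreach { i =>
        // Query the flag on the executor before processing each item.
        if (bc.value.keepRunning) println(s"processing item $i")
      }

      // On shutdown: flip the flag on the driver and unpersist, hoping
      // that the next access on an executor re-fetches the new value
      // (which, as described above, only happens with a local master).
      flag.keepRunning = false
      bc.unpersist()
    }
  }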

Any idea what could go wrong on YARN with this approach – or what is a
"good" way to do this?

Thanks
Tobias

Re: Communication between Driver and Executors

Posted by Mayur Rustagi <ma...@gmail.com>.
I wonder if SparkConf is dynamically updated on all worker nodes or only
set during initialization. It can be used to piggyback information.
Otherwise I guess you are stuck with Broadcast.
I have mainly had these issues when moving legacy MapReduce operators to
Spark, where MR piggybacks on the Hadoop conf pretty heavily; in a native
Spark application it's rarely required. Do you have a use case like that?
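
A sketch of what I mean by piggybacking (the key name is made up; note
that executors receive the conf when they start, so I am not sure an
update made later would ever be visible to them):

  import org.apache.spark.{SparkConf, SparkContext, SparkEnv}

  object ConfPiggyback {
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf()
        .setAppName("conf-piggyback")
        .set("spark.myapp.flag", "true")  // custom key set on the driver
      val sc = new SparkContext(conf)

      sc.parallelize(1 to 10).foreach { i =>
        // Executors can read the conf they were started with via SparkEnv.
        val flag = SparkEnv.get.conf.get("spark.myapp.flag", "false")
        println(s"item $i, flag = $flag")
      }
      sc.stop()
    }
  }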


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>


On Fri, Nov 14, 2014 at 10:28 AM, Tobias Pfeiffer <tg...@preferred.jp> wrote:

> Hi,
>
> (this is related to my previous question about stopping the
> StreamingContext)
>
> Is there any way to send a message from the driver to the executors? There
> is all this Akka machinery running, so it should be easy to have something
> like
>
>   sendToAllExecutors(message)
>
> on the driver and
>
>   handleMessage {
>     case _ => ...
>   }
>
> on the executors, right? Surely at least for Broadcast.unpersist() such a
> thing must exist, so can I use it somehow (a dirty way is also OK) to send a
> message to my Spark nodes?
>
> Thanks
> Tobias
>