Posted to dev@spark.apache.org by Neil Ferguson <nf...@gmail.com> on 2014/07/21 23:10:25 UTC

"Dynamic variables" in Spark

Hi all

I have been adding some metrics to the ADAM project
https://github.com/bigdatagenomics/adam, which runs on Spark, and have a
proposal for an enhancement to Spark that would make this work cleaner and
easier.

I need to pass some Accumulators around, which will aggregate metrics
(timing stats and other metrics) across the cluster. However, it is
cumbersome to have to explicitly pass some "context" containing these
accumulators around everywhere that might need them. I can use Scala
implicits, which help slightly, but I'd still need to modify every method
in the call stack to take an implicit variable.

So, I'd like to propose that we add the ability to have "dynamic variables"
(basically thread-local variables) to Spark. This would avoid having to
pass the Accumulators around explicitly.

My proposed approach is to add a method to the SparkContext class as
follows:

/**
 * Sets the value of a "dynamic variable". This value is made available to jobs
 * without having to be passed around explicitly. During execution of a Spark job
 * this value can be obtained from the [[SparkDynamic]] object.
 */
def setDynamicVariableValue(value: Any)

Then, when a job is executing, the SparkDynamic object can be accessed to
obtain the value of the dynamic variable. The implementation of this object
is as follows:

import scala.util.DynamicVariable

object SparkDynamic {
  // DynamicVariable needs an initial value; null means "not set".
  private val dynamicVariable = new DynamicVariable[Any](null)

  /**
   * Gets the value of the "dynamic variable" that has been set in the [[SparkContext]]
   */
  def getValue: Option[Any] = {
    Option(dynamicVariable.value)
  }

  // Runs thunk with the given value bound for the current thread.
  private[spark] def withValue[S](threadValue: Option[Any])(thunk: => S): S = {
    dynamicVariable.withValue(threadValue.orNull)(thunk)
  }
}

The change involves modifying the Task object to serialize the value of the
dynamic variable, and modifying the TaskRunner class to deserialize the
value and make it available in the thread that is running the task (using
the SparkDynamic.withValue method).
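
To illustrate the scoping behaviour described above, here is a minimal,
self-contained sketch that uses scala.util.DynamicVariable directly. It is not
taken from the prototype commit: the names DynamicScopeSketch and
runInTaskThread are hypothetical, and the plain thread only stands in for the
executor thread that a TaskRunner would use.

```scala
import scala.util.DynamicVariable

object DynamicScopeSketch {
  // Holds the per-thread value; null means "nothing set".
  private val dynamicVariable = new DynamicVariable[Any](null)

  // Value bound on the calling thread, if any.
  def getValue: Option[Any] = Option(dynamicVariable.value)

  // Hypothetical stand-in for a task runner: binds the value on a separate
  // thread for the duration of `body`, then returns the result to the caller.
  def runInTaskThread[S](value: Option[Any])(body: => S): S = {
    var result: Option[S] = None
    val worker = new Thread(new Runnable {
      def run(): Unit = {
        result = Some(dynamicVariable.withValue(value.orNull)(body))
      }
    })
    worker.start()
    worker.join() // join makes the worker's write to `result` visible here
    result.get
  }
}
```

Code running inside runInTaskThread sees the value through getValue, while the
calling thread still sees None afterwards. Note that DynamicVariable is backed
by an InheritableThreadLocal, so threads spawned inside the body would inherit
the binding that is current when they are created.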

I have done a quick prototype of this in this commit:
https://github.com/nfergu/spark/commit/8be28d878f43ad6c49f892764011ae7d273dcea6
and it seems to work fine in my (limited) testing. It needs more testing,
tidy-up and documentation though.

One drawback is that the dynamic variable will be serialized for every Task
whether it needs it or not. For my use case this might not be too much of a
problem, as serializing and deserializing Accumulators looks fairly
lightweight -- however we should certainly warn users against setting a
dynamic variable containing lots of data. I thought about using broadcast
tables here, but I don't think it's possible to put Accumulators in a
broadcast table (as I understand it, they're intended for purely read-only
data).

What do people think about this proposal? My use case aside, it seems like
it would be a generally useful enhancement to be able to pass certain data
around without having to explicitly pass it everywhere.

Neil

Re: "Dynamic variables" in Spark

Posted by Neil Ferguson <nf...@gmail.com>.
Hi Christopher

Thanks for your reply. I'll try and address your points -- please let me
know if I missed anything.

Regarding clarifying the problem statement, let me try and do that with a
real-world example. I have a method that I want to measure the performance
of, which has the following signature at the moment:

def merge(target: IndelRealignmentTarget): IndelRealignmentTarget = {
  // impl
}

Let's assume I have a Timers class, which contains various timers (that are
internally implemented using Accumulators). Each timer lets me instrument a
function call using its "time" method. Let's imagine that I add that as an
implicit parameter to the above method, as follows:

def merge(target: IndelRealignmentTarget)(implicit timers: Timers): IndelRealignmentTarget =
  timers.mergeTarget.time {
    // impl
  }

On its own this is not a big deal -- I've only had to add one extra
parameter to the method. However, the call stack of this method looks
something like the following:

IndelRealignmentTarget.merge
|-RealignmentTargetFinder.joinTargets
  |-RealignmentTargetFinder.findTargets
    |-RealignmentTargetFinder$.apply
      |-RealignIndels.realignIndels
        |-RealignIndels$.apply
          |-ADAMRecordRDDFunctions.adamRealignIndels
            |-Transform.run

So, I'd have to change every one of these methods to take the extra
parameter, which is pretty cumbersome. More importantly, when developers
want to add additional metrics to the code they'll have to think about how
to get an instance of Timers to the code they're developing.

So I'd really like the Timers object to be available in a thread-local
variable when I need it, without having to pass it around.
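
As a rough sketch of what that could look like (illustration only, with no
Spark dependency): the Timers class below is a hypothetical stand-in that
counts calls and elapsed time locally rather than using Accumulators, and
CurrentTimers and CallStackSketch are made-up names. The point is that none of
the methods in the call chain takes a Timers parameter.

```scala
import scala.util.DynamicVariable

// Hypothetical stand-in for the Timers class; a real one would wrap Accumulators.
final class Timers {
  private var totalNanos = 0L
  private var calls = 0

  // Runs body and records how long it took.
  def time[A](body: => A): A = {
    val start = System.nanoTime()
    try body
    finally {
      totalNanos += System.nanoTime() - start
      calls += 1
    }
  }

  def callCount: Int = calls
  def elapsedNanos: Long = totalNanos
}

object CurrentTimers {
  private val current = new DynamicVariable[Option[Timers]](None)

  // Binds timers for the current thread while body runs.
  def withTimers[A](timers: Timers)(body: => A): A =
    current.withValue(Some(timers))(body)

  // Times body if timers are bound on this thread, otherwise just runs it.
  def time[A](body: => A): A = current.value match {
    case Some(t) => t.time(body)
    case None    => body
  }
}

object CallStackSketch {
  // No method in this chain takes a Timers parameter.
  def merge(x: Int): Int = CurrentTimers.time { x + 1 }
  def joinTargets(x: Int): Int = merge(x) * 2
  def run(x: Int): Int = joinTargets(x)
}
```

Wrapping the outermost call in CurrentTimers.withTimers(timers) { ... } is
enough for merge to record its timing; called outside such a block, merge
simply runs untimed.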

Regarding the implications of adding additional variables to SparkContext
-- I'm not sure I understand why this change would make it more difficult
to have multiple SparkContexts in the future. Could you clarify please?
Bear in mind that I'm not proposing adding any thread-local data to
SparkContext. The SparkContext merely holds the data, which is added to a
thread-local variable at task execution time.

Regarding having multiple clients of the SparkContext -- are you talking
about having multiple applications all sharing the same SparkContext? It
seems like there's data in SparkContext that is specific to a particular
application at the moment, like the JAR files for example, so this doesn't
seem inconsistent with that. Perhaps I'm misunderstanding here.

Neil


On Tue, Jul 22, 2014 at 1:54 AM, Christopher Nguyen <ct...@adatao.com> wrote:

> Hi Neil, first off, I'm generally a sympathetic advocate for making changes
> to Spark internals to make it easier/better/faster/more awesome.
>
> In this case, I'm (a) not clear about what you're trying to accomplish, and
> (b) a bit worried about the proposed solution.
>
> On (a): it is stated that you want to pass some Accumulators around. Yet
> the proposed solution is for some "shared" variable that may be set and
> "mapped out" and possibly "reduced back", but without any accompanying
> accumulation semantics. And yet it doesn't seem like you only want just the
> broadcast property. Can you clarify the problem statement with some
> before/after client code examples?
>
> On (b): you're right that adding variables to SparkContext should be done
> with caution, as it may have unintended consequences beyond just serdes
> payload size. For example, there is a stated intention of supporting
> multiple SparkContexts in the future, and this proposed solution can make
> it a bigger challenge to do so. Indeed, we had a gut-wrenching call to make
> a while back on a subject related to this (see
> https://github.com/mesos/spark/pull/779). Furthermore, even in a single
> SparkContext application, there may be multiple "clients" (of that
> application) whose intent to use the proposed "SparkDynamic" would not
> necessarily be coordinated.
>
> So, considering a ratio of a/b (benefit/cost), it's not clear to me that
> the benefits are significant enough to warrant the costs. Do I
> misunderstand that the benefit is to save one explicit parameter (the
> "context") in the signature/closure code?
>
> --
> Christopher T. Nguyen
> Co-founder & CEO, Adatao <http://adatao.com>
> linkedin.com/in/ctnguyen

Re: "Dynamic variables" in Spark

Posted by Neil Ferguson <nf...@gmail.com>.
I've opened SPARK-3051 (https://issues.apache.org/jira/browse/SPARK-3051)
based on this thread.

Neil


On Thu, Jul 24, 2014 at 10:30 PM, Neil Ferguson <nf...@gmail.com> wrote:

> That would work well for me! Do you think it would be necessary to specify
> which accumulators should be available in the registry, or would we just
> broadcast all named accumulators registered in SparkContext and make them
> available in the registry?
>
> Anyway, I'm happy to make the necessary changes (unless someone else wants
> to).

Re: "Dynamic variables" in Spark

Posted by Neil Ferguson <nf...@gmail.com>.
That would work well for me! Do you think it would be necessary to specify which accumulators should be available in the registry, or would we just broadcast all named accumulators registered in SparkContext and make them available in the registry?

Anyway, I'm happy to make the necessary changes (unless someone else wants to).
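
One possible shape for such a registry, as a sketch only: nothing below is an
existing Spark API. LongCounter is a hypothetical stand-in for a named
accumulator, and CounterRegistry.withCounters is where a task runner would,
under this assumption, bind the counters shipped with a task.

```scala
import scala.util.DynamicVariable

// Hypothetical stand-in for a named accumulator of Long values.
final class LongCounter(val name: String) {
  private var total = 0L
  def add(v: Long): Unit = total += v
  def value: Long = total
}

object CounterRegistry {
  // Counters visible to code running on the current thread, keyed by name.
  private val current =
    new DynamicVariable[Map[String, LongCounter]](Map.empty)

  // Binds the given counters for the current thread while body runs.
  def withCounters[A](counters: Seq[LongCounter])(body: => A): A =
    current.withValue(counters.map(c => c.name -> c).toMap)(body)

  // Static lookup by name; None if no such counter is bound on this thread.
  def get(name: String): Option[LongCounter] = current.value.get(name)
}
```

Inside withCounters, code anywhere in the call stack can write
CounterRegistry.get("f1-time").foreach(_.add(elapsed)) without having the
counter passed to it.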

On Thu, Jul 24, 2014 at 10:17 PM, Patrick Wendell <pw...@gmail.com>
wrote:

> What if we have a registry for accumulators, where you can access them
> statically by name?
>
> - Patrick
>
> On Thu, Jul 24, 2014 at 1:51 PM, Neil Ferguson <nf...@gmail.com> wrote:
>> I realised that my last reply wasn't very clear -- let me try and clarify.
>>
>> The patch for named accumulators looks very useful, however in Shivaram's
>> example he was able to retrieve the named task metrics (statically) from a
>> TaskMetrics object, as follows:
>>
>> TaskMetrics.get("f1-time")
>>
>> However, I don't think this would be possible with the named accumulators
>> -- I believe they'd need to be passed to every function that needs them,
>> which I think would be cumbersome in any application of reasonable
>> complexity.
>>
>> This is what I was trying to solve with my proposal for dynamic variables
>> in Spark. However, the ability to retrieve named accumulators from a
>> thread-local would work just as well for my use case. I'd be happy to
>> implement either solution if there's interest.
>>
>> Alternatively, if I'm missing some other way to accomplish this please let
>> me know.
>>
>> On a (slight) aside, I now think it would be possible to implement dynamic
>> variables by broadcasting them. I was looking at Reynold's PR [1] to
>> broadcast the RDD object, and I think it would be possible to take a
>> similar approach -- that is, broadcast the serialized form, and deserialize
>> when executing each task.
>>
>> [1] https://github.com/apache/spark/pull/1498
>>
>> On Wed, Jul 23, 2014 at 8:30 AM, Neil Ferguson <nf...@gmail.com> wrote:
>>
>>> Hi Patrick.
>>>
>>> That looks very useful. The thing that seems to be missing from Shivaram's
>>> example is the ability to access TaskMetrics statically (this is the same
>>> problem that I am trying to solve with dynamic variables).
>>>
>>> You mention defining an accumulator on the RDD. Perhaps I am missing
>>> something here, but my understanding was that accumulators are defined in
>>> SparkContext and are not part of the RDD. Is that correct?
>>>
>>> Neil
>>>
>>> On Tue, Jul 22, 2014 at 22:21, Patrick Wendell <pwendell@gmail.com> wrote:
>>>
>>>> Shivaram,
>>>>
>>>> You should take a look at this patch which adds support for naming
>>>> accumulators - this is likely to get merged in soon. I actually
>>>> started this patch by supporting named TaskMetrics similar to what you
>>>> have there, but then I realized there is too much semantic overlap
>>>> with accumulators, so I just went that route.
>>>>
>>>> For instance, it would be nice if any user-defined metrics are
>>>> accessible at the driver program.
>>>>
>>>> https://github.com/apache/spark/pull/1309
>>>>
>>>> In your example, you could just define an accumulator here on the RDD
>>>> and you'd see the incremental update in the web UI automatically.
>>>>
>>>> - Patrick
>>>>
>>>> On Tue, Jul 22, 2014 at 2:09 PM, Shivaram Venkataraman
>>>> <sh...@eecs.berkeley.edu> wrote:
>>>> > From reading Neil's first e-mail, I think the motivation is to get some
>>>> > metrics in ADAM? I've run into a similar use case with user-defined
>>>> > metrics in long-running tasks, and I think a nice way to solve this
>>>> > would be to have user-defined TaskMetrics.
>>>> >
>>>> > To state my problem more clearly, let's say you have two functions you
>>>> > use in a map call and want to measure how much time each of them takes.
>>>> > For example, suppose you have a code block like the one below and you
>>>> > want to measure how much time f1 takes as a fraction of the task.
>>>> >
>>>> > a.map { l =>
>>>> >   val f = f1(l)
>>>> >   ... some work here ...
>>>> > }
>>>> >
>>>> > It would be really cool if we could do something like
>>>> >
>>>> > a.map { l =>
>>>> >   val start = System.nanoTime
>>>> >   val f = f1(l)
>>>> >   TaskMetrics.get("f1-time").add(System.nanoTime - start)
>>>> > }
>>>> >
>>>> > These task metrics have a different purpose from Accumulators in the
>>>> > sense that we don't need to track lineage, perform commutative
>>>> > operations etc. Further, we also have a bunch of code in place to
>>>> > aggregate task metrics across a stage etc. So it would be great if we
>>>> > could also populate these in the UI and show median/max etc.
>>>> > I think counters [1] in Hadoop served a similar purpose.
>>>> >
>>>> > Thanks
>>>> > Shivaram
>>>> >
>>>> > [1] https://www.inkling.com/read/hadoop-definitive-guide-tom-white-3rd/chapter-8/counters
>>>> >
>>>> >
>>>> >
>>>> > On Tue, Jul 22, 2014 at 1:43 PM, Neil Ferguson <nf...@gmail.com> wrote:
>>>> >
>>>> >> Hi Reynold
>>>> >>
>>>> >> Thanks for your reply.
>>>> >>
>>>> >> Accumulators are, of course, stored in the Accumulators object as
>>>> >> thread-local variables. However, the Accumulators object isn't public,
>>>> >> so when a Task is executing there's no way to get the set of
>>>> >> accumulators for the current thread -- accumulators still have to be
>>>> >> passed to every method that needs them.
>>>> >>
>>>> >> Additionally, unless an accumulator is explicitly referenced it won't
>>>> >> be serialized as part of a Task, and won't make it into the
>>>> >> Accumulators object in the first place.
>>>> >>
>>>> >> I should also note that what I'm proposing is not specific to
>>>> >> Accumulators -- I am proposing that any data can be stored in a
>>>> >> thread-local variable. I think there are probably many use cases
>>>> >> other than mine.
>>>> >>
>>>> >> Neil
>>>> >>
>>>> >>
>>>> >> On Tue, Jul 22, 2014 at 5:39 AM, Reynold Xin <rx...@databricks.com> wrote:
>>>> >>
>>>> >> > Thanks for the thoughtful email, Neil and Christopher.
>>>> >> >
>>>> >> > If I understand this correctly, it seems like the dynamic variable is
>>>> >> > just a variant of the accumulator (a static one since it is a global
>>>> >> > object). Accumulators are already implemented using thread-local
>>>> >> > variables under the hood. Am I misunderstanding something?
>>>> >> >
>>>> >> >
>>>> >> >
>>>> >> > On Mon, Jul 21, 2014 at 5:54 PM, Christopher Nguyen <ct...@adatao.com> wrote:
>>>> >> >
>>>> >> > > Hi Neil, first off, I'm generally a sympathetic advocate for
>>>> making
>>>> >> > changes
>>>> >> > > to Spark internals to make it easier/better/faster/more awesome.
>>>> >> > >
>>>> >> > > In this case, I'm (a) not clear about what you're trying to
>>>> accomplish,
>>>> >> > and
>>>> >> > > (b) a bit worried about the proposed solution.
>>>> >> > >
>>>> >> > > On (a): it is stated that you want to pass some Accumulators
>>>> around.
>>>> >> Yet
>>>> >> > > the proposed solution is for some "shared" variable that may be
>>>> set and
>>>> >> > > "mapped out" and possibly "reduced back", but without any
>>>> accompanying
>>>> >> > > accumulation semantics. And yet it doesn't seem like you only want
>>>> just
>>>> >> > the
>>>> >> > > broadcast property. Can you clarify the problem statement with
>>>> some
>>>> >> > > before/after client code examples?
>>>> >> > >
>>>> >> > > On (b): you're right that adding variables to SparkContext should
>>>> be
>>>> >> done
>>>> >> > > with caution, as it may have unintended consequences beyond just
>>>> serdes
>>>> >> > > payload size. For example, there is a stated intention of
>>>> supporting
>>>> >> > > multiple SparkContexts in the future, and this proposed solution
>>>> can
>>>> >> make
>>>> >> > > it a bigger challenge to do so. Indeed, we had a gut-wrenching
>>>> call to
>>>> >> > make
>>>> >> > > a while back on a subject related to this (see
>>>> >> > > https://github.com/mesos/spark/pull/779). Furthermore, even in a
>>>> >> single
>>>> >> > > SparkContext application, there may be multiple "clients" (of that
>>>> >> > > application) whose intent to use the proposed "SparkDynamic" would
>>>> not
>>>> >> > > necessarily be coordinated.
>>>> >> > >
>>>> >> > > So, considering a ratio of a/b (benefit/cost), it's not clear to
>>>> me
>>>> >> that
>>>> >> > > the benefits are significant enough to warrant the costs. Do I
>>>> >> > > misunderstand that the benefit is to save one explicit parameter
>>>> (the
>>>> >> > > "context") in the signature/closure code?
>>>> >> > >
>>>> >> > > --
>>>> >> > > Christopher T. Nguyen
>>>> >> > > Co-founder & CEO, Adatao <http://adatao.com>
>>>> >> > > linkedin.com/in/ctnguyen
>>>> >> > >
>>>> >> > >
>>>> >> > >
>>>> >> > > On Mon, Jul 21, 2014 at 2:10 PM, Neil Ferguson <
>>>> nferguson@gmail.com>
>>>> >> > > wrote:
>>>> >> > >
>>>> >> > > > Hi all
>>>> >> > > >
>>>> >> > > > I have been adding some metrics to the ADAM project
>>>> >> > > > https://github.com/bigdatagenomics/adam, which runs on Spark,
>>>> and
>>>> >> > have a
>>>> >> > > > proposal for an enhancement to Spark that would make this work
>>>> >> cleaner
>>>> >> > > and
>>>> >> > > > easier.
>>>> >> > > >
>>>> >> > > > I need to pass some Accumulators around, which will aggregate
>>>> metrics
>>>> >> > > > (timing stats and other metrics) across the cluster. However, it
>>>> is
>>>> >> > > > cumbersome to have to explicitly pass some "context" containing
>>>> these
>>>> >> > > > accumulators around everywhere that might need them. I can use
>>>> Scala
>>>> >> > > > implicits, which help slightly, but I'd still need to modify
>>>> every
>>>> >> > method
>>>> >> > > > in the call stack to take an implicit variable.
>>>> >> > > >
>>>> >> > > > So, I'd like to propose that we add the ability to have "dynamic
>>>> >> > > variables"
>>>> >> > > > (basically thread-local variables) to Spark. This would avoid
>>>> having
>>>> >> to
>>>> >> > > > pass the Accumulators around explicitly.
>>>> >> > > >
>>>> >> > > > My proposed approach is to add a method to the SparkContext
>>>> class as
>>>> >> > > > follows:
>>>> >> > > >
>>>> >> > > > /**
>>>> >> > > > * Sets the value of a "dynamic variable". This value is made
>>>> >> available
>>>> >> > > to
>>>> >> > > > jobs
>>>> >> > > > * without having to be passed around explicitly. During
>>>> execution
>>>> >> of a
>>>> >> > > > Spark job
>>>> >> > > > * this value can be obtained from the [[SparkDynamic]] object.
>>>> >> > > > */
>>>> >> > > > def setDynamicVariableValue(value: Any)
>>>> >> > > >
>>>> >> > > > Then, when a job is executing the SparkDynamic can be accessed
>>>> to
>>>> >> > obtain
>>>> >> > > > the value of the dynamic variable. The implementation of this
>>>> object
>>>> >> is
>>>> >> > > as
>>>> >> > > > follows:
>>>> >> > > >
>>>> >> > > > object SparkDynamic {
>>>> >> > > > private val dynamicVariable = new DynamicVariable[Any]()
>>>> >> > > > /**
>>>> >> > > > * Gets the value of the "dynamic variable" that has been set in
>>>> >> the
>>>> >> > > > [[SparkContext]]
>>>> >> > > > */
>>>> >> > > > def getValue: Option[Any] = {
>>>> >> > > > Option(dynamicVariable.value)
>>>> >> > > > }
>>>> >> > > > private[spark] def withValue[S](threadValue: Option[Any])(thunk:
>>>> =>
>>>> >> > > S): S
>>>> >> > > > = {
>>>> >> > > > dynamicVariable.withValue(threadValue.orNull)(thunk)
>>>> >> > > > }
>>>> >> > > > }
>>>> >> > > >
>>>> >> > > > The change involves modifying the Task object to serialize the
>>>> value
>>>> >> of
>>>> >> > > the
>>>> >> > > > dynamic variable, and modifying the TaskRunner class to
>>>> deserialize
>>>> >> the
>>>> >> > > > value and make it available in the thread that is running the
>>>> task
>>>> >> > (using
>>>> >> > > > the SparkDynamic.withValue method).
>>>> >> > > >
>>>> >> > > > I have done a quick prototype of this in this commit:
>>>> >> > > >
>>>> >> > > >
>>>> >> > >
>>>> >> >
>>>> >>
>>>> https://github.com/nfergu/spark/commit/8be28d878f43ad6c49f892764011ae7d273dcea6
>>>> >> > > > and it seems to work fine in my (limited) testing. It needs more
>>>> >> > testing,
>>>> >> > > > tidy-up and documentation though.
>>>> >> > > >
>>>> >> > > > One drawback is that the dynamic variable will be serialized for
>>>> >> every
>>>> >> > > Task
>>>> >> > > > whether it needs it or not. For my use case this might not be
>>>> too
>>>> >> much
>>>> >> > > of a
>>>> >> > > > problem, as serializing and deserializing Accumulators looks
>>>> fairly
>>>> >> > > > lightweight -- however we should certainly warn users against
>>>> >> setting a
>>>> >> > > > dynamic variable containing lots of data. I thought about using
>>>> >> > broadcast
>>>> >> > > > tables here, but I don't think it's possible to put Accumulators
>>>> in a
>>>> >> > > > broadcast table (as I understand it, they're intended for purely
>>>> >> > > read-only
>>>> >> > > > data).
>>>> >> > > >
>>>> >> > > > What do people think about this proposal? My use case aside, it
>>>> seems
>>>> >> > > like
>>>> >> > > > it would be a generally useful enhancement to be able to pass
>>>> certain
>>>> >> > data
>>>> >> > > > around without having to explicitly pass it everywhere.
>>>> >> > > >
>>>> >> > > > Neil
>>>> >> > > >
>>>> >> > >
>>>> >> >
>>>> >>
>>>>
>>>

Re: "Dynamic variables" in Spark

Posted by Patrick Wendell <pw...@gmail.com>.
What if we have a registry for accumulators, where you can access them
statically by name?

- Patrick
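
A minimal sketch of what such a registry might look like, assuming a
stand-in counter class rather than Spark's Accumulator. The names
LongCounter, CounterRegistry and RegistryDemo are hypothetical and are not
part of Spark; how registered values would travel back to the driver is not
covered here.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.LongAdder

// Hypothetical stand-in for a named accumulator; not Spark's Accumulator class.
final class LongCounter(val name: String) {
  private val total = new LongAdder
  def add(v: Long): Unit = total.add(v)
  def value: Long = total.sum()
}

// Hypothetical registry: counters are looked up statically by name and
// created on first use, so callers never have to pass them around.
object CounterRegistry {
  private val counters = new ConcurrentHashMap[String, LongCounter]()

  def get(name: String): LongCounter =
    counters.computeIfAbsent(name, new java.util.function.Function[String, LongCounter] {
      override def apply(n: String): LongCounter = new LongCounter(n)
    })
}

object RegistryDemo {
  private def f1(x: Int): Int = x * 2

  // Counts calls to f1 without threading a counter through the call stack.
  def run(): Unit = {
    Seq(1, 2, 3).foreach { l =>
      val f = f1(l)
      CounterRegistry.get("f1-calls").add(1L)
    }
  }
}
```

The registry here is a single JVM-wide map, so it shares the coordination
concern raised earlier in the thread: independent clients of one
application would need to agree on names.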

On Thu, Jul 24, 2014 at 1:51 PM, Neil Ferguson <nf...@gmail.com> wrote:
> I realised that my last reply wasn't very clear -- let me try and clarify.
>
> The patch for named accumulators looks very useful, however in Shivaram's
> example he was able to retrieve the named task metrics (statically) from a
> TaskMetrics object, as follows:
>
> TaskMetrics.get("f1-time")
>
> However, I don't think this would be possible with the named accumulators
> -- I believe they'd need to be passed to every function that needs them,
> which I think would be cumbersome in any application of reasonable
> complexity.
>
> This is what I was trying to solve with my proposal for dynamic variables
> in Spark. However, the ability to retrieve named accumulators from a
> thread-local would work just as well for my use case. I'd be happy to
> implement either solution if there's interest.
>
> Alternatively, if I'm missing some other way to accomplish this please let
> me know.
>
> On a (slight) aside, I now think it would be possible to implement dynamic
> variables by broadcasting them. I was looking at Reynold's PR [1] to
> broadcast the RDD object, and I think it would be possible to take a
> similar approach -- that is, broadcast the serialized form, and deserialize
> when executing each task.
>
> [1] https://github.com/apache/spark/pull/1498
>
>
>
>
>
>
>
> On Wed, Jul 23, 2014 at 8:30 AM, Neil Ferguson <nf...@gmail.com> wrote:
>
>>  Hi Patrick.
>>
>> That looks very useful. The thing that seems to be missing from Shivaram's
>> example is the ability to access TaskMetrics statically (this is the same
>> problem that I am trying to solve with dynamic variables).
>>
>>
>>
>> You mention defining an accumulator on the RDD. Perhaps I am missing
>> something here, but my understanding was that accumulators are defined in
>> SparkContext and are not part of the RDD. Is that correct?
>>
>> Neil
>>
>> On Tue, Jul 22, 2014 at 22:21, Patrick Wendell <pwendell@gmail.com> wrote:
>>
>>> Shivaram,
>>>
>>> You should take a look at this patch which adds support for naming
>>> accumulators - this is likely to get merged in soon. I actually
>>> started this patch by supporting named TaskMetrics similar to what you
>>> have there, but then I realized there is too much semantic overlap
>>> with accumulators, so I just went that route.
>>>
>>> For instance, it would be nice if any user-defined metrics are
>>> accessible at the driver program.
>>>
>>> https://github.com/apache/spark/pull/1309
>>>
>>> In your example, you could just define an accumulator here on the RDD
>>> and you'd see the incremental update in the web UI automatically.
>>>
>>> - Patrick
>>>
>>> On Tue, Jul 22, 2014 at 2:09 PM, Shivaram Venkataraman
>>> <sh...@eecs.berkeley.edu> wrote:
>>> > From reading Neil's first e-mail, I think the motivation is to get some
>>> > metrics in ADAM ? -- I've run into a similar use-case with having
>>> > user-defined metrics in long-running tasks and I think a nice way to
>>> solve
>>> > this would be to have user-defined TaskMetrics.
>>> >
>>> > To state my problem more clearly, let's say you have two functions you
>>> use
>>> > in a map call and want to measure how much time each of them takes. For
>>> > example, if you have a code block like the one below and you want to
>>> > measure how much time f1 takes as a fraction of the task.
>>> >
>>> > a.map { l =>
>>> > val f = f1(l)
>>> > ... some work here ...
>>> > }
>>> >
>>> > It would be really cool if we could do something like
>>> >
>>> > a.map { l =>
>>> > val start = System.nanoTime
>>> > val f = f1(l)
>>> > TaskMetrics.get("f1-time").add(System.nanoTime - start)
>>> > }
>>> >
>>> > These task metrics have a different purpose from Accumulators in the
>>> sense
>>> > that we don't need to track lineage, perform commutative operations
>>> etc.
>>> > Further we also have a bunch of code in place to aggregate task metrics
>>> > across a stage etc. So it would be great if we could also populate
>>> these in
>>> > the UI and show median/max etc.
>>> > I think counters [1] in Hadoop served a similar purpose.
>>> >
>>> > Thanks
>>> > Shivaram
>>> >
>>> > [1]
>>> >
>>> https://www.inkling.com/read/hadoop-definitive-guide-tom-white-3rd/chapter-8/counters
>>> >
>>> >
>>> >
>>> > On Tue, Jul 22, 2014 at 1:43 PM, Neil Ferguson <nf...@gmail.com>
>>> wrote:
>>> >
>>> >> Hi Reynold
>>> >>
>>> >> Thanks for your reply.
>>> >>
>>> >> Accumulators are, of course, stored in the Accumulators object as
>>> >> thread-local variables. However, the Accumulators object isn't public,
>>> so
>>> >> when a Task is executing there's no way to get the set of accumulators
>>> for
>>> >> the current thread -- accumulators still have to be passed to every
>>> method
>>> >> that needs them.
>>> >>
>>> >> Additionally, unless an accumulator is explicitly referenced it won't
>>> be
>>> >> serialized as part of a Task, and won't make it into the Accumulators
>>> >> object in the first place.
>>> >>
>>> >> I should also note that what I'm proposing is not specific to
>>> Accumulators
>>> >> -- I am proposing that any data can be stored in a thread-local
>>> variable. I
>>> >> think there are probably many other use cases other than my one.
>>> >>
>>> >> Neil
>>> >>
>>> >>
>>> >> On Tue, Jul 22, 2014 at 5:39 AM, Reynold Xin <rx...@databricks.com>
>>> wrote:
>>> >>
>>> >> > Thanks for the thoughtful email, Neil and Christopher.
>>> >> >
>>> >> > If I understand this correctly, it seems like the dynamic variable
>>> is
>>> >> just
>>> >> > a variant of the accumulator (a static one since it is a global
>>> object).
>>> >> > Accumulators are already implemented using thread-local variables
>>> under
>>> >> the
>>> >> > hood. Am I misunderstanding something?
>>> >> >
>>> >> >
>>> >> >
>>> >> > On Mon, Jul 21, 2014 at 5:54 PM, Christopher Nguyen <ct...@adatao.com>
>>>
>>> >> > wrote:
>>> >> >
>>> >> > > Hi Neil, first off, I'm generally a sympathetic advocate for
>>> making
>>> >> > changes
>>> >> > > to Spark internals to make it easier/better/faster/more awesome.
>>> >> > >
>>> >> > > In this case, I'm (a) not clear about what you're trying to
>>> accomplish,
>>> >> > and
>>> >> > > (b) a bit worried about the proposed solution.
>>> >> > >
>>> >> > > On (a): it is stated that you want to pass some Accumulators
>>> around.
>>> >> Yet
>>> >> > > the proposed solution is for some "shared" variable that may be
>>> set and
>>> >> > > "mapped out" and possibly "reduced back", but without any
>>> accompanying
>>> >> > > accumulation semantics. And yet it doesn't seem like you only want
>>> just
>>> >> > the
>>> >> > > broadcast property. Can you clarify the problem statement with
>>> some
>>> >> > > before/after client code examples?
>>> >> > >
>>> >> > > On (b): you're right that adding variables to SparkContext should
>>> be
>>> >> done
>>> >> > > with caution, as it may have unintended consequences beyond just
>>> serdes
>>> >> > > payload size. For example, there is a stated intention of
>>> supporting
>>> >> > > multiple SparkContexts in the future, and this proposed solution
>>> can
>>> >> make
>>> >> > > it a bigger challenge to do so. Indeed, we had a gut-wrenching
>>> call to
>>> >> > make
>>> >> > > a while back on a subject related to this (see
>>> >> > > https://github.com/mesos/spark/pull/779). Furthermore, even in a
>>> >> single
>>> >> > > SparkContext application, there may be multiple "clients" (of that
>>> >> > > application) whose intent to use the proposed "SparkDynamic" would
>>> not
>>> >> > > necessarily be coordinated.
>>> >> > >
>>> >> > > So, considering a ratio of a/b (benefit/cost), it's not clear to
>>> me
>>> >> that
>>> >> > > the benefits are significant enough to warrant the costs. Do I
>>> >> > > misunderstand that the benefit is to save one explicit parameter
>>> (the
>>> >> > > "context") in the signature/closure code?
>>> >> > >
>>> >> > > --
>>> >> > > Christopher T. Nguyen
>>> >> > > Co-founder & CEO, Adatao <http://adatao.com>
>>> >> > > linkedin.com/in/ctnguyen
>>> >> > >
>>> >> > >
>>> >> > >
>>> >> > > On Mon, Jul 21, 2014 at 2:10 PM, Neil Ferguson <
>>> nferguson@gmail.com>
>>> >> > > wrote:
>>> >> > >
>>> >> > > > Hi all
>>> >> > > >
>>> >> > > > I have been adding some metrics to the ADAM project
>>> >> > > > https://github.com/bigdatagenomics/adam, which runs on Spark,
>>> and
>>> >> > have a
>>> >> > > > proposal for an enhancement to Spark that would make this work
>>> >> cleaner
>>> >> > > and
>>> >> > > > easier.
>>> >> > > >
>>> >> > > > I need to pass some Accumulators around, which will aggregate
>>> metrics
>>> >> > > > (timing stats and other metrics) across the cluster. However, it
>>> is
>>> >> > > > cumbersome to have to explicitly pass some "context" containing
>>> these
>>> >> > > > accumulators around everywhere that might need them. I can use
>>> Scala
>>> >> > > > implicits, which help slightly, but I'd still need to modify
>>> every
>>> >> > method
>>> >> > > > in the call stack to take an implicit variable.
>>> >> > > >
>>> >> > > > So, I'd like to propose that we add the ability to have "dynamic
>>> >> > > variables"
>>> >> > > > (basically thread-local variables) to Spark. This would avoid
>>> having
>>> >> to
>>> >> > > > pass the Accumulators around explicitly.
>>> >> > > >
>>> >> > > > My proposed approach is to add a method to the SparkContext
>>> class as
>>> >> > > > follows:
>>> >> > > >
>>> >> > > > /**
>>> >> > > > * Sets the value of a "dynamic variable". This value is made
>>> >> available
>>> >> > > to
>>> >> > > > jobs
>>> >> > > > * without having to be passed around explicitly. During
>>> execution
>>> >> of a
>>> >> > > > Spark job
>>> >> > > > * this value can be obtained from the [[SparkDynamic]] object.
>>> >> > > > */
>>> >> > > > def setDynamicVariableValue(value: Any)
>>> >> > > >
>>> >> > > > Then, when a job is executing the SparkDynamic can be accessed
>>> to
>>> >> > obtain
>>> >> > > > the value of the dynamic variable. The implementation of this
>>> object
>>> >> is
>>> >> > > as
>>> >> > > > follows:
>>> >> > > >
>>> >> > > > object SparkDynamic {
>>> >> > > > private val dynamicVariable = new DynamicVariable[Any]()
>>> >> > > > /**
>>> >> > > > * Gets the value of the "dynamic variable" that has been set in
>>> >> the
>>> >> > > > [[SparkContext]]
>>> >> > > > */
>>> >> > > > def getValue: Option[Any] = {
>>> >> > > > Option(dynamicVariable.value)
>>> >> > > > }
>>> >> > > > private[spark] def withValue[S](threadValue: Option[Any])(thunk:
>>> =>
>>> >> > > S): S
>>> >> > > > = {
>>> >> > > > dynamicVariable.withValue(threadValue.orNull)(thunk)
>>> >> > > > }
>>> >> > > > }
>>> >> > > >
>>> >> > > > The change involves modifying the Task object to serialize the
>>> value
>>> >> of
>>> >> > > the
>>> >> > > > dynamic variable, and modifying the TaskRunner class to
>>> deserialize
>>> >> the
>>> >> > > > value and make it available in the thread that is running the
>>> task
>>> >> > (using
>>> >> > > > the SparkDynamic.withValue method).
>>> >> > > >
>>> >> > > > I have done a quick prototype of this in this commit:
>>> >> > > >
>>> >> > > >
>>> >> > >
>>> >> >
>>> >>
>>> https://github.com/nfergu/spark/commit/8be28d878f43ad6c49f892764011ae7d273dcea6
>>> >> > > > and it seems to work fine in my (limited) testing. It needs more
>>> >> > testing,
>>> >> > > > tidy-up and documentation though.
>>> >> > > >
>>> >> > > > One drawback is that the dynamic variable will be serialized for
>>> >> every
>>> >> > > Task
>>> >> > > > whether it needs it or not. For my use case this might not be
>>> too
>>> >> much
>>> >> > > of a
>>> >> > > > problem, as serializing and deserializing Accumulators looks
>>> fairly
>>> >> > > > lightweight -- however we should certainly warn users against
>>> >> setting a
>>> >> > > > dynamic variable containing lots of data. I thought about using
>>> >> > broadcast
>>> >> > > > tables here, but I don't think it's possible to put Accumulators
>>> in a
>>> >> > > > broadcast table (as I understand it, they're intended for purely
>>> >> > > read-only
>>> >> > > > data).
>>> >> > > >
>>> >> > > > What do people think about this proposal? My use case aside, it
>>> seems
>>> >> > > like
>>> >> > > > it would be a generally useful enhancement to be able to pass
>>> certain
>>> >> > data
>>> >> > > > around without having to explicitly pass it everywhere.
>>> >> > > >
>>> >> > > > Neil
>>> >> > > >
>>> >> > >
>>> >> >
>>> >>
>>>
>>

Re: "Dynamic variables" in Spark

Posted by Neil Ferguson <nf...@gmail.com>.
I realised that my last reply wasn't very clear -- let me try and clarify.

The patch for named accumulators looks very useful. However, in Shivaram's
example he was able to retrieve the named task metrics (statically) from a
TaskMetrics object, as follows:

TaskMetrics.get("f1-time")

However, I don't think this would be possible with the named accumulators
-- I believe they'd need to be passed to every function that needs them,
which I think would be cumbersome in any application of reasonable
complexity.

This is what I was trying to solve with my proposal for dynamic variables
in Spark. However, the ability to retrieve named accumulators from a
thread-local would work just as well for my use case. I'd be happy to
implement either solution if there's interest.
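
To make the thread-local option concrete, here is a rough sketch, assuming
a simple counter type in place of a real accumulator. TaskMetricsSketch,
Metric, withMetrics and ThreadLocalDemo are hypothetical names, and the idea
that the task runner would wrap each task body in withMetrics is an
assumption rather than existing Spark behaviour.

```scala
import scala.collection.mutable
import scala.util.DynamicVariable

// Hypothetical per-task metrics holder; the name echoes Shivaram's example
// but this is not Spark's TaskMetrics class.
object TaskMetricsSketch {
  final class Metric {
    private var total = 0L
    def add(v: Long): Unit = total += v
    def value: Long = total
  }

  // One map of named metrics per running task (thread); null outside a task.
  private val current = new DynamicVariable[mutable.Map[String, Metric]](null)

  // Assumption: the task runner would wrap each task body in this call.
  def withMetrics[T](metrics: mutable.Map[String, Metric])(body: => T): T =
    current.withValue(metrics)(body)

  // Static lookup that works from any depth of the call stack.
  def get(name: String): Metric = {
    val metrics = current.value
    require(metrics != null, "no task is running on this thread")
    metrics.getOrElseUpdate(name, new Metric)
  }
}

object ThreadLocalDemo {
  // Deep in the call stack: no metrics parameter is needed.
  private def f1(x: Int): Int = {
    TaskMetricsSketch.get("f1-calls").add(1L)
    x * 2
  }

  def run(): Long = {
    val metrics = mutable.Map.empty[String, TaskMetricsSketch.Metric]
    TaskMetricsSketch.withMetrics(metrics) {
      Seq(1, 2, 3).foreach(f1)
    }
    metrics("f1-calls").value
  }
}
```

Only the runner and the code that records a metric touch the holder; the
intermediate methods keep their existing signatures, which is the point of
the proposal.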

Alternatively, if I'm missing some other way to accomplish this please let
me know.

On a (slight) aside, I now think it would be possible to implement dynamic
variables by broadcasting them. I was looking at Reynold's PR [1] to
broadcast the RDD object, and I think it would be possible to take a
similar approach -- that is, broadcast the serialized form, and deserialize
when executing each task.

[1] https://github.com/apache/spark/pull/1498
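
Roughly, the broadcast variant could look like the sketch below. It is
self-contained and does not call Spark: plain Java serialization stands in
for whatever serializer Spark would use, and the byte array stands in for
the broadcast payload. BroadcastDynamicSketch, serialize and runTask are
hypothetical names.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.util.DynamicVariable

object BroadcastDynamicSketch {
  private val dynamicVariable = new DynamicVariable[Option[Any]](None)

  // Value bound for the task running on the current thread, if any.
  def getValue: Option[Any] = dynamicVariable.value

  // Driver side: serialize once. In Spark these bytes would be broadcast
  // rather than shipped with every task (an assumption of this sketch).
  def serialize(value: Any): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value.asInstanceOf[AnyRef]) finally out.close()
    buffer.toByteArray
  }

  // Executor side: deserialize for each task and bind the value for the
  // duration of the task body only.
  def runTask[T](bytes: Array[Byte])(body: => T): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    val value = try in.readObject() finally in.close()
    dynamicVariable.withValue(Option(value))(body)
  }
}
```

Each task gets its own deserialized copy, so this suits read-mostly context;
anything that must be merged back at the driver would still need
accumulator-style handling.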

On Wed, Jul 23, 2014 at 8:30 AM, Neil Ferguson <nf...@gmail.com> wrote:

>  Hi Patrick.
>
> That looks very useful. The thing that seems to be missing from Shivaram's
> example is the ability to access TaskMetrics statically (this is the same
> problem that I am trying to solve with dynamic variables).
>
>
>
> You mention defining an accumulator on the RDD. Perhaps I am missing
> something here, but my understanding was that accumulators are defined in
> SparkContext and are not part of the RDD. Is that correct?
>
> Neil
>
> On Tue, Jul 22, 2014 at 22:21, Patrick Wendell <pwendell@gmail.com> wrote:
>
>> Shivaram,
>>
>> You should take a look at this patch which adds support for naming
>> accumulators - this is likely to get merged in soon. I actually
>> started this patch by supporting named TaskMetrics similar to what you
>> have there, but then I realized there is too much semantic overlap
>> with accumulators, so I just went that route.
>>
>> For instance, it would be nice if any user-defined metrics are
>> accessible at the driver program.
>>
>> https://github.com/apache/spark/pull/1309
>>
>> In your example, you could just define an accumulator here on the RDD
>> and you'd see the incremental update in the web UI automatically.
>>
>> - Patrick
>>
>> On Tue, Jul 22, 2014 at 2:09 PM, Shivaram Venkataraman
>> <sh...@eecs.berkeley.edu> wrote:
>> > From reading Neil's first e-mail, I think the motivation is to get some
>> > metrics in ADAM ? -- I've run into a similar use-case with having
>> > user-defined metrics in long-running tasks and I think a nice way to
>> solve
>> > this would be to have user-defined TaskMetrics.
>> >
>> > To state my problem more clearly, let's say you have two functions you
>> use
>> > in a map call and want to measure how much time each of them takes. For
>> > example, if you have a code block like the one below and you want to
>> > measure how much time f1 takes as a fraction of the task.
>> >
>> > a.map { l =>
>> > val f = f1(l)
>> > ... some work here ...
>> > }
>> >
>> > It would be really cool if we could do something like
>> >
>> > a.map { l =>
>> > val start = System.nanoTime
>> > val f = f1(l)
>> > TaskMetrics.get("f1-time").add(System.nanoTime - start)
>> > }
>> >
>> > These task metrics have a different purpose from Accumulators in the
>> sense
>> > that we don't need to track lineage, perform commutative operations
>> etc.
>> > Further we also have a bunch of code in place to aggregate task metrics
>> > across a stage etc. So it would be great if we could also populate
>> these in
>> > the UI and show median/max etc.
>> > I think counters [1] in Hadoop served a similar purpose.
>> >
>> > Thanks
>> > Shivaram
>> >
>> > [1]
>> >
>> https://www.inkling.com/read/hadoop-definitive-guide-tom-white-3rd/chapter-8/counters
>> >
>> >
>> >
>> > On Tue, Jul 22, 2014 at 1:43 PM, Neil Ferguson <nf...@gmail.com>
>> wrote:
>> >
>> >> Hi Reynold
>> >>
>> >> Thanks for your reply.
>> >>
>> >> Accumulators are, of course, stored in the Accumulators object as
>> >> thread-local variables. However, the Accumulators object isn't public,
>> so
>> >> when a Task is executing there's no way to get the set of accumulators
>> for
>> >> the current thread -- accumulators still have to be passed to every
>> method
>> >> that needs them.
>> >>
>> >> Additionally, unless an accumulator is explicitly referenced it won't
>> be
>> >> serialized as part of a Task, and won't make it into the Accumulators
>> >> object in the first place.
>> >>
>> >> I should also note that what I'm proposing is not specific to
>> Accumulators
>> >> -- I am proposing that any data can be stored in a thread-local
>> variable. I
>> >> think there are probably many other use cases other than my one.
>> >>
>> >> Neil
>> >>
>> >>
>> >> On Tue, Jul 22, 2014 at 5:39 AM, Reynold Xin <rx...@databricks.com>
>> wrote:
>> >>
>> >> > Thanks for the thoughtful email, Neil and Christopher.
>> >> >
>> >> > If I understand this correctly, it seems like the dynamic variable
>> is
>> >> just
>> >> > a variant of the accumulator (a static one since it is a global
>> object).
>> >> > Accumulators are already implemented using thread-local variables
>> under
>> >> the
>> >> > hood. Am I misunderstanding something?
>> >> >
>> >> >
>> >> >
>> >> > On Mon, Jul 21, 2014 at 5:54 PM, Christopher Nguyen <ct...@adatao.com>
>>
>> >> > wrote:
>> >> >
>> >> > > Hi Neil, first off, I'm generally a sympathetic advocate for
>> making
>> >> > changes
>> >> > > to Spark internals to make it easier/better/faster/more awesome.
>> >> > >
>> >> > > In this case, I'm (a) not clear about what you're trying to
>> accomplish,
>> >> > and
>> >> > > (b) a bit worried about the proposed solution.
>> >> > >
>> >> > > On (a): it is stated that you want to pass some Accumulators
>> around.
>> >> Yet
>> >> > > the proposed solution is for some "shared" variable that may be
>> set and
>> >> > > "mapped out" and possibly "reduced back", but without any
>> accompanying
>> >> > > accumulation semantics. And yet it doesn't seem like you only want
>> just
>> >> > the
>> >> > > broadcast property. Can you clarify the problem statement with
>> some
>> >> > > before/after client code examples?
>> >> > >
>> >> > > On (b): you're right that adding variables to SparkContext should
>> be
>> >> done
>> >> > > with caution, as it may have unintended consequences beyond just
>> serdes
>> >> > > payload size. For example, there is a stated intention of
>> supporting
>> >> > > multiple SparkContexts in the future, and this proposed solution
>> can
>> >> make
>> >> > > it a bigger challenge to do so. Indeed, we had a gut-wrenching
>> call to
>> >> > make
>> >> > > a while back on a subject related to this (see
>> >> > > https://github.com/mesos/spark/pull/779). Furthermore, even in a
>> >> single
>> >> > > SparkContext application, there may be multiple "clients" (of that
>> >> > > application) whose intent to use the proposed "SparkDynamic" would
>> not
>> >> > > necessarily be coordinated.
>> >> > >
>> >> > > So, considering a ratio of a/b (benefit/cost), it's not clear to
>> me
>> >> that
>> >> > > the benefits are significant enough to warrant the costs. Do I
>> >> > > misunderstand that the benefit is to save one explicit parameter
>> (the
>> >> > > "context") in the signature/closure code?
>> >> > >
>> >> > > --
>> >> > > Christopher T. Nguyen
>> >> > > Co-founder & CEO, Adatao <http://adatao.com>
>> >> > > linkedin.com/in/ctnguyen
>> >> > >
>> >> > >
>> >> > >
>> >> > > On Mon, Jul 21, 2014 at 2:10 PM, Neil Ferguson <
>> nferguson@gmail.com>
>> >> > > wrote:
>> >> > >
>> >> > > > Hi all
>> >> > > >
>> >> > > > I have been adding some metrics to the ADAM project
>> >> > > > https://github.com/bigdatagenomics/adam, which runs on Spark,
>> and
>> >> > have a
>> >> > > > proposal for an enhancement to Spark that would make this work
>> >> cleaner
>> >> > > and
>> >> > > > easier.
>> >> > > >
>> >> > > > I need to pass some Accumulators around, which will aggregate
>> metrics
>> >> > > > (timing stats and other metrics) across the cluster. However, it
>> is
>> >> > > > cumbersome to have to explicitly pass some "context" containing
>> these
>> >> > > > accumulators around everywhere that might need them. I can use
>> Scala
>> >> > > > implicits, which help slightly, but I'd still need to modify
>> every
>> >> > method
>> >> > > > in the call stack to take an implicit variable.
>> >> > > >
>> >> > > > So, I'd like to propose that we add the ability to have "dynamic
>> >> > > variables"
>> >> > > > (basically thread-local variables) to Spark. This would avoid
>> having
>> >> to
>> >> > > > pass the Accumulators around explicitly.
>> >> > > >
>> >> > > > My proposed approach is to add a method to the SparkContext
>> class as
>> >> > > > follows:
>> >> > > >
>> >> > > > /**
>> >> > > > * Sets the value of a "dynamic variable". This value is made
>> >> available
>> >> > > to
>> >> > > > jobs
>> >> > > > * without having to be passed around explicitly. During
>> execution
>> >> of a
>> >> > > > Spark job
>> >> > > > * this value can be obtained from the [[SparkDynamic]] object.
>> >> > > > */
>> >> > > > def setDynamicVariableValue(value: Any)
>> >> > > >
>> >> > > > Then, when a job is executing the SparkDynamic can be accessed
>> to
>> >> > obtain
>> >> > > > the value of the dynamic variable. The implementation of this
>> object
>> >> is
>> >> > > as
>> >> > > > follows:
>> >> > > >
>> >> > > > object SparkDynamic {
>> >> > > > private val dynamicVariable = new DynamicVariable[Any]()
>> >> > > > /**
>> >> > > > * Gets the value of the "dynamic variable" that has been set in
>> >> the
>> >> > > > [[SparkContext]]
>> >> > > > */
>> >> > > > def getValue: Option[Any] = {
>> >> > > > Option(dynamicVariable.value)
>> >> > > > }
>> >> > > > private[spark] def withValue[S](threadValue: Option[Any])(thunk:
>> =>
>> >> > > S): S
>> >> > > > = {
>> >> > > > dynamicVariable.withValue(threadValue.orNull)(thunk)
>> >> > > > }
>> >> > > > }
>> >> > > >
>> >> > > > The change involves modifying the Task object to serialize the
>> value
>> >> of
>> >> > > the
>> >> > > > dynamic variable, and modifying the TaskRunner class to
>> deserialize
>> >> the
>> >> > > > value and make it available in the thread that is running the
>> task
>> >> > (using
>> >> > > > the SparkDynamic.withValue method).
>> >> > > >
>> >> > > > I have done a quick prototype of this in this commit:
>> >> > > >
>> >> > > >
>> >> > >
>> >> >
>> >>
>> https://github.com/nfergu/spark/commit/8be28d878f43ad6c49f892764011ae7d273dcea6
>> >> > > > and it seems to work fine in my (limited) testing. It needs more
>> >> > testing,
>> >> > > > tidy-up and documentation though.
>> >> > > >
>> >> > > > One drawback is that the dynamic variable will be serialized for
>> >> every
>> >> > > Task
>> >> > > > whether it needs it or not. For my use case this might not be
>> too
>> >> much
>> >> > > of a
>> >> > > > problem, as serializing and deserializing Accumulators looks
>> fairly
>> >> > > > lightweight -- however we should certainly warn users against
>> >> setting a
>> >> > > > dynamic variable containing lots of data. I thought about using
>> >> > broadcast
>> >> > > > tables here, but I don't think it's possible to put Accumulators
>> in a
>> >> > > > broadcast table (as I understand it, they're intended for purely
>> >> > > read-only
>> >> > > > data).
>> >> > > >
>> >> > > > What do people think about this proposal? My use case aside, it
>> seems
>> >> > > like
>> >> > > > it would be a generally useful enhancement to be able to pass
>> certain
>> >> > data
>> >> > > > around without having to explicitly pass it everywhere.
>> >> > > >
>> >> > > > Neil
>> >> > > >
>> >> > >
>> >> >
>> >>
>>
>

Re: "Dynamic variables" in Spark

Posted by Neil Ferguson <nf...@gmail.com>.
Hi Patrick.

That looks very useful. The thing that seems to be missing from Shivaram's example is the ability to access TaskMetrics statically (this is the same problem that I am trying to solve with dynamic variables).

You mention defining an accumulator on the RDD. Perhaps I am missing something here, but my understanding was that accumulators are defined in SparkContext and are not part of the RDD. Is that correct?

Neil


On Tue, Jul 22, 2014 at 22:21, Patrick Wendell <pw...@gmail.com> wrote:
Shivaram,

You should take a look at this patch which adds support for naming
accumulators - this is likely to get merged in soon. I actually
started this patch by supporting named TaskMetrics similar to what you
have there, but then I realized there is too much semantic overlap
with accumulators, so I just went that route.

For instance, it would be nice if any user-defined metrics are
accessible at the driver program.

https://github.com/apache/spark/pull/1309

In your example, you could just define an accumulator here on the RDD
and you'd see the incremental update in the web UI automatically.

- Patrick



Re: "Dynamic variables" in Spark

Posted by Patrick Wendell <pw...@gmail.com>.
Shivaram,

You should take a look at this patch which adds support for naming
accumulators - this is likely to get merged in soon. I actually
started this patch by supporting named TaskMetrics similar to what you
have there, but then I realized there is too much semantic overlap
with accumulators, so I just went that route.

For instance, it would be nice if any user-defined metrics are
accessible at the driver program.

https://github.com/apache/spark/pull/1309

In your example, you could just define an accumulator here on the RDD
and you'd see the incremental update in the web UI automatically.

- Patrick

On Tue, Jul 22, 2014 at 2:09 PM, Shivaram Venkataraman
<sh...@eecs.berkeley.edu> wrote:
> From reading Neil's first e-mail, I think the motivation is to get some
> metrics in ADAM ? --  I've run into a similar use-case with having
> user-defined metrics in long-running tasks and I think a nice way to solve
> this would be to have user-defined TaskMetrics.
>
> To state my problem more clearly, let's say you have two functions you use
> in a map call and want to measure how much time each of them takes. For
> example, if you have a code block like the one below and you want to
> measure how much time f1 takes as a fraction of the task.
>
> a.map { l =>
>    val f = f1(l)
>    ... some work here ...
> }
>
> It would be really cool if we could do something like
>
> a.map { l =>
>    val start = System.nanoTime
>    val f = f1(l)
>    TaskMetrics.get("f1-time").add(System.nanoTime - start)
> }
>
> These task metrics have a different purpose from Accumulators in the sense
> that we don't need to track lineage, perform commutative operations etc.
>  Further we also have a bunch of code in place to aggregate task metrics
> across a stage etc. So it would be great if we could also populate these in
> the UI and show median/max etc.
> I think counters [1] in Hadoop served a similar purpose.
>
> Thanks
> Shivaram
>
> [1]
> https://www.inkling.com/read/hadoop-definitive-guide-tom-white-3rd/chapter-8/counters
>
>
>
> On Tue, Jul 22, 2014 at 1:43 PM, Neil Ferguson <nf...@gmail.com> wrote:
>
>> Hi Reynold
>>
>> Thanks for your reply.
>>
>> Accumulators are, of course, stored in the Accumulators object as
>> thread-local variables. However, the Accumulators object isn't public, so
>> when a Task is executing there's no way to get the set of accumulators for
>> the current thread -- accumulators still have to be passed to every method
>> that needs them.
>>
>> Additionally, unless an accumulator is explicitly referenced it won't be
>> serialized as part of a Task, and won't make it into the Accumulators
>> object in the first place.
>>
>> I should also note that what I'm proposing is not specific to Accumulators
>> -- I am proposing that any data can be stored in a thread-local variable. I
>> think there are probably many other use cases other than my one.
>>
>> Neil
>>
>>
>> On Tue, Jul 22, 2014 at 5:39 AM, Reynold Xin <rx...@databricks.com> wrote:
>>
>> > Thanks for the thoughtful email, Neil and Christopher.
>> >
>> > If I understand this correctly, it seems like the dynamic variable is
>> just
>> > a variant of the accumulator (a static one since it is a global object).
>> > Accumulators are already implemented using thread-local variables under
>> the
>> > hood. Am I misunderstanding something?
>> >
>> >
>> >
>> > On Mon, Jul 21, 2014 at 5:54 PM, Christopher Nguyen <ct...@adatao.com>
>> > wrote:
>> >
>> > > Hi Neil, first off, I'm generally a sympathetic advocate for making
>> > changes
>> > > to Spark internals to make it easier/better/faster/more awesome.
>> > >
>> > > In this case, I'm (a) not clear about what you're trying to accomplish,
>> > and
>> > > (b) a bit worried about the proposed solution.
>> > >
>> > > On (a): it is stated that you want to pass some Accumulators around.
>> Yet
>> > > the proposed solution is for some "shared" variable that may be set and
>> > > "mapped out" and possibly "reduced back", but without any accompanying
>> > > accumulation semantics. And yet it doesn't seem like you only want just
>> > the
>> > > broadcast property. Can you clarify the problem statement with some
>> > > before/after client code examples?
>> > >
>> > > On (b): you're right that adding variables to SparkContext should be
>> done
>> > > with caution, as it may have unintended consequences beyond just serdes
>> > > payload size. For example, there is a stated intention of supporting
>> > > multiple SparkContexts in the future, and this proposed solution can
>> make
>> > > it a bigger challenge to do so. Indeed, we had a gut-wrenching call to
>> > make
>> > > a while back on a subject related to this (see
>> > > https://github.com/mesos/spark/pull/779). Furthermore, even in a
>> single
>> > > SparkContext application, there may be multiple "clients" (of that
>> > > application) whose intent to use the proposed "SparkDynamic" would not
>> > > necessarily be coordinated.
>> > >
>> > > So, considering a ratio of a/b (benefit/cost), it's not clear to me
>> that
>> > > the benefits are significant enough to warrant the costs. Do I
>> > > misunderstand that the benefit is to save one explicit parameter (the
>> > > "context") in the signature/closure code?
>> > >
>> > > --
>> > > Christopher T. Nguyen
>> > > Co-founder & CEO, Adatao <http://adatao.com>
>> > > linkedin.com/in/ctnguyen
>> > >
>> > >
>> > >
>> > > On Mon, Jul 21, 2014 at 2:10 PM, Neil Ferguson <nf...@gmail.com>
>> > > wrote:
>> > >
>> > > > Hi all
>> > > >
>> > > > I have been adding some metrics to the ADAM project
>> > > > https://github.com/bigdatagenomics/adam, which runs on Spark, and
>> > have a
>> > > > proposal for an enhancement to Spark that would make this work
>> cleaner
>> > > and
>> > > > easier.
>> > > >
>> > > > I need to pass some Accumulators around, which will aggregate metrics
>> > > > (timing stats and other metrics) across the cluster. However, it is
>> > > > cumbersome to have to explicitly pass some "context" containing these
>> > > > accumulators around everywhere that might need them. I can use Scala
>> > > > implicits, which help slightly, but I'd still need to modify every
>> > method
>> > > > in the call stack to take an implicit variable.
>> > > >
>> > > > So, I'd like to propose that we add the ability to have "dynamic
>> > > variables"
>> > > > (basically thread-local variables) to Spark. This would avoid having
>> to
>> > > > pass the Accumulators around explicitly.
>> > > >
>> > > > My proposed approach is to add a method to the SparkContext class as
>> > > > follows:
>> > > >
>> > > > /**
>> > > >  * Sets the value of a "dynamic variable". This value is made
>> available
>> > > to
>> > > > jobs
>> > > >  * without having to be passed around explicitly. During execution
>> of a
>> > > > Spark job
>> > > >  * this value can be obtained from the [[SparkDynamic]] object.
>> > > >  */
>> > > > def setDynamicVariableValue(value: Any)
>> > > >
>> > > > Then, when a job is executing the SparkDynamic can be accessed to
>> > obtain
>> > > > the value of the dynamic variable. The implementation of this object
>> is
>> > > as
>> > > > follows:
>> > > >
>> > > > object SparkDynamic {
>> > > >   private val dynamicVariable = new DynamicVariable[Any]()
>> > > >   /**
>> > > >    * Gets the value of the "dynamic variable" that has been set in
>> the
>> > > > [[SparkContext]]
>> > > >    */
>> > > >   def getValue: Option[Any] = {
>> > > >     Option(dynamicVariable.value)
>> > > >   }
>> > > >   private[spark] def withValue[S](threadValue: Option[Any])(thunk: =>
>> > > S): S
>> > > > = {
>> > > >     dynamicVariable.withValue(threadValue.orNull)(thunk)
>> > > >   }
>> > > > }
>> > > >
>> > > > The change involves modifying the Task object to serialize the value
>> of
>> > > the
>> > > > dynamic variable, and modifying the TaskRunner class to deserialize
>> the
>> > > > value and make it available in the thread that is running the task
>> > (using
>> > > > the SparkDynamic.withValue method).
>> > > >
>> > > > I have done a quick prototype of this in this commit:
>> > > >
>> > > >
>> > >
>> >
>> https://github.com/nfergu/spark/commit/8be28d878f43ad6c49f892764011ae7d273dcea6
>> > > > and it seems to work fine in my (limited) testing. It needs more
>> > testing,
>> > > > tidy-up and documentation though.
>> > > >
>> > > > One drawback is that the dynamic variable will be serialized for
>> every
>> > > Task
>> > > > whether it needs it or not. For my use case this might not be too
>> much
>> > > of a
>> > > > problem, as serializing and deserializing Accumulators looks fairly
>> > > > lightweight -- however we should certainly warn users against
>> setting a
>> > > > dynamic variable containing lots of data. I thought about using
>> > broadcast
>> > > > tables here, but I don't think it's possible to put Accumulators in a
>> > > > broadcast table (as I understand it, they're intended for purely
>> > > read-only
>> > > > data).
>> > > >
>> > > > What do people think about this proposal? My use case aside, it seems
>> > > like
>> > > > it would be a generally useful enhancement to be able to pass certain
>> > data
>> > > > around without having to explicitly pass it everywhere.
>> > > >
>> > > > Neil
>> > > >
>> > >
>> >
>>

Re: "Dynamic variables" in Spark

Posted by Shivaram Venkataraman <sh...@eecs.berkeley.edu>.
From reading Neil's first e-mail, I think the motivation is to get some
metrics in ADAM? I've run into a similar use case with
user-defined metrics in long-running tasks, and I think a nice way to solve
this would be to have user-defined TaskMetrics.

To state my problem more clearly, let's say you have two functions you use
in a map call and want to measure how much time each of them takes. For
example, suppose you have a code block like the one below and want to
measure how much time f1 takes as a fraction of the task.

a.map { l =>
   val f = f1(l)
   ... some work here ...
}

It would be really cool if we could do something like

a.map { l =>
   val start = System.nanoTime
   val f = f1(l)
   TaskMetrics.get("f1-time").add(System.nanoTime - start)
}

These task metrics have a different purpose from Accumulators in the sense
that we don't need to track lineage, perform commutative operations, etc.
Further, we also have a bunch of code in place to aggregate task metrics
across a stage, etc. So it would be great if we could also populate these in
the UI and show median/max, etc.
I think counters [1] in Hadoop served a similar purpose.
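
To make the idea a bit more concrete, here is a minimal sketch of what a
per-thread registry of named counters might look like. This is only an
illustration under assumptions: the names Counter and UserTaskMetrics are
hypothetical and are not part of Spark, and nothing here ships the values
back to the driver or shows them in the UI.

```scala
import scala.collection.mutable

// Hypothetical names throughout: neither Counter nor UserTaskMetrics exists in Spark.
final class Counter {
  private var total = 0L
  def add(delta: Long): Unit = total += delta
  def value: Long = total
}

object UserTaskMetrics {
  // One map per thread, so tasks running on different threads do not share counters.
  private val counters = new ThreadLocal[mutable.Map[String, Counter]] {
    override def initialValue(): mutable.Map[String, Counter] =
      mutable.Map.empty[String, Counter]
  }

  // Returns the counter registered under `name` for the current thread,
  // creating it on first use.
  def get(name: String): Counter =
    counters.get().getOrElseUpdate(name, new Counter)

  // Immutable view of the current thread's counters, e.g. for reporting at task end.
  def snapshot: Map[String, Long] =
    counters.get().map { case (name, c) => name -> c.value }.toMap
}

// Usage, mirroring the map body above:
val start = System.nanoTime
// ... the work being timed ...
UserTaskMetrics.get("f1-time").add(System.nanoTime - start)
```

Because the lookup is static, no context object has to be passed into the
closure; the price is that something else would have to collect each
thread's snapshot when the task finishes.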

Thanks
Shivaram

[1]
https://www.inkling.com/read/hadoop-definitive-guide-tom-white-3rd/chapter-8/counters



On Tue, Jul 22, 2014 at 1:43 PM, Neil Ferguson <nf...@gmail.com> wrote:

> Hi Reynold
>
> Thanks for your reply.
>
> Accumulators are, of course, stored in the Accumulators object as
> thread-local variables. However, the Accumulators object isn't public, so
> when a Task is executing there's no way to get the set of accumulators for
> the current thread -- accumulators still have to be passed to every method
> that needs them.
>
> Additionally, unless an accumulator is explicitly referenced it won't be
> serialized as part of a Task, and won't make it into the Accumulators
> object in the first place.
>
> I should also note that what I'm proposing is not specific to Accumulators
> -- I am proposing that any data can be stored in a thread-local variable. I
> think there are probably many other use cases other than my one.
>
> Neil
>
>
> On Tue, Jul 22, 2014 at 5:39 AM, Reynold Xin <rx...@databricks.com> wrote:
>
> > Thanks for the thoughtful email, Neil and Christopher.
> >
> > If I understand this correctly, it seems like the dynamic variable is
> just
> > a variant of the accumulator (a static one since it is a global object).
> > Accumulators are already implemented using thread-local variables under
> the
> > hood. Am I misunderstanding something?
> >
> >
> >
> > On Mon, Jul 21, 2014 at 5:54 PM, Christopher Nguyen <ct...@adatao.com>
> > wrote:
> >
> > > Hi Neil, first off, I'm generally a sympathetic advocate for making
> > changes
> > > to Spark internals to make it easier/better/faster/more awesome.
> > >
> > > In this case, I'm (a) not clear about what you're trying to accomplish,
> > and
> > > (b) a bit worried about the proposed solution.
> > >
> > > On (a): it is stated that you want to pass some Accumulators around.
> Yet
> > > the proposed solution is for some "shared" variable that may be set and
> > > "mapped out" and possibly "reduced back", but without any accompanying
> > > accumulation semantics. And yet it doesn't seem like you only want just
> > the
> > > broadcast property. Can you clarify the problem statement with some
> > > before/after client code examples?
> > >
> > > On (b): you're right that adding variables to SparkContext should be
> done
> > > with caution, as it may have unintended consequences beyond just serdes
> > > payload size. For example, there is a stated intention of supporting
> > > multiple SparkContexts in the future, and this proposed solution can
> make
> > > it a bigger challenge to do so. Indeed, we had a gut-wrenching call to
> > make
> > > a while back on a subject related to this (see
> > > https://github.com/mesos/spark/pull/779). Furthermore, even in a
> single
> > > SparkContext application, there may be multiple "clients" (of that
> > > application) whose intent to use the proposed "SparkDynamic" would not
> > > necessarily be coordinated.
> > >
> > > So, considering a ratio of a/b (benefit/cost), it's not clear to me
> that
> > > the benefits are significant enough to warrant the costs. Do I
> > > misunderstand that the benefit is to save one explicit parameter (the
> > > "context") in the signature/closure code?
> > >
> > > --
> > > Christopher T. Nguyen
> > > Co-founder & CEO, Adatao <http://adatao.com>
> > > linkedin.com/in/ctnguyen
> > >
> > >
> > >
> > > On Mon, Jul 21, 2014 at 2:10 PM, Neil Ferguson <nf...@gmail.com>
> > > wrote:
> > >
> > > > Hi all
> > > >
> > > > I have been adding some metrics to the ADAM project
> > > > https://github.com/bigdatagenomics/adam, which runs on Spark, and
> > have a
> > > > proposal for an enhancement to Spark that would make this work
> cleaner
> > > and
> > > > easier.
> > > >
> > > > I need to pass some Accumulators around, which will aggregate metrics
> > > > (timing stats and other metrics) across the cluster. However, it is
> > > > cumbersome to have to explicitly pass some "context" containing these
> > > > accumulators around everywhere that might need them. I can use Scala
> > > > implicits, which help slightly, but I'd still need to modify every
> > method
> > > > in the call stack to take an implicit variable.
> > > >
> > > > So, I'd like to propose that we add the ability to have "dynamic
> > > variables"
> > > > (basically thread-local variables) to Spark. This would avoid having
> to
> > > > pass the Accumulators around explicitly.
> > > >
> > > > My proposed approach is to add a method to the SparkContext class as
> > > > follows:
> > > >
> > > > /**
> > > >  * Sets the value of a "dynamic variable". This value is made
> available
> > > to
> > > > jobs
> > > >  * without having to be passed around explicitly. During execution
> of a
> > > > Spark job
> > > >  * this value can be obtained from the [[SparkDynamic]] object.
> > > >  */
> > > > def setDynamicVariableValue(value: Any)
> > > >
> > > > Then, when a job is executing the SparkDynamic can be accessed to
> > obtain
> > > > the value of the dynamic variable. The implementation of this object
> is
> > > as
> > > > follows:
> > > >
> > > > object SparkDynamic {
> > > >   private val dynamicVariable = new DynamicVariable[Any]()
> > > >   /**
> > > >    * Gets the value of the "dynamic variable" that has been set in
> the
> > > > [[SparkContext]]
> > > >    */
> > > >   def getValue: Option[Any] = {
> > > >     Option(dynamicVariable.value)
> > > >   }
> > > >   private[spark] def withValue[S](threadValue: Option[Any])(thunk: =>
> > > S): S
> > > > = {
> > > >     dynamicVariable.withValue(threadValue.orNull)(thunk)
> > > >   }
> > > > }
> > > >
> > > > The change involves modifying the Task object to serialize the value
> of
> > > the
> > > > dynamic variable, and modifying the TaskRunner class to deserialize
> the
> > > > value and make it available in the thread that is running the task
> > (using
> > > > the SparkDynamic.withValue method).
> > > >
> > > > I have done a quick prototype of this in this commit:
> > > >
> > > >
> > >
> >
> https://github.com/nfergu/spark/commit/8be28d878f43ad6c49f892764011ae7d273dcea6
> > > > and it seems to work fine in my (limited) testing. It needs more
> > testing,
> > > > tidy-up and documentation though.
> > > >
> > > > One drawback is that the dynamic variable will be serialized for
> every
> > > Task
> > > > whether it needs it or not. For my use case this might not be too
> much
> > > of a
> > > > problem, as serializing and deserializing Accumulators looks fairly
> > > > lightweight -- however we should certainly warn users against
> setting a
> > > > dynamic variable containing lots of data. I thought about using
> > broadcast
> > > > tables here, but I don't think it's possible to put Accumulators in a
> > > > broadcast table (as I understand it, they're intended for purely
> > > read-only
> > > > data).
> > > >
> > > > What do people think about this proposal? My use case aside, it seems
> > > like
> > > > > it would be a generally useful enhancement to be able to pass certain
> > data
> > > > around without having to explicitly pass it everywhere.
> > > >
> > > > Neil
> > > >
> > >
> >
>

Re: "Dynamic variables" in Spark

Posted by Neil Ferguson <nf...@gmail.com>.
Hi Reynold

Thanks for your reply.

Accumulators are, of course, stored in the Accumulators object as
thread-local variables. However, the Accumulators object isn't public, so
when a Task is executing there's no way to get the set of accumulators for
the current thread -- accumulators still have to be passed to every method
that needs them.

Additionally, unless an accumulator is explicitly referenced it won't be
serialized as part of a Task, and won't make it into the Accumulators
object in the first place.

I should also note that what I'm proposing is not specific to Accumulators
-- I am proposing that any data can be stored in a thread-local variable. I
think there are probably many use cases other than mine.
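
As a rough illustration of the access pattern, here is a small sketch that
uses scala.util.DynamicVariable from the standard library on its own,
without Spark. The object name DynamicSketch and the helper deepInCallStack
are made up for this example, and it starts from None instead of a null
default; the Task serialization side of the proposal is not shown.

```scala
import scala.util.DynamicVariable

// Hypothetical stand-in for the proposed SparkDynamic object.
object DynamicSketch {
  // Unset by default; each thread sees the value bound by its own withValue call.
  private val current = new DynamicVariable[Option[Any]](None)

  def getValue: Option[Any] = current.value

  // Binds `value` for the duration of `thunk`, then restores the previous binding.
  def withValue[S](value: Option[Any])(thunk: => S): S =
    current.withValue(value)(thunk)
}

// A method deep in the call stack reads the value without taking it as a parameter.
def deepInCallStack(): String =
  DynamicSketch.getValue.map(_.toString).getOrElse("unset")

val inside = DynamicSketch.withValue(Some("metrics-context")) { deepInCallStack() }
val outside = deepInCallStack()
```

Here inside is "metrics-context" and outside is "unset": the binding is
visible only within the withValue block, which is the scoping the task
runner would rely on.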

Neil


On Tue, Jul 22, 2014 at 5:39 AM, Reynold Xin <rx...@databricks.com> wrote:

> Thanks for the thoughtful email, Neil and Christopher.
>
> If I understand this correctly, it seems like the dynamic variable is just
> a variant of the accumulator (a static one since it is a global object).
> Accumulators are already implemented using thread-local variables under the
> hood. Am I misunderstanding something?
>
>
>
> On Mon, Jul 21, 2014 at 5:54 PM, Christopher Nguyen <ct...@adatao.com>
> wrote:
>
> > Hi Neil, first off, I'm generally a sympathetic advocate for making
> changes
> > to Spark internals to make it easier/better/faster/more awesome.
> >
> > In this case, I'm (a) not clear about what you're trying to accomplish,
> and
> > (b) a bit worried about the proposed solution.
> >
> > On (a): it is stated that you want to pass some Accumulators around. Yet
> > the proposed solution is for some "shared" variable that may be set and
> > "mapped out" and possibly "reduced back", but without any accompanying
> > accumulation semantics. And yet it doesn't seem like you only want just
> the
> > broadcast property. Can you clarify the problem statement with some
> > before/after client code examples?
> >
> > On (b): you're right that adding variables to SparkContext should be done
> > with caution, as it may have unintended consequences beyond just serdes
> > payload size. For example, there is a stated intention of supporting
> > multiple SparkContexts in the future, and this proposed solution can make
> > it a bigger challenge to do so. Indeed, we had a gut-wrenching call to
> make
> > a while back on a subject related to this (see
> > https://github.com/mesos/spark/pull/779). Furthermore, even in a single
> > SparkContext application, there may be multiple "clients" (of that
> > application) whose intent to use the proposed "SparkDynamic" would not
> > necessarily be coordinated.
> >
> > So, considering a ratio of a/b (benefit/cost), it's not clear to me that
> > the benefits are significant enough to warrant the costs. Do I
> > misunderstand that the benefit is to save one explicit parameter (the
> > "context") in the signature/closure code?
> >
> > --
> > Christopher T. Nguyen
> > Co-founder & CEO, Adatao <http://adatao.com>
> > linkedin.com/in/ctnguyen
> >
> >
> >
> > On Mon, Jul 21, 2014 at 2:10 PM, Neil Ferguson <nf...@gmail.com>
> > wrote:
> >
> > > Hi all
> > >
> > > I have been adding some metrics to the ADAM project
> > > https://github.com/bigdatagenomics/adam, which runs on Spark, and
> have a
> > > proposal for an enhancement to Spark that would make this work cleaner
> > and
> > > easier.
> > >
> > > I need to pass some Accumulators around, which will aggregate metrics
> > > (timing stats and other metrics) across the cluster. However, it is
> > > cumbersome to have to explicitly pass some "context" containing these
> > > accumulators around everywhere that might need them. I can use Scala
> > > implicits, which help slightly, but I'd still need to modify every
> method
> > > in the call stack to take an implicit variable.
> > >
> > > So, I'd like to propose that we add the ability to have "dynamic
> > variables"
> > > (basically thread-local variables) to Spark. This would avoid having to
> > > pass the Accumulators around explicitly.
> > >
> > > My proposed approach is to add a method to the SparkContext class as
> > > follows:
> > >
> > > /**
> > >  * Sets the value of a "dynamic variable". This value is made available to jobs
> > >  * without having to be passed around explicitly. During execution of a Spark job
> > >  * this value can be obtained from the [[SparkDynamic]] object.
> > >  */
> > > def setDynamicVariableValue(value: Any)
> > >
> > > Then, when a job is executing the SparkDynamic can be accessed to obtain
> > > the value of the dynamic variable. The implementation of this object is as
> > > follows:
> > >
> > > object SparkDynamic {
> > >   private val dynamicVariable = new DynamicVariable[Any]()
> > >   /**
> > >    * Gets the value of the "dynamic variable" that has been set in the [[SparkContext]]
> > >    */
> > >   def getValue: Option[Any] = {
> > >     Option(dynamicVariable.value)
> > >   }
> > >   private[spark] def withValue[S](threadValue: Option[Any])(thunk: => S): S = {
> > >     dynamicVariable.withValue(threadValue.orNull)(thunk)
> > >   }
> > > }
> > >
> > > The change involves modifying the Task object to serialize the value of the
> > > dynamic variable, and modifying the TaskRunner class to deserialize the
> > > value and make it available in the thread that is running the task (using
> > > the SparkDynamic.withValue method).
> > >
> > > I have done a quick prototype of this in this commit:
> > >
> > > https://github.com/nfergu/spark/commit/8be28d878f43ad6c49f892764011ae7d273dcea6
> > > and it seems to work fine in my (limited) testing. It needs more testing,
> > > tidy-up and documentation though.
> > >
> > > One drawback is that the dynamic variable will be serialized for every Task
> > > whether it needs it or not. For my use case this might not be too much of a
> > > problem, as serializing and deserializing Accumulators looks fairly
> > > lightweight -- however we should certainly warn users against setting a
> > > dynamic variable containing lots of data. I thought about using broadcast
> > > tables here, but I don't think it's possible to put Accumulators in a
> > > broadcast table (as I understand it, they're intended for purely read-only
> > > data).
> > >
> > > What do people think about this proposal? My use case aside, it seems like
> > > it would be a generally useful enhancement to be able to pass certain data
> > > around without having to explicitly pass it everywhere.
> > >
> > > Neil
> > >
> >
>

Re: "Dynamic variables" in Spark

Posted by Reynold Xin <rx...@databricks.com>.
Thanks for the thoughtful email, Neil and Christopher.

If I understand this correctly, it seems like the dynamic variable is just
a variant of the accumulator (a static one since it is a global object).
Accumulators are already implemented using thread-local variables under the
hood. Am I misunderstanding something?
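
For reference, here is a minimal sketch of the thread-local scoping that
scala.util.DynamicVariable provides, independent of Spark. The names
DynamicVariableDemo, read and demo are made up for illustration; the only
library calls used are the DynamicVariable constructor (which takes an
initial value), value, and withValue.

```scala
import scala.util.DynamicVariable

object DynamicVariableDemo {
  // The constructor takes an initial value; None here means "nothing bound".
  private val current = new DynamicVariable[Option[String]](None)

  // Reads whatever is bound for the calling thread, with no parameter passed in.
  def read(): Option[String] = current.value

  // Returns the value seen before, during, and after a withValue block.
  def demo(): (Option[String], Option[String], Option[String]) = {
    val before = read()
    val during = current.withValue(Some("metrics-context")) { read() }
    val after = read()
    (before, during, after)
  }
}
```

The binding is visible only while the withValue block runs and is restored
afterwards. DynamicVariable is backed by an inheritable thread-local, so the
binding belongs to the thread that called withValue.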



On Mon, Jul 21, 2014 at 5:54 PM, Christopher Nguyen <ct...@adatao.com> wrote:

> Hi Neil, first off, I'm generally a sympathetic advocate for making changes
> to Spark internals to make it easier/better/faster/more awesome.
>
> In this case, I'm (a) not clear about what you're trying to accomplish, and
> (b) a bit worried about the proposed solution.
>
> On (a): it is stated that you want to pass some Accumulators around. Yet
> the proposed solution is for some "shared" variable that may be set and
> "mapped out" and possibly "reduced back", but without any accompanying
> accumulation semantics. And yet it doesn't seem like you want only the
> broadcast property. Can you clarify the problem statement with some
> before/after client code examples?
>
> On (b): you're right that adding variables to SparkContext should be done
> with caution, as it may have unintended consequences beyond just serdes
> payload size. For example, there is a stated intention of supporting
> multiple SparkContexts in the future, and this proposed solution can make
> it a bigger challenge to do so. Indeed, we had a gut-wrenching call to make
> a while back on a subject related to this (see
> https://github.com/mesos/spark/pull/779). Furthermore, even in a single
> SparkContext application, there may be multiple "clients" (of that
> application) whose intent to use the proposed "SparkDynamic" would not
> necessarily be coordinated.
>
> So, considering a ratio of a/b (benefit/cost), it's not clear to me that
> the benefits are significant enough to warrant the costs. Do I
> misunderstand that the benefit is to save one explicit parameter (the
> "context") in the signature/closure code?
>
> --
> Christopher T. Nguyen
> Co-founder & CEO, Adatao <http://adatao.com>
> linkedin.com/in/ctnguyen
>
>
>
> On Mon, Jul 21, 2014 at 2:10 PM, Neil Ferguson <nf...@gmail.com>
> wrote:
>
> > Hi all
> >
> > I have been adding some metrics to the ADAM project
> > https://github.com/bigdatagenomics/adam, which runs on Spark, and have a
> > proposal for an enhancement to Spark that would make this work cleaner and
> > easier.
> >
> > I need to pass some Accumulators around, which will aggregate metrics
> > (timing stats and other metrics) across the cluster. However, it is
> > cumbersome to have to explicitly pass some "context" containing these
> > accumulators around everywhere that might need them. I can use Scala
> > implicits, which help slightly, but I'd still need to modify every method
> > in the call stack to take an implicit variable.
> >
> > So, I'd like to propose that we add the ability to have "dynamic variables"
> > (basically thread-local variables) to Spark. This would avoid having to
> > pass the Accumulators around explicitly.
> >
> > My proposed approach is to add a method to the SparkContext class as
> > follows:
> >
> > /**
> >  * Sets the value of a "dynamic variable". This value is made available to jobs
> >  * without having to be passed around explicitly. During execution of a Spark job
> >  * this value can be obtained from the [[SparkDynamic]] object.
> >  */
> > def setDynamicVariableValue(value: Any)
> >
> > Then, when a job is executing the SparkDynamic can be accessed to obtain
> > the value of the dynamic variable. The implementation of this object is as
> > follows:
> >
> > object SparkDynamic {
> >   private val dynamicVariable = new DynamicVariable[Any]()
> >   /**
> >    * Gets the value of the "dynamic variable" that has been set in the [[SparkContext]]
> >    */
> >   def getValue: Option[Any] = {
> >     Option(dynamicVariable.value)
> >   }
> >   private[spark] def withValue[S](threadValue: Option[Any])(thunk: => S): S = {
> >     dynamicVariable.withValue(threadValue.orNull)(thunk)
> >   }
> > }
> >
> > The change involves modifying the Task object to serialize the value of the
> > dynamic variable, and modifying the TaskRunner class to deserialize the
> > value and make it available in the thread that is running the task (using
> > the SparkDynamic.withValue method).
> >
> > I have done a quick prototype of this in this commit:
> >
> > https://github.com/nfergu/spark/commit/8be28d878f43ad6c49f892764011ae7d273dcea6
> > and it seems to work fine in my (limited) testing. It needs more testing,
> > tidy-up and documentation though.
> >
> > One drawback is that the dynamic variable will be serialized for every Task
> > whether it needs it or not. For my use case this might not be too much of a
> > problem, as serializing and deserializing Accumulators looks fairly
> > lightweight -- however we should certainly warn users against setting a
> > dynamic variable containing lots of data. I thought about using broadcast
> > tables here, but I don't think it's possible to put Accumulators in a
> > broadcast table (as I understand it, they're intended for purely read-only
> > data).
> >
> > What do people think about this proposal? My use case aside, it seems like
> > it would be a generally useful enhancement to be able to pass certain data
> > around without having to explicitly pass it everywhere.
> >
> > Neil
> >
>

Re: "Dynamic variables" in Spark

Posted by Christopher Nguyen <ct...@adatao.com>.
Hi Neil, first off, I'm generally a sympathetic advocate for making changes
to Spark internals to make it easier/better/faster/more awesome.

In this case, I'm (a) not clear about what you're trying to accomplish, and
(b) a bit worried about the proposed solution.

On (a): it is stated that you want to pass some Accumulators around. Yet
the proposed solution is for some "shared" variable that may be set and
"mapped out" and possibly "reduced back", but without any accompanying
accumulation semantics. And yet it doesn't seem like you want only the
broadcast property. Can you clarify the problem statement with some
before/after client code examples?

On (b): you're right that adding variables to SparkContext should be done
with caution, as it may have unintended consequences beyond just serdes
payload size. For example, there is a stated intention of supporting
multiple SparkContexts in the future, and this proposed solution can make
it a bigger challenge to do so. Indeed, we had a gut-wrenching call to make
a while back on a subject related to this (see
https://github.com/mesos/spark/pull/779). Furthermore, even in a single
SparkContext application, there may be multiple "clients" (of that
application) whose intent to use the proposed "SparkDynamic" would not
necessarily be coordinated.

So, considering a ratio of a/b (benefit/cost), it's not clear to me that
the benefits are significant enough to warrant the costs. Do I
misunderstand that the benefit is to save one explicit parameter (the
"context") in the signature/closure code?
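
To make the question concrete, here is a rough sketch of the kind of
before/after client code I have in mind. It is plain Scala, not Spark: the
Metrics class is a hypothetical stand-in for a bundle of Accumulators, and
Before, After and withMetrics are invented names, not part of the proposal
or of any Spark API.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.util.DynamicVariable

// Hypothetical stand-in for a "context" holding Spark Accumulators.
final class Metrics { val recordsSeen = new AtomicLong(0) }

object Before {
  // Every method in the call stack has to accept and forward the context.
  def processAll(records: Seq[String], metrics: Metrics): Seq[String] =
    records.map(r => processOne(r, metrics))

  def processOne(record: String, metrics: Metrics): String = {
    metrics.recordsSeen.incrementAndGet()
    record.toUpperCase
  }
}

object After {
  // The context is bound once and looked up where it is needed.
  private val current = new DynamicVariable[Option[Metrics]](None)

  def withMetrics[S](metrics: Metrics)(thunk: => S): S =
    current.withValue(Some(metrics))(thunk)

  def processAll(records: Seq[String]): Seq[String] = records.map(processOne)

  def processOne(record: String): String = {
    // Silently does nothing when no context is bound for this thread.
    current.value.foreach(_.recordsSeen.incrementAndGet())
    record.toUpperCase
  }
}
```

In the second form the signatures are shorter, but nothing in them tells the
caller that a context is expected, which is the kind of hidden coupling I am
worried about.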

--
Christopher T. Nguyen
Co-founder & CEO, Adatao <http://adatao.com>
linkedin.com/in/ctnguyen



On Mon, Jul 21, 2014 at 2:10 PM, Neil Ferguson <nf...@gmail.com> wrote:

> Hi all
>
> I have been adding some metrics to the ADAM project
> https://github.com/bigdatagenomics/adam, which runs on Spark, and have a
> proposal for an enhancement to Spark that would make this work cleaner and
> easier.
>
> I need to pass some Accumulators around, which will aggregate metrics
> (timing stats and other metrics) across the cluster. However, it is
> cumbersome to have to explicitly pass some "context" containing these
> accumulators around everywhere that might need them. I can use Scala
> implicits, which help slightly, but I'd still need to modify every method
> in the call stack to take an implicit variable.
>
> So, I'd like to propose that we add the ability to have "dynamic variables"
> (basically thread-local variables) to Spark. This would avoid having to
> pass the Accumulators around explicitly.
>
> My proposed approach is to add a method to the SparkContext class as
> follows:
>
> /**
>  * Sets the value of a "dynamic variable". This value is made available to jobs
>  * without having to be passed around explicitly. During execution of a Spark job
>  * this value can be obtained from the [[SparkDynamic]] object.
>  */
> def setDynamicVariableValue(value: Any)
>
> Then, when a job is executing the SparkDynamic can be accessed to obtain
> the value of the dynamic variable. The implementation of this object is as
> follows:
>
> object SparkDynamic {
>   private val dynamicVariable = new DynamicVariable[Any]()
>   /**
>    * Gets the value of the "dynamic variable" that has been set in the [[SparkContext]]
>    */
>   def getValue: Option[Any] = {
>     Option(dynamicVariable.value)
>   }
>   private[spark] def withValue[S](threadValue: Option[Any])(thunk: => S): S = {
>     dynamicVariable.withValue(threadValue.orNull)(thunk)
>   }
> }
>
> The change involves modifying the Task object to serialize the value of the
> dynamic variable, and modifying the TaskRunner class to deserialize the
> value and make it available in the thread that is running the task (using
> the SparkDynamic.withValue method).
>
> I have done a quick prototype of this in this commit:
>
> https://github.com/nfergu/spark/commit/8be28d878f43ad6c49f892764011ae7d273dcea6
> and it seems to work fine in my (limited) testing. It needs more testing,
> tidy-up and documentation though.
>
> One drawback is that the dynamic variable will be serialized for every Task
> whether it needs it or not. For my use case this might not be too much of a
> problem, as serializing and deserializing Accumulators looks fairly
> lightweight -- however we should certainly warn users against setting a
> dynamic variable containing lots of data. I thought about using broadcast
> tables here, but I don't think it's possible to put Accumulators in a
> broadcast table (as I understand it, they're intended for purely read-only
> data).
>
> What do people think about this proposal? My use case aside, it seems like
> it would be a generally useful enhancement to be able to pass certain data
> around without having to explicitly pass it everywhere.
>
> Neil
>
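
The mechanism described in the quoted proposal (serialize the value along
with the task, then deserialize it and bind it in the thread that runs the
task) can be sketched without Spark roughly as follows. This is an
illustration under stated assumptions, not the code from the prototype
commit: TaskSketch, serialize, deserialize and runTask are hypothetical
names, Java serialization stands in for whatever serializer Spark would
use, and a single-thread executor stands in for the TaskRunner.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.concurrent.{Callable, Executors}
import scala.util.DynamicVariable

object TaskSketch {
  // Hypothetical analogue of the proposed SparkDynamic object.
  private val dynamicVariable = new DynamicVariable[Option[Any]](None)

  def getValue: Option[Any] = dynamicVariable.value

  // "Driver" side: turn the value into bytes that travel with the task.
  def serialize(value: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    bytes.toByteArray
  }

  def deserialize(data: Array[Byte]): Any = {
    val in = new ObjectInputStream(new ByteArrayInputStream(data))
    try in.readObject() finally in.close()
  }

  // "Executor" side: bind the deserialized value in the thread running the task body.
  def runTask[S](payload: Array[Byte])(body: => S): S = {
    val pool = Executors.newSingleThreadExecutor()
    try {
      pool.submit(new Callable[S] {
        def call(): S = dynamicVariable.withValue(Some(deserialize(payload)))(body)
      }).get()
    } finally pool.shutdown()
  }
}
```

Note that the payload is serialized and deserialized for every task whether
or not the body reads it, which is the per-task cost mentioned in the
proposal.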