You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Kevin <ke...@gmail.com> on 2014/07/30 19:07:47 UTC

Keep state inside map function

Hi,

Is it possible to maintain state inside a Spark map function? With Hadoop
MapReduce, Mappers and Reducers are classes that can have their own state
using instance variables. Can this be done with Spark? Are there any
examples?

Most examples I have seen do a simple operating on the value passed into
the map function and then pass it along to the reduce function.

Thanks in advance.

-Kevin

Re: Keep state inside map function

Posted by Koert Kuipers <ko...@tresata.com>.
doing cleanup in an iterator like that assumes the iterator always gets
fully read, which is not necessary the case (for example RDD.take does not).

instead i would use mapPartitionsWithContext, in which case you can write a
function of the form.
 f: (TaskContext, Iterator[T]) => Iterator[U]

now you can register a cleanup with the task context, like this:
context.addTaskCompletionListener(new TaskCompletionListener {
  override def onTaskCompletion(context: TaskContext): Unit = dosomething
)

and after that proceed with an iterator transformation as usual


On Thu, Jul 31, 2014 at 4:35 AM, Sean Owen <so...@cloudera.com> wrote:

> On Thu, Jul 31, 2014 at 2:11 AM, Tobias Pfeiffer <tg...@preferred.jp> wrote:
> > rdd.mapPartitions { partition =>
> >    // Some setup code here
> >    val result = partition.map(yourfunction)
> >
> >    // Some cleanup code here
> >    result
> > }
>
> Yes, I realized that after I hit send. You definitely have to store
> and return the result from the mapping!
>
>
> > rdd.mapPartitions { partition =>
> >    if (!partition.isEmpty) {
> >
> >      // Some setup code here
> >      partition.map(item => {
> >        val output = yourfunction(item)
> >        if (!partition.hasNext) {
> >          // Some cleanup code here
> >        }
> >        output
> >      })
> >    } else {
> >      // return an empty Iterator of your return type
> >    }
> > }
>
> Great point, yeah. If you knew the number of values were small you
> could collect them and process locally, but this is the right general
> way to do it.
>

Re: Keep state inside map function

Posted by Sean Owen <so...@cloudera.com>.
On Thu, Jul 31, 2014 at 2:11 AM, Tobias Pfeiffer <tg...@preferred.jp> wrote:
> rdd.mapPartitions { partition =>
>    // Some setup code here
>    val result = partition.map(yourfunction)
>
>    // Some cleanup code here
>    result
> }

Yes, I realized that after I hit send. You definitely have to store
and return the result from the mapping!


> rdd.mapPartitions { partition =>
>    if (!partition.isEmpty) {
>
>      // Some setup code here
>      partition.map(item => {
>        val output = yourfunction(item)
>        if (!partition.hasNext) {
>          // Some cleanup code here
>        }
>        output
>      })
>    } else {
>      // return an empty Iterator of your return type
>    }
> }

Great point, yeah. If you knew the number of values were small you
could collect them and process locally, but this is the right general
way to do it.

Re: Keep state inside map function

Posted by Tobias Pfeiffer <tg...@preferred.jp>.
Hi,

On Thu, Jul 31, 2014 at 2:23 AM, Sean Owen <so...@cloudera.com> wrote:
>
> ... you can run setup code before mapping a bunch of records, and
> after, like so:
>
> rdd.mapPartitions { partition =>
>    // Some setup code here
>    partition.map(yourfunction)
>    // Some cleanup code here
> }
>

Please be careful with that, it will not work as expected. First, it would
have to be

rdd.mapPartitions { partition =>
   // Some setup code here
   val result = partition.map(yourfunction)
   // Some cleanup code here
   result
}

because the function passed in to mapPartitions() needs to return an
Iterator, and if you do it like this, then the cleanup code will run
*before* the processing takes place because partition.map() is executed
lazily.

One example of what actually works is:

rdd.mapPartitions { partition =>
   if (!partition.isEmpty) {
     // Some setup code here
     partition.map(item => {
       val output = yourfunction(item)
       if (!partition.hasNext) {
         // Some cleanup code here
       }
       output
     })
   } else {
     // return an empty Iterator of your return type
   }
}

That is not very pretty, but it is the only way I found to actually get
tearDown code run after map() is run.

Tobias

Re: Keep state inside map function

Posted by Kevin <ke...@gmail.com>.
Thanks to the both of you for your inputs. Looks like I'll play with the
mapPartitions function to start porting MapReduce algorithms to Spark.


On Wed, Jul 30, 2014 at 1:23 PM, Sean Owen <so...@cloudera.com> wrote:

> Really, the analog of a Mapper is not map(), but mapPartitions(). Instead
> of:
>
> rdd.map(yourFunction)
>
> ... you can run setup code before mapping a bunch of records, and
> after, like so:
>
> rdd.mapPartitions { partition =>
>    // Some setup code here
>    partition.map(yourfunction)
>    // Some cleanup code here
> }
>
> You couldn't share state across Mappers, or Mappers and Reducers in
> Hadoop. (At least there was no direct way.) Same here. But you can
> maintain state across many map calls.
>
> On Wed, Jul 30, 2014 at 6:07 PM, Kevin <ke...@gmail.com> wrote:
> > Hi,
> >
> > Is it possible to maintain state inside a Spark map function? With Hadoop
> > MapReduce, Mappers and Reducers are classes that can have their own state
> > using instance variables. Can this be done with Spark? Are there any
> > examples?
> >
> > Most examples I have seen do a simple operating on the value passed into
> the
> > map function and then pass it along to the reduce function.
> >
> > Thanks in advance.
> >
> > -Kevin
>

Re: Keep state inside map function

Posted by Sean Owen <so...@cloudera.com>.
Really, the analog of a Mapper is not map(), but mapPartitions(). Instead of:

rdd.map(yourFunction)

... you can run setup code before mapping a bunch of records, and
after, like so:

rdd.mapPartitions { partition =>
   // Some setup code here
   partition.map(yourfunction)
   // Some cleanup code here
}

You couldn't share state across Mappers, or Mappers and Reducers in
Hadoop. (At least there was no direct way.) Same here. But you can
maintain state across many map calls.

On Wed, Jul 30, 2014 at 6:07 PM, Kevin <ke...@gmail.com> wrote:
> Hi,
>
> Is it possible to maintain state inside a Spark map function? With Hadoop
> MapReduce, Mappers and Reducers are classes that can have their own state
> using instance variables. Can this be done with Spark? Are there any
> examples?
>
> Most examples I have seen do a simple operating on the value passed into the
> map function and then pass it along to the reduce function.
>
> Thanks in advance.
>
> -Kevin

Re: Keep state inside map function

Posted by aaronjosephs <aa...@placeiq.com>.
use mapPartitions to get the equivalent functionality to hadoop



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Keep-state-inside-map-function-tp10968p10969.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.