You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Jeroen Miller <bl...@gmail.com> on 2016/06/07 21:54:51 UTC

Sequential computation over several partitions

Dear fellow Sparkers,

I am a new Spark user and I am trying to solve a (conceptually simple)
problem which may not be a good use case for Spark, at least for the RDD
API. But before I turn my back on it, I would rather have the opinion of
more knowledgeable developers than me, as it is highly likely that I am
missing something.

Here is my problem in a nutshell.

I have numerous files where each line is an event of the form:

    (subscribe|unsubscribe),<list_id>,<user_id>,<timestamp>

I need to gather time-framed (for example, weekly or monthly) statistics of
the kind:

  <list_id>,
  <num_current_users>,
  <total_num_users_who_joined_from_dawn_of_time>,
  <total_num_events_from_dawn_of_time>

Ideally, I would need a Spark job that output these statistics for all time
periods at once. The number of unique <list_id> is a about a few hundreds,
the number of unique <user_id> is a few dozens of millions.

The trouble is that the data is not "clean", in the sense that I can have
'unsubscribe' events for users that are not subscribed, or 'subscribe'
events for users that are already subscribed.

This means I have to keep in memory the complete list of

    (subscribe|unsubscribe),<list_id>,<user_id>,<timestamp>

keeping only the entry for the most recent <timestamp> for a given couple
(list_id,user_id).

If one is only interested in keeping the final statistics, this is
relatively easy to do with reduceByKey and combineByKey on a properly keyed
RDD containing all events.

However I am struggling when it comes down to compute the "partial"
statistics, as I obviously do not want to duplicate most of the
computations done for period (i-1) when I am adding the events for period
(i) as my reduceByKey/combineByKey approach will lead to.

Sequentially, the problem is trivial: keep all events (with the latest
'valid' event for each couple (list_id,user_id)) in a huge hash table which
can be used to decide whether to increment or decrement <num_current_users>
(for example) and save the states of the current statistics whenever we are
done dealing with period (i).

I do not know how to efficiently solve this in Spark though.

A naive idea would be to fetch the data for period(0) in an explicitly
partitioned RDD (for example according to the last few characters of
<user_id>) and proceed in a sequential fashion within a call to
mapPartition.

The trouble would then be how to process new data files for later periods.
Suppose I store the event RDDs in an array 'data' (indexed by period
number), all of them similarly partitioned, I am afraid something like this
is not viable (please excuse pseudo-code):

    data[0].mapPartitionWithIndex(

      (index, iterator) => {
            //
            // 1. Initialize 'hashmap' keyed by (list_id,user_id) for the
partition
            //
            val hashmap = new HashMap[(String, String), Event]

            //
            // 2. Iterate over events in data[0] rdd, update 'hashmap',
            //    output stats for this partition and period.
            //
            while (iterator.hasNext) {
                //
                // Process entry, update 'hashmap', output stats
                // for the partition and period.
                //
            }

            //
            // 3. Loop over all the periods.
            //
            for (period <- 1 until max) {
                val next = data[period].mapPartitionWithIndex(
                    (index2, iterator2) => {
                        if (index2 == index) {
                            while (iterator2.hasNext) {
                                //
                                // Iterate over the elements of next (since
                                // the data should be on the same node, so
no
                                // shuffling after the initial partitioning,
                                // right?), update 'hashmap', and output
stats
                                // for this partition and period.
                                //
                            }
                        } else {
                            iterator2
                        }
                    }
                )
            }
        }
    )

The trouble with this approach it that I am afraid the data files for
period (i > 0) will be read as many times as there are partitions in
data[0] unless I explicitly persist them maybe, which is inefficient. That
said there is probably a (clumsy) way to unpersist them once I am sure I'm
100% done with them.

All of this looks not only inelegant but shamefully un-spark like to me.

Am I missing a trick here, maybe a well-known pattern? Are RDDs not the
most appropriate API to handle this kind of tasks? If so, what do you
suggest I could look into?

Thank you for taking the time to read that overly long message ;-)

Jeroen

Fwd: Sequential computation over several partitions

Posted by Jeroen Miller <bl...@gmail.com>.
Hello,

On Wed, Jun 8, 2016 at 12:59 AM, Mich Talebzadeh wrote:
>
> one thing you may consider is using something like flume to store
> data on hfs. [...]

Thank you for your sensible suggestions.

> Have you thought of other tools besides Spark?

No, as least not seriously yet. Flume looks like a good candidate
indeed but other distributed key-value stores (Cassandra, HBase, Redis
Cluster) would fit the bill too I guess. Of course, the lighter the
better.

Other than that, does anyone has any comment on how to proceed with a
Spark job? I understand this may not be the best solution, but I'd
like to know if there is an efficient way to solve my problem with the
RDD API out of curiosity.

Thanks,

--
Jeroen

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Sequential computation over several partitions

Posted by Mich Talebzadeh <mi...@gmail.com>.
Am I correct in understanding that you want to read and iterate all the
data to be correct. For example if a user is already unsubscribed then you
want to ignore all the subsequent subscribe regardless

how often do you want to iterate through the full data. The frequency of
your analysis?

the issue I believe you may face as you go from t0-> t1-.tn you volume of
data is going to rise.

How about periodic storage of your analysis and working on deltas only
afterwards?

What sort of data is it? Is it typical web-users?

HTH

Dr Mich Talebzadeh



LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com



On 7 June 2016 at 22:54, Jeroen Miller <bl...@gmail.com> wrote:

> Dear fellow Sparkers,
>
> I am a new Spark user and I am trying to solve a (conceptually simple)
> problem which may not be a good use case for Spark, at least for the RDD
> API. But before I turn my back on it, I would rather have the opinion of
> more knowledgeable developers than me, as it is highly likely that I am
> missing something.
>
> Here is my problem in a nutshell.
>
> I have numerous files where each line is an event of the form:
>
>     (subscribe|unsubscribe),<list_id>,<user_id>,<timestamp>
>
> I need to gather time-framed (for example, weekly or monthly) statistics
> of the kind:
>
>   <list_id>,
>   <num_current_users>,
>   <total_num_users_who_joined_from_dawn_of_time>,
>   <total_num_events_from_dawn_of_time>
>
> Ideally, I would need a Spark job that output these statistics for all
> time periods at once. The number of unique <list_id> is a about a few
> hundreds, the number of unique <user_id> is a few dozens of millions.
>
> The trouble is that the data is not "clean", in the sense that I can have
> 'unsubscribe' events for users that are not subscribed, or 'subscribe'
> events for users that are already subscribed.
>
> This means I have to keep in memory the complete list of
>
>     (subscribe|unsubscribe),<list_id>,<user_id>,<timestamp>
>
> keeping only the entry for the most recent <timestamp> for a given couple
> (list_id,user_id).
>
> If one is only interested in keeping the final statistics, this is
> relatively easy to do with reduceByKey and combineByKey on a properly keyed
> RDD containing all events.
>
> However I am struggling when it comes down to compute the "partial"
> statistics, as I obviously do not want to duplicate most of the
> computations done for period (i-1) when I am adding the events for period
> (i) as my reduceByKey/combineByKey approach will lead to.
>
> Sequentially, the problem is trivial: keep all events (with the latest
> 'valid' event for each couple (list_id,user_id)) in a huge hash table which
> can be used to decide whether to increment or decrement <num_current_users>
> (for example) and save the states of the current statistics whenever we are
> done dealing with period (i).
>
> I do not know how to efficiently solve this in Spark though.
>
> A naive idea would be to fetch the data for period(0) in an explicitly
> partitioned RDD (for example according to the last few characters of
> <user_id>) and proceed in a sequential fashion within a call to
> mapPartition.
>
> The trouble would then be how to process new data files for later periods.
> Suppose I store the event RDDs in an array 'data' (indexed by period
> number), all of them similarly partitioned, I am afraid something like this
> is not viable (please excuse pseudo-code):
>
>     data[0].mapPartitionWithIndex(
>
>       (index, iterator) => {
>             //
>             // 1. Initialize 'hashmap' keyed by (list_id,user_id) for the
> partition
>             //
>             val hashmap = new HashMap[(String, String), Event]
>
>             //
>             // 2. Iterate over events in data[0] rdd, update 'hashmap',
>             //    output stats for this partition and period.
>             //
>             while (iterator.hasNext) {
>                 //
>                 // Process entry, update 'hashmap', output stats
>                 // for the partition and period.
>                 //
>             }
>
>             //
>             // 3. Loop over all the periods.
>             //
>             for (period <- 1 until max) {
>                 val next = data[period].mapPartitionWithIndex(
>                     (index2, iterator2) => {
>                         if (index2 == index) {
>                             while (iterator2.hasNext) {
>                                 //
>                                 // Iterate over the elements of next (since
>                                 // the data should be on the same node, so
> no
>                                 // shuffling after the initial
> partitioning,
>                                 // right?), update 'hashmap', and output
> stats
>                                 // for this partition and period.
>                                 //
>                             }
>                         } else {
>                             iterator2
>                         }
>                     }
>                 )
>             }
>         }
>     )
>
> The trouble with this approach it that I am afraid the data files for
> period (i > 0) will be read as many times as there are partitions in
> data[0] unless I explicitly persist them maybe, which is inefficient. That
> said there is probably a (clumsy) way to unpersist them once I am sure I'm
> 100% done with them.
>
> All of this looks not only inelegant but shamefully un-spark like to me.
>
> Am I missing a trick here, maybe a well-known pattern? Are RDDs not the
> most appropriate API to handle this kind of tasks? If so, what do you
> suggest I could look into?
>
> Thank you for taking the time to read that overly long message ;-)
>
> Jeroen
>
>