Posted to user@spark.apache.org by Greg Kopff <gr...@q10stats.com> on 2022/10/12 00:33:38 UTC

Efficiently updating running sums only on new data

I'm new to Spark and would like to seek some advice on how to approach a problem.

I have a large dataset that has dated observations. There are also columns that are running sums of some of the other columns.

   date     | thing |   foo   |   bar   |  foo_sum  |  bar_sum  |
============+=======+=========+=========+===========+===========+
 2020-01-01 |  101  |      1  |       3 |        1  |        3  |
 2020-01-01 |  202  |      0  |       2 |        0  |        2  |
 2020-01-01 |  303  |      1  |       1 |        1  |        1  |
------------+-------+---------+---------+-----------+-----------+
 2020-01-02 |  101  |      1  |       2 |        2  |        5  |
 2020-01-02 |  202  |      0  |       0 |        0  |        2  |
 2020-01-02 |  303  |      4  |       1 |        5  |        2  |
 2020-01-02 |  404  |      2  |       2 |        2  |        2  |
------------+-------+---------+---------+-----------+-----------+
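
Currently I generate the running sums using a WindowSpec:

// Imports needed by this fragment:
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.sum;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

final WindowSpec w =
    Window.partitionBy(col("thing"))
        .orderBy(col("date"), col("thing"))
        .rowsBetween(Window.unboundedPreceding(), Window.currentRow());

// withColumn expects the new column's name as a String, not a Column.
return df
    .withColumn("foo_sum", sum("foo").over(w))
    .withColumn("bar_sum", sum("bar").over(w));

Once these extra sum columns are computed, they are written back to storage.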

Periodically this dataset is appended to with new observations. These new observations are all chronologically later than any of the previous observations.

I need to "continue" the previous running sums for the new observations -- but I want to avoid having to recompute the running sums completely from scratch.

   date     | thing |   foo   |   bar   |  foo_sum  |  bar_sum  |
============+=======+=========+=========+===========+===========+
 2020-01-01 |  101  |      1  |       3 |        1  |        3  |
 2020-01-01 |  202  |      0  |       2 |        0  |        2  |
 2020-01-01 |  303  |      1  |       1 |        1  |        1  |
------------+-------+---------+---------+-----------+-----------+
 2020-01-02 |  101  |      1  |       2 |        2  |        5  |
 2020-01-02 |  202  |      0  |       0 |        0  |        2  |
 2020-01-02 |  303  |      4  |       1 |        5  |        2  |
 2020-01-02 |  404  |      2  |       2 |        2  |        2  |
------------+-------+---------+---------+-----------+-----------+   new data
 2020-01-03 |  101  |      2  |       2 |        .  |        .  |
 2020-01-03 |  303  |      1  |       1 |        .  |        .  |
 2020-01-03 |  404  |      2  |       1 |        .  |        .  |

I would appreciate any pointers on how to approach this sort of problem.

Kind regards,

—
Greg

Re: Efficiently updating running sums only on new data

Posted by Igor Calabria <ig...@gmail.com>.
You can tag the last entry for each key using the same partitioning and
ordering as the window for your running sum. Something like this: "LEAD(1)
OVER your_window IS NULL AS last_record". Then you just UNION ALL the last
entry of each key (which you tagged) with the new data and run the same
query over the combined data.
Before appending the data, you'll have to drop the previous last_record
rows, as they would otherwise be duplicated. That's pretty easy: just add a
constant column to the query that marks old and new data. The tricky part of
all this is making appends atomic if you're not using a table format (Delta,
Iceberg or Hudi). If you're not using a table format, my suggestion is to at
least make the query idempotent. One way to achieve this is to have a unique
partition for each new batch and use dynamic overwrite. That way, you won't
end up with duplicate data if the query runs twice, and you can always just
run the query again if you end up with a partial failure.
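
To make the idempotency point concrete, here is a small Spark-free Java sketch. It only models the idea: each batch owns one partition, and writing a batch replaces that partition instead of appending to it. The class and method names (BatchPartitionedStore, overwriteBatch, readAll) are made up for illustration. In Spark itself, the closest equivalent would presumably be an overwrite-mode write partitioned by a batch column with spark.sql.sources.partitionOverwriteMode set to dynamic, but treat that mapping as an assumption to check against your setup.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory model of "one partition per batch, overwritten on rerun". */
class BatchPartitionedStore {
    // Partition key (batch id) -> rows written by that batch.
    private final Map<String, List<String>> partitions = new TreeMap<>();

    /** Replaces the whole partition of this batch, so a rerun cannot duplicate rows. */
    void overwriteBatch(String batchId, List<String> rows) {
        partitions.put(batchId, new ArrayList<>(rows));
    }

    /** Reads every partition back, in batch-id order. */
    List<String> readAll() {
        List<String> all = new ArrayList<>();
        for (List<String> rows : partitions.values()) {
            all.addAll(rows);
        }
        return all;
    }

    public static void main(String[] args) {
        BatchPartitionedStore store = new BatchPartitionedStore();
        store.overwriteBatch("batch-1", Arrays.asList("2020-01-02,101", "2020-01-02,303"));
        store.overwriteBatch("batch-2", Arrays.asList("2020-01-03,101"));
        // Simulate the job for batch-2 running a second time after a failure.
        store.overwriteBatch("batch-2", Arrays.asList("2020-01-03,101"));
        System.out.println(store.readAll().size()); // 3 rows, not 4
    }
}
```

With plain appends, the rerun of batch-2 would have left four rows behind; with a per-batch overwrite, the result is the same however many times the batch runs.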

On a side note, it's a lot easier to just reprocess everything after
every batch. Only try the incremental approach if you really need it.
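
For anyone who wants to see the incremental steps spelled out, below is a minimal Spark-free Java sketch of the same logic: take the last stored row per key, union it with the new rows, recompute the running sums, and drop the carried-over rows before appending. One assumption that the description above leaves implicit: the carried-over row has to contribute its stored foo_sum and bar_sum (not its raw foo and bar) so that the recomputed sums continue from the right totals. The names SeededRunningSum, Row, seeds and extend are hypothetical, and the sample rows are modeled on the tables in the original post.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: seed with the last record, union, recompute, drop the seed. */
class SeededRunningSum {

    /** One observation; seed marks a carried-over row that must not be written again. */
    static final class Row {
        final String date; // ISO dates, so string order equals chronological order
        final int thing;
        final long foo, bar, fooSum, barSum;
        final boolean seed;

        Row(String date, int thing, long foo, long bar, long fooSum, long barSum, boolean seed) {
            this.date = date; this.thing = thing; this.foo = foo; this.bar = bar;
            this.fooSum = fooSum; this.barSum = barSum; this.seed = seed;
        }
    }

    /** Latest stored row per thing, rewritten so that foo/bar carry the stored running sums. */
    static List<Row> seeds(List<Row> stored) {
        Map<Integer, Row> last = new HashMap<>();
        for (Row r : stored) {
            Row cur = last.get(r.thing);
            if (cur == null || r.date.compareTo(cur.date) > 0) {
                last.put(r.thing, r);
            }
        }
        List<Row> out = new ArrayList<>();
        for (Row r : last.values()) {
            out.add(new Row(r.date, r.thing, r.fooSum, r.barSum, 0, 0, true));
        }
        return out;
    }

    /** Unions seeds with the new rows, recomputes the sums, and returns only the new rows. */
    static List<Row> extend(List<Row> stored, List<Row> fresh) {
        List<Row> all = seeds(stored);
        all.addAll(fresh);
        // Order by thing, then date; on a tie the seed row sorts before new rows.
        all.sort((a, b) -> {
            if (a.thing != b.thing) return Integer.compare(a.thing, b.thing);
            int byDate = a.date.compareTo(b.date);
            return byDate != 0 ? byDate : Boolean.compare(b.seed, a.seed);
        });
        List<Row> out = new ArrayList<>();
        Row previous = null;
        long fooSum = 0, barSum = 0;
        for (Row r : all) {
            if (previous == null || previous.thing != r.thing) {
                fooSum = 0; // a new key starts: restart the sums
                barSum = 0;
            }
            fooSum += r.foo;
            barSum += r.bar;
            if (!r.seed) { // seeds are already in storage, so they are dropped here
                out.add(new Row(r.date, r.thing, r.foo, r.bar, fooSum, barSum, false));
            }
            previous = r;
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> stored = new ArrayList<>();
        stored.add(new Row("2020-01-01", 101, 1, 3, 1, 3, false));
        stored.add(new Row("2020-01-02", 101, 1, 2, 2, 5, false));
        stored.add(new Row("2020-01-01", 303, 1, 1, 1, 1, false));
        stored.add(new Row("2020-01-02", 303, 4, 1, 5, 2, false));
        List<Row> fresh = new ArrayList<>();
        fresh.add(new Row("2020-01-03", 101, 2, 2, 0, 0, false));
        fresh.add(new Row("2020-01-03", 303, 1, 1, 0, 0, false));
        for (Row r : extend(stored, fresh)) {
            System.out.println(r.date + " " + r.thing + " foo_sum=" + r.fooSum + " bar_sum=" + r.barSum);
        }
    }
}
```

For the sample rows this prints foo_sum=4 bar_sum=7 for thing 101 and foo_sum=6 bar_sum=3 for thing 303, which is what a full recomputation over all three dates would give. Only the last row per key is read from the old data, so the cost depends on the number of keys and new rows rather than on the full history.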

On Thu, Oct 13, 2022 at 5:56 AM Artemis User <ar...@dtechspace.com> wrote:

> Do you have to use a SQL window function for this? If I understand this
> correctly, you could just keep track of the last record of each "thing",
> then calculate the new sum by adding the new record's foo and bar values
> to the sums in that last record whenever a new record is generated. It
> looks like your problem will get much easier if you think outside the SQL
> box, or maybe outside the Spark box...

Re: Efficiently updating running sums only on new data

Posted by Artemis User <ar...@dtechspace.com>.
Do you have to use a SQL window function for this? If I understand this
correctly, you could just keep track of the last record of each "thing",
then calculate the new sum by adding the new record's foo and bar values
to the sums in that last record whenever a new record is generated. It
looks like your problem will get much easier if you think outside the SQL
box, or maybe outside the Spark box...
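
As a rough illustration of this suggestion (not code from the thread), the plain Java sketch below keeps one small state entry per "thing" holding its latest foo_sum and bar_sum, and extends those totals as new records arrive in date order. The class RunningSumState, the method extend, and the long[] row layout are all assumptions made for the example; the starting totals are the 2020-01-02 values from the original table.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical Spark-free sketch: continue per-"thing" running sums from saved totals. */
class RunningSumState {

    /**
     * lastSums maps thing -> {foo_sum, bar_sum} taken from its latest stored row.
     * newRows holds {thing, foo, bar} and must already be sorted by date.
     * Returns {thing, foo, bar, foo_sum, bar_sum} for each new row and updates lastSums.
     */
    static List<long[]> extend(Map<Long, long[]> lastSums, List<long[]> newRows) {
        List<long[]> out = new ArrayList<>();
        for (long[] row : newRows) {
            long thing = row[0];
            long[] sums = lastSums.get(thing);
            if (sums == null) { // first time this thing appears: start from zero
                sums = new long[] {0, 0};
                lastSums.put(thing, sums);
            }
            sums[0] += row[1];
            sums[1] += row[2];
            out.add(new long[] {thing, row[1], row[2], sums[0], sums[1]});
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Long, long[]> lastSums = new HashMap<>();
        lastSums.put(101L, new long[] {2, 5});
        lastSums.put(202L, new long[] {0, 2});
        lastSums.put(303L, new long[] {5, 2});
        lastSums.put(404L, new long[] {2, 2});

        List<long[]> newRows = new ArrayList<>();
        newRows.add(new long[] {101, 2, 2});
        newRows.add(new long[] {303, 1, 1});
        newRows.add(new long[] {404, 2, 1});

        for (long[] r : extend(lastSums, newRows)) {
            System.out.println(Arrays.toString(r));
        }
    }
}
```

The state is one entry per key, so it stays small even when the history is large. The trade-off is that the totals now live outside the dataset and have to be persisted and kept consistent with it between batches.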
