Posted to user@beam.apache.org by Hamed Amini <ha...@messagebird.com> on 2021/08/19 11:04:22 UTC

Spanner update inc

Hi guys,

I am working on an accumulation task with Dataflow: I want to consume
events from Pub/Sub, accumulate them, and insert them into Spanner. For
the insert part, I want to increment the value of the count column in my
table; with DML I can do it as below:

"UPDATE MY_TABLE set COLUMN1 = COLUMN1 + newValue"

but I don't know how I can do that with a mutation (or with the Beam
Spanner API). Could you please help me?


Regards
Hamed
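Spanner mutations carry absolute column values rather than expressions, so the mutation-based equivalent of this UPDATE is a read followed by a write of the computed value. The plain-Python model below is only an illustration (no Spanner client is involved; `Mutation`, `increment_mutation`, `apply_mutation`, and the row key `k1` are hypothetical names). It shows that read-then-write shape, and why the read and the write need to sit inside one read-write transaction: two unsynchronized read-then-write increments can lose an update.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Mutation:
    """Hypothetical stand-in for a Spanner update mutation.

    Like a real mutation, it carries literal column values only: there is no
    way to express "COLUMN1 + newValue" inside it.
    """
    table: str
    key: str
    values: dict


def increment_mutation(table, key, current_row, column, delta):
    """Build the mutation equivalent of COLUMN1 = COLUMN1 + delta from a prior read."""
    new_value = current_row.get(column, 0) + delta
    return Mutation(table, key, {column: new_value})


def apply_mutation(db, mutation):
    """Blind write: the stored column is overwritten with the literal value."""
    db.setdefault((mutation.table, mutation.key), {}).update(mutation.values)


db = {("MY_TABLE", "k1"): {"COLUMN1": 5}}

# Two writers both read COLUMN1 = 5 before either writes, i.e. the read and
# the write are NOT inside one read-write transaction.
row_seen_by_a = dict(db[("MY_TABLE", "k1")])
row_seen_by_b = dict(db[("MY_TABLE", "k1")])
apply_mutation(db, increment_mutation("MY_TABLE", "k1", row_seen_by_a, "COLUMN1", 3))
apply_mutation(db, increment_mutation("MY_TABLE", "k1", row_seen_by_b, "COLUMN1", 4))
print(db[("MY_TABLE", "k1")]["COLUMN1"])  # 9, not 12: writer A's +3 is lost
```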

Re: Spanner update inc

Posted by Hamed Amini <ha...@messagebird.com>.
Hi Luke,

Thank you for your explanation. It was a great help to me.

Regards,
Hamed

(Replying to Luke Cwik's message of Thu, 19 Aug 2021 at 19:35, which follows.)

Re: Spanner update inc

Posted by Luke Cwik <lc...@google.com>.
This is an example[1] of how to perform a read+write transaction, which you
could place in a DoFn. The issue with such an approach is that if something
fails after the transaction is committed in Spanner but before the bundle
is successfully completed, then the bundle will be retried and you will
increment the value multiple times. For some pipelines this is OK, but if
you need to guard against it, you will want to use a stateful DoFn[2]
that keeps track of the COLUMN1 value and uses it as the source of
truth while the pipeline runs. Your logic would look something like:

processElement(...) {
  firstRead = false;
  column1 = readFromStateValue();
  if (column1 is null) {
    firstRead = true;
    column1 = readValueFromSpanner();
  }
  column1 = column1 + newValue;
  writeValueToState(column1);
  if (firstRead) {
    // Since this is the first time we read, we don't want to write to
    // Spanner until the state is durably committed; otherwise, if the
    // bundle fails, the next first read will see the wrong value.
    scheduleProcessingTimerToWriteValueToSpanner();
  } else {
    writeValueToSpanner(column1);
  }
}

onTimer(...) {
  // Fires in a later bundle, after the state written above was committed.
  writeValueToSpanner(readFromStateValue());
}
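To see why the retry matters, the sketch above can be modeled in plain Python. This is a toy, single-process model, not Beam or Spanner code: a dict stands in for the Spanner table, and `BundleFailed`, `run_with_retry`, `NaiveIncrement`, `StatefulIncrement`, and `fire_timers` are hypothetical names. It assumes that, as on Dataflow, state and timer changes from a failed bundle are discarded while Spanner commits are not. One deliberate difference from the sketch: the model defers the Spanner write for every element of a key until that key's state has been committed once, not only for the element that did the first read, because a second element for the same new key in the same bundle would otherwise write to Spanner before the seeded state is durable.

```python
class BundleFailed(Exception):
    """Simulated failure after Spanner writes but before the bundle commits."""


def run_with_retry(run_bundle, elements, failures=1):
    """Re-run a bundle like a runner would; the first `failures` attempts fail."""
    for attempt in range(failures + 1):
        try:
            run_bundle(elements, fail=attempt < failures)
            return
        except BundleFailed:
            continue  # state from the failed attempt is discarded


class NaiveIncrement:
    """One read+write transaction per element, straight against Spanner."""

    def __init__(self, spanner):
        self.spanner = spanner

    def run_bundle(self, elements, fail=False):
        for key, delta in elements:
            self.spanner[key] = self.spanner.get(key, 0) + delta  # committed now
        if fail:
            raise BundleFailed()


class StatefulIncrement:
    """Per-key state is the source of truth; Spanner only receives absolute values."""

    def __init__(self, spanner):
        self.spanner = spanner
        self.state = {}      # durable per-key state (committed bundles only)
        self.timers = set()  # durable timers: keys whose first write is pending

    def run_bundle(self, elements, fail=False):
        state = dict(self.state)  # working copy, dropped if the bundle fails
        timers = set()
        for key, delta in elements:
            column1 = state.get(key)
            if column1 is None:
                column1 = self.spanner.get(key, 0)  # first read seeds from Spanner
            column1 += delta
            state[key] = column1
            if key in self.state:
                self.spanner[key] = column1  # absolute value: rewriting is harmless
            else:
                timers.add(key)  # wait until the seeded state is durable
        if fail:
            raise BundleFailed()
        self.state = state  # commit state and timers together
        self.timers |= timers

    def fire_timers(self):
        for key in sorted(self.timers):
            self.spanner[key] = self.state[key]
        self.timers.clear()


naive_db = {"k": 10}
run_with_retry(NaiveIncrement(naive_db).run_bundle, [("k", 1), ("k", 2)])
print(naive_db["k"])  # 16: the retried bundle added +3 twice

stateful_db = {"k": 10}
counter = StatefulIncrement(stateful_db)
run_with_retry(counter.run_bundle, [("k", 1), ("k", 2)])
counter.fire_timers()
run_with_retry(counter.run_bundle, [("k", 5)])
print(stateful_db["k"])  # 18 = 10 + 1 + 2 + 5, despite both retries
```

The deferred first write means Spanner briefly lags the state for a new key (it still shows 10 until the timer fires), which is the trade-off accepted in exchange for not double-counting.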

Note that the stateful DoFn will run serially for each unique key and
window, which typically means that you want to use the global window
and one key per table and/or (table, column) you're updating.
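A small plain-Python model of that scoping shows why the global window matters (the event tuple layout and `running_totals` are hypothetical, and no Beam API is used). State is kept per (key, window), so with fixed windows each window starts a fresh counter, and no single piece of state holds the column's full running value that the stateful approach relies on as its source of truth.

```python
from collections import defaultdict


def running_totals(events, window_size=None):
    """Fold increments into state scoped to (key, window), as a stateful DoFn does.

    events: iterable of (timestamp_seconds, table, column, delta) tuples.
    window_size=None models the global window; an int models fixed windows
    of that many seconds.
    """
    state = defaultdict(int)
    for ts, table, column, delta in events:
        key = (table, column)  # one key per (table, column) being updated
        window = "global" if window_size is None else ts // window_size
        state[(key, window)] += delta  # each (key, window) has its own counter
    return dict(state)


events = [
    (0, "MY_TABLE", "COLUMN1", 3),
    (70, "MY_TABLE", "COLUMN1", 4),
    (130, "MY_TABLE", "COLUMN1", 5),
]
print(running_totals(events))
# {(('MY_TABLE', 'COLUMN1'), 'global'): 12}
print(running_totals(events, window_size=60))
# {(('MY_TABLE', 'COLUMN1'), 0): 3, (('MY_TABLE', 'COLUMN1'), 1): 4, (('MY_TABLE', 'COLUMN1'), 2): 5}
```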

[1] https://cloud.google.com/spanner/docs/modify-mutation-api#updating_rows_in_a_table
[2] https://beam.apache.org/blog/stateful-processing/
