Posted to user@beam.apache.org by Juan Romero <js...@gmail.com> on 2023/04/19 20:08:08 UTC

Can I batch data when I use JDBC write operation?

Hi community.

I have a question about how to read a stream from Kafka and write batches of
data with the JDBC connector. The idea is to overwrite an existing row if the
row we want to insert has the same id and a greater load_date_time. The
conceptual pipeline looks like this, and it is working (keep in mind that the
real source will be a streaming read from Kafka):

import typing

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.jdbc import WriteToJdbc

ExampleRow = typing.NamedTuple('ExampleRow', id=int, name=str, load_date_time=str)
coders.registry.register_coder(ExampleRow, coders.RowCoder)


with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create(
        [
            ExampleRow(1, 'zzzz', '2023-04-05 12:34:56'),
            ExampleRow(1, 'yyyz', '2023-04-05 12:34:55'),
        ]).with_output_types(ExampleRow)
      | 'Write to jdbc' >> WriteToJdbc(
          driver_class_name='org.postgresql.Driver',
          jdbc_url='jdbc:postgresql://localhost:5432/postgres',
          username='postgres',
          password='postgres',
          table_name='test',
          connection_properties='stringtype=unspecified',
          statement='''
              INSERT INTO test
              VALUES (?, ?, ?)
              ON CONFLICT (id) DO UPDATE
                SET name = EXCLUDED.name,
                    load_date_time = EXCLUDED.load_date_time
                WHERE EXCLUDED.load_date_time::timestamp
                      > test.load_date_time::timestamp
          ''',
      ))

My question is: if I want to write a stream that comes from Kafka, how can I
avoid the JDBC connector inserting the records one by one, and instead insert
the data in time-based batches? Perhaps the JDBC connector already does some
of this batching internally, but I would like to know what you think about it.

Thank you!

Re: Can I batch data when I use JDBC write operation?

Posted by Wiśniowski Piotr <co...@gmail.com>.
Hi,

Not tested, but here are a few options that might be solutions to your problem:

1. Go with read and write replicas of your DB, so that the write replica gets
the inserts one by one, and live with this. Make sure to deduplicate the data
before inserting to avoid potential collisions (this should not be a problem,
but I am not sure how the subsystem would behave). A rough deduplication
sketch follows below.
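
A minimal sketch of such a deduplication step, assuming the elements are the
ExampleRow tuples from the original pipeline, that `kafka_rows` is the upstream
PCollection read from Kafka, and that "keep the row with the greatest
load_date_time per id within a fixed window" is the desired semantics:

```
import apache_beam as beam
from apache_beam.transforms import window

def keep_latest(rows):
    # Among all rows sharing one id within the window, keep the one
    # with the greatest load_date_time (lexical compare works for
    # 'YYYY-MM-DD HH:MM:SS' strings).
    return max(rows, key=lambda r: r.load_date_time)

deduped = (
    kafka_rows  # assumed upstream PCollection of ExampleRow elements
    | 'Window' >> beam.WindowInto(window.FixedWindows(60))
    | 'KeyById' >> beam.Map(lambda r: (r.id, r))
    | 'GroupById' >> beam.GroupByKey()
    | 'KeepLatest' >> beam.Map(lambda kv: keep_latest(kv[1])).with_output_types(ExampleRow))
# `deduped` can then feed the WriteToJdbc step from the original pipeline.
```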

2.

- Add a step to group the input data into a time window.

- Then ingest the events from a window into a unique temp table (just with a
plain `INSERT INTO`).

- Then add a next step in the pipeline that triggers a merge operation from
the temp table to your production table. Make sure the same connection session
is used, or the temp table will be gone. I am also not sure whether it is
possible to invoke only one command per window with the `WriteToJdbc` operator
after the previous write finishes, so this is up for your experimentation (or
does anyone with more experience know how to code it?). A rough sketch of
keeping both steps in one session follows below.
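
Not tested either, but one way to keep the temp-table insert and the merge in
the same session is a custom DoFn with a plain psycopg2 connection instead of
`WriteToJdbc`. The connection settings, window size and batch size below are
made up; `kafka_rows` and the `test` table are from the original post:

```
import apache_beam as beam
import psycopg2  # assumed to be installed on the workers
from apache_beam.transforms import window

class UpsertBatch(beam.DoFn):
    """Writes one batch through a temp table, then merges it into `test`."""

    def setup(self):
        # One connection per DoFn instance, so the TEMP table and the merge
        # share the same session/transaction.
        self.conn = psycopg2.connect(
            host='localhost', dbname='postgres',
            user='postgres', password='postgres')

    def process(self, batch):
        # The batch is assumed to already be deduplicated per id (see option 1),
        # otherwise ON CONFLICT would hit the same row twice and fail.
        with self.conn, self.conn.cursor() as cur:
            cur.execute('CREATE TEMP TABLE tmp_test (LIKE test) ON COMMIT DROP')
            cur.executemany('INSERT INTO tmp_test VALUES (%s, %s, %s)', batch)
            cur.execute('''
                INSERT INTO test
                SELECT * FROM tmp_test
                ON CONFLICT (id) DO UPDATE
                  SET name = EXCLUDED.name,
                      load_date_time = EXCLUDED.load_date_time
                  WHERE EXCLUDED.load_date_time::timestamp
                        > test.load_date_time::timestamp''')

    def teardown(self):
        self.conn.close()

batched = (
    kafka_rows
    | 'Window' >> beam.WindowInto(window.FixedWindows(60))
    | 'SingleKey' >> beam.Map(lambda r: (None, r))
    | 'PerWindowBatches' >> beam.GroupIntoBatches(1000)
    | 'DropKey' >> beam.MapTuple(lambda _, rows: list(rows))
    | 'Upsert' >> beam.ParDo(UpsertBatch()))
```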

3. Another option is to:

- aggregate events into a window, so that only one element (an array of
events) is emitted per window,

- somehow put this record into the statement as a single row parameter,

- in subsequent CTEs, deserialize the array into multiple rows,

- do the insert with the update.

So the SQL on the DB side would look something like:

```

WITH new_values (arr) AS (
    VALUES (?)
),
deser AS (
    SELECT unnest(arr) FROM new_values
),
upsert AS (
    UPDATE mytable m
    SET field1 = nv.field1,
        field2 = nv.field2
    FROM deser nv
    WHERE m.id = nv.id
    RETURNING m.*
)
INSERT INTO mytable (id, field1, field2)
SELECT id, field1, field2
FROM deser
WHERE NOT EXISTS (SELECT 1 FROM upsert up WHERE up.id = deser.id)

```

The above is just pseudocode that I did not test, but it could be a hint for
you. There is also a great answer on this topic here:
https://stackoverflow.com/questions/1109061/insert-on-duplicate-update-in-postgresql/8702291#8702291
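
On the pipeline side, a rough sketch of this aggregation could serialize each
window's events into one JSON string that becomes the single '?' parameter,
and the statement would then unpack it with something like json_to_recordset
instead of unnest. `EventBatch` and `to_batch` are hypothetical names;
`kafka_rows` and ExampleRow are from the original post:

```
import json
import typing

import apache_beam as beam
from apache_beam import coders
from apache_beam.transforms import window

# Hypothetical single-column row type: one JSON string per window holding
# the whole array of events, mapped to the single '?' in the statement.
EventBatch = typing.NamedTuple('EventBatch', [('payload', str)])
coders.registry.register_coder(EventBatch, coders.RowCoder)

def to_batch(rows):
    return EventBatch(json.dumps([r._asdict() for r in rows]))

json_batches = (
    kafka_rows
    | 'Window' >> beam.WindowInto(window.FixedWindows(60))
    | 'SingleKey' >> beam.Map(lambda r: (None, r))
    | 'PerWindow' >> beam.GroupByKey()
    | 'ToJson' >> beam.MapTuple(lambda _, rows: to_batch(rows)).with_output_types(EventBatch))
# `json_batches` would then feed a WriteToJdbc whose statement unpacks the
# JSON, e.g. with json_to_recordset(?::json), along the lines of the CTE above.
```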

4. Some mix of options 2 and 3.

Hopefully this helps you break down the problem.

Best regards

Wiśniowski Piotr

On 21.04.2023 01:34, Juan Romero wrote:
> Hi. Can someone help me with this?

Re: Can I batch data when I use JDBC write operation?

Posted by Juan Romero <js...@gmail.com>.
Hi. Can someone help me with this?
