You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn> on 2020/05/09 02:38:15 UTC

Write to database directly by referencing schema registry, no jdbc sink connector

Using debezium to parse binlog, using avro serialization and send to kafka.

Need to consume the avro serialized  binlog data and  wirite  to target database
I want to use self-written  java code instead of kafka jdbc sink connector. 

How can i  reference the schema registry, convert a kafka message to corresponding table record and write to corresponding table?
Is there any example code to do this ?

Thanks,
Lei



wanglei2@geekplus.com.cn


Re: Re: Write to database directly by referencing schema registry, no jdbc sink connector

Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
Hi robin, 

Seems i didn't make it clear. 

Actually we still use jdbc sink connector. 
But we want to use the JDBC Sink function  in our own distributed  platform intead of kafka connector

I want to consolidate the code here: https://github.com/confluentinc/kafka-connect-jdbc/

Receive kafka avro record,  add it to JDBCSinkTask, the task will automatically generate the sql and execute it according to the schema registry. 

Seems i can do it like this.
But i am not able to transform GenericRecord to SinkRecord. 

    JdbcSinkTask task = new JdbcSinkTask();
    task.start(props2);
    consumer.subscribe(Collections.singletonList("orders"));

    while (true) {
        final ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
        for (final ConsumerRecord<String, GenericRecord> record : records) {
            final String key = record.key();
            final GenericRecord value = record.value();
            change the GenericRecord to sinkRecord
            task.put(Arrays.asList(sinkRecord));
        }
    }

Thanks,
Lei


wanglei2@geekplus.com.cn

 
From: Robin Moffatt
Date: 2020-05-11 16:40
To: users
Subject: Re: Write to database directly by referencing schema registry, no jdbc sink connector
>  wirite  to target database. I want to use self-written  java code
instead of kafka jdbc sink connector.
 
Out of interest, why do you want to do this? Why not use the JDBC sink
connector (or a fork of it if you need to amend its functionality)?
 
 
 
-- 
 
Robin Moffatt | Senior Developer Advocate | robin@confluent.io | @rmoff
 
 
On Sat, 9 May 2020 at 03:38, wanglei2@geekplus.com.cn <
wanglei2@geekplus.com.cn> wrote:
 
>
> Using debezium to parse binlog, using avro serialization and send to kafka.
>
> Need to consume the avro serialized  binlog data and  wirite  to target
> database
> I want to use self-written  java code instead of kafka jdbc sink
> connector.
>
> How can i  reference the schema registry, convert a kafka message to
> corresponding table record and write to corresponding table?
> Is there any example code to do this ?
>
> Thanks,
> Lei
>
>
>
> wanglei2@geekplus.com.cn
>
>

Re: Write to database directly by referencing schema registry, no jdbc sink connector

Posted by Robin Moffatt <ro...@confluent.io>.
>  wirite  to target database. I want to use self-written  java code
instead of kafka jdbc sink connector.

Out of interest, why do you want to do this? Why not use the JDBC sink
connector (or a fork of it if you need to amend its functionality)?



-- 

Robin Moffatt | Senior Developer Advocate | robin@confluent.io | @rmoff


On Sat, 9 May 2020 at 03:38, wanglei2@geekplus.com.cn <
wanglei2@geekplus.com.cn> wrote:

>
> Using debezium to parse binlog, using avro serialization and send to kafka.
>
> Need to consume the avro serialized  binlog data and  wirite  to target
> database
> I want to use self-written  java code instead of kafka jdbc sink
> connector.
>
> How can i  reference the schema registry, convert a kafka message to
> corresponding table record and write to corresponding table?
> Is there any example code to do this ?
>
> Thanks,
> Lei
>
>
>
> wanglei2@geekplus.com.cn
>
>

Re: Write to database directly by referencing schema registry, no jdbc sink connector

Posted by Chris Toomey <ct...@gmail.com>.
Write your own implementation of the JDBC sink connector and use the avro
serializer to convert the kafka record into a connect record that your
connector takes and writes to DB via JDBC.


On Fri, May 8, 2020 at 7:38 PM wanglei2@geekplus.com.cn <
wanglei2@geekplus.com.cn> wrote:

>
> Using debezium to parse binlog, using avro serialization and send to kafka.
>
> Need to consume the avro serialized  binlog data and  wirite  to target
> database
> I want to use self-written  java code instead of kafka jdbc sink
> connector.
>
> How can i  reference the schema registry, convert a kafka message to
> corresponding table record and write to corresponding table?
> Is there any example code to do this ?
>
> Thanks,
> Lei
>
>
>
> wanglei2@geekplus.com.cn
>
>

Re: Re: Write to database directly by referencing schema registry, no jdbc sink connector

Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
Hi Liam, 

I have consumed the avro record using the java code:
for (final ConsumerRecord<String, GenericRecord> record : records) {
    final String key = record.key();
    final GenericRecord value = record.value();
    System.out.println(record.value().getSchema());
    System.out.printf("key = %s, value = %s%n", key, value);
}
Next I need to write it to database using the existing kafka jdbc sink connector API: 

Seems i need to consolidate the code here: https://github.com/confluentinc/kafka-connect-jdbc/ 
Just new a JDBCSinkTask,  add the record to the JDBCSinkTask, then the task will automatically  genterate the sql according the record schema and execute it, no matter what the table is.

But i have no idea how  to get it.

Thanks,
Lei





wanglei2@geekplus.com.cn

 
From: Liam Clarke-Hutchinson
Date: 2020-05-09 18:20
To: users
Subject: Re: Re: Write to database directly by referencing schema registry, no jdbc sink connector
Hi Lei,
 
This tutorial will introduce you to the Avro consumers.
https://docs.confluent.io/current/schema-registry/schema_registry_tutorial.html
 
In terms of going from Avro record to SQL, the JDBC sink generates SQL
based on the field names in the schema, and configured table names.
 
IIRC, the Avro consumer returns an Avro GenericRecord.Record[1], which has
a getSchema() method that returns the schema used to deserialise it,  so
you could access that to generate the SQL.
 
[1]:
https://avro.apache.org/docs/current/api/java/org/apache/avro/generic/GenericData.Record.html
 
Good luck,
 
Liam Clarke-Hutchinson
 
On Sat, 9 May 2020, 10:03 pm wanglei2@geekplus.com.cn, <
wanglei2@geekplus.com.cn> wrote:
 
>
> Thanks Liam,
>
> I want to achive the following  function using java code:
>
> For each avro serialized record received:
>          1  deserialized the record automatically  by referencing  schema
> registry
>          2  change the record to a sql statement needed to be executed and
> execute it
>
> Seems the kafka jdbc sink connector (
> https://docs.confluent.io/current/connect/kafka-connect-jdbc/sink-connector/index.html)
> can achieve this function.
>
> But i have no idea how to write with java code.
> Is there any code example to achieve this?
>
> Thanks,
> Lei
>
>
>
> wanglei2@geekplus.com.cn
>
>
> From: Liam Clarke-Hutchinson
> Date: 2020-05-09 16:30
> To: users
> Subject: Re: Write to database directly by referencing schema registry, no
> jdbc sink connector
> Hi Lei,
>
> You could use the Kafka Avro consumer to deserialise records using the
> Schema Registry automatically.
>
> Then write to the DB as you see fit.
>
> Cheers,
>
> Liam Clarke-Hutchinson
>
> On Sat, 9 May 2020, 2:38 pm wanglei2@geekplus.com.cn, <
> wanglei2@geekplus.com.cn> wrote:
>
> >
> > Using debezium to parse binlog, using avro serialization and send to
> kafka.
> >
> > Need to consume the avro serialized  binlog data and  wirite  to target
> > database
> > I want to use self-written  java code instead of kafka jdbc sink
> > connector.
> >
> > How can i  reference the schema registry, convert a kafka message to
> > corresponding table record and write to corresponding table?
> > Is there any example code to do this ?
> >
> > Thanks,
> > Lei
> >
> >
> >
> > wanglei2@geekplus.com.cn
> >
> >
>

Re: Re: Write to database directly by referencing schema registry, no jdbc sink connector

Posted by Liam Clarke-Hutchinson <li...@adscale.co.nz>.
Hi Lei,

This tutorial will introduce you to the Avro consumers.
https://docs.confluent.io/current/schema-registry/schema_registry_tutorial.html

In terms of going from Avro record to SQL, the JDBC sink generates SQL
based on the field names in the schema, and configured table names.

IIRC, the Avro consumer returns an Avro GenericRecord.Record[1], which has
a getSchema() method that returns the schema used to deserialise it,  so
you could access that to generate the SQL.

[1]:
https://avro.apache.org/docs/current/api/java/org/apache/avro/generic/GenericData.Record.html

Good luck,

Liam Clarke-Hutchinson

On Sat, 9 May 2020, 10:03 pm wanglei2@geekplus.com.cn, <
wanglei2@geekplus.com.cn> wrote:

>
> Thanks Liam,
>
> I want to achive the following  function using java code:
>
> For each avro serialized record received:
>          1  deserialized the record automatically  by referencing  schema
> registry
>          2  change the record to a sql statement needed to be executed and
> execute it
>
> Seems the kafka jdbc sink connector (
> https://docs.confluent.io/current/connect/kafka-connect-jdbc/sink-connector/index.html)
> can achieve this function.
>
> But i have no idea how to write with java code.
> Is there any code example to achieve this?
>
> Thanks,
> Lei
>
>
>
> wanglei2@geekplus.com.cn
>
>
> From: Liam Clarke-Hutchinson
> Date: 2020-05-09 16:30
> To: users
> Subject: Re: Write to database directly by referencing schema registry, no
> jdbc sink connector
> Hi Lei,
>
> You could use the Kafka Avro consumer to deserialise records using the
> Schema Registry automatically.
>
> Then write to the DB as you see fit.
>
> Cheers,
>
> Liam Clarke-Hutchinson
>
> On Sat, 9 May 2020, 2:38 pm wanglei2@geekplus.com.cn, <
> wanglei2@geekplus.com.cn> wrote:
>
> >
> > Using debezium to parse binlog, using avro serialization and send to
> kafka.
> >
> > Need to consume the avro serialized  binlog data and  wirite  to target
> > database
> > I want to use self-written  java code instead of kafka jdbc sink
> > connector.
> >
> > How can i  reference the schema registry, convert a kafka message to
> > corresponding table record and write to corresponding table?
> > Is there any example code to do this ?
> >
> > Thanks,
> > Lei
> >
> >
> >
> > wanglei2@geekplus.com.cn
> >
> >
>

Re: Re: Write to database directly by referencing schema registry, no jdbc sink connector

Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
Thanks Liam, 

I want to achive the following  function using java code:

For each avro serialized record received:
         1  deserialized the record automatically  by referencing  schema registry
         2  change the record to a sql statement needed to be executed and execute it

Seems the kafka jdbc sink connector (https://docs.confluent.io/current/connect/kafka-connect-jdbc/sink-connector/index.html) can achieve this function.

But i have no idea how to write with java code. 
Is there any code example to achieve this?

Thanks,
Lei



wanglei2@geekplus.com.cn 

 
From: Liam Clarke-Hutchinson
Date: 2020-05-09 16:30
To: users
Subject: Re: Write to database directly by referencing schema registry, no jdbc sink connector
Hi Lei,
 
You could use the Kafka Avro consumer to deserialise records using the
Schema Registry automatically.
 
Then write to the DB as you see fit.
 
Cheers,
 
Liam Clarke-Hutchinson
 
On Sat, 9 May 2020, 2:38 pm wanglei2@geekplus.com.cn, <
wanglei2@geekplus.com.cn> wrote:
 
>
> Using debezium to parse binlog, using avro serialization and send to kafka.
>
> Need to consume the avro serialized  binlog data and  wirite  to target
> database
> I want to use self-written  java code instead of kafka jdbc sink
> connector.
>
> How can i  reference the schema registry, convert a kafka message to
> corresponding table record and write to corresponding table?
> Is there any example code to do this ?
>
> Thanks,
> Lei
>
>
>
> wanglei2@geekplus.com.cn
>
>

Re: Write to database directly by referencing schema registry, no jdbc sink connector

Posted by Liam Clarke-Hutchinson <li...@adscale.co.nz>.
Hi Lei,

You could use the Kafka Avro consumer to deserialise records using the
Schema Registry automatically.

Then write to the DB as you see fit.

Cheers,

Liam Clarke-Hutchinson

On Sat, 9 May 2020, 2:38 pm wanglei2@geekplus.com.cn, <
wanglei2@geekplus.com.cn> wrote:

>
> Using debezium to parse binlog, using avro serialization and send to kafka.
>
> Need to consume the avro serialized  binlog data and  wirite  to target
> database
> I want to use self-written  java code instead of kafka jdbc sink
> connector.
>
> How can i  reference the schema registry, convert a kafka message to
> corresponding table record and write to corresponding table?
> Is there any example code to do this ?
>
> Thanks,
> Lei
>
>
>
> wanglei2@geekplus.com.cn
>
>