Posted to users@kafka.apache.org by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn> on 2020/05/09 02:38:15 UTC
Write to database directly by referencing schema registry, no jdbc sink connector
Using Debezium to parse the binlog, serializing with Avro, and sending to Kafka.
I need to consume the Avro-serialized binlog data and write it to a target database.
I want to use self-written Java code instead of the Kafka JDBC sink connector.
How can I reference the schema registry, convert a Kafka message to the corresponding table record, and write it to the corresponding table?
Is there any example code to do this?
Thanks,
Lei
wanglei2@geekplus.com.cn
Re: Re: Write to database directly by referencing schema registry, no jdbc sink connector
Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
Hi Robin,
It seems I didn't make it clear.
Actually we still use the JDBC sink connector code.
But we want to run the JDBC sink function in our own distributed platform instead of in Kafka Connect.
I want to reuse the code here: https://github.com/confluentinc/kafka-connect-jdbc/
Receive a Kafka Avro record, pass it to a JdbcSinkTask, and the task will automatically generate the SQL and execute it according to the schema registry.
It seems I can do it like this.
But I am not able to transform a GenericRecord to a SinkRecord.
JdbcSinkTask task = new JdbcSinkTask();
task.start(props2);

consumer.subscribe(Collections.singletonList("orders"));
while (true) {
    final ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
    for (final ConsumerRecord<String, GenericRecord> record : records) {
        final String key = record.key();
        final GenericRecord value = record.value();
        // TODO: convert the GenericRecord to a SinkRecord here
        task.put(Arrays.asList(sinkRecord));
    }
}
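[Editorially, one way to bridge that gap is to skip the Avro deserializer, consume the raw bytes with ByteArrayDeserializer, and let Confluent's AvroConverter produce the Connect schema and value. This is a sketch, not Lei's code: it assumes the kafka-connect-avro-converter and connect-api jars are on the classpath, and the registry URL is a placeholder.]

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.sink.SinkRecord;
import io.confluent.connect.avro.AvroConverter;

public class SinkRecordBridge {
    private final AvroConverter converter = new AvroConverter();

    public SinkRecordBridge(String registryUrl) {
        // The converter talks to the schema registry itself;
        // false = configure it for record values, not keys
        converter.configure(Map.of("schema.registry.url", registryUrl), false);
    }

    public SinkRecord toSinkRecord(ConsumerRecord<byte[], byte[]> record) {
        // toConnectData resolves the writer schema from the id
        // embedded in the Avro wire-format bytes
        SchemaAndValue v = converter.toConnectData(record.topic(), record.value());
        return new SinkRecord(record.topic(), record.partition(),
                null, record.key(),
                v.schema(), v.value(),
                record.offset());
    }
}
```

The resulting SinkRecord can then be handed to task.put(...) as in the loop above.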
Thanks,
Lei
wanglei2@geekplus.com.cn
From: Robin Moffatt
Date: 2020-05-11 16:40
To: users
Subject: Re: Write to database directly by referencing schema registry, no jdbc sink connector
> write to target database. I want to use self-written java code
> instead of kafka jdbc sink connector.
Out of interest, why do you want to do this? Why not use the JDBC sink
connector (or a fork of it if you need to amend its functionality)?
--
Robin Moffatt | Senior Developer Advocate | robin@confluent.io | @rmoff
Re: Write to database directly by referencing schema registry, no jdbc sink connector
Posted by Robin Moffatt <ro...@confluent.io>.
> write to target database. I want to use self-written java code
> instead of kafka jdbc sink connector.
Out of interest, why do you want to do this? Why not use the JDBC sink
connector (or a fork of it if you need to amend its functionality)?
--
Robin Moffatt | Senior Developer Advocate | robin@confluent.io | @rmoff
Re: Write to database directly by referencing schema registry, no jdbc sink connector
Posted by Chris Toomey <ct...@gmail.com>.
Write your own implementation of the JDBC sink connector, and use the Avro
converter to turn the Kafka record into a Connect record that your
connector takes and writes to the DB via JDBC.
Re: Re: Write to database directly by referencing schema registry, no jdbc sink connector
Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
Hi Liam,
I can consume the Avro records using this Java code:
for (final ConsumerRecord<String, GenericRecord> record : records) {
    final String key = record.key();
    final GenericRecord value = record.value();
    System.out.println(record.value().getSchema());
    System.out.printf("key = %s, value = %s%n", key, value);
}
Next I need to write it to the database using the existing Kafka JDBC sink connector API.
It seems I need to reuse the code here: https://github.com/confluentinc/kafka-connect-jdbc/
Just create a JdbcSinkTask and add the record to it; the task will automatically generate the SQL according to the record schema and execute it, no matter what the table is.
But I have no idea how to do this.
Thanks,
Lei
wanglei2@geekplus.com.cn
From: Liam Clarke-Hutchinson
Date: 2020-05-09 18:20
To: users
Subject: Re: Re: Write to database directly by referencing schema registry, no jdbc sink connector
Hi Lei,
This tutorial will introduce you to the Avro consumers.
https://docs.confluent.io/current/schema-registry/schema_registry_tutorial.html
In terms of going from Avro record to SQL, the JDBC sink generates SQL
based on the field names in the schema, and configured table names.
IIRC, the Avro consumer returns an Avro GenericData.Record[1], which has
a getSchema() method that returns the schema used to deserialise it, so
you could access that to generate the SQL.
[1]:
https://avro.apache.org/docs/current/api/java/org/apache/avro/generic/GenericData.Record.html
Good luck,
Liam Clarke-Hutchinson
On Sat, 9 May 2020, 10:03 pm wanglei2@geekplus.com.cn, <
wanglei2@geekplus.com.cn> wrote:
Re: Re: Write to database directly by referencing schema registry, no jdbc sink connector
Posted by Liam Clarke-Hutchinson <li...@adscale.co.nz>.
Hi Lei,
This tutorial will introduce you to the Avro consumers.
https://docs.confluent.io/current/schema-registry/schema_registry_tutorial.html
In terms of going from Avro record to SQL, the JDBC sink generates SQL
based on the field names in the schema, and configured table names.
IIRC, the Avro consumer returns an Avro GenericData.Record[1], which has
a getSchema() method that returns the schema used to deserialise it, so
you could access that to generate the SQL.
[1]:
https://avro.apache.org/docs/current/api/java/org/apache/avro/generic/GenericData.Record.html
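[To make that concrete, here is a minimal, hypothetical sketch of the idea: placeholder SQL built from a schema's field names. `buildInsert` is illustrative only, not the connector's actual code.]

```java
import java.util.List;
import java.util.stream.Collectors;

public class InsertSketch {
    // Build an INSERT with one "?" placeholder per schema field,
    // mirroring what the JDBC sink derives from the record's schema
    public static String buildInsert(String table, List<String> fields) {
        String cols = String.join(", ", fields);
        String marks = fields.stream().map(f -> "?").collect(Collectors.joining(", "));
        return "INSERT INTO " + table + " (" + cols + ") VALUES (" + marks + ")";
    }

    public static void main(String[] args) {
        // At runtime the field names would come from record.getSchema().getFields()
        System.out.println(buildInsert("orders", List.of("id", "customer", "amount")));
    }
}
```

Each value would then be bound to its placeholder with PreparedStatement.setObject before executing.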
Good luck,
Liam Clarke-Hutchinson
Re: Re: Write to database directly by referencing schema registry, no jdbc sink connector
Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
Thanks Liam,
I want to achieve the following using Java code:
For each Avro-serialized record received:
1. deserialize the record automatically by referencing the schema registry
2. convert the record to a SQL statement and execute it
It seems the Kafka JDBC sink connector (https://docs.confluent.io/current/connect/kafka-connect-jdbc/sink-connector/index.html) can achieve this.
But I have no idea how to write this in Java code.
Is there any code example to achieve this?
Thanks,
Lei
wanglei2@geekplus.com.cn
From: Liam Clarke-Hutchinson
Date: 2020-05-09 16:30
To: users
Subject: Re: Write to database directly by referencing schema registry, no jdbc sink connector
Hi Lei,
You could use the Kafka Avro consumer to deserialise records using the
Schema Registry automatically.
Then write to the DB as you see fit.
Cheers,
Liam Clarke-Hutchinson
Re: Write to database directly by referencing schema registry, no jdbc sink connector
Posted by Liam Clarke-Hutchinson <li...@adscale.co.nz>.
Hi Lei,
You could use the Kafka Avro consumer to deserialise records using the
Schema Registry automatically.
Then write to the DB as you see fit.
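[As a sketch of that setup: the consumer configuration below uses the Confluent Avro deserializer's class name; the broker and registry addresses are placeholders.]

```java
import java.util.Properties;

public class AvroConsumerConfig {
    public static Properties consumerProps() {
        Properties props = new Properties();
        // Placeholder addresses; point these at your own cluster and registry
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "binlog-sink");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // KafkaAvroDeserializer fetches the writer schema from the registry
        // and returns a GenericRecord; no hand-written decoding needed
        props.put("value.deserializer",
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://localhost:8081");
        return props;
    }
}
```

A new KafkaConsumer<String, GenericRecord>(AvroConsumerConfig.consumerProps()) can then be polled as usual.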
Cheers,
Liam Clarke-Hutchinson