Posted to dev@flink.apache.org by Fruzsina Nagy <fr...@cloudera.com.INVALID> on 2023/02/06 10:34:50 UTC

RE: Re: Confluent Avro and Debezium formats - default schema name can be incompatible with registered schema name

Hi Dawid,
Thanks for the suggestion, it’s worth a try. I’ll have a look at it.
I assume this ’schema’ option would not be required; if the schema is not provided explicitly, the current logic with the default name ‘record’ would be used.
Best regards,
Fruzsina

On 2023/01/27 13:14:39 Dawid Wysakowicz wrote:
> Hi Fruzsina,
> 
> I think this is a valid issue we should try to solve. A different
> approach I am thinking about is that we could add an option to
> provide an entire Avro schema to use, something like
> `avro-confluent.schema`, which we would validate maps properly to the
> schema of the table (that is, the names of the fields and their types
> match) and use instead of the generated one.
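> 
> A rough sketch of how that could look in DDL (just an illustration: the
> inline schema is abbreviated to match the example below, and the auth
> options are omitted):
> 
> CREATE TABLE `newtesttopic` (
>       `my_field1` INT NOT NULL,
>       `my_field2` DOUBLE NOT NULL,
>       `my_field3` VARCHAR(2147483647) NOT NULL
> ) WITH (
>       'connector' = 'kafka',
>       'topic' = 'newtesttopic',
>       'format' = 'avro-confluent',
>       'avro-confluent.url' = 'confluentSchemaRegUrl',
>       -- proposed option: use this full schema instead of the generated one
>       'avro-confluent.schema' = '{"type": "record", "namespace": "com.mycorp.mynamespace", "name": "sampleRecord", "fields": [{"name": "my_field1", "type": "int"}, {"name": "my_field2", "type": "double"}, {"name": "my_field3", "type": "string"}]}'
> )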
> 
> What do you think about that approach?
> 
> Best,
> 
> Dawid
> 
> On 26/01/2023 11:29, Fruzsina Nagy wrote:
> > Hi everyone,
> >
> > I have come across the issue below while experimenting with the Confluent registry and the avro-confluent and debezium-avro-confluent formats. Please let me know your thoughts on it. Should this issue be addressed?
> >
> > Thanks in advance,
> > Fruzsina
> > The use case
> >
> > 1. Create a new topic on Confluent Cloud.
> > 2. Create a value schema with the name “sampleRecord”:
> > {
> >    "type": "record",
> >    "namespace": "com.mycorp.mynamespace",
> >    "name": "sampleRecord",
> > …}
> > 3. Create a table with the “avro-confluent” format:
> > CREATE TABLE `newtesttopic` (
> >       `my_field1` INT NOT NULL,
> >       `my_field2` DOUBLE NOT NULL,
> >       `my_field3` VARCHAR(2147483647) NOT NULL
> > ) WITH (
> >       'connector' = 'kafka',
> >       'topic' = 'newtesttopic',
> >       'scan.startup.mode' = 'latest-offset',
> >       'properties.bootstrap.servers' = 'bootstrapServers',
> >       'properties.sasl.jaas.config' = 'saslJaasConfig',
> >       'properties.sasl.mechanism' = 'PLAIN',
> >       'properties.security.protocol' = 'SASL_SSL',
> >       'format' = 'avro-confluent',
> >       'avro-confluent.url' = 'confluentSchemaRegUrl',
> >       'avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
> >       'avro-confluent.basic-auth.user-info' = 'user:pw')
> >
> > 4. Insert data into the “newtesttopic” table, for example:
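> > -- a minimal sketch; the values are illustrative
> > INSERT INTO `newtesttopic` VALUES (1, 2.0, 'abc');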
> > The following error is thrown:
> > io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema for subject "newtesttopic-value", details: [Incompatibility{type:NAME_MISMATCH, location:/name, message:expected: com.mycorp.mynamespace.sampleRecord, reader:{"type":"record","name":"record",...}, writer:{"type":"record","name":"sampleRecord",...}
> > This error can, of course, be avoided if we don’t register a schema for our topic on the Confluent Cloud site before inserting data into the Kafka table, and instead just let Flink register it for us with the name “record”.
> >
> > The cause of the error
> >
> > I found that the error is caused by the EncodingFormat<SerializationSchema<RowData>> created by RegistryAvroFormatFactory.createEncodingFormat: when creating an AvroRowDataSerializationSchema, it uses AvroSchemaConverter.convertToSchema(LogicalType schema) <https://github.com/apache/flink/blob/359217381a3a5e48133614407445435c3bc97622/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java#L100>,
> > which names the schema “record” <https://github.com/apache/flink/blob/359217381a3a5e48133614407445435c3bc97622/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java#L306> by default.
> >
> > But the registered schema is named “sampleRecord” in the above example, so the Confluent Schema Registry client doesn’t accept it.
> > The problem
> >
> > To resolve this, I added a new option, “schema-name”, to the “avro-confluent” and “debezium-avro-confluent” formats. But as I was testing the “debezium-avro-confluent” format, it turned out that this solution doesn’t solve the problem in cases where named schemas (record, enum, and fixed types) are nested in the schema of the topic.
> >
> > For example:
> > In the case of “debezium-avro-confluent”, the schema created is a union of null and a Debezium-specific record schema (before, after, op). If I use the above option to provide a specific name for the schema, I get an org.apache.avro.UnresolvedUnionException, because AvroRowDataSerializationSchema <https://github.com/apache/flink/blob/359217381a3a5e48133614407445435c3bc97622/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java#L79> converts the RowType to a record schema with the name “record”, which will not be found in the union if the Debezium-specific record has a different name.
> > The union type is problematic because, in the general case, if we define a union schema [schema1, schema2], meaning that the schema is either schema1 or schema2, we must somehow determine which of the two schemas we are converting the RowType to.
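> >
> > For reference, a minimal sketch of a table using this format (the topic
> > name is illustrative, and the auth options are omitted):
> >
> > CREATE TABLE `dbz_topic` (
> >       `my_field1` INT,
> >       `my_field2` DOUBLE
> > ) WITH (
> >       'connector' = 'kafka',
> >       'topic' = 'dbz_topic',
> >       'format' = 'debezium-avro-confluent',
> >       'debezium-avro-confluent.url' = 'confluentSchemaRegUrl'
> > )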
> >
> > In the case of nested named schemas, Flink creates a name based on the record name and the field name <https://github.com/apache/flink/blob/359217381a3a5e48133614407445435c3bc97622/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java#L400>. The schema registry client will also throw an error in this case if the registered names don’t match.
> >
> > Possible solutions
> >
> > 1. Look up the names of the schemas in the field comments, e.g. if there
> > is a field of type ROW<field1 STRING, field2 STRING> with the comment
> > “avro-name = recordname”, we can use that name when converting the
> > LogicalType to an Avro schema. For the schema of the topic / table itself,
> >    - there could be a schema-name option, or
> >    - the name could be defined in the table comment.
> > 2. Use further table options to define the schema names, e.g.
> > ‘avro-confluent.schema-name.record.nested_record’ = ‘nested_record_name’
> > (where record and nested_record are field names); in this case the
> > schema-name option is suffixed with the path to the named schema (see the
> > sketch below).
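> >
> > A minimal sketch of what these proposals could look like in DDL (every
> > option name and the comment convention here are hypothetical, and the
> > connector options are omitted):
> >
> > CREATE TABLE t (
> >       -- proposal 1: carry the Avro name in the column comment (hypothetical convention)
> >       `record` ROW<`nested_record` ROW<`field1` STRING, `field2` STRING>> COMMENT 'avro-name = recordname'
> > ) WITH (
> >       'format' = 'avro-confluent',
> >       -- hypothetical option naming the schema of the whole topic / table
> >       'avro-confluent.schema-name' = 'com.mycorp.mynamespace.sampleRecord',
> >       -- hypothetical path-suffixed option naming a nested record schema
> >       'avro-confluent.schema-name.record.nested_record' = 'nested_record_name'
> > )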
> >
> >
> 

Re: Re: Confluent Avro and Debezium formats - default schema name can be incompatible with registered schema name

Posted by Fruzsina Nagy <fr...@cloudera.com.INVALID>.
Hello Dawid,
I created a pull request based on your suggestion.
https://github.com/apache/flink/pull/21942
I'd appreciate it if you could have a look at it.
Thanks in advance.
Best regards,
Fruzsina
