Posted to user@flink.apache.org by "Schmeier, Jannik" <J....@Fraport.de> on 2023/05/31 10:43:36 UTC

Using pre-registered schemas with avro-confluent-registry format is not possible

Hello,

I'm trying to use the avro-confluent-registry format with the Confluent Cloud Schema Registry in our company.
Our schemas are managed via Terraform and global write access is denied for all Kafka clients in our environments (or at least in production).
Therefore, when using the avro-confluent-registry format, I'm getting an error when Flink tries to serialize a row:

java.lang.RuntimeException: Failed to serialize row.
                at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:90) ~[?:?]
                at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:40) ~[?:?]
                at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.serialize(DynamicKafkaRecordSerializationSchema.java:95) ~[?:?]
                at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.serialize(DynamicKafkaRecordSerializationSchema.java:36) ~[?:?]
                at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:196) ~[?:?]
                at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.table.runtime.util.StreamRecordCollector.collect(StreamRecordCollector.java:44) ~[flink-table-runtime-1.17.0.jar:1.17.0]
                at org.apache.flink.table.runtime.operators.sink.ConstraintEnforcer.processElement(ConstraintEnforcer.java:247) ~[flink-table-runtime-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.17.0.jar:1.17.0]
                at StreamExecCalc$2221.processElement_0_0(Unknown Source) ~[?:?]
                at StreamExecCalc$2221.processElement_0_0_rewriteGroup22_split310(Unknown Source) ~[?:?]
                at StreamExecCalc$2221.processElement_0_0_rewriteGroup22(Unknown Source) ~[?:?]
                at StreamExecCalc$2221.processElement_split308(Unknown Source) ~[?:?]
                at StreamExecCalc$2221.processElement(Unknown Source) ~[?:?]
                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.table.runtime.operators.over.RowTimeRangeUnboundedPrecedingFunction.processElementsWithSameTimestamp(RowTimeRangeUnboundedPrecedingFunction.java:74) ~[flink-table-runtime-1.17.0.jar:1.17.0]
                at org.apache.flink.table.runtime.operators.over.AbstractRowTimeUnboundedPrecedingOver.onTimer(AbstractRowTimeUnboundedPrecedingOver.java:228) ~[flink-table-runtime-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:180) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:602) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:243) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:199) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:114) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:148) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) ~[flink-dist-1.17.0.jar:1.17.0]
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) ~[flink-dist-1.17.0.jar:1.17.0]
                at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: org.apache.flink.util.WrappingRuntimeException: Failed to serialize schema registry.
                at org.apache.flink.formats.avro.RegistryAvroSerializationSchema.serialize(RegistryAvroSerializationSchema.java:90) ~[?:?]
                at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:88) ~[?:?]
                ... 44 more
Caused by: java.io.IOException: Could not register schema in registry
                at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.writeSchema(ConfluentSchemaRegistryCoder.java:90) ~[?:?]
                at org.apache.flink.formats.avro.RegistryAvroSerializationSchema.serialize(RegistryAvroSerializationSchema.java:85) ~[?:?]
                at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:88) ~[?:?]
                ... 44 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: User is denied operation Write on Subject: my-topic-key; error code: 40301
                at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:294) ~[?:?]
                at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:364) ~[?:?]
                at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:507) ~[?:?]
                at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:498) ~[?:?]
                at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:471) ~[?:?]
                at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:221) ~[?:?]
                at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:283) ~[?:?]
                at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:259) ~[?:?]
                at io.confluent.kafka.schemaregistry.client.SchemaRegistryClient.register(SchemaRegistryClient.java:42) ~[?:?]
                at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.writeSchema(ConfluentSchemaRegistryCoder.java:85) ~[?:?]
                at org.apache.flink.formats.avro.RegistryAvroSerializationSchema.serialize(RegistryAvroSerializationSchema.java:85) ~[?:?]
                at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:88) ~[?:?]
                ... 44 more


I've inspected the code of the avro-confluent-registry format, and it seems there is no way to disable this behavior. The format will always try to register a schema when serializing a row:

https://github.com/apache/flink/blob/release-1.17.1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroSerializationSchema.java#L85
https://github.com/apache/flink/blob/release-1.17.1/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java#L85
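
For reference, the linked lines boil down to roughly this (paraphrased and abridged from the 1.17.1 sources; depending on the version, the schema may be wrapped in an AvroSchema first):

// ConfluentSchemaRegistryCoder.writeSchema, abridged: there is no
// lookup-only path, so every serialized row may attempt a registration.
public void writeSchema(Schema schema, OutputStream out) throws IOException {
    try {
        int registeredId = schemaRegistryClient.register(subject, schema);
        out.write(CONFLUENT_MAGIC_BYTE);
        byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(registeredId).array();
        out.write(schemaIdBytes);
    } catch (RestClientException e) {
        throw new IOException("Could not register schema in registry", e);
    }
}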

Is there a particular reason for this or would you be interested in adding a configuration option to disable this behavior?

Best regards,
Jannik

Re: Using pre-registered schemas with avro-confluent-registry format is not possible

Posted by "Schmeier, Jannik" <J....@Fraport.de>.
Hello Martijn,

thanks for your answer. We've actually tried those properties. Unfortunately, they only apply to Confluent's standard KafkaAvroSerializer/-Deserializer classes, which Flink does not use, so those properties have no effect here.
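
For contrast, a sketch of where those settings normally do take effect, namely when configuring Confluent's own serializer directly (the registry URL is a placeholder):

import java.util.HashMap;
import java.util.Map;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

Map<String, Object> config = new HashMap<>();
config.put("schema.registry.url", "https://registry.example.com");
config.put("auto.register.schemas", false); // honored by KafkaAvroSerializer
config.put("use.latest.version", true);     // but never read by Flink's format
KafkaAvroSerializer serializer = new KafkaAvroSerializer();
serializer.configure(config, false); // isKey = false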

Best regards,
Jannik


From: Martijn Visser <ma...@apache.org>
Sent: Wednesday, May 31, 2023 5:53 PM
To: Schwalbe Matthias <Ma...@viseca.ch>
Cc: Schmeier, Jannik <J....@Fraport.de>; user@flink.apache.org
Subject: Re: Using pre-registered schemas with avro-confluent-registry format is not possible

Hi Jannik,

By default, Kafka client applications automatically register new schemas [1]. You should be able to influence that by using properties, e.g. setting:

'properties.auto.register.schemas' = 'false'
'properties.use.latest.version' = 'true'

Best regards,

Martijn

[1] https://docs.confluent.io/platform/current/schema-registry/security/index.html#disabling-auto-schema-registration


On Wed, May 31, 2023 at 1:35 PM Schwalbe Matthias <Ma...@viseca.ch> wrote:

Hello Jannik,

Some things to consider (I had a similar problem a couple of years ago):

  *   The schemaRegistryClient actually caches schema ids, so it will hit the schema registry only once.
  *   The schema registered in the schema registry needs to be byte-equal, otherwise the schema registry considers it to be a new schema (version).
  *   To the best of my knowledge, writing an existing schema to the schema registry does not fail, because it is actually not written.
      *   Could be that this is not entirely true: we had to replace the whole schemaRegistryClient with our own implementation because the existing one could not be reconfigured to accept compressed answers from our r/o proxy.
  *   If you manage to fill the cache of your schemaRegistryClient with the exact schema (e.g. by querying it beforehand; see the sketch after this list), you might never run into trouble.
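
A sketch of that last idea, assuming a recent Confluent client whose getId() takes a ParsedSchema; the URL, subject, and cache size are placeholders. getId() only looks the schema up and never registers it, so this also serves as a fail-fast check that the job's schema is byte-equal to the pre-registered one:

import org.apache.avro.Schema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;

// Hypothetical pre-warm at job setup time; schemaJson must hold the exact
// schema string that was pre-registered for the subject.
CachedSchemaRegistryClient client =
        new CachedSchemaRegistryClient("https://registry.example.com", 1000);
Schema schema = new Schema.Parser().parse(schemaJson);
int schemaId = client.getId("my-topic-key", new AvroSchema(schema)); // throws if absent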

Hope this helps … keep us posted 😊

Thias





Re: Using pre-registered schemas with avro-confluent-registry format is not possible

Posted by Martijn Visser <ma...@apache.org>.
Hi Jannik,

By default, Kafka client applications automatically register new schemas
[1]. You should be able to influence that by using properties, e.g. setting:

'properties.auto.register.schemas' = 'false'
'properties.use.latest.version' = 'true'

Best regards,

Martijn

[1]
https://docs.confluent.io/platform/current/schema-registry/security/index.html#disabling-auto-schema-registration



Re: Using pre-registered schemas with avro-confluent-registry format is not possible

Posted by Martijn Visser <ma...@apache.org>.
Hi Jannik,

Can you still share the values you're setting for your properties? Off the
top of my head, you need to set:

value.avro-confluent.properties.auto.register.schemas=false
value.avro-confluent.properties.use.latest.version=true
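
For reference, a sketch of where those options would sit in the sink DDL (connector, topic, and registry details are placeholders):

CREATE TABLE my_sink (
  id STRING,
  amount DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'my-topic',
  'properties.bootstrap.servers' = 'broker:9092',
  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'https://registry.example.com',
  'value.avro-confluent.properties.auto.register.schemas' = 'false',
  'value.avro-confluent.properties.use.latest.version' = 'true'
);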

Best regards,

Martijn

On Tue, Jul 11, 2023 at 4:43 AM Schmeier, Jannik <J....@fraport.de>
wrote:

> Do you know how they are overriding the method? Are they building their
> own distribution of Flink with their own implementation of that method? I'd
> like to avoid having to build that myself. I'd be interested in a solution
> in the official release.
>
>
>
> *From:* Meissner, Dylan <dy...@nordstrom.com>
> *Sent:* Friday, June 30, 2023 5:26 PM
> *To:* Martijn Visser <ma...@apache.org>; Schmeier, Jannik <J....@Fraport.de>
> *Cc:* Schwalbe Matthias <Ma...@viseca.ch>; user@flink.apache.org
> *Subject:* Re: Using pre-registered schemas with avro-confluent-registry format is not possible
>
>
>
> This comes up when the Schema Registry has authorization capabilities.
>
>
>
> A producing application may not be authorized to call “register” when
> another principal owns the schema subject, for example in organizations
> with strong schema governance practices. When
> ConfluentSchemaRegistryCoder.writeSchema is invoked, these producers will
> receive HTTP 403 failures.
>
>
>
> In these unfortunate cases, they are overriding writeSchema…
>
> @Override
> public void writeSchema(Schema schema, OutputStream out) throws IOException {
>     int registeredId = schemaRegistryClient.register(subject, schema);
>     out.write(CONFLUENT_MAGIC_BYTE);
>     byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(registeredId).array();
>     out.write(schemaIdBytes);
> }
>
> …by replacing “register()” with the schema lookup call, “getId()”
>
> @Override
> public void writeSchema(Schema schema, OutputStream out) throws IOException {
>     int registeredId = schemaRegistryClient.getId(subject, schema);
>     out.write(CONFLUENT_MAGIC_BYTE);
>     byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(registeredId).array();
>     out.write(schemaIdBytes);
> }
>
> Dylan
>
>
>
>
>
> *From:* Martijn Visser <ma...@apache.org>
> *Date:* Monday, June 5, 2023 at 7:45 AM
> *To:* Schmeier, Jannik <J....@fraport.de>
> *Cc:* Schwalbe Matthias <Ma...@viseca.ch>, user@flink.apache.org
> *Subject:* Re: Using pre-registered schemas with avro-confluent-registry format is not possible
>
>
>
>
> Hi Jannik,
>
>
>
> Can you share how you've set those properties? I've been able to use this
> without any problems.
>
>
>
> Best regards,
>
>
>
> Martijn
>
>
>
> On Thu, Jun 1, 2023 at 2:43 PM Schmeier, Jannik <J....@fraport.de>
> wrote:
>
> Hello Thias,
>
>
>
> thank you for your answer.
>
>
>
> We've tested registering an existing (byte-equal) schema a second time,
> but unfortunately the schema registry still denies the request.
>
>
>
> Your last suggestion sounds promising, but I think there are some edge
> cases with this approach that will still cause an error. For example, when
> writing to a new, empty topic, querying the topic beforehand won't return
> any records, so the schema would not be put into the schemaRegistryClient
> cache.
>
>
>
> I would still prefer a flag for the "avro-confluent-registry" format that
> disables registering schemas; instead, the format would just try to get
> the ID for a schema string from the registry. If there is an ID for that
> schema, Flink would use it; if there is none, an exception should be
> thrown.
>
> What do you think of that?
>
>
>
> Best regards,
>
> Jannik

Re: Using pre-registered schemas with avro-confluent-registry format is not possible

Posted by "Meissner, Dylan via user" <us...@flink.apache.org>.
There’s no need to build Flink. Implement your own org.apache.flink.formats.avro.SchemaCoder, together with a SchemaCoder.SchemaCoderProvider that supplies it; the provider is accepted by the constructor of org.apache.flink.formats.avro.RegistryAvroSerializationSchema.

This way, you can replace the dangerous “register()” with simply “getId()”.

To solve this for everyone, the Confluent schema coder should honor Confluent’s “auto.register.schemas” property in SchemaCoder.
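
[Editorial note: a minimal sketch of such a lookup-only coder. The class name, constructor and error handling are made up for this example; it assumes the SchemaCoder interface from flink-avro and the getId() lookup of Confluent's client shown in the quoted snippets below.]

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;

import org.apache.avro.Schema;
import org.apache.flink.formats.avro.SchemaCoder;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;

// Hypothetical lookup-only coder: resolves the id of a pre-registered
// schema and fails fast instead of calling register().
public class PreRegisteredSchemaCoder implements SchemaCoder {

    private static final byte CONFLUENT_MAGIC_BYTE = 0;

    private final SchemaRegistryClient schemaRegistryClient;
    private final String subject;

    public PreRegisteredSchemaCoder(SchemaRegistryClient schemaRegistryClient, String subject) {
        this.schemaRegistryClient = schemaRegistryClient;
        this.subject = subject;
    }

    @Override
    public Schema readSchema(InputStream in) throws IOException {
        // Only needed on the deserialization path; reading works fine with
        // the stock ConfluentSchemaRegistryCoder, so it is omitted here.
        throw new UnsupportedOperationException("This sketch only serializes.");
    }

    @Override
    public void writeSchema(Schema schema, OutputStream out) throws IOException {
        final int schemaId;
        try {
            // getId() only looks the schema up; it never registers anything.
            schemaId = schemaRegistryClient.getId(subject, schema);
        } catch (RestClientException e) {
            throw new IOException("Schema is not pre-registered for subject " + subject, e);
        }
        // Same wire format as the stock coder: magic byte + 4-byte schema id.
        out.write(CONFLUENT_MAGIC_BYTE);
        out.write(ByteBuffer.allocate(4).putInt(schemaId).array());
    }
}

Since the provider is serialized into the job graph while the registry client is not serializable, the provider's get() should construct the client and this coder lazily rather than capture existing instances.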


AW: Using pre-registered schemas with avro-confluent-registry format is not possible

Posted by "Schmeier, Jannik" <J....@Fraport.de>.
Do you know how they are overriding the method? Are they building their own distribution of Flink with their own implementation of that method? I'd like to avoid having to build that myself. I'd be interested in a solution in the official release.


Re: Using pre-registered schemas with avro-confluent-registry format is not possible

Posted by "Meissner, Dylan via user" <us...@flink.apache.org>.
This comes up when the Schema Registry has authorization capabilities.

A producing application may not be authorized to call “register” when another principal owns the schema subject, for example in organizations with strong schema governance practices. When ConfluentSchemaRegistryCoder.writeSchema is invoked, these producers will receive HTTP 403 failures.

In these unfortunate cases, they are overriding writeSchema…

@Override
public void writeSchema(Schema schema, OutputStream out) throws IOException {
    int registeredId = schemaRegistryClient.register(subject, schema);
    out.write(CONFLUENT_MAGIC_BYTE);
    byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(registeredId).array();
    out.write(schemaIdBytes);
}

…by replacing “register()” with the schema lookup call, “getId()”

@Override
public void writeSchema(Schema schema, OutputStream out) throws IOException {
    int registeredId = schemaRegistryClient.getId(subject, schema);
    out.write(CONFLUENT_MAGIC_BYTE);
    byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(registeredId).array();
    out.write(schemaIdBytes);
}

Dylan
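
[Editorial note: for comparison, Confluent's own serializers already treat this as configuration. A plain Kafka producer under the same restriction would typically be set up along these lines; the keys are standard Confluent serializer settings (not read by Flink's avro-confluent format as of 1.17), and the endpoint values are placeholders.]

// Sketch: producer-side configuration with Confluent's KafkaAvroSerializer.
Properties props = new Properties();
props.put("bootstrap.servers", "broker.example.com:9092");                  // placeholder
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "https://schema-registry.example.com");    // placeholder
props.put("auto.register.schemas", false); // look the schema id up, never call register()
props.put("use.latest.version", true);     // optional: resolve against the latest registered version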



Re: Using pre-registered schemas with avro-confluent-registry format is not possible

Posted by Martijn Visser <ma...@apache.org>.
Hi Jannik,

Can you share how you've set those properties? I've been able to use this without any problems.

Best regards,

Martijn

On Thu, Jun 1, 2023 at 2:43 PM Schmeier, Jannik <J....@fraport.de> wrote:

> Hello Thias,
>
> thank you for your answer.
>
> We've tested registering an existing (byte-equal) schema a second time, but
> unfortunately the schema registry still denies the request.
>
> Your last suggestion sounds promising, but I think there are some edge
> cases with this approach that will still cause an error. For example, when
> writing to a new topic that’s empty, querying this topic beforehand won't
> return any records, and therefore the schema would not be put into the
> schemaRegistryClient cache.
>
> I'd still prefer a flag for the "avro-confluent-registry" format that
> disables registering schemas; instead, the format would just try to
> get the ID for a schema string from the registry. If there is an ID for
> that schema, Flink will use it. If there is none, an exception should be
> thrown.
>
> What do you think of that?
>
> Best regards,
>
> Jannik
>
>
>
>
>
> From: Schwalbe Matthias <Ma...@viseca.ch>
> Sent: Wednesday, 31 May 2023 13:33
> To: Schmeier, Jannik <J....@Fraport.de>; user@flink.apache.org
> Subject: RE: Using pre-registered schemas with avro-confluent-registry format is not possible
>
>
>
>
>
> Hello Jannik,
>
>
>
> Some things to consider (I had a similar problem a couple of years ago):
>
>    - The schemaRegistryClient actually caches schema ids, so it will hit
>    the schema registry only once.
>    - The schema registered in the schema registry needs to be byte-equal;
>    otherwise the schema registry considers it a new schema (version).
>    - … to the best of my knowledge, writing an existing schema to the schema
>    registry does not fail, because it is actually not written.
>       - This may not be entirely true, as we had to replace the whole
>       schemaRegistryClient with our own implementation because the existing
>       one could not be reconfigured to accept compressed answers from our r/o
>       proxy.
>    - If you manage to fill the cache of your schemaRegistryClient with
>    the exact schema (e.g. by querying it beforehand), you might never run into
>    the trouble.
>
>
>
> Hope this helps … keep us posted 😊
>
>
>
> Thias
>
>
>
>
>
>
>
>
>
> From: Schmeier, Jannik <J....@Fraport.de>
> Sent: Wednesday, May 31, 2023 12:44 PM
> To: user@flink.apache.org
> Subject: Using pre-registered schemas with avro-confluent-registry format is not possible
>
> Hello,
>
> I'm trying to use the avro-confluent-registry format with the Confluent
> Cloud Schema Registry in our company.
> Our schemas are managed via Terraform and global write access is denied
> for all Kafka clients in our environments (or at least in production).
> Therefore, when using the avro-confluent-registry format I'm getting an
> error when Flink is trying to serialize a row:
>
> java.lang.RuntimeException: Failed to serialize row.
>                 at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:90) ~[?:?]
>                 at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:40) ~[?:?]
>                 at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.serialize(DynamicKafkaRecordSerializationSchema.java:95) ~[?:?]
>                 at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.serialize(DynamicKafkaRecordSerializationSchema.java:36) ~[?:?]
>                 at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:196) ~[?:?]
>                 at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158) ~[flink-dist-1.17.0.jar:1.17.0]
>                 at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.17.0.jar:1.17.0]
>                 at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.17.0.jar:1.17.0]
>                 at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.17.0.jar:1.17.0]
>                 at org.apache.flink.table.runtime.util.StreamRecordCollector.collect(StreamRecordCollector.java:44) ~[flink-table-runtime-1.17.0.jar:1.17.0]
>                 at org.apache.flink.table.runtime.operators.sink.ConstraintEnforcer.processElement(ConstraintEnforcer.java:247) ~[flink-table-runtime-1.17.0.jar:1.17.0]
>                 at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.17.0.jar:1.17.0]
>                 at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.17.0.jar:1.17.0]
>                 at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.17.0.jar:1.17.0]
>                 at StreamExecCalc$2221.processElement_0_0(Unknown Source) ~[?:?]
>                 at StreamExecCalc$2221.processElement_0_0_rewriteGroup22_split310(Unknown Source) ~[?:?]
>                 at StreamExecCalc$2221.processElement_0_0_rewriteGroup22(Unknown Source) ~[?:?]
>                 at StreamExecCalc$2221.processElement_split308(Unknown Source) ~[?:?]
>                 at StreamExecCalc$2221.processElement(Unknown Source) ~[?:?]
>                 at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.17.0.jar:1.17.0]
>                 at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.17.0.jar:1.17.0]
>                 at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.17.0.jar:1.17.0]
>                 at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) ~[flink-dist-1.17.0.jar:1.17.0]
>                 at org.apache.flink.table.runtime.operators.over.RowTimeRangeUnboundedPrecedingFunction.processElementsWithSameTimestamp(RowTimeRangeUnboundedPrecedingFunction.java:74) ~[flink-table-runtime-1.17.0.jar:1.17.0]
>                 at org.apache.flink.table.runtime.operators.over.AbstractRowTimeUnboundedPrecedingOver.onTimer(AbstractRowTimeUnboundedPrecedingOver.java:228) ~[flink-table-runtime-1.17.0.jar:1.17.0]
>                 at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91) ~[flink-dist-1.17.0.jar:1.17.0]
>                 at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70) ~[flink-dist-1.17.0.jar:1.17.0]
>                 at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302) ~[flink-dist-1.17.0.jar:1.17.0]
>                 at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:180) ~[flink-dist-1.17.0.jar:1.17.0]
>                 at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:602) ~[flink-dist-1.17.0.jar:1.17.0]
>                 at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:243) ~[flink-dist-1.17.0.jar:1.17.0]
>                 at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:199) ~[flink-dist-1.17.0.jar:1.17.0]
>                 at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:114) ~[flink-dist-1.17.0.jar:1.17.0]
>                 at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:148) ~[flink-dist-1.17.0.jar:1.17.0]
>                 at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) ~[flink-dist-1.17.0.jar:1.17.0]
>                 at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.17.0.jar:1.17.0]
>                 at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
> ~[flink-dist-1.17.0.jar:1.17.0]
>
>                 at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> ~[flink-dist-1.17.0.jar:1.17.0]
>
>                 at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
> ~[flink-dist-1.17.0.jar:1.17.0]
>
>                 at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
> ~[flink-dist-1.17.0.jar:1.17.0]
>
>                 at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
> ~[flink-dist-1.17.0.jar:1.17.0]
>
>                 at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
> ~[flink-dist-1.17.0.jar:1.17.0]
>
>                 at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
> ~[flink-dist-1.17.0.jar:1.17.0]
>
>                 at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> ~[flink-dist-1.17.0.jar:1.17.0]
>
>                 at java.lang.Thread.run(Unknown Source) ~[?:?]
>
> Caused by: org.apache.flink.util.WrappingRuntimeException: Failed to
> serialize schema registry.
>
>                 at
> org.apache.flink.formats.avro.RegistryAvroSerializationSchema.serialize(RegistryAvroSerializationSchema.java:90)
> ~[?:?]
>
>                 at
> org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:88)
> ~[?:?]
>
>                 ... 44 more
>
> Caused by: java.io.IOException: Could not register schema in registry
>
>                 at
> org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.writeSchema(ConfluentSchemaRegistryCoder.java:90)
> ~[?:?]
>
>                 at
> org.apache.flink.formats.avro.RegistryAvroSerializationSchema.serialize(RegistryAvroSerializationSchema.java:85)
> ~[?:?]
>
>                 at
> org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:88)
> ~[?:?]
>
>                 ... 44 more
>
> Caused by:
> io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:
> User is denied operation Write on Subject: my-topic-key; error code: 40301
>
>                 at
> io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:294)
> ~[?:?]
>
>                 at
> io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:364)
> ~[?:?]
>
>                 at
> io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:507)
> ~[?:?]
>
>                 at
> io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:498)
> ~[?:?]
>
>                 at
> io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:471)
> ~[?:?]
>
>                 at
> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:221)
> ~[?:?]
>
>                 at
> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:283)
> ~[?:?]
>
>                 at
> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:259)
> ~[?:?]
>
>                 at
> io.confluent.kafka.schemaregistry.client.SchemaRegistryClient.register(SchemaRegistryClient.java:42)
> ~[?:?]
>
>                 at
> org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.writeSchema(ConfluentSchemaRegistryCoder.java:85)
> ~[?:?]
>
>                 at
> org.apache.flink.formats.avro.RegistryAvroSerializationSchema.serialize(RegistryAvroSerializationSchema.java:85)
> ~[?:?]
>
>                 at
> org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:88)
> ~[?:?]
>
>                 ... 44 more
>
>
> I've inspected the code of the avro-confluent-registry format and it seems
> like there is no way to disable this behavior. The format will always try
> to register a schema when serializing a row:
>
>
> https://github.com/apache/flink/blob/release-1.17.1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroSerializationSchema.java#L85
>
> https://github.com/apache/flink/blob/release-1.17.1/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java#L85
>
> Is there a particular reason for this or would you be interested in adding
> a configuration option to disable this behavior?
>
> Best regards,
>
> Jannik
>

AW: Using pre-registered schemas with avro-confluent-registry format is not possible

Posted by "Schmeier, Jannik" <J....@Fraport.de>.
Hello Thias,

thank you for your answer.

We've tested registering an existing (byte-equal) schema a second time, but unfortunately the schema registry still denies the request.

Your last suggestion sounds promising, but I think there are edge cases where this approach will still cause an error. For example, when writing to a new, empty topic, querying the topic beforehand won't return any records, so the schema would never be put into the schemaRegistryClient cache.

I would still prefer a flag for the "avro-confluent-registry" format that disables registering schemas; instead, the format would just try to get the ID for the schema string from the registry. If there is an ID for that schema, Flink uses it; if there is none, an exception should be thrown.
What do you think of that?
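
For illustration, a minimal sketch of that lookup-only path, modeled on the
shape of Flink's ConfluentSchemaRegistryCoder; the class name and the
registerSchemas flag are made up, and I'm assuming the Confluent client's
getId() lookup:

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import org.apache.avro.Schema;

// Hypothetical variant of Flink's ConfluentSchemaRegistryCoder: with
// registerSchemas = false it only looks up the id of a pre-registered
// schema instead of trying to write it.
class LookupOnlySchemaRegistryCoder {

    private static final int CONFLUENT_MAGIC_BYTE = 0;

    private final SchemaRegistryClient schemaRegistryClient;
    private final String subject;
    private final boolean registerSchemas; // the proposed flag

    LookupOnlySchemaRegistryCoder(
            SchemaRegistryClient client, String subject, boolean registerSchemas) {
        this.schemaRegistryClient = client;
        this.subject = subject;
        this.registerSchemas = registerSchemas;
    }

    void writeSchema(Schema schema, OutputStream out) throws IOException {
        try {
            int schemaId = registerSchemas
                    // write path: needs Write permission on the subject
                    ? schemaRegistryClient.register(subject, new AvroSchema(schema))
                    // lookup path: resolves an already-registered schema and
                    // fails if the exact schema is unknown to the registry
                    : schemaRegistryClient.getId(subject, new AvroSchema(schema));
            // Confluent wire format: magic byte 0, then the 4-byte schema id;
            // the Avro payload follows, written by the caller.
            out.write(CONFLUENT_MAGIC_BYTE);
            out.write(ByteBuffer.allocate(4).putInt(schemaId).array());
        } catch (RestClientException e) {
            throw new IOException("Could not resolve schema id in registry", e);
        }
    }
}

Since getId() only goes through the registry's lookup endpoint, read-only
credentials should be enough for it, which is exactly the situation we have in
production.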

Best regards,
Jannik


From: Schwalbe Matthias <Ma...@viseca.ch>
Sent: Wednesday, May 31, 2023 1:33 PM
To: Schmeier, Jannik <J....@Fraport.de>; user@flink.apache.org
Subject: RE: Using pre-registered schemas with avro-confluent-registry format is not possible


Hello Jannik,

Some things to consider (I had a similar problem a couple of years ago):

  *   The schemaRegistryClient actually caches schema ids, so it will hit the schema registry only once
  *   The schema registered in the schema registry needs to be byte-equal, otherwise the schema registry considers it to be a new schema (version)
  *   … to my best knowledge, writing an existing schema to the schema registry does not fail because it is actually not written
     *   Could be that this is not entirely true, as we had to replace the whole schemaRegistryClient with our own implementation because the existing one could not be reconfigured to accept compressed answers from our r/o proxy
  *   if you manage to fill the cache of your schemaRegistryClient with the exact schema (e.g. by querying it beforehand) you might never run into trouble

Hope this helps … keep us posted 😊

Thias




From: Schmeier, Jannik <J....@Fraport.de>
Sent: Wednesday, May 31, 2023 12:44 PM
To: user@flink.apache.org
Subject: Using pre-registered schemas with avro-confluent-registry format is not possible

Hello,

I'm trying to use the avro-confluent-registry format with the Confluent Cloud Schema Registry in our company.
Our schemas are managed via Terraform and global write access is denied for all Kafka clients in our environments (or at least in production).
Therefore, when using the avro-confluent-registry format I'm getting an error when Flink is trying to serialize a row:

java.lang.RuntimeException: Failed to serialize row.
                [stack trace snipped; identical to the original message. Root cause:
                RestClientException: User is denied operation Write on Subject: my-topic-key; error code: 40301]

I've inspected the code of the avro-confluent-registry format and it seems like there is no way to disable this behavior. The format will always try to register a schema when serializing a row:

https://github.com/apache/flink/blob/release-1.17.1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroSerializationSchema.java#L85
https://github.com/apache/flink/blob/release-1.17.1/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java#L85
Is there a particular reason for this or would you be interested in adding a configuration option to disable this behavior?

Best regards,
Jannik

RE: Using pre-registered schemas with avro-confluent-registry format is not possible

Posted by Schwalbe Matthias <Ma...@viseca.ch>.
Hello Jannik,

Some things to consider (I had a similar problem a couple of years ago):

  *   The schemaRegistryClient actually caches schema ids, so it will hit the schema registry only once
  *   The schema registered in the schema registry needs to be byte-equal, otherwise the schema registry considers it to be a new schema (version)
  *   … to my best knowledge, writing an existing schema to the schema registry does not fail because it is actually not written
     *   Could be that this is not entirely true, as we had to replace the whole schemaRegistryClient with our own implementation because the existing one could not be reconfigured to accept compressed answers from our r/o proxy
  *   if you manage to fill the cache of your schemaRegistryClient with the exact schema (e.g. by querying it beforehand) you might never run into trouble (see the sketch right after this list)
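
For the last point, a minimal sketch of the read-only lookup; the URL, subject
and schema are placeholders, and note that the Table API format constructs its
own schemaRegistryClient internally, so this is illustrative rather than a
drop-in fix:

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.avro.Schema;

public class SchemaLookupCheck {
    public static void main(String[] args) throws Exception {
        // Cached client holding up to 100 schema ids (placeholder URL).
        SchemaRegistryClient client =
                new CachedSchemaRegistryClient("https://my-registry.example.com", 100);

        // Must be byte-equal to the schema that was pre-registered.
        Schema writerSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"MyRecord\","
                        + "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}");

        // getId() goes through the registry's schema-lookup endpoint, not the
        // register endpoint, so read-only credentials suffice; it throws a
        // RestClientException if the exact schema was never registered, and
        // caches the id on success.
        int schemaId = client.getId("my-topic-key", new AvroSchema(writerSchema));
        System.out.println("pre-registered schema id: " + schemaId);
    }
}

Whether this warm-up helps in the end depends on whether the serializer
consults the same client instance and cache, hence my reservation above.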

Hope this helps … keep us posted 😊

Thias




From: Schmeier, Jannik <J....@Fraport.de>
Sent: Wednesday, May 31, 2023 12:44 PM
To: user@flink.apache.org
Subject: Using pre-registered schemas with avro-confluent-registry format is not possible

Hello,

I'm trying to use the avro-confluent-registry format with the Confluent Cloud Schema Registry in our company.
Our schemas are managed via Terraform and global write access is denied for all Kafka clients in our environments (or at least in production).
Therefore, when using the avro-confluent-registry format I'm getting an error when Flink is trying to serialize a row:

java.lang.RuntimeException: Failed to serialize row.
                [stack trace snipped; identical to the original message. Root cause:
                RestClientException: User is denied operation Write on Subject: my-topic-key; error code: 40301]

I've inspected the code of the avro-confluent-registry format and it seems like there is no way to disable this behavior. The format will always try to register a schema when serializing a row:

https://github.com/apache/flink/blob/release-1.17.1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroSerializationSchema.java#L85
https://github.com/apache/flink/blob/release-1.17.1/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java#L85
Is there a particular reason for this or would you be interested in adding a configuration option to disable this behavior?

Best regards,
Jannik