You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Svend Vanderveken (Jira)" <ji...@apache.org> on 2021/01/16 13:51:00 UTC
[jira] [Updated] (FLINK-20999) Confluent Avro Format should
document how to serialize kafka keys
[ https://issues.apache.org/jira/browse/FLINK-20999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Svend Vanderveken updated FLINK-20999:
--------------------------------------
Description:
The [Confluent Avro Format|https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/avro-confluent.html] only shows example of how to serialize/deserialize Kafka values. Also, parameter description is not always clear what is influencing the source and the sink behaviour, IMHO.
This seems surprising especially in the context of a sink kafka connector since keys are such an important concept in that case.
Adding examples of how to serialize/deserialize Kafka keys would add clarity.
While it can be argued that a connector format is independent from the underlying storage, probably showing kafka-oriented examples in this case (i.e, with a concept of "key" and "value") makes senses here since this connector is very much thought with Kafka in mind.
I'm happy to submit a PR with all if this suggested change is approved?
I suggest to add this:
h3. writing to Kafka while keeping the keys in "raw" big endian format:
{code:java}
CREATE TABLE OUTPUT_TABLE (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'key.format' = 'raw',
'key.raw.endianness' = 'big-endian',
'key.fields' = 'user_id',
'value.format' = 'avro-confluent',
'value.avro-confluent.schema-registry.url' = 'http://localhost:8081',
'value.avro-confluent.schema-registry.subject' = 'user_behavior'
)
{code}
h3. writing to Kafka while registering both the key and the value to the schema registry
{code:java}
CREATE TABLE OUTPUT_TABLE (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
-- => this will register a {user_id: long} Avro type in the schema registry.
-- Watch out: schema evolution in the context of a Kafka key is almost never backward nor
-- forward compatible in practice due to hash partitioning.
'key.avro-confluent.schema-registry.url' = 'http://localhost:8081',
'key.avro-confluent.schema-registry.subject' = 'user_behavior_key',
'key.format' = 'avro-confluent',
'key.fields' = 'user_id',
'value.format' = 'avro-confluent',
'value.avro-confluent.schema-registry.url' = 'http://localhost:8081',
'value.avro-confluent.schema-registry.subject' = 'user_behavior_value'
)
{code}
h3. reading form Kafka with both the key and value schema in the registry while resolving field name clashes:
{code:java}
CREATE TABLE INPUT_TABLE (
-- user_id as read from the kafka key:
from_kafka_key_user_id BIGINT,
-- user_id, and other fields, as read from the kafka value-
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'key.format' = 'avro-confluent',
'key.avro-confluent.schema-registry.url' = 'http://localhost:8081',
'key.fields' = 'from_kafka_key_user_id',
-- Adds a column prefix when mapping the avro fields of the kafka key to columns of this Table
-- to avoid clashes with avro fields of the value (both contain 'user_id' in this example)
'key.fields-prefix' = 'from_kafka_key_',
'value.format' = 'avro-confluent',
-- cannot include key here since dealt with above
'value.fields-include' = 'EXCEPT_KEY',
'value.avro-confluent.schema-registry.url' = 'http://localhost:8081'
)
{code}
was:
The [Confluent Avro Format|https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/avro-confluent.html] only shows example of how to serialize/deserialize Kafka values. Also, parameter description is not always clear what is influencing the source and the sink behaviour, IMHO.
This seems surprising especially in the context of a sink kafka connector since keys are such an important concept in that case.
Adding examples of how to serialize/deserialize Kafka keys would add clarity.
While it can be argued that a connector format is independent from the underlying storage, probably showing kafka-oriented examples in this case (i.e, with a concept of "key" and "value") makes senses here since this connector is very much thought with Kafka in mind.
I suggest to add this:
h3. writing to Kafka while keeping the keys in "raw" big endian format:
{code:java}
CREATE TABLE OUTPUT_TABLE (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'key.format' = 'raw',
'key.raw.endianness' = 'big-endian',
'key.fields' = 'user_id',
'value.format' = 'avro-confluent',
'value.avro-confluent.schema-registry.url' = 'http://localhost:8081',
'value.avro-confluent.schema-registry.subject' = 'user_behavior'
)
{code}
h3. writing to Kafka while registering both the key and the value to the schema registry
{code:java}
CREATE TABLE OUTPUT_TABLE (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
-- => this will register a {user_id: long} Avro type in the schema registry.
-- Watch out: schema evolution in the context of a Kafka key is almost never backward nor
-- forward compatible in practice due to hash partitioning.
'key.avro-confluent.schema-registry.url' = 'http://localhost:8081',
'key.avro-confluent.schema-registry.subject' = 'user_behavior_key',
'key.format' = 'avro-confluent',
'key.fields' = 'user_id',
'value.format' = 'avro-confluent',
'value.avro-confluent.schema-registry.url' = 'http://localhost:8081',
'value.avro-confluent.schema-registry.subject' = 'user_behavior_value'
)
{code}
h3. reading form Kafka with both the key and value schema in the registry while resolving field name clashes:
{code:java}
CREATE TABLE INPUT_TABLE (
-- user_id as read from the kafka key:
from_kafka_key_user_id BIGINT,
-- user_id, and other fields, as read from the kafka value-
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'key.format' = 'avro-confluent',
'key.avro-confluent.schema-registry.url' = 'http://localhost:8081',
'key.fields' = 'from_kafka_key_user_id',
-- Adds a column prefix when mapping the avro fields of the kafka key to columns of this Table
-- to avoid clashes with avro fields of the value (both contain 'user_id' in this example)
'key.fields-prefix' = 'from_kafka_key_',
'value.format' = 'avro-confluent',
-- cannot include key here since dealt with above
'value.fields-include' = 'EXCEPT_KEY',
'value.avro-confluent.schema-registry.url' = 'http://localhost:8081'
)
{code}
> Confluent Avro Format should document how to serialize kafka keys
> -----------------------------------------------------------------
>
> Key: FLINK-20999
> URL: https://issues.apache.org/jira/browse/FLINK-20999
> Project: Flink
> Issue Type: Improvement
> Components: Documentation
> Affects Versions: 1.12.0
> Reporter: Svend Vanderveken
> Priority: Minor
>
> The [Confluent Avro Format|https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/avro-confluent.html] only shows example of how to serialize/deserialize Kafka values. Also, parameter description is not always clear what is influencing the source and the sink behaviour, IMHO.
> This seems surprising especially in the context of a sink kafka connector since keys are such an important concept in that case.
> Adding examples of how to serialize/deserialize Kafka keys would add clarity.
> While it can be argued that a connector format is independent from the underlying storage, probably showing kafka-oriented examples in this case (i.e, with a concept of "key" and "value") makes senses here since this connector is very much thought with Kafka in mind.
>
> I'm happy to submit a PR with all if this suggested change is approved?
>
> I suggest to add this:
> h3. writing to Kafka while keeping the keys in "raw" big endian format:
> {code:java}
> CREATE TABLE OUTPUT_TABLE (
> user_id BIGINT,
> item_id BIGINT,
> category_id BIGINT,
> behavior STRING
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'user_behavior',
> 'properties.bootstrap.servers' = 'localhost:9092',
> 'key.format' = 'raw',
> 'key.raw.endianness' = 'big-endian',
> 'key.fields' = 'user_id',
> 'value.format' = 'avro-confluent',
> 'value.avro-confluent.schema-registry.url' = 'http://localhost:8081',
> 'value.avro-confluent.schema-registry.subject' = 'user_behavior'
> )
>
> {code}
>
> h3. writing to Kafka while registering both the key and the value to the schema registry
> {code:java}
> CREATE TABLE OUTPUT_TABLE (
> user_id BIGINT,
> item_id BIGINT,
> category_id BIGINT,
> behavior STRING
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'user_behavior',
> 'properties.bootstrap.servers' = 'localhost:9092',
> -- => this will register a {user_id: long} Avro type in the schema registry.
> -- Watch out: schema evolution in the context of a Kafka key is almost never backward nor
> -- forward compatible in practice due to hash partitioning.
> 'key.avro-confluent.schema-registry.url' = 'http://localhost:8081',
> 'key.avro-confluent.schema-registry.subject' = 'user_behavior_key',
> 'key.format' = 'avro-confluent',
> 'key.fields' = 'user_id',
> 'value.format' = 'avro-confluent',
> 'value.avro-confluent.schema-registry.url' = 'http://localhost:8081',
> 'value.avro-confluent.schema-registry.subject' = 'user_behavior_value'
> )
>
> {code}
>
> h3. reading form Kafka with both the key and value schema in the registry while resolving field name clashes:
> {code:java}
> CREATE TABLE INPUT_TABLE (
> -- user_id as read from the kafka key:
> from_kafka_key_user_id BIGINT,
>
> -- user_id, and other fields, as read from the kafka value-
> user_id BIGINT,
> item_id BIGINT,
> category_id BIGINT,
> behavior STRING
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'user_behavior',
> 'properties.bootstrap.servers' = 'localhost:9092',
> 'key.format' = 'avro-confluent',
> 'key.avro-confluent.schema-registry.url' = 'http://localhost:8081',
> 'key.fields' = 'from_kafka_key_user_id',
> -- Adds a column prefix when mapping the avro fields of the kafka key to columns of this Table
> -- to avoid clashes with avro fields of the value (both contain 'user_id' in this example)
> 'key.fields-prefix' = 'from_kafka_key_',
> 'value.format' = 'avro-confluent',
> -- cannot include key here since dealt with above
> 'value.fields-include' = 'EXCEPT_KEY',
> 'value.avro-confluent.schema-registry.url' = 'http://localhost:8081'
> )
>
> {code}
>
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)