Posted to dev@flink.apache.org by Yufei Zhang <af...@gmail.com> on 2022/05/09 05:57:22 UTC

[Question][Kafka SQL Connector] When will a decoding format emit multiple rows for a Kafka key; why do a cartesian product?

I was reading the source code of the Kafka SQL source connector and noticed
that in DynamicKafkaDeserializationSchema [1], when the key format emits
multiple key rows, the code does a cartesian product of the key rows and
the value rows. I know that in CDC, a format can emit multiple rows
(UPDATE_BEFORE and UPDATE_AFTER rows) for a single message, but I'm wondering:
1. In what case will a key format emit multiple rows?
2. What does the cartesian product of key and value rows represent? (If
there is only one key row it makes sense, but when both the key rows and
the value rows number more than one, I fail to see the possible use case.)

```
// otherwise emit a value for each key
for (RowData physicalKeyRow : physicalKeyRows) {
    emitRow((GenericRowData) physicalKeyRow, (GenericRowData) physicalValueRow);
}
```
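
For context, my reading of the surrounding control flow is roughly the
following. This is a paraphrased sketch, not the exact Flink code: the
class, the list arguments, and the `emitRow` stub are mine, and the
upsert/null-key special cases are elided.

```
import java.util.List;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;

// Paraphrased sketch of the logic around [1]; not the real Flink class.
class CartesianEmitSketch {

    // Every deserialized value row is paired with every deserialized key
    // row, so N key rows and M value rows yield N * M output rows.
    void emitAll(List<RowData> physicalKeyRows, List<RowData> physicalValueRows) {
        for (RowData physicalValueRow : physicalValueRows) {
            // (upsert and null-key handling elided)
            // otherwise emit a value for each key
            for (RowData physicalKeyRow : physicalKeyRows) {
                emitRow((GenericRowData) physicalKeyRow, (GenericRowData) physicalValueRow);
            }
        }
    }

    void emitRow(GenericRowData keyRow, GenericRowData valueRow) {
        // In the real connector this projects key and value fields into
        // the produced row and forwards it downstream.
    }
}
```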


I searched the docs but failed to understand the design here. Any hints
would be appreciated~ Thanks~

[1]
https://github.com/apache/flink/blob/1fc26192cf794961af4f6933c155daf86eb8a0fe/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L224


Cheers,
Affe

Re: [Question][Kafka SQL Connector] When will a decoding format emit multiple rows for a Kafka key; why do a cartesian product?

Posted by Yufei Zhang <af...@gmail.com>.
Hi Jark,

Thanks for the explanation; it answered my question well~

Only one thing: if the key row count is N and the value row count is M
(N, M > 1), the cartesian product might not cover all use cases, e.g. a
format with pairwise key/value semantics, as in the sketch below. But I
think we don't need to worry about it for now, since this case is rare and
we can discuss it later if we do encounter such a use case.
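
To spell out the concern with a toy illustration (purely hypothetical, not
any format Flink ships; all names here are mine): suppose a format packs
the pairs (k1, v1) and (k2, v2) into one Kafka record with pairwise
semantics. The cartesian product then emits the unintended combinations
(k1, v2) and (k2, v1) as well:

```
import java.util.List;

// Hypothetical illustration only: a format that packs the pairs
// (k1, v1) and (k2, v2) into one record and intends pairwise semantics.
class PairingExample {

    // What the connector does today: 2 keys x 2 values = 4 output rows,
    // including the unintended pairs (k1, v2) and (k2, v1).
    static void cartesian(List<String> keys, List<String> values) {
        for (String v : values) {
            for (String k : keys) {
                System.out.println(k + " -> " + v);
            }
        }
    }

    // The pairing such a format would actually want: 2 output rows.
    static void zip(List<String> keys, List<String> values) {
        for (int i = 0; i < Math.min(keys.size(), values.size()); i++) {
            System.out.println(keys.get(i) + " -> " + values.get(i));
        }
    }

    public static void main(String[] args) {
        cartesian(List.of("k1", "k2"), List.of("v1", "v2"));
        zip(List.of("k1", "k2"), List.of("v1", "v2"));
    }
}
```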

Thanks~
Yufei


Re: [Question][Kafka SQL Connector] When will a decoding format emit multiple rows for a Kafka key; why do a cartesian product?

Posted by Jark Wu <im...@gmail.com>.
Hi Affe,

Regarding the implementation: judging from the interface
`DeserializationSchema#deserialize(byte[], Collector<T>)`, a key format may
emit multiple rows, so this is simply the more generic implementation
instead of hard-coding the dropping of extra rows.

Admittedly, there is currently no built-in key format that emits multiple
rows. However, we can't assume there is no use case for it: I can imagine
some special format that groups entries with the same value into one
key-value record (many keys sharing one value). In terms of implementation,
I think it makes sense to do the cartesian product.
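
To make that concrete, here is a sketch of one such hypothetical key format
(nothing like it is built in; the class name and the ';'-separated encoding
are invented purely for illustration). It shows how the Collector-based
`deserialize` permits emitting one key row per id packed into a single
Kafka key:

```
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.util.Collector;

// Hypothetical key format: the key bytes hold several ids separated by
// ';' and each id becomes its own key row. The cartesian product then
// pairs every id with the single shared value row.
class MultiRowKeyFormat implements DeserializationSchema<RowData> {

    @Override
    public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
        for (String id : new String(message, StandardCharsets.UTF_8).split(";")) {
            out.collect(GenericRowData.of(StringData.fromString(id)));
        }
    }

    @Override
    public RowData deserialize(byte[] message) throws IOException {
        // Unused: the connector calls the Collector-based variant above.
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean isEndOfStream(RowData nextElement) {
        return false;
    }

    @Override
    public TypeInformation<RowData> getProducedType() {
        return TypeInformation.of(RowData.class);
    }
}
```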

Best,
Jark
