Posted to dev@beam.apache.org by Weiwen Xu <we...@google.com> on 2021/06/01 21:03:33 UTC

[Question] Best Practice of Handling Null Key for KafkaRecordCoder

Hello,

I'm working on [this issue](https://issues.apache.org/jira/browse/BEAM-12008) with Boyuan. She was very helpful in identifying the problem: KafkaRecordCoder cannot handle records whose key is null.

We came up with two potential solutions. Both have pros and cons, so I'm hoping to gather suggestions, opinions, or ideas on how to handle this issue. Our solutions:

1. Wrap the keyCoder directly with NullableCoder, i.e. NullableCoder.of(keyCoder).
    cons: backwards compatibility problem

2. Write a completely new class, named something like NullableKeyKafkaRecordCoder.
    Instead of using KvCoder to encode/decode KVs, we keep KeyCoder and ValueCoder as fields and use an additional BooleanCoder to encode/decode a true/false flag indicating whether the key is null. If the key is null, KeyCoder does not encode/decode anything.
   
              - [L63] encode(...){
                       stringCoder.encode(topic, ...);
                       intCoder.encode(partition, ...);
                       longCoder.encode(offset, ...);
                       longCoder.encode(timestamp, ...);
                       intCoder.encode(timestamptype, ...);
                       headerCoder.encode(...);
                       if(key != null){
                              BooleanCoder.encode(false, ...);   // flag: key is present
                              KeyCoder.encode(key, ...);
                       }else{
                              BooleanCoder.encode(true, ...);    // flag: key is null
                              // skips KeyCoder when key is null
                       }
                       ValueCoder.encode(value, ...);
                }

              - [L74] decode(...){
                       // fields must be read in the same order encode(...) wrote them
                       return new KafkaRecord<>(
                                            stringCoder.decode(inStream),
                                            intCoder.decode(inStream),
                                            longCoder.decode(inStream),
                                            longCoder.decode(inStream),
                                            KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
                                            (Headers) toHeaders(headerCoder.decode(inStream)),
                                            BooleanCoder.decode(inStream) ? null : KeyCoder.decode(inStream),
                                            ValueCoder.decode(inStream)
                                            );
                }
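
To make the flag-then-key layout of option 2 concrete, here is a minimal, self-contained sketch that uses only java.io instead of Beam coders. The class NullKeyFramingSketch, its methods, and the 4-byte length prefix are all assumptions made for illustration; this is not the KafkaRecordCoder implementation, only the same idea in miniature.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for the proposed coder; none of this is Beam code. */
final class NullKeyFramingSketch {

  /** Writes a "key is null" flag, then the key only when present, then the value. */
  static byte[] encode(String key, String value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeBoolean(key == null); // true means "key is null"
      if (key != null) {
        writeString(out, key);
      }
      writeString(out, value);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Reads the flag first and skips the key read when the flag says null. */
  static String[] decode(byte[] encoded) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
      String key = in.readBoolean() ? null : readString(in);
      String value = readString(in);
      return new String[] {key, value};
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Length-prefixed UTF-8; an assumed encoding, chosen only to keep the sketch short.
  private static void writeString(DataOutputStream out, String s) throws IOException {
    byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
    out.writeInt(utf8.length);
    out.write(utf8);
  }

  private static String readString(DataInputStream in) throws IOException {
    byte[] utf8 = new byte[in.readInt()];
    in.readFully(utf8);
    return new String(utf8, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    String[] withKey = decode(encode("k1", "v1"));
    String[] nullKey = decode(encode(null, "v2"));
    System.out.println(withKey[0] + " -> " + withKey[1]);
    System.out.println(nullKey[0] + " -> " + nullKey[1]);
  }
}
```

The point of the sketch is only that the decoder must read the flag before deciding whether a key follows, so encode and decode have to agree on that order.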

Best regards,
Weiwen

Re: [Question] Best Practice of Handling Null Key for KafkaRecordCoder

Posted by Brian Hulette <bh...@google.com>.
Could you register a schema for KafkaRecord? Then you can use SchemaCoder
which handles the conversion to/from Row.

On Thu, Jun 3, 2021 at 2:39 PM Chamikara Jayalath <ch...@google.com>
wrote:

> I think for that and for any transform that produces a PCollection of a
> type that is not represented by a standard coder, we would have to add a
> cross-language builder class that returns a PCollection that can be
> supported at the cross-language boundary. For example, it can be a
> PCollection<Row> since RowCoder is already a standard coder. I haven't
> looked closely into fixing this particular Jira though.
>
> Thanks,
> Cham
>
> On Thu, Jun 3, 2021 at 2:31 PM Boyuan Zhang <bo...@google.com> wrote:
>
>> Considering this together with the problem of populating KafkaRecord
>> metadata (BEAM-12076
>> <https://issues.apache.org/jira/projects/BEAM/issues/BEAM-12076>),
>> what's the plan there? Are we going to make KafkaRecordCoder a
>> well-known coder as well? I ask because this might be a good chance to
>> revisit the KafkaRecordCoder implementation.
>>
>> On Thu, Jun 3, 2021 at 2:17 PM Chamikara Jayalath <ch...@google.com>
>> wrote:
>>
>>>
>>>
>>> On Thu, Jun 3, 2021 at 2:06 PM Boyuan Zhang <bo...@google.com> wrote:
>>>
>>>> Supporting the x-lang boundary is a good point. So you are suggesting
>>>> that:
>>>>
>>>>    1. We make NullableCoder a standard coder.
>>>>    2. KafkaIO wraps the keyCoder with NullableCoder directly where
>>>>    required.
>>>>
>>>> Is that correct?
>>>>
>>>
>>> Yeah.
>>>
>>>
>>>>
>>>>
>>>> On Wed, Jun 2, 2021 at 6:47 PM Chamikara Jayalath <ch...@google.com>
>>>> wrote:
>>>>
>>>>> I think we should make NullableCoder a standard coder for Beam [1] and
>>>>> use a standard NullableCoder(KeyCoder) for Kafka keys (where KeyCoder might
>>>>> be the standard ByteArrayCoder, for example).
>>>>> I think we already have compatible Java and Python NullableCoder
>>>>> implementations, so implementing this should be relatively
>>>>> straightforward.
>>>>>
>>>>> Non-standard coders may not be supported by runners at the
>>>>> cross-language boundary.
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/beam/blob/d0c3dd72874fada03cc601f30cde022a8dd6aa9c/model/pipeline/src/main/proto/beam_runner_api.proto#L784
>>>>>
>>>>> On Wed, Jun 2, 2021 at 6:25 PM Ahmet Altay <al...@google.com> wrote:
>>>>>
>>>>>> /cc folks who commented on the issue: @Robin Qiu <ro...@google.com>
>>>>>>  @Chamikara Jayalath <ch...@google.com> @Alexey Romanenko
>>>>>> <ar...@gmail.com> @Daniel Collins <dp...@google.com>

Re: [Question] Best Practice of Handling Null Key for KafkaRecordCoder

Posted by Chamikara Jayalath <ch...@google.com>.
I think for that and for any transform that produces a PCollection of a
type that is not represented by a standard coder, we would have to add a
cross-language builder class that returns a PCollection that can be
supported at the cross-language boundary. For example, it can be a
PCollection<Row> since RowCoder is already a standard coder. I haven't
looked closely into fixing this particular Jira though.

Thanks,
Cham

Re: [Question] Best Practice of Handling Null Key for KafkaRecordCoder

Posted by Boyuan Zhang <bo...@google.com>.
Considering this together with the problem of populating KafkaRecord
metadata (BEAM-12076
<https://issues.apache.org/jira/projects/BEAM/issues/BEAM-12076>),
what's the plan there? Are we going to make KafkaRecordCoder a
well-known coder as well? I ask because this might be a good chance to
revisit the KafkaRecordCoder implementation.

Re: [Question] Best Practice of Handling Null Key for KafkaRecordCoder

Posted by Chamikara Jayalath <ch...@google.com>.
On Thu, Jun 3, 2021 at 2:06 PM Boyuan Zhang <bo...@google.com> wrote:

> Supporting the x-lang boundary is a good point. So you are suggesting that:
>
>    1. We make NullableCoder a standard coder.
>    2. KafkaIO wraps the keyCoder with NullableCoder directly where
>    required.
>
> Is that correct?
>

Yeah.


Re: [Question] Best Practice of Handling Null Key for KafkaRecordCoder

Posted by Boyuan Zhang <bo...@google.com>.
Supporting the x-lang boundary is a good point. So you are suggesting that:

   1. We make NullableCoder a standard coder.
   2. KafkaIO wraps the keyCoder with NullableCoder directly where required.

Is that correct?


Re: [Question] Best Practice of Handling Null Key for KafkaRecordCoder

Posted by Chamikara Jayalath <ch...@google.com>.
I think we should make NullableCoder a standard coder for Beam [1] and use
a standard NullableCoder(KeyCoder) for Kafka keys (where KeyCoder might be
the standard ByteArrayCoder, for example).
I think we already have compatible Java and Python NullableCoder
implementations, so implementing this should be relatively straightforward.

Non-standard coders may not be supported by runners at the cross-language
boundary.

Thanks,
Cham

[1]
https://github.com/apache/beam/blob/d0c3dd72874fada03cc601f30cde022a8dd6aa9c/model/pipeline/src/main/proto/beam_runner_api.proto#L784
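
The backwards compatibility concern raised for the NullableCoder approach comes down to a change in byte layout. As far as I understand it, the Java NullableCoder writes a one-byte marker (0 for null, 1 for present) ahead of the wrapped coder's bytes, so data written without the wrapper cannot be read correctly with it. The sketch below illustrates that with plain java.io; the class NullableWrapCompatSketch, its methods, and the 4-byte length prefix are assumptions for illustration and are not Beam code (Beam's real coders use a different length encoding).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical illustration only; none of these names exist in Beam. */
final class NullableWrapCompatSketch {

  /** Old layout: 4-byte length followed by the key bytes (a simplification). */
  static byte[] encodePlain(byte[] key) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeInt(key.length);
      out.write(key);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** New layout: marker byte (0 = null, 1 = present), then the old layout. */
  static byte[] encodeNullable(byte[] key) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    if (key == null) {
      bytes.write(0);
    } else {
      bytes.write(1);
      byte[] plain = encodePlain(key);
      bytes.write(plain, 0, plain.length);
    }
    return bytes.toByteArray();
  }

  /** Reads the new layout; returns null when the marker byte is 0. */
  static byte[] decodeNullable(byte[] encoded) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
      int marker = in.readUnsignedByte();
      if (marker == 0) {
        return null;
      }
      if (marker != 1) {
        throw new IllegalStateException("unexpected marker " + marker);
      }
      byte[] key = new byte[in.readInt()];
      in.readFully(key);
      return key;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    byte[] key = "k1".getBytes(StandardCharsets.UTF_8);
    // New writer, new reader: the key round-trips.
    System.out.println(new String(decodeNullable(encodeNullable(key)), StandardCharsets.UTF_8));
    // Old writer, new reader: the leading length byte (0) is misread as "key is null".
    System.out.println(decodeNullable(encodePlain(key)) == null);
  }
}
```

In this simplified layout the old bytes are silently decoded as a null key; with a different length encoding the reader would more likely fail on an unexpected marker. Either way, anything persisted with the unwrapped key coder would need to be handled separately.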

Re: [Question] Best Practice of Handling Null Key for KafkaRecordCoder

Posted by Ahmet Altay <al...@google.com>.
/cc folks who commented on the issue: @Robin Qiu <ro...@google.com>
@Chamikara Jayalath <ch...@google.com> @Alexey Romanenko <ar...@gmail.com>
@Daniel Collins <dp...@google.com>
