You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Daniel Collins (Jira)" <ji...@apache.org> on 2021/03/17 19:19:00 UTC

[jira] [Created] (BEAM-12008) KafkaIO does not handle null keys

Daniel Collins created BEAM-12008:
-------------------------------------

             Summary: KafkaIO does not handle null keys
                 Key: BEAM-12008
                 URL: https://issues.apache.org/jira/browse/BEAM-12008
             Project: Beam
          Issue Type: Improvement
          Components: io-java-kafka
            Reporter: Daniel Collins


Kafka [ConsumerRecord|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html#key--] and [ProducerRecord|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html#key--] 'key' fields are explicitly allowed to be null. In addition, on the producer side, setting a null key is the way that the user indicates that they want a [random partition for their message|[https://github.com/apache/kafka/blob/9adfac280392da0837cfd8d582bc540951e94087/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L67].]

 

Beam KafkaIO does not support null keys in byte[] mode (read side: [https://github.com/apache/beam/blob/9e0997760cf3320f1a1d0c4342d3dff559a25775/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L727|https://github.com/apache/beam/blob/9e0997760cf3320f1a1d0c4342d3dff559a25775/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L727)]

write side: [https://github.com/apache/beam/blob/9e0997760cf3320f1a1d0c4342d3dff559a25775/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java#L58])

 

since it would defer to ByteArrayCoder which does not support null arrays.

 

BeamKafkaTable suffers the same issue https://github.com/apache/beam/blob/9e0997760cf3320f1a1d0c4342d3dff559a25775/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java#L144



--
This message was sent by Atlassian Jira
(v8.3.4#803005)