You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/03/24 04:27:03 UTC

[flink-statefun] 04/04: [hotfix] [kafka] Non-set Kafka keys should be empty strings

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 34bcee8ffe5dff7c52ab0551021c82334fb15e98
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Mar 24 12:23:04 2020 +0800

    [hotfix] [kafka] Non-set Kafka keys should be empty strings
---
 .../flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java
index 19af022..e20bdf1 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java
@@ -61,7 +61,7 @@ public final class GenericKafkaEgressSerializer implements KafkaEgressSerializer
     final String topic = protobufProducerRecord.getTopic();
     final byte[] valueBytes = protobufProducerRecord.getValueBytes().toByteArray();
 
-    if (key == null) {
+    if (key == null || key.isEmpty()) {
       return new ProducerRecord<>(topic, valueBytes);
     } else {
       return new ProducerRecord<>(topic, key.getBytes(StandardCharsets.UTF_8), valueBytes);