You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/03/24 04:27:03 UTC
[flink-statefun] 04/04: [hotfix] [kafka] Non-set Kafka keys should
be empty strings
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 34bcee8ffe5dff7c52ab0551021c82334fb15e98
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Mar 24 12:23:04 2020 +0800
[hotfix] [kafka] Non-set Kafka keys should be empty strings
---
.../flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java
index 19af022..e20bdf1 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java
@@ -61,7 +61,7 @@ public final class GenericKafkaEgressSerializer implements KafkaEgressSerializer
final String topic = protobufProducerRecord.getTopic();
final byte[] valueBytes = protobufProducerRecord.getValueBytes().toByteArray();
- if (key == null) {
+ if (key == null || key.isEmpty()) {
return new ProducerRecord<>(topic, valueBytes);
} else {
return new ProducerRecord<>(topic, key.getBytes(StandardCharsets.UTF_8), valueBytes);