You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/10/20 08:47:08 UTC
[incubator-seatunnel] branch dev updated: update (#3150)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 2b4499275 update (#3150)
2b4499275 is described below
commit 2b44992750ec6a751ccec0f8d3d76731f144c9d7
Author: TaoZex <45...@users.noreply.github.com>
AuthorDate: Thu Oct 20 16:47:03 2022 +0800
update (#3150)
---
.../seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java | 3 +++
1 file changed, 3 insertions(+)
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index 9712ba331..583dc6c81 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -190,6 +190,9 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
private Function<SeaTunnelRow, String> createPartitionExtractor(Config pluginConfig,
SeaTunnelRowType seaTunnelRowType) {
+ if (!pluginConfig.hasPath(PARTITION_KEY)){
+ return row -> null;
+ }
String partitionKey = pluginConfig.getString(PARTITION_KEY);
List<String> fieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
if (!fieldNames.contains(partitionKey)) {