You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/10/20 08:47:08 UTC

[incubator-seatunnel] branch dev updated: update (#3150)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2b4499275 update (#3150)
2b4499275 is described below

commit 2b44992750ec6a751ccec0f8d3d76731f144c9d7
Author: TaoZex <45...@users.noreply.github.com>
AuthorDate: Thu Oct 20 16:47:03 2022 +0800

    update (#3150)
---
 .../seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java     | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index 9712ba331..583dc6c81 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -190,6 +190,9 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
 
     private Function<SeaTunnelRow, String> createPartitionExtractor(Config pluginConfig,
                                                                     SeaTunnelRowType seaTunnelRowType) {
+        if (!pluginConfig.hasPath(PARTITION_KEY)){
+            return row -> null;
+        }
         String partitionKey = pluginConfig.getString(PARTITION_KEY);
         List<String> fieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
         if (!fieldNames.contains(partitionKey)) {