You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/02 11:31:09 UTC

[inlong] branch master updated: [INLONG-7485][Sort] Kafka extract node decide connector option by format (#7486)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f0a4c843 [INLONG-7485][Sort] Kafka extract node decide connector option by format (#7486)
9f0a4c843 is described below

commit 9f0a4c8432854e6de7b7f250eb5d41543b4be05d
Author: Xin Gong <ge...@gmail.com>
AuthorDate: Thu Mar 2 19:31:03 2023 +0800

    [INLONG-7485][Sort] Kafka extract node decide connector option by format (#7486)
---
 .../protocol/node/extract/KafkaExtractNode.java    | 26 ++++++++++++++++++----
 1 file changed, 22 insertions(+), 4 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
index 9713a83fc..63c65f31b 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
@@ -33,7 +33,10 @@ import org.apache.inlong.sort.protocol.Metadata;
 import org.apache.inlong.sort.protocol.constant.KafkaConstant;
 import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
 import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.format.AvroFormat;
+import org.apache.inlong.sort.protocol.node.format.CsvFormat;
 import org.apache.inlong.sort.protocol.node.format.Format;
+import org.apache.inlong.sort.protocol.node.format.JsonFormat;
 import org.apache.inlong.sort.protocol.transformation.WatermarkField;
 
 import javax.annotation.Nonnull;
@@ -147,12 +150,12 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad
         options.put(KafkaConstant.TOPIC, topic);
         options.put(KafkaConstant.PROPERTIES_BOOTSTRAP_SERVERS, bootstrapServers);
         options.put(KafkaConstant.SCAN_STARTUP_MODE, kafkaScanStartupMode.getValue());
-        if (StringUtils.isEmpty(this.primaryKey)) {
-            options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA);
-            options.putAll(format.generateOptions(false));
-        } else {
+        if (isUpsertKafkaConnector(format, !StringUtils.isEmpty(this.primaryKey))) {
             options.put(KafkaConstant.CONNECTOR, KafkaConstant.UPSERT_KAFKA);
             options.putAll(format.generateOptions(true));
+        } else {
+            options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA);
+            options.putAll(format.generateOptions(false));
         }
         if (StringUtils.isNotEmpty(scanSpecificOffsets)) {
             options.put(KafkaConstant.SCAN_STARTUP_SPECIFIC_OFFSETS, scanSpecificOffsets);
@@ -166,6 +169,21 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad
         return options;
     }
 
+    /**
+     * Decides which connector the generated table options should use.
+     * The upsert kafka connector is chosen only when a primary key is present
+     * and the format is a JsonFormat, CsvFormat or AvroFormat instance.
+     * @param format the format checked against the supported format types
+     * @param hasPrimaryKey whether a primary key is present
+     * @return true for the upsert kafka connector, false for the kafka connector
+     */
+    private boolean isUpsertKafkaConnector(Format format, boolean hasPrimaryKey) {
+        if (!hasPrimaryKey) {
+            return false;
+        }
+        return format instanceof JsonFormat || format instanceof CsvFormat || format instanceof AvroFormat;
+    }
+
     @Override
     public String genTableName() {
         return String.format("table_%s", super.getId());