You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/02 11:31:09 UTC
[inlong] branch master updated: [INLONG-7485][Sort] Kafka extract node decide connector option by format (#7486)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 9f0a4c843 [INLONG-7485][Sort] Kafka extract node decide connector option by format (#7486)
9f0a4c843 is described below
commit 9f0a4c8432854e6de7b7f250eb5d41543b4be05d
Author: Xin Gong <ge...@gmail.com>
AuthorDate: Thu Mar 2 19:31:03 2023 +0800
[INLONG-7485][Sort] Kafka extract node decide connector option by format (#7486)
---
.../protocol/node/extract/KafkaExtractNode.java | 26 ++++++++++++++++++----
1 file changed, 22 insertions(+), 4 deletions(-)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
index 9713a83fc..63c65f31b 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
@@ -33,7 +33,10 @@ import org.apache.inlong.sort.protocol.Metadata;
import org.apache.inlong.sort.protocol.constant.KafkaConstant;
import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.format.AvroFormat;
+import org.apache.inlong.sort.protocol.node.format.CsvFormat;
import org.apache.inlong.sort.protocol.node.format.Format;
+import org.apache.inlong.sort.protocol.node.format.JsonFormat;
import org.apache.inlong.sort.protocol.transformation.WatermarkField;
import javax.annotation.Nonnull;
@@ -147,12 +150,12 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad
options.put(KafkaConstant.TOPIC, topic);
options.put(KafkaConstant.PROPERTIES_BOOTSTRAP_SERVERS, bootstrapServers);
options.put(KafkaConstant.SCAN_STARTUP_MODE, kafkaScanStartupMode.getValue());
- if (StringUtils.isEmpty(this.primaryKey)) {
- options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA);
- options.putAll(format.generateOptions(false));
- } else {
+ if (isUpsertKafkaConnector(format, !StringUtils.isEmpty(this.primaryKey))) {
options.put(KafkaConstant.CONNECTOR, KafkaConstant.UPSERT_KAFKA);
options.putAll(format.generateOptions(true));
+ } else {
+ options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA);
+ options.putAll(format.generateOptions(false));
}
if (StringUtils.isNotEmpty(scanSpecificOffsets)) {
options.put(KafkaConstant.SCAN_STARTUP_SPECIFIC_OFFSETS, scanSpecificOffsets);
@@ -166,6 +169,21 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad
return options;
}
+ /**
+ * Decides which Kafka connector the generated table options should use.
+ *
+ * @param format the data format configured for this extract node
+ * @param hasPrimaryKey whether a non-empty primary key is configured
+ * @return true to select the upsert kafka connector, which requires a primary key and a format
+ * that is an instance of JsonFormat, CsvFormat or AvroFormat; false to select the kafka connector
+ */
+ private boolean isUpsertKafkaConnector(Format format, boolean hasPrimaryKey) {
+ // Without a primary key no format can select the upsert connector.
+ if (!hasPrimaryKey) {
+ return false;
+ }
+ return format instanceof JsonFormat
+ || format instanceof CsvFormat
+ || format instanceof AvroFormat;
+ }
+
@Override
public String genTableName() {
return String.format("table_%s", super.getId());