You are viewing a plain text version of this content. A canonical (HTML) version is linked from the original archive page; the link was not preserved in this text copy.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/01 06:34:35 UTC

[inlong] branch master updated: [INLONG-7299][Sort] Optimize the options check for the Kafka connector (#7471)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 711685673 [INLONG-7299][Sort] Optimize the options check for the Kafka connector (#7471)
711685673 is described below

commit 7116856732e142037fbb7056005c8513a6b2455f
Author: Charles Zhang <do...@apache.org>
AuthorDate: Wed Mar 1 14:34:29 2023 +0800

    [INLONG-7299][Sort] Optimize the options check for the Kafka connector (#7471)
---
 .../protocol/node/extract/KafkaExtractNode.java    | 72 ++++------------------
 .../sort/kafka/table/KafkaDynamicTableFactory.java |  7 ++-
 2 files changed, 16 insertions(+), 63 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
index 6a0501759..9713a83fc 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
@@ -18,8 +18,6 @@
 package org.apache.inlong.sort.protocol.node.extract;
 
 import com.google.common.base.Preconditions;
-import java.util.HashMap;
-import java.util.Map.Entry;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import org.apache.commons.lang3.StringUtils;
@@ -35,14 +33,7 @@ import org.apache.inlong.sort.protocol.Metadata;
 import org.apache.inlong.sort.protocol.constant.KafkaConstant;
 import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
 import org.apache.inlong.sort.protocol.node.ExtractNode;
-import org.apache.inlong.sort.protocol.node.format.AvroFormat;
-import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
-import org.apache.inlong.sort.protocol.node.format.CsvFormat;
-import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
 import org.apache.inlong.sort.protocol.node.format.Format;
-import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
-import org.apache.inlong.sort.protocol.node.format.JsonFormat;
-import org.apache.inlong.sort.protocol.node.format.RawFormat;
 import org.apache.inlong.sort.protocol.transformation.WatermarkField;
 
 import javax.annotation.Nonnull;
@@ -155,63 +146,22 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad
         Map<String, String> options = super.tableOptions();
         options.put(KafkaConstant.TOPIC, topic);
         options.put(KafkaConstant.PROPERTIES_BOOTSTRAP_SERVERS, bootstrapServers);
-
-        boolean wrapWithInlongMsg = format instanceof InLongMsgFormat;
-        Format realFormat = wrapWithInlongMsg ? ((InLongMsgFormat) format).getInnerFormat() : format;
-        if (realFormat instanceof JsonFormat
-                || realFormat instanceof AvroFormat
-                || realFormat instanceof CsvFormat) {
-            if (StringUtils.isEmpty(this.primaryKey)) {
-                options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA);
-                options.put(KafkaConstant.SCAN_STARTUP_MODE, kafkaScanStartupMode.getValue());
-                if (StringUtils.isNotEmpty(scanSpecificOffsets)) {
-                    options.put(KafkaConstant.SCAN_STARTUP_SPECIFIC_OFFSETS, scanSpecificOffsets);
-                }
-                if (StringUtils.isNotBlank(scanTimestampMillis)) {
-                    options.put(KafkaConstant.SCAN_STARTUP_TIMESTAMP_MILLIS, scanTimestampMillis);
-                }
-                options.putAll(delegateInlongFormat(realFormat.generateOptions(false), wrapWithInlongMsg));
-            } else {
-                options.put(KafkaConstant.CONNECTOR, KafkaConstant.UPSERT_KAFKA);
-                options.putAll(delegateInlongFormat(realFormat.generateOptions(true), wrapWithInlongMsg));
-            }
-        } else if (realFormat instanceof CanalJsonFormat
-                || realFormat instanceof DebeziumJsonFormat
-                || realFormat instanceof RawFormat) {
+        options.put(KafkaConstant.SCAN_STARTUP_MODE, kafkaScanStartupMode.getValue());
+        if (StringUtils.isEmpty(this.primaryKey)) {
             options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA);
-            options.put(KafkaConstant.SCAN_STARTUP_MODE, kafkaScanStartupMode.getValue());
-            if (StringUtils.isNotEmpty(scanSpecificOffsets)) {
-                options.put(KafkaConstant.SCAN_STARTUP_SPECIFIC_OFFSETS, scanSpecificOffsets);
-            }
-            if (StringUtils.isNotBlank(scanTimestampMillis)) {
-                options.put(KafkaConstant.SCAN_STARTUP_TIMESTAMP_MILLIS, scanTimestampMillis);
-            }
-            options.putAll(delegateInlongFormat(realFormat.generateOptions(false), wrapWithInlongMsg));
+            options.putAll(format.generateOptions(false));
         } else {
-            throw new IllegalArgumentException("kafka extract node format is IllegalArgument");
+            options.put(KafkaConstant.CONNECTOR, KafkaConstant.UPSERT_KAFKA);
+            options.putAll(format.generateOptions(true));
         }
-        if (StringUtils.isNotEmpty(groupId)) {
-            options.put(KafkaConstant.PROPERTIES_GROUP_ID, groupId);
+        if (StringUtils.isNotEmpty(scanSpecificOffsets)) {
+            options.put(KafkaConstant.SCAN_STARTUP_SPECIFIC_OFFSETS, scanSpecificOffsets);
         }
-        return options;
-    }
-
-    private Map<String, String> delegateInlongFormat(
-            Map<String, String> realOptions,
-            boolean wrapWithInlongMsg) {
-        if (!wrapWithInlongMsg) {
-            return realOptions;
+        if (StringUtils.isNotBlank(scanTimestampMillis)) {
+            options.put(KafkaConstant.SCAN_STARTUP_TIMESTAMP_MILLIS, scanTimestampMillis);
         }
-        Map<String, String> options = new HashMap<>();
-        for (Entry<String, String> entry : realOptions.entrySet()) {
-            String key = entry.getKey();
-            String value = entry.getValue();
-            if ("format".equals(key)) {
-                options.put("format", "inlong-msg");
-                options.put("inlong-msg.inner.format", value);
-            } else {
-                options.put("inlong-msg." + key, value);
-            }
+        if (StringUtils.isNotEmpty(groupId)) {
+            options.put(KafkaConstant.PROPERTIES_GROUP_ID, groupId);
         }
         return options;
     }
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
index 0db49fd17..d92d5cfed 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
@@ -326,10 +326,13 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
         final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
                 getValueDecodingFormat(helper);
 
-        helper.validateExcept(PROPERTIES_PREFIX, DIRTY_PREFIX);
+        final String valueFormatPrefix = tableOptions.getOptional(FORMAT)
+                .orElse(tableOptions.get(VALUE_FORMAT));
 
+        // Validate the option data type.
+        helper.validateExcept(PROPERTIES_PREFIX, DIRTY_PREFIX, valueFormatPrefix);
+        // Validate the option values.
         validateTableSourceOptions(tableOptions);
-
         validatePKConstraints(
                 context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat);