You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/01 06:34:35 UTC
[inlong] branch master updated: [INLONG-7299][Sort] Optimize the options check for the Kafka connector (#7471)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 711685673 [INLONG-7299][Sort] Optimize the options check for the Kafka connector (#7471)
711685673 is described below
commit 7116856732e142037fbb7056005c8513a6b2455f
Author: Charles Zhang <do...@apache.org>
AuthorDate: Wed Mar 1 14:34:29 2023 +0800
[INLONG-7299][Sort] Optimize the options check for the Kafka connector (#7471)
---
.../protocol/node/extract/KafkaExtractNode.java | 72 ++++------------------
.../sort/kafka/table/KafkaDynamicTableFactory.java | 7 ++-
2 files changed, 16 insertions(+), 63 deletions(-)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
index 6a0501759..9713a83fc 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
@@ -18,8 +18,6 @@
package org.apache.inlong.sort.protocol.node.extract;
import com.google.common.base.Preconditions;
-import java.util.HashMap;
-import java.util.Map.Entry;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.apache.commons.lang3.StringUtils;
@@ -35,14 +33,7 @@ import org.apache.inlong.sort.protocol.Metadata;
import org.apache.inlong.sort.protocol.constant.KafkaConstant;
import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
import org.apache.inlong.sort.protocol.node.ExtractNode;
-import org.apache.inlong.sort.protocol.node.format.AvroFormat;
-import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
-import org.apache.inlong.sort.protocol.node.format.CsvFormat;
-import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
import org.apache.inlong.sort.protocol.node.format.Format;
-import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
-import org.apache.inlong.sort.protocol.node.format.JsonFormat;
-import org.apache.inlong.sort.protocol.node.format.RawFormat;
import org.apache.inlong.sort.protocol.transformation.WatermarkField;
import javax.annotation.Nonnull;
@@ -155,63 +146,22 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad
Map<String, String> options = super.tableOptions();
options.put(KafkaConstant.TOPIC, topic);
options.put(KafkaConstant.PROPERTIES_BOOTSTRAP_SERVERS, bootstrapServers);
-
- boolean wrapWithInlongMsg = format instanceof InLongMsgFormat;
- Format realFormat = wrapWithInlongMsg ? ((InLongMsgFormat) format).getInnerFormat() : format;
- if (realFormat instanceof JsonFormat
- || realFormat instanceof AvroFormat
- || realFormat instanceof CsvFormat) {
- if (StringUtils.isEmpty(this.primaryKey)) {
- options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA);
- options.put(KafkaConstant.SCAN_STARTUP_MODE, kafkaScanStartupMode.getValue());
- if (StringUtils.isNotEmpty(scanSpecificOffsets)) {
- options.put(KafkaConstant.SCAN_STARTUP_SPECIFIC_OFFSETS, scanSpecificOffsets);
- }
- if (StringUtils.isNotBlank(scanTimestampMillis)) {
- options.put(KafkaConstant.SCAN_STARTUP_TIMESTAMP_MILLIS, scanTimestampMillis);
- }
- options.putAll(delegateInlongFormat(realFormat.generateOptions(false), wrapWithInlongMsg));
- } else {
- options.put(KafkaConstant.CONNECTOR, KafkaConstant.UPSERT_KAFKA);
- options.putAll(delegateInlongFormat(realFormat.generateOptions(true), wrapWithInlongMsg));
- }
- } else if (realFormat instanceof CanalJsonFormat
- || realFormat instanceof DebeziumJsonFormat
- || realFormat instanceof RawFormat) {
+ options.put(KafkaConstant.SCAN_STARTUP_MODE, kafkaScanStartupMode.getValue());
+ if (StringUtils.isEmpty(this.primaryKey)) {
options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA);
- options.put(KafkaConstant.SCAN_STARTUP_MODE, kafkaScanStartupMode.getValue());
- if (StringUtils.isNotEmpty(scanSpecificOffsets)) {
- options.put(KafkaConstant.SCAN_STARTUP_SPECIFIC_OFFSETS, scanSpecificOffsets);
- }
- if (StringUtils.isNotBlank(scanTimestampMillis)) {
- options.put(KafkaConstant.SCAN_STARTUP_TIMESTAMP_MILLIS, scanTimestampMillis);
- }
- options.putAll(delegateInlongFormat(realFormat.generateOptions(false), wrapWithInlongMsg));
+ options.putAll(format.generateOptions(false));
} else {
- throw new IllegalArgumentException("kafka extract node format is IllegalArgument");
+ options.put(KafkaConstant.CONNECTOR, KafkaConstant.UPSERT_KAFKA);
+ options.putAll(format.generateOptions(true));
}
- if (StringUtils.isNotEmpty(groupId)) {
- options.put(KafkaConstant.PROPERTIES_GROUP_ID, groupId);
+ if (StringUtils.isNotEmpty(scanSpecificOffsets)) {
+ options.put(KafkaConstant.SCAN_STARTUP_SPECIFIC_OFFSETS, scanSpecificOffsets);
}
- return options;
- }
-
- private Map<String, String> delegateInlongFormat(
- Map<String, String> realOptions,
- boolean wrapWithInlongMsg) {
- if (!wrapWithInlongMsg) {
- return realOptions;
+ if (StringUtils.isNotBlank(scanTimestampMillis)) {
+ options.put(KafkaConstant.SCAN_STARTUP_TIMESTAMP_MILLIS, scanTimestampMillis);
}
- Map<String, String> options = new HashMap<>();
- for (Entry<String, String> entry : realOptions.entrySet()) {
- String key = entry.getKey();
- String value = entry.getValue();
- if ("format".equals(key)) {
- options.put("format", "inlong-msg");
- options.put("inlong-msg.inner.format", value);
- } else {
- options.put("inlong-msg." + key, value);
- }
+ if (StringUtils.isNotEmpty(groupId)) {
+ options.put(KafkaConstant.PROPERTIES_GROUP_ID, groupId);
}
return options;
}
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
index 0db49fd17..d92d5cfed 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
@@ -326,10 +326,13 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
getValueDecodingFormat(helper);
- helper.validateExcept(PROPERTIES_PREFIX, DIRTY_PREFIX);
+ final String valueFormatPrefix = tableOptions.getOptional(FORMAT)
+ .orElse(tableOptions.get(VALUE_FORMAT));
+ // Validate the option data type.
+ helper.validateExcept(PROPERTIES_PREFIX, DIRTY_PREFIX, valueFormatPrefix);
+ // Validate the option values.
validateTableSourceOptions(tableOptions);
-
validatePKConstraints(
context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat);