You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/20 06:58:56 UTC
[inlong] branch release-1.3.0 updated: [INLONG-5940][Sort] Compatible with the old version of sort protocol with mysql and kafka extract node (#5945)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push:
new 7f002a390 [INLONG-5940][Sort] Compatible with the old version of sort protocol with mysql and kafka extract node (#5945)
7f002a390 is described below
commit 7f002a3902d405f1ca90276b1f4e64da444f3f01
Author: Charles <44...@users.noreply.github.com>
AuthorDate: Tue Sep 20 14:46:33 2022 +0800
[INLONG-5940][Sort] Compatible with the old version of sort protocol with mysql and kafka extract node (#5945)
---
.../protocol/node/extract/KafkaExtractNode.java | 22 +++++++++++++++++++++-
.../protocol/node/extract/MySqlExtractNode.java | 15 ++++++---------
2 files changed, 27 insertions(+), 10 deletions(-)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
index f9848f4bd..4ef7ec9aa 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
@@ -22,6 +22,8 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.inlong.common.enums.MetaField;
@@ -52,6 +54,7 @@ import java.util.Set;
*/
@EqualsAndHashCode(callSuper = true)
@JsonTypeName("kafkaExtract")
+@JsonInclude(Include.NON_NULL)
@Data
public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metadata, Serializable {
@@ -79,6 +82,21 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad
@JsonProperty("scanSpecificOffsets")
private String scanSpecificOffsets;
+ public KafkaExtractNode(@JsonProperty("id") String id,
+ @JsonProperty("name") String name,
+ @JsonProperty("fields") List<FieldInfo> fields,
+ @Nullable @JsonProperty("watermarkField") WatermarkField watermarkField,
+ @JsonProperty("properties") Map<String, String> properties,
+ @Nonnull @JsonProperty("topic") String topic,
+ @Nonnull @JsonProperty("bootstrapServers") String bootstrapServers,
+ @Nonnull @JsonProperty("format") Format format,
+ @JsonProperty("scanStartupMode") KafkaScanStartupMode kafkaScanStartupMode,
+ @JsonProperty("primaryKey") String primaryKey,
+ @JsonProperty("groupId") String groupId) {
+ this(id, name, fields, watermarkField, properties, topic, bootstrapServers, format, kafkaScanStartupMode,
+ primaryKey, groupId, null);
+ }
+
@JsonCreator
public KafkaExtractNode(@JsonProperty("id") String id,
@JsonProperty("name") String name,
@@ -130,7 +148,9 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad
} else if (format instanceof CanalJsonFormat || format instanceof DebeziumJsonFormat) {
options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA);
options.put(KafkaConstant.SCAN_STARTUP_MODE, kafkaScanStartupMode.getValue());
- options.put(KafkaConstant.SCAN_STARTUP_SPECIFIC_OFFSETS, scanSpecificOffsets);
+ if (StringUtils.isNotEmpty(scanSpecificOffsets)) {
+ options.put(KafkaConstant.SCAN_STARTUP_SPECIFIC_OFFSETS, scanSpecificOffsets);
+ }
options.putAll(format.generateOptions(false));
} else {
throw new IllegalArgumentException("kafka extract node format is IllegalArgument");
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java
index b1d0f2783..62bc19530 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java
@@ -47,12 +47,12 @@ import java.util.Set;
*/
@EqualsAndHashCode(callSuper = true)
@JsonTypeName("mysqlExtract")
+@JsonInclude(Include.NON_NULL)
@Data
public class MySqlExtractNode extends ExtractNode implements Metadata, InlongMetric, Serializable {
private static final long serialVersionUID = -5521981462461235277L;
- @JsonInclude(Include.NON_NULL)
@JsonProperty("primaryKey")
private String primaryKey;
@JsonProperty("tableNames")
@@ -66,16 +66,12 @@ public class MySqlExtractNode extends ExtractNode implements Metadata, InlongMet
private String password;
@JsonProperty("database")
private String database;
- @JsonInclude(Include.NON_NULL)
@JsonProperty("port")
private Integer port;
- @JsonInclude(Include.NON_NULL)
@JsonProperty("serverId")
private Integer serverId;
- @JsonInclude(Include.NON_NULL)
@JsonProperty("incrementalSnapshotEnabled")
private Boolean incrementalSnapshotEnabled;
- @JsonInclude(Include.NON_NULL)
@JsonProperty("serverTimeZone")
private String serverTimeZone;
@Nonnull
@@ -193,13 +189,14 @@ public class MySqlExtractNode extends ExtractNode implements Metadata, InlongMet
super(id, name, fields, watermarkField, properties);
this.tableNames = Preconditions.checkNotNull(tableNames, "tableNames is null");
Preconditions.checkState(!tableNames.isEmpty(), "tableNames is empty");
- if (extractMode == ExtractMode.CDC) {
- this.hostname = Preconditions.checkNotNull(hostname, "hostname is null");
- this.database = Preconditions.checkNotNull(database, "database is null");
- } else {
+ if (extractMode == ExtractMode.SCAN) {
this.hostname = hostname;
this.database = database;
this.url = Preconditions.checkNotNull(url, "url is null");
+ } else {
+ extractMode = ExtractMode.CDC;
+ this.hostname = Preconditions.checkNotNull(hostname, "hostname is null");
+ this.database = Preconditions.checkNotNull(database, "database is null");
}
this.username = Preconditions.checkNotNull(username, "username is null");
this.password = Preconditions.checkNotNull(password, "password is null");