You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/20 06:58:56 UTC

[inlong] branch release-1.3.0 updated: [INLONG-5940][Sort] Compatible with the old version of sort protocol with mysql and kafka extract node (#5945)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/release-1.3.0 by this push:
     new 7f002a390 [INLONG-5940][Sort] Compatible with the old version of sort protocol with mysql and kafka extract node (#5945)
7f002a390 is described below

commit 7f002a3902d405f1ca90276b1f4e64da444f3f01
Author: Charles <44...@users.noreply.github.com>
AuthorDate: Tue Sep 20 14:46:33 2022 +0800

    [INLONG-5940][Sort] Compatible with the old version of sort protocol with mysql and kafka extract node (#5945)
---
 .../protocol/node/extract/KafkaExtractNode.java    | 22 +++++++++++++++++++++-
 .../protocol/node/extract/MySqlExtractNode.java    | 15 ++++++---------
 2 files changed, 27 insertions(+), 10 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
index f9848f4bd..4ef7ec9aa 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
@@ -22,6 +22,8 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.inlong.common.enums.MetaField;
@@ -52,6 +54,7 @@ import java.util.Set;
  */
 @EqualsAndHashCode(callSuper = true)
 @JsonTypeName("kafkaExtract")
+@JsonInclude(Include.NON_NULL)
 @Data
 public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metadata, Serializable {
 
@@ -79,6 +82,21 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad
     @JsonProperty("scanSpecificOffsets")
     private String scanSpecificOffsets;
 
+    public KafkaExtractNode(@JsonProperty("id") String id,
+            @JsonProperty("name") String name,
+            @JsonProperty("fields") List<FieldInfo> fields,
+            @Nullable @JsonProperty("watermarkField") WatermarkField watermarkField,
+            @JsonProperty("properties") Map<String, String> properties,
+            @Nonnull @JsonProperty("topic") String topic,
+            @Nonnull @JsonProperty("bootstrapServers") String bootstrapServers,
+            @Nonnull @JsonProperty("format") Format format,
+            @JsonProperty("scanStartupMode") KafkaScanStartupMode kafkaScanStartupMode,
+            @JsonProperty("primaryKey") String primaryKey,
+            @JsonProperty("groupId") String groupId) {
+        this(id, name, fields, watermarkField, properties, topic, bootstrapServers, format, kafkaScanStartupMode,
+                primaryKey, groupId, null);
+    }
+
     @JsonCreator
     public KafkaExtractNode(@JsonProperty("id") String id,
             @JsonProperty("name") String name,
@@ -130,7 +148,9 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad
         } else if (format instanceof CanalJsonFormat || format instanceof DebeziumJsonFormat) {
             options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA);
             options.put(KafkaConstant.SCAN_STARTUP_MODE, kafkaScanStartupMode.getValue());
-            options.put(KafkaConstant.SCAN_STARTUP_SPECIFIC_OFFSETS, scanSpecificOffsets);
+            if (StringUtils.isNotEmpty(scanSpecificOffsets)) {
+                options.put(KafkaConstant.SCAN_STARTUP_SPECIFIC_OFFSETS, scanSpecificOffsets);
+            }
             options.putAll(format.generateOptions(false));
         } else {
             throw new IllegalArgumentException("kafka extract node format is IllegalArgument");
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java
index b1d0f2783..62bc19530 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java
@@ -47,12 +47,12 @@ import java.util.Set;
  */
 @EqualsAndHashCode(callSuper = true)
 @JsonTypeName("mysqlExtract")
+@JsonInclude(Include.NON_NULL)
 @Data
 public class MySqlExtractNode extends ExtractNode implements Metadata, InlongMetric, Serializable {
 
     private static final long serialVersionUID = -5521981462461235277L;
 
-    @JsonInclude(Include.NON_NULL)
     @JsonProperty("primaryKey")
     private String primaryKey;
     @JsonProperty("tableNames")
@@ -66,16 +66,12 @@ public class MySqlExtractNode extends ExtractNode implements Metadata, InlongMet
     private String password;
     @JsonProperty("database")
     private String database;
-    @JsonInclude(Include.NON_NULL)
     @JsonProperty("port")
     private Integer port;
-    @JsonInclude(Include.NON_NULL)
     @JsonProperty("serverId")
     private Integer serverId;
-    @JsonInclude(Include.NON_NULL)
     @JsonProperty("incrementalSnapshotEnabled")
     private Boolean incrementalSnapshotEnabled;
-    @JsonInclude(Include.NON_NULL)
     @JsonProperty("serverTimeZone")
     private String serverTimeZone;
     @Nonnull
@@ -193,13 +189,14 @@ public class MySqlExtractNode extends ExtractNode implements Metadata, InlongMet
         super(id, name, fields, watermarkField, properties);
         this.tableNames = Preconditions.checkNotNull(tableNames, "tableNames is null");
         Preconditions.checkState(!tableNames.isEmpty(), "tableNames is empty");
-        if (extractMode == ExtractMode.CDC) {
-            this.hostname = Preconditions.checkNotNull(hostname, "hostname is null");
-            this.database = Preconditions.checkNotNull(database, "database is null");
-        } else {
+        if (extractMode == ExtractMode.SCAN) {
             this.hostname = hostname;
             this.database = database;
             this.url = Preconditions.checkNotNull(url, "url is null");
+        } else {
+            extractMode = ExtractMode.CDC;
+            this.hostname = Preconditions.checkNotNull(hostname, "hostname is null");
+            this.database = Preconditions.checkNotNull(database, "database is null");
         }
         this.username = Preconditions.checkNotNull(username, "username is null");
         this.password = Preconditions.checkNotNull(password, "password is null");