You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/02/25 10:53:59 UTC

[incubator-inlong] branch master updated: [INLONG-2715][Manager] Support more parameters for the Binlog source entity (#2726)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new bd11bcd  [INLONG-2715][Manager] Support more parameters for the Binlog source entity (#2726)
bd11bcd is described below

commit bd11bcdfac5aa96189c83be349990dadfe70bdfd
Author: healchow <he...@gmail.com>
AuthorDate: Fri Feb 25 18:53:40 2022 +0800

    [INLONG-2715][Manager] Support more parameters for the Binlog source entity (#2726)
---
 .../manager/common/pojo/source/SourceRequest.java  |  7 +-
 .../manager/common/pojo/source/SourceResponse.java |  4 +-
 .../common/pojo/source/binlog/BinlogSourceDTO.java | 92 ++++++++++------------
 .../source/binlog/BinlogSourceListResponse.java    | 30 ++++---
 .../pojo/source/binlog/BinlogSourceRequest.java    | 81 ++++++++++---------
 .../pojo/source/binlog/BinlogSourceResponse.java   | 52 +++++-------
 .../pojo/source/kafka/KafkaSourceRequest.java      |  2 -
 7 files changed, 120 insertions(+), 148 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
index f773a6e..be23e24 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
@@ -65,7 +65,10 @@ public class SourceRequest {
     @ApiModelProperty("Name of the cluster that collected this source")
     private String clusterName;
 
-    @ApiModelProperty("Heartbeat of the source task")
-    private String heartbeat;
+    @ApiModelProperty("Snapshot of the source task")
+    private String snapshot;
+
+    @ApiModelProperty("Data Serialization, support: Json, Canal, Avro, etc")
+    private String serializationType;
 
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java
index d221846..4d90669 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java
@@ -60,8 +60,8 @@ public class SourceResponse {
     @ApiModelProperty("Name of the cluster that collected this source")
     private String clusterName;
 
-    @ApiModelProperty("Heartbeat of this source task")
-    private String heartbeat;
+    @ApiModelProperty("Snapshot of this source task")
+    private String snapshot;
 
     @ApiModelProperty("Status")
     private Integer status;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java
index 286c173..a7b54c1 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java
@@ -40,70 +40,60 @@ public class BinlogSourceDTO {
 
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // thread safe
 
-    @ApiModelProperty("Source database name")
-    private String dbName;
+    @ApiModelProperty("Username of the DB server")
+    private String user;
 
-    @ApiModelProperty("Source table name")
-    private String tableName;
+    @ApiModelProperty("Password of the DB server")
+    private String password;
 
-    @ApiModelProperty("Data charset")
-    private String charset;
+    @ApiModelProperty("Hostname of the DB server")
+    private String hostname;
 
-    @ApiModelProperty(value = "Table fields, separated by commas")
-    private String tableFields;
+    @ApiModelProperty(value = "List of DBs to be collected, supporting regular expressions, "
+            + "separate them with commas, for example: db1.tb1,db2.tb2",
+            notes = "DBs not in this list are excluded. By default, all DBs are monitored")
+    private String whitelist;
 
-    @ApiModelProperty(value = "Data separator, default is 0x01")
-    private String dataSeparator = "0x01";
+    @ApiModelProperty("Database time zone, Default is UTC")
+    private String timeZone;
 
-    @ApiModelProperty(value = "Middleware type, such as: TUBE, PULSAR")
-    private String middlewareType;
+    @ApiModelProperty("The interval for recording an offset")
+    private String intervalMs;
 
-    @ApiModelProperty(value = "Topic of Tube")
-    private String tubeTopic;
-
-    @ApiModelProperty(value = "Cluster address of Tube")
-    private String tubeCluster;
-
-    @ApiModelProperty(value = "Namespace of Pulsar")
-    private String pulsarNamespace;
-
-    @ApiModelProperty(value = "Topic of Pulsar")
-    private String pulsarTopic;
-
-    @ApiModelProperty(value = "Cluster address of Pulsar")
-    private String pulsarCluster;
-
-    @ApiModelProperty(value = "Whether to skip delete events in binlog, default: 1, that is skip")
-    private Integer skipDelete;
-
-    @ApiModelProperty(value = "Collect starts from the specified binlog location, and it is modified after delivery."
-            + "If it is empty, an empty string is returned")
-    private String startPosition;
+    /**
+     * <code>initial</code>: Default mode, do a snapshot when no offset is found.
+     * <p/>
+     * <code>when_needed</code>: Similar to initial, do a snapshot when the binlog position
+     * has been purged on the DB server.
+     * <p/>
+     * <code>never</code>: Do not snapshot.
+     * <p/>
+     * <code>schema_only</code>: All tables' column name will be taken, but the table data will not be exported,
+     * and it will only be consumed from the end of the binlog when the task is started.
+     * So it is very suitable for not caring about historical data, but only about recent changes.
+     * <p/>
+     * <code>schema_only_recovery</code>: When <code>schema_only</code> mode fails, use this mode to recover, which is
+     * generally not used.
+     */
+    @ApiModelProperty("Snapshot mode, supports: initial, when_needed, never, schema_only, schema_only_recovery")
+    private String snapshotMode;
 
-    @ApiModelProperty(value = "When the field value is null, the replaced field defaults to 'null'")
-    private String nullFieldChar;
+    @ApiModelProperty("The file path to store history info")
+    private String storeHistoryFilename;
 
     /**
      * Get the dto instance from the request
      */
     public static BinlogSourceDTO getFromRequest(BinlogSourceRequest request) {
         return BinlogSourceDTO.builder()
-                .dbName(request.getDbName())
-                .tableName(request.getTableName())
-                .charset(request.getCharset())
-                .dbName(request.getDbName())
-                .tableName(request.getTableName())
-                .tableFields(request.getTableFields())
-                .dataSeparator(request.getDataSeparator())
-                .middlewareType(request.getMiddlewareType())
-                .tubeCluster(request.getTubeCluster())
-                .tubeTopic(request.getTubeTopic())
-                .pulsarCluster(request.getPulsarCluster())
-                .pulsarNamespace(request.getPulsarNamespace())
-                .pulsarTopic(request.getPulsarTopic())
-                .skipDelete(request.getSkipDelete())
-                .startPosition(request.getStartPosition())
-                .nullFieldChar(request.getNullFieldChar())
+                .user(request.getUser())
+                .password(request.getPassword())
+                .hostname(request.getHostname())
+                .whitelist(request.getWhitelist())
+                .timeZone(request.getTimeZone())
+                .intervalMs(request.getIntervalMs())
+                .snapshotMode(request.getSnapshotMode())
+                .storeHistoryFilename(request.getStoreHistoryFilename())
                 .build();
     }
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
index cd2009e..dbee39d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
@@ -31,28 +31,26 @@ import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
 @ApiModel("Response of binlog source paging list")
 public class BinlogSourceListResponse extends SourceListResponse {
 
-    @ApiModelProperty("Source database name")
-    private String dbName;
+    @ApiModelProperty("Username of the DB server")
+    private String user;
 
-    @ApiModelProperty("Source table name")
-    private String tableName;
+    @ApiModelProperty("Hostname of the DB server")
+    private String hostname;
 
-    @ApiModelProperty(value = "Middleware type, such as: TUBE, PULSAR")
-    private String middlewareType;
+    @ApiModelProperty(value = "List of DBs to be collected, supporting regular expressions")
+    private String whitelist;
 
-    @ApiModelProperty(value = "Topic of Tube")
-    private String tubeTopic;
+    @ApiModelProperty("Database time zone, Default is UTC")
+    private String timeZone;
 
-    @ApiModelProperty(value = "Cluster address of Tube")
-    private String tubeCluster;
+    @ApiModelProperty("The file path to store history info")
+    private String storeHistoryFilename;
 
-    @ApiModelProperty(value = "Namespace of Pulsar")
-    private String pulsarNamespace;
+    @ApiModelProperty("The interval for recording an offset")
+    private String intervalMs;
 
-    @ApiModelProperty(value = "Topic of Pulsar")
-    private String pulsarTopic;
+    @ApiModelProperty("Snapshot mode, supports: initial, when_needed, never, schema_only, schema_only_recovery")
+    private String snapshotMode;
 
-    @ApiModelProperty(value = "Cluster address of Pulsar")
-    private String pulsarCluster;
 
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
index 81517b3..37efa92 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
@@ -36,47 +36,46 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
 @JsonTypeDefine(value = Constant.SOURCE_DB_BINLOG)
 public class BinlogSourceRequest extends SourceRequest {
 
-    @ApiModelProperty("Source database name")
-    private String dbName;
+    @ApiModelProperty("Username of the DB server")
+    private String user;
+
+    @ApiModelProperty("Password of the DB server")
+    private String password;
+
+    @ApiModelProperty("Hostname of the DB server")
+    private String hostname;
+
+    @ApiModelProperty(value = "List of DBs to be collected, supporting regular expressions, "
+            + "separate them with commas, for example: db1.tb1,db2.tb2",
+            notes = "DBs not in this list are excluded. By default, all DBs are monitored")
+    private String whitelist;
+
+    @ApiModelProperty("Database time zone, Default is UTC")
+    private String timeZone;
+
+    @ApiModelProperty("The file path to store history info")
+    private String storeHistoryFilename;
+
+    @ApiModelProperty("The interval for recording an offset")
+    private String intervalMs;
+
+    /**
+     * <code>initial</code>: Default mode, do a snapshot when no offset is found.
+     * <p/>
+     * <code>when_needed</code>: Similar to initial, do a snapshot when the binlog position
+     * has been purged on the DB server.
+     * <p/>
+     * <code>never</code>: Do not snapshot.
+     * <p/>
+     * <code>schema_only</code>: All tables' column name will be taken, but the table data will not be exported,
+     * and it will only be consumed from the end of the binlog when the task is started.
+     * So it is very suitable for not caring about historical data, but only about recent changes.
+     * <p/>
+     * <code>schema_only_recovery</code>: When <code>schema_only</code> mode fails, use this mode to recover, which is
+     * generally not used.
+     */
+    @ApiModelProperty("Snapshot mode, supports: initial, when_needed, never, schema_only, schema_only_recovery")
+    private String snapshotMode;
 
-    @ApiModelProperty("Source table name")
-    private String tableName;
-
-    @ApiModelProperty("Data charset")
-    private String charset;
-
-    @ApiModelProperty(value = "Table fields, separated by commas")
-    private String tableFields;
-
-    @ApiModelProperty(value = "Data separator, default is 0x01")
-    private String dataSeparator = "0x01";
-
-    @ApiModelProperty(value = "Middleware type, such as: TUBE, PULSAR")
-    private String middlewareType;
-
-    @ApiModelProperty(value = "Topic of Tube")
-    private String tubeTopic;
-
-    @ApiModelProperty(value = "Cluster address of Tube")
-    private String tubeCluster;
-
-    @ApiModelProperty(value = "Namespace of Pulsar")
-    private String pulsarNamespace;
-
-    @ApiModelProperty(value = "Topic of Pulsar")
-    private String pulsarTopic;
-
-    @ApiModelProperty(value = "Cluster address of Pulsar")
-    private String pulsarCluster;
-
-    @ApiModelProperty(value = "Whether to skip delete events in binlog, default: 1, that is skip")
-    private Integer skipDelete;
-
-    @ApiModelProperty(value = "Collect starts from the specified binlog location, and it is modified after delivery."
-            + "If it is empty, an empty string is returned")
-    private String startPosition;
-
-    @ApiModelProperty(value = "When the field value is null, the replaced field defaults to 'null'")
-    private String nullFieldChar;
 
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
index 810bca2..ae01fb3 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
@@ -36,47 +36,31 @@ public class BinlogSourceResponse extends SourceResponse {
 
     private String sourceType = Constant.SOURCE_DB_BINLOG;
 
-    @ApiModelProperty("Source database name")
-    private String dbName;
+    @ApiModelProperty("Username of the DB server")
+    private String user;
 
-    @ApiModelProperty("Source table name")
-    private String tableName;
+    @ApiModelProperty("Password of the DB server")
+    private String password;
 
-    @ApiModelProperty("Data charset")
-    private String charset;
+    @ApiModelProperty("Hostname of the DB server")
+    private String hostname;
 
-    @ApiModelProperty(value = "Table fields, separated by commas")
-    private String tableFields;
+    @ApiModelProperty(value = "List of DBs to be collected, supporting regular expressions")
+    private String whitelist;
 
-    @ApiModelProperty(value = "Data separator, default is 0x01")
-    private String dataSeparator = "0x01";
+    @ApiModelProperty("Database time zone, Default is UTC")
+    private String timeZone;
 
-    @ApiModelProperty(value = "Middleware type, such as: TUBE, PULSAR")
-    private String middlewareType;
+    @ApiModelProperty("The file path to store history info")
+    private String storeHistoryFilename;
 
-    @ApiModelProperty(value = "Topic of Tube")
-    private String tubeTopic;
+    @ApiModelProperty("Offset of the task")
+    private String offset;
 
-    @ApiModelProperty(value = "Cluster address of Tube")
-    private String tubeCluster;
+    @ApiModelProperty("The interval for recording an offset")
+    private String intervalMs;
 
-    @ApiModelProperty(value = "Namespace of Pulsar")
-    private String pulsarNamespace;
-
-    @ApiModelProperty(value = "Topic of Pulsar")
-    private String pulsarTopic;
-
-    @ApiModelProperty(value = "Cluster address of Pulsar")
-    private String pulsarCluster;
-
-    @ApiModelProperty(value = "Whether to skip delete events in binlog, default: 1, that is skip")
-    private Integer skipDelete;
-
-    @ApiModelProperty(value = "Collect starts from the specified binlog location, and it is modified after delivery."
-            + "If it is empty, an empty string is returned")
-    private String startPosition;
-
-    @ApiModelProperty(value = "When the field value is null, the replaced field defaults to 'null'")
-    private String nullFieldChar;
+    @ApiModelProperty("Snapshot mode, supports: initial, when_needed, never, schema_only, schema_only_recovery")
+    private String snapshotMode;
 
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
index 2fd40b0..764c5c8 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
@@ -42,6 +42,4 @@ public class KafkaSourceRequest extends SourceRequest {
     @ApiModelProperty("Kafka topicName")
     private String topicName;
 
-    @ApiModelProperty("Data Serialization, support: Json, Canal, Avro")
-    private String serializationType;
 }