You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/02/25 10:53:59 UTC
[incubator-inlong] branch master updated: [INLONG-2715][Manager] Support more parameters for the Binlog source entity (#2726)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new bd11bcd [INLONG-2715][Manager] Support more parameters for the Binlog source entity (#2726)
bd11bcd is described below
commit bd11bcdfac5aa96189c83be349990dadfe70bdfd
Author: healchow <he...@gmail.com>
AuthorDate: Fri Feb 25 18:53:40 2022 +0800
[INLONG-2715][Manager] Support more parameters for the Binlog source entity (#2726)
---
.../manager/common/pojo/source/SourceRequest.java | 7 +-
.../manager/common/pojo/source/SourceResponse.java | 4 +-
.../common/pojo/source/binlog/BinlogSourceDTO.java | 92 ++++++++++------------
.../source/binlog/BinlogSourceListResponse.java | 30 ++++---
.../pojo/source/binlog/BinlogSourceRequest.java | 81 ++++++++++---------
.../pojo/source/binlog/BinlogSourceResponse.java | 52 +++++-------
.../pojo/source/kafka/KafkaSourceRequest.java | 2 -
7 files changed, 120 insertions(+), 148 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
index f773a6e..be23e24 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
@@ -65,7 +65,10 @@ public class SourceRequest {
@ApiModelProperty("Name of the cluster that collected this source")
private String clusterName;
- @ApiModelProperty("Heartbeat of the source task")
- private String heartbeat;
+ @ApiModelProperty("Snapshot of the source task")
+ private String snapshot;
+
+ @ApiModelProperty("Data Serialization, support: Json, Canal, Avro, etc")
+ private String serializationType;
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java
index d221846..4d90669 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java
@@ -60,8 +60,8 @@ public class SourceResponse {
@ApiModelProperty("Name of the cluster that collected this source")
private String clusterName;
- @ApiModelProperty("Heartbeat of this source task")
- private String heartbeat;
+ @ApiModelProperty("Snapshot of this source task")
+ private String snapshot;
@ApiModelProperty("Status")
private Integer status;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java
index 286c173..a7b54c1 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java
@@ -40,70 +40,60 @@ public class BinlogSourceDTO {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // thread safe
- @ApiModelProperty("Source database name")
- private String dbName;
+ @ApiModelProperty("Username of the DB server")
+ private String user;
- @ApiModelProperty("Source table name")
- private String tableName;
+ @ApiModelProperty("Password of the DB server")
+ private String password;
- @ApiModelProperty("Data charset")
- private String charset;
+ @ApiModelProperty("Hostname of the DB server")
+ private String hostname;
- @ApiModelProperty(value = "Table fields, separated by commas")
- private String tableFields;
+ @ApiModelProperty(value = "List of DBs to be collected, supporting regular expressions, "
+ + "separate them with commas, for example: db1.tb1,db2.tb2",
+ notes = "DBs not in this list are excluded. By default, all DBs are monitored")
+ private String whitelist;
- @ApiModelProperty(value = "Data separator, default is 0x01")
- private String dataSeparator = "0x01";
+ @ApiModelProperty("Database time zone, Default is UTC")
+ private String timeZone;
- @ApiModelProperty(value = "Middleware type, such as: TUBE, PULSAR")
- private String middlewareType;
+ @ApiModelProperty("The interval for recording an offset")
+ private String intervalMs;
- @ApiModelProperty(value = "Topic of Tube")
- private String tubeTopic;
-
- @ApiModelProperty(value = "Cluster address of Tube")
- private String tubeCluster;
-
- @ApiModelProperty(value = "Namespace of Pulsar")
- private String pulsarNamespace;
-
- @ApiModelProperty(value = "Topic of Pulsar")
- private String pulsarTopic;
-
- @ApiModelProperty(value = "Cluster address of Pulsar")
- private String pulsarCluster;
-
- @ApiModelProperty(value = "Whether to skip delete events in binlog, default: 1, that is skip")
- private Integer skipDelete;
-
- @ApiModelProperty(value = "Collect starts from the specified binlog location, and it is modified after delivery."
- + "If it is empty, an empty string is returned")
- private String startPosition;
+ /**
+ * <code>initial</code>: Default mode, do a snapshot when no offset is found.
+ * <p/>
+ * <code>when_needed</code>: Similar to initial, do a snapshot when the binlog position
+ * has been purged on the DB server.
+ * <p/>
+ * <code>never</code>: Do not snapshot.
+ * <p/>
+ * <code>schema_only</code>: All tables' column names will be taken, but the table data will not be exported,
+ * and data will only be consumed from the end of the binlog as of the time the task is started.
+ * This mode is suitable when historical data does not matter and only recent changes are of interest.
+ * <p/>
+ * <code>schema_only_recovery</code>: When <code>schema_only</code> mode fails, use this mode to recover, which is
+ * generally not used.
+ */
+ @ApiModelProperty("Snapshot mode, supports: initial, when_needed, never, schema_only, schema_only_recovery")
+ private String snapshotMode;
- @ApiModelProperty(value = "When the field value is null, the replaced field defaults to 'null'")
- private String nullFieldChar;
+ @ApiModelProperty("The file path to store history info")
+ private String storeHistoryFilename;
/**
* Get the dto instance from the request
*/
public static BinlogSourceDTO getFromRequest(BinlogSourceRequest request) {
return BinlogSourceDTO.builder()
- .dbName(request.getDbName())
- .tableName(request.getTableName())
- .charset(request.getCharset())
- .dbName(request.getDbName())
- .tableName(request.getTableName())
- .tableFields(request.getTableFields())
- .dataSeparator(request.getDataSeparator())
- .middlewareType(request.getMiddlewareType())
- .tubeCluster(request.getTubeCluster())
- .tubeTopic(request.getTubeTopic())
- .pulsarCluster(request.getPulsarCluster())
- .pulsarNamespace(request.getPulsarNamespace())
- .pulsarTopic(request.getPulsarTopic())
- .skipDelete(request.getSkipDelete())
- .startPosition(request.getStartPosition())
- .nullFieldChar(request.getNullFieldChar())
+ .user(request.getUser())
+ .password(request.getPassword())
+ .hostname(request.getHostname())
+ .whitelist(request.getWhitelist())
+ .timeZone(request.getTimeZone())
+ .intervalMs(request.getIntervalMs())
+ .snapshotMode(request.getSnapshotMode())
+ .storeHistoryFilename(request.getStoreHistoryFilename())
.build();
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
index cd2009e..dbee39d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
@@ -31,28 +31,26 @@ import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
@ApiModel("Response of binlog source paging list")
public class BinlogSourceListResponse extends SourceListResponse {
- @ApiModelProperty("Source database name")
- private String dbName;
+ @ApiModelProperty("Username of the DB server")
+ private String user;
- @ApiModelProperty("Source table name")
- private String tableName;
+ @ApiModelProperty("Hostname of the DB server")
+ private String hostname;
- @ApiModelProperty(value = "Middleware type, such as: TUBE, PULSAR")
- private String middlewareType;
+ @ApiModelProperty(value = "List of DBs to be collected, supporting regular expressions")
+ private String whitelist;
- @ApiModelProperty(value = "Topic of Tube")
- private String tubeTopic;
+ @ApiModelProperty("Database time zone, Default is UTC")
+ private String timeZone;
- @ApiModelProperty(value = "Cluster address of Tube")
- private String tubeCluster;
+ @ApiModelProperty("The file path to store history info")
+ private String storeHistoryFilename;
- @ApiModelProperty(value = "Namespace of Pulsar")
- private String pulsarNamespace;
+ @ApiModelProperty("The interval for recording an offset")
+ private String intervalMs;
- @ApiModelProperty(value = "Topic of Pulsar")
- private String pulsarTopic;
+ @ApiModelProperty("Snapshot mode, supports: initial, when_needed, never, schema_only, schema_only_recovery")
+ private String snapshotMode;
- @ApiModelProperty(value = "Cluster address of Pulsar")
- private String pulsarCluster;
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
index 81517b3..37efa92 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
@@ -36,47 +36,46 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
@JsonTypeDefine(value = Constant.SOURCE_DB_BINLOG)
public class BinlogSourceRequest extends SourceRequest {
- @ApiModelProperty("Source database name")
- private String dbName;
+ @ApiModelProperty("Username of the DB server")
+ private String user;
+
+ @ApiModelProperty("Password of the DB server")
+ private String password;
+
+ @ApiModelProperty("Hostname of the DB server")
+ private String hostname;
+
+ @ApiModelProperty(value = "List of DBs to be collected, supporting regular expressions, "
+ + "separate them with commas, for example: db1.tb1,db2.tb2",
+ notes = "DBs not in this list are excluded. By default, all DBs are monitored")
+ private String whitelist;
+
+ @ApiModelProperty("Database time zone, Default is UTC")
+ private String timeZone;
+
+ @ApiModelProperty("The file path to store history info")
+ private String storeHistoryFilename;
+
+ @ApiModelProperty("The interval for recording an offset")
+ private String intervalMs;
+
+ /**
+ * <code>initial</code>: Default mode, do a snapshot when no offset is found.
+ * <p/>
+ * <code>when_needed</code>: Similar to initial, do a snapshot when the binlog position
+ * has been purged on the DB server.
+ * <p/>
+ * <code>never</code>: Do not snapshot.
+ * <p/>
+ * <code>schema_only</code>: All tables' column names will be taken, but the table data will not be exported,
+ * and data will only be consumed from the end of the binlog as of the time the task is started.
+ * This mode is suitable when historical data does not matter and only recent changes are of interest.
+ * <p/>
+ * <code>schema_only_recovery</code>: When <code>schema_only</code> mode fails, use this mode to recover, which is
+ * generally not used.
+ */
+ @ApiModelProperty("Snapshot mode, supports: initial, when_needed, never, schema_only, schema_only_recovery")
+ private String snapshotMode;
- @ApiModelProperty("Source table name")
- private String tableName;
-
- @ApiModelProperty("Data charset")
- private String charset;
-
- @ApiModelProperty(value = "Table fields, separated by commas")
- private String tableFields;
-
- @ApiModelProperty(value = "Data separator, default is 0x01")
- private String dataSeparator = "0x01";
-
- @ApiModelProperty(value = "Middleware type, such as: TUBE, PULSAR")
- private String middlewareType;
-
- @ApiModelProperty(value = "Topic of Tube")
- private String tubeTopic;
-
- @ApiModelProperty(value = "Cluster address of Tube")
- private String tubeCluster;
-
- @ApiModelProperty(value = "Namespace of Pulsar")
- private String pulsarNamespace;
-
- @ApiModelProperty(value = "Topic of Pulsar")
- private String pulsarTopic;
-
- @ApiModelProperty(value = "Cluster address of Pulsar")
- private String pulsarCluster;
-
- @ApiModelProperty(value = "Whether to skip delete events in binlog, default: 1, that is skip")
- private Integer skipDelete;
-
- @ApiModelProperty(value = "Collect starts from the specified binlog location, and it is modified after delivery."
- + "If it is empty, an empty string is returned")
- private String startPosition;
-
- @ApiModelProperty(value = "When the field value is null, the replaced field defaults to 'null'")
- private String nullFieldChar;
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
index 810bca2..ae01fb3 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
@@ -36,47 +36,31 @@ public class BinlogSourceResponse extends SourceResponse {
private String sourceType = Constant.SOURCE_DB_BINLOG;
- @ApiModelProperty("Source database name")
- private String dbName;
+ @ApiModelProperty("Username of the DB server")
+ private String user;
- @ApiModelProperty("Source table name")
- private String tableName;
+ @ApiModelProperty("Password of the DB server")
+ private String password;
- @ApiModelProperty("Data charset")
- private String charset;
+ @ApiModelProperty("Hostname of the DB server")
+ private String hostname;
- @ApiModelProperty(value = "Table fields, separated by commas")
- private String tableFields;
+ @ApiModelProperty(value = "List of DBs to be collected, supporting regular expressions")
+ private String whitelist;
- @ApiModelProperty(value = "Data separator, default is 0x01")
- private String dataSeparator = "0x01";
+ @ApiModelProperty("Database time zone, Default is UTC")
+ private String timeZone;
- @ApiModelProperty(value = "Middleware type, such as: TUBE, PULSAR")
- private String middlewareType;
+ @ApiModelProperty("The file path to store history info")
+ private String storeHistoryFilename;
- @ApiModelProperty(value = "Topic of Tube")
- private String tubeTopic;
+ @ApiModelProperty("Offset of the task")
+ private String offset;
- @ApiModelProperty(value = "Cluster address of Tube")
- private String tubeCluster;
+ @ApiModelProperty("The interval for recording an offset")
+ private String intervalMs;
- @ApiModelProperty(value = "Namespace of Pulsar")
- private String pulsarNamespace;
-
- @ApiModelProperty(value = "Topic of Pulsar")
- private String pulsarTopic;
-
- @ApiModelProperty(value = "Cluster address of Pulsar")
- private String pulsarCluster;
-
- @ApiModelProperty(value = "Whether to skip delete events in binlog, default: 1, that is skip")
- private Integer skipDelete;
-
- @ApiModelProperty(value = "Collect starts from the specified binlog location, and it is modified after delivery."
- + "If it is empty, an empty string is returned")
- private String startPosition;
-
- @ApiModelProperty(value = "When the field value is null, the replaced field defaults to 'null'")
- private String nullFieldChar;
+ @ApiModelProperty("Snapshot mode, supports: initial, when_needed, never, schema_only, schema_only_recovery")
+ private String snapshotMode;
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
index 2fd40b0..764c5c8 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
@@ -42,6 +42,4 @@ public class KafkaSourceRequest extends SourceRequest {
@ApiModelProperty("Kafka topicName")
private String topicName;
- @ApiModelProperty("Data Serialization, support: Json, Canal, Avro")
- private String serializationType;
}