You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/06 11:41:47 UTC

[incubator-inlong] branch master updated: [INLONG-2912][Manager] Add fields for the binlog task (#2943)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fe55c9  [INLONG-2912][Manager] Add fields for the binlog task (#2943)
3fe55c9 is described below

commit 3fe55c9c77ca5aa93e14d833d990cb409d9357bc
Author: healchow <he...@gmail.com>
AuthorDate: Sun Mar 6 19:41:41 2022 +0800

    [INLONG-2912][Manager] Add fields for the binlog task (#2943)
    
    * [INLONG-2912][Manager] Add fields for the binlog task
    
    * [INLONG-2912][Manager] Fix the magic number
---
 .../client/api/source/MySQLBinlogSource.java       |  38 ++++++--
 .../api/util/InlongStreamSourceTransfer.java       | 102 +++++++++++++--------
 .../common/pojo/source/binlog/BinlogSourceDTO.java |  40 +++++---
 .../source/binlog/BinlogSourceListResponse.java    |  24 +++--
 .../pojo/source/binlog/BinlogSourceRequest.java    |  43 ++++++---
 .../pojo/source/binlog/BinlogSourceResponse.java   |  25 +++--
 .../service/core/impl/AgentServiceImpl.java        |  25 +++--
 7 files changed, 205 insertions(+), 92 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
index 43e8718..b81f6c7 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
@@ -19,16 +19,19 @@ package org.apache.inlong.manager.client.api.source;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
-import java.util.List;
 import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.client.api.DataFormat;
 import org.apache.inlong.manager.client.api.StreamSource;
 import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
 import org.apache.inlong.manager.common.enums.SourceType;
 
+import java.util.List;
+
 @Data
+@EqualsAndHashCode(callSuper = true)
 @AllArgsConstructor
 @NoArgsConstructor
 @ApiModel("Base configuration for MySQL binlog collection")
@@ -46,19 +49,42 @@ public class MySQLBinlogSource extends StreamSource {
     @ApiModelProperty("Auth for binlog")
     private DefaultAuthentication authentication;
 
+    @ApiModelProperty("Whether include schema, default is 'false'")
+    private String includeSchema;
+
     @ApiModelProperty("Hostname of the DB server, for example: 127.0.0.1")
     private String hostname;
 
-    @ApiModelProperty("Exposed port the DB server")
+    @ApiModelProperty("Exposed port of the DB server")
     private int port = 3306;
 
-    @ApiModelProperty(value = "List of DBs to be collected, for example: db1.tb1,db2.tb2"
-            + ",if all tables in db are collected , use db.*",
-            notes = "DBs not in this list are excluded. By default, all DBs are monitored")
+    @ApiModelProperty(value = "List of DBs to be collected, supporting regular expressions, "
+            + "separate them with commas, for example: db1,test_db*",
+            notes = "DBs not in this list are excluded. If not set, all DBs are monitored")
     private List<String> dbNames;
 
+    @ApiModelProperty(value = "List of tables to be collected, supporting regular expressions, "
+            + "separate them with commas, for example: tb1,user*",
+            notes = "Tables not in this list are excluded. By default, all tables are monitored")
+    private List<String> tableNames;
+
     @ApiModelProperty("Database time zone, Default is UTC")
-    private String timeZone = "UTF";
+    private String serverTimezone = "UTF";
+
+    @ApiModelProperty("The interval for recording an offset")
+    private String intervalMs;
+
+    @ApiModelProperty("Snapshot mode, supports: initial, when_needed, never, schema_only, schema_only_recovery")
+    private String snapshotMode;
+
+    @ApiModelProperty("The file path to store offset info")
+    private String offsetFilename;
+
+    @ApiModelProperty("The file path to store history info")
+    private String historyFilename;
+
+    @ApiModelProperty("Whether to monitor the DDL, default is 'false'")
+    private String monitoredDdl;
 
     @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
     private String timestampFormatStandard = "SQL";
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
index 195be65..d47f8cb 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -18,8 +18,7 @@
 package org.apache.inlong.manager.client.api.util;
 
 import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-import java.util.List;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.client.api.DataFormat;
 import org.apache.inlong.manager.client.api.StreamSource;
 import org.apache.inlong.manager.client.api.StreamSource.SyncType;
@@ -38,6 +37,11 @@ import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceRequest;
 import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 
+import java.util.Arrays;
+
+/**
+ * Transfer the inlong stream source.
+ */
 public class InlongStreamSourceTransfer {
 
     public static SourceRequest createSourceRequest(StreamSource streamSource, InlongStreamInfo streamInfo) {
@@ -106,39 +110,53 @@ public class InlongStreamSourceTransfer {
         return kafkaSource;
     }
 
-    private static MySQLBinlogSource parseMySQLBinlogSource(BinlogSourceResponse binlogSourceResponse) {
+    private static MySQLBinlogSource parseMySQLBinlogSource(BinlogSourceResponse response) {
         MySQLBinlogSource binlogSource = new MySQLBinlogSource();
-        binlogSource.setSourceName(binlogSourceResponse.getSourceName());
-        binlogSource.setHostname(binlogSourceResponse.getHostname());
+        binlogSource.setSourceName(response.getSourceName());
+        binlogSource.setHostname(response.getHostname());
         binlogSource.setDataFormat(DataFormat.NONE);
-        binlogSource.setPort(binlogSourceResponse.getPort());
+        binlogSource.setPort(response.getPort());
         DefaultAuthentication defaultAuthentication = new DefaultAuthentication(
-                binlogSourceResponse.getUser(),
-                binlogSourceResponse.getPassword());
-        binlogSource.setAllMigration(binlogSourceResponse.isAllMigration());
+                response.getUser(),
+                response.getPassword());
         binlogSource.setAuthentication(defaultAuthentication);
-        binlogSource.setTimeZone(binlogSourceResponse.getTimeZone());
-        binlogSource.setTimestampFormatStandard(binlogSourceResponse.getTimestampFormatStandard());
-        List<String> dbs = Splitter.on(",").splitToList(binlogSourceResponse.getWhitelist());
-        binlogSource.setDbNames(dbs);
+        binlogSource.setIncludeSchema(response.getIncludeSchema());
+        binlogSource.setServerTimezone(response.getServerTimezone());
+        binlogSource.setMonitoredDdl(response.getMonitoredDdl());
+        binlogSource.setTimestampFormatStandard(response.getTimestampFormatStandard());
+        binlogSource.setAllMigration(response.isAllMigration());
+
+        if (StringUtils.isNotBlank(response.getDatabaseWhiteList())) {
+            binlogSource.setDbNames(Arrays.asList(response.getDatabaseWhiteList().split(",")));
+        }
+        if (StringUtils.isNotBlank(response.getTableWhiteList())) {
+            binlogSource.setTableNames(Arrays.asList(response.getTableWhiteList().split(",")));
+        }
         return binlogSource;
     }
 
-    private static MySQLBinlogSource parseMySQLBinlogSource(BinlogSourceListResponse binlogSourceResponse) {
+    private static MySQLBinlogSource parseMySQLBinlogSource(BinlogSourceListResponse response) {
         MySQLBinlogSource binlogSource = new MySQLBinlogSource();
-        binlogSource.setSourceName(binlogSourceResponse.getSourceName());
-        binlogSource.setHostname(binlogSourceResponse.getHostname());
+        binlogSource.setSourceName(response.getSourceName());
+        binlogSource.setHostname(response.getHostname());
         binlogSource.setDataFormat(DataFormat.NONE);
-        binlogSource.setPort(binlogSourceResponse.getPort());
+        binlogSource.setPort(response.getPort());
         DefaultAuthentication defaultAuthentication = new DefaultAuthentication(
-                binlogSourceResponse.getUser(),
-                binlogSourceResponse.getPassword());
+                response.getUser(),
+                response.getPassword());
         binlogSource.setAuthentication(defaultAuthentication);
-        binlogSource.setAllMigration(binlogSourceResponse.isAllMigration());
-        binlogSource.setTimeZone(binlogSourceResponse.getTimeZone());
-        binlogSource.setTimestampFormatStandard(binlogSourceResponse.getTimestampFormatStandard());
-        List<String> dbs = Splitter.on(",").splitToList(binlogSourceResponse.getWhitelist());
-        binlogSource.setDbNames(dbs);
+        binlogSource.setIncludeSchema(response.getIncludeSchema());
+        binlogSource.setServerTimezone(response.getServerTimezone());
+        binlogSource.setMonitoredDdl(response.getMonitoredDdl());
+        binlogSource.setTimestampFormatStandard(response.getTimestampFormatStandard());
+        binlogSource.setAllMigration(response.isAllMigration());
+
+        if (StringUtils.isNotBlank(response.getDatabaseWhiteList())) {
+            binlogSource.setDbNames(Arrays.asList(response.getDatabaseWhiteList().split(",")));
+        }
+        if (StringUtils.isNotBlank(response.getTableWhiteList())) {
+            binlogSource.setTableNames(Arrays.asList(response.getTableWhiteList().split(",")));
+        }
         return binlogSource;
     }
 
@@ -160,23 +178,27 @@ public class InlongStreamSourceTransfer {
 
     private static BinlogSourceRequest createBinlogSourceRequest(MySQLBinlogSource binlogSource,
             InlongStreamInfo streamInfo) {
-        BinlogSourceRequest binlogSourceRequest = new BinlogSourceRequest();
-        binlogSourceRequest.setSourceName(binlogSource.getSourceName());
-        binlogSourceRequest.setInlongGroupId(streamInfo.getInlongGroupId());
-        binlogSourceRequest.setInlongStreamId(streamInfo.getInlongStreamId());
-        binlogSourceRequest.setSourceType(binlogSource.getSourceType().name());
+        BinlogSourceRequest sourceRequest = new BinlogSourceRequest();
+        sourceRequest.setSourceName(binlogSource.getSourceName());
+        sourceRequest.setInlongGroupId(streamInfo.getInlongGroupId());
+        sourceRequest.setInlongStreamId(streamInfo.getInlongStreamId());
+        sourceRequest.setSourceType(binlogSource.getSourceType().name());
         DefaultAuthentication authentication = binlogSource.getAuthentication();
-        binlogSourceRequest.setUser(authentication.getUserName());
-        binlogSourceRequest.setPassword(authentication.getPassword());
-        binlogSourceRequest.setHostname(binlogSource.getHostname());
-        binlogSourceRequest.setPort(binlogSource.getPort());
-        binlogSourceRequest.setAllMigration(binlogSource.isAllMigration());
+        sourceRequest.setUser(authentication.getUserName());
+        sourceRequest.setPassword(authentication.getPassword());
+        sourceRequest.setHostname(binlogSource.getHostname());
+        sourceRequest.setPort(binlogSource.getPort());
+        sourceRequest.setIncludeSchema(binlogSource.getIncludeSchema());
+        sourceRequest.setServerTimezone(binlogSource.getServerTimezone());
+        sourceRequest.setMonitoredDdl(binlogSource.getMonitoredDdl());
+        sourceRequest.setAllMigration(binlogSource.isAllMigration());
         String dbNames = Joiner.on(",").join(binlogSource.getDbNames());
-        binlogSourceRequest.setWhitelist(dbNames);
-        binlogSourceRequest.setTimestampFormatStandard(binlogSource.getTimestampFormatStandard());
-        binlogSourceRequest.setTimeZone(binlogSource.getTimeZone());
-        binlogSourceRequest.setSnapshotMode("initial");
-        binlogSourceRequest.setIntervalMs("500");
-        return binlogSourceRequest;
+        sourceRequest.setDatabaseWhiteList(dbNames);
+        String tableNames = Joiner.on(",").join(binlogSource.getTableNames());
+        sourceRequest.setTableWhiteList(tableNames);
+        sourceRequest.setSnapshotMode("initial");
+        sourceRequest.setIntervalMs("500");
+        sourceRequest.setTimestampFormatStandard(binlogSource.getTimestampFormatStandard());
+        return sourceRequest;
     }
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java
index eb888c1..d221cdd 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java
@@ -49,16 +49,24 @@ public class BinlogSourceDTO {
     @ApiModelProperty("Hostname of the DB server")
     private String hostname;
 
-    @ApiModelProperty("Exposed port the DB server")
+    @ApiModelProperty("Exposed port of the DB server")
     private int port;
 
+    @ApiModelProperty("Whether include schema, default is 'false'")
+    private String includeSchema;
+
     @ApiModelProperty(value = "List of DBs to be collected, supporting regular expressions, "
-            + "separate them with commas, for example: db1.tb1,db2.tb2",
-            notes = "DBs not in this list are excluded. By default, all DBs are monitored")
-    private String whitelist;
+            + "separate them with commas, for example: db1,test_db*",
+            notes = "DBs not in this list are excluded. If not set, all DBs are monitored")
+    private String databaseWhiteList;
+
+    @ApiModelProperty(value = "List of tables to be collected, supporting regular expressions, "
+            + "separate them with commas, for example: tb1,user*",
+            notes = "Tables not in this list are excluded. By default, all tables are monitored")
+    private String tableWhiteList;
 
     @ApiModelProperty("Database time zone, Default is UTC")
-    private String timeZone;
+    private String serverTimezone;
 
     @ApiModelProperty("The interval for recording an offset")
     private String intervalMs;
@@ -81,13 +89,19 @@ public class BinlogSourceDTO {
     @ApiModelProperty("Snapshot mode, supports: initial, when_needed, never, schema_only, schema_only_recovery")
     private String snapshotMode;
 
+    @ApiModelProperty("The file path to store offset info")
+    private String offsetFilename;
+
     @ApiModelProperty("The file path to store history info")
-    private String storeHistoryFilename;
+    private String historyFilename;
+
+    @ApiModelProperty("Whether to monitor the DDL, default is 'false'")
+    private String monitoredDdl;
 
     @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
     private String timestampFormatStandard = "SQL";
 
-    @ApiModelProperty("Need transfer total database")
+    @ApiModelProperty("Whether to migrate all databases")
     private boolean allMigration;
 
     /**
@@ -96,15 +110,19 @@ public class BinlogSourceDTO {
     public static BinlogSourceDTO getFromRequest(BinlogSourceRequest request) {
         return BinlogSourceDTO.builder()
                 .user(request.getUser())
-                .port(request.getPort())
                 .password(request.getPassword())
                 .hostname(request.getHostname())
-                .whitelist(request.getWhitelist())
-                .timeZone(request.getTimeZone())
+                .port(request.getPort())
+                .includeSchema(request.getIncludeSchema())
+                .databaseWhiteList(request.getDatabaseWhiteList())
+                .tableWhiteList(request.getTableWhiteList())
+                .serverTimezone(request.getServerTimezone())
                 .intervalMs(request.getIntervalMs())
                 .snapshotMode(request.getSnapshotMode())
+                .offsetFilename(request.getOffsetFilename())
+                .historyFilename(request.getHistoryFilename())
+                .monitoredDdl(request.getMonitoredDdl())
                 .allMigration(request.isAllMigration())
-                .storeHistoryFilename(request.getStoreHistoryFilename())
                 .build();
     }
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
index 0902564..0f92668 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
@@ -40,17 +40,20 @@ public class BinlogSourceListResponse extends SourceListResponse {
     @ApiModelProperty("Hostname of the DB server")
     private String hostname;
 
-    @ApiModelProperty("Exposed port the DB server")
+    @ApiModelProperty("Exposed port of the DB server")
     private int port;
 
+    @ApiModelProperty("Whether include schema, default is 'false'")
+    private String includeSchema;
+
     @ApiModelProperty(value = "List of DBs to be collected, supporting regular expressions")
-    private String whitelist;
+    private String databaseWhiteList;
 
-    @ApiModelProperty("Database time zone, Default is UTC")
-    private String timeZone;
+    @ApiModelProperty(value = "List of tables to be collected, supporting regular expressions")
+    private String tableWhiteList;
 
-    @ApiModelProperty("The file path to store history info")
-    private String storeHistoryFilename;
+    @ApiModelProperty("Database time zone, Default is UTC")
+    private String serverTimezone;
 
     @ApiModelProperty("The interval for recording an offset")
     private String intervalMs;
@@ -58,6 +61,15 @@ public class BinlogSourceListResponse extends SourceListResponse {
     @ApiModelProperty("Snapshot mode, supports: initial, when_needed, never, schema_only, schema_only_recovery")
     private String snapshotMode;
 
+    @ApiModelProperty("The file path to store offset info")
+    private String offsetFilename;
+
+    @ApiModelProperty("The file path to store history info")
+    private String historyFilename;
+
+    @ApiModelProperty("Whether to monitor the DDL, default is 'false'")
+    private String monitoredDdl;
+
     @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
     private String timestampFormatStandard = "SQL";
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
index 2b4b1a1..cdb1e2f 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
@@ -37,33 +37,33 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
 @JsonTypeDefine(value = Constant.SOURCE_BINLOG)
 public class BinlogSourceRequest extends SourceRequest {
 
-    public BinlogSourceRequest() {
-        this.setSourceType(SourceType.BINLOG.toString());
-    }
-
     @ApiModelProperty("Username of the DB server")
     private String user;
 
     @ApiModelProperty("Password of the DB server")
     private String password;
 
-    @ApiModelProperty("Hostname or Ip of the DB server, for example: 127.0.0.1")
+    @ApiModelProperty("Hostname of the DB server")
     private String hostname;
 
-    @ApiModelProperty("Exposed port the DB server")
+    @ApiModelProperty("Exposed port of the DB server")
     private int port = 3306;
 
+    @ApiModelProperty("Whether include schema, default is 'false'")
+    private String includeSchema;
+
     @ApiModelProperty(value = "List of DBs to be collected, supporting regular expressions, "
-            + "separate them with commas, for example: db1.tb1,db2.tb2 "
-            + "if all tables in db are collected , use db.*",
-            notes = "DBs not in this list are excluded. By default, all DBs are monitored")
-    private String whitelist;
+            + "separate them with commas, for example: db1,test_db*",
+            notes = "DBs not in this list are excluded. If not set, all DBs are monitored")
+    private String databaseWhiteList;
 
-    @ApiModelProperty("Database time zone, Default is UTC")
-    private String timeZone;
+    @ApiModelProperty(value = "List of tables to be collected, supporting regular expressions, "
+            + "separate them with commas, for example: tb1,user*",
+            notes = "Tables not in this list are excluded. By default, all tables are monitored")
+    private String tableWhiteList;
 
-    @ApiModelProperty("The file path to store history info, default path : /data/history")
-    private String storeHistoryFilename = "/data/history";
+    @ApiModelProperty("Database time zone, Default is UTC")
+    private String serverTimezone;
 
     @ApiModelProperty("The interval for recording an offset")
     private String intervalMs;
@@ -84,7 +84,16 @@ public class BinlogSourceRequest extends SourceRequest {
      * generally not used.
      */
     @ApiModelProperty("Snapshot mode, supports: initial, when_needed, never, schema_only, schema_only_recovery")
-    private String snapshotMode = "initial";
+    private String snapshotMode;
+
+    @ApiModelProperty("The file path to store offset info")
+    private String offsetFilename;
+
+    @ApiModelProperty("The file path to store history info")
+    private String historyFilename;
+
+    @ApiModelProperty("Whether to monitor the DDL, default is 'false'")
+    private String monitoredDdl;
 
     @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
     private String timestampFormatStandard = "SQL";
@@ -92,4 +101,8 @@ public class BinlogSourceRequest extends SourceRequest {
     @ApiModelProperty("Need transfer total database")
     private boolean allMigration = false;
 
+    public BinlogSourceRequest() {
+        this.setSourceType(SourceType.BINLOG.toString());
+    }
+
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
index 1cce061..557fba6 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
@@ -50,17 +50,17 @@ public class BinlogSourceResponse extends SourceResponse {
     @ApiModelProperty("Exposed port of the DB server")
     private int port;
 
+    @ApiModelProperty("Whether include schema, default is 'false'")
+    private String includeSchema;
+
     @ApiModelProperty(value = "List of DBs to be collected, supporting regular expressions")
-    private String whitelist;
+    private String databaseWhiteList;
 
-    @ApiModelProperty("Database time zone, Default is UTC")
-    private String timeZone;
+    @ApiModelProperty(value = "List of tables to be collected, supporting regular expressions")
+    private String tableWhiteList;
 
-    @ApiModelProperty("The file path to store history info")
-    private String storeHistoryFilename;
-
-    @ApiModelProperty("Offset of the task")
-    private String offset;
+    @ApiModelProperty("Database time zone, Default is UTC")
+    private String serverTimezone;
 
     @ApiModelProperty("The interval for recording an offset")
     private String intervalMs;
@@ -68,6 +68,15 @@ public class BinlogSourceResponse extends SourceResponse {
     @ApiModelProperty("Snapshot mode, supports: initial, when_needed, never, schema_only, schema_only_recovery")
     private String snapshotMode;
 
+    @ApiModelProperty("The file path to store offset info")
+    private String offsetFilename;
+
+    @ApiModelProperty("The file path to store history info")
+    private String historyFilename;
+
+    @ApiModelProperty("Whether to monitor the DDL, default is 'false'")
+    private String monitoredDdl;
+
     @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
     private String timestampFormatStandard = "SQL";
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 87790fb..a95e691 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -79,6 +79,7 @@ public class AgentServiceImpl implements AgentService {
     private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
     private static final int UNISSUED_STATUS = 2;
     private static final int ISSUED_STATUS = 3;
+    private static final int MODULUS_100 = 100;
 
     @Autowired
     private StreamSourceEntityMapper sourceMapper;
@@ -147,7 +148,7 @@ public class AgentServiceImpl implements AgentService {
             int previousStatus = current.getStatus();
             int nextStatus = SourceState.SOURCE_NORMAL.getCode();
             // Change the status from 30x to normal / disable / frozen
-            if (previousStatus / 100 == ISSUED_STATUS) {
+            if (previousStatus / MODULUS_100 == ISSUED_STATUS) {
                 if (Constants.RESULT_SUCCESS == result) {
                     if (SourceState.TEMP_TO_NORMAL.contains(previousStatus)) {
                         nextStatus = SourceState.SOURCE_NORMAL.getCode();
@@ -176,13 +177,25 @@ public class AgentServiceImpl implements AgentService {
         List<DataConfig> dataConfigs = Lists.newArrayList();
         List<StreamSourceEntity> entityList = sourceMapper.selectByIpAndUuid(agentIp, uuid);
         for (StreamSourceEntity entity : entityList) {
+            // Change 20x to 30x
+            int id = entity.getId();
+            int status = entity.getStatus();
+            int op = status % MODULUS_100;
+            if (status / MODULUS_100 == UNISSUED_STATUS) {
+                sourceMapper.updateStatus(id, ISSUED_STATUS * MODULUS_100 + op);
+            } else {
+                LOGGER.info("skip task status not in 20x, id={}", id);
+                continue;
+            }
+
             DataConfig dataConfig = new DataConfig();
+            dataConfig.setIp(entity.getAgentIp());
+            dataConfig.setUuid(entity.getUuid());
+            dataConfig.setOp(String.valueOf(op));
             dataConfig.setTaskId(entity.getId());
             dataConfig.setTaskType(getTaskType(entity));
             dataConfig.setTaskName(entity.getSourceName());
-            dataConfig.setOp(String.valueOf(entity.getStatus() % 100));
-            dataConfig.setIp(entity.getAgentIp());
-            dataConfig.setUuid(entity.getUuid());
+            dataConfig.setSnapshot(entity.getSnapshot());
             dataConfig.setExtParams(entity.getExtParams());
             LocalDateTime dateTime = LocalDateTime.ofInstant(entity.getModifyTime().toInstant(),
                     ZoneId.systemDefault());
@@ -192,8 +205,8 @@ public class AgentServiceImpl implements AgentService {
             String streamId = entity.getInlongStreamId();
             dataConfig.setInlongGroupId(groupId);
             dataConfig.setInlongStreamId(streamId);
-            InlongStreamEntity inlongStreamEntity = streamMapper.selectByIdentifier(groupId, streamId);
-            dataConfig.setSyncSend(inlongStreamEntity.getSyncSend());
+            InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
+            dataConfig.setSyncSend(streamEntity.getSyncSend());
             dataConfigs.add(dataConfig);
         }
         // Query pending special commands