You are viewing a plain text version of this content. The canonical link was not preserved in this plain-text copy.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/06 11:41:47 UTC
[incubator-inlong] branch master updated: [INLONG-2912][Manager] Add fields for the binlog task (#2943)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3fe55c9 [INLONG-2912][Manager] Add fields for the binlog task (#2943)
3fe55c9 is described below
commit 3fe55c9c77ca5aa93e14d833d990cb409d9357bc
Author: healchow <he...@gmail.com>
AuthorDate: Sun Mar 6 19:41:41 2022 +0800
[INLONG-2912][Manager] Add fields for the binlog task (#2943)
* [INLONG-2912][Manager] Add fields for the binlog task
* [INLONG-2912][Manager] Fix the magic number
---
.../client/api/source/MySQLBinlogSource.java | 38 ++++++--
.../api/util/InlongStreamSourceTransfer.java | 102 +++++++++++++--------
.../common/pojo/source/binlog/BinlogSourceDTO.java | 40 +++++---
.../source/binlog/BinlogSourceListResponse.java | 24 +++--
.../pojo/source/binlog/BinlogSourceRequest.java | 43 ++++++---
.../pojo/source/binlog/BinlogSourceResponse.java | 25 +++--
.../service/core/impl/AgentServiceImpl.java | 25 +++--
7 files changed, 205 insertions(+), 92 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
index 43e8718..b81f6c7 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
@@ -19,16 +19,19 @@ package org.apache.inlong.manager.client.api.source;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
-import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Data;
+import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.client.api.DataFormat;
import org.apache.inlong.manager.client.api.StreamSource;
import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
import org.apache.inlong.manager.common.enums.SourceType;
+import java.util.List;
+
@Data
+@EqualsAndHashCode(callSuper = true)
@AllArgsConstructor
@NoArgsConstructor
@ApiModel("Base configuration for MySQL binlog collection")
@@ -46,19 +49,42 @@ public class MySQLBinlogSource extends StreamSource {
@ApiModelProperty("Auth for binlog")
private DefaultAuthentication authentication;
+ @ApiModelProperty("Whether include schema, default is 'false'")
+ private String includeSchema;
+
@ApiModelProperty("Hostname of the DB server, for example: 127.0.0.1")
private String hostname;
- @ApiModelProperty("Exposed port the DB server")
+ @ApiModelProperty("Exposed port of the DB server")
private int port = 3306;
- @ApiModelProperty(value = "List of DBs to be collected, for example: db1.tb1,db2.tb2"
- + ",if all tables in db are collected , use db.*",
- notes = "DBs not in this list are excluded. By default, all DBs are monitored")
+ @ApiModelProperty(value = "List of DBs to be collected, supporting regular expressions, "
+ + "separate them with commas, for example: db1,test_db*",
+ notes = "DBs not in this list are excluded. If not set, all DBs are monitored")
private List<String> dbNames;
+ @ApiModelProperty(value = "List of tables to be collected, supporting regular expressions, "
+ + "separate them with commas, for example: tb1,user*",
+ notes = "Tables not in this list are excluded. By default, all tables are monitored")
+ private List<String> tableNames;
+
@ApiModelProperty("Database time zone, Default is UTC")
- private String timeZone = "UTF";
+ private String serverTimezone = "UTF";
+
+ @ApiModelProperty("The interval for recording an offset")
+ private String intervalMs;
+
+ @ApiModelProperty("Snapshot mode, supports: initial, when_needed, never, schema_only, schema_only_recovery")
+ private String snapshotMode;
+
+ @ApiModelProperty("The file path to store offset info")
+ private String offsetFilename;
+
+ @ApiModelProperty("The file path to store history info")
+ private String historyFilename;
+
+ @ApiModelProperty("Whether to monitor the DDL, default is 'false'")
+ private String monitoredDdl;
@ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
private String timestampFormatStandard = "SQL";
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
index 195be65..d47f8cb 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -18,8 +18,7 @@
package org.apache.inlong.manager.client.api.util;
import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-import java.util.List;
+import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.client.api.DataFormat;
import org.apache.inlong.manager.client.api.StreamSource;
import org.apache.inlong.manager.client.api.StreamSource.SyncType;
@@ -38,6 +37,11 @@ import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceRequest;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import java.util.Arrays;
+
+/**
+ * Transfer the inlong stream source.
+ */
public class InlongStreamSourceTransfer {
public static SourceRequest createSourceRequest(StreamSource streamSource, InlongStreamInfo streamInfo) {
@@ -106,39 +110,53 @@ public class InlongStreamSourceTransfer {
return kafkaSource;
}
- private static MySQLBinlogSource parseMySQLBinlogSource(BinlogSourceResponse binlogSourceResponse) {
+ private static MySQLBinlogSource parseMySQLBinlogSource(BinlogSourceResponse response) {
MySQLBinlogSource binlogSource = new MySQLBinlogSource();
- binlogSource.setSourceName(binlogSourceResponse.getSourceName());
- binlogSource.setHostname(binlogSourceResponse.getHostname());
+ binlogSource.setSourceName(response.getSourceName());
+ binlogSource.setHostname(response.getHostname());
binlogSource.setDataFormat(DataFormat.NONE);
- binlogSource.setPort(binlogSourceResponse.getPort());
+ binlogSource.setPort(response.getPort());
DefaultAuthentication defaultAuthentication = new DefaultAuthentication(
- binlogSourceResponse.getUser(),
- binlogSourceResponse.getPassword());
- binlogSource.setAllMigration(binlogSourceResponse.isAllMigration());
+ response.getUser(),
+ response.getPassword());
binlogSource.setAuthentication(defaultAuthentication);
- binlogSource.setTimeZone(binlogSourceResponse.getTimeZone());
- binlogSource.setTimestampFormatStandard(binlogSourceResponse.getTimestampFormatStandard());
- List<String> dbs = Splitter.on(",").splitToList(binlogSourceResponse.getWhitelist());
- binlogSource.setDbNames(dbs);
+ binlogSource.setIncludeSchema(response.getIncludeSchema());
+ binlogSource.setServerTimezone(response.getServerTimezone());
+ binlogSource.setMonitoredDdl(response.getMonitoredDdl());
+ binlogSource.setTimestampFormatStandard(response.getTimestampFormatStandard());
+ binlogSource.setAllMigration(response.isAllMigration());
+
+ if (StringUtils.isNotBlank(response.getDatabaseWhiteList())) {
+ binlogSource.setDbNames(Arrays.asList(response.getDatabaseWhiteList().split(",")));
+ }
+ if (StringUtils.isNotBlank(response.getTableWhiteList())) {
+ binlogSource.setTableNames(Arrays.asList(response.getTableWhiteList().split(",")));
+ }
return binlogSource;
}
- private static MySQLBinlogSource parseMySQLBinlogSource(BinlogSourceListResponse binlogSourceResponse) {
+ private static MySQLBinlogSource parseMySQLBinlogSource(BinlogSourceListResponse response) {
MySQLBinlogSource binlogSource = new MySQLBinlogSource();
- binlogSource.setSourceName(binlogSourceResponse.getSourceName());
- binlogSource.setHostname(binlogSourceResponse.getHostname());
+ binlogSource.setSourceName(response.getSourceName());
+ binlogSource.setHostname(response.getHostname());
binlogSource.setDataFormat(DataFormat.NONE);
- binlogSource.setPort(binlogSourceResponse.getPort());
+ binlogSource.setPort(response.getPort());
DefaultAuthentication defaultAuthentication = new DefaultAuthentication(
- binlogSourceResponse.getUser(),
- binlogSourceResponse.getPassword());
+ response.getUser(),
+ response.getPassword());
binlogSource.setAuthentication(defaultAuthentication);
- binlogSource.setAllMigration(binlogSourceResponse.isAllMigration());
- binlogSource.setTimeZone(binlogSourceResponse.getTimeZone());
- binlogSource.setTimestampFormatStandard(binlogSourceResponse.getTimestampFormatStandard());
- List<String> dbs = Splitter.on(",").splitToList(binlogSourceResponse.getWhitelist());
- binlogSource.setDbNames(dbs);
+ binlogSource.setIncludeSchema(response.getIncludeSchema());
+ binlogSource.setServerTimezone(response.getServerTimezone());
+ binlogSource.setMonitoredDdl(response.getMonitoredDdl());
+ binlogSource.setTimestampFormatStandard(response.getTimestampFormatStandard());
+ binlogSource.setAllMigration(response.isAllMigration());
+
+ if (StringUtils.isNotBlank(response.getDatabaseWhiteList())) {
+ binlogSource.setDbNames(Arrays.asList(response.getDatabaseWhiteList().split(",")));
+ }
+ if (StringUtils.isNotBlank(response.getTableWhiteList())) {
+ binlogSource.setTableNames(Arrays.asList(response.getTableWhiteList().split(",")));
+ }
return binlogSource;
}
@@ -160,23 +178,27 @@ public class InlongStreamSourceTransfer {
private static BinlogSourceRequest createBinlogSourceRequest(MySQLBinlogSource binlogSource,
InlongStreamInfo streamInfo) {
- BinlogSourceRequest binlogSourceRequest = new BinlogSourceRequest();
- binlogSourceRequest.setSourceName(binlogSource.getSourceName());
- binlogSourceRequest.setInlongGroupId(streamInfo.getInlongGroupId());
- binlogSourceRequest.setInlongStreamId(streamInfo.getInlongStreamId());
- binlogSourceRequest.setSourceType(binlogSource.getSourceType().name());
+ BinlogSourceRequest sourceRequest = new BinlogSourceRequest();
+ sourceRequest.setSourceName(binlogSource.getSourceName());
+ sourceRequest.setInlongGroupId(streamInfo.getInlongGroupId());
+ sourceRequest.setInlongStreamId(streamInfo.getInlongStreamId());
+ sourceRequest.setSourceType(binlogSource.getSourceType().name());
DefaultAuthentication authentication = binlogSource.getAuthentication();
- binlogSourceRequest.setUser(authentication.getUserName());
- binlogSourceRequest.setPassword(authentication.getPassword());
- binlogSourceRequest.setHostname(binlogSource.getHostname());
- binlogSourceRequest.setPort(binlogSource.getPort());
- binlogSourceRequest.setAllMigration(binlogSource.isAllMigration());
+ sourceRequest.setUser(authentication.getUserName());
+ sourceRequest.setPassword(authentication.getPassword());
+ sourceRequest.setHostname(binlogSource.getHostname());
+ sourceRequest.setPort(binlogSource.getPort());
+ sourceRequest.setIncludeSchema(binlogSource.getIncludeSchema());
+ sourceRequest.setServerTimezone(binlogSource.getServerTimezone());
+ sourceRequest.setMonitoredDdl(binlogSource.getMonitoredDdl());
+ sourceRequest.setAllMigration(binlogSource.isAllMigration());
String dbNames = Joiner.on(",").join(binlogSource.getDbNames());
- binlogSourceRequest.setWhitelist(dbNames);
- binlogSourceRequest.setTimestampFormatStandard(binlogSource.getTimestampFormatStandard());
- binlogSourceRequest.setTimeZone(binlogSource.getTimeZone());
- binlogSourceRequest.setSnapshotMode("initial");
- binlogSourceRequest.setIntervalMs("500");
- return binlogSourceRequest;
+ sourceRequest.setDatabaseWhiteList(dbNames);
+ String tableNames = Joiner.on(",").join(binlogSource.getTableNames());
+ sourceRequest.setTableWhiteList(tableNames);
+ sourceRequest.setSnapshotMode("initial");
+ sourceRequest.setIntervalMs("500");
+ sourceRequest.setTimestampFormatStandard(binlogSource.getTimestampFormatStandard());
+ return sourceRequest;
}
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java
index eb888c1..d221cdd 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java
@@ -49,16 +49,24 @@ public class BinlogSourceDTO {
@ApiModelProperty("Hostname of the DB server")
private String hostname;
- @ApiModelProperty("Exposed port the DB server")
+ @ApiModelProperty("Exposed port of the DB server")
private int port;
+ @ApiModelProperty("Whether include schema, default is 'false'")
+ private String includeSchema;
+
@ApiModelProperty(value = "List of DBs to be collected, supporting regular expressions, "
- + "separate them with commas, for example: db1.tb1,db2.tb2",
- notes = "DBs not in this list are excluded. By default, all DBs are monitored")
- private String whitelist;
+ + "separate them with commas, for example: db1,test_db*",
+ notes = "DBs not in this list are excluded. If not set, all DBs are monitored")
+ private String databaseWhiteList;
+
+ @ApiModelProperty(value = "List of tables to be collected, supporting regular expressions, "
+ + "separate them with commas, for example: tb1,user*",
+ notes = "Tables not in this list are excluded. By default, all tables are monitored")
+ private String tableWhiteList;
@ApiModelProperty("Database time zone, Default is UTC")
- private String timeZone;
+ private String serverTimezone;
@ApiModelProperty("The interval for recording an offset")
private String intervalMs;
@@ -81,13 +89,19 @@ public class BinlogSourceDTO {
@ApiModelProperty("Snapshot mode, supports: initial, when_needed, never, schema_only, schema_only_recovery")
private String snapshotMode;
+ @ApiModelProperty("The file path to store offset info")
+ private String offsetFilename;
+
@ApiModelProperty("The file path to store history info")
- private String storeHistoryFilename;
+ private String historyFilename;
+
+ @ApiModelProperty("Whether to monitor the DDL, default is 'false'")
+ private String monitoredDdl;
@ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
private String timestampFormatStandard = "SQL";
- @ApiModelProperty("Need transfer total database")
+ @ApiModelProperty("Whether to migrate all databases")
private boolean allMigration;
/**
@@ -96,15 +110,19 @@ public class BinlogSourceDTO {
public static BinlogSourceDTO getFromRequest(BinlogSourceRequest request) {
return BinlogSourceDTO.builder()
.user(request.getUser())
- .port(request.getPort())
.password(request.getPassword())
.hostname(request.getHostname())
- .whitelist(request.getWhitelist())
- .timeZone(request.getTimeZone())
+ .port(request.getPort())
+ .includeSchema(request.getIncludeSchema())
+ .databaseWhiteList(request.getDatabaseWhiteList())
+ .tableWhiteList(request.getTableWhiteList())
+ .serverTimezone(request.getServerTimezone())
.intervalMs(request.getIntervalMs())
.snapshotMode(request.getSnapshotMode())
+ .offsetFilename(request.getOffsetFilename())
+ .historyFilename(request.getHistoryFilename())
+ .monitoredDdl(request.getMonitoredDdl())
.allMigration(request.isAllMigration())
- .storeHistoryFilename(request.getStoreHistoryFilename())
.build();
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
index 0902564..0f92668 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
@@ -40,17 +40,20 @@ public class BinlogSourceListResponse extends SourceListResponse {
@ApiModelProperty("Hostname of the DB server")
private String hostname;
- @ApiModelProperty("Exposed port the DB server")
+ @ApiModelProperty("Exposed port of the DB server")
private int port;
+ @ApiModelProperty("Whether include schema, default is 'false'")
+ private String includeSchema;
+
@ApiModelProperty(value = "List of DBs to be collected, supporting regular expressions")
- private String whitelist;
+ private String databaseWhiteList;
- @ApiModelProperty("Database time zone, Default is UTC")
- private String timeZone;
+ @ApiModelProperty(value = "List of tables to be collected, supporting regular expressions")
+ private String tableWhiteList;
- @ApiModelProperty("The file path to store history info")
- private String storeHistoryFilename;
+ @ApiModelProperty("Database time zone, Default is UTC")
+ private String serverTimezone;
@ApiModelProperty("The interval for recording an offset")
private String intervalMs;
@@ -58,6 +61,15 @@ public class BinlogSourceListResponse extends SourceListResponse {
@ApiModelProperty("Snapshot mode, supports: initial, when_needed, never, schema_only, schema_only_recovery")
private String snapshotMode;
+ @ApiModelProperty("The file path to store offset info")
+ private String offsetFilename;
+
+ @ApiModelProperty("The file path to store history info")
+ private String historyFilename;
+
+ @ApiModelProperty("Whether to monitor the DDL, default is 'false'")
+ private String monitoredDdl;
+
@ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
private String timestampFormatStandard = "SQL";
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
index 2b4b1a1..cdb1e2f 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
@@ -37,33 +37,33 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
@JsonTypeDefine(value = Constant.SOURCE_BINLOG)
public class BinlogSourceRequest extends SourceRequest {
- public BinlogSourceRequest() {
- this.setSourceType(SourceType.BINLOG.toString());
- }
-
@ApiModelProperty("Username of the DB server")
private String user;
@ApiModelProperty("Password of the DB server")
private String password;
- @ApiModelProperty("Hostname or Ip of the DB server, for example: 127.0.0.1")
+ @ApiModelProperty("Hostname of the DB server")
private String hostname;
- @ApiModelProperty("Exposed port the DB server")
+ @ApiModelProperty("Exposed port of the DB server")
private int port = 3306;
+ @ApiModelProperty("Whether include schema, default is 'false'")
+ private String includeSchema;
+
@ApiModelProperty(value = "List of DBs to be collected, supporting regular expressions, "
- + "separate them with commas, for example: db1.tb1,db2.tb2 "
- + "if all tables in db are collected , use db.*",
- notes = "DBs not in this list are excluded. By default, all DBs are monitored")
- private String whitelist;
+ + "separate them with commas, for example: db1,test_db*",
+ notes = "DBs not in this list are excluded. If not set, all DBs are monitored")
+ private String databaseWhiteList;
- @ApiModelProperty("Database time zone, Default is UTC")
- private String timeZone;
+ @ApiModelProperty(value = "List of tables to be collected, supporting regular expressions, "
+ + "separate them with commas, for example: tb1,user*",
+ notes = "Tables not in this list are excluded. By default, all tables are monitored")
+ private String tableWhiteList;
- @ApiModelProperty("The file path to store history info, default path : /data/history")
- private String storeHistoryFilename = "/data/history";
+ @ApiModelProperty("Database time zone, Default is UTC")
+ private String serverTimezone;
@ApiModelProperty("The interval for recording an offset")
private String intervalMs;
@@ -84,7 +84,16 @@ public class BinlogSourceRequest extends SourceRequest {
* generally not used.
*/
@ApiModelProperty("Snapshot mode, supports: initial, when_needed, never, schema_only, schema_only_recovery")
- private String snapshotMode = "initial";
+ private String snapshotMode;
+
+ @ApiModelProperty("The file path to store offset info")
+ private String offsetFilename;
+
+ @ApiModelProperty("The file path to store history info")
+ private String historyFilename;
+
+ @ApiModelProperty("Whether to monitor the DDL, default is 'false'")
+ private String monitoredDdl;
@ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
private String timestampFormatStandard = "SQL";
@@ -92,4 +101,8 @@ public class BinlogSourceRequest extends SourceRequest {
@ApiModelProperty("Need transfer total database")
private boolean allMigration = false;
+ public BinlogSourceRequest() {
+ this.setSourceType(SourceType.BINLOG.toString());
+ }
+
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
index 1cce061..557fba6 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
@@ -50,17 +50,17 @@ public class BinlogSourceResponse extends SourceResponse {
@ApiModelProperty("Exposed port of the DB server")
private int port;
+ @ApiModelProperty("Whether include schema, default is 'false'")
+ private String includeSchema;
+
@ApiModelProperty(value = "List of DBs to be collected, supporting regular expressions")
- private String whitelist;
+ private String databaseWhiteList;
- @ApiModelProperty("Database time zone, Default is UTC")
- private String timeZone;
+ @ApiModelProperty(value = "List of tables to be collected, supporting regular expressions")
+ private String tableWhiteList;
- @ApiModelProperty("The file path to store history info")
- private String storeHistoryFilename;
-
- @ApiModelProperty("Offset of the task")
- private String offset;
+ @ApiModelProperty("Database time zone, Default is UTC")
+ private String serverTimezone;
@ApiModelProperty("The interval for recording an offset")
private String intervalMs;
@@ -68,6 +68,15 @@ public class BinlogSourceResponse extends SourceResponse {
@ApiModelProperty("Snapshot mode, supports: initial, when_needed, never, schema_only, schema_only_recovery")
private String snapshotMode;
+ @ApiModelProperty("The file path to store offset info")
+ private String offsetFilename;
+
+ @ApiModelProperty("The file path to store history info")
+ private String historyFilename;
+
+ @ApiModelProperty("Whether to monitor the DDL, default is 'false'")
+ private String monitoredDdl;
+
@ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
private String timestampFormatStandard = "SQL";
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 87790fb..a95e691 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -79,6 +79,7 @@ public class AgentServiceImpl implements AgentService {
private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private static final int UNISSUED_STATUS = 2;
private static final int ISSUED_STATUS = 3;
+ private static final int MODULUS_100 = 100;
@Autowired
private StreamSourceEntityMapper sourceMapper;
@@ -147,7 +148,7 @@ public class AgentServiceImpl implements AgentService {
int previousStatus = current.getStatus();
int nextStatus = SourceState.SOURCE_NORMAL.getCode();
// Change the status from 30x to normal / disable / frozen
- if (previousStatus / 100 == ISSUED_STATUS) {
+ if (previousStatus / MODULUS_100 == ISSUED_STATUS) {
if (Constants.RESULT_SUCCESS == result) {
if (SourceState.TEMP_TO_NORMAL.contains(previousStatus)) {
nextStatus = SourceState.SOURCE_NORMAL.getCode();
@@ -176,13 +177,25 @@ public class AgentServiceImpl implements AgentService {
List<DataConfig> dataConfigs = Lists.newArrayList();
List<StreamSourceEntity> entityList = sourceMapper.selectByIpAndUuid(agentIp, uuid);
for (StreamSourceEntity entity : entityList) {
+ // Change 20x to 30x
+ int id = entity.getId();
+ int status = entity.getStatus();
+ int op = status % MODULUS_100;
+ if (status / MODULUS_100 == UNISSUED_STATUS) {
+ sourceMapper.updateStatus(id, ISSUED_STATUS * MODULUS_100 + op);
+ } else {
+ LOGGER.info("skip task status not in 20x, id={}", id);
+ continue;
+ }
+
DataConfig dataConfig = new DataConfig();
+ dataConfig.setIp(entity.getAgentIp());
+ dataConfig.setUuid(entity.getUuid());
+ dataConfig.setOp(String.valueOf(op));
dataConfig.setTaskId(entity.getId());
dataConfig.setTaskType(getTaskType(entity));
dataConfig.setTaskName(entity.getSourceName());
- dataConfig.setOp(String.valueOf(entity.getStatus() % 100));
- dataConfig.setIp(entity.getAgentIp());
- dataConfig.setUuid(entity.getUuid());
+ dataConfig.setSnapshot(entity.getSnapshot());
dataConfig.setExtParams(entity.getExtParams());
LocalDateTime dateTime = LocalDateTime.ofInstant(entity.getModifyTime().toInstant(),
ZoneId.systemDefault());
@@ -192,8 +205,8 @@ public class AgentServiceImpl implements AgentService {
String streamId = entity.getInlongStreamId();
dataConfig.setInlongGroupId(groupId);
dataConfig.setInlongStreamId(streamId);
- InlongStreamEntity inlongStreamEntity = streamMapper.selectByIdentifier(groupId, streamId);
- dataConfig.setSyncSend(inlongStreamEntity.getSyncSend());
+ InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
+ dataConfig.setSyncSend(streamEntity.getSyncSend());
dataConfigs.add(dataConfig);
}
// Query pending special commands