You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/05 03:10:38 UTC
[incubator-inlong] branch master updated: [INLONG-2917][Manager] Fix migrate all database task in manager (#2918)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b748b86 [INLONG-2917][Manager] Fix migrate all database task in manager (#2918)
b748b86 is described below
commit b748b86686ea471aa51ac696a5dd655276518a57
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Sat Mar 5 11:10:32 2022 +0800
[INLONG-2917][Manager] Fix migrate all database task in manager (#2918)
---
.../client/api/source/MySQLBinlogSource.java | 3 +++
.../api/util/InlongStreamSourceTransfer.java | 2 ++
.../inlong/manager/common/enums/SourceType.java | 1 -
.../common/pojo/source/binlog/BinlogSourceDTO.java | 5 +++++
.../source/binlog/BinlogSourceListResponse.java | 2 ++
.../pojo/source/binlog/BinlogSourceRequest.java | 3 +++
.../pojo/source/binlog/BinlogSourceResponse.java | 2 ++
.../service/core/impl/AgentServiceImpl.java | 23 ++++++++++++++++++++--
.../thirdparty/sort/CreateSortConfigListener.java | 21 ++++++++++----------
.../thirdparty/sort/util/SinkInfoUtils.java | 5 ++---
.../thirdparty/sort/util/SourceInfoUtils.java | 11 +++++++++++
11 files changed, 62 insertions(+), 16 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
index d23b40d..43e8718 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
@@ -62,4 +62,7 @@ public class MySQLBinlogSource extends StreamSource {
@ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
private String timestampFormatStandard = "SQL";
+
+ @ApiModelProperty("Need transfer total database")
+ private boolean allMigration = false;
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
index 28bec82..82ae09e 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -86,6 +86,7 @@ public class InlongStreamSourceTransfer {
binlogSourceResponse.getUser(),
binlogSourceResponse.getPassword());
binlogSource.setAuthentication(defaultAuthentication);
+ binlogSource.setAllMigration(binlogSourceResponse.isAllMigration());
binlogSource.setTimeZone(binlogSourceResponse.getTimeZone());
binlogSource.setTimestampFormatStandard(binlogSourceResponse.getTimestampFormatStandard());
List<String> dbs = Splitter.on(",").splitToList(binlogSourceResponse.getWhitelist());
@@ -121,6 +122,7 @@ public class InlongStreamSourceTransfer {
binlogSourceRequest.setPassword(authentication.getPassword());
binlogSourceRequest.setHostname(binlogSource.getHostname());
binlogSourceRequest.setPort(binlogSource.getPort());
+ binlogSourceRequest.setAllMigration(binlogSource.isAllMigration());
String dbNames = Joiner.on(",").join(binlogSource.getDbNames());
binlogSourceRequest.setWhitelist(dbNames);
binlogSourceRequest.setTimestampFormatStandard(binlogSource.getTimestampFormatStandard());
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
index 93998e7..1dbcbed 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
@@ -23,7 +23,6 @@ import org.apache.inlong.common.enums.TaskTypeEnum;
import java.util.Locale;
public enum SourceType {
- DATABASE_MIGRATION("DATABASE_MIGRATION",TaskTypeEnum.DATABASE_MIGRATION),
FILE("FILE", TaskTypeEnum.FILE),
SQL("SQL", TaskTypeEnum.SQL),
BINLOG("BINLOG", TaskTypeEnum.BINLOG),
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java
index 388d952..eb888c1 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java
@@ -87,18 +87,23 @@ public class BinlogSourceDTO {
@ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
private String timestampFormatStandard = "SQL";
+ @ApiModelProperty("Need transfer total database")
+ private boolean allMigration;
+
/**
* Get the dto instance from the request
*/
public static BinlogSourceDTO getFromRequest(BinlogSourceRequest request) {
return BinlogSourceDTO.builder()
.user(request.getUser())
+ .port(request.getPort())
.password(request.getPassword())
.hostname(request.getHostname())
.whitelist(request.getWhitelist())
.timeZone(request.getTimeZone())
.intervalMs(request.getIntervalMs())
.snapshotMode(request.getSnapshotMode())
+ .allMigration(request.isAllMigration())
.storeHistoryFilename(request.getStoreHistoryFilename())
.build();
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
index 3b960e6..0902564 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
@@ -61,4 +61,6 @@ public class BinlogSourceListResponse extends SourceListResponse {
@ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
private String timestampFormatStandard = "SQL";
+ @ApiModelProperty("Need transfer total database")
+ private boolean allMigration;
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
index d1aa653..2b4b1a1 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
@@ -89,4 +89,7 @@ public class BinlogSourceRequest extends SourceRequest {
@ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
private String timestampFormatStandard = "SQL";
+ @ApiModelProperty("Need transfer total database")
+ private boolean allMigration = false;
+
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
index ccf25d1..8941bc1 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
@@ -69,4 +69,6 @@ public class BinlogSourceResponse extends SourceResponse {
@ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
private String timestampFormatStandard = "SQL";
+ @ApiModelProperty("Need transfer total database")
+ private boolean allMigration;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index fd2b8ae..e135a15 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -23,6 +23,7 @@ import com.google.gson.GsonBuilder;
import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.common.constant.Constants;
import org.apache.inlong.common.db.CommandEntity;
+import org.apache.inlong.common.enums.TaskTypeEnum;
import org.apache.inlong.common.pojo.agent.CmdConfig;
import org.apache.inlong.common.pojo.agent.DataConfig;
import org.apache.inlong.common.pojo.agent.TaskRequest;
@@ -40,6 +41,7 @@ import org.apache.inlong.manager.common.pojo.agent.FileAgentCommandInfo;
import org.apache.inlong.manager.common.pojo.agent.FileAgentCommandInfo.CommandInfoBean;
import org.apache.inlong.manager.common.pojo.agent.FileAgentTaskConfig;
import org.apache.inlong.manager.common.pojo.agent.FileAgentTaskInfo;
+import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceDTO;
import org.apache.inlong.manager.dao.entity.DataSourceCmdConfigEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity;
@@ -52,6 +54,7 @@ import org.apache.inlong.manager.dao.mapper.SourceFileDetailEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.service.core.AgentService;
import org.apache.inlong.manager.service.source.SourceSnapshotOperation;
+import org.apache.inlong.manager.service.source.binlog.BinlogStreamSourceOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -84,6 +87,8 @@ public class AgentServiceImpl implements AgentService {
private InlongStreamFieldEntityMapper streamFieldMapper;
@Autowired
private InlongStreamEntityMapper streamMapper;
+ @Autowired
+ private BinlogStreamSourceOperation binlogStreamSourceOperation;
/**
* If the reported task time and the modification time in the database exceed this value,
@@ -122,8 +127,7 @@ public class AgentServiceImpl implements AgentService {
for (StreamSourceEntity entity : entityList) {
DataConfig dataConfig = new DataConfig();
dataConfig.setTaskId(entity.getId());
- SourceType sourceType = SourceType.forType(entity.getSourceType());
- dataConfig.setTaskType(sourceType.getTaskType().getType());
+ dataConfig.setTaskType(getTaskType(entity));
dataConfig.setTaskName(entity.getSourceName());
dataConfig.setOp(String.valueOf(entity.getStatus() % 100));
dataConfig.setIp(entity.getAgentIp());
@@ -145,6 +149,21 @@ public class AgentServiceImpl implements AgentService {
return TaskResult.builder().dataConfigs(dataConfigs).cmdConfigs(cmdConfigs).build();
}
+ private int getTaskType(StreamSourceEntity sourceEntity) {
+ SourceType sourceType = SourceType.forType(sourceEntity.getSourceType());
+ if (sourceType != SourceType.BINLOG) {
+ return sourceType.getTaskType().getType();
+ } else {
+ BinlogSourceDTO binlogSourceDTO = binlogStreamSourceOperation.getFromEntity(sourceEntity,
+ BinlogSourceDTO::new);
+ if (binlogSourceDTO.isAllMigration()) {
+ return TaskTypeEnum.DATABASE_MIGRATION.getType();
+ } else {
+ return sourceType.getTaskType().getType();
+ }
+ }
+ }
+
/**
* Update the task status by the request
*/
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
index 0b91088..2e4e227 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
@@ -29,7 +29,6 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.inlong.manager.common.beans.ClusterBean;
import org.apache.inlong.manager.common.enums.Constant;
-import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.sink.SinkBriefResponse;
@@ -168,19 +167,20 @@ public class CreateSortConfigListener implements SortOperateListener {
String middleWareType = groupInfo.getMiddlewareType();
List<FieldInfo> fieldInfos = Lists.newArrayList();
- if (SourceType.DATABASE_MIGRATION.getType().equalsIgnoreCase(sourceResponse.getSourceType())) {
+ if (SourceInfoUtils.isBinlogMigrationSource(sourceResponse)) {
fieldInfos.add(new BuiltInFieldInfo("DATABASE_MIGRATION", StringFormatInfo.INSTANCE,
BuiltInField.MYSQL_METADATA_DATA));
- }
- if (!SourceType.DATABASE_MIGRATION.getType().equalsIgnoreCase(sourceResponse.getSourceType())
- && CollectionUtils.isNotEmpty(streamInfo.getFieldList())) {
- fieldInfos = streamInfo.getFieldList().stream().map(inlongStreamFieldInfo -> {
- FormatInfo formatInfo = SortFieldFormatUtils.convertFieldFormat(
- inlongStreamFieldInfo.getFieldType().toLowerCase());
- return new FieldInfo(inlongStreamFieldInfo.getFieldName(), formatInfo);
- }).collect(Collectors.toList());
+ } else {
+ if (CollectionUtils.isNotEmpty(streamInfo.getFieldList())) {
+ fieldInfos = streamInfo.getFieldList().stream().map(inlongStreamFieldInfo -> {
+ FormatInfo formatInfo = SortFieldFormatUtils.convertFieldFormat(
+ inlongStreamFieldInfo.getFieldType().toLowerCase());
+ return new FieldInfo(inlongStreamFieldInfo.getFieldName(), formatInfo);
+ }).collect(Collectors.toList());
+ }
}
+
DeserializationInfo deserializationInfo = SerializationUtils.createDeserializationInfo(sourceResponse,
streamInfo);
if (Constant.MIDDLEWARE_PULSAR.equals(middleWareType)) {
@@ -191,6 +191,7 @@ public class CreateSortConfigListener implements SortOperateListener {
throw new RuntimeException(
String.format("MiddleWare:{} not support in CreateSortConfigListener", middleWareType));
}
+
}
private PulsarSourceInfo createPulsarSourceInfo(InlongGroupInfo groupInfo,
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
index 2ea0f4c..0ea1599 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
@@ -27,7 +27,6 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.SinkType;
-import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.sink.SinkFieldResponse;
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse;
@@ -89,7 +88,7 @@ public class SinkInfoUtils {
private static KafkaSinkInfo createKafkaSinkInfo(InlongStreamInfo inlongStreamInfo, SourceResponse sourceResponse,
KafkaSinkResponse kafkaSinkResponse) {
List<FieldInfo> fieldInfoList = Lists.newArrayList();
- if (SourceType.DATABASE_MIGRATION.getType().equalsIgnoreCase(sourceResponse.getSourceType())) {
+ if (SourceInfoUtils.isBinlogMigrationSource(sourceResponse)) {
fieldInfoList.add(new BuiltInFieldInfo("DATABASE_MIGRATION", StringFormatInfo.INSTANCE,
BuiltInField.MYSQL_METADATA_DATA));
} else {
@@ -159,7 +158,7 @@ public class SinkInfoUtils {
// Get the sink field, if there is no partition field in the source field, add the partition field to the end
List<FieldInfo> fieldInfoList = Lists.newArrayList();
- if (SourceType.DATABASE_MIGRATION.getType().equalsIgnoreCase(sourceResponse.getSourceType())) {
+ if (SourceInfoUtils.isBinlogMigrationSource(sourceResponse)) {
fieldInfoList.add(new BuiltInFieldInfo("DATABASE_MIGRATION", StringFormatInfo.INSTANCE,
BuiltInField.MYSQL_METADATA_DATA));
} else {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
index 023d025..2027f20 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
@@ -20,8 +20,11 @@ package org.apache.inlong.manager.service.thirdparty.sort.util;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
+import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
@@ -29,6 +32,14 @@ import org.apache.inlong.sort.protocol.source.PulsarSourceInfo;
public class SourceInfoUtils {
+ public static boolean isBinlogMigrationSource(SourceResponse sourceResponse) {
+ if (SourceType.BINLOG.getType().equalsIgnoreCase(sourceResponse.getSourceType())) {
+ BinlogSourceResponse binlogSourceResponse = (BinlogSourceResponse) sourceResponse;
+ return binlogSourceResponse.isAllMigration();
+ }
+ return false;
+ }
+
public static PulsarSourceInfo createPulsarSourceInfo(InlongGroupInfo groupInfo, String pulsarTopic,
DeserializationInfo deserializationInfo,
List<FieldInfo> fieldInfos, String appName,String tenant,