You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/05 03:10:38 UTC

[incubator-inlong] branch master updated: [INLONG-2917][Manager] Fix migrate all database task in manager (#2918)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b748b86  [INLONG-2917][Manager] Fix migrate all database task in manager (#2918)
b748b86 is described below

commit b748b86686ea471aa51ac696a5dd655276518a57
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Sat Mar 5 11:10:32 2022 +0800

    [INLONG-2917][Manager] Fix migrate all database task in manager (#2918)
---
 .../client/api/source/MySQLBinlogSource.java       |  3 +++
 .../api/util/InlongStreamSourceTransfer.java       |  2 ++
 .../inlong/manager/common/enums/SourceType.java    |  1 -
 .../common/pojo/source/binlog/BinlogSourceDTO.java |  5 +++++
 .../source/binlog/BinlogSourceListResponse.java    |  2 ++
 .../pojo/source/binlog/BinlogSourceRequest.java    |  3 +++
 .../pojo/source/binlog/BinlogSourceResponse.java   |  2 ++
 .../service/core/impl/AgentServiceImpl.java        | 23 ++++++++++++++++++++--
 .../thirdparty/sort/CreateSortConfigListener.java  | 21 ++++++++++----------
 .../thirdparty/sort/util/SinkInfoUtils.java        |  5 ++---
 .../thirdparty/sort/util/SourceInfoUtils.java      | 11 +++++++++++
 11 files changed, 62 insertions(+), 16 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
index d23b40d..43e8718 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
@@ -62,4 +62,7 @@ public class MySQLBinlogSource extends StreamSource {
 
     @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
     private String timestampFormatStandard = "SQL";
+
+    @ApiModelProperty("Need transfer total database")
+    private boolean allMigration = false;
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
index 28bec82..82ae09e 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -86,6 +86,7 @@ public class InlongStreamSourceTransfer {
                 binlogSourceResponse.getUser(),
                 binlogSourceResponse.getPassword());
         binlogSource.setAuthentication(defaultAuthentication);
+        binlogSource.setAllMigration(binlogSourceResponse.isAllMigration());
         binlogSource.setTimeZone(binlogSourceResponse.getTimeZone());
         binlogSource.setTimestampFormatStandard(binlogSourceResponse.getTimestampFormatStandard());
         List<String> dbs = Splitter.on(",").splitToList(binlogSourceResponse.getWhitelist());
@@ -121,6 +122,7 @@ public class InlongStreamSourceTransfer {
         binlogSourceRequest.setPassword(authentication.getPassword());
         binlogSourceRequest.setHostname(binlogSource.getHostname());
         binlogSourceRequest.setPort(binlogSource.getPort());
+        binlogSourceRequest.setAllMigration(binlogSource.isAllMigration());
         String dbNames = Joiner.on(",").join(binlogSource.getDbNames());
         binlogSourceRequest.setWhitelist(dbNames);
         binlogSourceRequest.setTimestampFormatStandard(binlogSource.getTimestampFormatStandard());
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
index 93998e7..1dbcbed 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
@@ -23,7 +23,6 @@ import org.apache.inlong.common.enums.TaskTypeEnum;
 import java.util.Locale;
 
 public enum SourceType {
-    DATABASE_MIGRATION("DATABASE_MIGRATION",TaskTypeEnum.DATABASE_MIGRATION),
     FILE("FILE", TaskTypeEnum.FILE),
     SQL("SQL", TaskTypeEnum.SQL),
     BINLOG("BINLOG", TaskTypeEnum.BINLOG),
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java
index 388d952..eb888c1 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java
@@ -87,18 +87,23 @@ public class BinlogSourceDTO {
     @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
     private String timestampFormatStandard = "SQL";
 
+    @ApiModelProperty("Need transfer total database")
+    private boolean allMigration;
+
     /**
      * Get the dto instance from the request
      */
     public static BinlogSourceDTO getFromRequest(BinlogSourceRequest request) {
         return BinlogSourceDTO.builder()
                 .user(request.getUser())
+                .port(request.getPort())
                 .password(request.getPassword())
                 .hostname(request.getHostname())
                 .whitelist(request.getWhitelist())
                 .timeZone(request.getTimeZone())
                 .intervalMs(request.getIntervalMs())
                 .snapshotMode(request.getSnapshotMode())
+                .allMigration(request.isAllMigration())
                 .storeHistoryFilename(request.getStoreHistoryFilename())
                 .build();
     }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
index 3b960e6..0902564 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
@@ -61,4 +61,6 @@ public class BinlogSourceListResponse extends SourceListResponse {
     @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
     private String timestampFormatStandard = "SQL";
 
+    @ApiModelProperty("Need transfer total database")
+    private boolean allMigration;
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
index d1aa653..2b4b1a1 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
@@ -89,4 +89,7 @@ public class BinlogSourceRequest extends SourceRequest {
     @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
     private String timestampFormatStandard = "SQL";
 
+    @ApiModelProperty("Need transfer total database")
+    private boolean allMigration = false;
+
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
index ccf25d1..8941bc1 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
@@ -69,4 +69,6 @@ public class BinlogSourceResponse extends SourceResponse {
     @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
     private String timestampFormatStandard = "SQL";
 
+    @ApiModelProperty("Need transfer total database")
+    private boolean allMigration;
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index fd2b8ae..e135a15 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -23,6 +23,7 @@ import com.google.gson.GsonBuilder;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.common.constant.Constants;
 import org.apache.inlong.common.db.CommandEntity;
+import org.apache.inlong.common.enums.TaskTypeEnum;
 import org.apache.inlong.common.pojo.agent.CmdConfig;
 import org.apache.inlong.common.pojo.agent.DataConfig;
 import org.apache.inlong.common.pojo.agent.TaskRequest;
@@ -40,6 +41,7 @@ import org.apache.inlong.manager.common.pojo.agent.FileAgentCommandInfo;
 import org.apache.inlong.manager.common.pojo.agent.FileAgentCommandInfo.CommandInfoBean;
 import org.apache.inlong.manager.common.pojo.agent.FileAgentTaskConfig;
 import org.apache.inlong.manager.common.pojo.agent.FileAgentTaskInfo;
+import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceDTO;
 import org.apache.inlong.manager.dao.entity.DataSourceCmdConfigEntity;
 import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
 import org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity;
@@ -52,6 +54,7 @@ import org.apache.inlong.manager.dao.mapper.SourceFileDetailEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
 import org.apache.inlong.manager.service.core.AgentService;
 import org.apache.inlong.manager.service.source.SourceSnapshotOperation;
+import org.apache.inlong.manager.service.source.binlog.BinlogStreamSourceOperation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -84,6 +87,8 @@ public class AgentServiceImpl implements AgentService {
     private InlongStreamFieldEntityMapper streamFieldMapper;
     @Autowired
     private InlongStreamEntityMapper streamMapper;
+    @Autowired
+    private BinlogStreamSourceOperation binlogStreamSourceOperation;
 
     /**
      * If the reported task time and the modification time in the database exceed this value,
@@ -122,8 +127,7 @@ public class AgentServiceImpl implements AgentService {
         for (StreamSourceEntity entity : entityList) {
             DataConfig dataConfig = new DataConfig();
             dataConfig.setTaskId(entity.getId());
-            SourceType sourceType = SourceType.forType(entity.getSourceType());
-            dataConfig.setTaskType(sourceType.getTaskType().getType());
+            dataConfig.setTaskType(getTaskType(entity));
             dataConfig.setTaskName(entity.getSourceName());
             dataConfig.setOp(String.valueOf(entity.getStatus() % 100));
             dataConfig.setIp(entity.getAgentIp());
@@ -145,6 +149,21 @@ public class AgentServiceImpl implements AgentService {
         return TaskResult.builder().dataConfigs(dataConfigs).cmdConfigs(cmdConfigs).build();
     }
 
+    private int getTaskType(StreamSourceEntity sourceEntity) {
+        SourceType sourceType = SourceType.forType(sourceEntity.getSourceType());
+        if (sourceType != SourceType.BINLOG) {
+            return sourceType.getTaskType().getType();
+        } else {
+            BinlogSourceDTO binlogSourceDTO = binlogStreamSourceOperation.getFromEntity(sourceEntity,
+                    BinlogSourceDTO::new);
+            if (binlogSourceDTO.isAllMigration()) {
+                return TaskTypeEnum.DATABASE_MIGRATION.getType();
+            } else {
+                return sourceType.getTaskType().getType();
+            }
+        }
+    }
+
     /**
      * Update the task status by the request
      */
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
index 0b91088..2e4e227 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
@@ -29,7 +29,6 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.inlong.manager.common.beans.ClusterBean;
 import org.apache.inlong.manager.common.enums.Constant;
-import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.sink.SinkBriefResponse;
@@ -168,19 +167,20 @@ public class CreateSortConfigListener implements SortOperateListener {
         String middleWareType = groupInfo.getMiddlewareType();
 
         List<FieldInfo> fieldInfos = Lists.newArrayList();
-        if (SourceType.DATABASE_MIGRATION.getType().equalsIgnoreCase(sourceResponse.getSourceType())) {
+        if (SourceInfoUtils.isBinlogMigrationSource(sourceResponse)) {
             fieldInfos.add(new BuiltInFieldInfo("DATABASE_MIGRATION", StringFormatInfo.INSTANCE,
                     BuiltInField.MYSQL_METADATA_DATA));
-        }
 
-        if (!SourceType.DATABASE_MIGRATION.getType().equalsIgnoreCase(sourceResponse.getSourceType())
-                && CollectionUtils.isNotEmpty(streamInfo.getFieldList())) {
-            fieldInfos = streamInfo.getFieldList().stream().map(inlongStreamFieldInfo -> {
-                FormatInfo formatInfo = SortFieldFormatUtils.convertFieldFormat(
-                        inlongStreamFieldInfo.getFieldType().toLowerCase());
-                return new FieldInfo(inlongStreamFieldInfo.getFieldName(), formatInfo);
-            }).collect(Collectors.toList());
+        } else {
+            if (CollectionUtils.isNotEmpty(streamInfo.getFieldList())) {
+                fieldInfos = streamInfo.getFieldList().stream().map(inlongStreamFieldInfo -> {
+                    FormatInfo formatInfo = SortFieldFormatUtils.convertFieldFormat(
+                            inlongStreamFieldInfo.getFieldType().toLowerCase());
+                    return new FieldInfo(inlongStreamFieldInfo.getFieldName(), formatInfo);
+                }).collect(Collectors.toList());
+            }
         }
+
         DeserializationInfo deserializationInfo = SerializationUtils.createDeserializationInfo(sourceResponse,
                 streamInfo);
         if (Constant.MIDDLEWARE_PULSAR.equals(middleWareType)) {
@@ -191,6 +191,7 @@ public class CreateSortConfigListener implements SortOperateListener {
             throw new RuntimeException(
                     String.format("MiddleWare:{} not support in CreateSortConfigListener", middleWareType));
         }
+
     }
 
     private PulsarSourceInfo createPulsarSourceInfo(InlongGroupInfo groupInfo,
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
index 2ea0f4c..0ea1599 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
@@ -27,7 +27,6 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.SinkType;
-import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.pojo.sink.SinkFieldResponse;
 import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
 import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse;
@@ -89,7 +88,7 @@ public class SinkInfoUtils {
     private static KafkaSinkInfo createKafkaSinkInfo(InlongStreamInfo inlongStreamInfo, SourceResponse sourceResponse,
             KafkaSinkResponse kafkaSinkResponse) {
         List<FieldInfo> fieldInfoList = Lists.newArrayList();
-        if (SourceType.DATABASE_MIGRATION.getType().equalsIgnoreCase(sourceResponse.getSourceType())) {
+        if (SourceInfoUtils.isBinlogMigrationSource(sourceResponse)) {
             fieldInfoList.add(new BuiltInFieldInfo("DATABASE_MIGRATION", StringFormatInfo.INSTANCE,
                     BuiltInField.MYSQL_METADATA_DATA));
         } else {
@@ -159,7 +158,7 @@ public class SinkInfoUtils {
 
         // Get the sink field, if there is no partition field in the source field, add the partition field to the end
         List<FieldInfo> fieldInfoList = Lists.newArrayList();
-        if (SourceType.DATABASE_MIGRATION.getType().equalsIgnoreCase(sourceResponse.getSourceType())) {
+        if (SourceInfoUtils.isBinlogMigrationSource(sourceResponse)) {
             fieldInfoList.add(new BuiltInFieldInfo("DATABASE_MIGRATION", StringFormatInfo.INSTANCE,
                     BuiltInField.MYSQL_METADATA_DATA));
         } else {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
index 023d025..2027f20 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
@@ -20,8 +20,11 @@ package org.apache.inlong.manager.service.thirdparty.sort.util;
 import java.util.List;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
+import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
 import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
@@ -29,6 +32,14 @@ import org.apache.inlong.sort.protocol.source.PulsarSourceInfo;
 
 public class SourceInfoUtils {
 
+    public static boolean isBinlogMigrationSource(SourceResponse sourceResponse) {
+        if (SourceType.BINLOG.getType().equalsIgnoreCase(sourceResponse.getSourceType())) {
+            BinlogSourceResponse binlogSourceResponse = (BinlogSourceResponse) sourceResponse;
+            return binlogSourceResponse.isAllMigration();
+        }
+        return false;
+    }
+
     public static PulsarSourceInfo createPulsarSourceInfo(InlongGroupInfo groupInfo, String pulsarTopic,
                                                           DeserializationInfo deserializationInfo,
                                                           List<FieldInfo> fieldInfos, String appName,String tenant,