You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/05 08:02:07 UTC

[inlong] 02/07: [INLONG-7151][Manager] Fix failure to create node when init sort (#7152)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 871027aa47d7ccde99a326266da81b73d310bef0
Author: haifxu <xh...@gmail.com>
AuthorDate: Thu Jan 5 09:49:52 2023 +0800

    [INLONG-7151][Manager] Fix failure to create node when init sort (#7152)
---
 .../inlong/manager/service/sink/mysql/MySQLSinkOperator.java   | 10 ++++++++++
 .../manager/service/source/kafka/KafkaSourceOperator.java      |  6 +++++-
 2 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
index cafa79125..a81b9ff34 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
@@ -18,9 +18,11 @@
 package org.apache.inlong.manager.service.sink.mysql;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -81,6 +83,14 @@ public class MySQLSinkOperator extends AbstractSinkOperator {
         }
 
         MySQLSinkDTO dto = MySQLSinkDTO.getFromJson(entity.getExtParams());
+        if (StringUtils.isBlank(dto.getJdbcUrl())) {
+            String dataNodeName = entity.getDataNodeName();
+            Preconditions.checkNotEmpty(dataNodeName, "mysql jdbc url not specified and data node is empty");
+            DataNodeInfo dataNodeInfo = dataNodeHelper.getDataNodeInfo(dataNodeName, entity.getSinkType());
+            CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
+            dto.setJdbcUrl(dataNodeInfo.getUrl());
+            dto.setPassword(dataNodeInfo.getToken());
+        }
         CommonBeanUtils.copyProperties(entity, sink, true);
         CommonBeanUtils.copyProperties(dto, sink, true);
         List<SinkField> sinkFields = super.getSinkFields(entity.getId());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
index 6b6dff0cb..686b81c39 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.source.kafka;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.ClusterType;
@@ -118,7 +119,10 @@ public class KafkaSourceOperator extends AbstractSourceOperator {
                 if (!Objects.equals(streamId, sourceInfo.getInlongStreamId())) {
                     continue;
                 }
-                kafkaSource.setSerializationType(sourceInfo.getSerializationType());
+                if (StringUtils.isEmpty(kafkaSource.getSerializationType()) && StringUtils.isNotEmpty(
+                        sourceInfo.getSerializationType())) {
+                    kafkaSource.setSerializationType(sourceInfo.getSerializationType());
+                }
             }
 
             kafkaSource.setWrapWithInlongMsg(streamInfo.getWrapWithInlongMsg());