You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/04 02:00:13 UTC

[inlong] 01/02: [INLONG-7130][Manager] Fix null JDBC URL when mysql stream sink init (#7132)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit aa6c0809927ad60e56f875158fa58ca61cb1b595
Author: haifxu <xh...@gmail.com>
AuthorDate: Tue Jan 3 18:58:25 2023 +0800

    [INLONG-7130][Manager] Fix null JDBC URL when mysql stream sink init (#7132)
---
 .../resource/sink/mysql/MySQLResourceOperator.java | 24 +++++++++++++++++++++-
 1 file changed, 23 insertions(+), 1 deletion(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java
index 6c859cf2a..fb834e3fa 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java
@@ -18,16 +18,21 @@
 package org.apache.inlong.manager.service.resource.sink.mysql;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.enums.SinkStatus;
 import org.apache.inlong.manager.common.exceptions.WorkflowException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
 import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.sink.SinkInfo;
 import org.apache.inlong.manager.pojo.sink.mysql.MySQLColumnInfo;
 import org.apache.inlong.manager.pojo.sink.mysql.MySQLSinkDTO;
 import org.apache.inlong.manager.pojo.sink.mysql.MySQLTableInfo;
+import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
 import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.slf4j.Logger;
@@ -53,6 +58,9 @@ public class MySQLResourceOperator implements SinkResourceOperator {
     @Autowired
     private StreamSinkFieldEntityMapper fieldEntityMapper;
 
+    @Autowired
+    private DataNodeOperateHelper dataNodeHelper;
+
     @Override
     public Boolean accept(String sinkType) {
         return SinkType.MYSQL.equals(sinkType);
@@ -90,7 +98,7 @@ public class MySQLResourceOperator implements SinkResourceOperator {
             columnList.add(columnInfo);
         }
 
-        MySQLSinkDTO sinkDTO = MySQLSinkDTO.getFromJson(sinkInfo.getExtParams());
+        MySQLSinkDTO sinkDTO = this.getMysqlInfo(sinkInfo);
         MySQLTableInfo tableInfo = MySQLSinkDTO.getTableInfo(sinkDTO, columnList);
         try (Connection conn = MySQLJdbcUtils.getConnection(sinkDTO.getJdbcUrl(), sinkDTO.getUsername(),
                 sinkDTO.getPassword())) {
@@ -114,4 +122,18 @@ public class MySQLResourceOperator implements SinkResourceOperator {
         LOG.info("success create MySQL table for data sink [" + sinkInfo.getId() + "]");
     }
 
+    private MySQLSinkDTO getMysqlInfo(SinkInfo sinkInfo) {
+        MySQLSinkDTO mysqlInfo = MySQLSinkDTO.getFromJson(sinkInfo.getExtParams());
+        // No JDBC URL in the sink's ext params: take the connection settings from its data node instead.
+        if (StringUtils.isBlank(mysqlInfo.getJdbcUrl())) {
+            String dataNodeName = sinkInfo.getDataNodeName();
+            Preconditions.checkNotEmpty(dataNodeName, "mysql jdbc url not specified and data node is empty");
+            DataNodeInfo dataNodeInfo = dataNodeHelper.getDataNodeInfo(dataNodeName, sinkInfo.getSinkType());
+            CommonBeanUtils.copyProperties(dataNodeInfo, mysqlInfo); // NOTE(review): confirm which fields this copies
+            mysqlInfo.setJdbcUrl(dataNodeInfo.getUrl()); // the data node's url is used as the JDBC URL
+            mysqlInfo.setPassword(dataNodeInfo.getToken()); // the data node's token is used as the password
+        }
+        return mysqlInfo;
+    }
+
 }
\ No newline at end of file