You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/04 02:00:13 UTC
[inlong] 01/02: [INLONG-7130][Manager] Fix null JDBC URL when mysql stream sink init (#7132)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit aa6c0809927ad60e56f875158fa58ca61cb1b595
Author: haifxu <xh...@gmail.com>
AuthorDate: Tue Jan 3 18:58:25 2023 +0800
[INLONG-7130][Manager] Fix null JDBC URL when mysql stream sink init (#7132)
---
.../resource/sink/mysql/MySQLResourceOperator.java | 24 +++++++++++++++++++++-
1 file changed, 23 insertions(+), 1 deletion(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java
index 6c859cf2a..fb834e3fa 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java
@@ -18,16 +18,21 @@
package org.apache.inlong.manager.service.resource.sink.mysql;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.exceptions.WorkflowException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.sink.SinkInfo;
import org.apache.inlong.manager.pojo.sink.mysql.MySQLColumnInfo;
import org.apache.inlong.manager.pojo.sink.mysql.MySQLSinkDTO;
import org.apache.inlong.manager.pojo.sink.mysql.MySQLTableInfo;
+import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.slf4j.Logger;
@@ -53,6 +58,9 @@ public class MySQLResourceOperator implements SinkResourceOperator {
@Autowired
private StreamSinkFieldEntityMapper fieldEntityMapper;
+ @Autowired
+ private DataNodeOperateHelper dataNodeHelper;
+
@Override
public Boolean accept(String sinkType) {
return SinkType.MYSQL.equals(sinkType);
@@ -90,7 +98,7 @@ public class MySQLResourceOperator implements SinkResourceOperator {
columnList.add(columnInfo);
}
- MySQLSinkDTO sinkDTO = MySQLSinkDTO.getFromJson(sinkInfo.getExtParams());
+ MySQLSinkDTO sinkDTO = this.getMysqlInfo(sinkInfo);
MySQLTableInfo tableInfo = MySQLSinkDTO.getTableInfo(sinkDTO, columnList);
try (Connection conn = MySQLJdbcUtils.getConnection(sinkDTO.getJdbcUrl(), sinkDTO.getUsername(),
sinkDTO.getPassword())) {
@@ -114,4 +122,18 @@ public class MySQLResourceOperator implements SinkResourceOperator {
LOG.info("success create MySQL table for data sink [" + sinkInfo.getId() + "]");
}
+ private MySQLSinkDTO getMysqlInfo(SinkInfo sinkInfo) { // builds the MySQL sink DTO used to open the JDBC connection
+ MySQLSinkDTO mysqlInfo = MySQLSinkDTO.getFromJson(sinkInfo.getExtParams());
+ // When the sink's ext params carry no JDBC URL, fall back to the data node named on the sink.
+ if (StringUtils.isBlank(mysqlInfo.getJdbcUrl())) {
+ String dataNodeName = sinkInfo.getDataNodeName();
+ Preconditions.checkNotEmpty(dataNodeName, "mysql jdbc url not specified and data node is empty");
+ DataNodeInfo dataNodeInfo = dataNodeHelper.getDataNodeInfo(dataNodeName, sinkInfo.getSinkType()); // looked up by node name + sink type
+ CommonBeanUtils.copyProperties(dataNodeInfo, mysqlInfo); // NOTE(review): presumably copies same-named properties only - confirm
+ mysqlInfo.setJdbcUrl(dataNodeInfo.getUrl()); // set explicitly: the node's "url" feeds the DTO's "jdbcUrl"
+ mysqlInfo.setPassword(dataNodeInfo.getToken()); // set explicitly: the node's "token" feeds the DTO's "password"
+ }
+ return mysqlInfo;
+ }
+
}
\ No newline at end of file