You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/05 01:49:57 UTC
[inlong] branch master updated: [INLONG-7151][Manager] Fix failure to create node when init sort (#7152)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 15da36a0e [INLONG-7151][Manager] Fix failure to create node when init sort (#7152)
15da36a0e is described below
commit 15da36a0ef58f1401bd558ee80d7814b890739fa
Author: haifxu <xh...@gmail.com>
AuthorDate: Thu Jan 5 09:49:52 2023 +0800
[INLONG-7151][Manager] Fix failure to create node when init sort (#7152)
---
.../inlong/manager/service/sink/mysql/MySQLSinkOperator.java | 10 ++++++++++
.../manager/service/source/kafka/KafkaSourceOperator.java | 6 +++++-
2 files changed, 15 insertions(+), 1 deletion(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
index cafa79125..a81b9ff34 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
@@ -18,9 +18,11 @@
package org.apache.inlong.manager.service.sink.mysql;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -81,6 +83,14 @@ public class MySQLSinkOperator extends AbstractSinkOperator {
}
MySQLSinkDTO dto = MySQLSinkDTO.getFromJson(entity.getExtParams());
+ if (StringUtils.isBlank(dto.getJdbcUrl())) {
+ String dataNodeName = entity.getDataNodeName();
+ Preconditions.checkNotEmpty(dataNodeName, "mysql jdbc url not specified and data node is empty");
+ DataNodeInfo dataNodeInfo = dataNodeHelper.getDataNodeInfo(dataNodeName, entity.getSinkType());
+ CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
+ dto.setJdbcUrl(dataNodeInfo.getUrl());
+ dto.setPassword(dataNodeInfo.getToken());
+ }
CommonBeanUtils.copyProperties(entity, sink, true);
CommonBeanUtils.copyProperties(dto, sink, true);
List<SinkField> sinkFields = super.getSinkFields(entity.getId());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
index 6b6dff0cb..686b81c39 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.source.kafka;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.ClusterType;
@@ -118,7 +119,10 @@ public class KafkaSourceOperator extends AbstractSourceOperator {
if (!Objects.equals(streamId, sourceInfo.getInlongStreamId())) {
continue;
}
- kafkaSource.setSerializationType(sourceInfo.getSerializationType());
+ if (StringUtils.isEmpty(kafkaSource.getSerializationType()) && StringUtils.isNotEmpty(
+ sourceInfo.getSerializationType())) {
+ kafkaSource.setSerializationType(sourceInfo.getSerializationType());
+ }
}
kafkaSource.setWrapWithInlongMsg(streamInfo.getWrapWithInlongMsg());