You are viewing a plain text version of this content. The canonical link for it is available on the original mailing-list archive page.
Posted to commits@inlong.apache.org by al...@apache.org on 2022/03/09 09:33:12 UTC
[incubator-inlong] branch master updated: [INLONG-3012][Manager] Support built-in field for source and sink info (#3016)
This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new cb8500e [INLONG-3012][Manager] Support built-in field for source and sink info (#3016)
cb8500e is described below
commit cb8500e913fa65c3674822fda2f67b7287c04e1f
Author: healchow <he...@gmail.com>
AuthorDate: Wed Mar 9 17:33:06 2022 +0800
[INLONG-3012][Manager] Support built-in field for source and sink info (#3016)
---
.../inlong/manager/common/enums/Constant.java | 1 +
.../common/pojo/stream/InlongStreamInfo.java | 7 +-
.../thirdparty/sort/CreateSortConfigListener.java | 168 +++++++--------------
.../thirdparty/sort/PushSortConfigListener.java | 105 +++++++------
.../thirdparty/sort/ZkDisabledEventSelector.java | 5 +-
.../thirdparty/sort/ZkEnabledEventSelector.java | 5 +-
.../thirdparty/sort/util/SerializationUtils.java | 74 +++++----
.../thirdparty/sort/util/SinkInfoUtils.java | 68 ++++-----
.../thirdparty/sort/util/SourceInfoUtils.java | 163 +++++++++++++++++---
9 files changed, 331 insertions(+), 265 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
index 76e8f93..dc04265 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
@@ -23,6 +23,7 @@ package org.apache.inlong.manager.common.enums;
public class Constant {
public static final Integer UN_DELETED = 0;
+ public static final Integer IS_DELETED = 1;
public static final String SOURCE_FILE = "FILE";
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java
index 7f2724b..b82cb37 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java
@@ -20,11 +20,12 @@ package org.apache.inlong.manager.common.pojo.stream;
import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
-import java.util.Date;
-import java.util.List;
import lombok.Data;
import lombok.EqualsAndHashCode;
+import java.util.Date;
+import java.util.List;
+
/**
* Inlong stream info
*/
@@ -52,7 +53,7 @@ public class InlongStreamInfo extends InlongStreamBaseInfo {
@ApiModelProperty(value = "Data storage period, unit: day (required when dataSourceType=AUTO_PUSH)")
private Integer storagePeriod;
- @ApiModelProperty(value = "Data type, only support: TEXT")
+ @ApiModelProperty(value = "Data type, including: TEXT, KV, etc.")
private String dataType;
@ApiModelProperty(value = "Data encoding format: UTF-8, GBK (required when dataSourceType=FILE, AUTO_PUSH)")
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
index 0b439eb..969dd69 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
@@ -17,22 +17,17 @@
package org.apache.inlong.manager.service.thirdparty.sort;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
import org.apache.inlong.manager.common.beans.ClusterBean;
import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo;
import org.apache.inlong.manager.common.pojo.sink.SinkBriefResponse;
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
@@ -42,42 +37,43 @@ import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessF
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.UpdateGroupProcessForm;
import org.apache.inlong.manager.common.settings.InlongGroupSettings;
-import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.service.CommonOperateService;
import org.apache.inlong.manager.service.core.InlongStreamService;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.source.StreamSourceService;
-import org.apache.inlong.manager.service.thirdparty.sort.util.SerializationUtils;
import org.apache.inlong.manager.service.thirdparty.sort.util.SinkInfoUtils;
-import org.apache.inlong.manager.service.thirdparty.sort.util.SortFieldFormatUtils;
import org.apache.inlong.manager.service.thirdparty.sort.util.SourceInfoUtils;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEvent;
-import org.apache.inlong.sort.formats.common.FormatInfo;
-import org.apache.inlong.sort.formats.common.StringFormatInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
import org.apache.inlong.sort.protocol.DataFlowInfo;
-import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
import org.apache.inlong.sort.protocol.sink.SinkInfo;
import org.apache.inlong.sort.protocol.source.SourceInfo;
-import org.apache.inlong.sort.protocol.source.TubeSourceInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-@Slf4j
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Create sort config when disable the ZooKeeper
+ */
@Component
public class CreateSortConfigListener implements SortOperateListener {
+ private static final Logger LOGGER = LoggerFactory.getLogger(CreateSortConfigListener.class);
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // thread safe
+
@Autowired
private CommonOperateService commonOperateService;
@Autowired
private ClusterBean clusterBean;
@Autowired
- private InlongStreamService inlongStreamService;
+ private InlongStreamService streamService;
@Autowired
private StreamSinkService streamSinkService;
@Autowired
@@ -90,33 +86,28 @@ public class CreateSortConfigListener implements SortOperateListener {
@Override
public ListenerResult listen(WorkflowContext context) throws Exception {
- log.info("Create sort config for context={}", context);
+ LOGGER.info("Create sort config for context={}", context);
ProcessForm form = context.getProcessForm();
- InlongGroupInfo groupInfo = getGroupInfo(form);
+ InlongGroupInfo groupInfo = this.getGroupInfo(form);
String groupId = groupInfo.getInlongGroupId();
if (StringUtils.isEmpty(groupId)) {
- log.warn("GroupId is null for context={}", context);
+ LOGGER.warn("GroupId is null for context={}", context);
return ListenerResult.success();
}
- List<StreamBriefResponse> streamBriefResponses = inlongStreamService.getBriefList(groupId);
+
+ List<StreamBriefResponse> streamBriefResponses = streamService.getBriefList(groupId);
if (CollectionUtils.isEmpty(streamBriefResponses)) {
- log.warn("Stream not found by groupId={}", groupId);
+ LOGGER.warn("Stream not found by groupId={}", groupId);
return ListenerResult.success();
}
- Map<String, DataFlowInfo> dataFlowInfoMap = streamBriefResponses.stream().map(streamBriefResponse -> {
- DataFlowInfo flowInfo = createDataFlow(streamBriefResponse, groupInfo);
- if (flowInfo != null) {
- return Pair.of(streamBriefResponse.getInlongStreamId(), flowInfo);
- } else {
- return null;
- }
- }
- ).filter(pair -> pair != null)
- .collect(Collectors.toMap(pair -> pair.getKey(),
- pair -> pair.getValue()));
- final ObjectMapper objectMapper = new ObjectMapper();
- String dataFlows = objectMapper.writeValueAsString(dataFlowInfoMap);
+ Map<String, DataFlowInfo> dataFlowInfoMap = streamBriefResponses.stream().map(streamResponse -> {
+ DataFlowInfo flowInfo = createDataFlow(streamResponse, groupInfo);
+ return Pair.of(streamResponse.getInlongStreamId(), flowInfo);
+ }
+ ).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+ String dataFlows = OBJECT_MAPPER.writeValueAsString(dataFlowInfoMap);
InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
extInfo.setInlongGroupId(groupId);
extInfo.setKeyName(InlongGroupSettings.DATA_FLOW);
@@ -125,101 +116,46 @@ public class CreateSortConfigListener implements SortOperateListener {
groupInfo.setExtList(Lists.newArrayList());
}
upsertDataFlow(groupInfo, extInfo);
+
return ListenerResult.success();
}
private void upsertDataFlow(InlongGroupInfo groupInfo, InlongGroupExtInfo extInfo) {
- Iterator<InlongGroupExtInfo> inlongGroupExtInfoIterator = groupInfo.getExtList().iterator();
- while (inlongGroupExtInfoIterator.hasNext()) {
- InlongGroupExtInfo inlongGroupExtInfo = inlongGroupExtInfoIterator.next();
- if (InlongGroupSettings.DATA_FLOW.equals(inlongGroupExtInfo.getKeyName())) {
- inlongGroupExtInfoIterator.remove();
- }
- }
+ groupInfo.getExtList().removeIf(ext -> InlongGroupSettings.DATA_FLOW.equals(ext.getKeyName()));
groupInfo.getExtList().add(extInfo);
}
- private DataFlowInfo createDataFlow(StreamBriefResponse streamBriefResponse,
- InlongGroupInfo inlongGroupInfo) {
- //TODO only support one source and one sink
- final String groupId = streamBriefResponse.getInlongGroupId();
- final String streamId = streamBriefResponse.getInlongStreamId();
- final InlongStreamInfo streamInfo = inlongStreamService.get(groupId, streamId);
+ private DataFlowInfo createDataFlow(StreamBriefResponse streamResponse, InlongGroupInfo groupInfo) {
+ // TODO only support one source and one sink
+ final String groupId = streamResponse.getInlongGroupId();
+ final String streamId = streamResponse.getInlongStreamId();
+ final InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
List<SourceResponse> sourceResponses = streamSourceService.listSource(groupId, streamId);
if (CollectionUtils.isEmpty(sourceResponses)) {
- throw new RuntimeException(String.format("No source found by stream=%s", streamBriefResponse));
+ throw new WorkflowListenerException(String.format("Source not found by groupId=%s and streamId=%s",
+ groupId, streamId));
}
final SourceResponse sourceResponse = sourceResponses.get(0);
- List<SinkBriefResponse> sinkBriefResponses = streamBriefResponse.getSinkList();
+ List<SinkBriefResponse> sinkBriefResponses = streamResponse.getSinkList();
if (CollectionUtils.isEmpty(sinkBriefResponses)) {
- throw new RuntimeException(String.format("No sink found by stream=%s", streamBriefResponse));
+ throw new WorkflowListenerException(String.format("Sink not found by groupId=%s and streamId=%s",
+ groupId, streamId));
}
+
final SinkBriefResponse sinkBriefResponse = sinkBriefResponses.get(0);
String sinkType = sinkBriefResponse.getSinkType();
int sinkId = sinkBriefResponse.getId();
final SinkResponse sinkResponse = streamSinkService.get(sinkId, sinkType);
- SourceInfo sourceInfo = createSourceInfo(inlongGroupInfo, streamInfo, sourceResponse);
- SinkInfo sinkInfo = SinkInfoUtils.createSinkInfo(streamInfo, sourceResponse, sinkResponse);
- return new DataFlowInfo(sinkId, sourceInfo, sinkInfo);
- }
- private SourceInfo createSourceInfo(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
- SourceResponse sourceResponse) {
- String middleWareType = groupInfo.getMiddlewareType();
-
- List<FieldInfo> fieldInfos = Lists.newArrayList();
- if (SourceInfoUtils.isBinlogMigrationSource(sourceResponse)) {
- fieldInfos.add(new BuiltInFieldInfo("DATABASE_MIGRATION", StringFormatInfo.INSTANCE,
- BuiltInField.MYSQL_METADATA_DATA));
-
- } else {
- if (CollectionUtils.isNotEmpty(streamInfo.getFieldList())) {
- fieldInfos = streamInfo.getFieldList().stream().map(inlongStreamFieldInfo -> {
- FormatInfo formatInfo = SortFieldFormatUtils.convertFieldFormat(
- inlongStreamFieldInfo.getFieldType().toLowerCase());
- return new FieldInfo(inlongStreamFieldInfo.getFieldName(), formatInfo);
- }).collect(Collectors.toList());
- }
- }
-
- DeserializationInfo deserializationInfo = SerializationUtils.createDeserializationInfo(sourceResponse,
- streamInfo);
- if (Constant.MIDDLEWARE_PULSAR.equals(middleWareType)) {
- return createPulsarSourceInfo(groupInfo, streamInfo, deserializationInfo, fieldInfos);
- } else if (Constant.MIDDLEWARE_TUBE.equals(middleWareType)) {
- return createTubeSourceInfo(groupInfo, deserializationInfo, fieldInfos);
- } else {
- throw new RuntimeException(
- String.format("MiddleWare:{} not support in CreateSortConfigListener", middleWareType));
- }
-
- }
-
- private SourceInfo createPulsarSourceInfo(InlongGroupInfo groupInfo,
- InlongStreamInfo streamInfo,
- DeserializationInfo deserializationInfo,
- List<FieldInfo> fieldInfos) {
- String topicName = streamInfo.getMqResourceObj();
- PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo();
- InlongGroupPulsarInfo pulsarInfo = (InlongGroupPulsarInfo) groupInfo.getMqExtInfo();
- String tenant = clusterBean.getDefaultTenant();
- if (StringUtils.isNotEmpty(pulsarInfo.getTenant())) {
- tenant = pulsarInfo.getTenant();
- }
- return SourceInfoUtils.createPulsarSourceInfo(groupInfo, topicName, deserializationInfo,
- fieldInfos, clusterBean.getAppName(), pulsarClusterInfo, tenant);
- }
-
- private TubeSourceInfo createTubeSourceInfo(InlongGroupInfo groupInfo,
- DeserializationInfo deserializationInfo,
- List<FieldInfo> fieldInfos) {
+ // Get source info
String masterAddress = commonOperateService.getSpecifiedParam(Constant.TUBE_MASTER_URL);
- Preconditions.checkNotNull(masterAddress, "tube cluster address cannot be empty");
- String topic = groupInfo.getMqResourceObj();
- // The consumer group name is: taskName_topicName_consumer_group
- String consumerGroup = clusterBean.getAppName() + "_" + topic + "_consumer_group";
- return new TubeSourceInfo(topic, masterAddress, consumerGroup,
- deserializationInfo, fieldInfos.toArray(new FieldInfo[0]));
+ PulsarClusterInfo pulsarCluster = commonOperateService.getPulsarClusterInfo();
+ SourceInfo sourceInfo = SourceInfoUtils.createSourceInfo(pulsarCluster, masterAddress, clusterBean, groupInfo,
+ streamInfo, sourceResponse, sinkResponse);
+ // Get sink info
+ SinkInfo sinkInfo = SinkInfoUtils.createSinkInfo(sourceResponse, sinkResponse);
+
+ return new DataFlowInfo(sinkId, sourceInfo, sinkInfo);
}
private InlongGroupInfo getGroupInfo(ProcessForm processForm) {
@@ -230,9 +166,9 @@ public class CreateSortConfigListener implements SortOperateListener {
UpdateGroupProcessForm updateGroupProcessForm = (UpdateGroupProcessForm) processForm;
return updateGroupProcessForm.getGroupInfo();
} else {
- log.error("Illegal ProcessForm {} to get inlong group info", processForm.getFormName());
- throw new RuntimeException(String.format("Unsupport ProcessForm {} in CreateSortConfigListener",
- processForm.getFormName()));
+ LOGGER.error("Illegal ProcessForm {} to get inlong group info", processForm.getFormName());
+ throw new WorkflowListenerException(
+ String.format("Unsupported ProcessForm {%s}", processForm.getFormName()));
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java
index deaea4a..5dc1c34 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java
@@ -17,16 +17,16 @@
package org.apache.inlong.manager.service.thirdparty.sort;
-import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
import org.apache.inlong.manager.common.beans.ClusterBean;
import org.apache.inlong.manager.common.enums.Constant;
-import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo;
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.common.util.JsonUtils;
@@ -38,32 +38,33 @@ import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
import org.apache.inlong.manager.service.CommonOperateService;
import org.apache.inlong.manager.service.core.InlongStreamService;
import org.apache.inlong.manager.service.sink.StreamSinkService;
+import org.apache.inlong.manager.service.source.StreamSourceService;
import org.apache.inlong.manager.service.thirdparty.sort.util.SinkInfoUtils;
-import org.apache.inlong.manager.service.thirdparty.sort.util.SortFieldFormatUtils;
import org.apache.inlong.manager.service.thirdparty.sort.util.SourceInfoUtils;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEvent;
import org.apache.inlong.sort.ZkTools;
-import org.apache.inlong.sort.formats.common.FormatInfo;
import org.apache.inlong.sort.protocol.DataFlowInfo;
-import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
import org.apache.inlong.sort.protocol.deserialization.InLongMsgCsvDeserializationInfo;
import org.apache.inlong.sort.protocol.sink.SinkInfo;
import org.apache.inlong.sort.protocol.source.SourceInfo;
-import org.apache.inlong.sort.protocol.source.TubeSourceInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import java.util.ArrayList;
import java.util.List;
-@Slf4j
+/**
+ * Push sort config when enable the ZooKeeper
+ */
@Component
public class PushSortConfigListener implements SortOperateListener {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PushSortConfigListener.class);
private static final String DATA_FLOW_GROUP_ID_KEY = "inlong.group.id";
@Autowired
@@ -75,6 +76,8 @@ public class PushSortConfigListener implements SortOperateListener {
@Autowired
private InlongStreamService streamService;
@Autowired
+ private StreamSourceService streamSourceService;
+ @Autowired
private StreamSinkService streamSinkService;
@Autowired
private StreamSinkFieldEntityMapper streamSinkFieldMapper;
@@ -86,36 +89,36 @@ public class PushSortConfigListener implements SortOperateListener {
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
- if (log.isDebugEnabled()) {
- log.debug("begin push hive config to sort, context={}", context);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("begin to push sort config by context={}", context);
}
GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
InlongGroupInfo groupInfo = form.getGroupInfo();
String groupId = groupInfo.getInlongGroupId();
-
InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
- if (groupEntity == null || EntityStatus.IS_DELETED.getCode().equals(groupEntity.getIsDeleted())) {
- log.warn("skip to push sort hive config for groupId={}, as biz not exists or has been deleted", groupId);
+ if (groupEntity == null || Constant.IS_DELETED.equals(groupEntity.getIsDeleted())) {
+ LOGGER.warn("skip to push sort config for groupId={}, as biz not exists or has been deleted", groupId);
return ListenerResult.success();
}
// if streamId not null, just push the config belongs to the groupId and the streamId
String streamId = form.getInlongStreamId();
List<SinkResponse> sinkResponses = streamSinkService.listSink(groupId, streamId);
- for (SinkResponse sinkResponse : sinkResponses) {
- Integer sinkId = sinkResponse.getId();
- if (log.isDebugEnabled()) {
- log.debug("sink info: {}", sinkResponse);
+ for (SinkResponse sinkResponse : sinkResponses) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("sink info: {}", sinkResponse);
}
DataFlowInfo dataFlowInfo = getDataFlowInfo(groupInfo, sinkResponse);
// add extra properties for flow info
dataFlowInfo.getProperties().put(DATA_FLOW_GROUP_ID_KEY, groupId);
- if (log.isDebugEnabled()) {
- log.debug("try to push hive config to sort: {}", JsonUtils.toJson(dataFlowInfo));
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("try to push config to sort: {}", JsonUtils.toJson(dataFlowInfo));
}
+
+ Integer sinkId = sinkResponse.getId();
try {
String zkUrl = clusterBean.getZkUrl();
String zkRoot = clusterBean.getZkRoot();
@@ -125,8 +128,8 @@ public class PushSortConfigListener implements SortOperateListener {
// add sink id to zk
ZkTools.addDataFlowToCluster(sortClusterName, sinkId, zkUrl, zkRoot);
} catch (Exception e) {
- log.error("add or update inlong stream information to zk failed, sinkId={} ", sinkId, e);
- throw new WorkflowListenerException("push hive config to sort failed, reason: " + e.getMessage());
+ LOGGER.error("push sort config to zookeeper failed, sinkId={} ", sinkId, e);
+ throw new WorkflowListenerException("push sort config to zookeeper failed: " + e.getMessage());
}
}
@@ -139,25 +142,37 @@ public class PushSortConfigListener implements SortOperateListener {
List<StreamSinkFieldEntity> fieldList = streamSinkFieldMapper.selectFields(groupId, streamId);
if (fieldList == null || fieldList.size() == 0) {
- throw new WorkflowListenerException("no hive fields for groupId=" + groupId + ", streamId=" + streamId);
+ throw new WorkflowListenerException(
+ String.format("no fields for groupId=%s streamId=%s", groupId, streamId));
}
- SourceInfo sourceInfo = getSourceInfo(groupInfo, sinkResponse, fieldList);
- SinkInfo sinkInfo = SinkInfoUtils.createSinkInfo(sinkResponse);
+ List<SourceResponse> sourceList = streamSourceService.listSource(groupId, streamId);
+ if (CollectionUtils.isEmpty(sourceList)) {
+ throw new WorkflowListenerException(String.format("Source not found by groupId=%s and streamId=%s",
+ groupId, streamId));
+ }
- // push information
+ String masterAddress = commonOperateService.getSpecifiedParam(Constant.TUBE_MASTER_URL);
+ PulsarClusterInfo pulsarCluster = commonOperateService.getPulsarClusterInfo();
+ InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
+ final SourceResponse sourceResponse = sourceList.get(0);
+ SourceInfo sourceInfo = SourceInfoUtils.createSourceInfo(pulsarCluster, masterAddress, clusterBean, groupInfo,
+ streamInfo, sourceResponse, sinkResponse);
+
+ SinkInfo sinkInfo = SinkInfoUtils.createSinkInfo(sourceResponse, sinkResponse);
return new DataFlowInfo(sinkResponse.getId(), sourceInfo, sinkInfo);
}
/**
- * Get source info
+ * Get source info for DataFlowInfo
*/
- private SourceInfo getSourceInfo(InlongGroupInfo groupInfo,
- SinkResponse hiveResponse, List<StreamSinkFieldEntity> fieldList) {
+ @Deprecated
+ private SourceInfo getSourceInfo(InlongGroupInfo groupInfo, String streamId,
+ List<StreamSinkFieldEntity> fieldList) {
DeserializationInfo deserializationInfo = null;
String groupId = groupInfo.getInlongGroupId();
- String streamId = hiveResponse.getInlongStreamId();
InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
+
boolean isDbType = Constant.DATA_SOURCE_DB.equals(streamInfo.getDataType());
if (!isDbType) {
// FILE and auto push source, the data format is TEXT or KEY-VALUE, temporarily use InLongMsgCsv
@@ -172,15 +187,14 @@ public class PushSortConfigListener implements SortOperateListener {
escape = info.getDataEscapeChar().charAt(0);
}*/
// Whether to delete the first separator, the default is false for the time being
- deserializationInfo = new InLongMsgCsvDeserializationInfo(hiveResponse.getInlongStreamId(), separator);
+ deserializationInfo = new InLongMsgCsvDeserializationInfo(streamId, separator);
}
}
// The number and order of the source fields must be the same as the target fields
SourceInfo sourceInfo = null;
// Get the source field, if there is no partition field in source, add the partition field to the end
- List<FieldInfo> sourceFields = getSourceFields(fieldList);
-
+ // List<FieldInfo> sourceFields = getSourceFields(fieldList);
String middleWare = groupInfo.getMiddlewareType();
if (Constant.MIDDLEWARE_TUBE.equalsIgnoreCase(middleWare)) {
String masterAddress = commonOperateService.getSpecifiedParam(Constant.TUBE_MASTER_URL);
@@ -188,8 +202,8 @@ public class PushSortConfigListener implements SortOperateListener {
String topic = groupInfo.getMqResourceObj();
// The consumer group name is: taskName_topicName_consumer_group
String consumerGroup = clusterBean.getAppName() + "_" + topic + "_consumer_group";
- sourceInfo = new TubeSourceInfo(topic, masterAddress, consumerGroup,
- deserializationInfo, sourceFields.toArray(new FieldInfo[0]));
+ // sourceInfo = new TubeSourceInfo(topic, masterAddress, consumerGroup,
+ // deserializationInfo, sourceFields.toArray(new FieldInfo[0]));
} else if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middleWare)) {
PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo();
String tenant = clusterBean.getDefaultTenant();
@@ -197,30 +211,13 @@ public class PushSortConfigListener implements SortOperateListener {
if (StringUtils.isNotEmpty(pulsarInfo.getTenant())) {
tenant = pulsarInfo.getTenant();
}
- sourceInfo = SourceInfoUtils.createPulsarSourceInfo(groupInfo, streamInfo.getMqResourceObj(),
- deserializationInfo, sourceFields, clusterBean.getAppName(),
- pulsarClusterInfo, tenant);
+ // sourceInfo = SourceInfoUtils.createPulsarSourceInfo(groupInfo, streamInfo.getMqResourceObj(),
+ // deserializationInfo, sourceFields, clusterBean.getAppName(),
+ // pulsarClusterInfo, tenant);
}
return sourceInfo;
}
- /**
- * Get source field list
- * TODO support BuiltInField
- */
- private List<FieldInfo> getSourceFields(List<StreamSinkFieldEntity> fieldList) {
- List<FieldInfo> fieldInfoList = new ArrayList<>();
- for (StreamSinkFieldEntity field : fieldList) {
- FormatInfo formatInfo = SortFieldFormatUtils.convertFieldFormat(field.getSourceFieldType().toLowerCase());
- String fieldName = field.getSourceFieldName();
-
- FieldInfo fieldInfo = new FieldInfo(fieldName, formatInfo);
- fieldInfoList.add(fieldInfo);
- }
-
- return fieldInfoList;
- }
-
@Override
public boolean async() {
return false;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkDisabledEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkDisabledEventSelector.java
index ddfca00..d6c529e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkDisabledEventSelector.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkDisabledEventSelector.java
@@ -17,7 +17,6 @@
package org.apache.inlong.manager.service.thirdparty.sort;
-import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
@@ -27,8 +26,10 @@ import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.EventSelector;
import org.springframework.stereotype.Component;
+/**
+ * Event selector for whether ZooKeeper is disabled.
+ */
@Component
-@Slf4j
public class ZkDisabledEventSelector implements EventSelector {
@Override
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkEnabledEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkEnabledEventSelector.java
index ba80aec..a8ee6e3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkEnabledEventSelector.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkEnabledEventSelector.java
@@ -17,7 +17,6 @@
package org.apache.inlong.manager.service.thirdparty.sort;
-import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
@@ -26,8 +25,10 @@ import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.EventSelector;
import org.springframework.stereotype.Component;
+/**
+ * Event selector for whether ZooKeeper is enabled.
+ */
@Component
-@Slf4j
public class ZkEnabledEventSelector implements EventSelector {
@Override
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
index f2c77d2..55d76f3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
@@ -38,62 +38,76 @@ import org.apache.inlong.sort.protocol.serialization.JsonSerializationInfo;
import org.apache.inlong.sort.protocol.serialization.SerializationInfo;
import org.springframework.util.Assert;
+/**
+ * Utils for creating Sort serialization and deserialization info
+ */
public class SerializationUtils {
+ /**
+ * Create deserialization info according to the source type
+ */
public static DeserializationInfo createDeserializationInfo(SourceResponse sourceResponse,
InlongStreamInfo streamInfo) {
SourceType sourceType = SourceType.forType(sourceResponse.getSourceType());
switch (sourceType) {
case BINLOG:
- return forBinlog((BinlogSourceResponse) sourceResponse, streamInfo);
+ return deserializeForBinlog((BinlogSourceResponse) sourceResponse);
case KAFKA:
- return forKafka((KafkaSourceResponse) sourceResponse, streamInfo);
+ return deserializeForKafka((KafkaSourceResponse) sourceResponse, streamInfo);
case FILE:
- return forFile(sourceResponse, streamInfo);
+ return deserializeForFile(sourceResponse, streamInfo);
default:
- throw new IllegalArgumentException(String.format("Unsupport sourceType for Inlong:%s", sourceType));
+ throw new IllegalArgumentException(String.format("Unsupported sourceType: %s", sourceType));
}
}
- public static SerializationInfo createSerializationInfo(SourceResponse sourceResponse, SinkResponse sinkResponse,
- InlongStreamInfo inlongStreamInfo) {
+ /**
+ * Create serialization info according to the sink type, returns null for Hive sink
+ */
+ public static SerializationInfo createSerializationInfo(SourceResponse sourceResponse, SinkResponse sinkResponse) {
SinkType sinkType = SinkType.forType(sinkResponse.getSinkType());
switch (sinkType) {
case HIVE:
return null;
case KAFKA:
- return forKafka(sourceResponse, (KafkaSinkResponse) sinkResponse, inlongStreamInfo);
+ return serializeForKafka(sourceResponse, (KafkaSinkResponse) sinkResponse);
default:
- throw new IllegalArgumentException(String.format("Unsupport sinkType for Inlong:%s", sinkType));
+ throw new IllegalArgumentException(String.format("Unsupported sinkType: %s", sinkType));
}
}
- public static DeserializationInfo forBinlog(BinlogSourceResponse binlogSourceResponse,
- InlongStreamInfo streamInfo) {
- return new DebeziumDeserializationInfo(true, binlogSourceResponse.getTimestampFormatStandard());
+ /**
+ * Get deserialization info for Binlog source
+ */
+ private static DeserializationInfo deserializeForBinlog(BinlogSourceResponse sourceResponse) {
+ return new DebeziumDeserializationInfo(true, sourceResponse.getTimestampFormatStandard());
}
- public static DeserializationInfo forKafka(KafkaSourceResponse kafkaSourceResponse,
- InlongStreamInfo streamInfo) {
- String serializationType = kafkaSourceResponse.getSerializationType();
+ /**
+ * Get deserialization info for Kafka source
+ */
+ private static DeserializationInfo deserializeForKafka(KafkaSourceResponse source, InlongStreamInfo stream) {
+ String serializationType = source.getSerializationType();
DataTypeEnum dataType = DataTypeEnum.forName(serializationType);
switch (dataType) {
case CSV:
- char seperator = streamInfo.getDataSeparator().toCharArray()[0];
- return new CsvDeserializationInfo(seperator);
+ char separator = stream.getDataSeparator().toCharArray()[0];
+ return new CsvDeserializationInfo(separator);
case AVRO:
return new AvroDeserializationInfo();
case JSON:
return new JsonDeserializationInfo();
default:
throw new IllegalArgumentException(
- String.format("Unsupport serializationType for Kafka source:%s", serializationType));
+ String.format("Unsupported serializationType for Kafka source: %s", serializationType));
}
}
- public static SerializationInfo forKafka(SourceResponse sourceResponse, KafkaSinkResponse kafkaSinkResponse,
- InlongStreamInfo streamInfo) {
- String serializationType = kafkaSinkResponse.getSerializationType();
+ /**
+ * Get serialization info for Kafka sink
+ */
+ private static SerializationInfo serializeForKafka(SourceResponse sourceResponse, KafkaSinkResponse sinkResponse) {
+ String serializationType = sinkResponse.getSerializationType();
DataTypeEnum dataType = DataTypeEnum.forName(serializationType);
switch (dataType) {
case AVRO:
@@ -104,31 +118,33 @@ public class SerializationUtils {
return new CanalSerializationInfo();
case DEBEZIUM_JSON:
Assert.isInstanceOf(BinlogSourceResponse.class, sourceResponse,
- "Unsupport serializationType for Kafka;");
- BinlogSourceResponse binlogSourceResponse = (BinlogSourceResponse) sourceResponse;
- return new DebeziumSerializationInfo(binlogSourceResponse.getTimestampFormatStandard(),
+ "DEBEZIUM_JSON serialization for Kafka sink requires a binlog source");
+ BinlogSourceResponse binlogSource = (BinlogSourceResponse) sourceResponse;
+ return new DebeziumSerializationInfo(binlogSource.getTimestampFormatStandard(),
"FAIL", "", false);
default:
throw new IllegalArgumentException(
- String.format("Unsupport serializationType for Kafka sink:%s", serializationType));
+ String.format("Unsupported serializationType for Kafka sink: %s", serializationType));
}
}
- public static DeserializationInfo forFile(SourceResponse sourceResponse,
- InlongStreamInfo streamInfo) {
+ /**
+ * Get deserialization info for File source, CSV uses the first char of the stream data separator
+ */
+ private static DeserializationInfo deserializeForFile(SourceResponse sourceResponse, InlongStreamInfo streamInfo) {
String serializationType = sourceResponse.getSerializationType();
DataTypeEnum dataType = DataTypeEnum.forName(serializationType);
switch (dataType) {
case CSV:
- char seperator = streamInfo.getDataSeparator().toCharArray()[0];
- return new CsvDeserializationInfo(seperator);
+ char separator = streamInfo.getDataSeparator().toCharArray()[0];
+ return new CsvDeserializationInfo(separator);
case AVRO:
return new AvroDeserializationInfo();
case JSON:
return new JsonDeserializationInfo();
default:
throw new IllegalArgumentException(
- String.format("Unsupport type for File source:%s", serializationType));
+ String.format("Unsupported serializationType for File source: %s", serializationType));
}
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
index 0583ab1..1edb0a8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
@@ -28,12 +28,8 @@ import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseSinkResponse;
import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse;
import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.sort.formats.common.FormatInfo;
-import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.serialization.SerializationInfo;
import org.apache.inlong.sort.protocol.sink.ClickHouseSinkInfo;
@@ -67,45 +63,39 @@ public class SinkInfoUtils {
PARTITION_TIME_UNIT_MAP.put("I", TimeUnit.MINUTES);
}
- public static SinkInfo createSinkInfo(SinkResponse sinkResponse) {
- return createSinkInfo(null, null, sinkResponse);
- }
-
+ /**
+ * Create sink info for DataFlowInfo according to the sink type (Hive, Kafka or ClickHouse).
+ */
public static SinkInfo createSinkInfo(SourceResponse sourceResponse, SinkResponse sinkResponse) {
- return createSinkInfo(null, sourceResponse, sinkResponse);
- }
-
- public static SinkInfo createSinkInfo(InlongStreamInfo inlongStreamInfo, SourceResponse sourceResponse,
- SinkResponse sinkResponse) {
+ boolean isAllMigration = SourceInfoUtils.isBinlogAllMigration(sourceResponse);
String sinkType = sinkResponse.getSinkType();
SinkInfo sinkInfo;
if (SinkType.forType(sinkType) == SinkType.HIVE) {
- sinkInfo = createHiveSinkInfo(sourceResponse, (HiveSinkResponse) sinkResponse);
+ sinkInfo = createHiveSinkInfo(isAllMigration, (HiveSinkResponse) sinkResponse);
} else if (SinkType.forType(sinkType) == SinkType.KAFKA) {
- sinkInfo = createKafkaSinkInfo(inlongStreamInfo, sourceResponse, (KafkaSinkResponse) sinkResponse);
+ sinkInfo = createKafkaSinkInfo(isAllMigration, sourceResponse, (KafkaSinkResponse) sinkResponse);
} else if (SinkType.forType(sinkType) == SinkType.CLICKHOUSE) {
sinkInfo = createClickhouseSinkInfo((ClickHouseSinkResponse) sinkResponse);
} else {
- throw new RuntimeException(
- String.format("SinkType:{} not support in CreateSortConfigListener", sinkType));
+ throw new RuntimeException(String.format("Unsupported SinkType {%s}", sinkType));
}
return sinkInfo;
}
private static ClickHouseSinkInfo createClickhouseSinkInfo(ClickHouseSinkResponse sinkResponse) {
if (StringUtils.isEmpty(sinkResponse.getJdbcUrl())) {
- throw new RuntimeException(String.format("clickHouseSink={} server url cannot be empty", sinkResponse));
+ throw new RuntimeException(String.format("ClickHouse={%s} server url cannot be empty", sinkResponse));
} else if (CollectionUtils.isEmpty(sinkResponse.getFieldList())) {
- throw new RuntimeException(String.format("clickHouseSink={} fields cannot be empty", sinkResponse));
+ throw new RuntimeException(String.format("ClickHouse={%s} fields cannot be empty", sinkResponse));
} else if (StringUtils.isEmpty(sinkResponse.getTableName())) {
- throw new RuntimeException(String.format("clickHouseSink={} table name cannot be empty", sinkResponse));
+ throw new RuntimeException(String.format("ClickHouse={%s} table name cannot be empty", sinkResponse));
} else if (StringUtils.isEmpty(sinkResponse.getDatabaseName())) {
- throw new RuntimeException(String.format("clickHouseSink={} database name cannot be empty", sinkResponse));
+ throw new RuntimeException(String.format("ClickHouse={%s} database name cannot be empty", sinkResponse));
}
if (sinkResponse.getDistributedTable() == null) {
- throw new RuntimeException(
- String.format("clickHouseSink={} distribute is or not cannot be empty", sinkResponse));
+ throw new RuntimeException(String.format("ClickHouse={%s} distribute cannot be empty", sinkResponse));
}
+
ClickHouseSinkInfo.PartitionStrategy partitionStrategy;
if (PartitionStrategy.BALANCE.name().equalsIgnoreCase(sinkResponse.getPartitionStrategy())) {
partitionStrategy = PartitionStrategy.BALANCE;
@@ -116,6 +106,7 @@ public class SinkInfoUtils {
} else {
partitionStrategy = PartitionStrategy.RANDOM;
}
+
List<FieldInfo> fieldInfoList = getClickHouseSinkFields(sinkResponse.getFieldList());
return new ClickHouseSinkInfo(sinkResponse.getJdbcUrl(), sinkResponse.getDatabaseName(),
sinkResponse.getTableName(), sinkResponse.getUsername(), sinkResponse.getPassword(),
@@ -125,28 +116,30 @@ public class SinkInfoUtils {
sinkResponse.getWriteMaxRetryTimes());
}
- private static KafkaSinkInfo createKafkaSinkInfo(InlongStreamInfo inlongStreamInfo, SourceResponse sourceResponse,
- KafkaSinkResponse kafkaSinkResponse) {
+ private static KafkaSinkInfo createKafkaSinkInfo(boolean isAllMigration, SourceResponse sourceResponse,
+ KafkaSinkResponse sinkResponse) {
List<FieldInfo> fieldInfoList = Lists.newArrayList();
- if (SourceInfoUtils.isBinlogMigrationSource(sourceResponse)) {
- fieldInfoList.add(new BuiltInFieldInfo("DATABASE_MIGRATION", StringFormatInfo.INSTANCE,
- BuiltInField.MYSQL_METADATA_DATA));
+ if (isAllMigration) {
+ fieldInfoList.add(SourceInfoUtils.getAllMigrationBuiltInField());
} else {
- fieldInfoList = getSinkFields(kafkaSinkResponse.getFieldList(), null);
+ fieldInfoList = getSinkFields(sinkResponse.getFieldList(), null);
}
- String addressUrl = kafkaSinkResponse.getAddress();
- String topicName = kafkaSinkResponse.getTopicName();
+ String addressUrl = sinkResponse.getAddress();
+ String topicName = sinkResponse.getTopicName();
SerializationInfo serializationInfo = SerializationUtils.createSerializationInfo(sourceResponse,
- kafkaSinkResponse, inlongStreamInfo);
+ sinkResponse);
return new KafkaSinkInfo(fieldInfoList.toArray(new FieldInfo[0]), addressUrl, topicName, serializationInfo);
}
- private static HiveSinkInfo createHiveSinkInfo(SourceResponse sourceResponse, HiveSinkResponse hiveInfo) {
+ /**
+ * Create Hive sink info, using the single all-migration built-in field when isAllMigration is true.
+ */
+ private static HiveSinkInfo createHiveSinkInfo(boolean isAllMigration, HiveSinkResponse hiveInfo) {
if (hiveInfo.getJdbcUrl() == null) {
- throw new RuntimeException(String.format("hiveSink={} server url cannot be empty", hiveInfo));
+ throw new RuntimeException(String.format("HiveSink={%s} server url cannot be empty", hiveInfo));
}
if (CollectionUtils.isEmpty(hiveInfo.getFieldList())) {
- throw new RuntimeException(String.format("hiveSink={} fields cannot be empty", hiveInfo));
+ throw new RuntimeException(String.format("HiveSink={%s} fields cannot be empty", hiveInfo));
}
// Use the field separator in Hive, the default is TextFile
Character separator = (char) Integer.parseInt(hiveInfo.getDataSeparator());
@@ -198,9 +191,8 @@ public class SinkInfoUtils {
// Get the sink field, if there is no partition field in the source field, add the partition field to the end
List<FieldInfo> fieldInfoList = Lists.newArrayList();
- if (SourceInfoUtils.isBinlogMigrationSource(sourceResponse)) {
- fieldInfoList.add(new BuiltInFieldInfo("DATABASE_MIGRATION", StringFormatInfo.INSTANCE,
- BuiltInField.MYSQL_METADATA_DATA));
+ if (isAllMigration) {
+ fieldInfoList.add(SourceInfoUtils.getAllMigrationBuiltInField());
} else {
fieldInfoList = getSinkFields(hiveInfo.getFieldList(), hiveInfo.getPrimaryPartition());
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
index 59c0386..f850f94 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
@@ -17,25 +17,62 @@
package org.apache.inlong.manager.service.thirdparty.sort.util;
-import java.util.Locale;
+import com.google.common.collect.Lists;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
+import org.apache.inlong.manager.common.beans.ClusterBean;
import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo;
+import org.apache.inlong.manager.common.pojo.sink.SinkFieldResponse;
+import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
+import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
import org.apache.inlong.sort.protocol.source.PulsarSourceInfo;
-
-import java.util.List;
import org.apache.inlong.sort.protocol.source.SourceInfo;
import org.apache.inlong.sort.protocol.source.TDMQPulsarSourceInfo;
+import org.apache.inlong.sort.protocol.source.TubeSourceInfo;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+/**
+ * Utils for creating Sort source info
+ */
public class SourceInfoUtils {
- public static boolean isBinlogMigrationSource(SourceResponse sourceResponse) {
+ /**
+ * Built-in field map: key is the source field name, value is the matching built-in field (mutable, do not modify)
+ */
+ public static final Map<String, BuiltInField> BUILT_IN_FIELD_MAP = new HashMap<>();
+
+ static {
+ BUILT_IN_FIELD_MAP.put("data_time", BuiltInField.DATA_TIME);
+ BUILT_IN_FIELD_MAP.put("database", BuiltInField.MYSQL_METADATA_DATABASE);
+ BUILT_IN_FIELD_MAP.put("table", BuiltInField.MYSQL_METADATA_TABLE);
+ BUILT_IN_FIELD_MAP.put("event_time", BuiltInField.MYSQL_METADATA_EVENT_TIME);
+ BUILT_IN_FIELD_MAP.put("is_ddl", BuiltInField.MYSQL_METADATA_IS_DDL);
+ BUILT_IN_FIELD_MAP.put("event_type", BuiltInField.MYSQL_METADATA_EVENT_TYPE);
+ }
+
+ /**
+ * Whether the source is a binlog source whose all-migration flag is enabled.
+ */
+ public static boolean isBinlogAllMigration(SourceResponse sourceResponse) {
if (SourceType.BINLOG.getType().equalsIgnoreCase(sourceResponse.getSourceType())) {
BinlogSourceResponse binlogSourceResponse = (BinlogSourceResponse) sourceResponse;
return binlogSourceResponse.isAllMigration();
@@ -43,27 +80,111 @@ public class SourceInfoUtils {
return false;
}
- public static SourceInfo createPulsarSourceInfo(InlongGroupInfo groupInfo,
- String pulsarTopic,
- DeserializationInfo deserializationInfo,
- List<FieldInfo> fieldInfos,
- String appName,
- PulsarClusterInfo pulsarClusterInfo,
- String tenant) {
+ /**
+ * Get the all-migration built-in field ("data", MYSQL_METADATA_DATA) for binlog source.
+ */
+ public static BuiltInFieldInfo getAllMigrationBuiltInField() {
+ return new BuiltInFieldInfo("data", StringFormatInfo.INSTANCE,
+ BuiltInField.MYSQL_METADATA_DATA);
+ }
+
+ /**
+ * Create source info for DataFlowInfo according to the middleware type of the group (Pulsar or TubeMQ).
+ */
+ public static SourceInfo createSourceInfo(PulsarClusterInfo pulsarCluster, String masterAddress,
+ ClusterBean clusterBean, InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
+ SourceResponse sourceResponse, SinkResponse sinkResponse) {
+
+ List<FieldInfo> fieldInfos = Lists.newArrayList();
+ boolean isAllMigration = isBinlogAllMigration(sourceResponse);
+ if (isAllMigration) {
+ fieldInfos.add(getAllMigrationBuiltInField());
+ } else {
+ if (CollectionUtils.isNotEmpty(sinkResponse.getFieldList())) {
+ fieldInfos = getSourceFields(sinkResponse.getFieldList());
+ }
+ }
+
+ String middleWareType = groupInfo.getMiddlewareType();
+ DeserializationInfo deserializationInfo = SerializationUtils.createDeserializationInfo(sourceResponse,
+ streamInfo);
+ SourceInfo sourceInfo;
+ if (Constant.MIDDLEWARE_PULSAR.equals(middleWareType)) {
+ sourceInfo = createPulsarSourceInfo(pulsarCluster, clusterBean, groupInfo, streamInfo, deserializationInfo,
+ fieldInfos);
+ } else if (Constant.MIDDLEWARE_TUBE.equals(middleWareType)) {
+ // TubeMQ takes its topic from the group MQ resource, so the stream info is not passed here
+ sourceInfo = createTubeSourceInfo(groupInfo, masterAddress, clusterBean, deserializationInfo, fieldInfos);
+ } else {
+ throw new WorkflowListenerException(String.format("Unsupported middleware {%s}", middleWareType));
+ }
+
+ return sourceInfo;
+ }
+
+ /**
+ * Create source info for Pulsar
+ */
+ private static SourceInfo createPulsarSourceInfo(PulsarClusterInfo pulsarCluster, ClusterBean clusterBean,
+ InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
+ DeserializationInfo deserializationInfo, List<FieldInfo> fieldInfos) {
+ String topicName = streamInfo.getMqResourceObj();
+ InlongGroupPulsarInfo pulsarInfo = (InlongGroupPulsarInfo) groupInfo.getMqExtInfo();
+ String tenant = clusterBean.getDefaultTenant();
+ if (StringUtils.isNotEmpty(pulsarInfo.getTenant())) {
+ tenant = pulsarInfo.getTenant();
+ }
+
final String namespace = groupInfo.getMqResourceObj();
- // Full name of Topic in Pulsar
- final String fullTopicName = "persistent://" + tenant + "/" + namespace + "/" + pulsarTopic;
- final String consumerGroup = appName + "_" + pulsarTopic + "_consumer_group";
- String type = pulsarClusterInfo.getType();
+ // Full name of topic in Pulsar
+ final String fullTopicName = "persistent://" + tenant + "/" + namespace + "/" + topicName;
+ final String consumerGroup = clusterBean.getAppName() + "_" + topicName + "_consumer_group";
+ FieldInfo[] fieldInfosArr = fieldInfos.toArray(new FieldInfo[0]);
+ String type = pulsarCluster.getType();
if (StringUtils.isNotEmpty(type) && type.toUpperCase(Locale.ROOT).contains(Constant.MIDDLEWARE_TDMQ)) {
- return new TDMQPulsarSourceInfo(pulsarClusterInfo.getBrokerServiceUrl(),
- fullTopicName, consumerGroup, pulsarClusterInfo.getToken(), deserializationInfo,
- fieldInfos.toArray(new FieldInfo[0]));
+ return new TDMQPulsarSourceInfo(pulsarCluster.getBrokerServiceUrl(),
+ fullTopicName, consumerGroup, pulsarCluster.getToken(), deserializationInfo, fieldInfosArr);
} else {
- return new PulsarSourceInfo(pulsarClusterInfo.getAdminUrl(), pulsarClusterInfo.getBrokerServiceUrl(),
- fullTopicName, consumerGroup, deserializationInfo, fieldInfos.toArray(new FieldInfo[0]),
- pulsarClusterInfo.getToken());
+ return new PulsarSourceInfo(pulsarCluster.getAdminUrl(), pulsarCluster.getBrokerServiceUrl(),
+ fullTopicName, consumerGroup, deserializationInfo, fieldInfosArr, pulsarCluster.getToken());
}
+ }
+ /**
+ * Create source info for TubeMQ
+ */
+ private static TubeSourceInfo createTubeSourceInfo(InlongGroupInfo groupInfo, String masterAddress,
+ ClusterBean clusterBean, DeserializationInfo deserializationInfo, List<FieldInfo> fieldInfos) {
+ Preconditions.checkNotNull(masterAddress, "tube cluster address cannot be empty");
+ String topic = groupInfo.getMqResourceObj();
+ String consumerGroup = clusterBean.getAppName() + "_" + topic + "_consumer_group";
+ return new TubeSourceInfo(topic, masterAddress, consumerGroup, deserializationInfo,
+ fieldInfos.toArray(new FieldInfo[0]));
}
+
+ /**
+ * Build the source field list from the source name and type of each sink field.
+ *
+ * TODO 1. Support partition field, 2. Add is_metadata field in StreamSinkFieldEntity
+ */
+ private static List<FieldInfo> getSourceFields(List<SinkFieldResponse> fieldList) {
+ List<FieldInfo> fieldInfoList = new ArrayList<>();
+ for (SinkFieldResponse field : fieldList) {
+ FormatInfo formatInfo = SortFieldFormatUtils.convertFieldFormat(field.getSourceFieldType().toLowerCase(Locale.ROOT));
+ String fieldName = field.getSourceFieldName();
+
+ FieldInfo fieldInfo;
+ // If the field name matches a built-in field, create a built-in field info instead of a plain one
+ BuiltInField builtInField = BUILT_IN_FIELD_MAP.get(fieldName);
+ if (builtInField == null) {
+ fieldInfo = new FieldInfo(fieldName, formatInfo);
+ } else {
+ fieldInfo = new BuiltInFieldInfo(fieldName, formatInfo, builtInField);
+ }
+ fieldInfoList.add(fieldInfo);
+ }
+
+ return fieldInfoList;
+ }
+
}