You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/02/27 02:34:12 UTC
[incubator-inlong] branch master updated: [INLONG-2734][Manager] Support multi serialization type for Sort in Manager (#2750)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4d3dc50 [INLONG-2734][Manager] Support multi serialization type for Sort in Manager (#2750)
4d3dc50 is described below
commit 4d3dc505d6c246a3c8a3b4951cb58fbe683eb563
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Sun Feb 27 10:34:07 2022 +0800
[INLONG-2734][Manager] Support multi serialization type for Sort in Manager (#2750)
---
.../inlong/manager/common/enums/SinkType.java | 2 +-
.../inlong/manager/common/enums/SourceType.java | 4 +-
.../manager/common/pojo/source/SourceResponse.java | 2 +
.../pojo/source/binlog/BinlogSourceRequest.java | 2 +
.../pojo/source/binlog/BinlogSourceResponse.java | 3 +
.../common/pojo/source/kafka/KafkaSourceDTO.java | 4 +
.../service/sink/StreamSinkServiceImpl.java | 8 +-
.../service/source/StreamSourceServiceImpl.java | 8 +-
.../thirdparty/sort/CreateSortConfigListener.java | 40 +++---
.../thirdparty/sort/PushSortConfigListener.java | 3 +
.../thirdparty/sort/utils/SerializationUtils.java | 138 +++++++++++++++++++++
.../thirdparty/sort/{ => utils}/SinkInfoUtils.java | 30 ++++-
.../sort/{ => utils}/SortFieldFormatUtils.java | 2 +-
.../sort/{ => utils}/SourceInfoUtils.java | 2 +-
14 files changed, 219 insertions(+), 29 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
index f0335e7..19a547b 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
@@ -26,7 +26,7 @@ public enum SinkType {
/**
* Get the SinkType enum via the given sinkType string
*/
- public static SinkType getType(String sinkType) {
+ public static SinkType forType(String sinkType) {
for (SinkType type : values()) {
if (type.name().equals(sinkType)) {
return type;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
index f224b49..1b4f0b4 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
@@ -35,7 +35,7 @@ public enum SourceType {
/**
* Get the SourceType enum via the given sourceType string
*/
- public static SourceType getType(String sourceType) {
+ public static SourceType forType(String sourceType) {
for (SourceType type : values()) {
if (type.name().equals(sourceType)) {
return type;
@@ -44,7 +44,7 @@ public enum SourceType {
throw new IllegalArgumentException(String.format("Illegal sink type for %s", sourceType));
}
- public String getType() {
+ public String forType() {
return this.type;
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java
index 4d90669..d596c1e 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java
@@ -81,4 +81,6 @@ public class SourceResponse {
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date modifyTime;
+ @ApiModelProperty("Data Serialization, support: Json, Canal, Avro, etc")
+ private String serializationType;
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
index 37efa92..0d352a4 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
@@ -77,5 +77,7 @@ public class BinlogSourceRequest extends SourceRequest {
@ApiModelProperty("Snapshot mode, supports: initial, when_needed, never, schema_only, schema_only_recovery")
private String snapshotMode;
+ @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
+ private String timestampFormatStandard = "SQL";
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
index ae01fb3..a6e2fa5 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
@@ -63,4 +63,7 @@ public class BinlogSourceResponse extends SourceResponse {
@ApiModelProperty("Snapshot mode, supports: initial, when_needed, never, schema_only, schema_only_recovery")
private String snapshotMode;
+ @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
+ private String timestampFormatStandard = "SQL";
+
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
index f793223..dffdda0 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
@@ -61,6 +61,9 @@ public class KafkaSourceDTO {
notes = "For example, '0#100_1#10' means the offset of partition 0 is 100, the offset of partition 1 is 10")
private String topicPartitionOffset;
+ @ApiModelProperty("Data Serialization, support: Json, Canal, Avro, etc")
+ private String serializationType;
+
/**
* Get the dto instance from the request
*/
@@ -72,6 +75,7 @@ public class KafkaSourceDTO {
.recordSpeedLimit(request.getRecordSpeedLimit())
.byteSpeedLimit(request.getByteSpeedLimit())
.topicPartitionOffset(request.getTopicPartitionOffset())
+ .serializationType(request.getSerializationType())
.build();
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 17fbc61..5f8f2bf 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -107,7 +107,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
Preconditions.checkEmpty(sinkExist, ErrorCodeEnum.SINK_ALREADY_EXISTS.getMessage());
// According to the sink type, save sink information
- StreamSinkOperation operation = operationFactory.getInstance(SinkType.getType(sinkType));
+ StreamSinkOperation operation = operationFactory.getInstance(SinkType.forType(sinkType));
int id = operation.saveOpt(request, operator);
// If the inlong group status is [Configuration Successful], then asynchronously initiate
@@ -123,7 +123,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
@Override
public SinkResponse get(Integer id, String sinkType) {
LOGGER.debug("begin to get sink by id={}, sinkType={}", id, sinkType);
- StreamSinkOperation operation = operationFactory.getInstance(SinkType.getType(sinkType));
+ StreamSinkOperation operation = operationFactory.getInstance(SinkType.forType(sinkType));
SinkResponse sinkResponse = operation.getById(sinkType, id);
LOGGER.debug("success to get sink info");
return sinkResponse;
@@ -178,7 +178,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
Page<StreamSinkEntity> entityPage = (Page<StreamSinkEntity>) sinkMapper.selectByCondition(request);
// Encapsulate the paging query results into the PageInfo object to obtain related paging information
- StreamSinkOperation operation = operationFactory.getInstance(SinkType.getType(sinkType));
+ StreamSinkOperation operation = operationFactory.getInstance(SinkType.forType(sinkType));
PageInfo<? extends SinkListResponse> pageInfo = operation.getPageInfo(entityPage);
pageInfo.setTotal(entityPage.getTotal());
@@ -200,7 +200,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
String streamId = request.getInlongStreamId();
String sinkType = request.getSinkType();
- StreamSinkOperation operation = operationFactory.getInstance(SinkType.getType(sinkType));
+ StreamSinkOperation operation = operationFactory.getInstance(SinkType.forType(sinkType));
operation.updateOpt(request, operator);
// The inlong group status is [Configuration successful], then asynchronously initiate
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index cea9fd8..18e837e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -82,7 +82,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
Preconditions.checkEmpty(sourceExist, ErrorCodeEnum.SOURCE_ALREADY_EXISTS.getMessage());
// According to the source type, save source information
- StreamSourceOperation operation = operationFactory.getInstance(SourceType.getType(sourceType));
+ StreamSourceOperation operation = operationFactory.getInstance(SourceType.forType(sourceType));
int id = operation.saveOpt(request, operator);
LOGGER.info("success to save source info");
@@ -92,7 +92,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
@Override
public SourceResponse get(Integer id, String sourceType) {
LOGGER.debug("begin to get source by id={}, sourceType={}", id, sourceType);
- StreamSourceOperation operation = operationFactory.getInstance(SourceType.getType(sourceType));
+ StreamSourceOperation operation = operationFactory.getInstance(SourceType.forType(sourceType));
SourceResponse sourceResponse = operation.getById(id);
LOGGER.debug("success to get source info");
return sourceResponse;
@@ -134,7 +134,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
Page<StreamSourceEntity> entityPage = (Page<StreamSourceEntity>) sourceMapper.selectByCondition(request);
// Encapsulate the paging query results into the PageInfo object to obtain related paging information
- StreamSourceOperation operation = operationFactory.getInstance(SourceType.getType(sourceType));
+ StreamSourceOperation operation = operationFactory.getInstance(SourceType.forType(sourceType));
PageInfo<? extends SourceListResponse> pageInfo = operation.getPageInfo(entityPage);
pageInfo.setTotal(entityPage.getTotal());
@@ -154,7 +154,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
commonOperateService.checkGroupStatus(groupId, operator);
String sourceType = request.getSourceType();
- StreamSourceOperation operation = operationFactory.getInstance(SourceType.getType(sourceType));
+ StreamSourceOperation operation = operationFactory.getInstance(SourceType.forType(sourceType));
operation.updateOpt(request, operator);
LOGGER.info("success to update source info");
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
index cbea893..c555924 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
@@ -29,6 +29,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.common.pojo.sink.SinkBriefResponse;
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.stream.StreamBriefResponse;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
@@ -39,6 +40,11 @@ import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.service.CommonOperateService;
import org.apache.inlong.manager.service.core.InlongStreamService;
import org.apache.inlong.manager.service.sink.StreamSinkService;
+import org.apache.inlong.manager.service.source.StreamSourceService;
+import org.apache.inlong.manager.service.thirdparty.sort.utils.SerializationUtils;
+import org.apache.inlong.manager.service.thirdparty.sort.utils.SinkInfoUtils;
+import org.apache.inlong.manager.service.thirdparty.sort.utils.SortFieldFormatUtils;
+import org.apache.inlong.manager.service.thirdparty.sort.utils.SourceInfoUtils;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
@@ -47,7 +53,6 @@ import org.apache.inlong.sort.formats.common.FormatInfo;
import org.apache.inlong.sort.protocol.DataFlowInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
-import org.apache.inlong.sort.protocol.deserialization.InLongMsgCsvDeserializationInfo;
import org.apache.inlong.sort.protocol.sink.SinkInfo;
import org.apache.inlong.sort.protocol.source.PulsarSourceInfo;
import org.apache.inlong.sort.protocol.source.SourceInfo;
@@ -71,6 +76,8 @@ public class CreateSortConfigListener implements SortOperateListener {
private InlongStreamService inlongStreamService;
@Autowired
private StreamSinkService streamSinkService;
+ @Autowired
+ private StreamSourceService streamSourceService;
@Override
public TaskEvent event() {
@@ -119,29 +126,32 @@ public class CreateSortConfigListener implements SortOperateListener {
private DataFlowInfo createDataFlow(StreamBriefResponse streamBriefResponse,
InlongGroupRequest inlongGroupRequest) {
+ //TODO only support one source and one sink
+ final String groupId = streamBriefResponse.getInlongGroupId();
+ final String streamId = streamBriefResponse.getInlongStreamId();
+ final InlongStreamInfo streamInfo = inlongStreamService.get(groupId, streamId);
+ List<SourceResponse> sourceResponses = streamSourceService.listSource(groupId, streamId);
+ if (CollectionUtils.isEmpty(sourceResponses)) {
+ throw new RuntimeException(String.format("No source found by stream=%s", streamBriefResponse));
+ }
+ final SourceResponse sourceResponse = sourceResponses.get(0);
List<SinkBriefResponse> sinkBriefResponses = streamBriefResponse.getSinkList();
if (CollectionUtils.isEmpty(sinkBriefResponses)) {
throw new RuntimeException(String.format("No sink found by stream=%s", streamBriefResponse));
}
- SinkBriefResponse sinkBriefResponse = sinkBriefResponses.get(0);
+ final SinkBriefResponse sinkBriefResponse = sinkBriefResponses.get(0);
String sinkType = sinkBriefResponse.getSinkType();
int sinkId = sinkBriefResponse.getId();
- SinkResponse sinkResponse = streamSinkService.get(sinkId, sinkType);
- SourceInfo sourceInfo = createSourceInfo(inlongGroupRequest, sinkResponse);
- SinkInfo sinkInfo = SinkInfoUtils.createSinkInfo(sinkResponse);
+ final SinkResponse sinkResponse = streamSinkService.get(sinkId, sinkType);
+ SourceInfo sourceInfo = createSourceInfo(inlongGroupRequest, streamInfo, sourceResponse);
+ SinkInfo sinkInfo = SinkInfoUtils.createSinkInfo(streamInfo, sourceResponse, sinkResponse);
return new DataFlowInfo(sinkId, sourceInfo, sinkInfo);
}
- private SourceInfo createSourceInfo(InlongGroupRequest groupRequest, SinkResponse sinkResponse) {
+ private SourceInfo createSourceInfo(InlongGroupRequest groupRequest, InlongStreamInfo streamInfo,
+ SourceResponse sourceResponse) {
String middleWareType = groupRequest.getMiddlewareType();
- String groupId = sinkResponse.getInlongGroupId();
- String streamId = sinkResponse.getInlongStreamId();
- InlongStreamInfo streamInfo = inlongStreamService.get(groupId, streamId);
- DeserializationInfo deserializationInfo = null;
- if (StringUtils.isNotEmpty(streamInfo.getDataSeparator())) {
- char separator = (char) Integer.parseInt(streamInfo.getDataSeparator());
- deserializationInfo = new InLongMsgCsvDeserializationInfo(streamId, separator);
- }
+
List<FieldInfo> fieldInfos = Lists.newArrayList();
if (CollectionUtils.isNotEmpty(streamInfo.getFieldList())) {
fieldInfos = streamInfo.getFieldList().stream().map(inlongStreamFieldInfo -> {
@@ -150,6 +160,8 @@ public class CreateSortConfigListener implements SortOperateListener {
return new FieldInfo(inlongStreamFieldInfo.getFieldName(), formatInfo);
}).collect(Collectors.toList());
}
+ DeserializationInfo deserializationInfo = SerializationUtils.createDeserializationInfo(sourceResponse,
+ streamInfo);
if (Constant.MIDDLEWARE_PULSAR.equals(middleWareType)) {
return createPulsarSourceInfo(groupRequest, streamInfo, deserializationInfo, fieldInfos);
} else if (Constant.MIDDLEWARE_TUBE.equals(middleWareType)) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java
index 8b2890b..bd49771 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java
@@ -35,6 +35,9 @@ import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
import org.apache.inlong.manager.service.CommonOperateService;
import org.apache.inlong.manager.service.core.InlongStreamService;
import org.apache.inlong.manager.service.sink.StreamSinkService;
+import org.apache.inlong.manager.service.thirdparty.sort.utils.SinkInfoUtils;
+import org.apache.inlong.manager.service.thirdparty.sort.utils.SortFieldFormatUtils;
+import org.apache.inlong.manager.service.thirdparty.sort.utils.SourceInfoUtils;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/utils/SerializationUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/utils/SerializationUtils.java
new file mode 100644
index 0000000..6c632f9
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/utils/SerializationUtils.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.thirdparty.sort.utils;
+
+import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
+import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
+import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
+import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.sort.protocol.deserialization.AvroDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.CsvDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.DebeziumDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.JsonDeserializationInfo;
+import org.apache.inlong.sort.protocol.serialization.AvroSerializationInfo;
+import org.apache.inlong.sort.protocol.serialization.CanalSerializationInfo;
+import org.apache.inlong.sort.protocol.serialization.DebeziumSerializationInfo;
+import org.apache.inlong.sort.protocol.serialization.JsonSerializationInfo;
+import org.apache.inlong.sort.protocol.serialization.SerializationInfo;
+import org.springframework.util.Assert;
+
+public class SerializationUtils {
+
+ public static DeserializationInfo createDeserializationInfo(SourceResponse sourceResponse,
+ InlongStreamInfo streamInfo) {
+ SourceType sourceType = SourceType.forType(sourceResponse.getSourceType());
+ switch (sourceType) {
+ case DB_BINLOG:
+ return forBinlog((BinlogSourceResponse) sourceResponse, streamInfo);
+ case KAFKA:
+ return forKafka((KafkaSourceResponse) sourceResponse, streamInfo);
+ case FILE:
+ return forFile(sourceResponse, streamInfo);
+ default:
+ throw new IllegalArgumentException(String.format("Unsupport sourceType for Inlong:%s", sourceType));
+ }
+ }
+
+ public static SerializationInfo createSerializationInfo(SourceResponse sourceResponse, SinkResponse sinkResponse,
+ InlongStreamInfo inlongStreamInfo) {
+ SinkType sinkType = SinkType.forType(sinkResponse.getSinkType());
+ switch (sinkType) {
+ case HIVE:
+ return null;
+ case KAFKA:
+ return forKafka(sourceResponse, (KafkaSinkResponse) sinkResponse, inlongStreamInfo);
+ default:
+ throw new IllegalArgumentException(String.format("Unsupport sinkType for Inlong:%s", sinkType));
+ }
+ }
+
+ public static DeserializationInfo forBinlog(BinlogSourceResponse binlogSourceResponse,
+ InlongStreamInfo streamInfo) {
+ return new DebeziumDeserializationInfo(true, binlogSourceResponse.getTimestampFormatStandard());
+ }
+
+ public static DeserializationInfo forKafka(KafkaSourceResponse kafkaSourceResponse,
+ InlongStreamInfo streamInfo) {
+ String serializationType = kafkaSourceResponse.getSerializationType();
+ DataTypeEnum dataType = DataTypeEnum.forName(serializationType);
+ switch (dataType) {
+ case CSV:
+ char seperator = streamInfo.getDataSeparator().toCharArray()[0];
+ return new CsvDeserializationInfo(seperator);
+ case AVRO:
+ return new AvroDeserializationInfo();
+ case JSON:
+ return new JsonDeserializationInfo();
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unsupport serializationType for Kafka source:%s", serializationType));
+ }
+ }
+
+ public static SerializationInfo forKafka(SourceResponse sourceResponse, KafkaSinkResponse kafkaSinkResponse,
+ InlongStreamInfo streamInfo) {
+ String serializationType = kafkaSinkResponse.getSerializationType();
+ DataTypeEnum dataType = DataTypeEnum.forName(serializationType);
+ switch (dataType) {
+ case AVRO:
+ return new AvroSerializationInfo();
+ case JSON:
+ return new JsonSerializationInfo();
+ case CANAL:
+ Assert.isInstanceOf(BinlogSourceResponse.class, sourceResponse,
+ "Unsupport serializationType for Kafka;");
+ BinlogSourceResponse binlogSourceResponse = (BinlogSourceResponse) sourceResponse;
+ return new CanalSerializationInfo(binlogSourceResponse.getTimestampFormatStandard(),
+ "FAIL", "", false);
+ case DEBEZIUM_JSON:
+ Assert.isInstanceOf(BinlogSourceResponse.class, sourceResponse,
+ "Unsupport serializationType for Kafka;");
+ binlogSourceResponse = (BinlogSourceResponse) sourceResponse;
+ return new DebeziumSerializationInfo(binlogSourceResponse.getTimestampFormatStandard(),
+ "FAIL", "", false);
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unsupport serializationType for Kafka sink:%s", serializationType));
+ }
+ }
+
+ public static DeserializationInfo forFile(SourceResponse sourceResponse,
+ InlongStreamInfo streamInfo) {
+ String serializationType = sourceResponse.getSerializationType();
+ DataTypeEnum dataType = DataTypeEnum.forName(serializationType);
+ switch (dataType) {
+ case CSV:
+ char seperator = streamInfo.getDataSeparator().toCharArray()[0];
+ return new CsvDeserializationInfo(seperator);
+ case AVRO:
+ return new AvroDeserializationInfo();
+ case JSON:
+ return new JsonDeserializationInfo();
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unsupport type for File source:%s", serializationType));
+ }
+ }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/SinkInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/utils/SinkInfoUtils.java
similarity index 81%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/SinkInfoUtils.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/utils/SinkInfoUtils.java
index fcd650a..3db4900 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/SinkInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/utils/SinkInfoUtils.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.thirdparty.sort;
+package org.apache.inlong.manager.service.thirdparty.sort.utils;
import java.util.ArrayList;
import java.util.HashMap;
@@ -29,13 +29,18 @@ import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.sink.SinkFieldResponse;
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse;
+import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.sort.formats.common.FormatInfo;
import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.serialization.SerializationInfo;
import org.apache.inlong.sort.protocol.sink.HiveSinkInfo;
import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HiveFileFormat;
import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HivePartitionInfo;
import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HiveTimePartitionInfo;
+import org.apache.inlong.sort.protocol.sink.KafkaSinkInfo;
import org.apache.inlong.sort.protocol.sink.SinkInfo;
public class SinkInfoUtils {
@@ -55,9 +60,20 @@ public class SinkInfoUtils {
}
public static SinkInfo createSinkInfo(SinkResponse sinkResponse) {
+ return createSinkInfo(null, null, sinkResponse);
+ }
+
+ public static SinkInfo createSinkInfo(SourceResponse sourceResponse, SinkResponse sinkResponse) {
+ return createSinkInfo(null, sourceResponse, sinkResponse);
+ }
+
+ public static SinkInfo createSinkInfo(InlongStreamInfo inlongStreamInfo, SourceResponse sourceResponse,
+ SinkResponse sinkResponse) {
String sinkType = sinkResponse.getSinkType();
- if (SinkType.getType(sinkType) == SinkType.HIVE) {
+ if (SinkType.forType(sinkType) == SinkType.HIVE) {
return createHiveSinkInfo((HiveSinkResponse) sinkResponse);
+ } else if (SinkType.forType(sinkType) == SinkType.KAFKA) {
+ return createKafkaSinkInfo(inlongStreamInfo, sourceResponse, (KafkaSinkResponse) sinkResponse);
} else {
//todo clickhouse and iceberg is wait to support
throw new RuntimeException(
@@ -65,6 +81,16 @@ public class SinkInfoUtils {
}
}
+ private static KafkaSinkInfo createKafkaSinkInfo(InlongStreamInfo inlongStreamInfo, SourceResponse sourceResponse,
+ KafkaSinkResponse kafkaSinkResponse) {
+ List<FieldInfo> fieldInfoList = getSinkFields(kafkaSinkResponse.getFieldList(), null);
+ String addressUrl = kafkaSinkResponse.getAddress();
+ String topicName = kafkaSinkResponse.getTopicName();
+ SerializationInfo serializationInfo = SerializationUtils.createSerializationInfo(sourceResponse,
+ kafkaSinkResponse, inlongStreamInfo);
+ return new KafkaSinkInfo(fieldInfoList.toArray(new FieldInfo[0]), addressUrl, topicName, serializationInfo);
+ }
+
private static HiveSinkInfo createHiveSinkInfo(HiveSinkResponse hiveInfo) {
if (hiveInfo.getJdbcUrl() == null) {
throw new RuntimeException(String.format("hiveSink={} server url cannot be empty", hiveInfo));
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/SortFieldFormatUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/utils/SortFieldFormatUtils.java
similarity index 98%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/SortFieldFormatUtils.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/utils/SortFieldFormatUtils.java
index 0e6be39..43cf09b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/SortFieldFormatUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/utils/SortFieldFormatUtils.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.thirdparty.sort;
+package org.apache.inlong.manager.service.thirdparty.sort.utils;
import org.apache.inlong.sort.formats.common.ArrayFormatInfo;
import org.apache.inlong.sort.formats.common.BooleanFormatInfo;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/SourceInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/utils/SourceInfoUtils.java
similarity index 98%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/SourceInfoUtils.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/utils/SourceInfoUtils.java
index e7ff4a9..66bfb86 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/SourceInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/utils/SourceInfoUtils.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.thirdparty.sort;
+package org.apache.inlong.manager.service.thirdparty.sort.utils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;