You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/04/14 09:45:53 UTC
[incubator-inlong] branch master updated: [INLONG-3683][Manager] Add AUTO_PUSH source stream in Manager (#3694)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new fba0bfd02 [INLONG-3683][Manager] Add AUTO_PUSH source stream in Manager (#3694)
fba0bfd02 is described below
commit fba0bfd02a6587793b0929d641dc07ce2f648c23
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Thu Apr 14 17:45:48 2022 +0800
[INLONG-3683][Manager] Add AUTO_PUSH source stream in Manager (#3694)
---
.../manager/client/api/source/AutoPushSource.java | 49 ++++++++++++
.../manager/client/api/util/InlongParser.java | 12 +++
.../api/util/InlongStreamSourceTransfer.java | 44 +++++++++++
.../inlong/manager/common/enums/SourceType.java | 2 +
.../pojo/source/autopush/AutoPushSourceDTO.java | 61 +++++++++++++++
.../autopush/AutoPushSourceListResponse.java | 34 +++++++++
.../source/autopush/AutoPushSourceRequest.java | 47 ++++++++++++
.../source/autopush/AutoPushSourceResponse.java | 44 +++++++++++
.../manager/service/CommonOperateService.java | 1 -
.../source/autopush/AutoPushSourceOperation.java | 88 ++++++++++++++++++++++
.../thirdparty/sort/util/SerializationUtils.java | 22 ++++++
11 files changed, 403 insertions(+), 1 deletion(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/AutoPushSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/AutoPushSource.java
new file mode 100644
index 000000000..b1c016ec0
--- /dev/null
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/AutoPushSource.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.source;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.client.api.DataFormat;
+import org.apache.inlong.manager.client.api.StreamSource;
+import org.apache.inlong.manager.common.enums.SourceType;
+
+/**
+ * Stream source configuration for the {@link SourceType#AUTO_PUSH} source type
+ * (DataProxy SDK collection).
+ *
+ * <p>The source type, sync type and data format are pre-populated with the defaults
+ * declared on the fields below; only the DataProxy group has no default.
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+@AllArgsConstructor
+@NoArgsConstructor
+@ApiModel("Base configuration for DataProxy SDK collection")
+public class AutoPushSource extends StreamSource {
+
+    @ApiModelProperty(value = "DataSource type", required = true)
+    private SourceType sourceType = SourceType.AUTO_PUSH;
+
+    // The descriptions below previously mentioned Kafka (copy-paste from the Kafka source).
+    @ApiModelProperty("SyncType for AutoPush")
+    private SyncType syncType = SyncType.INCREMENT;
+
+    @ApiModelProperty("Data format type for AutoPush")
+    private DataFormat dataFormat = DataFormat.NONE;
+
+    @ApiModelProperty(value = "DataProxy group name, "
+            + "the name used for local configuration when the user enables local configuration")
+    private String dataProxyGroup;
+}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
index af45811cd..3fb14328c 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
@@ -39,6 +39,9 @@ import org.apache.inlong.manager.common.pojo.sink.iceberg.IcebergSinkResponse;
import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
+import org.apache.inlong.manager.common.pojo.source.autopush.AutoPushSourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.autopush.AutoPushSourceRequest;
+import org.apache.inlong.manager.common.pojo.source.autopush.AutoPushSourceResponse;
import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceListResponse;
import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
import org.apache.inlong.manager.common.pojo.source.file.FileSourceListResponse;
@@ -140,6 +143,11 @@ public class InlongParser {
FileSourceResponse.class);
sourceResponses.add(fileSourceResponse);
break;
+ case AUTO_PUSH:
+     // Deserialize into the response type: the target variable and the list are
+     // response-typed, so the request class must not be used here.
+     AutoPushSourceResponse autoPushSourceResponse = GsonUtil.fromJson(sourceJson.toString(),
+             AutoPushSourceResponse.class);
+     sourceResponses.add(autoPushSourceResponse);
+     break;
default:
throw new RuntimeException(String.format("Unsupport sourceType=%s for Inlong", sourceType));
}
@@ -204,6 +212,10 @@ public class InlongParser {
return GsonUtil.fromJson(pageInfoJson,
new TypeToken<PageInfo<FileSourceListResponse>>() {
}.getType());
+ case AUTO_PUSH:
+ return GsonUtil.fromJson(pageInfoJson,
+ new TypeToken<PageInfo<AutoPushSourceListResponse>>() {
+ }.getType());
default:
throw new IllegalArgumentException(
String.format("Unsupported sourceType=%s for Inlong", sourceType));
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
index ef2e4e564..2977df9ae 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -27,12 +27,16 @@ import org.apache.inlong.manager.client.api.StreamSource.State;
import org.apache.inlong.manager.client.api.StreamSource.SyncType;
import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
import org.apache.inlong.manager.client.api.source.AgentFileSource;
+import org.apache.inlong.manager.client.api.source.AutoPushSource;
import org.apache.inlong.manager.client.api.source.KafkaSource;
import org.apache.inlong.manager.client.api.source.MySQLBinlogSource;
import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
+import org.apache.inlong.manager.common.pojo.source.autopush.AutoPushSourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.autopush.AutoPushSourceRequest;
+import org.apache.inlong.manager.common.pojo.source.autopush.AutoPushSourceResponse;
import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceListResponse;
import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceRequest;
import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
@@ -60,6 +64,8 @@ public class InlongStreamSourceTransfer {
return createBinlogSourceRequest((MySQLBinlogSource) streamSource, streamInfo);
case FILE:
return createFileSourceRequest((AgentFileSource) streamSource, streamInfo);
+ case AUTO_PUSH:
+ return createAutoPushSourceRequest((AutoPushSource) streamSource, streamInfo);
default:
throw new RuntimeException(String.format("Unsupported source=%s for Inlong", sourceType));
}
@@ -77,6 +83,9 @@ public class InlongStreamSourceTransfer {
if (sourceType == SourceType.FILE && sourceResponse instanceof FileSourceResponse) {
return parseAgentFileSource((FileSourceResponse) sourceResponse);
}
+ if (sourceType == SourceType.AUTO_PUSH && sourceResponse instanceof AutoPushSourceResponse) {
+ return parseAutoPushSource((AutoPushSourceResponse) sourceResponse);
+ }
throw new IllegalArgumentException(String.format("Unsupported source type : %s for Inlong", sourceType));
}
@@ -92,6 +101,9 @@ public class InlongStreamSourceTransfer {
if (sourceType == SourceType.FILE && sourceListResponse instanceof FileSourceListResponse) {
return parseAgentFileSource((FileSourceListResponse) sourceListResponse);
}
+ if (sourceType == SourceType.AUTO_PUSH && sourceListResponse instanceof AutoPushSourceListResponse) {
+ return parseAutoPushSource((AutoPushSourceListResponse) sourceListResponse);
+ }
throw new IllegalArgumentException(String.format("Unsupported source type : %s for Inlong", sourceType));
}
@@ -210,6 +222,24 @@ public class InlongStreamSourceTransfer {
return fileSource;
}
+ private static AutoPushSource parseAutoPushSource(AutoPushSourceResponse response) {
+ AutoPushSource autoPushSource = new AutoPushSource();
+ autoPushSource.setSourceName(response.getSourceName());
+ autoPushSource.setState(State.parseByStatus(response.getStatus()));
+ autoPushSource.setDataFormat(DataFormat.NONE);
+ autoPushSource.setDataProxyGroup(response.getDataProxyGroup());
+ return autoPushSource;
+ }
+
+ private static AutoPushSource parseAutoPushSource(AutoPushSourceListResponse response) {
+ AutoPushSource autoPushSource = new AutoPushSource();
+ autoPushSource.setSourceName(response.getSourceName());
+ autoPushSource.setState(State.parseByStatus(response.getStatus()));
+ autoPushSource.setDataFormat(DataFormat.NONE);
+ autoPushSource.setDataProxyGroup(response.getDataProxyGroup());
+ return autoPushSource;
+ }
+
private static KafkaSourceRequest createKafkaSourceRequest(KafkaSource kafkaSource, InlongStreamInfo stream) {
KafkaSourceRequest sourceRequest = new KafkaSourceRequest();
sourceRequest.setSourceName(kafkaSource.getSourceName());
@@ -280,4 +310,18 @@ public class InlongStreamSourceTransfer {
sourceRequest.setTimeOffset(fileSource.getTimeOffset());
return sourceRequest;
}
+
+ private static AutoPushSourceRequest createAutoPushSourceRequest(AutoPushSource source,
+ InlongStreamInfo streamInfo) {
+ AutoPushSourceRequest sourceRequest = new AutoPushSourceRequest();
+ sourceRequest.setSourceName(source.getSourceName());
+ if (StringUtils.isEmpty(sourceRequest.getSourceName())) {
+ sourceRequest.setSourceName(streamInfo.getName());
+ }
+ sourceRequest.setInlongGroupId(streamInfo.getInlongGroupId());
+ sourceRequest.setInlongStreamId(streamInfo.getInlongStreamId());
+ sourceRequest.setSourceType(source.getSourceType().getType());
+ sourceRequest.setDataProxyGroup(source.getDataProxyGroup());
+ return sourceRequest;
+ }
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
index 42bfd9b91..9d6415ca9 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
@@ -33,10 +33,12 @@ public enum SourceType {
BINLOG("BINLOG", TaskTypeEnum.BINLOG),
KAFKA("KAFKA", TaskTypeEnum.KAFKA);
+ public static final String SOURCE_AUTO_PUSH = "AUTO_PUSH";
public static final String SOURCE_FILE = "FILE";
public static final String SOURCE_SQL = "SQL";
public static final String SOURCE_BINLOG = "BINLOG";
public static final String SOURCE_KAFKA = "KAFKA";
+
public static final String SOURCE_TYPE_IS_EMPTY = "Source type is empty";
public static final String SOURCE_TYPE_NOT_SAME = "Expected source type is %s, but found %s";
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceDTO.java
new file mode 100644
index 000000000..af60f86d1
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceDTO.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.autopush;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * SDK source info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class AutoPushSourceDTO {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ @ApiModelProperty(value = "DataProxy group name, "
+ + "the name used for local configuration when the user enables local configuration")
+ private String dataProxyGroup;
+
+ public static AutoPushSourceDTO getFromRequest(AutoPushSourceRequest request) {
+ return AutoPushSourceDTO.builder()
+ .dataProxyGroup(request.getDataProxyGroup())
+ .build();
+ }
+
+ public static AutoPushSourceDTO getFromJson(@NotNull String extParams) {
+ try {
+ OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ return OBJECT_MAPPER.readValue(extParams, AutoPushSourceDTO.class);
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ }
+ }
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceListResponse.java
new file mode 100644
index 000000000..e4746c603
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceListResponse.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.autopush;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+
+/**
+ * Paging-list response item for a DataProxy SDK (AUTO_PUSH) source; adds the
+ * DataProxy group name to the fields inherited from {@link SourceListResponse}.
+ */
+@ApiModel("Response of DataProxy SDK source paging list")
+@EqualsAndHashCode(callSuper = true)
+@Data
+public class AutoPushSourceListResponse extends SourceListResponse {
+
+    @ApiModelProperty("DataProxy group name, "
+            + "the name used for local configuration when the user enables local configuration")
+    private String dataProxyGroup;
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceRequest.java
new file mode 100644
index 000000000..07b580c13
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceRequest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.autopush;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+/**
+ * Request of DataProxy SDK source.
+ *
+ * <p>The source type is preset in the constructor, and the class is registered under
+ * {@link SourceType#SOURCE_AUTO_PUSH} through {@code @JsonTypeDefine}.
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Request of the DataProxy SDK source info")
+@JsonTypeDefine(value = SourceType.SOURCE_AUTO_PUSH)
+public class AutoPushSourceRequest extends SourceRequest {
+
+    @ApiModelProperty(value = "DataProxy group name, "
+            + "the name used for local configuration when the user enables local configuration")
+    private String dataProxyGroup;
+
+    /**
+     * Creates a request whose source type is preset to the string form of
+     * {@link SourceType#AUTO_PUSH}.
+     */
+    public AutoPushSourceRequest() {
+        this.setSourceType(SourceType.AUTO_PUSH.toString());
+    }
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceResponse.java
new file mode 100644
index 000000000..43760e2dd
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceResponse.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.autopush;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
+
+/**
+ * Response for DataProxy SDK source; adds the DataProxy group name to the fields
+ * inherited from {@link SourceResponse}.
+ */
+@ApiModel("Response of DataProxy SDK source")
+@EqualsAndHashCode(callSuper = true)
+@ToString(callSuper = true)
+@Data
+public class AutoPushSourceResponse extends SourceResponse {
+
+    @ApiModelProperty("DataProxy group name, "
+            + "the name used for local configuration when the user enables local configuration")
+    private String dataProxyGroup;
+
+    /**
+     * Creates a response whose source type is preset to the name of
+     * {@link SourceType#AUTO_PUSH}.
+     */
+    public AutoPushSourceResponse() {
+        setSourceType(SourceType.AUTO_PUSH.name());
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
index efc54fa83..e446d7db7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
@@ -203,7 +203,6 @@ public class CommonOperateService {
public DataFlowInfo createDataFlow(InlongGroupInfo groupInfo, SinkResponse sinkResponse) {
String groupId = sinkResponse.getInlongGroupId();
String streamId = sinkResponse.getInlongStreamId();
- // TODO Support all source type, include AUTO_PUSH.
List<SourceResponse> sourceList = streamSourceService.listSource(groupId, streamId);
if (CollectionUtils.isEmpty(sourceList)) {
throw new WorkflowListenerException(String.format("Source not found by groupId=%s and streamId=%s",
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/autopush/AutoPushSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/autopush/AutoPushSourceOperation.java
new file mode 100644
index 000000000..0726d9f8d
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/autopush/AutoPushSourceOperation.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.source.autopush;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
+import org.apache.inlong.manager.common.pojo.source.autopush.AutoPushSourceDTO;
+import org.apache.inlong.manager.common.pojo.source.autopush.AutoPushSourceRequest;
+import org.apache.inlong.manager.common.pojo.source.autopush.AutoPushSourceResponse;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.service.source.AbstractSourceOperation;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.function.Supplier;
+
+/**
+ * DataProxy SDK source operation.
+ *
+ * <p>Handles sources of type {@link SourceType#AUTO_PUSH}: stores the AUTO_PUSH specific
+ * attributes as JSON in the entity's ext params and restores them when reading the entity.
+ */
+@Service
+public class AutoPushSourceOperation extends AbstractSourceOperation {
+
+    // NOTE(review): this mapper is the org.codehaus.jackson (Jackson 1.x) ObjectMapper, while
+    // AutoPushSourceDTO parses with com.fasterxml.jackson (Jackson 2.x) - confirm that a bean
+    // of this type exists and that mixing the two is intended.
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    /**
+     * Copies the request onto the entity and stores the AUTO_PUSH attributes as JSON ext params.
+     *
+     * @param request source request; cast to {@link AutoPushSourceRequest} without a type check
+     * @param targetEntity entity to populate
+     * @throws BusinessException with the SOURCE_INFO_INCORRECT message if building or
+     *         serializing the DTO fails
+     */
+    @Override
+    protected void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity) {
+        final AutoPushSourceRequest autoPushRequest = (AutoPushSourceRequest) request;
+        CommonBeanUtils.copyProperties(autoPushRequest, targetEntity, true);
+        try {
+            final String extParams = objectMapper.writeValueAsString(
+                    AutoPushSourceDTO.getFromRequest(autoPushRequest));
+            targetEntity.setExtParams(extParams);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+        }
+    }
+
+    /**
+     * @return the type string of {@link SourceType#AUTO_PUSH}
+     */
+    @Override
+    protected String getSourceType() {
+        return SourceType.AUTO_PUSH.getType();
+    }
+
+    /**
+     * @return a new, empty {@link AutoPushSourceResponse}
+     */
+    @Override
+    protected SourceResponse getResponse() {
+        return new AutoPushSourceResponse();
+    }
+
+    /**
+     * @param sourceType source type to check
+     * @return whether the given type is {@link SourceType#AUTO_PUSH}
+     */
+    @Override
+    public Boolean accept(SourceType sourceType) {
+        return sourceType == SourceType.AUTO_PUSH;
+    }
+
+    /**
+     * Populates a fresh target object from the entity and its JSON ext params.
+     *
+     * <p>Entity properties are copied first, then the properties parsed from the ext params.
+     *
+     * @param entity stored source entity; when {@code null} the untouched target is returned
+     * @param target supplier of the object to populate
+     * @return the object obtained from {@code target}, populated if an entity was given
+     */
+    @Override
+    public <T> T getFromEntity(StreamSourceEntity entity, Supplier<T> target) {
+        final T converted = target.get();
+        if (entity != null) {
+            final String actualType = entity.getSourceType();
+            // Refuse to convert an entity that was stored for a different source type
+            Preconditions.checkTrue(getSourceType().equals(actualType),
+                    String.format(SourceType.SOURCE_TYPE_NOT_SAME, getSourceType(), actualType));
+            final AutoPushSourceDTO extInfo = AutoPushSourceDTO.getFromJson(entity.getExtParams());
+            CommonBeanUtils.copyProperties(entity, converted, true);
+            CommonBeanUtils.copyProperties(extInfo, converted, true);
+        }
+        return converted;
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
index 1c48e479b..e845c3c71 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
@@ -57,6 +57,8 @@ public class SerializationUtils {
return deserializeForKafka((KafkaSourceResponse) sourceResponse, streamInfo);
case FILE:
return deserializeForFile(streamInfo);
+ case AUTO_PUSH:
+ return deserializeForAutoPush(streamInfo);
default:
throw new IllegalArgumentException(String.format("Unsupported sourceType: %s", sourceType));
}
@@ -151,4 +153,24 @@ public class SerializationUtils {
String.format("Unsupported data type for File source: %s", dataType));
}
}
+
+ /**
+  * Get deserialization info for DataProxy SDK source.
+  *
+  * <p>The deserialization info is chosen by the stream's data type: CSV (using the first
+  * character of the stream's data separator), AVRO or JSON.
+  *
+  * @param streamInfo stream providing the data type and, for CSV, the data separator
+  * @return deserialization info matching the stream's data type
+  * @throws IllegalArgumentException if the data type is not CSV, AVRO or JSON, or if the
+  *         data type is CSV and the data separator is null or empty
+  */
+ private static DeserializationInfo deserializeForAutoPush(InlongStreamInfo streamInfo) {
+     String dataType = streamInfo.getDataType();
+     DataTypeEnum typeEnum = DataTypeEnum.forName(dataType);
+     switch (typeEnum) {
+         case CSV:
+             String dataSeparator = streamInfo.getDataSeparator();
+             // Fail with a clear message instead of an opaque NullPointerException or
+             // ArrayIndexOutOfBoundsException when no separator is configured.
+             if (dataSeparator == null || dataSeparator.isEmpty()) {
+                 throw new IllegalArgumentException(
+                         "Data separator must not be empty for CSV data of DataProxy SDK source");
+             }
+             return new CsvDeserializationInfo(dataSeparator.charAt(0));
+         case AVRO:
+             return new AvroDeserializationInfo();
+         case JSON:
+             return new JsonDeserializationInfo();
+         default:
+             throw new IllegalArgumentException(
+                     String.format("Unsupported data type for DataProxy SDK source: %s", dataType));
+     }
+ }
}