You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/04/14 09:45:53 UTC

[incubator-inlong] branch master updated: [INLONG-3683][Manager] Add AUTO_PUSH source stream in Manager (#3694)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new fba0bfd02 [INLONG-3683][Manager] Add AUTO_PUSH source stream in Manager (#3694)
fba0bfd02 is described below

commit fba0bfd02a6587793b0929d641dc07ce2f648c23
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Thu Apr 14 17:45:48 2022 +0800

    [INLONG-3683][Manager] Add AUTO_PUSH source stream in Manager (#3694)
---
 .../manager/client/api/source/AutoPushSource.java  | 49 ++++++++++++
 .../manager/client/api/util/InlongParser.java      | 12 +++
 .../api/util/InlongStreamSourceTransfer.java       | 44 +++++++++++
 .../inlong/manager/common/enums/SourceType.java    |  2 +
 .../pojo/source/autopush/AutoPushSourceDTO.java    | 61 +++++++++++++++
 .../autopush/AutoPushSourceListResponse.java       | 34 +++++++++
 .../source/autopush/AutoPushSourceRequest.java     | 47 ++++++++++++
 .../source/autopush/AutoPushSourceResponse.java    | 44 +++++++++++
 .../manager/service/CommonOperateService.java      |  1 -
 .../source/autopush/AutoPushSourceOperation.java   | 88 ++++++++++++++++++++++
 .../thirdparty/sort/util/SerializationUtils.java   | 22 ++++++
 11 files changed, 403 insertions(+), 1 deletion(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/AutoPushSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/AutoPushSource.java
new file mode 100644
index 000000000..b1c016ec0
--- /dev/null
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/AutoPushSource.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.source;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.client.api.DataFormat;
+import org.apache.inlong.manager.client.api.StreamSource;
+import org.apache.inlong.manager.common.enums.SourceType;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+@AllArgsConstructor
+@NoArgsConstructor
+@ApiModel("Base configuration for DataProxy SDK collection")
+public class AutoPushSource extends StreamSource {
+
+    @ApiModelProperty(value = "DataSource type", required = true)
+    private SourceType sourceType = SourceType.AUTO_PUSH;
+
+    @ApiModelProperty("SyncType for auto push source")
+    private SyncType syncType = SyncType.INCREMENT;
+
+    @ApiModelProperty("Data format type for auto push source")
+    private DataFormat dataFormat = DataFormat.NONE;
+
+    @ApiModelProperty(value = "DataProxy group name, "
+            + "the name used for local configuration when the user enables local configuration")
+    private String dataProxyGroup;
+}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
index af45811cd..3fb14328c 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
@@ -39,6 +39,9 @@ import org.apache.inlong.manager.common.pojo.sink.iceberg.IcebergSinkResponse;
 import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse;
 import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.SourceResponse;
+import org.apache.inlong.manager.common.pojo.source.autopush.AutoPushSourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.autopush.AutoPushSourceRequest;
+import org.apache.inlong.manager.common.pojo.source.autopush.AutoPushSourceResponse;
 import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
 import org.apache.inlong.manager.common.pojo.source.file.FileSourceListResponse;
@@ -140,6 +143,11 @@ public class InlongParser {
                                 FileSourceResponse.class);
                         sourceResponses.add(fileSourceResponse);
                         break;
+                    case AUTO_PUSH:
+                        AutoPushSourceResponse autoPushSourceResponse = GsonUtil.fromJson(sourceJson.toString(),
+                                AutoPushSourceResponse.class);
+                        sourceResponses.add(autoPushSourceResponse);
+                        break;
                     default:
                         throw new RuntimeException(String.format("Unsupport sourceType=%s for Inlong", sourceType));
                 }
@@ -204,6 +212,10 @@ public class InlongParser {
                     return GsonUtil.fromJson(pageInfoJson,
                             new TypeToken<PageInfo<FileSourceListResponse>>() {
                             }.getType());
+                case AUTO_PUSH:
+                    return GsonUtil.fromJson(pageInfoJson,
+                            new TypeToken<PageInfo<AutoPushSourceListResponse>>() {
+                            }.getType());
                 default:
                     throw new IllegalArgumentException(
                             String.format("Unsupported sourceType=%s for Inlong", sourceType));
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
index ef2e4e564..2977df9ae 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -27,12 +27,16 @@ import org.apache.inlong.manager.client.api.StreamSource.State;
 import org.apache.inlong.manager.client.api.StreamSource.SyncType;
 import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
 import org.apache.inlong.manager.client.api.source.AgentFileSource;
+import org.apache.inlong.manager.client.api.source.AutoPushSource;
 import org.apache.inlong.manager.client.api.source.KafkaSource;
 import org.apache.inlong.manager.client.api.source.MySQLBinlogSource;
 import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.SourceRequest;
 import org.apache.inlong.manager.common.pojo.source.SourceResponse;
+import org.apache.inlong.manager.common.pojo.source.autopush.AutoPushSourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.autopush.AutoPushSourceRequest;
+import org.apache.inlong.manager.common.pojo.source.autopush.AutoPushSourceResponse;
 import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceRequest;
 import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
@@ -60,6 +64,8 @@ public class InlongStreamSourceTransfer {
                 return createBinlogSourceRequest((MySQLBinlogSource) streamSource, streamInfo);
             case FILE:
                 return createFileSourceRequest((AgentFileSource) streamSource, streamInfo);
+            case AUTO_PUSH:
+                return createAutoPushSourceRequest((AutoPushSource) streamSource, streamInfo);
             default:
                 throw new RuntimeException(String.format("Unsupported source=%s for Inlong", sourceType));
         }
@@ -77,6 +83,9 @@ public class InlongStreamSourceTransfer {
         if (sourceType == SourceType.FILE && sourceResponse instanceof FileSourceResponse) {
             return parseAgentFileSource((FileSourceResponse) sourceResponse);
         }
+        if (sourceType == SourceType.AUTO_PUSH && sourceResponse instanceof AutoPushSourceResponse) {
+            return parseAutoPushSource((AutoPushSourceResponse) sourceResponse);
+        }
         throw new IllegalArgumentException(String.format("Unsupported source type : %s for Inlong", sourceType));
     }
 
@@ -92,6 +101,9 @@ public class InlongStreamSourceTransfer {
         if (sourceType == SourceType.FILE && sourceListResponse instanceof FileSourceListResponse) {
             return parseAgentFileSource((FileSourceListResponse) sourceListResponse);
         }
+        if (sourceType == SourceType.AUTO_PUSH && sourceListResponse instanceof AutoPushSourceListResponse) {
+            return parseAutoPushSource((AutoPushSourceListResponse) sourceListResponse);
+        }
         throw new IllegalArgumentException(String.format("Unsupported source type : %s for Inlong", sourceType));
     }
 
@@ -210,6 +222,24 @@ public class InlongStreamSourceTransfer {
         return fileSource;
     }
 
+    private static AutoPushSource parseAutoPushSource(AutoPushSourceResponse response) {
+        AutoPushSource source = new AutoPushSource();
+        source.setDataFormat(DataFormat.NONE);
+        source.setDataProxyGroup(response.getDataProxyGroup());
+        source.setState(State.parseByStatus(response.getStatus()));
+        source.setSourceName(response.getSourceName());
+        return source;
+    }
+
+    private static AutoPushSource parseAutoPushSource(AutoPushSourceListResponse response) {
+        AutoPushSource source = new AutoPushSource();
+        source.setDataFormat(DataFormat.NONE);
+        source.setDataProxyGroup(response.getDataProxyGroup());
+        source.setState(State.parseByStatus(response.getStatus()));
+        source.setSourceName(response.getSourceName());
+        return source;
+    }
+
     private static KafkaSourceRequest createKafkaSourceRequest(KafkaSource kafkaSource, InlongStreamInfo stream) {
         KafkaSourceRequest sourceRequest = new KafkaSourceRequest();
         sourceRequest.setSourceName(kafkaSource.getSourceName());
@@ -280,4 +310,18 @@ public class InlongStreamSourceTransfer {
         sourceRequest.setTimeOffset(fileSource.getTimeOffset());
         return sourceRequest;
     }
+
+    /** Build the AUTO_PUSH source request from the client-side source and its owning stream. */
+    private static AutoPushSourceRequest createAutoPushSourceRequest(AutoPushSource source,
+            InlongStreamInfo streamInfo) {
+        AutoPushSourceRequest request = new AutoPushSourceRequest();
+        // fall back to the stream name when the source carries no name of its own
+        String name = source.getSourceName();
+        request.setSourceName(StringUtils.isEmpty(name) ? streamInfo.getName() : name);
+        request.setInlongGroupId(streamInfo.getInlongGroupId());
+        request.setInlongStreamId(streamInfo.getInlongStreamId());
+        request.setSourceType(source.getSourceType().getType());
+        request.setDataProxyGroup(source.getDataProxyGroup());
+        return request;
+    }
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
index 42bfd9b91..9d6415ca9 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
@@ -33,10 +33,12 @@ public enum SourceType {
     BINLOG("BINLOG", TaskTypeEnum.BINLOG),
     KAFKA("KAFKA", TaskTypeEnum.KAFKA);
 
+    public static final String SOURCE_AUTO_PUSH = "AUTO_PUSH";
     public static final String SOURCE_FILE = "FILE";
     public static final String SOURCE_SQL = "SQL";
     public static final String SOURCE_BINLOG = "BINLOG";
     public static final String SOURCE_KAFKA = "KAFKA";
+
     public static final String SOURCE_TYPE_IS_EMPTY = "Source type is empty";
     public static final String SOURCE_TYPE_NOT_SAME = "Expected source type is %s, but found %s";
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceDTO.java
new file mode 100644
index 000000000..af60f86d1
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceDTO.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.autopush;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Extended parameters of the AUTO_PUSH (DataProxy SDK) source, (de)serialized as the JSON extParams
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class AutoPushSourceDTO {
+
+    // Configured once at class init: changing a shared ObjectMapper's settings after first use is not thread-safe
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+    @ApiModelProperty(value = "DataProxy group name, "
+            + "the name used for local configuration when the user enables local configuration")
+    private String dataProxyGroup;
+
+    public static AutoPushSourceDTO getFromRequest(AutoPushSourceRequest request) {
+        return AutoPushSourceDTO.builder()
+                .dataProxyGroup(request.getDataProxyGroup())
+                .build();
+    }
+
+    public static AutoPushSourceDTO getFromJson(@NotNull String extParams) {
+        try {
+            return OBJECT_MAPPER.readValue(extParams, AutoPushSourceDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+        }
+    }
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceListResponse.java
new file mode 100644
index 000000000..e4746c603
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceListResponse.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.autopush;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+@ApiModel("Response of DataProxy SDK source paging list")
+public class AutoPushSourceListResponse extends SourceListResponse {
+
+    @ApiModelProperty(value = "DataProxy group name, "
+            + "the name used for local configuration when the user enables local configuration")
+    private String dataProxyGroup;
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceRequest.java
new file mode 100644
index 000000000..07b580c13
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceRequest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.autopush;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+/**
+ * Request of DataProxy SDK source
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Request of DataProxy SDK source")
+@JsonTypeDefine(value = SourceType.SOURCE_AUTO_PUSH)
+public class AutoPushSourceRequest extends SourceRequest {
+
+    @ApiModelProperty(value = "DataProxy group name, "
+            + "the name used for local configuration when the user enables local configuration")
+    private String dataProxyGroup;
+
+    public AutoPushSourceRequest() {
+        this.setSourceType(SourceType.AUTO_PUSH.toString());
+    }
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceResponse.java
new file mode 100644
index 000000000..43760e2dd
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceResponse.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.autopush;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
+
+/**
+ * Response carrying the AUTO_PUSH (DataProxy SDK) source info
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Response of DataProxy SDK source")
+public class AutoPushSourceResponse extends SourceResponse {
+
+    @ApiModelProperty(value = "DataProxy group name, "
+            + "the name used for local configuration when the user enables local configuration")
+    private String dataProxyGroup;
+
+    public AutoPushSourceResponse() {
+        this.setSourceType(SourceType.AUTO_PUSH.name());
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
index efc54fa83..e446d7db7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
@@ -203,7 +203,6 @@ public class CommonOperateService {
     public DataFlowInfo createDataFlow(InlongGroupInfo groupInfo, SinkResponse sinkResponse) {
         String groupId = sinkResponse.getInlongGroupId();
         String streamId = sinkResponse.getInlongStreamId();
-        // TODO Support all source type, include AUTO_PUSH.
         List<SourceResponse> sourceList = streamSourceService.listSource(groupId, streamId);
         if (CollectionUtils.isEmpty(sourceList)) {
             throw new WorkflowListenerException(String.format("Source not found by groupId=%s and streamId=%s",
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/autopush/AutoPushSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/autopush/AutoPushSourceOperation.java
new file mode 100644
index 000000000..0726d9f8d
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/autopush/AutoPushSourceOperation.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.source.autopush;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
+import org.apache.inlong.manager.common.pojo.source.autopush.AutoPushSourceDTO;
+import org.apache.inlong.manager.common.pojo.source.autopush.AutoPushSourceRequest;
+import org.apache.inlong.manager.common.pojo.source.autopush.AutoPushSourceResponse;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.service.source.AbstractSourceOperation;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.function.Supplier;
+
+/**
+ * Source operation for the AUTO_PUSH (DataProxy SDK) source type
+ */
+@Service
+public class AutoPushSourceOperation extends AbstractSourceOperation {
+
+    @Autowired
+    private ObjectMapper objectMapper; // NOTE(review): org.codehaus.jackson (1.x) type per the import - confirm
+
+    @Override
+    protected void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity) {
+        AutoPushSourceRequest sourceRequest = (AutoPushSourceRequest) request;
+        CommonBeanUtils.copyProperties(sourceRequest, targetEntity, true);
+        try {
+            AutoPushSourceDTO dto = AutoPushSourceDTO.getFromRequest(sourceRequest);
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) { // NOTE(review): the cause is dropped, only a generic message is rethrown
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+        }
+    }
+
+    @Override
+    protected String getSourceType() {
+        return SourceType.AUTO_PUSH.getType();
+    }
+
+    @Override
+    protected SourceResponse getResponse() {
+        return new AutoPushSourceResponse();
+    }
+
+    @Override
+    public Boolean accept(SourceType sourceType) {
+        return SourceType.AUTO_PUSH == sourceType;
+    }
+
+    @Override
+    public <T> T getFromEntity(StreamSourceEntity entity, Supplier<T> target) {
+        T result = target.get();
+        if (entity == null) {
+            return result;
+        }
+        String existType = entity.getSourceType();
+        Preconditions.checkTrue(getSourceType().equals(existType),
+                String.format(SourceType.SOURCE_TYPE_NOT_SAME, getSourceType(), existType));
+        AutoPushSourceDTO dto = AutoPushSourceDTO.getFromJson(entity.getExtParams());
+        CommonBeanUtils.copyProperties(entity, result, true);
+        CommonBeanUtils.copyProperties(dto, result, true);
+        return result;
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
index 1c48e479b..e845c3c71 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
@@ -57,6 +57,8 @@ public class SerializationUtils {
                 return deserializeForKafka((KafkaSourceResponse) sourceResponse, streamInfo);
             case FILE:
                 return deserializeForFile(streamInfo);
+            case AUTO_PUSH:
+                return deserializeForAutoPush(streamInfo);
             default:
                 throw new IllegalArgumentException(String.format("Unsupported sourceType: %s", sourceType));
         }
@@ -151,4 +153,24 @@ public class SerializationUtils {
                         String.format("Unsupported data type for File source: %s", dataType));
         }
     }
+
+    /**
+     * Build the deserialization info of a DataProxy SDK (AUTO_PUSH) source from the stream's data type
+     */
+    private static DeserializationInfo deserializeForAutoPush(InlongStreamInfo streamInfo) {
+        String dataType = streamInfo.getDataType();
+        DataTypeEnum dataTypeEnum = DataTypeEnum.forName(dataType);
+        switch (dataTypeEnum) {
+            case JSON:
+                return new JsonDeserializationInfo();
+            case AVRO:
+                return new AvroDeserializationInfo();
+            case CSV:
+                // only the first character of the configured separator is used
+                return new CsvDeserializationInfo(streamInfo.getDataSeparator().toCharArray()[0]);
+            default:
+                throw new IllegalArgumentException(
+                        String.format("Unsupported data type for DataProxy SDK source: %s", dataType));
+        }
+    }
 }