You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/02/28 11:00:26 UTC

[incubator-inlong] branch master updated: [INLONG-2771][Manager] Add stream source API in manager client module (#2780)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 59afacf  [INLONG-2771][Manager] Add stream source API in manager client module (#2780)
59afacf is described below

commit 59afacf63df37147fd1e3528ef59941d8f1bf3c2
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Mon Feb 28 19:00:22 2022 +0800

    [INLONG-2771][Manager] Add stream source API in manager client module (#2780)
---
 .../apache/inlong/common/enums/DataTypeEnum.java   |   3 +-
 .../inlong/manager/client/api/DataFormat.java      |  20 ++--
 .../inlong/manager/client/api/DataSeparator.java   |   2 +-
 .../inlong/manager/client/api/InlongClient.java    |   2 +-
 .../inlong/manager/client/api/StreamSink.java      |  13 ++-
 .../inlong/manager/client/api/StreamSource.java    |  16 ++-
 .../api/impl/DefaultInlongStreamBuilder.java       |  59 ++++++++--
 .../manager/client/api/impl/InlongClientImpl.java  |   2 +-
 .../manager/client/api/impl/InlongGroupImpl.java   |  11 +-
 .../manager/client/api/impl/InlongStreamImpl.java  |   9 +-
 .../client/api/inner/InnerInlongManagerClient.java | 101 ++++++++++++++++-
 .../client/api/inner/InnerStreamContext.java       |   5 +-
 .../manager/client/api/{ => sink}/HiveSink.java    |   9 +-
 .../manager/client/api/source/KafkaSource.java     |  64 +++++++++++
 .../MySQLBinlogSource.java}                        |  39 ++++---
 .../client/api/{ => source}/MySQLSource.java       |   6 +-
 .../manager/client/api/util/InlongParser.java      |  12 +-
 .../api/util/InlongStreamSourceTransfer.java       | 122 +++++++++++++++++++++
 .../client/api/util/InlongStreamTransfer.java      |   9 +-
 .../common/pojo/source/binlog/BinlogSourceDTO.java |   3 +
 .../source/binlog/BinlogSourceListResponse.java    |   5 +
 .../pojo/source/kafka/KafkaSourceListResponse.java |   6 +
 22 files changed, 451 insertions(+), 67 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
index 7d32aa3..1916a87 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.common.enums;
 
+import java.util.Locale;
 import lombok.Getter;
 
 public enum DataTypeEnum {
@@ -35,7 +36,7 @@ public enum DataTypeEnum {
 
     public static DataTypeEnum forName(String name) {
         for (DataTypeEnum dataType : values()) {
-            if (dataType.getName().equals(name)) {
+            if (dataType.getName().equals(name.toLowerCase(Locale.ROOT))) {
                 return dataType;
             }
         }
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataFormat.java
similarity index 72%
copy from inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
copy to inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataFormat.java
index 7d32aa3..2d9960e 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataFormat.java
@@ -15,30 +15,30 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.common.enums;
+package org.apache.inlong.manager.client.api;
 
+import java.util.Locale;
 import lombok.Getter;
 
-public enum DataTypeEnum {
+public enum DataFormat {
     CSV("csv"),
     AVRO("avro"),
-    JSON("json"),
     CANAL("canal"),
-    DEBEZIUM_JSON("debezium_json");
+    NONE("none");
 
     @Getter
     private String name;
 
-    DataTypeEnum(String name) {
+    DataFormat(String name) {
         this.name = name;
     }
 
-    public static DataTypeEnum forName(String name) {
-        for (DataTypeEnum dataType : values()) {
-            if (dataType.getName().equals(name)) {
-                return dataType;
+    public static DataFormat forName(String name) {
+        for (DataFormat dataFormat : values()) {
+            if (dataFormat.getName().equals(name.toLowerCase(Locale.ROOT))) {
+                return dataFormat;
             }
         }
-        throw new IllegalArgumentException(String.format("Unsupport dataType for Inlong:%s", name));
+        throw new IllegalArgumentException(String.format("Unsupport dataformat=%s for Inlong", name));
     }
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataSeparator.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataSeparator.java
index c111931..96b0dfa 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataSeparator.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataSeparator.java
@@ -39,7 +39,7 @@ public enum DataSeparator {
         return this.asciiCode;
     }
 
-    private DataSeparator(String seperator, int asciiCode) {
+    DataSeparator(String seperator, int asciiCode) {
         this.asciiCode = asciiCode;
         this.seperator = seperator;
     }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
index f736ee2..f844701 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
@@ -64,7 +64,7 @@ public interface InlongClient {
      * @return the inlong group
      * @throws Exception the exception
      */
-    InlongGroup createGroup(InlongGroupConf groupConf) throws Exception;
+    InlongGroup forGroup(InlongGroupConf groupConf) throws Exception;
 
 
     /**
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
index 224f12c..689ed7c 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
@@ -26,11 +26,22 @@ import lombok.Data;
 public abstract class StreamSink {
 
     public enum SinkType {
-        HIVE, ES
+        HIVE, ES, KAFKA;
+
+        public static SinkType forType(String type) {
+            for (SinkType sinkType : values()) {
+                if (sinkType.name().equals(type)) {
+                    return sinkType;
+                }
+            }
+            throw new IllegalArgumentException(String.format("Illegal sink type=%s for Inlong", type));
+        }
     }
 
     public abstract SinkType getSinkType();
 
     public abstract List<StreamField> getStreamFields();
 
+    public abstract DataFormat getDataFormat();
+
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java
index 767f9a0..b588e66 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java
@@ -25,14 +25,26 @@ import lombok.Data;
 public abstract class StreamSource {
 
     public enum SourceType {
-        FILE, KAFKA, DB, BINLOG
+        FILE, KAFKA, DB, BINLOG;
+
+        public static SourceType forType(String type) {
+            for (SourceType sourceType : values()) {
+                if (sourceType.name().equals(type)) {
+                    return sourceType;
+                }
+            }
+            throw new IllegalArgumentException(
+                    String.format("Unsupport source type=%s for Inlong", type));
+        }
     }
 
     public enum SyncType {
-        FULL,INCREMENT
+        FULL, INCREMENT
     }
 
     public abstract SourceType getSourceType();
 
     public abstract SyncType getSyncType();
+
+    public abstract DataFormat getDataFormat();
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
index e211363..bcd1819 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
@@ -29,13 +29,17 @@ import org.apache.inlong.manager.client.api.StreamField;
 import org.apache.inlong.manager.client.api.StreamSink;
 import org.apache.inlong.manager.client.api.StreamSink.SinkType;
 import org.apache.inlong.manager.client.api.StreamSource;
+import org.apache.inlong.manager.client.api.StreamSource.SourceType;
 import org.apache.inlong.manager.client.api.inner.InnerGroupContext;
 import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
 import org.apache.inlong.manager.client.api.inner.InnerStreamContext;
 import org.apache.inlong.manager.client.api.util.GsonUtil;
+import org.apache.inlong.manager.client.api.util.InlongStreamSourceTransfer;
 import org.apache.inlong.manager.client.api.util.InlongStreamTransfer;
 import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
 import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 
@@ -71,7 +75,10 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
 
     @Override
     public InlongStreamBuilder source(StreamSource source) {
-        //todo create SourceRequest
+        inlongStream.setStreamSource(source);
+        SourceRequest sourceRequest = InlongStreamSourceTransfer.createSourceRequest(source,
+                streamContext.getStreamInfo());
+        streamContext.setSourceRequest(sourceRequest);
         return this;
     }
 
@@ -97,8 +104,11 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
         InlongStreamInfo streamInfo = streamContext.getStreamInfo();
         String streamIndex = managerClient.createStreamInfo(streamInfo);
         streamInfo.setId(Double.valueOf(streamIndex).intValue());
-        //todo save source
-
+        //Create source and update index
+        SourceRequest sourceRequest = streamContext.getSourceRequest();
+        String sourceIndex = managerClient.createSource(sourceRequest);
+        sourceRequest.setId(Double.valueOf(sourceIndex).intValue());
+        //Create sink and update index
         SinkRequest sinkRequest = streamContext.getSinkRequest();
         String sinkIndex = managerClient.createSink(sinkRequest);
         sinkRequest.setId(Double.valueOf(sinkIndex).intValue());
@@ -112,10 +122,10 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
         if (existMsg.getKey()) {
             Pair<Boolean, String> updateMsg = managerClient.updateStreamInfo(dataStreamInfo);
             if (updateMsg.getKey()) {
-                //todo init or update source
-
+                SourceRequest sourceRequest = streamContext.getSourceRequest();
+                sourceRequest.setId(initOrUpdateSource(sourceRequest));
                 SinkRequest sinkRequest = streamContext.getSinkRequest();
-                sinkRequest.setId(initOrUpdateStorage(sinkRequest));
+                sinkRequest.setId(initOrUpdateSink(sinkRequest));
             } else {
                 throw new RuntimeException(String.format("Update data stream failed:%s", updateMsg.getValue()));
             }
@@ -125,28 +135,53 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
         }
     }
 
-    private int initOrUpdateStorage(SinkRequest sinkRequest) {
+    private int initOrUpdateSource(SourceRequest sourceRequest) {
+        String sourceType = sourceRequest.getSourceType();
+        if (SourceType.KAFKA.name().equals(sourceType) || SourceType.BINLOG.name().equals(sourceType)) {
+            List<SourceListResponse> responses = managerClient.listSources(sourceRequest.getInlongGroupId(),
+                    sourceRequest.getInlongStreamId(), sourceRequest.getSourceType());
+            if (CollectionUtils.isEmpty(responses)) {
+                String sourceIndex = managerClient.createSource(sourceRequest);
+                return Double.valueOf(sourceIndex).intValue();
+            } else {
+                SourceListResponse response = responses.get(0);
+                sourceRequest.setId(response.getId());
+                Pair<Boolean, String> updateMsg = managerClient.updateSource(sourceRequest);
+                if (updateMsg.getKey()) {
+                    return response.getId();
+                } else {
+                    throw new RuntimeException(
+                            String.format("Update source:%s failed with ex:%s", GsonUtil.toJson(sourceRequest),
+                                    updateMsg.getValue()));
+                }
+            }
+        } else {
+            throw new IllegalArgumentException(String.format("Unsupported source type:%s", sourceType));
+        }
+    }
+
+    private int initOrUpdateSink(SinkRequest sinkRequest) {
         String sinkType = sinkRequest.getSinkType();
         if (SinkType.HIVE.name().equals(sinkType)) {
-            List<SinkListResponse> responses = managerClient.listHiveStorage(sinkRequest.getInlongGroupId(),
-                    sinkRequest.getInlongStreamId());
+            List<SinkListResponse> responses = managerClient.listSinks(sinkRequest.getInlongGroupId(),
+                    sinkRequest.getInlongStreamId(), sinkRequest.getSinkType());
             if (CollectionUtils.isEmpty(responses)) {
                 String storageIndex = managerClient.createSink(sinkRequest);
                 return Double.valueOf(storageIndex).intValue();
             } else {
                 SinkListResponse response = responses.get(0);
                 sinkRequest.setId(response.getId());
-                Pair<Boolean, String> updateMsg = managerClient.updateStorage(sinkRequest);
+                Pair<Boolean, String> updateMsg = managerClient.updateSink(sinkRequest);
                 if (updateMsg.getKey()) {
                     return response.getId();
                 } else {
                     throw new RuntimeException(
-                            String.format("Update storage:%s failed with ex:%s", GsonUtil.toJson(sinkRequest),
+                            String.format("Update sink:%s failed with ex:%s", GsonUtil.toJson(sinkRequest),
                                     updateMsg.getValue()));
                 }
             }
         } else {
-            throw new IllegalArgumentException(String.format("Unsupported storage type:%s", sinkType));
+            throw new IllegalArgumentException(String.format("Unsupported sink type:%s", sinkType));
         }
     }
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
index 01d2a66..59f1f62 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
@@ -69,7 +69,7 @@ public class InlongClientImpl implements InlongClient {
     }
 
     @Override
-    public InlongGroup createGroup(InlongGroupConf groupConf) throws Exception {
+    public InlongGroup forGroup(InlongGroupConf groupConf) throws Exception {
         return new InlongGroupImpl(groupConf, this);
     }
 
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index bc82c4c..dba26be 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -35,10 +35,12 @@ import org.apache.inlong.manager.client.api.util.AssertUtil;
 import org.apache.inlong.manager.client.api.util.GsonUtil;
 import org.apache.inlong.manager.client.api.util.InlongGroupTransfer;
 import org.apache.inlong.manager.client.api.util.InlongParser;
+import org.apache.inlong.manager.client.api.util.InlongStreamSourceTransfer;
 import org.apache.inlong.manager.common.enums.GroupState;
 import org.apache.inlong.manager.common.enums.ProcessStatus;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupApproveRequest;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
 import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
 import org.apache.inlong.manager.common.pojo.workflow.ProcessResponse;
@@ -153,9 +155,16 @@ public class InlongGroupImpl implements InlongGroup {
         List<InlongStream> streamList = new ArrayList<>();
         if (CollectionUtils.isNotEmpty(streamResponses)) {
             streamList = streamResponses.stream().map(fullStreamResponse -> {
+                List<SourceListResponse> sourceListResponses = managerClient.listSources(groupId,
+                        fullStreamResponse.getStreamInfo().getInlongStreamId());
                 String streamName = fullStreamResponse.getStreamInfo().getName();
                 InlongStream stream = groupContext.getStream(streamName);
-                return new InlongStreamImpl(fullStreamResponse, stream);
+                InlongStreamImpl inlongStream = new InlongStreamImpl(fullStreamResponse, stream);
+                if (CollectionUtils.isNotEmpty(sourceListResponses)) {
+                    inlongStream.setStreamSource(
+                            InlongStreamSourceTransfer.parseStreamSource(sourceListResponses.get(0)));
+                }
+                return inlongStream;
             }).collect(Collectors.toList());
         }
         return streamList;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
index dd85857..8065a2f 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
@@ -17,9 +17,11 @@
 
 package org.apache.inlong.manager.client.api.impl;
 
-import lombok.AllArgsConstructor;
+import java.util.List;
+import java.util.stream.Collectors;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.manager.client.api.InlongStream;
 import org.apache.inlong.manager.client.api.StreamField;
@@ -32,12 +34,9 @@ import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 
-import java.util.List;
-import java.util.stream.Collectors;
-
 @Data
 @EqualsAndHashCode(callSuper = true)
-@AllArgsConstructor
+@NoArgsConstructor
 public class InlongStreamImpl extends InlongStream {
 
     private String name;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
index 6667781..81ae88f 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
@@ -43,6 +43,8 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
 import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
 import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
@@ -337,6 +339,88 @@ public class InnerInlongManagerClient {
         }
     }
 
+    public String createSource(SourceRequest sourceRequest) {
+        String path = HTTP_PATH + "/source/save";
+        final String source = GsonUtil.toJson(sourceRequest);
+        final RequestBody sourceBody = RequestBody.create(MediaType.parse("application/json"), source);
+        final String url = formatUrl(path);
+        Request request = new Request.Builder()
+                .url(url)
+                .method("POST", sourceBody)
+                .build();
+
+        Call call = httpClient.newCall(request);
+        try {
+            Response response = call.execute();
+            assert response.body() != null;
+            String body = response.body().string();
+            AssertUtil.isTrue(response.isSuccessful(), String.format("Inlong request failed: %s", body));
+            org.apache.inlong.manager.common.beans.Response responseBody = InlongParser.parseResponse(body);
+            AssertUtil.isTrue(responseBody.getErrMsg() == null,
+                    String.format("Inlong request failed: %s", responseBody.getErrMsg()));
+            return responseBody.getData().toString();
+        } catch (Exception e) {
+            throw new RuntimeException(String.format("Inlong source save failed: %s", e.getMessage()), e);
+        }
+    }
+
+    public List<SourceListResponse> listSources(String groupId, String streamId) {
+        return listSources(groupId, streamId, null);
+    }
+
+    public List<SourceListResponse> listSources(String groupId, String streamId, String sourceType) {
+        final String path = HTTP_PATH + "/source/list";
+        String url = formatUrl(path);
+        url = String.format("%s&inlongGroupId=%s&inlongStreamId=%s", url, groupId, streamId);
+        if (StringUtils.isNotEmpty(sourceType)) {
+            url = String.format("%s&sourceType=%s", url, sourceType);
+        }
+        Request request = new Request.Builder().get()
+                .url(url)
+                .build();
+
+        Call call = httpClient.newCall(request);
+        try {
+            Response response = call.execute();
+            String body = response.body().string();
+            AssertUtil.isTrue(response.isSuccessful(), String.format("Inlong request failed:%s", body));
+            org.apache.inlong.manager.common.beans.Response responseBody = InlongParser.parseResponse(body);
+            AssertUtil.isTrue(responseBody.getErrMsg() == null,
+                    String.format("Inlong request failed:%s", responseBody.getErrMsg()));
+            PageInfo<SourceListResponse> sourceListResponsePageInfo = InlongParser.parseSourceList(
+                    responseBody);
+            return sourceListResponsePageInfo.getList();
+        } catch (Exception e) {
+            throw new RuntimeException(String.format("Inlong source list failed with ex:%s", e.getMessage()), e);
+        }
+    }
+
+    public Pair<Boolean, String> updateSource(SourceRequest sourceRequest) {
+        final String path = HTTP_PATH + "/source/update";
+        final String url = formatUrl(path);
+        final String storage = GsonUtil.toJson(sourceRequest);
+        final RequestBody storageBody = RequestBody.create(MediaType.parse("application/json"), storage);
+        Request request = new Request.Builder()
+                .method("POST", storageBody)
+                .url(url)
+                .build();
+
+        Call call = httpClient.newCall(request);
+        try {
+            Response response = call.execute();
+            String body = response.body().string();
+            AssertUtil.isTrue(response.isSuccessful(), String.format("Inlong request failed:%s", body));
+            org.apache.inlong.manager.common.beans.Response responseBody = InlongParser.parseResponse(body);
+            if (responseBody.getData() != null) {
+                return Pair.of(Boolean.valueOf(responseBody.getData().toString()), responseBody.getErrMsg());
+            } else {
+                return Pair.of(false, responseBody.getErrMsg());
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(String.format("Inlong source update failed with ex:%s", e.getMessage()), e);
+        }
+    }
+
     public String createSink(SinkRequest sinkRequest) {
         String path = HTTP_PATH + "/sink/save";
         final String sink = GsonUtil.toJson(sinkRequest);
@@ -362,10 +446,17 @@ public class InnerInlongManagerClient {
         }
     }
 
-    public List<SinkListResponse> listHiveStorage(String groupId, String streamId) {
+    public List<SinkListResponse> listSinks(String groupId, String streamId) {
+        return listSinks(groupId, streamId, null);
+    }
+
+    public List<SinkListResponse> listSinks(String groupId, String streamId, String sinkType) {
         final String path = HTTP_PATH + "/sink/list";
         String url = formatUrl(path);
-        url = String.format("%s&inlongGroupId=%s&inlongStreamId=%s&sinkType=HIVE", url, groupId, streamId);
+        url = String.format("%s&inlongGroupId=%s&inlongStreamId=%s", url, groupId, streamId);
+        if (StringUtils.isNotEmpty(sinkType)) {
+            url = String.format("%s&sinkType=%s", url, sinkType);
+        }
         Request request = new Request.Builder().get()
                 .url(url)
                 .build();
@@ -378,15 +469,15 @@ public class InnerInlongManagerClient {
             org.apache.inlong.manager.common.beans.Response responseBody = InlongParser.parseResponse(body);
             AssertUtil.isTrue(responseBody.getErrMsg() == null,
                     String.format("Inlong request failed:%s", responseBody.getErrMsg()));
-            PageInfo<SinkListResponse> hiveStorageListResponsePageInfo = InlongParser.parseHiveSinkList(
+            PageInfo<SinkListResponse> sinkListResponsePageInfo = InlongParser.parseSinkList(
                     responseBody);
-            return hiveStorageListResponsePageInfo.getList();
+            return sinkListResponsePageInfo.getList();
         } catch (Exception e) {
             throw new RuntimeException(String.format("Inlong storage list failed with ex:%s", e.getMessage()), e);
         }
     }
 
-    public Pair<Boolean, String> updateStorage(SinkRequest sinkRequest) {
+    public Pair<Boolean, String> updateSink(SinkRequest sinkRequest) {
         final String path = HTTP_PATH + "/sink/update";
         final String url = formatUrl(path);
         final String storage = GsonUtil.toJson(sinkRequest);
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerStreamContext.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerStreamContext.java
index 7dac32b..c74bb83 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerStreamContext.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerStreamContext.java
@@ -21,6 +21,7 @@ import java.util.List;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 
@@ -28,10 +29,10 @@ import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 @NoArgsConstructor
 public class InnerStreamContext {
 
-    //todo add SourceRequest
-
     private InlongStreamInfo streamInfo;
 
+    private SourceRequest sourceRequest;
+
     private SinkRequest sinkRequest;
 
     public InnerStreamContext(InlongStreamInfo streamInfo) {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/HiveSink.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java
similarity index 89%
rename from inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/HiveSink.java
rename to inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java
index debd21f..df1667c 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/HiveSink.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.client.api;
+package org.apache.inlong.manager.client.api.sink;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
@@ -27,6 +27,10 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.client.api.DataFormat;
+import org.apache.inlong.manager.client.api.DataSeparator;
+import org.apache.inlong.manager.client.api.StreamField;
+import org.apache.inlong.manager.client.api.StreamSink;
 import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
 
 @Data
@@ -93,5 +97,8 @@ public class HiveSink extends StreamSink {
 
     @ApiModelProperty("Other properties if need")
     private Map<String, String> properties;
+
+    @ApiModelProperty("Data format type for stream sink")
+    private DataFormat dataFormat;
 }
 
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java
new file mode 100644
index 0000000..f447f9d
--- /dev/null
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.source;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.client.api.DataFormat;
+import org.apache.inlong.manager.client.api.StreamSource;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+@ApiModel("Base configuration for Kafka collection")
+public class KafkaSource extends StreamSource {
+
+    @ApiModelProperty(value = "DataSource type", required = true)
+    private SourceType sourceType = SourceType.KAFKA;
+
+    @ApiModelProperty("SyncType for Kafka")
+    private SyncType syncType;
+
+    @ApiModelProperty("Data format type for kafka")
+    private DataFormat dataFormat;
+
+    @ApiModelProperty("Kafka topic")
+    private String topic;
+
+    @ApiModelProperty("Kafka consumer group")
+    private String consumerGroup;
+
+    @ApiModelProperty("Kafka servers address, such as: 127.0.0.1:9092")
+    private String bootstrapServers;
+
+    @ApiModelProperty(value = "Limit the amount of data read per second",
+            notes = "Greater than or equal to 0, equal to zero means no limit")
+    private String recordSpeedLimit;
+
+    @ApiModelProperty(value = "Limit the number of bytes read per second",
+            notes = "Greater than or equal to 0, equal to zero means no limit")
+    private String byteSpeedLimit;
+
+    @ApiModelProperty(value = "Topic partition offset",
+            notes = "For example, '0#100_1#10' means the offset of partition 0 is 100, the offset of partition 1 is 10")
+    private String topicPartitionOffset;
+
+}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MySQLSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
similarity index 51%
copy from inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MySQLSource.java
copy to inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
index a93a767..be68273 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MySQLSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
@@ -15,45 +15,48 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.client.api;
+package org.apache.inlong.manager.client.api.source;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
+import java.util.List;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.client.api.DataFormat;
+import org.apache.inlong.manager.client.api.StreamSource;
 
 @Data
 @AllArgsConstructor
 @NoArgsConstructor
-@ApiModel("Base configuration for MySQL collection")
-public class MySQLSource extends StreamSource {
+@ApiModel("Base configuration for MySQL binlog collection")
+public class MySQLBinlogSource extends StreamSource {
 
     @ApiModelProperty(value = "DataSource type", required = true)
-    private SourceType sourceType = SourceType.DB;
+    private SourceType sourceType = SourceType.BINLOG;
 
     @ApiModelProperty("SyncType for MySQL")
     private SyncType syncType;
 
-    @ApiModelProperty("Database name")
-    private String dbName;
+    @ApiModelProperty("Data format type for binlog")
+    private DataFormat dataFormat = DataFormat.NONE;
 
-    @ApiModelProperty("Data table name, required for increment")
-    private String tableName;
+    @ApiModelProperty("Username of the DB server")
+    private String user;
 
-    @ApiModelProperty("Db server username")
-    private String username;
-
-    @ApiModelProperty("Db password")
+    @ApiModelProperty("Password of the DB server")
     private String password;
 
-    @ApiModelProperty("DB Server IP")
-    private String dbServerIp;
+    @ApiModelProperty("Hostname of the DB server")
+    private String hostname;
 
-    @ApiModelProperty("DB Server port")
-    private int port;
+    @ApiModelProperty(value = "List of DBs to be collected, for example: db1.tb1,db2.tb2",
+            notes = "DBs not in this list are excluded. By default, all DBs are monitored")
+    private List<String> dbNames;
 
-    @ApiModelProperty("SQL statement to collect source data, required for full amount")
-    private String dataSql;
+    @ApiModelProperty("Database time zone, Default is UTC")
+    private String timeZone = "UTF";
 
+    @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
+    private String timestampFormatStandard = "SQL";
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MySQLSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLSource.java
similarity index 88%
rename from inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MySQLSource.java
rename to inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLSource.java
index a93a767..7395293 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MySQLSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLSource.java
@@ -15,13 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.client.api;
+package org.apache.inlong.manager.client.api.source;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.client.api.DataFormat;
+import org.apache.inlong.manager.client.api.StreamSource;
 
 @Data
 @AllArgsConstructor
@@ -56,4 +58,6 @@ public class MySQLSource extends StreamSource {
     @ApiModelProperty("SQL statement to collect source data, required for full amount")
     private String dataSql;
 
+    @ApiModelProperty("Data format type of source")
+    private DataFormat dataFormat;
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
index 03e65fa..2381c4d 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
@@ -29,6 +29,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
 import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
@@ -81,7 +82,16 @@ public class InlongParser {
         return pageInfo;
     }
 
-    public static PageInfo<SinkListResponse> parseHiveSinkList(Response response) {
+    public static PageInfo<SourceListResponse> parseSourceList(Response response) {
+        Object data = response.getData();
+        String pageInfoJson = GsonUtil.toJson(data);
+        PageInfo<SourceListResponse> pageInfo = GsonUtil.fromJson(pageInfoJson,
+                new TypeToken<PageInfo<SourceListResponse>>() {
+                }.getType());
+        return pageInfo;
+    }
+
+    public static PageInfo<SinkListResponse> parseSinkList(Response response) {
         Object data = response.getData();
         String pageInfoJson = GsonUtil.toJson(data);
         PageInfo<SinkListResponse> pageInfo = GsonUtil.fromJson(pageInfoJson,
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
new file mode 100644
index 0000000..273464c
--- /dev/null
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.util;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import java.util.List;
+import org.apache.inlong.manager.client.api.DataFormat;
+import org.apache.inlong.manager.client.api.StreamSource;
+import org.apache.inlong.manager.client.api.StreamSource.SourceType;
+import org.apache.inlong.manager.client.api.StreamSource.SyncType;
+import org.apache.inlong.manager.client.api.source.KafkaSource;
+import org.apache.inlong.manager.client.api.source.MySQLBinlogSource;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceRequest;
+import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceRequest;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+
+public class InlongStreamSourceTransfer {
+
+    public static SourceRequest createSourceRequest(StreamSource streamSource, InlongStreamInfo streamInfo) {
+        SourceType sourceType = streamSource.getSourceType();
+        switch (sourceType) {
+            case KAFKA:
+                return createKafkaSourceRequest((KafkaSource) streamSource, streamInfo);
+            case BINLOG:
+                return createBinlogSourceRequest((MySQLBinlogSource) streamSource, streamInfo);
+            default:
+                throw new RuntimeException(String.format("Unsupport source=%s for Inlong", sourceType));
+        }
+    }
+
+    public static StreamSource parseStreamSource(SourceListResponse sourceListResponse) {
+        String type = sourceListResponse.getSourceType();
+        SourceType sourceType = SourceType.forType(type);
+        if (sourceType == SourceType.BINLOG) {
+            return parseKafkaSource((KafkaSourceListResponse) sourceListResponse);
+        } else if (sourceType == SourceType.KAFKA) {
+            return parseMySQLBinlogSource((BinlogSourceListResponse) sourceListResponse);
+        } else {
+            throw new IllegalArgumentException(String.format("Unsupport source type : %s for Inlong", sourceType));
+        }
+    }
+
+    private static KafkaSource parseKafkaSource(KafkaSourceListResponse kafkaSourceResponse) {
+        KafkaSource kafkaSource = new KafkaSource();
+        kafkaSource.setConsumerGroup(kafkaSourceResponse.getGroupId());
+        DataFormat dataFormat = DataFormat.forName(kafkaSourceResponse.getSerializationType());
+        kafkaSource.setDataFormat(dataFormat);
+        kafkaSource.setTopic(kafkaSourceResponse.getTopic());
+        kafkaSource.setBootstrapServers(kafkaSourceResponse.getBootstrapServers());
+        kafkaSource.setByteSpeedLimit(kafkaSourceResponse.getByteSpeedLimit());
+        kafkaSource.setTopicPartitionOffset(kafkaSourceResponse.getTopicPartitionOffset());
+        kafkaSource.setRecordSpeedLimit(kafkaSourceResponse.getRecordSpeedLimit());
+        kafkaSource.setSyncType(SyncType.FULL);
+        return kafkaSource;
+    }
+
+    private static MySQLBinlogSource parseMySQLBinlogSource(BinlogSourceListResponse binlogSourceResponse) {
+        MySQLBinlogSource binlogSource = new MySQLBinlogSource();
+        binlogSource.setHostname(binlogSourceResponse.getHostname());
+        binlogSource.setDataFormat(DataFormat.CANAL);
+        binlogSource.setPassword(binlogSourceResponse.getPassword());
+        binlogSource.setUser(binlogSourceResponse.getUser());
+        binlogSource.setTimeZone(binlogSourceResponse.getTimeZone());
+        binlogSource.setTimestampFormatStandard(binlogSourceResponse.getTimestampFormatStandard());
+        List<String> dbs = Splitter.on(",").splitToList(binlogSourceResponse.getWhitelist());
+        binlogSource.setDbNames(dbs);
+        return binlogSource;
+    }
+
+    private static KafkaSourceRequest createKafkaSourceRequest(KafkaSource kafkaSource, InlongStreamInfo streamInfo) {
+        KafkaSourceRequest sourceRequest = new KafkaSourceRequest();
+        sourceRequest.setInlongGroupId(streamInfo.getInlongGroupId());
+        sourceRequest.setInlongStreamId(streamInfo.getInlongStreamId());
+        sourceRequest.setSourceType(kafkaSource.getSourceType().name());
+        sourceRequest.setBootstrapServers(kafkaSource.getBootstrapServers());
+        sourceRequest.setTopic(kafkaSource.getTopic());
+        sourceRequest.setRecordSpeedLimit(kafkaSource.getRecordSpeedLimit());
+        sourceRequest.setByteSpeedLimit(kafkaSource.getByteSpeedLimit());
+        sourceRequest.setTopicPartitionOffset(kafkaSource.getTopicPartitionOffset());
+        sourceRequest.setGroupId(kafkaSource.getConsumerGroup());
+        sourceRequest.setSerializationType(kafkaSource.getDataFormat().getName());
+        return sourceRequest;
+    }
+
+    private static BinlogSourceRequest createBinlogSourceRequest(MySQLBinlogSource binlogSource,
+            InlongStreamInfo streamInfo) {
+        BinlogSourceRequest binlogSourceRequest = new BinlogSourceRequest();
+        binlogSourceRequest.setInlongGroupId(streamInfo.getInlongGroupId());
+        binlogSourceRequest.setInlongStreamId(streamInfo.getInlongStreamId());
+        binlogSourceRequest.setSourceType(binlogSource.getSourceType().name());
+        binlogSourceRequest.setPassword(binlogSource.getPassword());
+        binlogSourceRequest.setUser(binlogSource.getUser());
+        binlogSourceRequest.setHostname(binlogSource.getHostname());
+        String dbNames = Joiner.on(",").join(binlogSource.getDbNames());
+        binlogSourceRequest.setWhitelist(dbNames);
+        binlogSourceRequest.setTimestampFormatStandard(binlogSource.getTimestampFormatStandard());
+        binlogSourceRequest.setTimeZone(binlogSource.getTimeZone());
+        binlogSourceRequest.setSnapshotMode("initial");
+        binlogSourceRequest.setIntervalMs("500");
+        return binlogSourceRequest;
+    }
+}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
index db702cb..e3f4234 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
@@ -20,7 +20,7 @@ package org.apache.inlong.manager.client.api.util;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.client.api.HiveSink;
+import org.apache.inlong.manager.client.api.sink.HiveSink;
 import org.apache.inlong.manager.client.api.InlongStreamConf;
 import org.apache.inlong.manager.client.api.StreamField;
 import org.apache.inlong.manager.client.api.StreamField.FieldType;
@@ -80,11 +80,12 @@ public class InlongStreamTransfer {
     }
 
     public static StreamSink parseStreamSink(SinkResponse sinkResponse, StreamSink streamSink) {
-        String sinkType = sinkResponse.getSinkType();
-        if ("HIVE".equals(sinkType)) {
+        String type = sinkResponse.getSinkType();
+        SinkType sinkType = SinkType.forType(type);
+        if (sinkType == SinkType.HIVE) {
             return parseHiveSink(sinkResponse, (HiveSink) streamSink);
         } else {
-            throw new IllegalArgumentException(String.format("Unsupport storage type : %s for Inlong", sinkType));
+            throw new IllegalArgumentException(String.format("Unsupport sink type : %s for Inlong", sinkType));
         }
     }
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java
index a7b54c1..e639869 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java
@@ -81,6 +81,9 @@ public class BinlogSourceDTO {
     @ApiModelProperty("The file path to store history info")
     private String storeHistoryFilename;
 
+    @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
+    private String timestampFormatStandard = "SQL";
+
     /**
      * Get the dto instance from the request
      */
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
index dbee39d..5f66e42 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
@@ -37,6 +37,9 @@ public class BinlogSourceListResponse extends SourceListResponse {
     @ApiModelProperty("Hostname of the DB server")
     private String hostname;
 
+    @ApiModelProperty("Password of the DB server")
+    private String password;
+
     @ApiModelProperty(value = "List of DBs to be collected, supporting regular expressions")
     private String whitelist;
 
@@ -52,5 +55,7 @@ public class BinlogSourceListResponse extends SourceListResponse {
     @ApiModelProperty("Snapshot mode, supports: initial, when_needed, never, schema_only, schema_only_recovery")
     private String snapshotMode;
 
+    @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
+    private String timestampFormatStandard = "SQL";
 
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
index 96b8e4d..6c1c235 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
@@ -46,4 +46,10 @@ public class KafkaSourceListResponse extends SourceListResponse {
     @ApiModelProperty("Limit the number of bytes read per second")
     private String byteSpeedLimit;
 
+    @ApiModelProperty("Data Serialization, support: Json, Canal, Avro, etc")
+    private String serializationType = "none";
+
+    @ApiModelProperty(value = "Topic partition offset",
+            notes = "For example, '0#100_1#10' means the offset of partition 0 is 100, the offset of partition 1 is 10")
+    private String topicPartitionOffset;
 }